use ahash::{AHashMap, AHashSet};
use log::{debug, trace};
use parity_scale_codec::{Decode, Encode};
use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64};
use rand::seq::SliceRandom;
use sc_network::{PeerId, ReputationChange};
use sc_network_common::protocol::role::ObservedRole;
use sc_network_gossip::{MessageIntent, ValidatorContext};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_finality_grandpa::AuthorityId;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use super::{benefit, cost, Round, SetId, NEIGHBOR_REBROADCAST_PERIOD};
use crate::{environment, CatchUp, CompactCommit, SignedMessage, LOG_TARGET};
use std::{
collections::{HashSet, VecDeque},
time::{Duration, Instant},
};
/// Interval between two periodic rebroadcasts permitted by `message_allowed`.
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
/// How long an outstanding catch-up request blocks further requests before it is considered
/// timed out (the requested peer is then reported).
const CATCH_UP_REQUEST_TIMEOUT: Duration = Duration::from_secs(45);
/// How long a catch-up message that is being processed blocks further catch-up requests.
const CATCH_UP_PROCESS_TIMEOUT: Duration = Duration::from_secs(30);
/// Round distance used when deciding whether a peer is far enough ahead to request a catch-up
/// (see `Inner::try_catch_up`).
const CATCH_UP_THRESHOLD: u64 = 2;
/// Multiplier applied to the configured `gossip_duration` to obtain the duration of one round.
const ROUND_DURATION: u32 = 5;
/// Multiple of the round duration during which round messages are only broadcast to
/// first-stage peers.
const PROPAGATION_SOME: f32 = 1.5;
/// Multiple of the round duration after which broadcasts are no longer restricted to the
/// staged peer sets.
const PROPAGATION_ALL: f32 = 3.0;
/// Base size of the randomly selected peer sets.
const LUCKY_PEERS: usize = 4;
/// A reputation change to be reported for a specific peer.
type Report = (PeerId, ReputationChange);
/// Outcome of checking a message's round/set/commit height against a `View`.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Consider {
/// The message is acceptable under the view.
Accept,
/// The message is older than what the view accepts.
RejectPast,
/// The message is newer than what the view accepts.
RejectFuture,
/// The message cannot be evaluated at all (used when there is no local view).
RejectOutOfScope,
}
/// A view of the protocol state: current round and set, the height of the last known commit
/// (if any) and the instant the view was last replaced by a neighbor packet (`None` if never).
#[derive(Debug)]
struct View<N> {
round: Round, set_id: SetId, last_commit: Option<N>, last_update: Option<Instant>, }
impl<N> Default for View<N> {
    /// Initial view: round 1 of set 0, with no commit and no update recorded yet.
    fn default() -> Self {
        Self { last_update: None, last_commit: None, set_id: SetId(0), round: Round(1) }
    }
}
impl<N: Ord> View<N> {
    /// Checks a round message identified by `round` and `set_id` against this view.
    ///
    /// Only the view's own set is accepted, and within it only the rounds from one behind
    /// to one ahead of the view's round.
    fn consider_vote(&self, round: Round, set_id: SetId) -> Consider {
        if set_id < self.set_id {
            Consider::RejectPast
        } else if set_id > self.set_id {
            Consider::RejectFuture
        } else if round.0 > self.round.0.saturating_add(1) {
            Consider::RejectFuture
        } else if round.0 < self.round.0.saturating_sub(1) {
            Consider::RejectPast
        } else {
            Consider::Accept
        }
    }

    /// Checks a global (per-set) message claiming block `number` against this view.
    ///
    /// Only the view's own set is accepted, and only when `number` is strictly greater than
    /// the last commit height known to the view (anything goes if none is known).
    fn consider_global(&self, set_id: SetId, number: N) -> Consider {
        if set_id < self.set_id {
            Consider::RejectPast
        } else if set_id > self.set_id {
            Consider::RejectFuture
        } else if self.last_commit.as_ref().map_or(true, |known| known < &number) {
            Consider::Accept
        } else {
            Consider::RejectPast
        }
    }
}
/// The local node's own view of the protocol state.
struct LocalView<N> {
/// Round we are currently in.
round: Round,
/// Authority set we are currently in.
set_id: SetId,
/// Height, round and set of the last commit that was noted, if any.
last_commit: Option<(N, Round, SetId)>,
/// Instant at which the current round (or set) was entered.
round_start: Instant,
}
impl<N> LocalView<N> {
    /// Creates a local view at the given set and round with no commit recorded; the round
    /// clock starts now.
    fn new(set_id: SetId, round: Round) -> LocalView<N> {
        let round_start = Instant::now();
        LocalView { round, set_id, round_start, last_commit: None }
    }

    /// Borrows this local view as a plain `View` (which never carries a `last_update`).
    fn as_view(&self) -> View<&N> {
        let last_commit = self.last_commit_height();
        View { last_update: None, last_commit, set_id: self.set_id, round: self.round }
    }

    /// Switches to `set_id`. A change of set resets the round to 1 and restarts the round
    /// clock; noting the current set again is a no-op.
    fn update_set(&mut self, set_id: SetId) {
        if set_id == self.set_id {
            return
        }
        self.set_id = set_id;
        self.round = Round(1);
        self.round_start = Instant::now();
    }

    /// Moves to `round` and restarts the round clock.
    fn update_round(&mut self, round: Round) {
        self.round_start = Instant::now();
        self.round = round;
    }

    /// Height of the last noted commit, if any.
    fn last_commit_height(&self) -> Option<&N> {
        match self.last_commit {
            Some((ref number, _, _)) => Some(number),
            None => None,
        }
    }
}
/// Base number of rounds kept live; the tracked queue is capped at `KEEP_RECENT_ROUNDS + 2`
/// entries.
const KEEP_RECENT_ROUNDS: usize = 3;
/// Tracks the gossip topics that are currently considered live.
struct KeepTopics<B: BlockT> {
/// Highest set id pushed so far.
current_set: SetId,
/// Recently pushed `(round, set)` pairs, oldest first.
rounds: VecDeque<(Round, SetId)>,
/// Topic hash to `(round, set)` lookup; the round is `None` for the global topic.
reverse_map: AHashMap<B::Hash, (Option<Round>, SetId)>,
}
impl<B: BlockT> KeepTopics<B> {
    /// Creates an empty tracker starting at set 0.
    fn new() -> Self {
        Self {
            reverse_map: Default::default(),
            rounds: VecDeque::with_capacity(KEEP_RECENT_ROUNDS + 2),
            current_set: SetId(0),
        }
    }

    /// Starts tracking `round` (if it is not tracked yet) and the round after it for
    /// `set_id`, evicts the oldest entries beyond the cap and rebuilds the topic lookup.
    fn push(&mut self, round: Round, set_id: SetId) {
        self.current_set = std::cmp::max(self.current_set, set_id);

        let already_tracked = self.rounds.iter().any(|entry| *entry == (round, set_id));
        if !already_tracked {
            self.rounds.push_back((round, set_id));
        }

        // The following round is always tracked as well.
        let next_round = Round(round.0.saturating_add(1));
        self.rounds.push_back((next_round, set_id));

        // Drop the oldest entries until we are back under the cap.
        let excess = self.rounds.len().saturating_sub(KEEP_RECENT_ROUNDS + 2);
        for _ in 0..excess {
            self.rounds.pop_front();
        }

        // Rebuild the reverse lookup from scratch: the global topic of the current set
        // first, then one entry per tracked round.
        let mut lookup = AHashMap::with_capacity(KEEP_RECENT_ROUNDS + 3);
        let global = super::global_topic::<B>(self.current_set.0);
        lookup.insert(global, (None, self.current_set));
        for &(tracked_round, tracked_set) in self.rounds.iter() {
            let topic = super::round_topic::<B>(tracked_round.0, tracked_set.0);
            lookup.insert(topic, (Some(tracked_round), tracked_set));
        }
        self.reverse_map = lookup;
    }

    /// Looks up the `(round, set)` a live topic belongs to; `None` if the topic is not live.
    fn topic_info(&self, topic: &B::Hash) -> Option<(Option<Round>, SetId)> {
        self.reverse_map.get(topic).copied()
    }
}
/// Topics relevant to a peer with the given view: the global topic of its set, the topic of
/// its current round and, unless that round is 0, the topic of the previous round.
fn neighbor_topics<B: BlockT>(view: &View<NumberFor<B>>) -> Vec<B::Hash> {
    let set = view.set_id.0;
    let round = view.round.0;

    let mut topics = Vec::with_capacity(3);
    topics.push(super::global_topic::<B>(set));
    topics.push(super::round_topic::<B>(round, set));
    if let Some(previous) = round.checked_sub(1) {
        topics.push(super::round_topic::<B>(previous, set));
    }
    topics
}
/// Every kind of message exchanged over the GRANDPA gossip protocol.
#[derive(Debug, Encode, Decode)]
pub(super) enum GossipMessage<Block: BlockT> {
/// A signed vote for a specific round and set.
Vote(VoteMessage<Block>),
/// A compact commit for a specific round and set.
Commit(FullCommitMessage<Block>),
/// A versioned packet announcing the sender's view.
Neighbor(VersionedNeighborPacket<NumberFor<Block>>),
/// A request for a catch-up message for a given round and set.
CatchUpRequest(CatchUpRequestMessage),
/// A catch-up message for a given set.
CatchUp(FullCatchUpMessage<Block>),
}
impl<Block: BlockT> From<NeighborPacket<NumberFor<Block>>> for GossipMessage<Block> {
    /// Wraps a neighbor packet as a version-1 `Neighbor` gossip message.
    fn from(neighbor: NeighborPacket<NumberFor<Block>>) -> Self {
        let versioned = VersionedNeighborPacket::V1(neighbor);
        Self::Neighbor(versioned)
    }
}
/// A signed vote together with the round and set it was cast in.
#[derive(Debug, Encode, Decode)]
pub(super) struct VoteMessage<Block: BlockT> {
/// Round the vote belongs to.
pub(super) round: Round,
/// Authority set the vote belongs to.
pub(super) set_id: SetId,
/// The signed vote itself.
pub(super) message: SignedMessage<Block::Header>,
}
/// A compact commit together with the round and set it belongs to.
#[derive(Debug, Encode, Decode)]
pub(super) struct FullCommitMessage<Block: BlockT> {
/// Round the commit belongs to.
pub(super) round: Round,
/// Authority set the commit belongs to.
pub(super) set_id: SetId,
/// The compact commit itself.
pub(super) message: CompactCommit<Block::Header>,
}
/// Announcement of a node's view: its round, its set and the height of its last commit.
#[derive(Debug, Encode, Decode, Clone)]
pub(super) struct NeighborPacket<N> {
/// Round the sender is in.
pub(super) round: Round,
/// Authority set the sender is in.
pub(super) set_id: SetId,
/// Height of the sender's last commit (zero when it has none, see
/// `Inner::multicast_neighbor_packet`).
pub(super) commit_finalized_height: N,
}
/// Version envelope around a neighbor packet; only version 1 exists.
#[derive(Debug, Encode, Decode)]
pub(super) enum VersionedNeighborPacket<N> {
// The explicit codec index pins the encoded variant tag to 1.
#[codec(index = 1)]
V1(NeighborPacket<N>),
}
impl<N> VersionedNeighborPacket<N> {
    /// Unwraps the versioned envelope into the plain neighbor packet.
    fn into_neighbor_packet(self) -> NeighborPacket<N> {
        // `V1` is the only variant, so the pattern is irrefutable.
        let VersionedNeighborPacket::V1(packet) = self;
        packet
    }
}
/// A request for a catch-up message for the given round and set.
#[derive(Clone, Debug, Encode, Decode)]
pub(super) struct CatchUpRequestMessage {
/// Round to catch up to.
pub(super) round: Round,
/// Authority set the request refers to.
pub(super) set_id: SetId,
}
/// A catch-up message together with the set it belongs to.
#[derive(Debug, Encode, Decode)]
pub(super) struct FullCatchUpMessage<Block: BlockT> {
/// Authority set the catch-up belongs to.
pub(super) set_id: SetId,
/// The catch-up payload itself.
pub(super) message: CatchUp<Block::Header>,
}
/// Kinds of peer misbehavior that carry a reputation cost (see `Misbehavior::cost`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub(super) enum Misbehavior {
/// A neighbor packet or commit moved the peer's view backwards.
InvalidViewChange,
/// An unchanged neighbor packet was repeated too soon after the previous one.
DuplicateNeighborMessage,
/// A packet could not be decoded; the payload is a byte count used to scale the cost.
UndecodablePacket(i32),
/// A bad catch-up message; the cost scales with the number of signatures checked.
BadCatchUpMessage { signatures_checked: i32 },
/// A bad commit message; the cost scales with signatures checked and blocks loaded, and is
/// offset by equivocations caught.
BadCommitMessage { signatures_checked: i32, blocks_loaded: i32, equivocations_caught: i32 },
/// A message that is ahead of our view.
FutureMessage,
/// A message that cannot be related to our current state.
OutOfScopeMessage,
}
impl Misbehavior {
    /// Returns the reputation change to apply for this misbehavior.
    ///
    /// Fixed variants map to a constant from the `cost` module. The parameterised variants
    /// scale per-unit constants by their counters using saturating arithmetic, so very
    /// large counters clamp instead of overflowing.
    pub(super) fn cost(&self) -> ReputationChange {
        use Misbehavior::*;

        match *self {
            InvalidViewChange => cost::INVALID_VIEW_CHANGE,
            DuplicateNeighborMessage => cost::DUPLICATE_NEIGHBOR_MESSAGE,
            UndecodablePacket(bytes) => ReputationChange::new(
                bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
                "Grandpa: Bad packet",
            ),
            BadCatchUpMessage { signatures_checked } => ReputationChange::new(
                cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked),
                "Grandpa: Bad catch-up message",
            ),
            BadCommitMessage { signatures_checked, blocks_loaded, equivocations_caught } => {
                // Work the peer made us do for nothing ...
                let penalty = cost::PER_SIGNATURE_CHECKED
                    .saturating_mul(signatures_checked)
                    .saturating_add(cost::PER_BLOCK_LOADED.saturating_mul(blocks_loaded));
                // ... offset by the equivocations the message helped us catch.
                let reward = equivocations_caught.saturating_mul(benefit::PER_EQUIVOCATION);

                ReputationChange::new(reward.saturating_add(penalty), "Grandpa: Bad commit")
            },
            FutureMessage => cost::FUTURE_MESSAGE,
            OutOfScopeMessage => cost::OUT_OF_SCOPE_MESSAGE,
        }
    }
}
/// What we know about a connected peer.
#[derive(Debug)]
struct PeerInfo<N> {
/// The peer's last known view.
view: View<N>,
/// The role observed for the peer when it connected.
roles: ObservedRole,
}
impl<N> PeerInfo<N> {
    /// Creates the record for a freshly connected peer, starting from the default view.
    fn new(roles: ObservedRole) -> Self {
        let view = View::default();
        Self { roles, view }
    }
}
/// All connected peers plus the randomly selected subsets used for staged gossip.
struct Peers<N> {
/// Every connected peer and what we know about it.
inner: AHashMap<PeerId, PeerInfo<N>>,
/// Peers that receive round messages from the very start of a round.
first_stage_peers: AHashSet<PeerId>,
/// Peers that are added to the broadcast targets in the second propagation stage.
second_stage_peers: HashSet<PeerId>,
/// Light peers selected to receive global messages during the staged period.
lucky_light_peers: HashSet<PeerId>,
/// Period of neighbor packet rebroadcasts; half of it is the minimum gap required between
/// two identical neighbor packets from the same peer.
neighbor_rebroadcast_period: Duration,
}
impl<N: Ord> Peers<N> {
fn new(neighbor_rebroadcast_period: Duration) -> Self {
Peers {
inner: Default::default(),
first_stage_peers: Default::default(),
second_stage_peers: Default::default(),
lucky_light_peers: Default::default(),
neighbor_rebroadcast_period,
}
}
/// Registers a newly connected peer.
///
/// Authorities are placed into the first-stage set while it has room, otherwise into the
/// second-stage set while that has room; light peers go into the lucky light set while it
/// has room. The peer is always recorded in `inner`.
fn new_peer(&mut self, who: PeerId, role: ObservedRole) {
    if matches!(role, ObservedRole::Authority) {
        if self.first_stage_peers.len() < LUCKY_PEERS {
            self.first_stage_peers.insert(who);
        } else if self.second_stage_peers.len() < LUCKY_PEERS {
            self.second_stage_peers.insert(who);
        }
    } else if matches!(role, ObservedRole::Light) && self.lucky_light_peers.len() < LUCKY_PEERS {
        self.lucky_light_peers.insert(who);
    }

    self.inner.insert(who, PeerInfo::new(role));
}
/// Forgets a disconnected peer, removing it from every selected set as well. The sets are
/// not reshuffled here.
fn peer_disconnected(&mut self, who: &PeerId) {
    self.lucky_light_peers.remove(who);
    self.second_stage_peers.remove(who);
    self.first_stage_peers.remove(who);
    self.inner.remove(who);
}
/// Applies a neighbor packet to the view of peer `who`.
///
/// Returns `Ok(None)` for unknown peers and `Ok(Some(view))` with the updated view
/// otherwise. Fails with `InvalidViewChange` when the packet moves the set, the round
/// (within the same set) or the commit height backwards, and with
/// `DuplicateNeighborMessage` when an unchanged packet arrives less than half the neighbor
/// rebroadcast period after the previous update.
fn update_peer_state(
    &mut self,
    who: &PeerId,
    update: NeighborPacket<N>,
) -> Result<Option<&View<N>>, Misbehavior> {
    let min_duplicate_gap = self.neighbor_rebroadcast_period / 2;

    let peer = if let Some(known) = self.inner.get_mut(who) {
        known
    } else {
        return Ok(None)
    };
    let current = &peer.view;

    let set_went_back = current.set_id > update.set_id;
    let round_went_back = current.set_id == update.set_id && current.round > update.round;
    let commit_went_back = current.last_commit.as_ref() > Some(&update.commit_finalized_height);
    if set_went_back || round_went_back || commit_went_back {
        return Err(Misbehavior::InvalidViewChange)
    }

    let now = Instant::now();
    let unchanged = update.set_id == current.set_id &&
        update.round == current.round &&
        Some(&update.commit_finalized_height) == current.last_commit.as_ref();
    if unchanged {
        match current.last_update {
            Some(last_update) if now < last_update + min_duplicate_gap =>
                return Err(Misbehavior::DuplicateNeighborMessage),
            _ => {},
        }
    }

    peer.view = View {
        last_update: Some(now),
        last_commit: Some(update.commit_finalized_height),
        set_id: update.set_id,
        round: update.round,
    };
    trace!(
        target: LOG_TARGET,
        "Peer {} updated view. Now at {:?}, {:?}",
        who,
        peer.view.round,
        peer.view.set_id
    );

    Ok(Some(&peer.view))
}
/// Records that peer `who` knows of a commit at `new_height`.
///
/// Unknown peers are ignored. Fails with `InvalidViewChange` if the height is lower than
/// the one already recorded for the peer; an equal height is accepted.
fn update_commit_height(&mut self, who: &PeerId, new_height: N) -> Result<(), Misbehavior> {
    if let Some(peer) = self.inner.get_mut(who) {
        if peer.view.last_commit.as_ref() > Some(&new_height) {
            return Err(Misbehavior::InvalidViewChange)
        }
        peer.view.last_commit = Some(new_height);
    }
    Ok(())
}
/// Returns what we know about peer `who`, if it is connected.
fn peer(&self, who: &PeerId) -> Option<&PeerInfo<N>> {
    self.inner.get(who)
}
/// Randomly re-selects the first-stage, second-stage and lucky light peer sets from the
/// currently connected peers.
fn reshuffle(&mut self) {
// Snapshot all peers in random order; every selection below walks this order.
let shuffled_peers = {
let mut peers =
self.inner.iter().map(|(peer_id, info)| (*peer_id, info)).collect::<Vec<_>>();
peers.shuffle(&mut rand::thread_rng());
peers
};
// The authorities among them, in the same shuffled order.
let shuffled_authorities = shuffled_peers.iter().filter_map(|(peer_id, info)| {
if matches!(info.roles, ObservedRole::Authority) {
Some(peer_id)
} else {
None
}
});
let mut first_stage_peers = AHashSet::new();
let mut second_stage_peers = HashSet::new();
// Seed the sets with authorities: the first `LUCKY_PEERS / 2` go to the first stage, the
// next `LUCKY_PEERS` to the second stage, the rest are not pre-assigned.
let half_lucky = LUCKY_PEERS / 2;
let one_and_a_half_lucky = LUCKY_PEERS + half_lucky;
for (n_authorities_added, peer_id) in shuffled_authorities.enumerate() {
if n_authorities_added < half_lucky {
first_stage_peers.insert(*peer_id);
} else if n_authorities_added < one_and_a_half_lucky {
second_stage_peers.insert(*peer_id);
} else {
break
}
}
// Top up with any non-light peer: the first stage is filled to `LUCKY_PEERS` first (a peer
// promoted to it leaves the second stage), then the second stage is filled to
// `max(LUCKY_PEERS, sqrt(total number of peers))`.
let n_second_stage_peers = LUCKY_PEERS.max((shuffled_peers.len() as f32).sqrt() as usize);
for (peer_id, info) in &shuffled_peers {
if info.roles.is_light() {
continue
}
if first_stage_peers.len() < LUCKY_PEERS {
first_stage_peers.insert(*peer_id);
second_stage_peers.remove(peer_id);
} else if second_stage_peers.len() < n_second_stage_peers {
if !first_stage_peers.contains(peer_id) {
second_stage_peers.insert(*peer_id);
}
} else {
break
}
}
// The first `LUCKY_PEERS` light peers in shuffled order become the lucky light peers.
let lucky_light_peers = shuffled_peers
.into_iter()
.filter_map(|(peer_id, info)| if info.roles.is_light() { Some(peer_id) } else { None })
.take(LUCKY_PEERS)
.collect();
self.first_stage_peers = first_stage_peers;
self.second_stage_peers = second_stage_peers;
self.lucky_light_peers = lucky_light_peers;
}
}
/// What to do with a validated message, together with the reputation change for its sender.
#[derive(Debug, PartialEq)]
pub(super) enum Action<H> {
/// Process the message and keep it under the given topic.
Keep(H, ReputationChange),
/// Process the message under the given topic, but do not keep it.
ProcessAndDiscard(H, ReputationChange),
/// Drop the message.
Discard(ReputationChange),
}
/// State of the single catch-up request we may have in flight.
#[derive(Debug)]
enum PendingCatchUp {
/// No catch-up is pending.
None,
/// `request` was sent to `who` at `instant` and no matching reply has been accepted yet.
Requesting { who: PeerId, request: CatchUpRequestMessage, instant: Instant },
/// A reply was accepted and is being processed; `instant` is carried over from the request.
Processing { instant: Instant },
}
/// Whether, and to which kinds of peers, catch-up requests may be sent.
enum CatchUpConfig {
/// Requests are allowed; when `only_from_authorities` is set, only authorities are asked.
Enabled { only_from_authorities: bool },
/// Requests are never sent.
Disabled,
}
impl CatchUpConfig {
    /// Catch-up enabled, optionally restricted to authority peers.
    fn enabled(only_from_authorities: bool) -> CatchUpConfig {
        Self::Enabled { only_from_authorities }
    }

    /// Catch-up disabled altogether.
    fn disabled() -> CatchUpConfig {
        Self::Disabled
    }

    /// Whether a catch-up request may be sent to `peer`: never when disabled, never to light
    /// peers, always to authorities, and to full peers only if not restricted to
    /// authorities.
    fn request_allowed<N>(&self, peer: &PeerInfo<N>) -> bool {
        let authorities_only = match self {
            CatchUpConfig::Disabled => return false,
            CatchUpConfig::Enabled { only_from_authorities, .. } => *only_from_authorities,
        };

        match peer.roles {
            ObservedRole::Authority => true,
            ObservedRole::Light => false,
            ObservedRole::Full => !authorities_only,
        }
    }
}
/// Mutable state of the gossip validator, kept behind a lock by `GossipValidator`.
struct Inner<Block: BlockT> {
/// Our own view; `None` until a set has been noted.
local_view: Option<LocalView<NumberFor<Block>>>,
/// Connected peers and their views.
peers: Peers<NumberFor<Block>>,
/// Topics currently considered live.
live_topics: KeepTopics<Block>,
/// Authorities of the current set, used to check who signed round messages.
authorities: Vec<AuthorityId>,
/// Voter configuration.
config: crate::Config,
/// Earliest instant at which the next periodic rebroadcast is allowed.
next_rebroadcast: Instant,
/// State of the catch-up request in flight, if any.
pending_catch_up: PendingCatchUp,
/// Policy for issuing catch-up requests.
catch_up_config: CatchUpConfig,
}
/// An optional neighbor packet together with the peers it should be sent to.
type MaybeMessage<Block> = Option<(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)>;
impl<Block: BlockT> Inner<Block> {
fn new(config: crate::Config) -> Self {
let catch_up_config = if config.observer_enabled {
if config.local_role.is_authority() {
CatchUpConfig::enabled(true)
} else {
CatchUpConfig::disabled()
}
} else {
CatchUpConfig::enabled(false)
};
Inner {
local_view: None,
peers: Peers::new(NEIGHBOR_REBROADCAST_PERIOD),
live_topics: KeepTopics::new(),
next_rebroadcast: Instant::now() + REBROADCAST_AFTER,
authorities: Vec::new(),
pending_catch_up: PendingCatchUp::None,
catch_up_config,
config,
}
}
fn note_round(&mut self, round: Round) -> MaybeMessage<Block> {
{
let local_view = match self.local_view {
None => return None,
Some(ref mut v) =>
if v.round == round {
return None
} else {
v
},
};
let set_id = local_view.set_id;
debug!(
target: LOG_TARGET,
"Voter {} noting beginning of round {:?} to network.",
self.config.name(),
(round, set_id)
);
local_view.update_round(round);
self.live_topics.push(round, set_id);
self.peers.reshuffle();
}
self.multicast_neighbor_packet()
}
fn note_set(&mut self, set_id: SetId, authorities: Vec<AuthorityId>) -> MaybeMessage<Block> {
{
let local_view = match self.local_view {
ref mut x @ None => x.get_or_insert(LocalView::new(set_id, Round(1))),
Some(ref mut v) =>
if v.set_id == set_id {
let diff_authorities = self.authorities.iter().collect::<HashSet<_>>() !=
authorities.iter().collect::<HashSet<_>>();
if diff_authorities {
debug!(target: LOG_TARGET,
"Gossip validator noted set {:?} twice with different authorities. \
Was the authority set hard forked?",
set_id,
);
self.authorities = authorities;
}
return None
} else {
v
},
};
local_view.update_set(set_id);
self.live_topics.push(Round(1), set_id);
self.authorities = authorities;
}
self.multicast_neighbor_packet()
}
fn note_commit_finalized(
&mut self,
round: Round,
set_id: SetId,
finalized: NumberFor<Block>,
) -> MaybeMessage<Block> {
{
match self.local_view {
None => return None,
Some(ref mut v) =>
if v.last_commit_height() < Some(&finalized) {
v.last_commit = Some((finalized, round, set_id));
} else {
return None
},
};
}
self.multicast_neighbor_packet()
}
fn consider_vote(&self, round: Round, set_id: SetId) -> Consider {
self.local_view
.as_ref()
.map(LocalView::as_view)
.map(|v| v.consider_vote(round, set_id))
.unwrap_or(Consider::RejectOutOfScope)
}
fn consider_global(&self, set_id: SetId, number: NumberFor<Block>) -> Consider {
self.local_view
.as_ref()
.map(LocalView::as_view)
.map(|v| v.consider_global(set_id, &number))
.unwrap_or(Consider::RejectOutOfScope)
}
/// Reputation cost for a message rejected as being in the past.
///
/// The peer, round and set are currently ignored; the cost is always `cost::PAST_REJECTION`.
fn cost_past_rejection(
&self,
_who: &PeerId,
_round: Round,
_set_id: SetId,
) -> ReputationChange {
cost::PAST_REJECTION
}
/// Validates a vote message received from `who`.
///
/// The message is discarded (with a cost) if its round/set is outside our view, if the
/// signer is not in the current authority list, or if the signature does not verify.
/// Otherwise it is kept under its round topic.
fn validate_round_message(
    &self,
    who: &PeerId,
    full: &VoteMessage<Block>,
) -> Action<Block::Hash> {
    let rejection = match self.consider_vote(full.round, full.set_id) {
        Consider::Accept => None,
        Consider::RejectFuture => Some(Misbehavior::FutureMessage.cost()),
        Consider::RejectOutOfScope => Some(Misbehavior::OutOfScopeMessage.cost()),
        Consider::RejectPast => Some(self.cost_past_rejection(who, full.round, full.set_id)),
    };
    if let Some(penalty) = rejection {
        return Action::Discard(penalty)
    }

    let signer_is_known = self.authorities.contains(&full.message.id);
    if !signer_is_known {
        debug!(target: LOG_TARGET, "Message from unknown voter: {}", full.message.id);
        telemetry!(
            self.config.telemetry;
            CONSENSUS_DEBUG;
            "afg.bad_msg_signature";
            "signature" => ?full.message.id,
        );
        return Action::Discard(cost::UNKNOWN_VOTER)
    }

    let signature_is_valid = sp_finality_grandpa::check_message_signature(
        &full.message.message,
        &full.message.id,
        &full.message.signature,
        full.round.0,
        full.set_id.0,
    );
    if !signature_is_valid {
        debug!(target: LOG_TARGET, "Bad message signature {}", full.message.id);
        telemetry!(
            self.config.telemetry;
            CONSENSUS_DEBUG;
            "afg.bad_msg_signature";
            "signature" => ?full.message.id,
        );
        return Action::Discard(cost::BAD_SIGNATURE)
    }

    Action::Keep(super::round_topic::<Block>(full.round.0, full.set_id.0), benefit::ROUND_MESSAGE)
}
/// Validates a commit message received from `who`.
///
/// First records the commit height in the sender's view (a regression is misbehavior),
/// then checks the commit against our own view and for basic well-formedness. A valid
/// commit is processed under the global topic of its set but not kept.
fn validate_commit_message(
    &mut self,
    who: &PeerId,
    full: &FullCommitMessage<Block>,
) -> Action<Block::Hash> {
    let target_number = full.message.target_number;

    if let Err(misbehavior) = self.peers.update_commit_height(who, target_number) {
        return Action::Discard(misbehavior.cost())
    }

    let rejection = match self.consider_global(full.set_id, target_number) {
        Consider::Accept => None,
        Consider::RejectFuture => Some(Misbehavior::FutureMessage.cost()),
        Consider::RejectPast => Some(self.cost_past_rejection(who, full.round, full.set_id)),
        Consider::RejectOutOfScope => Some(Misbehavior::OutOfScopeMessage.cost()),
    };
    if let Some(penalty) = rejection {
        return Action::Discard(penalty)
    }

    // A compact commit needs at least one precommit and exactly one auth entry per precommit.
    let malformed = full.message.precommits.is_empty() ||
        full.message.precommits.len() != full.message.auth_data.len();
    if malformed {
        debug!(target: LOG_TARGET, "Malformed compact commit");
        telemetry!(
            self.config.telemetry;
            CONSENSUS_DEBUG;
            "afg.malformed_compact_commit";
            "precommits_len" => ?full.message.precommits.len(),
            "auth_data_len" => ?full.message.auth_data.len(),
            "precommits_is_empty" => ?full.message.precommits.is_empty(),
        );
        return Action::Discard(cost::MALFORMED_COMMIT)
    }

    Action::ProcessAndDiscard(
        super::global_topic::<Block>(full.set_id.0),
        benefit::BASIC_VALIDATED_COMMIT,
    )
}
/// Validates a catch-up message received from `who`.
///
/// It is only accepted while we are waiting for a reply from exactly that peer, and only
/// if it is for the requested set, for a round not below the requested one, and carries
/// both prevotes and precommits. On acceptance the pending state moves to `Processing`
/// (keeping the original request instant) and the message is processed, not kept.
fn validate_catch_up_message(
    &mut self,
    who: &PeerId,
    full: &FullCatchUpMessage<Block>,
) -> Action<Block::Hash> {
    let requested_at = match &self.pending_catch_up {
        PendingCatchUp::Requesting { who: peer, request, instant } => {
            if peer != who {
                return Action::Discard(Misbehavior::OutOfScopeMessage.cost())
            }

            let malformed = request.set_id != full.set_id ||
                request.round.0 > full.message.round_number ||
                full.message.prevotes.is_empty() ||
                full.message.precommits.is_empty();
            if malformed {
                return Action::Discard(cost::MALFORMED_CATCH_UP)
            }

            *instant
        },
        _ => return Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
    };

    self.pending_catch_up = PendingCatchUp::Processing { instant: requested_at };

    Action::ProcessAndDiscard(
        super::global_topic::<Block>(full.set_id.0),
        benefit::BASIC_VALIDATED_CATCH_UP,
    )
}
/// Notes that the catch-up message being processed has been dealt with, clearing the
/// pending state. In any other state this only logs a debug message.
fn note_catch_up_message_processed(&mut self) {
    if let PendingCatchUp::Processing { .. } = self.pending_catch_up {
        self.pending_catch_up = PendingCatchUp::None;
    } else {
        debug!(
            target: LOG_TARGET,
            "Noted processed catch up message when state was: {:?}", self.pending_catch_up,
        );
    }
}
/// Handles a catch-up request received from `who`.
///
/// Returns the reply to send back (if the request can be served) and the action for the
/// request itself, which is always a `Discard` carrying the cost or benefit for the peer.
fn handle_catch_up_request(
&mut self,
who: &PeerId,
request: CatchUpRequestMessage,
set_state: &environment::SharedVoterSetState<Block>,
) -> (Option<GossipMessage<Block>>, Action<Block::Hash>) {
// Without a local view nothing can be served.
let local_view = match self.local_view {
None => return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
Some(ref view) => view,
};
if request.set_id != local_view.set_id {
// A request for the set right before ours, while we are still in the first rounds of the
// new set, gets the dedicated `HONEST_OUT_OF_SCOPE_CATCH_UP` cost instead of the
// regular out-of-scope cost.
if request.set_id.0.saturating_add(1) == local_view.set_id.0 &&
local_view.round.0.saturating_sub(CATCH_UP_THRESHOLD) == 0
{
return (None, Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP))
}
return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()))
}
// Unknown peers, and peers whose own view is already at or past the requested round, are
// not served.
match self.peers.peer(who) {
None => return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
Some(peer) if peer.view.round >= request.round =>
return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
_ => {},
}
// We can only serve rounds up to our last completed one.
let last_completed_round = set_state.read().last_completed_round();
if last_completed_round.number < request.round.0 {
return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()))
}
trace!(
target: LOG_TARGET,
"Replying to catch-up request for round {} from {} with round {}",
request.round.0,
who,
last_completed_round.number,
);
// Split the votes of the last completed round into prevotes and precommits; any other kind
// of message is left out of the reply.
let mut prevotes = Vec::new();
let mut precommits = Vec::new();
for vote in last_completed_round.votes {
match vote.message {
finality_grandpa::Message::Prevote(prevote) => {
prevotes.push(finality_grandpa::SignedPrevote {
prevote,
signature: vote.signature,
id: vote.id,
});
},
finality_grandpa::Message::Precommit(precommit) => {
precommits.push(finality_grandpa::SignedPrecommit {
precommit,
signature: vote.signature,
id: vote.id,
});
},
_ => {},
}
}
let (base_hash, base_number) = last_completed_round.base;
let catch_up = CatchUp::<Block::Header> {
round_number: last_completed_round.number,
prevotes,
precommits,
base_hash,
base_number,
};
let full_catch_up = GossipMessage::CatchUp::<Block>(FullCatchUpMessage {
set_id: request.set_id,
message: catch_up,
});
// The request itself is never kept or propagated.
(Some(full_catch_up), Action::Discard(cost::CATCH_UP_REPLY))
}
/// Decides whether to ask `who` for a catch-up.
///
/// A request for the round before the peer's current round is built when catch-up
/// requests to this peer are allowed, the peer is in our set, and its round exceeds ours
/// by more than `CATCH_UP_THRESHOLD`. It is only returned if no other catch-up is pending.
/// The second element is a reputation report produced while noting the request, if any.
fn try_catch_up(&mut self, who: &PeerId) -> (Option<GossipMessage<Block>>, Option<Report>) {
    let request = match (self.peers.peer(who), &self.local_view) {
        (Some(peer), Some(local_view))
            if self.catch_up_config.request_allowed(peer) &&
                peer.view.set_id == local_view.set_id &&
                peer.view.round.0.saturating_sub(CATCH_UP_THRESHOLD) > local_view.round.0 =>
        {
            // The guard implies the peer's round is greater than 0, so this cannot underflow.
            let round = peer.view.round.0 - 1;
            CatchUpRequestMessage { set_id: peer.view.set_id, round: Round(round) }
        },
        _ => return (None, None),
    };

    let (catch_up_allowed, report) = self.note_catch_up_request(who, &request);
    if !catch_up_allowed {
        return (None, report)
    }

    debug!(
        target: LOG_TARGET,
        "Sending catch-up request for round {} to {}", request.round.0, who,
    );
    (Some(GossipMessage::<Block>::CatchUpRequest(request)), report)
}
/// Imports a neighbor packet from `who`.
///
/// Returns the topics to send to the peer according to its new view, the action for the
/// packet (always a discard carrying the cost or benefit), an optional catch-up request to
/// send to the peer, and an optional reputation report.
fn import_neighbor_message(
    &mut self,
    who: &PeerId,
    update: NeighborPacket<NumberFor<Block>>,
) -> (Vec<Block::Hash>, Action<Block::Hash>, Option<GossipMessage<Block>>, Option<Report>) {
    let topics = match self.peers.update_peer_state(who, update) {
        Ok(view) => view.map(|view| neighbor_topics::<Block>(view)),
        // A misbehaving update yields no topics and no catch-up attempt.
        Err(misbehavior) =>
            return (Vec::new(), Action::Discard(misbehavior.cost()), None, None),
    };

    let (catch_up, report) = self.try_catch_up(who);

    (
        topics.unwrap_or_default(),
        Action::Discard(benefit::NEIGHBOR_MESSAGE),
        catch_up,
        report,
    )
}
/// Builds the neighbor packet describing our local view, addressed to every connected
/// non-light peer. Returns `None` if there is no local view yet.
fn multicast_neighbor_packet(&self) -> MaybeMessage<Block> {
    let local_view = self.local_view.as_ref()?;

    // Without any commit we announce height zero.
    let commit_finalized_height = match local_view.last_commit_height() {
        Some(height) => *height,
        None => Zero::zero(),
    };
    let packet = NeighborPacket {
        round: local_view.round,
        set_id: local_view.set_id,
        commit_finalized_height,
    };

    let recipients = self
        .peers
        .inner
        .iter()
        .filter(|(_, info)| !info.roles.is_light())
        .map(|(id, _)| *id)
        .collect();

    Some((recipients, packet))
}
/// Tries to register `catch_up_request`, addressed to `who`, as the pending catch-up.
///
/// Returns `(false, None)` while an earlier request is still waiting for its reply or an
/// earlier reply is still being processed, within the respective timeouts. Otherwise the
/// request becomes the pending one and `true` is returned, along with a timeout report
/// against the previously requested peer if its request timed out.
fn note_catch_up_request(
    &mut self,
    who: &PeerId,
    catch_up_request: &CatchUpRequestMessage,
) -> (bool, Option<Report>) {
    let report = match &self.pending_catch_up {
        PendingCatchUp::Requesting { instant, .. }
            if instant.elapsed() <= CATCH_UP_REQUEST_TIMEOUT =>
            return (false, None),
        PendingCatchUp::Requesting { who: peer, .. } =>
            Some((*peer, cost::CATCH_UP_REQUEST_TIMEOUT)),
        PendingCatchUp::Processing { instant, .. }
            if instant.elapsed() < CATCH_UP_PROCESS_TIMEOUT =>
            return (false, None),
        PendingCatchUp::Processing { .. } | PendingCatchUp::None => None,
    };

    self.pending_catch_up = PendingCatchUp::Requesting {
        instant: Instant::now(),
        request: catch_up_request.clone(),
        who: *who,
    };

    (true, report)
}
/// Whether a round message may be broadcast to `who` at this point of the current round.
///
/// Early in the round only first-stage peers qualify, then second-stage peers as well, and
/// finally every known non-light peer. Always `false` without a local view.
fn round_message_allowed(&self, who: &PeerId) -> bool {
    let round_duration = self.config.gossip_duration * ROUND_DURATION;
    let round_elapsed = match self.local_view.as_ref() {
        Some(local_view) => local_view.round_start.elapsed(),
        None => return false,
    };

    let in_first_stage = || self.peers.first_stage_peers.contains(who);

    if round_elapsed < round_duration.mul_f32(PROPAGATION_SOME) {
        return in_first_stage()
    }
    if round_elapsed < round_duration.mul_f32(PROPAGATION_ALL) {
        return in_first_stage() || self.peers.second_stage_peers.contains(who)
    }

    matches!(self.peers.peer(who), Some(info) if !info.roles.is_light())
}
/// Whether a global message may be broadcast to `who` at this point of the current round.
///
/// During the staged period only peers in one of the selected sets (first stage, second
/// stage, lucky light) qualify; afterwards everyone does. Always `false` without a local
/// view.
fn global_message_allowed(&self, who: &PeerId) -> bool {
    let round_duration = self.config.gossip_duration * ROUND_DURATION;
    let round_elapsed = match self.local_view.as_ref() {
        Some(local_view) => local_view.round_start.elapsed(),
        None => return false,
    };

    if round_elapsed >= round_duration.mul_f32(PROPAGATION_ALL) {
        return true
    }

    let peers = &self.peers;
    peers.first_stage_peers.contains(who) ||
        peers.second_stage_peers.contains(who) ||
        peers.lucky_light_peers.contains(who)
}
}
/// Prometheus metrics of the gossip validator.
pub(crate) struct Metrics {
/// Number of validated messages, labelled by message kind and resulting action.
messages_validated: CounterVec<U64>,
}
impl Metrics {
pub(crate) fn register(
registry: &prometheus_endpoint::Registry,
) -> Result<Self, PrometheusError> {
Ok(Self {
messages_validated: register(
CounterVec::new(
Opts::new(
"substrate_finality_grandpa_communication_gossip_validator_messages",
"Number of messages validated by the finality grandpa gossip validator.",
),
&["message", "action"],
)?,
registry,
)?,
})
}
}
/// Gossip validator for GRANDPA messages.
pub(super) struct GossipValidator<Block: BlockT> {
/// Validator state, shared behind a read-write lock.
inner: parking_lot::RwLock<Inner<Block>>,
/// Shared voter set state, used to answer catch-up requests.
set_state: environment::SharedVoterSetState<Block>,
/// Channel on which peer reputation reports are sent.
report_sender: TracingUnboundedSender<PeerReport>,
/// Prometheus metrics, if a registry was provided and registration succeeded.
metrics: Option<Metrics>,
/// Telemetry handle, if any.
telemetry: Option<TelemetryHandle>,
}
impl<Block: BlockT> GossipValidator<Block> {
/// Creates a gossip validator together with the receiving end of its peer report channel.
///
/// Metrics are registered with `prometheus_registry` when one is given; a registration
/// failure is only logged and leaves the validator without metrics.
pub(super) fn new(
    config: crate::Config,
    set_state: environment::SharedVoterSetState<Block>,
    prometheus_registry: Option<&Registry>,
    telemetry: Option<TelemetryHandle>,
) -> (GossipValidator<Block>, TracingUnboundedReceiver<PeerReport>) {
    let metrics = prometheus_registry.and_then(|registry| match Metrics::register(registry) {
        Ok(metrics) => Some(metrics),
        Err(e) => {
            debug!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
            None
        },
    });

    let (report_sender, report_receiver) =
        tracing_unbounded("mpsc_grandpa_gossip_validator", 100_000);

    let validator = GossipValidator {
        inner: parking_lot::RwLock::new(Inner::new(config)),
        set_state,
        report_sender,
        metrics,
        telemetry,
    };

    (validator, report_receiver)
}
/// Notes a new round and hands the resulting neighbor packet, if any, to `send_neighbor`.
pub(super) fn note_round<F>(&self, round: Round, send_neighbor: F)
where
    F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>),
{
    // The lock is released before the callback runs.
    let maybe_msg = {
        let mut inner = self.inner.write();
        inner.note_round(round)
    };

    if let Some((recipients, packet)) = maybe_msg {
        send_neighbor(recipients, packet);
    }
}
/// Notes a new authority set and hands the resulting neighbor packet, if any, to
/// `send_neighbor`.
pub(super) fn note_set<F>(&self, set_id: SetId, authorities: Vec<AuthorityId>, send_neighbor: F)
where
    F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>),
{
    // The lock is released before the callback runs.
    let maybe_msg = {
        let mut inner = self.inner.write();
        inner.note_set(set_id, authorities)
    };

    if let Some((recipients, packet)) = maybe_msg {
        send_neighbor(recipients, packet);
    }
}
/// Notes a finalized commit and hands the resulting neighbor packet, if any, to
/// `send_neighbor`.
pub(super) fn note_commit_finalized<F>(
    &self,
    round: Round,
    set_id: SetId,
    finalized: NumberFor<Block>,
    send_neighbor: F,
) where
    F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>),
{
    // The lock is released before the callback runs.
    let maybe_msg = {
        let mut inner = self.inner.write();
        inner.note_commit_finalized(round, set_id, finalized)
    };

    if let Some((recipients, packet)) = maybe_msg {
        send_neighbor(recipients, packet);
    }
}
/// Notes that the pending catch-up message has been processed.
pub(super) fn note_catch_up_message_processed(&self) {
    let mut inner = self.inner.write();
    inner.note_catch_up_message_processed();
}
/// Queues a reputation report for `who`. Sending is best-effort: a failed send is ignored.
fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
    let peer_report = PeerReport { who, cost_benefit };
    let _ = self.report_sender.unbounded_send(peer_report);
}
/// Decodes and validates a gossip packet received from `who`.
///
/// Returns the action to take for the packet, the topics to send to the peer (only set for
/// neighbor packets) and an optional direct reply to the peer (a catch-up request or a
/// catch-up message). When metrics are enabled, the validated-messages counter is bumped
/// for every packet that decodes.
pub(super) fn do_validate(
    &self,
    who: &PeerId,
    mut data: &[u8],
) -> (Action<Block::Hash>, Vec<Block::Hash>, Option<GossipMessage<Block>>) {
    let mut broadcast_topics = Vec::new();
    let mut peer_reply = None;
    let message_name;

    // Decoding advances `data` past the bytes it consumed, so the size of the packet has to
    // be captured up front; otherwise an undecodable packet would only be charged for the
    // bytes left unread when decoding failed (possibly none at all).
    let packet_len = data.len();

    let action = {
        match GossipMessage::<Block>::decode(&mut data) {
            Ok(GossipMessage::Vote(ref message)) => {
                message_name = Some("vote");
                // Vote validation does not mutate any state, a shared lock is enough.
                self.inner.read().validate_round_message(who, message)
            },
            Ok(GossipMessage::Commit(ref message)) => {
                message_name = Some("commit");
                self.inner.write().validate_commit_message(who, message)
            },
            Ok(GossipMessage::Neighbor(update)) => {
                message_name = Some("neighbor");
                let (topics, action, catch_up, report) = self
                    .inner
                    .write()
                    .import_neighbor_message(who, update.into_neighbor_packet());

                if let Some((peer, cost_benefit)) = report {
                    self.report(peer, cost_benefit);
                }

                broadcast_topics = topics;
                peer_reply = catch_up;
                action
            },
            Ok(GossipMessage::CatchUp(ref message)) => {
                message_name = Some("catch_up");
                self.inner.write().validate_catch_up_message(who, message)
            },
            Ok(GossipMessage::CatchUpRequest(request)) => {
                message_name = Some("catch_up_request");
                let (reply, action) =
                    self.inner.write().handle_catch_up_request(who, request, &self.set_state);

                peer_reply = reply;
                action
            },
            Err(e) => {
                message_name = None;
                debug!(target: LOG_TARGET, "Error decoding message: {}", e);
                telemetry!(
                    self.telemetry;
                    CONSENSUS_DEBUG;
                    "afg.err_decoding_msg";
                    "" => "",
                );

                // Clamp the packet size into `i32` range for the cost computation.
                let len = std::cmp::min(i32::MAX as usize, packet_len) as i32;
                Action::Discard(Misbehavior::UndecodablePacket(len).cost())
            },
        }
    };

    // Undecodable packets have no message name and are not counted.
    if let (Some(metrics), Some(message_name)) = (&self.metrics, message_name) {
        let action_name = match action {
            Action::Keep(_, _) => "keep",
            Action::ProcessAndDiscard(_, _) => "process_and_discard",
            Action::Discard(_) => "discard",
        };
        metrics.messages_validated.with_label_values(&[message_name, action_name]).inc();
    }

    (action, broadcast_topics, peer_reply)
}
/// Test-only access to the lock guarding the validator's internal state.
#[cfg(test)]
fn inner(&self) -> &parking_lot::RwLock<Inner<Block>> {
&self.inner
}
}
impl<Block: BlockT> sc_network_gossip::Validator<Block> for GossipValidator<Block> {
/// Registers a newly connected peer and, if we already have a local view, sends it a
/// neighbor packet describing that view.
fn new_peer(
    &self,
    context: &mut dyn ValidatorContext<Block>,
    who: &PeerId,
    roles: ObservedRole,
) {
    // Build the packet under the lock, but send it only after the lock is released.
    let packet = {
        let mut inner = self.inner.write();
        inner.peers.new_peer(*who, roles);

        match inner.local_view {
            Some(ref view) => Some(NeighborPacket {
                round: view.round,
                set_id: view.set_id,
                commit_finalized_height: *view.last_commit_height().unwrap_or(&Zero::zero()),
            }),
            None => None,
        }
    };

    if let Some(packet) = packet {
        let encoded = GossipMessage::<Block>::from(packet).encode();
        context.send_message(who, encoded);
    }
}
/// Forgets a disconnected peer.
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
    let mut inner = self.inner.write();
    inner.peers.peer_disconnected(who);
}
fn validate(
&self,
context: &mut dyn ValidatorContext<Block>,
who: &PeerId,
data: &[u8],
) -> sc_network_gossip::ValidationResult<Block::Hash> {
let (action, broadcast_topics, peer_reply) = self.do_validate(who, data);
if let Some(msg) = peer_reply {
context.send_message(who, msg.encode());
}
for topic in broadcast_topics {
context.send_topic(who, topic, false);
}
match action {
Action::Keep(topic, cb) => {
self.report(*who, cb);
context.broadcast_message(topic, data.to_vec(), false);
sc_network_gossip::ValidationResult::ProcessAndKeep(topic)
},
Action::ProcessAndDiscard(topic, cb) => {
self.report(*who, cb);
sc_network_gossip::ValidationResult::ProcessAndDiscard(topic)
},
Action::Discard(cb) => {
self.report(*who, cb);
sc_network_gossip::ValidationResult::Discard
},
}
}
/// Returns a predicate deciding whether a stored message may be sent to a given peer.
///
/// The returned closure holds a read guard on the validator state for as long as it lives.
fn message_allowed<'a>(
&'a self,
) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
// Decide under the write lock whether a periodic rebroadcast is due (scheduling the next
// one if so), then downgrade to a read guard for the closure.
let (inner, do_rebroadcast) = {
use parking_lot::RwLockWriteGuard;
let mut inner = self.inner.write();
let now = Instant::now();
let do_rebroadcast = if now >= inner.next_rebroadcast {
inner.next_rebroadcast = now + REBROADCAST_AFTER;
true
} else {
false
};
(RwLockWriteGuard::downgrade(inner), do_rebroadcast)
};
Box::new(move |who, intent, topic, mut data| {
// Periodic rebroadcasts depend only on whether one is due.
if let MessageIntent::PeriodicRebroadcast = intent {
return do_rebroadcast
}
// Never send to peers we do not know.
let peer = match inner.peers.peer(who) {
None => return false,
Some(x) => x,
};
// Never send messages on topics that are no longer live.
let (maybe_round, set_id) = match inner.live_topics.topic_info(topic) {
None => return false,
Some(x) => x,
};
// Broadcasts are additionally subject to the staged propagation rules.
if let MessageIntent::Broadcast = intent {
if maybe_round.is_some() {
if !inner.round_message_allowed(who) {
return false
}
} else if !inner.global_message_allowed(who) {
return false
}
}
// Round messages: send only if the peer's view accepts the round and set.
if let Some(round) = maybe_round {
return peer.view.consider_vote(round, set_id) == Consider::Accept
}
// Global messages need our local view to be evaluated.
let local_view = match inner.local_view {
Some(ref v) => v,
None => return false, };
match GossipMessage::<Block>::decode(&mut data) {
Err(_) => false,
Ok(GossipMessage::Commit(full)) => {
// Send a commit only if the peer's view accepts it and it targets the height of our own
// last commit.
peer.view.consider_global(set_id, full.message.target_number) ==
Consider::Accept && Some(&full.message.target_number) ==
local_view.last_commit_height()
},
// No other message kind is sent on the global topic.
Ok(GossipMessage::Neighbor(_)) => false,
Ok(GossipMessage::CatchUpRequest(_)) => false,
Ok(GossipMessage::CatchUp(_)) => false,
Ok(GossipMessage::Vote(_)) => false, }
})
}
/// Builds the predicate deciding whether a stored message has expired.
///
/// The returned closure owns a read lock on the validator state. A message is:
/// - expired if its topic is not live,
/// - kept if its topic is a live round topic,
/// - for a live global topic, kept only if it decodes to a commit whose target number, round
///   and set id all equal those of the local view's last commit.
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
    let inner = self.inner.read();

    Box::new(move |topic, mut data| {
        let is_round_topic = match inner.live_topics.topic_info(&topic) {
            Some((maybe_round, _)) => maybe_round.is_some(),
            // Not a topic we currently keep.
            None => return true,
        };
        if is_round_topic {
            return false
        }

        // Without a local view there is no commit to compare against.
        let local_view =
            if let Some(view) = inner.local_view.as_ref() { view } else { return true };

        // Only commits can survive on a global topic.
        let full = match GossipMessage::<Block>::decode(&mut data) {
            Ok(GossipMessage::Commit(full)) => full,
            Ok(_) | Err(_) => return true,
        };

        match local_view.last_commit {
            // Expire unless the commit matches our last commit exactly.
            Some((number, round, set_id)) =>
                full.message.target_number != number ||
                    full.round != round || full.set_id != set_id,
            None => true,
        }
    })
}
}
/// A reputation change paired with the peer it concerns.
pub(super) struct PeerReport {
/// The peer the report is about.
pub who: PeerId,
/// The reputation change (cost or benefit) attached to `who`.
pub cost_benefit: ReputationChange,
}
#[cfg(test)]
mod tests {
use super::{super::NEIGHBOR_REBROADCAST_PERIOD, environment::SharedVoterSetState, *};
use crate::communication;
use sc_network::config::Role;
use sc_network_gossip::Validator as GossipValidatorT;
use sp_core::{crypto::UncheckedFrom, H256};
use std::time::Instant;
use substrate_test_runtime_client::runtime::{Block, Header};
/// Configuration shared by the tests in this module: an authority node with the observer
/// enabled, a 10ms gossip duration, a justification period of 256 and no keystore, name or
/// telemetry.
fn config() -> crate::Config {
    let protocol_name = communication::grandpa_protocol_name::NAME.into();

    crate::Config {
        protocol_name,
        local_role: Role::Authority,
        observer_enabled: true,
        gossip_duration: Duration::from_millis(10),
        justification_period: 256,
        keystore: None,
        name: None,
        telemetry: None,
    }
}
/// Shared voter set state for the tests: a live state for set 0, built from a genesis
/// authority set with a single authority of weight 1 and a zero hash / number 0 base.
fn voter_set_state() -> SharedVoterSetState<Block> {
    use crate::{authorities::AuthoritySet, environment::VoterSetState};

    let genesis_authorities = vec![(AuthorityId::unchecked_from([1; 32]), 1)];
    let authority_set = AuthoritySet::genesis(genesis_authorities).unwrap();

    let live_state = VoterSetState::live(0, &authority_set, (H256::zero(), 0));
    live_state.into()
}
/// Checks `View::consider_vote` for a view at round 100 of set 1.
#[test]
fn view_vote_rules() {
    let view = View {
        round: Round(100),
        set_id: SetId(1),
        last_commit: Some(1000u64),
        last_update: None,
    };

    // (round, set id, expected verdict)
    let expectations = [
        // More than one round behind, or from an earlier set.
        (98, 1, Consider::RejectPast),
        (1, 0, Consider::RejectPast),
        (1000, 0, Consider::RejectPast),
        // Within one round of the view, in the same set.
        (99, 1, Consider::Accept),
        (100, 1, Consider::Accept),
        (101, 1, Consider::Accept),
        // More than one round ahead, or from a later set.
        (102, 1, Consider::RejectFuture),
        (1, 2, Consider::RejectFuture),
        (1000, 2, Consider::RejectFuture),
    ];

    for &(round, set_id, expected) in expectations.iter() {
        assert_eq!(view.consider_vote(Round(round), SetId(set_id)), expected);
    }
}
/// Checks `View::consider_global` for a view in set 2 whose last commit is at height 1000.
#[test]
fn view_global_message_rules() {
    let view = View {
        round: Round(100),
        set_id: SetId(2),
        last_commit: Some(1000u64),
        last_update: None,
    };

    // (set id, block number, expected verdict)
    let expectations = [
        // A later set is rejected as future whatever the height.
        (3, 1, Consider::RejectFuture),
        (3, 1000, Consider::RejectFuture),
        (3, 10000, Consider::RejectFuture),
        // An earlier set is rejected as past whatever the height.
        (1, 1, Consider::RejectPast),
        (1, 1000, Consider::RejectPast),
        (1, 10000, Consider::RejectPast),
        // Same set: heights up to and including the last commit are past.
        (2, 1, Consider::RejectPast),
        (2, 1000, Consider::RejectPast),
        // Same set: anything above the last commit is accepted.
        (2, 1001, Consider::Accept),
        (2, 10000, Consider::Accept),
    ];

    for &(set_id, number, expected) in expectations.iter() {
        assert_eq!(view.consider_global(SetId(set_id), number), expected);
    }
}
/// Updating the state of a peer that is not tracked succeeds but yields no view, both for a
/// peer that never connected and for one that connected and then disconnected.
#[test]
fn unknown_peer_cannot_be_updated() {
    let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
    let peer_id = PeerId::random();
    let packet =
        NeighborPacket { round: Round(5), set_id: SetId(10), commit_finalized_height: 50 };

    // Never connected.
    assert!(peers.update_peer_state(&peer_id, packet.clone()).unwrap().is_none());

    // Connected, then disconnected again.
    peers.new_peer(peer_id, ObservedRole::Authority);
    peers.peer_disconnected(&peer_id);

    assert!(peers.update_peer_state(&peer_id, packet).unwrap().is_none());
}
/// Applies a sequence of advancing neighbor packets to a known peer and checks that the
/// stored view mirrors each packet. The last packet is then applied a second time, after the
/// view's `last_update` has been moved one rebroadcast period into the past.
#[test]
fn update_peer_state() {
    const SHORT_NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(1);

    let packet = |round, set_id, commit_finalized_height: u32| NeighborPacket {
        round: Round(round),
        set_id: SetId(set_id),
        commit_finalized_height,
    };
    let latest = packet(3, 11, 80);
    let progression =
        vec![packet(5, 10, 50), packet(6, 10, 60), packet(2, 11, 61), latest.clone()];

    let mut peers = Peers::new(SHORT_NEIGHBOR_REBROADCAST_PERIOD);
    let id = PeerId::random();
    peers.new_peer(id, ObservedRole::Authority);

    let apply_and_check = |peers: &mut Peers<_>, update: NeighborPacket<_>| {
        let view = peers.update_peer_state(&id, update.clone()).unwrap().unwrap();
        assert_eq!(view.round, update.round);
        assert_eq!(view.set_id, update.set_id);
        assert_eq!(view.last_commit, Some(update.commit_finalized_height));
    };

    for update in progression {
        apply_and_check(&mut peers, update);
    }

    // Backdate the last update by a full rebroadcast period, then re-send the same packet.
    peers.inner.get_mut(&id).unwrap().view.last_update =
        Some(Instant::now() - SHORT_NEIGHBOR_REBROADCAST_PERIOD);
    apply_and_check(&mut peers, latest);
}
/// After a peer has announced round 10 / set 10 / height 10, packets that move any part of
/// the view backwards are reported as `InvalidViewChange`, and an identical packet is
/// reported as `DuplicateNeighborMessage`.
#[test]
fn invalid_view_change() {
    let packet = |round, set_id, commit_finalized_height| NeighborPacket {
        round: Round(round),
        set_id: SetId(set_id),
        commit_finalized_height,
    };

    let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
    let id = PeerId::random();
    peers.new_peer(id, ObservedRole::Authority);

    // Baseline view every following packet is judged against.
    peers.update_peer_state(&id, packet(10, 10, 10)).unwrap().unwrap();

    let rejected = vec![
        // Round moves backwards.
        (packet(9, 10, 10), Misbehavior::InvalidViewChange),
        // Set id moves backwards.
        (packet(10, 9, 10), Misbehavior::InvalidViewChange),
        // Commit height moves backwards.
        (packet(10, 10, 9), Misbehavior::InvalidViewChange),
        // Exactly the same packet again.
        (packet(10, 10, 10), Misbehavior::DuplicateNeighborMessage),
        // Commit height moves backwards while the round advances.
        (packet(11, 10, 9), Misbehavior::InvalidViewChange),
        // Commit height moves backwards while the set id advances.
        (packet(10, 11, 9), Misbehavior::InvalidViewChange),
    ];

    for (update, expected) in rejected {
        let err = peers.update_peer_state(&id, update).unwrap_err();
        assert_eq!(err, expected);
    }
}
/// After noting rounds 1 through 9 in set 1, round topics below
/// `10 - KEEP_RECENT_ROUNDS - 1` are expired while the remaining ones up to round 9 are kept.
#[test]
fn messages_not_expired_immediately() {
    let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);

    val.note_set(SetId(1), Vec::new(), |_, _| {});
    for round_num in 1u64..10 {
        val.note_round(Round(round_num), |_, _| {});
    }

    let mut is_expired = val.message_expired();
    let last_kept_round = 10u64 - KEEP_RECENT_ROUNDS as u64 - 1;

    // Old rounds first (expected expired), then the recent ones (expected kept).
    let expired_rounds = (1u64..last_kept_round).map(|round_num| (round_num, true));
    let kept_rounds = (last_kept_round..10).map(|round_num| (round_num, false));

    for (round_num, expect_expired) in expired_rounds.chain(kept_rounds) {
        let topic = communication::round_topic::<Block>(round_num, 1);
        assert_eq!(is_expired(topic, &[1, 2, 3]), expect_expired);
    }
}
/// A prevote signed by an id outside the noted authority set is discarded with
/// `UNKNOWN_VOTER`, while one from a known authority carrying a dummy signature is discarded
/// with `BAD_SIGNATURE`.
#[test]
fn message_from_unknown_authority_discarded() {
    // The two costs must differ for the assertions at the end to tell the cases apart.
    assert!(cost::UNKNOWN_VOTER != cost::BAD_SIGNATURE);

    let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
    let set_id = 1;
    let auth = AuthorityId::unchecked_from([1u8; 32]);
    let peer = PeerId::random();

    val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
    val.note_round(Round(1), |_, _| {});

    // Same prevote and same dummy signature every time; only the claimed signer changes.
    let signed_prevote_from = |id: AuthorityId| SignedMessage::<Header> {
        message: finality_grandpa::Message::Prevote(finality_grandpa::Prevote {
            target_hash: Default::default(),
            target_number: 10,
        }),
        signature: UncheckedFrom::unchecked_from([1; 64]),
        id,
    };

    let inner = val.inner.read();
    let validate_vote_from = |id: AuthorityId| {
        inner.validate_round_message(
            &peer,
            &VoteMessage {
                round: Round(1),
                set_id: SetId(set_id),
                message: signed_prevote_from(id),
            },
        )
    };

    let unknown_voter = validate_vote_from(AuthorityId::unchecked_from([2u8; 32]));
    let bad_sig = validate_vote_from(auth);

    assert_eq!(unknown_voter, Action::Discard(cost::UNKNOWN_VOTER));
    assert_eq!(bad_sig, Action::Discard(cost::BAD_SIGNATURE));
}
/// A catch-up message is discarded as out of scope before any catch-up request was noted.
/// Once a request for the same set and round is noted, the same empty catch-up message is
/// discarded as malformed instead.
#[test]
fn unsolicited_catch_up_messages_discarded() {
    let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
    let set_id = 1;
    let peer = PeerId::random();

    val.note_set(SetId(set_id), vec![AuthorityId::unchecked_from([1u8; 32])], |_, _| {});
    val.note_round(Round(1), |_, _| {});

    // Submits a catch-up for round 10 with default (empty) votes and base.
    let submit_catch_up = || {
        val.inner.write().validate_catch_up_message(
            &peer,
            &FullCatchUpMessage {
                set_id: SetId(set_id),
                message: finality_grandpa::CatchUp {
                    round_number: 10,
                    prevotes: Default::default(),
                    precommits: Default::default(),
                    base_hash: Default::default(),
                    base_number: Default::default(),
                },
            },
        )
    };

    // No request was noted yet.
    assert_eq!(submit_catch_up(), Action::Discard(cost::OUT_OF_SCOPE_MESSAGE));

    let request = CatchUpRequestMessage { set_id: SetId(set_id), round: Round(10) };
    let noted = val.inner.write().note_catch_up_request(&peer, &request);
    assert!(noted.0);

    // A request was noted, so the verdict changes to malformed.
    assert_eq!(submit_catch_up(), Action::Discard(cost::MALFORMED_CATCH_UP));
}
/// With a voter set state that has round 2 completed and round 3 current, a catch-up request
/// for round 10 gets no reply and is discarded as out of scope, while a request for round 2
/// is answered with a catch-up message for that round.
#[test]
fn unanswerable_catch_up_requests_discarded() {
    let set_state: SharedVoterSetState<Block> = {
        // Start from the default test state and add a completed round 2.
        let mut completed_rounds = voter_set_state().read().completed_rounds();
        completed_rounds.push(environment::CompletedRound {
            number: 2,
            state: finality_grandpa::round::State::genesis(Default::default()),
            base: Default::default(),
            votes: Default::default(),
        });

        let mut current_rounds = environment::CurrentRounds::<Block>::new();
        current_rounds.insert(3, environment::HasVoted::No);

        let live =
            environment::VoterSetState::<Block>::Live { completed_rounds, current_rounds };
        live.into()
    };

    let (val, _) = GossipValidator::<Block>::new(config(), set_state.clone(), None, None);
    let set_id = 1;
    let peer = PeerId::random();

    val.note_set(SetId(set_id), vec![AuthorityId::unchecked_from([1u8; 32])], |_, _| {});
    val.note_round(Round(3), |_, _| {});

    let mut inner = val.inner.write();
    inner.peers.new_peer(peer, ObservedRole::Authority);

    // Round 10: no reply, discarded as out of scope.
    let (reply, action) = inner.handle_catch_up_request(
        &peer,
        CatchUpRequestMessage { set_id: SetId(set_id), round: Round(10) },
        &set_state,
    );
    assert!(reply.is_none());
    assert_eq!(action, Action::Discard(cost::OUT_OF_SCOPE_MESSAGE));

    // Round 2: answered with a catch-up for that round.
    let (reply, action) = inner.handle_catch_up_request(
        &peer,
        CatchUpRequestMessage { set_id: SetId(set_id), round: Round(2) },
        &set_state,
    );
    match reply.unwrap() {
        GossipMessage::CatchUp(catch_up) => {
            assert_eq!(catch_up.set_id, SetId(set_id));
            assert_eq!(catch_up.message.round_number, 2);
            assert_eq!(action, Action::Discard(cost::CATCH_UP_REPLY));
        },
        _ => panic!("expected catch up message"),
    };
}
/// With the validator at set 2, unanswerable catch-up requests for set 1 are charged the
/// "honest" out-of-scope cost, while requests for set 0 get the regular out-of-scope cost.
/// After the validator notes round 3, requests for set 1 get the regular cost as well.
#[test]
fn detects_honest_out_of_scope_catch_requests() {
    let set_state = voter_set_state();
    let (val, _) = GossipValidator::<Block>::new(config(), set_state.clone(), None, None);

    val.note_set(SetId(2), Vec::new(), |_, _| {});

    let peer = PeerId::random();
    val.inner.write().peers.new_peer(peer, ObservedRole::Authority);

    // Sends a catch-up request and checks that it gets no reply and the expected cost.
    let expect_unanswered = |set_id, round, honest| {
        let (reply, action) = val.inner.write().handle_catch_up_request(
            &peer,
            CatchUpRequestMessage { set_id: SetId(set_id), round: Round(round) },
            &set_state,
        );

        assert!(reply.is_none());
        assert_eq!(
            action,
            if honest {
                Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP)
            } else {
                Action::Discard(Misbehavior::OutOfScopeMessage.cost())
            },
        );
    };

    // Requests for the set right before ours.
    expect_unanswered(1, 1, true);
    expect_unanswered(1, 10, true);

    // Requests for an even older set.
    expect_unanswered(0, 1, false);
    expect_unanswered(0, 10, false);

    // Once we are at round 3, requests for set 1 are no longer treated as honest.
    val.note_round(Round(3), |_, _| {});

    expect_unanswered(1, 1, false);
    expect_unanswered(1, 2, false);
}
/// Importing a neighbor packet from a peer at round 42 of our set yields a catch-up request
/// for round 41. After we note round 41 ourselves, packets for round 42, for round 40 and
/// for another set yield no request.
#[test]
fn issues_catch_up_request_on_neighbor_packet_import() {
    let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);

    val.note_set(SetId(1), Vec::new(), |_, _| {});

    let peer = PeerId::random();
    val.inner.write().peers.new_peer(peer, ObservedRole::Authority);

    // Imports a neighbor packet and returns only the catch-up request slot of the result.
    let catch_up_request_for = |set_id, round| {
        val.inner
            .write()
            .import_neighbor_message(
                &peer,
                NeighborPacket {
                    round: Round(round),
                    set_id: SetId(set_id),
                    commit_finalized_height: 42,
                },
            )
            .2
    };

    if let Some(GossipMessage::CatchUpRequest(request)) = catch_up_request_for(1, 42) {
        assert_eq!(request.set_id, SetId(1));
        assert_eq!(request.round, Round(41));
    } else {
        panic!("expected catch up message");
    }

    val.note_round(Round(41), |_, _| {});

    // (set id, round) pairs that must not trigger a request any more.
    for &(set_id, round) in [(1, 42), (1, 40), (2, 42)].iter() {
        if catch_up_request_for(set_id, round).is_some() {
            panic!("expected no catch up message");
        }
    }
}
/// A validator configured as a full node with the observer enabled issues no catch-up
/// request when importing a neighbor packet from a peer at a much later round.
#[test]
fn doesnt_send_catch_up_requests_when_disabled() {
    let mut config = config();
    config.local_role = Role::Full;
    config.observer_enabled = true;

    let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);

    val.note_set(SetId(1), Vec::new(), |_, _| {});

    let peer = PeerId::random();
    val.inner.write().peers.new_peer(peer, ObservedRole::Authority);

    let catch_up_request = val
        .inner
        .write()
        .import_neighbor_message(
            &peer,
            NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 },
        )
        .2;

    if catch_up_request.is_some() {
        panic!("expected no catch up message");
    }
}
/// With the default test config (observer enabled), a neighbor packet from a full-node peer
/// yields no catch-up request, while the same packet from an authority peer yields a request
/// for round 41.
#[test]
fn doesnt_send_catch_up_requests_to_non_authorities_when_observer_enabled() {
    let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);

    val.note_set(SetId(1), Vec::new(), |_, _| {});

    let peer_authority = PeerId::random();
    let peer_full = PeerId::random();
    val.inner.write().peers.new_peer(peer_authority, ObservedRole::Authority);
    val.inner.write().peers.new_peer(peer_full, ObservedRole::Full);

    // Imports the same round 42 packet on behalf of `peer` and returns the catch-up request.
    let catch_up_request_from = |peer| {
        val.inner
            .write()
            .import_neighbor_message(
                &peer,
                NeighborPacket {
                    round: Round(42),
                    set_id: SetId(1),
                    commit_finalized_height: 50,
                },
            )
            .2
    };

    if catch_up_request_from(peer_full).is_some() {
        panic!("expected no catch up message");
    }

    if let Some(GossipMessage::CatchUpRequest(request)) = catch_up_request_from(peer_authority)
    {
        assert_eq!(request.set_id, SetId(1));
        assert_eq!(request.round, Round(41));
    } else {
        panic!("expected catch up message");
    }
}
/// With the observer disabled, a neighbor packet from a full-node peer at round 42 yields a
/// catch-up request for round 41.
#[test]
fn sends_catch_up_requests_to_non_authorities_when_observer_disabled() {
    let mut config = config();
    config.observer_enabled = false;

    let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);

    val.note_set(SetId(1), Vec::new(), |_, _| {});

    let peer_full = PeerId::random();
    val.inner.write().peers.new_peer(peer_full, ObservedRole::Full);

    let catch_up_request = val
        .inner
        .write()
        .import_neighbor_message(
            &peer_full,
            NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 },
        )
        .2;

    if let Some(GossipMessage::CatchUpRequest(request)) = catch_up_request {
        assert_eq!(request.set_id, SetId(1));
        assert_eq!(request.round, Round(41));
    } else {
        panic!("expected catch up message");
    }
}
/// After noting rounds 9 and 10 in set 1, the round topics for rounds 9, 10 and 11 are all
/// reported as not expired.
#[test]
fn doesnt_expire_next_round_messages() {
    let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);

    val.note_set(SetId(1), Vec::new(), |_, _| {});
    val.note_round(Round(9), |_, _| {});
    val.note_round(Round(10), |_, _| {});

    let mut is_expired = val.message_expired();

    for round in 9..=11 {
        let topic = communication::round_topic::<Block>(round, 1);
        assert!(!is_expired(topic, &[]));
    }
}
/// Checks how many peers are allowed a round-message broadcast as more (simulated) time has
/// elapsed since the start of the round, using 30 authority peers and 30 full-node peers.
#[test]
fn progressively_gossips_to_more_peers_as_round_duration_increases() {
let mut config = config();
// A 300s gossip duration makes `round_duration` equal to `300s * ROUND_DURATION`.
// NOTE(review): presumably chosen large so that real time spent running the test is
// negligible compared to a round -- confirm.
config.gossip_duration = Duration::from_secs(300); let round_duration = config.gossip_duration * ROUND_DURATION;
let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);
// The validator is at set id 0.
val.note_set(SetId(0), Vec::new(), |_, _| {});
// Register 30 random authority peers and 30 random full-node peers, interleaved.
let mut authorities = Vec::new();
authorities.resize_with(30, || PeerId::random());
let mut full_nodes = Vec::new();
full_nodes.resize_with(30, || PeerId::random());
for i in 0..30 {
val.inner.write().peers.new_peer(authorities[i], ObservedRole::Authority);
val.inner.write().peers.new_peer(full_nodes[i], ObservedRole::Full);
}
// `test(rounds_elapsed, peers)` moves the local view's `round_start` back by
// `rounds_elapsed * round_duration`, reshuffles the peers and takes a fresh
// `message_allowed` predicate. It returns a closure that counts how many of `peers` are
// allowed a `Broadcast` on the round topic for round 1 of set 0.
let test = |rounds_elapsed, peers| {
val.inner.write().local_view.as_mut().unwrap().round_start = Instant::now() -
Duration::from_millis(
(round_duration.as_millis() as f32 * rounds_elapsed) as u64,
);
val.inner.write().peers.reshuffle();
let mut message_allowed = val.message_allowed();
move || {
let mut allowed = 0;
for peer in peers {
if message_allowed(
peer,
MessageIntent::Broadcast,
&communication::round_topic::<Block>(1, 0),
&[],
) {
allowed += 1;
}
}
allowed
}
};
// Runs the counting closure 1000 times and returns the integer average of the counts.
fn trial<F: FnMut() -> usize>(mut test: F) -> usize {
let mut results = Vec::new();
let n = 1000;
for _ in 0..n {
results.push(test());
}
let n = results.len();
let sum: usize = results.iter().sum();
sum / n
}
let all_peers = authorities.iter().chain(full_nodes.iter()).cloned().collect();
// After one round duration: at least `LUCKY_PEERS / 2` authorities are allowed, and exactly
// `LUCKY_PEERS` peers overall.
assert!(trial(test(1.0, &authorities)) >= LUCKY_PEERS / 2);
assert_eq!(trial(test(1.0, &all_peers)), LUCKY_PEERS);
// Slightly past `PROPAGATION_SOME` round durations: at least `LUCKY_PEERS` authorities are
// allowed. After two round durations: `LUCKY_PEERS + sqrt(all_peers.len())` peers overall.
assert!(trial(test(PROPAGATION_SOME * 1.1, &authorities)) >= LUCKY_PEERS);
assert_eq!(
trial(test(2.0, &all_peers)),
LUCKY_PEERS + (all_peers.len() as f64).sqrt() as usize,
);
// Slightly past `PROPAGATION_ALL` round durations: every connected peer is allowed.
assert_eq!(trial(test(PROPAGATION_ALL * 1.1, &all_peers)), all_peers.len());
}
/// A light-client peer is never allowed a round-message broadcast, not even after the round
/// start has been moved 10 round durations into the past. A commit on the global topic is
/// allowed to it once the peer and local commit state have been updated.
#[test]
fn never_gossips_round_messages_to_light_clients() {
    let config = config();
    let round_duration = config.gossip_duration * ROUND_DURATION;
    let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);

    val.note_set(SetId(0), Vec::new(), |_, _| {});

    let light_peer = PeerId::random();
    val.inner.write().peers.new_peer(light_peer, ObservedRole::Light);

    // Asks a freshly built predicate whether `data` may be broadcast to the light peer under
    // `topic`. The predicate (and the lock it holds) is dropped again before returning.
    let broadcast_allowed = |topic, data: &[u8]| {
        val.message_allowed()(&light_peer, MessageIntent::Broadcast, &topic, data)
    };

    assert!(!broadcast_allowed(communication::round_topic::<Block>(1, 0), &[]));

    // Pretend the round started 10 round durations ago.
    val.inner.write().local_view.as_mut().unwrap().round_start =
        Instant::now() - round_duration * 10;

    assert!(!broadcast_allowed(communication::round_topic::<Block>(1, 0), &[]));

    // Record the peer's view (commit height 1) and our own finalized commit (height 2).
    val.inner
        .write()
        .peers
        .update_peer_state(
            &light_peer,
            NeighborPacket { round: Round(1), set_id: SetId(0), commit_finalized_height: 1 },
        )
        .unwrap();
    val.note_commit_finalized(Round(1), SetId(0), 2, |_, _| {});

    let compact_commit = finality_grandpa::CompactCommit {
        target_hash: H256::random(),
        target_number: 2,
        precommits: Vec::new(),
        auth_data: Vec::new(),
    };
    let encoded_commit = communication::gossip::GossipMessage::<Block>::Commit(
        communication::gossip::FullCommitMessage {
            round: Round(2),
            set_id: SetId(0),
            message: compact_commit,
        },
    )
    .encode();

    assert!(broadcast_allowed(communication::global_topic::<Block>(0), &encoded_commit));
}
/// A commit for set 1 is allowed to a peer that announced set 1, but not to a peer that has
/// not sent any neighbor packet.
#[test]
fn only_gossip_commits_to_peers_on_same_set() {
    let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);

    val.note_set(SetId(1), Vec::new(), |_, _| {});

    // peer1 announces round 1 of set 1 with commit height 1.
    let peer1 = PeerId::random();
    val.inner.write().peers.new_peer(peer1, ObservedRole::Authority);
    val.inner
        .write()
        .peers
        .update_peer_state(
            &peer1,
            NeighborPacket { round: Round(1), set_id: SetId(1), commit_finalized_height: 1 },
        )
        .unwrap();

    // peer2 is only registered and keeps whatever view a new peer starts with.
    let peer2 = PeerId::random();
    val.inner.write().peers.new_peer(peer2, ObservedRole::Authority);

    // Commit for round 1 of set 1 targeting height 2.
    let compact_commit = finality_grandpa::CompactCommit {
        target_hash: H256::random(),
        target_number: 2,
        precommits: Vec::new(),
        auth_data: Vec::new(),
    };
    let encoded_commit = communication::gossip::GossipMessage::<Block>::Commit(
        communication::gossip::FullCommitMessage {
            round: Round(1),
            set_id: SetId(1),
            message: compact_commit,
        },
    )
    .encode();

    val.note_commit_finalized(Round(1), SetId(1), 2, |_, _| {});

    let mut message_allowed = val.message_allowed();
    let global_topic = communication::global_topic::<Block>(1);

    assert!(message_allowed(&peer1, MessageIntent::Broadcast, &global_topic, &encoded_commit));
    assert!(!message_allowed(&peer2, MessageIntent::Broadcast, &global_topic, &encoded_commit));
}
/// After noting a finalized commit for round 1 of set 1 at height 2, only a commit message
/// with exactly that round, set and height is kept. One for a lower height or for an earlier
/// round is expired.
#[test]
fn expire_commits_from_older_rounds() {
    let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);

    // Encodes a commit gossip message with no precommits and a random target hash.
    let encoded_commit = |round, set_id, target_number| {
        let compact_commit = finality_grandpa::CompactCommit {
            target_hash: H256::random(),
            target_number,
            precommits: Vec::new(),
            auth_data: Vec::new(),
        };

        communication::gossip::GossipMessage::<Block>::Commit(
            communication::gossip::FullCommitMessage {
                round: Round(round),
                set_id: SetId(set_id),
                message: compact_commit,
            },
        )
        .encode()
    };

    val.note_set(SetId(1), Vec::new(), |_, _| {});
    val.note_commit_finalized(Round(1), SetId(1), 2, |_, _| {});

    let mut message_expired = val.message_expired();

    // ((round, set id, target number), expected to be expired)
    let expectations = [
        // Same round, set and height as the noted commit.
        ((1, 1, 2), false),
        // Lower height.
        ((1, 1, 1), true),
        // Same height but an earlier round.
        ((0, 1, 2), true),
    ];

    for &((round, set_id, target_number), expect_expired) in expectations.iter() {
        let message = encoded_commit(round, set_id, target_number);
        assert_eq!(
            message_expired(communication::global_topic::<Block>(1), &message),
            expect_expired,
        );
    }
}
/// Noting set 1 twice with different authority lists leaves the validator holding the list
/// that was noted last.
#[test]
fn allow_noting_different_authorities_for_same_set() {
    let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);

    let authority_lists = vec![
        vec![UncheckedFrom::unchecked_from([0; 32])],
        vec![UncheckedFrom::unchecked_from([1; 32]), UncheckedFrom::unchecked_from([2; 32])],
    ];

    for authorities in authority_lists {
        val.note_set(SetId(1), authorities.clone(), |_, _| {});
        assert_eq!(val.inner().read().authorities, authorities);
    }
}
}