use std::{
marker::{PhantomData, Unpin},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use finality_grandpa::{voter, voter_set::VoterSet, BlockNumberOps, Error as GrandpaError};
use futures::prelude::*;
use log::{debug, info, warn};
use sc_client_api::backend::Backend;
use sc_telemetry::TelemetryHandle;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_blockchain::HeaderMetadata;
use sp_consensus::SelectChain;
use sp_finality_grandpa::AuthorityId;
use sp_keystore::SyncCryptoStorePtr;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use crate::{
authorities::SharedAuthoritySet,
aux_schema::PersistentData,
communication::{Network as NetworkT, NetworkBridge},
environment, global_communication,
notification::GrandpaJustificationSender,
ClientForGrandpa, CommandOrError, CommunicationIn, Config, Error, LinkHalf, VoterCommand,
VoterSetState, LOG_TARGET,
};
struct ObserverChain<'a, Block: BlockT, Client> {
client: &'a Arc<Client>,
_phantom: PhantomData<Block>,
}
impl<'a, Block, Client> finality_grandpa::Chain<Block::Hash, NumberFor<Block>>
for ObserverChain<'a, Block, Client>
where
Block: BlockT,
Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
NumberFor<Block>: BlockNumberOps,
{
fn ancestry(
&self,
base: Block::Hash,
block: Block::Hash,
) -> Result<Vec<Block::Hash>, GrandpaError> {
environment::ancestry(self.client, base, block)
}
}
fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
client: &Arc<Client>,
authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
voters: &Arc<VoterSet<AuthorityId>>,
justification_sender: &Option<GrandpaJustificationSender<Block>>,
last_finalized_number: NumberFor<Block>,
commits: S,
note_round: F,
telemetry: Option<TelemetryHandle>,
) -> impl Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>>
where
NumberFor<Block>: BlockNumberOps,
S: Stream<Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>>,
F: Fn(u64),
BE: Backend<Block>,
Client: ClientForGrandpa<Block, BE>,
{
let authority_set = authority_set.clone();
let client = client.clone();
let voters = voters.clone();
let justification_sender = justification_sender.clone();
let observer = commits.try_fold(last_finalized_number, move |last_finalized_number, global| {
let (round, commit, callback) = match global {
voter::CommunicationIn::Commit(round, commit, callback) => {
let commit = finality_grandpa::Commit::from(commit);
(round, commit, callback)
},
voter::CommunicationIn::CatchUp(..) => {
return future::ok(last_finalized_number)
},
};
if commit.target_number <= last_finalized_number {
return future::ok(last_finalized_number)
}
let validation_result = match finality_grandpa::validate_commit(
&commit,
&voters,
&ObserverChain { client: &client, _phantom: PhantomData },
) {
Ok(r) => r,
Err(e) => return future::err(e.into()),
};
if validation_result.is_valid() {
let finalized_hash = commit.target_hash;
let finalized_number = commit.target_number;
match environment::finalize_block(
client.clone(),
&authority_set,
None,
finalized_hash,
finalized_number,
(round, commit).into(),
false,
justification_sender.as_ref(),
telemetry.clone(),
) {
Ok(_) => {},
Err(e) => return future::err(e),
};
note_round(round + 1);
finality_grandpa::process_commit_validation_result(validation_result, callback);
future::ok(finalized_number)
} else {
debug!(target: LOG_TARGET, "Received invalid commit: ({:?}, {:?})", round, commit);
finality_grandpa::process_commit_validation_result(validation_result, callback);
future::ok(last_finalized_number)
}
});
observer.map_ok(|_| ())
}
pub fn run_grandpa_observer<BE, Block: BlockT, Client, N, SC>(
config: Config,
link: LinkHalf<Block, Client, SC>,
network: N,
) -> sp_blockchain::Result<impl Future<Output = ()> + Send>
where
BE: Backend<Block> + Unpin + 'static,
N: NetworkT<Block>,
SC: SelectChain<Block>,
NumberFor<Block>: BlockNumberOps,
Client: ClientForGrandpa<Block, BE> + 'static,
{
let LinkHalf {
client,
persistent_data,
voter_commands_rx,
justification_sender,
telemetry,
..
} = link;
let network = NetworkBridge::new(
network,
config.clone(),
persistent_data.set_state.clone(),
None,
telemetry.clone(),
);
let observer_work = ObserverWork::new(
client,
network,
persistent_data,
config.keystore,
voter_commands_rx,
Some(justification_sender),
telemetry,
);
let observer_work = observer_work.map_ok(|_| ()).map_err(|e| {
warn!("GRANDPA Observer failed: {}", e);
});
Ok(observer_work.map(drop))
}
#[must_use]
struct ObserverWork<B: BlockT, BE, Client, N: NetworkT<B>> {
observer:
Pin<Box<dyn Future<Output = Result<(), CommandOrError<B::Hash, NumberFor<B>>>> + Send>>,
client: Arc<Client>,
network: NetworkBridge<B, N>,
persistent_data: PersistentData<B>,
keystore: Option<SyncCryptoStorePtr>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
justification_sender: Option<GrandpaJustificationSender<B>>,
telemetry: Option<TelemetryHandle>,
_phantom: PhantomData<BE>,
}
impl<B, BE, Client, Network> ObserverWork<B, BE, Client, Network>
where
B: BlockT,
BE: Backend<B> + 'static,
Client: ClientForGrandpa<B, BE> + 'static,
Network: NetworkT<B>,
NumberFor<B>: BlockNumberOps,
{
fn new(
client: Arc<Client>,
network: NetworkBridge<B, Network>,
persistent_data: PersistentData<B>,
keystore: Option<SyncCryptoStorePtr>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
justification_sender: Option<GrandpaJustificationSender<B>>,
telemetry: Option<TelemetryHandle>,
) -> Self {
let mut work = ObserverWork {
observer: Box::pin(future::pending()) as Pin<Box<_>>,
client,
network,
persistent_data,
keystore: keystore.clone(),
voter_commands_rx,
justification_sender,
telemetry,
_phantom: PhantomData,
};
work.rebuild_observer();
work
}
fn rebuild_observer(&mut self) {
let set_id = self.persistent_data.authority_set.set_id();
let voters = Arc::new(self.persistent_data.authority_set.current_authorities());
let (global_in, _) = global_communication(
set_id,
&voters,
self.client.clone(),
&self.network,
self.keystore.as_ref(),
None,
);
let last_finalized_number = self.client.info().finalized_number;
let note_round = {
let network = self.network.clone();
let voters = voters.clone();
move |round| {
network.note_round(
crate::communication::Round(round),
crate::communication::SetId(set_id),
&voters,
)
}
};
let observer = grandpa_observer(
&self.client,
&self.persistent_data.authority_set,
&voters,
&self.justification_sender,
last_finalized_number,
global_in,
note_round,
self.telemetry.clone(),
);
self.observer = Box::pin(observer);
}
fn handle_voter_command(
&mut self,
command: VoterCommand<B::Hash, NumberFor<B>>,
) -> Result<(), Error> {
self.persistent_data.set_state = match command {
VoterCommand::Pause(reason) => {
info!(target: LOG_TARGET, "Pausing old validator set: {}", reason);
let completed_rounds = self.persistent_data.set_state.read().completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
set_state
},
VoterCommand::ChangeAuthorities(new) => {
let set_state = VoterSetState::live(
new.set_id,
&*self.persistent_data.authority_set.inner(),
(new.canon_hash, new.canon_number),
);
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
set_state
},
}
.into();
self.rebuild_observer();
Ok(())
}
}
impl<B, BE, C, N> Future for ObserverWork<B, BE, C, N>
where
B: BlockT,
BE: Backend<B> + Unpin + 'static,
C: ClientForGrandpa<B, BE> + 'static,
N: NetworkT<B>,
NumberFor<B>: BlockNumberOps,
{
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.observer), cx) {
Poll::Pending => {},
Poll::Ready(Ok(())) => {
return Poll::Ready(Ok(()))
},
Poll::Ready(Err(CommandOrError::Error(e))) => {
return Poll::Ready(Err(e))
},
Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
self.handle_voter_command(command)?;
cx.waker().wake_by_ref();
},
}
match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
Poll::Pending => {},
Poll::Ready(None) => {
return Poll::Ready(Ok(()))
},
Poll::Ready(Some(command)) => {
self.handle_voter_command(command)?;
cx.waker().wake_by_ref();
},
}
Future::poll(Pin::new(&mut self.network), cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
aux_schema,
communication::tests::{make_test_network, Event},
};
use assert_matches::assert_matches;
use sc_network::PeerId;
use sc_utils::mpsc::tracing_unbounded;
use sp_blockchain::HeaderBackend as _;
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt};
use futures::executor;
#[test]
fn observer_work_polls_underlying_network_bridge() {
let (tester_fut, _network) = make_test_network();
let mut tester = executor::block_on(tester_fut);
let (client, backend) = {
let builder = TestClientBuilder::with_default_backend();
let backend = builder.backend();
let (client, _) = builder.build_with_longest_chain();
(Arc::new(client), backend)
};
let voters = vec![(sp_keyring::Ed25519Keyring::Alice.public().into(), 1)];
let persistent_data =
aux_schema::load_persistent(&*backend, client.info().genesis_hash, 0, || Ok(voters))
.unwrap();
let (_tx, voter_command_rx) = tracing_unbounded("test_mpsc_voter_command", 100_000);
let observer = ObserverWork::new(
client,
tester.net_handle.clone(),
persistent_data,
None,
voter_command_rx,
None,
None,
);
let peer_id = PeerId::random();
tester.trigger_gossip_validator_reputation_change(&peer_id);
executor::block_on(async move {
assert!(observer.now_or_never().is_none());
match tester.events.next().now_or_never() {
Some(Some(Event::EventStream(_))) => {},
_ => panic!("expected event stream request"),
};
assert_matches!(tester.events.next().now_or_never(), Some(Some(Event::Report(_, _))));
});
}
}