#![warn(missing_docs)]
use futures::{prelude::*, StreamExt};
use log::{debug, error, info};
use parity_scale_codec::Decode;
use parking_lot::RwLock;
use prometheus_endpoint::{PrometheusError, Registry};
use sc_client_api::{
backend::{AuxStore, Backend},
utils::is_descendent_of,
BlockchainEvents, CallExecutor, ExecutionStrategy, ExecutorProvider, Finalizer, LockImportRun,
StorageProvider, TransactionFor,
};
use sc_consensus::BlockImport;
use sc_network_common::protocol::ProtocolName;
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppKey;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult};
use sp_consensus::SelectChain;
use sp_core::crypto::ByteArray;
use sp_finality_grandpa::{
AuthorityList, AuthoritySignature, SetId, CLIENT_LOG_TARGET as LOG_TARGET,
};
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor, Zero},
};
pub use finality_grandpa::BlockNumberOps;
use finality_grandpa::{voter, voter_set::VoterSet, Error as GrandpaError};
use std::{
fmt, io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
macro_rules! grandpa_log {
($condition:expr, $($msg: expr),+ $(,)?) => {
{
let log_level = if $condition {
log::Level::Debug
} else {
log::Level::Info
};
log::log!(target: LOG_TARGET, log_level, $($msg),+);
}
};
}
mod authorities;
mod aux_schema;
mod communication;
mod environment;
mod finality_proof;
mod import;
mod justification;
mod notification;
mod observer;
mod until_imported;
mod voting_rule;
pub mod warp_proof;
pub use authorities::{AuthoritySet, AuthoritySetChanges, SharedAuthoritySet};
pub use aux_schema::best_justification;
pub use communication::grandpa_protocol_name::standard_name as protocol_standard_name;
pub use finality_grandpa::voter::report;
pub use finality_proof::{FinalityProof, FinalityProofError, FinalityProofProvider};
pub use import::{find_forced_change, find_scheduled_change, GrandpaBlockImport};
pub use justification::GrandpaJustification;
pub use notification::{GrandpaJustificationSender, GrandpaJustificationStream};
pub use observer::run_grandpa_observer;
pub use voting_rule::{
BeforeBestBlockBy, ThreeQuartersOfTheUnfinalizedChain, VotingRule, VotingRuleResult,
VotingRulesBuilder,
};
use aux_schema::PersistentData;
use communication::{Network as NetworkT, NetworkBridge};
use environment::{Environment, VoterSetState};
use until_imported::UntilGlobalMessageBlocksImported;
pub use sp_finality_grandpa::{
AuthorityId, AuthorityPair, CatchUp, Commit, CompactCommit, GrandpaApi, Message, Precommit,
Prevote, PrimaryPropose, ScheduledChange, SignedMessage,
};
use std::marker::PhantomData;
#[cfg(test)]
mod tests;
type CommunicationIn<Block> = voter::CommunicationIn<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
type CommunicationInH<Block, H> =
voter::CommunicationIn<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
type CommunicationOutH<Block, H> =
voter::CommunicationOut<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
pub struct SharedVoterState {
inner: Arc<RwLock<Option<Box<dyn voter::VoterState<AuthorityId> + Sync + Send>>>>,
}
impl SharedVoterState {
pub fn empty() -> Self {
Self { inner: Arc::new(RwLock::new(None)) }
}
fn reset(
&self,
voter_state: Box<dyn voter::VoterState<AuthorityId> + Sync + Send>,
) -> Option<()> {
let mut shared_voter_state = self.inner.try_write_for(Duration::from_secs(1))?;
*shared_voter_state = Some(voter_state);
Some(())
}
pub fn voter_state(&self) -> Option<report::VoterState<AuthorityId>> {
self.inner.read().as_ref().map(|vs| vs.get())
}
}
impl Clone for SharedVoterState {
fn clone(&self) -> Self {
SharedVoterState { inner: self.inner.clone() }
}
}
#[derive(Clone)]
pub struct Config {
pub gossip_duration: Duration,
pub justification_period: u32,
pub observer_enabled: bool,
pub local_role: sc_network::config::Role,
pub name: Option<String>,
pub keystore: Option<SyncCryptoStorePtr>,
pub telemetry: Option<TelemetryHandle>,
pub protocol_name: ProtocolName,
}
impl Config {
fn name(&self) -> &str {
self.name.as_deref().unwrap_or("<unknown>")
}
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("grandpa error: {0}")]
Grandpa(#[from] GrandpaError),
#[error("network error: {0}")]
Network(String),
#[error("blockchain error: {0}")]
Blockchain(String),
#[error("could not complete a round on disk: {0}")]
Client(#[from] ClientError),
#[error("could not sign outgoing message: {0}")]
Signing(String),
#[error("safety invariant has been violated: {0}")]
Safety(String),
#[error("a timer failed to fire: {0}")]
Timer(io::Error),
#[error("runtime API request failed: {0}")]
RuntimeApi(sp_api::ApiError),
}
pub(crate) trait BlockStatus<Block: BlockT> {
fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error>;
}
impl<Block: BlockT, Client> BlockStatus<Block> for Arc<Client>
where
Client: HeaderBackend<Block>,
NumberFor<Block>: BlockNumberOps,
{
fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error> {
self.block_number_from_id(&BlockId::Hash(hash))
.map_err(|e| Error::Blockchain(e.to_string()))
}
}
pub trait ClientForGrandpa<Block, BE>:
LockImportRun<Block, BE>
+ Finalizer<Block, BE>
+ AuxStore
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ HeaderBackend<Block>
+ BlockchainEvents<Block>
+ ProvideRuntimeApi<Block>
+ ExecutorProvider<Block>
+ BlockImport<Block, Transaction = TransactionFor<BE, Block>, Error = sp_consensus::Error>
+ StorageProvider<Block, BE>
where
BE: Backend<Block>,
Block: BlockT,
{
}
impl<Block, BE, T> ClientForGrandpa<Block, BE> for T
where
BE: Backend<Block>,
Block: BlockT,
T: LockImportRun<Block, BE>
+ Finalizer<Block, BE>
+ AuxStore
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ HeaderBackend<Block>
+ BlockchainEvents<Block>
+ ProvideRuntimeApi<Block>
+ ExecutorProvider<Block>
+ BlockImport<Block, Transaction = TransactionFor<BE, Block>, Error = sp_consensus::Error>
+ StorageProvider<Block, BE>,
{
}
pub(crate) trait BlockSyncRequester<Block: BlockT> {
fn set_sync_fork_request(
&self,
peers: Vec<sc_network::PeerId>,
hash: Block::Hash,
number: NumberFor<Block>,
);
}
impl<Block, Network> BlockSyncRequester<Block> for NetworkBridge<Block, Network>
where
Block: BlockT,
Network: NetworkT<Block>,
{
fn set_sync_fork_request(
&self,
peers: Vec<sc_network::PeerId>,
hash: Block::Hash,
number: NumberFor<Block>,
) {
NetworkBridge::set_sync_fork_request(self, peers, hash, number)
}
}
#[derive(Debug)]
pub(crate) struct NewAuthoritySet<H, N> {
pub(crate) canon_number: N,
pub(crate) canon_hash: H,
pub(crate) set_id: SetId,
pub(crate) authorities: AuthorityList,
}
#[derive(Debug)]
pub(crate) enum VoterCommand<H, N> {
Pause(String),
ChangeAuthorities(NewAuthoritySet<H, N>),
}
impl<H, N> fmt::Display for VoterCommand<H, N> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
VoterCommand::Pause(ref reason) => write!(f, "Pausing voter: {}", reason),
VoterCommand::ChangeAuthorities(_) => write!(f, "Changing authorities"),
}
}
}
#[derive(Debug)]
pub(crate) enum CommandOrError<H, N> {
Error(Error),
VoterCommand(VoterCommand<H, N>),
}
impl<H, N> From<Error> for CommandOrError<H, N> {
fn from(e: Error) -> Self {
CommandOrError::Error(e)
}
}
impl<H, N> From<ClientError> for CommandOrError<H, N> {
fn from(e: ClientError) -> Self {
CommandOrError::Error(Error::Client(e))
}
}
impl<H, N> From<finality_grandpa::Error> for CommandOrError<H, N> {
fn from(e: finality_grandpa::Error) -> Self {
CommandOrError::Error(Error::from(e))
}
}
impl<H, N> From<VoterCommand<H, N>> for CommandOrError<H, N> {
fn from(e: VoterCommand<H, N>) -> Self {
CommandOrError::VoterCommand(e)
}
}
impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for CommandOrError<H, N> {}
impl<H, N> fmt::Display for CommandOrError<H, N> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
CommandOrError::Error(ref e) => write!(f, "{}", e),
CommandOrError::VoterCommand(ref cmd) => write!(f, "{}", cmd),
}
}
}
pub struct LinkHalf<Block: BlockT, C, SC> {
client: Arc<C>,
select_chain: SC,
persistent_data: PersistentData<Block>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
justification_sender: GrandpaJustificationSender<Block>,
justification_stream: GrandpaJustificationStream<Block>,
telemetry: Option<TelemetryHandle>,
}
impl<Block: BlockT, C, SC> LinkHalf<Block, C, SC> {
pub fn shared_authority_set(&self) -> &SharedAuthoritySet<Block::Hash, NumberFor<Block>> {
&self.persistent_data.authority_set
}
pub fn justification_stream(&self) -> GrandpaJustificationStream<Block> {
self.justification_stream.clone()
}
}
pub trait GenesisAuthoritySetProvider<Block: BlockT> {
fn get(&self) -> Result<AuthorityList, ClientError>;
}
impl<Block: BlockT, E> GenesisAuthoritySetProvider<Block>
for Arc<dyn ExecutorProvider<Block, Executor = E>>
where
E: CallExecutor<Block>,
{
fn get(&self) -> Result<AuthorityList, ClientError> {
self.executor()
.call(
&BlockId::Number(Zero::zero()),
"GrandpaApi_grandpa_authorities",
&[],
ExecutionStrategy::NativeElseWasm,
)
.and_then(|call_result| {
Decode::decode(&mut &call_result[..]).map_err(|err| {
ClientError::CallResultDecode(
"failed to decode GRANDPA authorities set proof",
err,
)
})
})
}
}
pub fn block_import<BE, Block: BlockT, Client, SC>(
client: Arc<Client>,
genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
select_chain: SC,
telemetry: Option<TelemetryHandle>,
) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
where
SC: SelectChain<Block>,
BE: Backend<Block> + 'static,
Client: ClientForGrandpa<Block, BE> + 'static,
{
block_import_with_authority_set_hard_forks(
client,
genesis_authorities_provider,
select_chain,
Default::default(),
telemetry,
)
}
pub struct AuthoritySetHardFork<Block: BlockT> {
pub set_id: SetId,
pub block: (Block::Hash, NumberFor<Block>),
pub authorities: AuthorityList,
pub last_finalized: Option<NumberFor<Block>>,
}
pub fn block_import_with_authority_set_hard_forks<BE, Block: BlockT, Client, SC>(
client: Arc<Client>,
genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
select_chain: SC,
authority_set_hard_forks: Vec<AuthoritySetHardFork<Block>>,
telemetry: Option<TelemetryHandle>,
) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
where
SC: SelectChain<Block>,
BE: Backend<Block> + 'static,
Client: ClientForGrandpa<Block, BE> + 'static,
{
let chain_info = client.info();
let genesis_hash = chain_info.genesis_hash;
let persistent_data =
aux_schema::load_persistent(&*client, genesis_hash, <NumberFor<Block>>::zero(), {
let telemetry = telemetry.clone();
move || {
let authorities = genesis_authorities_provider.get()?;
telemetry!(
telemetry;
CONSENSUS_DEBUG;
"afg.loading_authorities";
"authorities_len" => ?authorities.len()
);
Ok(authorities)
}
})?;
let (voter_commands_tx, voter_commands_rx) =
tracing_unbounded("mpsc_grandpa_voter_command", 100_000);
let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();
let authority_set_hard_forks = authority_set_hard_forks
.into_iter()
.map(|fork| {
let delay_kind = if let Some(last_finalized) = fork.last_finalized {
authorities::DelayKind::Best { median_last_finalized: last_finalized }
} else {
authorities::DelayKind::Finalized
};
(
fork.set_id,
authorities::PendingChange {
next_authorities: fork.authorities,
delay: Zero::zero(),
canon_hash: fork.block.0,
canon_height: fork.block.1,
delay_kind,
},
)
})
.collect();
Ok((
GrandpaBlockImport::new(
client.clone(),
select_chain.clone(),
persistent_data.authority_set.clone(),
voter_commands_tx,
authority_set_hard_forks,
justification_sender.clone(),
telemetry.clone(),
),
LinkHalf {
client,
select_chain,
persistent_data,
voter_commands_rx,
justification_sender,
justification_stream,
telemetry,
},
))
}
fn global_communication<BE, Block: BlockT, C, N>(
set_id: SetId,
voters: &Arc<VoterSet<AuthorityId>>,
client: Arc<C>,
network: &NetworkBridge<Block, N>,
keystore: Option<&SyncCryptoStorePtr>,
metrics: Option<until_imported::Metrics>,
) -> (
impl Stream<
Item = Result<
CommunicationInH<Block, Block::Hash>,
CommandOrError<Block::Hash, NumberFor<Block>>,
>,
>,
impl Sink<
CommunicationOutH<Block, Block::Hash>,
Error = CommandOrError<Block::Hash, NumberFor<Block>>,
>,
)
where
BE: Backend<Block> + 'static,
C: ClientForGrandpa<Block, BE> + 'static,
N: NetworkT<Block>,
NumberFor<Block>: BlockNumberOps,
{
let is_voter = local_authority_id(voters, keystore).is_some();
let (global_in, global_out) =
network.global_communication(communication::SetId(set_id), voters.clone(), is_voter);
let global_in = UntilGlobalMessageBlocksImported::new(
client.import_notification_stream(),
network.clone(),
client.clone(),
global_in,
"global",
metrics,
);
let global_in = global_in.map_err(CommandOrError::from);
let global_out = global_out.sink_map_err(CommandOrError::from);
(global_in, global_out)
}
pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> {
pub config: Config,
pub link: LinkHalf<Block, C, SC>,
pub network: N,
pub voting_rule: VR,
pub prometheus_registry: Option<prometheus_endpoint::Registry>,
pub shared_voter_state: SharedVoterState,
pub telemetry: Option<TelemetryHandle>,
}
pub fn grandpa_peers_set_config(
protocol_name: ProtocolName,
) -> sc_network_common::config::NonDefaultSetConfig {
use communication::grandpa_protocol_name;
sc_network_common::config::NonDefaultSetConfig {
notifications_protocol: protocol_name,
fallback_names: grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: sc_network_common::config::SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network_common::config::NonReservedPeerMode::Deny,
},
}
}
pub fn run_grandpa_voter<Block: BlockT, BE: 'static, C, N, SC, VR>(
grandpa_params: GrandpaParams<Block, C, N, SC, VR>,
) -> sp_blockchain::Result<impl Future<Output = ()> + Send>
where
Block::Hash: Ord,
BE: Backend<Block> + 'static,
N: NetworkT<Block> + Sync + 'static,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, C> + Clone + 'static,
NumberFor<Block>: BlockNumberOps,
C: ClientForGrandpa<Block, BE> + 'static,
C::Api: GrandpaApi<Block>,
{
let GrandpaParams {
mut config,
link,
network,
voting_rule,
prometheus_registry,
shared_voter_state,
telemetry,
} = grandpa_params;
config.observer_enabled = false;
let LinkHalf {
client,
select_chain,
persistent_data,
voter_commands_rx,
justification_sender,
justification_stream: _,
telemetry: _,
} = link;
let network = NetworkBridge::new(
network,
config.clone(),
persistent_data.set_state.clone(),
prometheus_registry.as_ref(),
telemetry.clone(),
);
let conf = config.clone();
let telemetry_task =
if let Some(telemetry_on_connect) = telemetry.as_ref().map(|x| x.on_connect_stream()) {
let authorities = persistent_data.authority_set.clone();
let telemetry = telemetry.clone();
let events = telemetry_on_connect.for_each(move |_| {
let current_authorities = authorities.current_authorities();
let set_id = authorities.set_id();
let maybe_authority_id =
local_authority_id(¤t_authorities, conf.keystore.as_ref());
let authorities =
current_authorities.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
let authorities = serde_json::to_string(&authorities).expect(
"authorities is always at least an empty vector; \
elements are always of type string",
);
telemetry!(
telemetry;
CONSENSUS_INFO;
"afg.authority_set";
"authority_id" => maybe_authority_id.map_or("".into(), |s| s.to_string()),
"authority_set_id" => ?set_id,
"authorities" => authorities,
);
future::ready(())
});
future::Either::Left(events)
} else {
future::Either::Right(future::pending())
};
let voter_work = VoterWork::new(
client,
config,
network,
select_chain,
voting_rule,
persistent_data,
voter_commands_rx,
prometheus_registry,
shared_voter_state,
justification_sender,
telemetry,
);
let voter_work = voter_work.map(|res| match res {
Ok(()) => error!(
target: LOG_TARGET,
"GRANDPA voter future has concluded naturally, this should be unreachable."
),
Err(e) => error!(target: LOG_TARGET, "GRANDPA voter error: {}", e),
});
let telemetry_task = telemetry_task.then(|_| future::pending::<()>());
Ok(future::select(voter_work, telemetry_task).map(drop))
}
struct Metrics {
environment: environment::Metrics,
until_imported: until_imported::Metrics,
}
impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Metrics {
environment: environment::Metrics::register(registry)?,
until_imported: until_imported::Metrics::register(registry)?,
})
}
}
#[must_use]
struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, SC, VR> {
voter: Pin<
Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>,
>,
shared_voter_state: SharedVoterState,
env: Arc<Environment<B, Block, C, N, SC, VR>>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
network: NetworkBridge<Block, N>,
telemetry: Option<TelemetryHandle>,
metrics: Option<Metrics>,
}
impl<B, Block, C, N, SC, VR> VoterWork<B, Block, C, N, SC, VR>
where
Block: BlockT,
B: Backend<Block> + 'static,
C: ClientForGrandpa<Block, B> + 'static,
C::Api: GrandpaApi<Block>,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, C> + Clone + 'static,
{
fn new(
client: Arc<C>,
config: Config,
network: NetworkBridge<Block, N>,
select_chain: SC,
voting_rule: VR,
persistent_data: PersistentData<Block>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
prometheus_registry: Option<prometheus_endpoint::Registry>,
shared_voter_state: SharedVoterState,
justification_sender: GrandpaJustificationSender<Block>,
telemetry: Option<TelemetryHandle>,
) -> Self {
let metrics = match prometheus_registry.as_ref().map(Metrics::register) {
Some(Ok(metrics)) => Some(metrics),
Some(Err(e)) => {
debug!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
None
},
None => None,
};
let voters = persistent_data.authority_set.current_authorities();
let env = Arc::new(Environment {
client,
select_chain,
voting_rule,
voters: Arc::new(voters),
config,
network: network.clone(),
set_id: persistent_data.authority_set.set_id(),
authority_set: persistent_data.authority_set.clone(),
voter_set_state: persistent_data.set_state,
metrics: metrics.as_ref().map(|m| m.environment.clone()),
justification_sender: Some(justification_sender),
telemetry: telemetry.clone(),
_phantom: PhantomData,
});
let mut work = VoterWork {
voter: Box::pin(future::pending()),
shared_voter_state,
env,
voter_commands_rx,
network,
telemetry,
metrics,
};
work.rebuild_voter();
work
}
fn rebuild_voter(&mut self) {
debug!(
target: LOG_TARGET,
"{}: Starting new voter with set ID {}",
self.env.config.name(),
self.env.set_id
);
let maybe_authority_id =
local_authority_id(&self.env.voters, self.env.config.keystore.as_ref());
let authority_id = maybe_authority_id.map_or("<unknown>".into(), |s| s.to_string());
telemetry!(
self.telemetry;
CONSENSUS_DEBUG;
"afg.starting_new_voter";
"name" => ?self.env.config.name(),
"set_id" => ?self.env.set_id,
"authority_id" => authority_id,
);
let chain_info = self.env.client.info();
let authorities = self.env.voters.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
let authorities = serde_json::to_string(&authorities).expect(
"authorities is always at least an empty vector; elements are always of type string; qed.",
);
telemetry!(
self.telemetry;
CONSENSUS_INFO;
"afg.authority_set";
"number" => ?chain_info.finalized_number,
"hash" => ?chain_info.finalized_hash,
"authority_id" => authority_id,
"authority_set_id" => ?self.env.set_id,
"authorities" => authorities,
);
match &*self.env.voter_set_state.read() {
VoterSetState::Live { completed_rounds, .. } => {
let last_finalized = (chain_info.finalized_hash, chain_info.finalized_number);
let global_comms = global_communication(
self.env.set_id,
&self.env.voters,
self.env.client.clone(),
&self.env.network,
self.env.config.keystore.as_ref(),
self.metrics.as_ref().map(|m| m.until_imported.clone()),
);
let last_completed_round = completed_rounds.last();
let voter = voter::Voter::new(
self.env.clone(),
(*self.env.voters).clone(),
global_comms,
last_completed_round.number,
last_completed_round.votes.clone(),
last_completed_round.base,
last_finalized,
);
if self.shared_voter_state.reset(voter.voter_state()).is_none() {
info!(
target: LOG_TARGET,
"Timed out trying to update shared GRANDPA voter state. \
RPC endpoints may return stale data."
);
}
self.voter = Box::pin(voter);
},
VoterSetState::Paused { .. } => self.voter = Box::pin(future::pending()),
};
}
fn handle_voter_command(
&mut self,
command: VoterCommand<Block::Hash, NumberFor<Block>>,
) -> Result<(), Error> {
match command {
VoterCommand::ChangeAuthorities(new) => {
let voters: Vec<String> =
new.authorities.iter().map(move |(a, _)| format!("{}", a)).collect();
telemetry!(
self.telemetry;
CONSENSUS_INFO;
"afg.voter_command_change_authorities";
"number" => ?new.canon_number,
"hash" => ?new.canon_hash,
"voters" => ?voters,
"set_id" => ?new.set_id,
);
self.env.update_voter_set_state(|_| {
let set_state = VoterSetState::live(
new.set_id,
&*self.env.authority_set.inner(),
(new.canon_hash, new.canon_number),
);
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;
let voters = Arc::new(VoterSet::new(new.authorities.into_iter()).expect(
"new authorities come from pending change; pending change comes from \
`AuthoritySet`; `AuthoritySet` validates authorities is non-empty and \
weights are non-zero; qed.",
));
self.env = Arc::new(Environment {
voters,
set_id: new.set_id,
voter_set_state: self.env.voter_set_state.clone(),
client: self.env.client.clone(),
select_chain: self.env.select_chain.clone(),
config: self.env.config.clone(),
authority_set: self.env.authority_set.clone(),
network: self.env.network.clone(),
voting_rule: self.env.voting_rule.clone(),
metrics: self.env.metrics.clone(),
justification_sender: self.env.justification_sender.clone(),
telemetry: self.telemetry.clone(),
_phantom: PhantomData,
});
self.rebuild_voter();
Ok(())
},
VoterCommand::Pause(reason) => {
info!(target: LOG_TARGET, "Pausing old validator set: {}", reason);
self.env.update_voter_set_state(|voter_set_state| {
let completed_rounds = voter_set_state.completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;
self.rebuild_voter();
Ok(())
},
}
}
}
impl<B, Block, C, N, SC, VR> Future for VoterWork<B, Block, C, N, SC, VR>
where
Block: BlockT,
B: Backend<Block> + 'static,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
SC: SelectChain<Block> + 'static,
C: ClientForGrandpa<Block, B> + 'static,
C::Api: GrandpaApi<Block>,
VR: VotingRule<Block, C> + Clone + 'static,
{
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.voter), cx) {
Poll::Pending => {},
Poll::Ready(Ok(())) => {
return Poll::Ready(Err(Error::Safety(
"finality-grandpa inner voter has concluded.".into(),
)))
},
Poll::Ready(Err(CommandOrError::Error(e))) => {
return Poll::Ready(Err(e))
},
Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
self.handle_voter_command(command)?;
cx.waker().wake_by_ref();
},
}
match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
Poll::Pending => {},
Poll::Ready(None) => {
return Poll::Ready(Err(Error::Safety("`voter_commands_rx` was closed.".into())))
},
Poll::Ready(Some(command)) => {
self.handle_voter_command(command)?;
cx.waker().wake_by_ref();
},
}
Future::poll(Pin::new(&mut self.network), cx)
}
}
fn local_authority_id(
voters: &VoterSet<AuthorityId>,
keystore: Option<&SyncCryptoStorePtr>,
) -> Option<AuthorityId> {
keystore.and_then(|keystore| {
voters
.iter()
.find(|(p, _)| {
SyncCryptoStore::has_keys(&**keystore, &[(p.to_raw_vec(), AuthorityId::ID)])
})
.map(|(p, _)| p.clone())
})
}
pub fn revert<Block, Client>(client: Arc<Client>, blocks: NumberFor<Block>) -> ClientResult<()>
where
Block: BlockT,
Client: AuxStore + HeaderMetadata<Block, Error = ClientError> + HeaderBackend<Block>,
{
let best_number = client.info().best_number;
let finalized = client.info().finalized_number;
let revertible = blocks.min(best_number - finalized);
if revertible == Zero::zero() {
return Ok(())
}
let number = best_number - revertible;
let hash = client
.block_hash_from_id(&BlockId::Number(number))?
.ok_or(ClientError::Backend(format!(
"Unexpected hash lookup failure for block number: {}",
number
)))?;
let info = client.info();
let persistent_data: PersistentData<Block> =
aux_schema::load_persistent(&*client, info.genesis_hash, Zero::zero(), || {
const MSG: &str = "Unexpected missing grandpa data during revert";
Err(ClientError::Application(Box::from(MSG)))
})?;
let shared_authority_set = persistent_data.authority_set;
let mut authority_set = shared_authority_set.inner();
let is_descendent_of = is_descendent_of(&*client, None);
authority_set.revert(hash, number, &is_descendent_of);
let (set_id, set_ref) = authority_set.current();
let new_set = Some(NewAuthoritySet {
canon_hash: info.finalized_hash,
canon_number: info.finalized_number,
set_id,
authorities: set_ref.to_vec(),
});
aux_schema::update_authority_set::<Block, _, _>(&authority_set, new_set.as_ref(), |values| {
client.insert_aux(values, None)
})
}