use crate::{
communication::{
notification::{
BeefyBestBlockSender, BeefyBestBlockStream, BeefyVersionedFinalityProofSender,
BeefyVersionedFinalityProofStream,
},
peers::KnownPeers,
request_response::{
outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler,
},
},
import::BeefyBlockImport,
round::Rounds,
worker::PersistedState,
};
use beefy_primitives::{
crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet, BEEFY_ENGINE_ID,
GENESIS_AUTHORITY_SET_ID,
};
use futures::{stream::Fuse, StreamExt};
use log::{debug, error, info};
use parking_lot::Mutex;
use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_consensus::BlockImport;
use sc_network::ProtocolName;
use sc_network_common::service::NetworkRequest;
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi};
use sp_blockchain::{
Backend as BlockchainBackend, Error as ClientError, HeaderBackend, Result as ClientResult,
};
use sp_consensus::{Error as ConsensusError, SyncOracle};
use sp_keystore::SyncCryptoStorePtr;
use sp_mmr_primitives::MmrApi;
use sp_runtime::{
generic::BlockId,
traits::{Block, One, Zero},
};
use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
mod aux_schema;
mod error;
mod keystore;
mod metrics;
mod round;
mod worker;
pub mod communication;
pub mod import;
pub mod justification;
pub use communication::beefy_protocol_name::{
gossip_protocol_name, justifications_protocol_name as justifs_protocol_name,
};
#[cfg(test)]
mod tests;
/// Convenience trait bundling the bounds a BEEFY client has to satisfy.
///
/// The trait declares no items of its own. It only gives a single name to the
/// combination `BlockchainEvents<B> + HeaderBackend<B> + Finalizer<B, BE> + Send + Sync`,
/// so that combination can be written as one bound (for example `C: Client<B, BE>`).
pub trait Client<B, BE>:
BlockchainEvents<B> + HeaderBackend<B> + Finalizer<B, BE> + Send + Sync
where
B: Block,
BE: Backend<B>,
{
}
/// Blanket implementation of [`Client`] for every type that meets the bounds below.
///
/// NOTE(review): besides the supertraits of `Client`, this impl also requires
/// `ProvideRuntimeApi<B>`. A type lacking that bound does not receive the blanket
/// implementation even if it satisfies all of `Client`'s supertraits.
impl<B, BE, T> Client<B, BE> for T
where
B: Block,
BE: Backend<B>,
T: BlockchainEvents<B>
+ HeaderBackend<B>
+ Finalizer<B, BE>
+ ProvideRuntimeApi<B>
+ Send
+ Sync,
{
}
/// Channel endpoints consumed by the BEEFY voter.
///
/// Built by [`beefy_block_import_and_links`] and passed to the gadget through
/// [`BeefyParams::links`].
#[derive(Clone)]
pub struct BeefyVoterLinks<B: Block> {
/// Receiving side of the versioned-finality-proof channel whose sender is handed to
/// `BeefyBlockImport`.
pub from_block_import_justif_stream: BeefyVersionedFinalityProofStream<B>,
/// Sending side of the versioned-finality-proof channel; its receiving side is
/// `BeefyRPCLinks::from_voter_justif_stream`.
pub to_rpc_justif_sender: BeefyVersionedFinalityProofSender<B>,
/// Sending side of the best-block channel; its receiving side is
/// `BeefyRPCLinks::from_voter_best_beefy_stream`.
pub to_rpc_best_block_sender: BeefyBestBlockSender<B>,
}
/// Channel endpoints on the receiving side of the voter-to-RPC channels.
///
/// Built by [`beefy_block_import_and_links`] together with [`BeefyVoterLinks`], which
/// holds the matching senders.
#[derive(Clone)]
pub struct BeefyRPCLinks<B: Block> {
/// Receiving side paired with `BeefyVoterLinks::to_rpc_justif_sender`.
pub from_voter_justif_stream: BeefyVersionedFinalityProofStream<B>,
/// Receiving side paired with `BeefyVoterLinks::to_rpc_best_block_sender`.
pub from_voter_best_beefy_stream: BeefyBestBlockStream<B>,
}
/// Creates the BEEFY block-import wrapper and the channel endpoints tying it to the voter
/// and to the RPC side.
///
/// Three channels are opened:
/// - versioned finality proofs, voter to RPC;
/// - best BEEFY blocks, voter to RPC;
/// - versioned finality proofs, block import to voter.
///
/// The sender of the last channel is moved into the returned [`BeefyBlockImport`], which
/// wraps `wrapped_block_import`. The remaining endpoints are split between the returned
/// [`BeefyVoterLinks`] and [`BeefyRPCLinks`].
///
/// Returns `(block_import, voter_links, rpc_links)`.
pub fn beefy_block_import_and_links<B, BE, RuntimeApi, I>(
    wrapped_block_import: I,
    backend: Arc<BE>,
    runtime: Arc<RuntimeApi>,
) -> (BeefyBlockImport<B, BE, RuntimeApi, I>, BeefyVoterLinks<B>, BeefyRPCLinks<B>)
where
    B: Block,
    BE: Backend<B>,
    I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<RuntimeApi, B>>
        + Send
        + Sync,
    RuntimeApi: ProvideRuntimeApi<B> + Send + Sync,
    RuntimeApi::Api: BeefyApi<B>,
{
    // Voter -> RPC: finality proofs.
    let (justif_to_rpc, justif_from_voter) = BeefyVersionedFinalityProofStream::<B>::channel();
    // Voter -> RPC: best BEEFY block.
    let (best_block_to_rpc, best_block_from_voter) = BeefyBestBlockStream::<B>::channel();
    // Block import -> voter: finality proofs.
    let (justif_to_voter, justif_from_import) =
        BeefyVersionedFinalityProofStream::<B>::channel();

    (
        BeefyBlockImport::new(backend, runtime, wrapped_block_import, justif_to_voter),
        BeefyVoterLinks {
            from_block_import_justif_stream: justif_from_import,
            to_rpc_justif_sender: justif_to_rpc,
            to_rpc_best_block_sender: best_block_to_rpc,
        },
        BeefyRPCLinks {
            from_voter_best_beefy_stream: best_block_from_voter,
            from_voter_justif_stream: justif_from_voter,
        },
    )
}
/// Network-related parameters of the BEEFY gadget.
pub struct BeefyNetworkParams<B: Block, N> {
/// Network handle; `start_beefy_gadget` passes clones of it to the gossip engine and to the
/// on-demand justifications engine, and moves it into the worker.
pub network: Arc<N>,
/// Protocol name given to the gossip engine.
pub gossip_protocol_name: ProtocolName,
/// Protocol name given to the on-demand justifications engine.
pub justifications_protocol_name: ProtocolName,
/// Marker tying the otherwise unused block type `B` to this struct.
pub _phantom: PhantomData<B>,
}
/// Everything `start_beefy_gadget` needs to run the BEEFY gadget.
pub struct BeefyParams<B: Block, BE, C, N, P, R> {
/// Client; the gadget subscribes to its finality notification stream.
pub client: Arc<C>,
/// Backend used to load or initialize the voter state; also handed to the worker.
pub backend: Arc<BE>,
/// Payload provider handed to the worker.
pub payload_provider: P,
/// Runtime API provider, queried for the BEEFY validator set.
pub runtime: Arc<R>,
/// Optional keystore; converted with `.into()` and handed to the worker.
pub key_store: Option<SyncCryptoStorePtr>,
/// Network handle and protocol names.
pub network_params: BeefyNetworkParams<B, N>,
/// Value forwarded into the voter's persisted state (overrides a previously persisted one).
// NOTE(review): presumably the minimum block distance between votes; confirm in `worker`.
pub min_block_delta: u32,
/// Optional Prometheus registry; when present, BEEFY metrics are registered on it.
pub prometheus_registry: Option<Registry>,
/// Voter-side channel endpoints, see [`BeefyVoterLinks`].
pub links: BeefyVoterLinks<B>,
/// Handler whose `run()` future is driven alongside the worker.
pub on_demand_justifications_handler: BeefyJustifsRequestHandler<B, C>,
}
/// Starts the BEEFY gadget and drives it to completion.
///
/// Steps, in order:
/// 1. build the gossip validator, gossip engine and on-demand justifications engine;
/// 2. register metrics when a Prometheus registry was supplied (failure is only logged);
/// 3. subscribe to finality notifications and block-import justifications;
/// 4. wait for the BEEFY pallet to become available, then load the voter state from the
///    aux DB or initialize it from the chain;
/// 5. run the worker and the on-demand justifications request handler concurrently.
///
/// If step 4 fails, the error is logged and the function returns without starting the worker.
pub async fn start_beefy_gadget<B, BE, C, N, P, R>(beefy_params: BeefyParams<B, BE, C, N, P, R>)
where
    B: Block,
    BE: Backend<B>,
    C: Client<B, BE> + BlockBackend<B>,
    P: PayloadProvider<B>,
    R: ProvideRuntimeApi<B>,
    R::Api: BeefyApi<B> + MmrApi<B, MmrRootHash, NumberFor<B>>,
    N: GossipNetwork<B> + NetworkRequest + SyncOracle + Send + Sync + 'static,
{
    let BeefyParams {
        client,
        backend,
        payload_provider,
        runtime,
        key_store,
        network_params:
            BeefyNetworkParams { network, gossip_protocol_name, justifications_protocol_name, .. },
        min_block_delta,
        prometheus_registry,
        links,
        on_demand_justifications_handler,
    } = beefy_params;

    // The peer set is shared between the gossip validator and the on-demand engine.
    let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
    let gossip_validator =
        Arc::new(communication::gossip::GossipValidator::new(known_peers.clone()));
    let mut gossip_engine = sc_network_gossip::GossipEngine::new(
        network.clone(),
        gossip_protocol_name,
        gossip_validator.clone(),
        None,
    );
    let on_demand_justifications = OnDemandJustificationsEngine::new(
        network.clone(),
        justifications_protocol_name,
        known_peers,
    );

    // Metrics are optional: no registry, or a failed registration, both yield `None`.
    let metrics = match prometheus_registry.as_ref().map(metrics::Metrics::register) {
        Some(Ok(registered)) => {
            debug!(target: "beefy", "🥩 Registered metrics");
            Some(registered)
        },
        Some(Err(err)) => {
            debug!(target: "beefy", "🥩 Failed to register metrics: {:?}", err);
            None
        },
        None => None,
    };

    // Both subscriptions are taken before waiting for the pallet; the very same streams are
    // later handed to the worker.
    let mut finality_notifications = client.finality_notification_stream().fuse();
    let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

    let best_grandpa =
        wait_for_runtime_pallet(&*runtime, &mut gossip_engine, &mut finality_notifications).await;
    let voter_state = best_grandpa.and_then(|header| {
        load_or_init_voter_state(&*backend, &*runtime, header, min_block_delta)
    });
    let persisted_state = match voter_state {
        Ok(state) => state,
        Err(e) => {
            error!(target: "beefy", "Error: {:?}. Terminating.", e);
            return
        },
    };

    let worker = worker::BeefyWorker::<_, _, _, _>::new(worker::WorkerParams {
        backend,
        payload_provider,
        network,
        key_store: key_store.into(),
        gossip_engine,
        gossip_validator,
        on_demand_justifications,
        links,
        metrics,
        persisted_state,
    });

    let voter = worker.run(block_import_justif, finality_notifications);
    let justifications_server = on_demand_justifications_handler.run();
    futures::future::join(voter, justifications_server).await;
}
/// Produces the voter state to start from.
///
/// When a state is found in the aux DB it is reused, with its best GRANDPA header and
/// `min_block_delta` overwritten by the supplied values. Otherwise a fresh state is built by
/// [`initialize_voter_state`].
///
/// # Errors
///
/// Propagates errors from reading the aux DB and from `initialize_voter_state`.
fn load_or_init_voter_state<B, BE, R>(
    backend: &BE,
    runtime: &R,
    best_grandpa: <B as Block>::Header,
    min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
where
    B: Block,
    BE: Backend<B>,
    R: ProvideRuntimeApi<B>,
    R::Api: BeefyApi<B>,
{
    match crate::aux_schema::load_persistent(backend)? {
        Some(mut persisted) => {
            // The caller-provided values take precedence over whatever was stored.
            persisted.set_best_grandpa(best_grandpa);
            persisted.set_min_block_delta(min_block_delta);
            info!(target: "beefy", "🥩 Loading BEEFY voter state from db: {:?}.", persisted);
            Ok(persisted)
        },
        None => initialize_voter_state(backend, runtime, best_grandpa, min_block_delta),
    }
}
fn initialize_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
best_grandpa: <B as Block>::Header,
min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
where
B: Block,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
let blockchain = backend.blockchain();
let mut sessions = VecDeque::new();
let mut header = best_grandpa.clone();
let state = loop {
if let Some(true) = blockchain
.justifications(header.hash())
.ok()
.flatten()
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
{
info!(
target: "beefy",
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
*header.number()
);
let best_beefy = *header.number();
if sessions.is_empty() {
let active_set = expect_validator_set(runtime, BlockId::hash(header.hash()))?;
let mut rounds = Rounds::new(best_beefy, active_set);
rounds.conclude(best_beefy);
sessions.push_front(rounds);
}
let state =
PersistedState::checked_new(best_grandpa, best_beefy, sessions, min_block_delta)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
break state
}
if *header.number() == One::one() {
let genesis_num = *header.number();
let genesis_set = expect_validator_set(runtime, BlockId::hash(header.hash()))
.and_then(genesis_set_sanity_check)?;
info!(
target: "beefy",
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
Starting voting rounds at block {:?}, genesis validator set {:?}.",
genesis_num, genesis_set,
);
sessions.push_front(Rounds::new(genesis_num, genesis_set));
break PersistedState::checked_new(best_grandpa, Zero::zero(), sessions, min_block_delta)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
}
if let Some(active) = worker::find_authorities_change::<B>(&header) {
info!(target: "beefy", "🥩 Marking block {:?} as BEEFY Mandatory.", *header.number());
sessions.push_front(Rounds::new(*header.number(), active));
}
let parent_hash = *header.parent_hash();
runtime
.runtime_api()
.validator_set(&BlockId::hash(parent_hash))
.ok()
.flatten()
.ok_or_else(|| {
let msg = format!("{}. Could not initialize BEEFY voter.", parent_hash);
error!(target: "beefy", "🥩 {}", msg);
ClientError::Consensus(sp_consensus::Error::StateUnavailable(msg))
})?;
header = blockchain.expect_header(parent_hash)?;
};
aux_schema::write_current_version(backend)?;
aux_schema::write_voter_state(backend, &state)?;
Ok(state)
}
/// Waits until the BEEFY pallet is available in the runtime.
///
/// Every finality notification is checked: as soon as the runtime at the finalized block
/// reports a validator set, that block's header is returned. The gossip engine future is
/// polled concurrently while waiting.
///
/// # Errors
///
/// Returns `ClientError::Backend` when waiting can no longer make progress, i.e. when either
/// - the finality notification stream ends, or
/// - the gossip engine future completes.
///
/// The error message names which of the two happened.
async fn wait_for_runtime_pallet<B, R>(
    runtime: &R,
    mut gossip_engine: &mut GossipEngine<B>,
    finality: &mut Fuse<FinalityNotifications<B>>,
) -> ClientResult<<B as Block>::Header>
where
    B: Block,
    R: ProvideRuntimeApi<B>,
    R::Api: BeefyApi<B>,
{
    info!(target: "beefy", "🥩 BEEFY gadget waiting for BEEFY pallet to become available...");
    // The loop only exits through `return` (pallet found) or `break` carrying the reason for
    // giving up.
    let err_msg: String = loop {
        futures::select! {
            notif = finality.next() => {
                let notif = match notif {
                    Some(notif) => notif,
                    // The stream is exhausted: no further finalized block will ever be seen.
                    None =>
                        break "🥩 Finality notifications stream has unexpectedly terminated."
                            .into(),
                };
                let at = BlockId::hash(notif.header.hash());
                if let Some(active) = runtime.runtime_api().validator_set(&at).ok().flatten() {
                    info!(
                        target: "beefy", "🥩 BEEFY pallet available: block {:?} validator set {:?}",
                        notif.header.number(), active
                    );
                    return Ok(notif.header)
                }
            },
            _ = gossip_engine => {
                break "🥩 Gossip engine has unexpectedly terminated.".into()
            }
        }
    };
    error!(target: "beefy", "{}", err_msg);
    Err(ClientError::Backend(err_msg))
}
/// Accepts `active` only when its id equals `GENESIS_AUTHORITY_SET_ID`.
///
/// On success the set is handed back unchanged. Otherwise the offending set is logged and a
/// `ClientError::Backend` is returned.
fn genesis_set_sanity_check(
    active: ValidatorSet<AuthorityId>,
) -> ClientResult<ValidatorSet<AuthorityId>> {
    if active.id() != GENESIS_AUTHORITY_SET_ID {
        error!(target: "beefy", "🥩 Unexpected ID for genesis validator set {:?}.", active);
        return Err(ClientError::Backend("BEEFY Genesis sanity check failed.".into()))
    }
    Ok(active)
}
/// Fetches the BEEFY validator set from the runtime at block `at`.
///
/// A failed runtime API call and a successful call yielding no set are treated alike: both
/// result in `ClientError::Backend("BEEFY pallet expected to be active.")`.
fn expect_validator_set<B, R>(
    runtime: &R,
    at: BlockId<B>,
) -> ClientResult<ValidatorSet<AuthorityId>>
where
    B: Block,
    R: ProvideRuntimeApi<B>,
    R::Api: BeefyApi<B>,
{
    let maybe_set = runtime.runtime_api().validator_set(&at).ok().flatten();
    match maybe_set {
        Some(validator_set) => Ok(validator_set),
        None => Err(ClientError::Backend("BEEFY pallet expected to be active.".into())),
    }
}