#![warn(missing_docs)]
use polkadot_node_subsystem::{
errors::{RuntimeApiError, SubsystemError},
messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
overseer, SubsystemSender,
};
pub use overseer::{
gen::{OrchestraError as OverseerError, Timeout},
Subsystem, TimeoutExt,
};
pub use polkadot_node_metrics::{metrics, Metronome};
use futures::channel::{mpsc, oneshot};
use parity_scale_codec::Encode;
use polkadot_primitives::v2::{
AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs,
GroupIndex, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, ScrapedOnChainVotes, SessionIndex, SessionInfo, Signed,
SigningContext, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
ValidatorSignature,
};
pub use rand;
use sp_application_crypto::AppKey;
use sp_core::ByteArray;
use sp_keystore::{CryptoStore, Error as KeystoreError, SyncCryptoStorePtr};
use std::time::Duration;
use thiserror::Error;
pub use metered;
pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
pub use determine_new_blocks::determine_new_blocks;
/// Convenience re-exports of `SpawnedSubsystem`, `Spawner`, `Subsystem` and
/// `SubsystemContext` from `polkadot_overseer::gen`.
pub mod reexports {
pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
}
pub mod rolling_session_window;
pub mod runtime;
pub mod database;
pub mod nesting_sender;
mod determine_new_blocks;
#[cfg(test)]
mod tests;
/// A one-second duration.
///
/// NOTE(review): going by the name, this is presumably the grace period a job is given to
/// stop before being aborted; its use is not visible in this part of the file — confirm.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// A capacity of 64.
///
/// NOTE(review): going by the name, this is presumably the buffer size of channels to and
/// from jobs; its use is not visible in this part of the file — confirm.
pub const JOB_CHANNEL_CAPACITY: usize = 64;
/// Errors produced by the utilities in this crate.
///
/// The `transparent` variants forward `Display` and `source` to the wrapped error and can be
/// created from it with `?` thanks to `#[from]`.
#[derive(Debug, Error)]
pub enum Error {
/// A oneshot channel was cancelled before a value was received.
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
/// Sending on an mpsc channel failed.
#[error(transparent)]
Mpsc(#[from] mpsc::SendError),
/// An error reported by a subsystem (also the target of the `OverseerError` conversion).
#[error(transparent)]
Subsystem(#[from] SubsystemError),
/// An error reported by the runtime API.
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
/// Conversion from `Infallible`; such a value can never actually be constructed.
#[error(transparent)]
Infallible(#[from] std::convert::Infallible),
/// A message could not be converted for a job.
///
/// The carried `String` is not included in the `Display` output.
#[error("AllMessage not relevant to Job")]
SenderConversion(String),
/// None of the validator keys was found in the local keystore
/// (returned by `Validator::construct`).
#[error("Node is not a validator")]
NotAValidator,
/// NOTE(review): presumably signals that errors are already being forwarded to another
/// sender; not produced in this part of the file — confirm.
#[error("AlreadyForwarding")]
AlreadyForwarding,
}
impl From<OverseerError> for Error {
fn from(e: OverseerError) -> Self {
Self::from(SubsystemError::from(e))
}
}
/// Receiving half of a oneshot channel that yields the outcome of a runtime API request:
/// either the requested value `T` or a [`RuntimeApiError`].
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
/// Sends a runtime API request for the relay block `parent` and hands back the receiver on
/// which the response will arrive.
///
/// `request_builder` is given the sending half of a fresh oneshot channel and must return the
/// [`RuntimeApiRequest`] to dispatch. This function only awaits the *sending* of the message
/// through `sender`; the caller awaits the returned [`RuntimeApiReceiver`] for the answer.
pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
    parent: Hash,
    sender: &mut Sender,
    request_builder: RequestBuilder,
) -> RuntimeApiReceiver<Response>
where
    RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    let (response_tx, response_rx) = oneshot::channel();
    let message = RuntimeApiMessage::Request(parent, request_builder(response_tx));

    sender.send_message(message.into()).await;

    response_rx
}
// Generates thin `pub async fn` wrappers around `request_from_runtime`, one per
// `RuntimeApiRequest` variant.
//
// Each input item has the shape
// `fn <name>(<param>: <type>, ...) -> <return type>; <RequestVariant>;`
// and the generated function has the signature
// `<name>(parent: Hash, <params...>, sender: &mut impl SubsystemSender<RuntimeApiMessage>)
//  -> RuntimeApiReceiver<<return type>>`.
macro_rules! specialize_requests {
// Single item without an explicit doc name: use the stringified request variant as the
// name shown in the generated documentation and defer to the `named` arm.
(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
specialize_requests!{
named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
}
};
// Single item with an explicit doc name: emit the actual function. The three `#[doc]`
// attributes together form its doc comment around `$doc_name`. The declared parameters are
// passed to the request variant in order, followed by the response sender `tx`.
(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
#[doc = "Request `"]
#[doc = $doc_name]
#[doc = "` from the runtime"]
pub async fn $func_name (
parent: Hash,
$(
$param_name: $param_ty,
)*
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> RuntimeApiReceiver<$return_ty>
{
request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
$( $param_name, )* tx
)).await
}
};
// Two or more items: expand the first one on its own, then recurse on the remaining ones.
(
fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
$(
fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
)+
) => {
specialize_requests!{
fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
}
specialize_requests!{
$(
fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
)+
}
};
}
// One generated `request_*` helper per line. Each takes the relay parent hash first, then the
// parameters listed here, then the subsystem sender, and returns a `RuntimeApiReceiver` of the
// listed return type. The trailing identifier is the `RuntimeApiRequest` variant that is sent.
specialize_requests! {
fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
fn request_validators() -> Vec<ValidatorId>; Validators;
fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
-> Option<ValidationCodeHash>; ValidationCodeHash;
fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
}
/// Returns the first validator ID in `validators` whose key is held by `keystore`, or `None`
/// if the keystore holds none of them.
///
/// Same lookup as [`signing_key_and_index`], with the index discarded.
pub async fn signing_key(
    validators: &[ValidatorId],
    keystore: &SyncCryptoStorePtr,
) -> Option<ValidatorId> {
    let (key, _index) = signing_key_and_index(validators, keystore).await?;
    Some(key)
}
/// Searches `validators` in order for the first key that is present in `keystore`.
///
/// Returns the matching validator ID together with its position in `validators` (wrapped in a
/// [`ValidatorIndex`]), or `None` when the keystore holds none of the listed keys. The
/// keystore is queried once per candidate until a match is found.
pub async fn signing_key_and_index(
    validators: &[ValidatorId],
    keystore: &SyncCryptoStorePtr,
) -> Option<(ValidatorId, ValidatorIndex)> {
    for (position, candidate) in validators.iter().enumerate() {
        let lookup = [(candidate.to_raw_vec(), ValidatorId::ID)];
        let is_local = CryptoStore::has_keys(&**keystore, &lookup).await;
        if is_local {
            return Some((candidate.clone(), ValidatorIndex(position as _)))
        }
    }

    None
}
/// Signs `data` with the validator key `key` using `keystore`.
///
/// Returns `Ok(None)` when the keystore produces no signature for that key, and
/// `KeystoreError::KeyNotSupported` when the signature returned by the keystore cannot be
/// converted into a [`ValidatorSignature`]. Errors from the keystore itself are passed on
/// unchanged.
pub async fn sign(
    keystore: &SyncCryptoStorePtr,
    key: &ValidatorId,
    data: &[u8],
) -> Result<Option<ValidatorSignature>, KeystoreError> {
    let raw_signature =
        match CryptoStore::sign_with(&**keystore, ValidatorId::ID, &key.into(), &data).await? {
            Some(raw) => raw,
            None => return Ok(None),
        };

    let signature: ValidatorSignature = raw_signature
        .try_into()
        .map_err(|_| KeystoreError::KeyNotSupported(ValidatorId::ID))?;

    Ok(Some(signature))
}
/// Finds the group that the validator with the given `index` belongs to.
///
/// `groups` is scanned in order; the position of the first group containing `index` is
/// returned as a [`GroupIndex`]. Returns `None` if no group contains it.
pub fn find_validator_group(
    groups: &[Vec<ValidatorIndex>],
    index: ValidatorIndex,
) -> Option<GroupIndex> {
    groups
        .iter()
        .position(|members| members.contains(&index))
        .map(|group_position| GroupIndex(group_position as _))
}
/// Shrinks `v` in place to a subset that favours priority elements, using the thread-local
/// random number generator.
///
/// See [`choose_random_subset_with_rng`] for the exact selection rules; this is the same
/// operation with `rand::thread_rng()` as the source of randomness.
pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
    let mut thread_rng = rand::thread_rng();
    choose_random_subset_with_rng(is_priority, v, &mut thread_rng, min)
}
/// Shrinks `v` in place to a subset that favours priority elements, drawing randomness
/// from `rng`.
///
/// Elements for which `is_priority` returns `true` are moved to the front and are always
/// kept. If there are at least `min` of them, or there are no other elements, only the
/// priority elements remain. Otherwise the non-priority elements are shuffled and `v` is cut
/// to `min` elements, so the gap is filled with a random selection.
pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
    is_priority: F,
    v: &mut Vec<T>,
    rng: &mut R,
    min: usize,
) {
    use rand::seq::SliceRandom as _;

    // After partitioning, `v[..priority_count]` holds the priority elements.
    let priority_count = itertools::partition(v.iter_mut(), is_priority);

    let needs_fill = priority_count < min && priority_count < v.len();
    if needs_fill {
        // Randomise which non-priority elements end up inside the first `min` slots.
        v[priority_count..].shuffle(rng);
        v.truncate(min);
    } else {
        v.truncate(priority_count);
    }
}
/// Draws a random boolean for the ratio `a` to `b` using the thread-local random number
/// generator.
///
/// Delegates to [`gen_ratio_rng`] with `rand::thread_rng()`.
pub fn gen_ratio(a: usize, b: usize) -> bool {
    let mut thread_rng = rand::thread_rng();
    gen_ratio_rng(a, b, &mut thread_rng)
}
/// Draws a random boolean from `rng` for the ratio `a` to `b` via `Rng::gen_ratio`
/// (numerator `a`, denominator `b`).
///
/// `Rng::gen_ratio` works on `u32` operands. Instead of truncating larger `usize` values
/// (which would silently change the ratio), both operands are shifted right by the same
/// number of bits until the denominator fits into a `u32`. Operands that already fit are
/// passed through unchanged.
///
/// # Panics
///
/// Whatever `Rng::gen_ratio` panics on — per the `rand` documentation, a zero denominator
/// or a numerator greater than the denominator.
pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
    // Number of significant bits of `b` beyond what a `u32` can hold (0 if it already fits).
    let excess_bits = (usize::BITS - b.leading_zeros()).saturating_sub(u32::BITS);

    // After the shift the denominator always fits. The numerator can only still be too large
    // when `a > b`, in which case it is clamped rather than wrapped around.
    let numerator = u32::try_from(a >> excess_bits).unwrap_or(u32::MAX);
    let denominator = u32::try_from(b >> excess_bits).unwrap_or(u32::MAX);

    rng.gen_ratio(numerator, denominator)
}
/// The local node's validator identity: a signing context, the validator key and the
/// position of that key in the validator set it was looked up in.
///
/// Instances are created by `Validator::new` or `Validator::construct`, both of which fail
/// with `Error::NotAValidator` if none of the validator keys is in the keystore.
#[derive(Debug)]
pub struct Validator {
// Session index and parent hash that signatures are bound to.
signing_context: SigningContext,
// The validator key that was found in the keystore.
key: ValidatorId,
// Position of `key` within the validator list used at construction.
index: ValidatorIndex,
}
impl Validator {
pub async fn new<S>(
parent: Hash,
keystore: SyncCryptoStorePtr,
sender: &mut S,
) -> Result<Self, Error>
where
S: SubsystemSender<RuntimeApiMessage>,
{
let (validators, session_index) = futures::try_join!(
request_validators(parent, sender).await,
request_session_index_for_child(parent, sender).await,
)?;
let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
let validators = validators?;
Self::construct(&validators, signing_context, keystore).await
}
pub async fn construct(
validators: &[ValidatorId],
signing_context: SigningContext,
keystore: SyncCryptoStorePtr,
) -> Result<Self, Error> {
let (key, index) =
signing_key_and_index(validators, &keystore).await.ok_or(Error::NotAValidator)?;
Ok(Validator { signing_context, key, index })
}
pub fn id(&self) -> ValidatorId {
self.key.clone()
}
pub fn index(&self) -> ValidatorIndex {
self.index
}
pub fn signing_context(&self) -> &SigningContext {
&self.signing_context
}
pub async fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
&self,
keystore: SyncCryptoStorePtr,
payload: Payload,
) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key).await
}
}