#![warn(missing_docs)]
use std::{
collections::{hash_map, HashMap},
fmt::{self, Debug},
num::NonZeroUsize,
pin::Pin,
sync::Arc,
time::Duration,
};
use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
use lru::LruCache;
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_primitives::v2::{Block, BlockNumber, Hash};
use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
use polkadot_node_subsystem_types::messages::{
ApprovalDistributionMessage, ApprovalVotingMessage, AvailabilityDistributionMessage,
AvailabilityRecoveryMessage, AvailabilityStoreMessage, BitfieldDistributionMessage,
CandidateBackingMessage, CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage,
CollationGenerationMessage, CollatorProtocolMessage, DisputeCoordinatorMessage,
DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage,
NetworkBridgeTxMessage, ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
};
pub use polkadot_node_subsystem_types::{
errors::{SubsystemError, SubsystemResult},
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
RuntimeApiSubsystemClient,
};
pub mod metrics;
pub use self::metrics::Metrics as OverseerMetrics;
pub mod dummy;
pub use self::dummy::DummySubsystem;
pub use polkadot_node_metrics::{
metrics::{prometheus, Metrics as MetricsTrait},
Metronome,
};
pub use orchestra as gen;
pub use orchestra::{
contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket,
OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext,
SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters,
SubsystemSender, TimeoutExt, ToOrchestra,
};
/// Capacity constant for the cache of known leaves.
///
/// Evaluates to `2 * 24 * 3600 / 6`, i.e. 28_800 entries.
/// NOTE(review): this looks like two days' worth of blocks at a six second block
/// time, presumably sizing the `known_leaves` LRU cache — confirm against the builder.
pub const KNOWN_LEAVES_CACHE_SIZE: NonZeroUsize = {
    const CAPACITY: usize = 2 * 24 * 3600 / 6;
    // `NonZeroUsize::new` yields `None` only for zero; fail at compile time in that case.
    if let Some(cap) = NonZeroUsize::new(CAPACITY) {
        cap
    } else {
        panic!("Known leaves cache size must be non-zero")
    }
};
mod memory_stats;
#[cfg(test)]
mod tests;
use sp_core::traits::SpawnNamed;
use memory_stats::MemoryAllocationTracker;
/// Newtype wrapper around a spawner `S`.
///
/// In this file it carries the `Spawner` implementation for any `S` that
/// implements `SpawnNamed`, so such a spawner can be handed to the overseer.
/// The wrapper is cloneable whenever the wrapped spawner is.
#[derive(Clone)]
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
    /// Borrows the wrapped spawner.
    fn as_ref(&self) -> &S {
        let Self(inner) = self;
        inner
    }
}
impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
SpawnNamed::spawn_blocking(&self.0, name, group, future)
}
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
SpawnNamed::spawn(&self.0, name, group, future)
}
}
/// Answers whether a given chain head supports parachains.
///
/// The overseer consults this before activating a leaf (see `on_head_activated`).
#[async_trait::async_trait]
pub trait HeadSupportsParachains {
/// Returns `true` if the block identified by `head` supports parachains,
/// `false` otherwise.
async fn head_supports_parachains(&self, head: &Hash) -> bool;
}
#[async_trait::async_trait]
impl<Client> HeadSupportsParachains for Arc<Client>
where
    Client: RuntimeApiSubsystemClient + Sync + Send,
{
    /// A head supports parachains when the parachain host runtime API is
    /// present at `head` with a version of at least 1.
    ///
    /// A failed query or an absent API both count as "not supported".
    async fn head_supports_parachains(&self, head: &Hash) -> bool {
        let version = self.api_version_parachain_host(*head).await;
        matches!(version, Ok(Some(v)) if v >= 1)
    }
}
/// A cloneable handle used to send [`Event`]s to the overseer.
///
/// Thin wrapper around the raw `OverseerHandle`; send failures are logged
/// rather than returned (see `send_and_log_error`).
#[derive(Clone)]
pub struct Handle(OverseerHandle);
impl Handle {
pub fn new(raw: OverseerHandle) -> Self {
Self(raw)
}
pub async fn block_imported(&mut self, block: BlockInfo) {
self.send_and_log_error(Event::BlockImported(block)).await
}
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
}
#[inline(always)]
pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
self.send_msg(msg, "").await
}
pub async fn block_finalized(&mut self, block: BlockInfo) {
self.send_and_log_error(Event::BlockFinalized(block)).await
}
pub async fn wait_for_activation(
&mut self,
hash: Hash,
response_channel: oneshot::Sender<SubsystemResult<()>>,
) {
self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
hash,
response_channel,
}))
.await;
}
pub async fn stop(&mut self) {
self.send_and_log_error(Event::Stop).await;
}
async fn send_and_log_error(&mut self, event: Event) {
if self.0.send(event).await.is_err() {
gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
}
}
/// The block data the overseer needs about an imported or finalized block.
///
/// Built from `BlockImportNotification` and `FinalityNotification` via `From`.
#[derive(Debug, Clone)]
pub struct BlockInfo {
/// Hash of the block.
pub hash: Hash,
/// Hash of the block's parent, taken from the header.
pub parent_hash: Hash,
/// Number of the block, taken from the header.
pub number: BlockNumber,
}
impl From<BlockImportNotification<Block>> for BlockInfo {
fn from(n: BlockImportNotification<Block>) -> Self {
BlockInfo { hash: n.hash, parent_hash: n.header.parent_hash, number: n.header.number }
}
}
impl From<FinalityNotification<Block>> for BlockInfo {
fn from(n: FinalityNotification<Block>) -> Self {
BlockInfo { hash: n.hash, parent_hash: n.header.parent_hash, number: n.header.number }
}
}
/// An event delivered to the overseer from outside, through a [`Handle`].
pub enum Event {
/// A block was imported.
BlockImported(BlockInfo),
/// A block was finalized.
BlockFinalized(BlockInfo),
/// A message to be routed to a subsystem.
MsgToSubsystem {
/// The message to route.
msg: AllMessages,
/// Identifier of the sender; `Handle::send_msg_anon` passes an empty string.
origin: &'static str,
},
/// A request that the overseer answers itself.
ExternalRequest(ExternalRequest),
/// Stop the overseer.
Stop,
}
/// A request from outside that the overseer handles itself.
pub enum ExternalRequest {
/// Ask to be notified once the given leaf is active.
///
/// Answered immediately if the leaf is already active; otherwise the sender is
/// queued until the leaf is activated.
WaitForActivation {
/// Hash of the leaf to wait for.
hash: Hash,
/// Channel on which the overseer sends `Ok(())` once the leaf is active.
response_channel: oneshot::Sender<SubsystemResult<()>>,
},
}
/// Forwards block import and finality notifications from `client` to the
/// overseer through `handle`.
///
/// Runs until either notification stream ends (or both are complete), then
/// returns.
pub async fn forward_events<P>(client: Arc<P>, mut handle: Handle)
where
    P: BlockchainEvents<Block>,
{
    let mut finality = client.finality_notification_stream();
    let mut imports = client.import_notification_stream();

    loop {
        select! {
            finalized = finality.next() => {
                if let Some(block) = finalized {
                    handle.block_finalized(block.into()).await;
                } else {
                    // The finality stream has ended; nothing more to forward.
                    break
                }
            },
            imported = imports.next() => {
                if let Some(block) = imported {
                    handle.block_imported(block.into()).await;
                } else {
                    // The import stream has ended; nothing more to forward.
                    break
                }
            },
            complete => break,
        }
    }
}
/// The overseer: owns all subsystems plus the bookkeeping for active leaves.
///
/// The `#[orchestra(..)]` attribute names the generated message wrapper
/// (`AllMessages`) and the event, signal and error types used by this overseer.
/// NOTE(review): `message_capacity` is presumably the capacity of the message
/// channels — confirm against the `orchestra` documentation.
#[orchestra(
gen=AllMessages,
event=Event,
signal=OverseerSignal,
error=SubsystemError,
message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
// Subsystem slots.
// NOTE(review): in each `#[subsystem(..)]` attribute the leading message type (if any)
// is presumably the message type the subsystem consumes, `sends:` lists the message
// types it may emit, and `blocking` presumably requests a blocking task — confirm
// against the `orchestra` documentation.
#[subsystem(CandidateValidationMessage, sends: [
RuntimeApiMessage,
])]
candidate_validation: CandidateValidation,
#[subsystem(sends: [
CandidateValidationMessage,
RuntimeApiMessage,
])]
pvf_checker: PvfChecker,
#[subsystem(CandidateBackingMessage, sends: [
CandidateValidationMessage,
CollatorProtocolMessage,
AvailabilityDistributionMessage,
AvailabilityStoreMessage,
StatementDistributionMessage,
ProvisionerMessage,
RuntimeApiMessage,
])]
candidate_backing: CandidateBacking,
#[subsystem(StatementDistributionMessage, sends: [
NetworkBridgeTxMessage,
CandidateBackingMessage,
RuntimeApiMessage,
])]
statement_distribution: StatementDistribution,
#[subsystem(AvailabilityDistributionMessage, sends: [
AvailabilityStoreMessage,
AvailabilityRecoveryMessage,
ChainApiMessage,
RuntimeApiMessage,
NetworkBridgeTxMessage,
])]
availability_distribution: AvailabilityDistribution,
#[subsystem(AvailabilityRecoveryMessage, sends: [
NetworkBridgeTxMessage,
RuntimeApiMessage,
AvailabilityStoreMessage,
])]
availability_recovery: AvailabilityRecovery,
#[subsystem(blocking, sends: [
AvailabilityStoreMessage,
RuntimeApiMessage,
BitfieldDistributionMessage,
])]
bitfield_signing: BitfieldSigning,
#[subsystem(BitfieldDistributionMessage, sends: [
RuntimeApiMessage,
NetworkBridgeTxMessage,
ProvisionerMessage,
])]
bitfield_distribution: BitfieldDistribution,
#[subsystem(ProvisionerMessage, sends: [
RuntimeApiMessage,
CandidateBackingMessage,
ChainApiMessage,
DisputeCoordinatorMessage,
])]
provisioner: Provisioner,
#[subsystem(blocking, RuntimeApiMessage, sends: [])]
runtime_api: RuntimeApi,
#[subsystem(blocking, AvailabilityStoreMessage, sends: [
ChainApiMessage,
RuntimeApiMessage,
])]
availability_store: AvailabilityStore,
#[subsystem(NetworkBridgeRxMessage, sends: [
BitfieldDistributionMessage,
StatementDistributionMessage,
ApprovalDistributionMessage,
GossipSupportMessage,
DisputeDistributionMessage,
CollationGenerationMessage,
CollatorProtocolMessage,
])]
network_bridge_rx: NetworkBridgeRx,
#[subsystem(NetworkBridgeTxMessage, sends: [])]
network_bridge_tx: NetworkBridgeTx,
#[subsystem(blocking, ChainApiMessage, sends: [])]
chain_api: ChainApi,
#[subsystem(CollationGenerationMessage, sends: [
RuntimeApiMessage,
CollatorProtocolMessage,
])]
collation_generation: CollationGeneration,
#[subsystem(CollatorProtocolMessage, sends: [
NetworkBridgeTxMessage,
RuntimeApiMessage,
CandidateBackingMessage,
])]
collator_protocol: CollatorProtocol,
#[subsystem(ApprovalDistributionMessage, sends: [
NetworkBridgeTxMessage,
ApprovalVotingMessage,
])]
approval_distribution: ApprovalDistribution,
#[subsystem(blocking, ApprovalVotingMessage, sends: [
ApprovalDistributionMessage,
AvailabilityRecoveryMessage,
CandidateValidationMessage,
ChainApiMessage,
ChainSelectionMessage,
DisputeCoordinatorMessage,
RuntimeApiMessage,
])]
approval_voting: ApprovalVoting,
#[subsystem(GossipSupportMessage, sends: [
NetworkBridgeTxMessage,
NetworkBridgeRxMessage, RuntimeApiMessage,
ChainSelectionMessage,
])]
gossip_support: GossipSupport,
#[subsystem(blocking, DisputeCoordinatorMessage, sends: [
RuntimeApiMessage,
ChainApiMessage,
DisputeDistributionMessage,
CandidateValidationMessage,
ApprovalVotingMessage,
AvailabilityStoreMessage,
AvailabilityRecoveryMessage,
])]
dispute_coordinator: DisputeCoordinator,
#[subsystem(DisputeDistributionMessage, sends: [
RuntimeApiMessage,
DisputeCoordinatorMessage,
NetworkBridgeTxMessage,
])]
dispute_distribution: DisputeDistribution,
#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
chain_selection: ChainSelection,
// Non-subsystem state kept by the overseer itself.
/// Senders waiting to be notified once the keyed hash becomes an active leaf.
pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,
/// The jaeger span created for each leaf on activation; removed on deactivation.
pub span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>,
/// Leaves to activate on startup; drained at the beginning of `run_inner`.
pub leaves: Vec<(Hash, BlockNumber)>,
/// The currently active leaves, mapped to their block numbers.
pub active_leaves: HashMap<Hash, BlockNumber>,
/// Used to check whether a head supports parachains before activating it.
pub supports_parachains: SupportsParachains,
/// LRU cache of leaves that were activated before; a hit marks a leaf as stale.
pub known_leaves: LruCache<Hash, ()>,
/// Overseer metrics.
pub metrics: OverseerMetrics,
}
pub fn spawn_metronome_metrics<S, SupportsParachains>(
overseer: &mut Overseer<S, SupportsParachains>,
metronome_metrics: OverseerMetrics,
) -> Result<(), SubsystemError>
where
S: Spawner,
SupportsParachains: HeadSupportsParachains,
{
struct ExtractNameAndMeters;
impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
type Output = Option<(&'static str, SubsystemMeters)>;
fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
subsystem
.instance
.as_ref()
.map(|instance| (instance.name, instance.meters.clone()))
}
}
let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
match MemoryAllocationTracker::new() {
Ok(memory_stats) =>
Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
Ok(memory_stats_snapshot) => {
gum::trace!(
target: LOG_TARGET,
"memory_stats: {:?}",
&memory_stats_snapshot
);
metrics.memory_stats_snapshot(memory_stats_snapshot);
},
Err(e) =>
gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
}),
Err(_) => {
gum::debug!(
target: LOG_TARGET,
"Memory allocation tracking is not supported by the allocator.",
);
Box::new(|_| {})
},
};
let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
collect_memory_stats(&metronome_metrics);
metronome_metrics.channel_metrics_snapshot(
subsystem_meters
.iter()
.cloned()
.flatten()
.map(|(name, ref meters)| (name, meters.read())),
);
futures::future::ready(())
});
overseer
.spawner()
.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
Ok(())
}
impl<S, SupportsParachains> Overseer<S, SupportsParachains>
where
SupportsParachains: HeadSupportsParachains,
S: Spawner,
{
/// Stops the overseer by calling `wait_terminate` with the `Conclude` signal and a
/// one second timeout; the outcome is deliberately ignored.
async fn stop(mut self) {
    let grace_period = Duration::from_secs(1);
    let _ = self.wait_terminate(OverseerSignal::Conclude, grace_period).await;
}
/// Runs the overseer until it stops, logging the error if it exits with one.
pub async fn run(self) {
    match self.run_inner().await {
        Ok(()) => {},
        Err(err) => {
            gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
        },
    }
}
/// The overseer's main loop.
///
/// Spawns the metrics metronome, activates the initial leaves, then processes
/// external events, spawn requests from subsystems and subsystem exits until a
/// stop is requested, a subsystem finishes, or an error is propagated.
async fn run_inner(mut self) -> SubsystemResult<()> {
let metrics = self.metrics.clone();
spawn_metronome_metrics(&mut self, metrics)?;
// Activate the leaves we were started with; `self.leaves` is left empty afterwards.
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
// No parent is passed, so the span of an initial leaf follows no other span.
if let Some((span, status)) = self.on_head_activated(&hash, None).await {
let update =
ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span });
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
}
loop {
select! {
// Events coming in from the outside, e.g. through a `Handle`.
msg = self.events_rx.select_next_some() => {
match msg {
Event::MsgToSubsystem { msg, origin } => {
self.route_message(msg.into(), origin).await?;
self.metrics.on_message_relayed();
}
Event::Stop => {
// `stop` consumes `self`, so the loop ends here.
self.stop().await;
return Ok(());
}
Event::BlockImported(block) => {
self.block_imported(block).await?;
}
Event::BlockFinalized(block) => {
self.block_finalized(block).await?;
}
Event::ExternalRequest(request) => {
self.handle_external_request(request);
}
}
},
// Requests from subsystems to spawn a task on their behalf.
msg = self.to_orchestra_rx.select_next_some() => {
match msg {
ToOrchestra::SpawnJob { name, subsystem, s } => {
self.spawn_job(name, subsystem, s);
}
ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
self.spawn_blocking_job(name, subsystem, s);
}
}
},
// A subsystem finishing is unexpected: log it, stop the overseer and
// return that subsystem's result.
res = self.running_subsystems.select_next_some() => {
gum::error!(
target: LOG_TARGET,
subsystem = ?res,
"subsystem finished unexpectedly",
);
self.stop().await;
return res;
},
}
}
}
/// Handles a newly imported block.
///
/// Registers the block as an active leaf, deactivates its parent if the parent
/// was an active leaf, prunes cancelled activation listeners and broadcasts the
/// resulting `ActiveLeaves` update unless it is empty. A block that is already
/// an active leaf is ignored.
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
    // Already tracked as an active leaf: nothing to do.
    if let Some(known_number) = self.active_leaves.get(&block.hash) {
        debug_assert_eq!(*known_number, block.number);
        return Ok(())
    }
    self.active_leaves.insert(block.hash, block.number);

    let activation = self.on_head_activated(&block.hash, Some(block.parent_hash)).await;
    let mut update = if let Some((span, status)) = activation {
        ActiveLeavesUpdate::start_work(ActivatedLeaf {
            hash: block.hash,
            number: block.number,
            status,
            span,
        })
    } else {
        // The head was not activated, so the update starts out empty.
        ActiveLeavesUpdate::default()
    };

    // The parent stops being a leaf once a child of it has been imported.
    if let Some(parent_number) = self.active_leaves.remove(&block.parent_hash) {
        debug_assert_eq!(block.number.saturating_sub(1), parent_number);
        update.deactivated.push(block.parent_hash);
        self.on_head_deactivated(&block.parent_hash);
    }

    self.clean_up_external_listeners();

    if update.is_empty() {
        return Ok(())
    }
    self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
    Ok(())
}
/// Handles a finalized block.
///
/// Deactivates every active leaf whose number is not above the finalized
/// block's number, except the finalized block itself. It then broadcasts
/// `BlockFinalized`, followed by an `ActiveLeaves` update if any leaf was
/// deactivated.
async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
    let mut update = ActiveLeavesUpdate::default();

    self.active_leaves.retain(|hash, number| {
        // Keep leaves above the finalized height, and the finalized block itself.
        let keep = *number > block.number || *hash == block.hash;
        if !keep {
            update.deactivated.push(*hash);
        }
        keep
    });

    update.deactivated.iter().for_each(|hash| self.on_head_deactivated(hash));

    let finalized = OverseerSignal::BlockFinalized(block.hash, block.number);
    self.broadcast_signal(finalized).await?;

    if !update.is_empty() {
        self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
    }
    Ok(())
}
/// Performs the bookkeeping for a head that is becoming an active leaf.
///
/// Returns `None` if the head does not support parachains. Otherwise it:
/// - bumps the activation metric,
/// - notifies and removes every external listener waiting for `hash`,
/// - creates a `"leaf-activated"` jaeger span, linked to the parent's span when
///   `parent_hash` is given and a span for it is stored,
/// - records the hash in `known_leaves`.
///
/// The returned status is `LeafStatus::Stale` if the hash was already in
/// `known_leaves`, and `LeafStatus::Fresh` otherwise.
async fn on_head_activated(
    &mut self,
    hash: &Hash,
    parent_hash: Option<Hash>,
) -> Option<(Arc<jaeger::Span>, LeafStatus)> {
    if !self.supports_parachains.head_supports_parachains(hash).await {
        return None
    }

    self.metrics.on_head_activated();

    if let Some(listeners) = self.activation_external_listeners.remove(hash) {
        // NOTE(review): "exterinal" is a typo for "external"; the message is left
        // untouched in case log output is matched on elsewhere.
        gum::trace!(
            target: LOG_TARGET,
            relay_parent = ?hash,
            "Leaf got activated, notifying exterinal listeners"
        );
        for listener in listeners {
            // Best effort: a failed send is ignored.
            let _ = listener.send(Ok(()));
        }
    }

    let mut span = jaeger::Span::new(*hash, "leaf-activated");
    if let Some(parent_span) = parent_hash.and_then(|h| self.span_per_active_leaf.get(&h)) {
        span.add_follows_from(parent_span);
    }
    let span = Arc::new(span);
    self.span_per_active_leaf.insert(*hash, span.clone());

    // `put` returns the previous value, so `Some` means the leaf was seen before.
    let status = if self.known_leaves.put(*hash, ()).is_some() {
        LeafStatus::Stale
    } else {
        LeafStatus::Fresh
    };

    Some((span, status))
}
/// Performs the bookkeeping for a leaf that stops being active.
///
/// Bumps the deactivation metric, discards any listeners still waiting for the
/// leaf (their senders are dropped) and removes the leaf's jaeger span.
fn on_head_deactivated(&mut self, hash: &Hash) {
    self.metrics.on_head_deactivated();
    drop(self.activation_external_listeners.remove(hash));
    drop(self.span_per_active_leaf.remove(hash));
}
/// Prunes activation listeners whose receiving side has been cancelled, then
/// removes hashes that have no listeners left.
fn clean_up_external_listeners(&mut self) {
    // First pass: forget senders nobody is listening on any more.
    for senders in self.activation_external_listeners.values_mut() {
        senders.retain(|sender| !sender.is_canceled());
    }
    // Second pass: drop entries that ended up empty.
    self.activation_external_listeners.retain(|_, senders| !senders.is_empty());
}
/// Handles a request addressed to the overseer itself.
///
/// For `WaitForActivation`: answers `Ok(())` right away if `hash` is already an
/// active leaf; otherwise queues the sender until the leaf gets activated.
fn handle_external_request(&mut self, request: ExternalRequest) {
    match request {
        ExternalRequest::WaitForActivation { hash, response_channel } => {
            if self.active_leaves.contains_key(&hash) {
                gum::trace!(
                    target: LOG_TARGET,
                    relay_parent = ?hash,
                    "Leaf was already ready - answering `WaitForActivation`"
                );
                // Best effort: a failed send is ignored.
                let _ = response_channel.send(Ok(()));
            } else {
                gum::trace!(
                    target: LOG_TARGET,
                    relay_parent = ?hash,
                    "Leaf not yet ready - queuing `WaitForActivation` sender"
                );
                self.activation_external_listeners
                    .entry(hash)
                    .or_default()
                    .push(response_channel);
            }
        },
    }
}
fn spawn_job(
&mut self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn(task_name, subsystem_name, j);
}
fn spawn_blocking_job(
&mut self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn_blocking(task_name, subsystem_name, j);
}
}