use super::{client_err, ChainBackend, Error};
use crate::SubscriptionTaskExecutor;
use std::{marker::PhantomData, sync::Arc};
use futures::{
future::{self, FutureExt},
stream::{self, Stream, StreamExt},
};
use jsonrpsee::SubscriptionSink;
use sc_client_api::{BlockBackend, BlockchainEvents};
use sp_blockchain::HeaderBackend;
use sp_runtime::{generic::SignedBlock, traits::Block as BlockT};
/// Chain RPC backend that serves requests through a shared client handle.
pub struct FullChain<Block, Client>
where
    Block: BlockT,
{
    /// Shared handle to the client that all queries are forwarded to.
    client: Arc<Client>,
    /// Zero-sized marker tying this backend to a concrete block type.
    _phantom: PhantomData<Block>,
    /// Executor on which subscription tasks are spawned.
    executor: SubscriptionTaskExecutor,
}
impl<Block: BlockT, Client> FullChain<Block, Client> {
pub fn new(client: Arc<Client>, executor: SubscriptionTaskExecutor) -> Self {
Self { client, executor, _phantom: PhantomData }
}
}
// `ChainBackend` implementation that answers every request through the wrapped client.
impl<Block, Client> ChainBackend<Client, Block> for FullChain<Block, Client>
where
    Block: BlockT + 'static,
    Block::Header: Unpin,
    Client: BlockBackend<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
{
    /// Returns the shared client handle held by this backend.
    fn client(&self) -> &Arc<Client> {
        &self.client
    }

    /// Fetches the header for `hash`, after passing `hash` through
    /// `unwrap_or_best`. Client failures are converted with `client_err`.
    fn header(&self, hash: Option<Block::Hash>) -> Result<Option<Block::Header>, Error> {
        let target = self.unwrap_or_best(hash);
        self.client.header(target).map_err(client_err)
    }

    /// Fetches the signed block for `hash`, after passing `hash` through
    /// `unwrap_or_best`. Client failures are converted with `client_err`.
    fn block(&self, hash: Option<Block::Hash>) -> Result<Option<SignedBlock<Block>>, Error> {
        let target = self.unwrap_or_best(hash);
        self.client.block(target).map_err(client_err)
    }

    /// Subscribes `sink` to the header of every import notification,
    /// preceded by the header at the client's current best hash.
    fn subscribe_all_heads(&self, sink: SubscriptionSink) {
        let client = self.client();
        let best_hash = || client.info().best_hash;
        let imported_headers = || {
            client
                .import_notification_stream()
                .map(|imported| imported.header)
        };

        subscribe_headers(&self.client, &self.executor, sink, best_hash, imported_headers)
    }

    /// Subscribes `sink` to the headers of import notifications flagged
    /// `is_new_best`, preceded by the header at the client's current best hash.
    fn subscribe_new_heads(&self, sink: SubscriptionSink) {
        let client = self.client();
        let best_hash = || client.info().best_hash;
        let new_best_headers = || {
            client
                .import_notification_stream()
                // Drop every import that did not become the new best block.
                .filter(|imported| future::ready(imported.is_new_best))
                .map(|imported| imported.header)
        };

        subscribe_headers(&self.client, &self.executor, sink, best_hash, new_best_headers)
    }

    /// Subscribes `sink` to the header of every finality notification,
    /// preceded by the header at the client's current finalized hash.
    fn subscribe_finalized_heads(&self, sink: SubscriptionSink) {
        let client = self.client();
        let finalized_hash = || client.info().finalized_hash;
        let finalized_headers = || {
            client
                .finality_notification_stream()
                .map(|finalized| finalized.header)
        };

        subscribe_headers(
            &self.client,
            &self.executor,
            sink,
            finalized_hash,
            finalized_headers,
        )
    }
}
fn subscribe_headers<Block, Client, F, G, S>(
client: &Arc<Client>,
executor: &SubscriptionTaskExecutor,
mut sink: SubscriptionSink,
best_block_hash: G,
stream: F,
) where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: HeaderBackend<Block> + 'static,
F: FnOnce() -> S,
G: FnOnce() -> Block::Hash,
S: Stream<Item = Block::Header> + Send + Unpin + 'static,
{
let maybe_header = client
.header(best_block_hash())
.map_err(client_err)
.and_then(|header| header.ok_or_else(|| Error::Other("Best header missing.".into())))
.map_err(|e| log::warn!("Best header error {:?}", e))
.ok();
let stream = stream::iter(maybe_header).chain(stream());
let fut = async move {
sink.pipe_from_stream(stream).await;
};
executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}