use crate::{
transaction::{
api::TransactionApiServer,
error::Error,
event::{
TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
TransactionEvent,
},
},
SubscriptionTaskExecutor,
};
use jsonrpsee::{
core::async_trait,
types::{
error::{CallError, ErrorObject},
SubscriptionResult,
},
SubscriptionSink,
};
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
TransactionStatus,
};
use std::sync::Arc;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_core::Bytes;
use sp_runtime::{generic, traits::Block as BlockT};
use codec::Decode;
use futures::{FutureExt, StreamExt, TryFutureExt};
pub struct Transaction<Pool, Client> {
client: Arc<Client>,
pool: Arc<Pool>,
executor: SubscriptionTaskExecutor,
}
impl<Pool, Client> Transaction<Pool, Client> {
pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
Transaction { client, pool, executor }
}
}
const TX_SOURCE: TransactionSource = TransactionSource::External;
const BAD_FORMAT: i32 = 1001;
#[async_trait]
impl<Pool, Client> TransactionApiServer<BlockHash<Pool>> for Transaction<Pool, Client>
where
Pool: TransactionPool + Sync + Send + 'static,
Pool::Hash: Unpin,
<Pool::Block as BlockT>::Hash: Unpin,
Client: HeaderBackend<Pool::Block> + ProvideRuntimeApi<Pool::Block> + Send + Sync + 'static,
{
fn submit_and_watch(&self, mut sink: SubscriptionSink, xt: Bytes) -> SubscriptionResult {
let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
Ok(decoded_extrinsic) => decoded_extrinsic,
Err(e) => {
let err = CallError::Custom(ErrorObject::owned(
BAD_FORMAT,
format!("Extrinsic has invalid format: {}", e),
None::<()>,
));
let _ = sink.reject(err);
return Ok(())
},
};
let best_block_hash = self.client.info().best_hash;
let submit = self
.pool
.submit_and_watch(
&generic::BlockId::hash(best_block_hash),
TX_SOURCE,
decoded_extrinsic,
)
.map_err(|e| {
e.into_pool_error()
.map(Error::from)
.unwrap_or_else(|e| Error::Verification(Box::new(e)))
});
let fut = async move {
match submit.await {
Ok(stream) => {
let mut state = TransactionState::new();
let stream =
stream.filter_map(|event| async move { state.handle_event(event) });
sink.pipe_from_stream(stream.boxed()).await;
},
Err(err) => {
let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
sink.pipe_from_stream(futures::stream::once(async { event }).boxed()).await;
},
};
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
}
#[derive(Clone, Copy)]
struct TransactionState {
broadcasted: bool,
}
impl TransactionState {
pub fn new() -> Self {
TransactionState { broadcasted: false }
}
#[inline]
pub fn handle_event<Hash: Clone, BlockHash: Clone>(
&mut self,
event: TransactionStatus<Hash, BlockHash>,
) -> Option<TransactionEvent<BlockHash>> {
match event {
TransactionStatus::Ready | TransactionStatus::Future =>
Some(TransactionEvent::<BlockHash>::Validated),
TransactionStatus::Broadcast(peers) => {
self.broadcasted = self.broadcasted || !peers.is_empty();
Some(TransactionEvent::Broadcasted(TransactionBroadcasted {
num_peers: peers.len(),
}))
},
TransactionStatus::InBlock((hash, index)) =>
Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash,
index,
}))),
TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)),
TransactionStatus::FinalityTimeout(_) =>
Some(TransactionEvent::Dropped(TransactionDropped {
broadcasted: self.broadcasted,
error: "Maximum number of finality watchers has been reached".into(),
})),
TransactionStatus::Finalized((hash, index)) =>
Some(TransactionEvent::Finalized(TransactionBlock { hash, index })),
TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
error: "Extrinsic was rendered invalid by another extrinsic".into(),
})),
TransactionStatus::Dropped => Some(TransactionEvent::Invalid(TransactionError {
error: "Extrinsic dropped from the pool due to exceeding limits".into(),
})),
TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
error: "Extrinsic marked as invalid".into(),
})),
}
}
}