#[cfg(test)]
mod tests;
use std::sync::Arc;
use crate::SubscriptionTaskExecutor;
use codec::{Decode, Encode};
use futures::{FutureExt, TryFutureExt};
use jsonrpsee::{
core::{async_trait, Error as JsonRpseeError, RpcResult},
types::SubscriptionResult,
SubscriptionSink,
};
use sc_rpc_api::DenyUnsafe;
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, InPoolTransaction, TransactionFor, TransactionPool,
TransactionSource, TxHash,
};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_core::Bytes;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{generic, traits::Block as BlockT};
use sp_session::SessionKeys;
use self::error::{Error, Result};
pub use sc_rpc_api::author::*;
pub struct Author<P, Client> {
client: Arc<Client>,
pool: Arc<P>,
keystore: SyncCryptoStorePtr,
deny_unsafe: DenyUnsafe,
executor: SubscriptionTaskExecutor,
}
impl<P, Client> Author<P, Client> {
pub fn new(
client: Arc<Client>,
pool: Arc<P>,
keystore: SyncCryptoStorePtr,
deny_unsafe: DenyUnsafe,
executor: SubscriptionTaskExecutor,
) -> Self {
Author { client, pool, keystore, deny_unsafe, executor }
}
}
const TX_SOURCE: TransactionSource = TransactionSource::External;
#[async_trait]
impl<P, Client> AuthorApiServer<TxHash<P>, BlockHash<P>> for Author<P, Client>
where
P: TransactionPool + Sync + Send + 'static,
Client: HeaderBackend<P::Block> + ProvideRuntimeApi<P::Block> + Send + Sync + 'static,
Client::Api: SessionKeys<P::Block>,
P::Hash: Unpin,
<P::Block as BlockT>::Hash: Unpin,
{
async fn submit_extrinsic(&self, ext: Bytes) -> RpcResult<TxHash<P>> {
let xt = match Decode::decode(&mut &ext[..]) {
Ok(xt) => xt,
Err(err) => return Err(Error::Client(Box::new(err)).into()),
};
let best_block_hash = self.client.info().best_hash;
self.pool
.submit_one(&generic::BlockId::hash(best_block_hash), TX_SOURCE, xt)
.await
.map_err(|e| {
e.into_pool_error()
.map(|e| Error::Pool(e))
.unwrap_or_else(|e| Error::Verification(Box::new(e)))
.into()
})
}
fn insert_key(&self, key_type: String, suri: String, public: Bytes) -> RpcResult<()> {
self.deny_unsafe.check_if_safe()?;
let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
SyncCryptoStore::insert_unknown(&*self.keystore, key_type, &suri, &public[..])
.map_err(|_| Error::KeyStoreUnavailable)?;
Ok(())
}
fn rotate_keys(&self) -> RpcResult<Bytes> {
self.deny_unsafe.check_if_safe()?;
let best_block_hash = self.client.info().best_hash;
self.client
.runtime_api()
.generate_session_keys(&generic::BlockId::Hash(best_block_hash), None)
.map(Into::into)
.map_err(|api_err| Error::Client(Box::new(api_err)).into())
}
fn has_session_keys(&self, session_keys: Bytes) -> RpcResult<bool> {
self.deny_unsafe.check_if_safe()?;
let best_block_hash = self.client.info().best_hash;
let keys = self
.client
.runtime_api()
.decode_session_keys(&generic::BlockId::Hash(best_block_hash), session_keys.to_vec())
.map_err(|e| Error::Client(Box::new(e)))?
.ok_or(Error::InvalidSessionKeys)?;
Ok(SyncCryptoStore::has_keys(&*self.keystore, &keys))
}
fn has_key(&self, public_key: Bytes, key_type: String) -> RpcResult<bool> {
self.deny_unsafe.check_if_safe()?;
let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
Ok(SyncCryptoStore::has_keys(&*self.keystore, &[(public_key.to_vec(), key_type)]))
}
fn pending_extrinsics(&self) -> RpcResult<Vec<Bytes>> {
Ok(self.pool.ready().map(|tx| tx.data().encode().into()).collect())
}
fn remove_extrinsic(
&self,
bytes_or_hash: Vec<hash::ExtrinsicOrHash<TxHash<P>>>,
) -> RpcResult<Vec<TxHash<P>>> {
self.deny_unsafe.check_if_safe()?;
let hashes = bytes_or_hash
.into_iter()
.map(|x| match x {
hash::ExtrinsicOrHash::Hash(h) => Ok(h),
hash::ExtrinsicOrHash::Extrinsic(bytes) => {
let xt = Decode::decode(&mut &bytes[..])?;
Ok(self.pool.hash_of(&xt))
},
})
.collect::<Result<Vec<_>>>()?;
Ok(self
.pool
.remove_invalid(&hashes)
.into_iter()
.map(|tx| tx.hash().clone())
.collect())
}
fn watch_extrinsic(&self, mut sink: SubscriptionSink, xt: Bytes) -> SubscriptionResult {
let best_block_hash = self.client.info().best_hash;
let dxt = match TransactionFor::<P>::decode(&mut &xt[..]).map_err(|e| Error::from(e)) {
Ok(dxt) => dxt,
Err(e) => {
let _ = sink.reject(JsonRpseeError::from(e));
return Ok(())
},
};
let submit = self
.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), TX_SOURCE, dxt)
.map_err(|e| {
e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)))
});
let fut = async move {
let stream = match submit.await {
Ok(stream) => stream,
Err(err) => {
let _ = sink.reject(JsonRpseeError::from(err));
return
},
};
sink.pipe_from_stream(stream).await;
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
}