use beefy_primitives::BEEFY_ENGINE_ID;
use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, trace};
use sc_client_api::BlockBackend;
use sc_network::{config as netconfig, config::RequestResponseConfig, PeerId, ReputationChange};
use sc_network_common::protocol::ProtocolName;
use sp_runtime::traits::Block;
use std::{marker::PhantomData, sync::Arc};
use crate::communication::request_response::{
on_demand_justifications_protocol_config, Error, JustificationRequest,
};
/// A decoded incoming justification request, together with the channel used to answer it.
#[derive(Debug)]
pub(crate) struct IncomingRequest<B: Block> {
/// Peer identifier taken from the raw network request.
pub peer: PeerId,
/// Request payload decoded from the raw request bytes.
pub payload: JustificationRequest<B>,
/// One-shot sender on which the response to this request is delivered.
pub pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
}
impl<B: Block> IncomingRequest<B> {
pub fn new(
peer: PeerId,
payload: JustificationRequest<B>,
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
) -> Self {
Self { peer, payload, pending_response }
}
pub fn try_from_raw(
raw: netconfig::IncomingRequest,
reputation_changes: Vec<ReputationChange>,
) -> Result<Self, Error> {
let netconfig::IncomingRequest { payload, peer, pending_response } = raw;
let payload = match JustificationRequest::decode(&mut payload.as_ref()) {
Ok(payload) => payload,
Err(err) => {
let response = netconfig::OutgoingResponse {
result: Err(()),
reputation_changes,
sent_feedback: None,
};
if let Err(_) = pending_response.send(response) {
return Err(Error::DecodingErrorNoReputationChange(peer, err))
}
return Err(Error::DecodingError(peer, err))
},
};
Ok(Self::new(peer, payload, pending_response))
}
}
/// Wraps the channel of raw incoming network requests and decodes each one as it is received.
pub(crate) struct IncomingRequestReceiver {
// Channel yielding the raw, still-undecoded requests.
raw: mpsc::Receiver<netconfig::IncomingRequest>,
}
impl IncomingRequestReceiver {
pub fn new(inner: mpsc::Receiver<netconfig::IncomingRequest>) -> Self {
Self { raw: inner }
}
pub async fn recv<B, F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<B>, Error>
where
B: Block,
F: FnOnce() -> Vec<ReputationChange>,
{
let req = match self.raw.next().await {
None => return Err(Error::RequestChannelExhausted),
Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes())?,
};
Ok(req)
}
}
/// Handler that answers incoming on-demand BEEFY justification requests using the client's
/// block backend.
pub struct BeefyJustifsRequestHandler<B, Client> {
/// Source of incoming justification requests.
pub(crate) request_receiver: IncomingRequestReceiver,
/// Name of the justifications request-response protocol, copied from its config.
pub(crate) justif_protocol_name: ProtocolName,
/// Client used to look up block hashes and stored justifications.
pub(crate) client: Arc<Client>,
/// Marker tying the handler to block type `B` without storing a value of it.
pub(crate) _block: PhantomData<B>,
}
impl<B, Client> BeefyJustifsRequestHandler<B, Client>
where
B: Block,
Client: BlockBackend<B> + Send + Sync,
{
pub fn new<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
client: Arc<Client>,
) -> (Self, RequestResponseConfig) {
let (request_receiver, config) =
on_demand_justifications_protocol_config(genesis_hash, fork_id);
let justif_protocol_name = config.name.clone();
(Self { request_receiver, justif_protocol_name, client, _block: PhantomData }, config)
}
pub fn protocol_name(&self) -> ProtocolName {
self.justif_protocol_name.clone()
}
fn handle_request(&self, request: IncomingRequest<B>) -> Result<(), Error> {
let maybe_encoded_proof = if let Some(hash) =
self.client.block_hash(request.payload.begin).map_err(Error::Client)?
{
self.client
.justifications(hash)
.map_err(Error::Client)?
.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
.ok_or(())
} else {
Err(())
};
request
.pending_response
.send(netconfig::OutgoingResponse {
result: maybe_encoded_proof,
reputation_changes: Vec::new(),
sent_feedback: None,
})
.map_err(|_| Error::SendResponse)
}
pub async fn run(mut self) {
trace!(target: "beefy::sync", "🥩 Running BeefyJustifsRequestHandler");
while let Ok(request) = self.request_receiver.recv(|| vec![]).await {
let peer = request.peer;
match self.handle_request(request) {
Ok(()) => {
debug!(
target: "beefy::sync",
"🥩 Handled BEEFY justification request from {:?}.", peer
)
},
Err(e) => {
debug!(
target: "beefy::sync",
"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
)
},
}
}
}
}