use beefy_primitives::{crypto::AuthorityId, ValidatorSet};
use codec::Encode;
use futures::channel::{oneshot, oneshot::Canceled};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_network::{PeerId, ProtocolName};
use sc_network_common::{
request_responses::{IfDisconnected, RequestFailure},
service::NetworkRequest,
};
use sp_runtime::traits::{Block, NumberFor};
use std::{collections::VecDeque, result::Result, sync::Arc};
use crate::{
communication::request_response::{Error, JustificationRequest},
justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
KnownPeers,
};
/// Outcome of a single network request: the raw response bytes on success, or the
/// `RequestFailure` describing why the request failed.
type Response = Result<Vec<u8>, RequestFailure>;
/// Receiving half of the oneshot channel on which a `Response` is delivered.
type ResponseReceiver = oneshot::Receiver<Response>;
/// Parameters of a justification request, kept alongside the pending request so the
/// response can be verified when it arrives.
#[derive(Clone, Debug)]
struct RequestInfo<B: Block> {
/// Number of the block the justification is requested for.
block: NumberFor<B>,
/// Validator set handed to proof verification when the response is processed.
active_set: ValidatorSet<AuthorityId>,
}
/// Whether the engine currently has a request in flight.
enum State<B: Block> {
/// No request is pending.
Idle,
/// A request was sent to the given peer; carries the request parameters and the
/// receiver on which that peer's response will be delivered.
AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
}
/// Requests justifications from peers, one request at a time, moving on to the next
/// cached peer whenever a response is missing or fails verification.
pub struct OnDemandJustificationsEngine<B: Block> {
/// Handle used to start requests on the network.
network: Arc<dyn NetworkRequest + Send + Sync>,
/// Name of the protocol the requests are sent on.
protocol_name: ProtocolName,
/// Shared view of known peers; used to pick candidates for a block and to check
/// that a cached candidate is still known before contacting it.
live_peers: Arc<Mutex<KnownPeers<B>>>,
/// Candidate peers that have not yet been tried for the current request.
peers_cache: VecDeque<PeerId>,
/// Current request state of the engine.
state: State<B>,
}
impl<B: Block> OnDemandJustificationsEngine<B> {
pub fn new(
network: Arc<dyn NetworkRequest + Send + Sync>,
protocol_name: ProtocolName,
live_peers: Arc<Mutex<KnownPeers<B>>>,
) -> Self {
Self {
network,
protocol_name,
live_peers,
peers_cache: VecDeque::new(),
state: State::Idle,
}
}
fn reset_peers_cache_for_block(&mut self, block: NumberFor<B>) {
self.peers_cache = self.live_peers.lock().at_least_at_block(block);
}
fn try_next_peer(&mut self) -> Option<PeerId> {
let live = self.live_peers.lock();
while let Some(peer) = self.peers_cache.pop_front() {
if live.contains(&peer) {
return Some(peer)
}
}
None
}
fn request_from_peer(&mut self, peer: PeerId, req_info: RequestInfo<B>) {
debug!(
target: "beefy::sync",
"🥩 requesting justif #{:?} from peer {:?}",
req_info.block,
peer,
);
let payload = JustificationRequest::<B> { begin: req_info.block }.encode();
let (tx, rx) = oneshot::channel();
self.network.start_request(
peer,
self.protocol_name.clone(),
payload,
tx,
IfDisconnected::ImmediateError,
);
self.state = State::AwaitingResponse(peer, req_info, rx);
}
pub fn request(&mut self, block: NumberFor<B>, active_set: ValidatorSet<AuthorityId>) {
if matches!(self.state, State::AwaitingResponse(_, _, _)) {
return
}
self.reset_peers_cache_for_block(block);
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, RequestInfo { block, active_set });
} else {
debug!(target: "beefy::sync", "🥩 no good peers to request justif #{:?} from", block);
}
}
pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) {
match &self.state {
State::AwaitingResponse(_, req_info, _) if req_info.block <= block => {
debug!(
target: "beefy::sync", "🥩 cancel pending request for justification #{:?}",
req_info.block
);
self.state = State::Idle;
},
_ => (),
}
}
fn process_response(
&mut self,
peer: PeerId,
req_info: &RequestInfo<B>,
response: Result<Response, Canceled>,
) -> Result<BeefyVersionedFinalityProof<B>, Error> {
response
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
req_info.block, peer, e
);
Error::InvalidResponse
})?
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
req_info.block, peer, e
);
Error::InvalidResponse
})
.and_then(|encoded| {
decode_and_verify_finality_proof::<B>(
&encoded[..],
req_info.block,
&req_info.active_set,
)
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
req_info.block, peer, e
);
Error::InvalidResponse
})
})
}
pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
let (peer, req_info, resp) = match &mut self.state {
State::Idle => {
futures::pending!();
return None
},
State::AwaitingResponse(peer, req_info, receiver) => {
let resp = receiver.await;
(*peer, req_info.clone(), resp)
},
};
self.state = State::Idle;
let block = req_info.block;
self.process_response(peer, &req_info, resp)
.map_err(|_| {
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, req_info);
} else {
warn!(target: "beefy::sync", "🥩 ran out of peers to request justif #{:?} from", block);
}
})
.map(|proof| {
debug!(
target: "beefy::sync",
"🥩 received valid on-demand justif #{:?} from {:?}",
block, peer
);
proof
})
.ok()
}
}