use crate::config::*;
use codec::{Decode, Encode};
use futures::{prelude::*, stream::FuturesUnordered};
use libp2p::{multiaddr, PeerId};
use log::{debug, trace, warn};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network_common::{
config::{NonDefaultSetConfig, NonReservedPeerMode, ProtocolId, SetConfig},
error,
protocol::{event::Event, role::ObservedRole, ProtocolName},
service::{NetworkEventStream, NetworkNotification, NetworkPeers},
utils::{interval, LruHashSet},
ExHashT,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap},
iter,
num::NonZeroUsize,
pin::Pin,
sync::Arc,
task::Poll,
};
pub mod config;
/// A list of transactions of type `E`; this is the payload decoded from and encoded into
/// notifications of the transactions protocol.
pub type Transactions<E> = Vec<E>;
/// Reputation changes reported for peers depending on the transactions they send us.
mod rep {
    use sc_peerset::ReputationChange;

    /// Reported for every transaction received from a peer, before its import result is known.
    pub const ANY_TRANSACTION: ReputationChange =
        ReputationChange::new(-(1 << 4), "Any transaction");

    /// Reported when the import result is `KnownGood`; same magnitude as [`ANY_TRANSACTION`]
    /// with the opposite sign.
    pub const ANY_TRANSACTION_REFUND: ReputationChange =
        ReputationChange::new(1 << 4, "Any transaction (refund)");

    /// Reported when the import result is `NewGood`.
    pub const GOOD_TRANSACTION: ReputationChange =
        ReputationChange::new(1 << 7, "Good transaction");

    /// Reported when the import result is `Bad`.
    pub const BAD_TRANSACTION: ReputationChange =
        ReputationChange::new(-(1 << 12), "Bad transaction");
}
/// Prometheus metrics maintained by the transactions handler.
struct Metrics {
/// Counter increased by `do_propagate_transactions` with the number of transactions it sent.
propagated_transactions: Counter<U64>,
}
impl Metrics {
    /// Creates the handler's counters and registers them with the registry `r`.
    ///
    /// # Errors
    ///
    /// Returns the `PrometheusError` produced by creating or registering the counter.
    fn register(r: &Registry) -> Result<Self, PrometheusError> {
        let counter = Counter::new(
            "substrate_sync_propagated_transactions",
            "Number of transactions propagated to at least one peer",
        )?;
        let propagated_transactions = register(counter, r)?;

        Ok(Self { propagated_transactions })
    }
}
/// Future that pairs an in-progress transaction import with the hash of that transaction.
#[pin_project::pin_project]
struct PendingTransaction<H> {
/// Future resolving to the result of the import.
#[pin]
validation: TransactionImportFuture,
/// Hash of the transaction being imported; returned together with the import result.
tx_hash: H,
}
impl<H: ExHashT> Future for PendingTransaction<H> {
    /// The transaction hash together with the outcome of its import.
    type Output = (H, TransactionImport);

    /// Polls the wrapped import future and, once it is ready, tags its result with a clone of
    /// the transaction hash.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        match Pin::new(&mut this.validation).poll_unpin(cx) {
            Poll::Ready(import_result) => Poll::Ready((this.tx_hash.clone(), import_result)),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Prototype from which a `TransactionsHandler` and its controller are built.
pub struct TransactionsHandlerPrototype {
/// Name of the transactions notification protocol, derived from the genesis hash and the
/// optional fork id.
protocol_name: ProtocolName,
/// Fallback protocol names; holds the legacy name derived from the protocol id.
fallback_protocol_names: Vec<ProtocolName>,
}
impl TransactionsHandlerPrototype {
/// Creates a new prototype.
///
/// The main protocol name is `/<genesis hash in hex>/transactions/1`, or
/// `/<genesis hash in hex>/<fork id>/transactions/1` when `fork_id` is given. The legacy name
/// `/<protocol id>/transactions/1` is kept as the only fallback.
pub fn new<Hash: AsRef<[u8]>>(
    protocol_id: ProtocolId,
    genesis_hash: Hash,
    fork_id: Option<&str>,
) -> Self {
    let genesis_bytes: &[u8] = genesis_hash.as_ref();
    // Hex-encode once; both name variants embed the same string.
    let genesis_hex = array_bytes::bytes2hex("", genesis_bytes);

    let protocol_name = match fork_id {
        Some(fork_id) => format!("/{}/{}/transactions/1", genesis_hex, fork_id),
        None => format!("/{}/transactions/1", genesis_hex),
    };
    let legacy_protocol_name = format!("/{}/transactions/1", protocol_id.as_ref());

    Self {
        protocol_name: protocol_name.into(),
        fallback_protocol_names: vec![legacy_protocol_name.into()],
    }
}
/// Returns the configuration of the notification protocol set used for transactions.
///
/// The set has no inbound or outbound slots and no initial reserved nodes, and it denies
/// non-reserved peers.
pub fn set_config(&self) -> NonDefaultSetConfig {
    let peer_set = SetConfig {
        in_peers: 0,
        out_peers: 0,
        reserved_nodes: Vec::new(),
        non_reserved_mode: NonReservedPeerMode::Deny,
    };

    NonDefaultSetConfig {
        notifications_protocol: self.protocol_name.clone(),
        fallback_names: self.fallback_protocol_names.clone(),
        max_notification_size: MAX_TRANSACTIONS_SIZE,
        handshake: None,
        set_config: peer_set,
    }
}
/// Turns the prototype into the actual handler and the controller used to talk to it.
///
/// When `metrics_registry` is `Some`, the handler's metrics are registered with it.
///
/// # Errors
///
/// Fails if registering the metrics fails.
pub fn build<
    B: BlockT + 'static,
    H: ExHashT,
    S: NetworkPeers + NetworkEventStream + NetworkNotification + sp_consensus::SyncOracle,
>(
    self,
    service: S,
    transaction_pool: Arc<dyn TransactionPool<H, B>>,
    metrics_registry: Option<&Registry>,
) -> error::Result<(TransactionsHandler<B, H, S>, TransactionsHandlerController<H>)> {
    let event_stream = service.event_stream("transactions-handler").fuse();
    let (to_handler, from_controller) = tracing_unbounded("mpsc_transactions_handler", 100_000);
    let propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>> =
        Box::pin(interval(PROPAGATE_TIMEOUT));

    let handler = TransactionsHandler {
        protocol_name: self.protocol_name,
        propagate_timeout: propagate_timeout.fuse(),
        pending_transactions: FuturesUnordered::new(),
        pending_transactions_peers: HashMap::new(),
        service,
        event_stream,
        peers: HashMap::new(),
        transaction_pool,
        from_controller,
        // Registered last, so a registration error is the only early return.
        metrics: metrics_registry.map(Metrics::register).transpose()?,
    };

    Ok((handler, TransactionsHandlerController { to_handler }))
}
}
/// Controller handle used to send propagation requests to a `TransactionsHandler`.
pub struct TransactionsHandlerController<H: ExHashT> {
/// Sending half of the channel whose receiver is owned by the handler.
to_handler: TracingUnboundedSender<ToHandler<H>>,
}
impl<H: ExHashT> TransactionsHandlerController<H> {
pub fn propagate_transactions(&self) {
let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions);
}
pub fn propagate_transaction(&self, hash: H) {
let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash));
}
}
/// Messages sent from the controller to the handler.
enum ToHandler<H: ExHashT> {
/// Propagate the transactions returned by the transaction pool.
PropagateTransactions,
/// Propagate the single transaction with the given hash.
PropagateTransaction(H),
}
/// Handler for transactions: imports the ones received from peers and propagates the ones held
/// by the transaction pool. Drive it with [`TransactionsHandler::run`].
pub struct TransactionsHandler<
B: BlockT + 'static,
H: ExHashT,
S: NetworkPeers + NetworkEventStream + NetworkNotification + sp_consensus::SyncOracle,
> {
/// Name of the notification protocol used to exchange transactions.
protocol_name: ProtocolName,
/// Stream built from `interval(PROPAGATE_TIMEOUT)`; every item triggers a propagation of the
/// pool's transactions.
propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
/// Imports that are currently in progress.
pending_transactions: FuturesUnordered<PendingTransaction<H>>,
/// For every transaction with an import in progress, the peers that sent it to us. Used to
/// report all of them once the import result is known.
pending_transactions_peers: HashMap<H, Vec<PeerId>>,
/// Handle to the network service.
service: S,
/// Stream of events coming from the network service.
event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = Event> + Send>>>,
/// Peers for which a notification stream of `protocol_name` is currently open.
peers: HashMap<PeerId, Peer<H>>,
/// Transaction pool used to hash, import and look up transactions.
transaction_pool: Arc<dyn TransactionPool<H, B>>,
/// Receiving half of the channel fed by `TransactionsHandlerController`.
from_controller: TracingUnboundedReceiver<ToHandler<H>>,
/// Prometheus metrics; `None` when no registry was passed at build time.
metrics: Option<Metrics>,
}
/// State kept for each peer with an open transactions notification stream.
#[derive(Debug)]
struct Peer<H: ExHashT> {
/// Hashes of transactions this peer sent to us or that we sent to it; created with a
/// capacity of `MAX_KNOWN_TRANSACTIONS`.
known_transactions: LruHashSet<H>,
/// Role reported when the notification stream was opened; `Light` peers are skipped when
/// propagating.
role: ObservedRole,
}
impl<B, H, S> TransactionsHandler<B, H, S>
where
B: BlockT + 'static,
H: ExHashT,
S: NetworkPeers + NetworkEventStream + NetworkNotification + sp_consensus::SyncOracle,
{
/// Consumes the handler and drives it: propagates transactions on every timer tick and on
/// controller request, reports peers when imports finish, and processes network events.
///
/// The loop only returns once the network event stream has ended.
pub async fn run(mut self) {
loop {
futures::select! {
// Timer tick: propagate the transactions of the pool.
_ = self.propagate_timeout.next() => {
self.propagate_transactions();
},
// An import finished: report every peer that sent us this transaction.
(tx_hash, result) = self.pending_transactions.select_next_some() => {
if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
} else {
warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
}
},
network_event = self.event_stream.next() => {
if let Some(network_event) = network_event {
self.handle_network_event(network_event).await;
} else {
// The event stream yielded `None`, i.e. it has ended: stop the handler as well.
return;
}
},
// Request sent through `TransactionsHandlerController`.
message = self.from_controller.select_next_some() => {
match message {
ToHandler::PropagateTransaction(hash) => self.propagate_transaction(&hash),
ToHandler::PropagateTransactions => self.propagate_transactions(),
}
},
}
}
}
/// Reacts to a single event emitted by the network service.
///
/// * Sync (dis)connections add the peer to, or remove it from, the reserved set of the
///   transactions protocol.
/// * Opening or closing a notification stream of the transactions protocol starts or stops
///   tracking the peer.
/// * Notifications received on the transactions protocol are decoded and handed to
///   `on_transactions`.
///
/// Everything else is ignored.
async fn handle_network_event(&mut self, event: Event) {
    match event {
        Event::Dht(_) => {},
        Event::SyncConnected { remote } => {
            let peer_addr: multiaddr::Multiaddr =
                iter::once(multiaddr::Protocol::P2p(remote.into())).collect();
            if let Err(err) = self.service.add_peers_to_reserved_set(
                self.protocol_name.clone(),
                iter::once(peer_addr).collect(),
            ) {
                log::error!(target: "sync", "Add reserved peer failed: {}", err);
            }
        },
        Event::SyncDisconnected { remote } => {
            self.service.remove_peers_from_reserved_set(
                self.protocol_name.clone(),
                iter::once(remote).collect(),
            );
        },
        Event::NotificationStreamOpened { remote, protocol, role, .. }
            if protocol == self.protocol_name =>
        {
            let capacity =
                NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS).expect("Constant is nonzero");
            let fresh_peer = Peer { known_transactions: LruHashSet::new(capacity), role };

            // A stream is not expected to be opened twice for the same peer.
            let _previous = self.peers.insert(remote, fresh_peer);
            debug_assert!(_previous.is_none());
        },
        Event::NotificationStreamClosed { remote, protocol }
            if protocol == self.protocol_name =>
        {
            // A closed stream is expected to belong to a peer we were tracking.
            let _removed = self.peers.remove(&remote);
            debug_assert!(_removed.is_some());
        },
        Event::NotificationsReceived { remote, messages } => {
            for (protocol, message) in messages {
                // Only notifications of the transactions protocol are of interest.
                if protocol == self.protocol_name {
                    match <Transactions<B::Extrinsic> as Decode>::decode(&mut message.as_ref())
                    {
                        Ok(transactions) => self.on_transactions(remote, transactions),
                        Err(_) => {
                            warn!(target: "sub-libp2p", "Failed to decode transactions list");
                        },
                    }
                }
            }
        },
        // Streams opened or closed on other protocols.
        Event::NotificationStreamOpened { .. } | Event::NotificationStreamClosed { .. } => {},
    }
}
/// Handles a list of transactions received from the peer `who`.
///
/// Nothing is done while the node is major syncing or when `who` is not a tracked peer.
/// Otherwise, for every transaction:
///
/// * its hash is recorded as known by `who`,
/// * `who` is charged `rep::ANY_TRANSACTION`,
/// * an import is started unless one is already in progress for the same hash, in which case
///   `who` is only added to the list of peers waiting for the result.
///
/// Processing stops as soon as `MAX_PENDING_TRANSACTIONS` imports are in progress; the
/// remaining transactions of the batch are dropped.
fn on_transactions(&mut self, who: PeerId, transactions: Transactions<B::Extrinsic>) {
    // Accept transactions only when the node is not major syncing.
    if self.service.is_major_syncing() {
        trace!(target: "sync", "{} Ignoring transactions while major syncing", who);
        return
    }

    trace!(target: "sync", "Received {} transactions from {}", transactions.len(), who);
    if let Some(ref mut peer) = self.peers.get_mut(&who) {
        for t in transactions {
            // `>=` rather than `>`: the check runs before a possible push, so a strict
            // comparison would let the queue grow to `MAX_PENDING_TRANSACTIONS + 1`.
            if self.pending_transactions.len() >= MAX_PENDING_TRANSACTIONS {
                debug!(
                    target: "sync",
                    "Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",
                    MAX_PENDING_TRANSACTIONS,
                );
                break
            }

            let hash = self.transaction_pool.hash_of(&t);
            peer.known_transactions.insert(hash.clone());

            // Charged up front; the import result may later refund or penalize the peer.
            self.service.report_peer(who, rep::ANY_TRANSACTION);

            match self.pending_transactions_peers.entry(hash.clone()) {
                // First time we see this transaction: start importing it.
                Entry::Vacant(entry) => {
                    self.pending_transactions.push(PendingTransaction {
                        validation: self.transaction_pool.import(t),
                        tx_hash: hash,
                    });
                    entry.insert(vec![who]);
                },
                // Import already in progress: just remember that `who` sent it as well.
                Entry::Occupied(mut entry) => {
                    entry.get_mut().push(who);
                },
            }
        }
    }
}
/// Adjusts the reputation of `who` according to the result of importing a transaction it
/// sent. `TransactionImport::None` leaves the reputation untouched.
fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) {
    let change = match import {
        TransactionImport::KnownGood => rep::ANY_TRANSACTION_REFUND,
        TransactionImport::NewGood => rep::GOOD_TRANSACTION,
        TransactionImport::Bad => rep::BAD_TRANSACTION,
        TransactionImport::None => return,
    };

    self.service.report_peer(who, change);
}
/// Propagates the single transaction identified by `hash`.
///
/// Does nothing while the node is major syncing or when the pool does not return a
/// transaction for `hash`. The pool is told which peers the transaction was sent to.
pub fn propagate_transaction(&mut self, hash: &H) {
    // No propagation while the node is major syncing.
    if self.service.is_major_syncing() {
        return
    }

    debug!(target: "sync", "Propagating transaction [{:?}]", hash);
    let transaction = match self.transaction_pool.transaction(hash) {
        Some(transaction) => transaction,
        None => return,
    };

    let batch = [(hash.clone(), transaction)];
    let propagated_to = self.do_propagate_transactions(&batch);
    self.transaction_pool.on_broadcasted(propagated_to);
}
fn do_propagate_transactions(
&mut self,
transactions: &[(H, B::Extrinsic)],
) -> HashMap<H, Vec<String>> {
let mut propagated_to = HashMap::<_, Vec<_>>::new();
let mut propagated_transactions = 0;
for (who, peer) in self.peers.iter_mut() {
if matches!(peer.role, ObservedRole::Light) {
continue
}
let (hashes, to_send): (Vec<_>, Vec<_>) = transactions
.iter()
.filter(|&(ref hash, _)| peer.known_transactions.insert(hash.clone()))
.cloned()
.unzip();
propagated_transactions += hashes.len();
if !to_send.is_empty() {
for hash in hashes {
propagated_to.entry(hash).or_default().push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
self.service
.write_notification(*who, self.protocol_name.clone(), to_send.encode());
}
}
if let Some(ref metrics) = self.metrics {
metrics.propagated_transactions.inc_by(propagated_transactions as _)
}
propagated_to
}
/// Propagates the transactions returned by the pool, unless the node is major syncing, and
/// tells the pool which peers each transaction was sent to.
fn propagate_transactions(&mut self) {
    if !self.service.is_major_syncing() {
        debug!(target: "sync", "Propagating transactions");
        let pool_transactions = self.transaction_pool.transactions();
        let propagated_to = self.do_propagate_transactions(&pool_transactions);
        self.transaction_pool.on_broadcasted(propagated_to);
    }
}
}