mod peersstate;
use futures::{channel::oneshot, prelude::*};
use log::{debug, error, trace};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use serde_json::json;
use std::{
collections::{HashMap, HashSet, VecDeque},
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use wasm_timer::Delay;
pub use libp2p::PeerId;
pub const BANNED_THRESHOLD: i32 = 82 * (i32::MIN / 100);
const DISCONNECT_REPUTATION_CHANGE: i32 = -256;
const FORGET_AFTER: Duration = Duration::from_secs(3600);
#[derive(Debug)]
enum Action {
AddReservedPeer(SetId, PeerId),
RemoveReservedPeer(SetId, PeerId),
SetReservedPeers(SetId, HashSet<PeerId>),
SetReservedOnly(SetId, bool),
ReportPeer(PeerId, ReputationChange),
AddToPeersSet(SetId, PeerId),
RemoveFromPeersSet(SetId, PeerId),
PeerReputation(PeerId, oneshot::Sender<i32>),
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SetId(usize);
impl SetId {
pub const fn from(id: usize) -> Self {
Self(id)
}
}
impl From<usize> for SetId {
fn from(id: usize) -> Self {
Self(id)
}
}
impl From<SetId> for usize {
fn from(id: SetId) -> Self {
id.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReputationChange {
pub value: i32,
pub reason: &'static str,
}
impl ReputationChange {
pub const fn new(value: i32, reason: &'static str) -> ReputationChange {
Self { value, reason }
}
pub const fn new_fatal(reason: &'static str) -> ReputationChange {
Self { value: i32::MIN, reason }
}
}
#[derive(Debug, Clone)]
pub struct PeersetHandle {
tx: TracingUnboundedSender<Action>,
}
impl PeersetHandle {
pub fn add_reserved_peer(&self, set_id: SetId, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::AddReservedPeer(set_id, peer_id));
}
pub fn remove_reserved_peer(&self, set_id: SetId, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(set_id, peer_id));
}
pub fn set_reserved_only(&self, set_id: SetId, reserved: bool) {
let _ = self.tx.unbounded_send(Action::SetReservedOnly(set_id, reserved));
}
pub fn set_reserved_peers(&self, set_id: SetId, peer_ids: HashSet<PeerId>) {
let _ = self.tx.unbounded_send(Action::SetReservedPeers(set_id, peer_ids));
}
pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) {
let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
}
pub fn add_to_peers_set(&self, set_id: SetId, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::AddToPeersSet(set_id, peer_id));
}
pub fn remove_from_peers_set(&self, set_id: SetId, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::RemoveFromPeersSet(set_id, peer_id));
}
pub async fn peer_reputation(self, peer_id: PeerId) -> Result<i32, ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx));
rx.await.map_err(|_| ())
}
}
#[derive(Debug, PartialEq)]
pub enum Message {
Connect {
set_id: SetId,
peer_id: PeerId,
},
Drop {
set_id: SetId,
peer_id: PeerId,
},
Accept(IncomingIndex),
Reject(IncomingIndex),
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct IncomingIndex(pub u64);
impl From<u64> for IncomingIndex {
fn from(val: u64) -> Self {
Self(val)
}
}
#[derive(Debug)]
pub struct PeersetConfig {
pub sets: Vec<SetConfig>,
}
#[derive(Debug)]
pub struct SetConfig {
pub in_peers: u32,
pub out_peers: u32,
pub bootnodes: Vec<PeerId>,
pub reserved_nodes: HashSet<PeerId>,
pub reserved_only: bool,
}
#[derive(Debug)]
pub struct Peerset {
data: peersstate::PeersState,
reserved_nodes: Vec<(HashSet<PeerId>, bool)>,
rx: TracingUnboundedReceiver<Action>,
tx: TracingUnboundedSender<Action>,
message_queue: VecDeque<Message>,
created: Instant,
latest_time_update: Instant,
next_periodic_alloc_slots: Delay,
}
impl Peerset {
pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) {
let (tx, rx) = tracing_unbounded("mpsc_peerset_messages", 10_000);
let handle = PeersetHandle { tx: tx.clone() };
let mut peerset = {
let now = Instant::now();
Self {
data: peersstate::PeersState::new(config.sets.iter().map(|set| {
peersstate::SetConfig { in_peers: set.in_peers, out_peers: set.out_peers }
})),
tx,
rx,
reserved_nodes: config
.sets
.iter()
.map(|set| (set.reserved_nodes.clone(), set.reserved_only))
.collect(),
message_queue: VecDeque::new(),
created: now,
latest_time_update: now,
next_periodic_alloc_slots: Delay::new(Duration::new(0, 0)),
}
};
for (set, set_config) in config.sets.into_iter().enumerate() {
for node in set_config.reserved_nodes {
peerset.data.add_no_slot_node(set, node);
}
for peer_id in set_config.bootnodes {
if let peersstate::Peer::Unknown(entry) = peerset.data.peer(set, &peer_id) {
entry.discover();
} else {
debug!(target: "peerset", "Duplicate bootnode in config: {:?}", peer_id);
}
}
}
for set_index in 0..peerset.data.num_sets() {
peerset.alloc_slots(SetId(set_index));
}
(peerset, handle)
}
fn on_add_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) {
let newly_inserted = self.reserved_nodes[set_id.0].0.insert(peer_id);
if !newly_inserted {
return
}
self.data.add_no_slot_node(set_id.0, peer_id);
self.alloc_slots(set_id);
}
fn on_remove_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) {
if !self.reserved_nodes[set_id.0].0.remove(&peer_id) {
return
}
self.data.remove_no_slot_node(set_id.0, &peer_id);
if !self.reserved_nodes[set_id.0].1 {
return
}
if let peersstate::Peer::Connected(peer) = self.data.peer(set_id.0, &peer_id) {
peer.disconnect();
self.message_queue.push_back(Message::Drop { set_id, peer_id });
}
}
fn on_set_reserved_peers(&mut self, set_id: SetId, peer_ids: HashSet<PeerId>) {
let (to_insert, to_remove) = {
let to_insert = peer_ids
.difference(&self.reserved_nodes[set_id.0].0)
.cloned()
.collect::<Vec<_>>();
let to_remove = self.reserved_nodes[set_id.0]
.0
.difference(&peer_ids)
.cloned()
.collect::<Vec<_>>();
(to_insert, to_remove)
};
for node in to_insert {
self.on_add_reserved_peer(set_id, node);
}
for node in to_remove {
self.on_remove_reserved_peer(set_id, node);
}
}
fn on_set_reserved_only(&mut self, set_id: SetId, reserved_only: bool) {
self.reserved_nodes[set_id.0].1 = reserved_only;
if reserved_only {
for peer_id in
self.data.connected_peers(set_id.0).cloned().collect::<Vec<_>>().into_iter()
{
if self.reserved_nodes[set_id.0].0.contains(&peer_id) {
continue
}
let peer = self.data.peer(set_id.0, &peer_id).into_connected().expect(
"We are enumerating connected peers, therefore the peer is connected; qed",
);
peer.disconnect();
self.message_queue.push_back(Message::Drop { set_id, peer_id });
}
} else {
self.alloc_slots(set_id);
}
}
pub fn reserved_peers(&self, set_id: SetId) -> impl Iterator<Item = &PeerId> {
self.reserved_nodes[set_id.0].0.iter()
}
pub fn add_to_peers_set(&mut self, set_id: SetId, peer_id: PeerId) {
if let peersstate::Peer::Unknown(entry) = self.data.peer(set_id.0, &peer_id) {
entry.discover();
self.alloc_slots(set_id);
}
}
fn on_remove_from_peers_set(&mut self, set_id: SetId, peer_id: PeerId) {
if self.reserved_nodes[set_id.0].0.contains(&peer_id) {
return
}
match self.data.peer(set_id.0, &peer_id) {
peersstate::Peer::Connected(peer) => {
self.message_queue.push_back(Message::Drop { set_id, peer_id: *peer.peer_id() });
peer.disconnect().forget_peer();
},
peersstate::Peer::NotConnected(peer) => {
peer.forget_peer();
},
peersstate::Peer::Unknown(_) => {},
}
}
fn on_report_peer(&mut self, peer_id: PeerId, change: ReputationChange) {
self.update_time();
let mut reputation = self.data.peer_reputation(peer_id);
reputation.add_reputation(change.value);
if reputation.reputation() >= BANNED_THRESHOLD {
trace!(target: "peerset", "Report {}: {:+} to {}. Reason: {}",
peer_id, change.value, reputation.reputation(), change.reason
);
return
}
debug!(target: "peerset", "Report {}: {:+} to {}. Reason: {}, Disconnecting",
peer_id, change.value, reputation.reputation(), change.reason
);
drop(reputation);
for set_index in 0..self.data.num_sets() {
if let peersstate::Peer::Connected(peer) = self.data.peer(set_index, &peer_id) {
let peer = peer.disconnect();
self.message_queue.push_back(Message::Drop {
set_id: SetId(set_index),
peer_id: peer.into_peer_id(),
});
self.alloc_slots(SetId(set_index));
}
}
}
fn on_peer_reputation(&mut self, peer_id: PeerId, pending_response: oneshot::Sender<i32>) {
let reputation = self.data.peer_reputation(peer_id);
let _ = pending_response.send(reputation.reputation());
}
fn update_time(&mut self) {
let now = Instant::now();
let secs_diff = {
let elapsed_latest = self.latest_time_update - self.created;
let elapsed_now = now - self.created;
self.latest_time_update = now;
elapsed_now.as_secs() - elapsed_latest.as_secs()
};
for _ in 0..secs_diff {
for peer_id in self.data.peers().cloned().collect::<Vec<_>>() {
fn reput_tick(reput: i32) -> i32 {
let mut diff = reput / 50;
if diff == 0 && reput < 0 {
diff = -1;
} else if diff == 0 && reput > 0 {
diff = 1;
}
reput.saturating_sub(diff)
}
let mut peer_reputation = self.data.peer_reputation(peer_id);
let before = peer_reputation.reputation();
let after = reput_tick(before);
trace!(target: "peerset", "Fleeting {}: {} -> {}", peer_id, before, after);
peer_reputation.set_reputation(after);
if after != 0 {
continue
}
drop(peer_reputation);
for set_index in 0..self.data.num_sets() {
match self.data.peer(set_index, &peer_id) {
peersstate::Peer::Connected(_) => {},
peersstate::Peer::NotConnected(peer) => {
if peer.last_connected_or_discovered() + FORGET_AFTER < now {
peer.forget_peer();
}
},
peersstate::Peer::Unknown(_) => {
},
}
}
}
}
}
fn alloc_slots(&mut self, set_id: SetId) {
self.update_time();
for reserved_node in &self.reserved_nodes[set_id.0].0 {
let entry = match self.data.peer(set_id.0, reserved_node) {
peersstate::Peer::Unknown(n) => n.discover(),
peersstate::Peer::NotConnected(n) => n,
peersstate::Peer::Connected(_) => continue,
};
if entry.reputation() < BANNED_THRESHOLD {
break
}
match entry.try_outgoing() {
Ok(conn) => self
.message_queue
.push_back(Message::Connect { set_id, peer_id: conn.into_peer_id() }),
Err(_) => {
debug_assert!(false);
log::error!(
target: "peerset",
"Not enough slots to connect to reserved node"
);
},
}
}
if self.reserved_nodes[set_id.0].1 {
return
}
while self.data.has_free_outgoing_slot(set_id.0) {
let next = match self.data.highest_not_connected_peer(set_id.0) {
Some(n) => n,
None => break,
};
if next.reputation() < BANNED_THRESHOLD {
break
}
match next.try_outgoing() {
Ok(conn) => self
.message_queue
.push_back(Message::Connect { set_id, peer_id: conn.into_peer_id() }),
Err(_) => {
debug_assert!(false);
break
},
}
}
}
pub fn incoming(&mut self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) {
trace!(target: "peerset", "Incoming {:?}", peer_id);
self.update_time();
if self.reserved_nodes[set_id.0].1 && !self.reserved_nodes[set_id.0].0.contains(&peer_id) {
self.message_queue.push_back(Message::Reject(index));
return
}
let not_connected = match self.data.peer(set_id.0, &peer_id) {
peersstate::Peer::Connected(_) => return,
peersstate::Peer::NotConnected(mut entry) => {
entry.bump_last_connected_or_discovered();
entry
},
peersstate::Peer::Unknown(entry) => entry.discover(),
};
if not_connected.reputation() < BANNED_THRESHOLD {
self.message_queue.push_back(Message::Reject(index));
return
}
match not_connected.try_accept_incoming() {
Ok(_) => self.message_queue.push_back(Message::Accept(index)),
Err(_) => self.message_queue.push_back(Message::Reject(index)),
}
}
pub fn dropped(&mut self, set_id: SetId, peer_id: PeerId, reason: DropReason) {
self.update_time();
match self.data.peer(set_id.0, &peer_id) {
peersstate::Peer::Connected(mut entry) => {
entry.add_reputation(DISCONNECT_REPUTATION_CHANGE);
trace!(target: "peerset", "Dropping {}: {:+} to {}",
peer_id, DISCONNECT_REPUTATION_CHANGE, entry.reputation());
entry.disconnect();
},
peersstate::Peer::NotConnected(_) | peersstate::Peer::Unknown(_) => {
error!(target: "peerset", "Received dropped() for non-connected node")
},
}
if let DropReason::Refused = reason {
self.on_remove_from_peers_set(set_id, peer_id);
}
self.alloc_slots(set_id);
}
pub fn report_peer(&mut self, peer_id: PeerId, score_diff: ReputationChange) {
let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
}
pub fn debug_info(&mut self) -> serde_json::Value {
self.update_time();
json!({
"sets": (0..self.data.num_sets()).map(|set_index| {
json!({
"nodes": self.data.peers().cloned().collect::<Vec<_>>().into_iter().filter_map(|peer_id| {
let state = match self.data.peer(set_index, &peer_id) {
peersstate::Peer::Connected(entry) => json!({
"connected": true,
"reputation": entry.reputation()
}),
peersstate::Peer::NotConnected(entry) => json!({
"connected": false,
"reputation": entry.reputation()
}),
peersstate::Peer::Unknown(_) => return None,
};
Some((peer_id.to_base58(), state))
}).collect::<HashMap<_, _>>(),
"reserved_nodes": self.reserved_nodes[set_index].0.iter().map(|peer_id| {
peer_id.to_base58()
}).collect::<HashSet<_>>(),
"reserved_only": self.reserved_nodes[set_index].1,
})
}).collect::<Vec<_>>(),
"message_queue": self.message_queue.len(),
})
}
pub fn num_discovered_peers(&self) -> usize {
self.data.peers().len()
}
}
impl Stream for Peerset {
type Item = Message;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
if let Some(message) = self.message_queue.pop_front() {
return Poll::Ready(Some(message))
}
if Future::poll(Pin::new(&mut self.next_periodic_alloc_slots), cx).is_ready() {
self.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0));
for set_index in 0..self.data.num_sets() {
self.alloc_slots(SetId(set_index));
}
}
let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(event)) => event,
Poll::Ready(None) => return Poll::Pending,
};
match action {
Action::AddReservedPeer(set_id, peer_id) =>
self.on_add_reserved_peer(set_id, peer_id),
Action::RemoveReservedPeer(set_id, peer_id) =>
self.on_remove_reserved_peer(set_id, peer_id),
Action::SetReservedPeers(set_id, peer_ids) =>
self.on_set_reserved_peers(set_id, peer_ids),
Action::SetReservedOnly(set_id, reserved) =>
self.on_set_reserved_only(set_id, reserved),
Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff),
Action::AddToPeersSet(sets_name, peer_id) =>
self.add_to_peers_set(sets_name, peer_id),
Action::RemoveFromPeersSet(sets_name, peer_id) =>
self.on_remove_from_peers_set(sets_name, peer_id),
Action::PeerReputation(peer_id, pending_response) =>
self.on_peer_reputation(peer_id, pending_response),
}
}
}
}
pub enum DropReason {
Unknown,
Refused,
}
#[cfg(test)]
mod tests {
use super::{
IncomingIndex, Message, Peerset, PeersetConfig, ReputationChange, SetConfig, SetId,
BANNED_THRESHOLD,
};
use futures::prelude::*;
use libp2p::PeerId;
use std::{pin::Pin, task::Poll, thread, time::Duration};
fn assert_messages(mut peerset: Peerset, messages: Vec<Message>) -> Peerset {
for expected_message in messages {
let (message, p) = next_message(peerset).expect("expected message");
assert_eq!(message, expected_message);
peerset = p;
}
peerset
}
fn next_message(mut peerset: Peerset) -> Result<(Message, Peerset), ()> {
let next = futures::executor::block_on_stream(&mut peerset).next();
let message = next.ok_or(())?;
Ok((message, peerset))
}
#[test]
fn test_peerset_add_reserved_peer() {
let bootnode = PeerId::random();
let reserved_peer = PeerId::random();
let reserved_peer2 = PeerId::random();
let config = PeersetConfig {
sets: vec![SetConfig {
in_peers: 0,
out_peers: 2,
bootnodes: vec![bootnode],
reserved_nodes: Default::default(),
reserved_only: true,
}],
};
let (peerset, handle) = Peerset::from_config(config);
handle.add_reserved_peer(SetId::from(0), reserved_peer);
handle.add_reserved_peer(SetId::from(0), reserved_peer2);
assert_messages(
peerset,
vec![
Message::Connect { set_id: SetId::from(0), peer_id: reserved_peer },
Message::Connect { set_id: SetId::from(0), peer_id: reserved_peer2 },
],
);
}
#[test]
fn test_peerset_incoming() {
let bootnode = PeerId::random();
let incoming = PeerId::random();
let incoming2 = PeerId::random();
let incoming3 = PeerId::random();
let ii = IncomingIndex(1);
let ii2 = IncomingIndex(2);
let ii3 = IncomingIndex(3);
let ii4 = IncomingIndex(3);
let config = PeersetConfig {
sets: vec![SetConfig {
in_peers: 2,
out_peers: 1,
bootnodes: vec![bootnode],
reserved_nodes: Default::default(),
reserved_only: false,
}],
};
let (mut peerset, _handle) = Peerset::from_config(config);
peerset.incoming(SetId::from(0), incoming, ii);
peerset.incoming(SetId::from(0), incoming, ii4);
peerset.incoming(SetId::from(0), incoming2, ii2);
peerset.incoming(SetId::from(0), incoming3, ii3);
assert_messages(
peerset,
vec![
Message::Connect { set_id: SetId::from(0), peer_id: bootnode },
Message::Accept(ii),
Message::Accept(ii2),
Message::Reject(ii3),
],
);
}
#[test]
fn test_peerset_reject_incoming_in_reserved_only() {
let incoming = PeerId::random();
let ii = IncomingIndex(1);
let config = PeersetConfig {
sets: vec![SetConfig {
in_peers: 50,
out_peers: 50,
bootnodes: vec![],
reserved_nodes: Default::default(),
reserved_only: true,
}],
};
let (mut peerset, _) = Peerset::from_config(config);
peerset.incoming(SetId::from(0), incoming, ii);
assert_messages(peerset, vec![Message::Reject(ii)]);
}
#[test]
fn test_peerset_discovered() {
let bootnode = PeerId::random();
let discovered = PeerId::random();
let discovered2 = PeerId::random();
let config = PeersetConfig {
sets: vec![SetConfig {
in_peers: 0,
out_peers: 2,
bootnodes: vec![bootnode],
reserved_nodes: Default::default(),
reserved_only: false,
}],
};
let (mut peerset, _handle) = Peerset::from_config(config);
peerset.add_to_peers_set(SetId::from(0), discovered);
peerset.add_to_peers_set(SetId::from(0), discovered);
peerset.add_to_peers_set(SetId::from(0), discovered2);
assert_messages(
peerset,
vec![
Message::Connect { set_id: SetId::from(0), peer_id: bootnode },
Message::Connect { set_id: SetId::from(0), peer_id: discovered },
],
);
}
#[test]
fn test_peerset_banned() {
let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
sets: vec![SetConfig {
in_peers: 25,
out_peers: 25,
bootnodes: vec![],
reserved_nodes: Default::default(),
reserved_only: false,
}],
});
let peer_id = PeerId::random();
handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));
let fut = futures::future::poll_fn(move |cx| {
assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);
peerset.incoming(SetId::from(0), peer_id, IncomingIndex(1));
if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1)));
} else {
panic!()
}
thread::sleep(Duration::from_millis(1500));
peerset.incoming(SetId::from(0), peer_id, IncomingIndex(2));
while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Accept(IncomingIndex(2)));
}
Poll::Ready(())
});
futures::executor::block_on(fut);
}
#[test]
fn test_relloc_after_banned() {
let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
sets: vec![SetConfig {
in_peers: 25,
out_peers: 25,
bootnodes: vec![],
reserved_nodes: Default::default(),
reserved_only: false,
}],
});
let peer_id = PeerId::random();
handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));
let fut = futures::future::poll_fn(move |cx| {
assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);
peerset.incoming(SetId::from(0), peer_id, IncomingIndex(1));
if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1)));
} else {
panic!()
}
while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Connect { set_id: SetId::from(0), peer_id });
}
Poll::Ready(())
});
futures::executor::block_on(fut);
}
}