use libp2p::PeerId;
use log::error;
use std::{
borrow::Cow,
collections::{
hash_map::{Entry, OccupiedEntry},
HashMap, HashSet,
},
time::Instant,
};
/// State of all known peers and of the slot usage of every set.
///
/// A set is identified by its index into `sets`; every [`Node`] stores one membership state
/// per set.
#[derive(Debug, Clone)]
pub struct PeersState {
/// Every known peer, keyed by its id, with its reputation and per-set membership.
nodes: HashMap<PeerId, Node>,
/// Slot accounting for each set, in the order the configurations were passed to `new`.
sets: Vec<SetInfo>,
}
/// Configuration of a single set, consumed by `PeersState::new`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SetConfig {
/// Maximum number of slot-occupying incoming connections in the set.
pub in_peers: u32,
/// Maximum number of slot-occupying outgoing connections in the set.
pub out_peers: u32,
}
/// Slot bookkeeping for a single set.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SetInfo {
/// Number of incoming connections currently occupying a slot. Peers listed in
/// `no_slot_nodes` are not counted.
num_in: u32,
/// Number of outgoing connections currently occupying a slot. Peers listed in
/// `no_slot_nodes` are not counted.
num_out: u32,
/// Limit that `num_in` is checked against before an incoming connection is accepted.
max_in: u32,
/// Limit that `num_out` is checked against before an outgoing connection is opened.
max_out: u32,
/// Peers whose connections neither consume a slot nor are refused when the slots are full.
no_slot_nodes: HashSet<PeerId>,
}
/// Everything that is stored about one peer.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Node {
    /// Membership of the peer in each set; the index into this list is the set index.
    sets: Vec<MembershipState>,
    /// Current reputation of the peer. Shared by all sets.
    reputation: i32,
}

impl Node {
    /// Creates a node that belongs to none of the `num_sets` sets and has a reputation of 0.
    fn new(num_sets: usize) -> Self {
        let sets = vec![MembershipState::NotMember; num_sets];
        Self { sets, reputation: 0 }
    }
}
/// Relationship between one peer and one set.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum MembershipState {
    /// The peer does not belong to the set.
    NotMember,
    /// The peer is connected through an incoming connection.
    In,
    /// The peer is connected through an outgoing connection.
    Out,
    /// The peer belongs to the set but is not connected.
    NotConnected {
        /// Timestamp recorded when the peer was discovered, disconnected or explicitly bumped.
        last_connected: Instant,
    },
}

impl MembershipState {
    /// Returns `true` for `In` and `Out`, `false` for every other state.
    fn is_connected(self) -> bool {
        matches!(self, Self::In | Self::Out)
    }

    /// Returns `true` only for `NotConnected`. Note that `NotMember` yields `false` here as
    /// well as in `is_connected`.
    fn is_not_connected(self) -> bool {
        match self {
            Self::NotConnected { .. } => true,
            Self::NotMember | Self::In | Self::Out => false,
        }
    }
}
impl PeersState {
pub fn new(sets: impl IntoIterator<Item = SetConfig>) -> Self {
Self {
nodes: HashMap::new(),
sets: sets
.into_iter()
.map(|config| SetInfo {
num_in: 0,
num_out: 0,
max_in: config.in_peers,
max_out: config.out_peers,
no_slot_nodes: HashSet::new(),
})
.collect(),
}
}
pub fn num_sets(&self) -> usize {
self.sets.len()
}
pub fn peer_reputation(&mut self, peer_id: PeerId) -> Reputation {
self.nodes.entry(peer_id).or_insert_with(|| Node::new(self.sets.len()));
let entry = match self.nodes.entry(peer_id) {
Entry::Vacant(_) => unreachable!("guaranteed to be inserted above; qed"),
Entry::Occupied(e) => e,
};
Reputation { node: Some(entry) }
}
pub fn peer<'a>(&'a mut self, set: usize, peer_id: &'a PeerId) -> Peer<'a> {
assert!(set < self.sets.len());
match self.nodes.get_mut(peer_id).map(|p| &p.sets[set]) {
None | Some(MembershipState::NotMember) =>
Peer::Unknown(UnknownPeer { parent: self, set, peer_id: Cow::Borrowed(peer_id) }),
Some(MembershipState::In) | Some(MembershipState::Out) =>
Peer::Connected(ConnectedPeer { state: self, set, peer_id: Cow::Borrowed(peer_id) }),
Some(MembershipState::NotConnected { .. }) => Peer::NotConnected(NotConnectedPeer {
state: self,
set,
peer_id: Cow::Borrowed(peer_id),
}),
}
}
pub fn peers(&self) -> impl ExactSizeIterator<Item = &PeerId> {
self.nodes.keys()
}
pub fn connected_peers(&self, set: usize) -> impl Iterator<Item = &PeerId> {
assert!(set < self.sets.len());
self.nodes
.iter()
.filter(move |(_, p)| p.sets[set].is_connected())
.map(|(p, _)| p)
}
pub fn highest_not_connected_peer(&mut self, set: usize) -> Option<NotConnectedPeer> {
assert!(set < self.sets.len());
let outcome = self
.nodes
.iter_mut()
.filter(|(_, Node { sets, .. })| sets[set].is_not_connected())
.fold(None::<(&PeerId, &mut Node)>, |mut cur_node, to_try| {
if let Some(cur_node) = cur_node.take() {
if cur_node.1.reputation >= to_try.1.reputation {
return Some(cur_node)
}
}
Some(to_try)
})
.map(|(peer_id, _)| *peer_id);
outcome.map(move |peer_id| NotConnectedPeer {
state: self,
set,
peer_id: Cow::Owned(peer_id),
})
}
pub fn has_free_outgoing_slot(&self, set: usize) -> bool {
self.sets[set].num_out < self.sets[set].max_out
}
pub fn add_no_slot_node(&mut self, set: usize, peer_id: PeerId) {
if !self.sets[set].no_slot_nodes.insert(peer_id) {
return
}
if let Some(peer) = self.nodes.get_mut(&peer_id) {
match peer.sets[set] {
MembershipState::In => self.sets[set].num_in -= 1,
MembershipState::Out => self.sets[set].num_out -= 1,
MembershipState::NotConnected { .. } | MembershipState::NotMember => {},
}
}
}
pub fn remove_no_slot_node(&mut self, set: usize, peer_id: &PeerId) {
if !self.sets[set].no_slot_nodes.remove(peer_id) {
return
}
if let Some(peer) = self.nodes.get_mut(peer_id) {
match peer.sets[set] {
MembershipState::In => self.sets[set].num_in += 1,
MembershipState::Out => self.sets[set].num_out += 1,
MembershipState::NotConnected { .. } | MembershipState::NotMember => {},
}
}
}
}
/// Handle to a peer in the context of one set, typed after the peer's membership state.
pub enum Peer<'a> {
    /// The peer is connected, either as `In` or as `Out`.
    Connected(ConnectedPeer<'a>),
    /// The peer belongs to the set but is not connected.
    NotConnected(NotConnectedPeer<'a>),
    /// The peer is not stored at all, or is not a member of the set.
    Unknown(UnknownPeer<'a>),
}

impl<'a> Peer<'a> {
    /// Returns the inner handle if this is the `Connected` variant, `None` otherwise.
    pub fn into_connected(self) -> Option<ConnectedPeer<'a>> {
        if let Self::Connected(connected) = self {
            Some(connected)
        } else {
            None
        }
    }

    /// Returns the inner handle if this is the `NotConnected` variant, `None` otherwise.
    #[cfg(test)]
    pub fn into_not_connected(self) -> Option<NotConnectedPeer<'a>> {
        if let Self::NotConnected(not_connected) = self {
            Some(not_connected)
        } else {
            None
        }
    }

    /// Returns the inner handle if this is the `Unknown` variant, `None` otherwise.
    #[cfg(test)]
    pub fn into_unknown(self) -> Option<UnknownPeer<'a>> {
        if let Self::Unknown(unknown) = self {
            Some(unknown)
        } else {
            None
        }
    }
}
/// Handle to a peer that is connected (`In` or `Out`) in one set.
pub struct ConnectedPeer<'a> {
    /// The state the peer is stored in.
    state: &'a mut PeersState,
    /// Index of the set this handle is about.
    set: usize,
    /// Id of the peer.
    peer_id: Cow<'a, PeerId>,
}

impl<'a> ConnectedPeer<'a> {
    /// Returns the id of the peer.
    pub fn peer_id(&self) -> &PeerId {
        &*self.peer_id
    }

    /// Consumes the handle and returns the id of the peer.
    pub fn into_peer_id(self) -> PeerId {
        self.peer_id.into_owned()
    }

    /// Switches the peer to `NotConnected`, stamped with the current time.
    ///
    /// The slot used by the connection is released, unless the peer is a no-slot node of
    /// the set, in which case it never occupied one.
    pub fn disconnect(self) -> NotConnectedPeer<'a> {
        let Self { state, set, peer_id } = self;
        let occupies_slot = !state.sets[set].no_slot_nodes.contains(&*peer_id);

        match state.nodes.get_mut(&*peer_id) {
            Some(node) => {
                if occupies_slot {
                    match node.sets[set] {
                        MembershipState::In => state.sets[set].num_in -= 1,
                        MembershipState::Out => state.sets[set].num_out -= 1,
                        MembershipState::NotMember | MembershipState::NotConnected { .. } => {
                            debug_assert!(
                                false,
                                "State inconsistency: disconnecting a disconnected node"
                            )
                        },
                    }
                }
                node.sets[set] = MembershipState::NotConnected { last_connected: Instant::now() };
            },
            None => {
                debug_assert!(false, "State inconsistency: disconnecting a disconnected node");
            },
        }

        NotConnectedPeer { state, set, peer_id }
    }

    /// Adds `modifier` to the reputation of the peer, saturating at the bounds of `i32`.
    pub fn add_reputation(&mut self, modifier: i32) {
        match self.state.nodes.get_mut(&*self.peer_id) {
            Some(node) => node.reputation = node.reputation.saturating_add(modifier),
            None => {
                debug_assert!(false, "State inconsistency: add_reputation on an unknown node");
            },
        }
    }

    /// Returns the reputation of the peer, or 0 if the peer is not stored.
    pub fn reputation(&self) -> i32 {
        self.state
            .nodes
            .get(&*self.peer_id)
            .map(|node| node.reputation)
            .unwrap_or(0)
    }
}
/// Handle to a peer that belongs to one set without being connected.
#[derive(Debug)]
pub struct NotConnectedPeer<'a> {
    /// The state the peer is stored in.
    state: &'a mut PeersState,
    /// Index of the set this handle is about.
    set: usize,
    /// Id of the peer.
    peer_id: Cow<'a, PeerId>,
}

impl<'a> NotConnectedPeer<'a> {
    /// Consumes the handle and returns the id of the peer.
    pub fn into_peer_id(self) -> PeerId {
        self.peer_id.into_owned()
    }

    /// Resets the stored timestamp of the peer to the current time.
    ///
    /// Silently does nothing if the peer is not stored or is not `NotConnected` in the set.
    pub fn bump_last_connected_or_discovered(&mut self) {
        if let Some(node) = self.state.nodes.get_mut(&*self.peer_id) {
            if let MembershipState::NotConnected { last_connected } = &mut node.sets[self.set] {
                *last_connected = Instant::now();
            }
        }
    }

    /// Returns the timestamp stored alongside the `NotConnected` state of the peer.
    ///
    /// If the stored state is inconsistent with this handle, an error is logged and the
    /// current time is returned instead.
    pub fn last_connected_or_discovered(&self) -> Instant {
        let membership = self.state.nodes.get(&*self.peer_id).map(|node| node.sets[self.set]);

        match membership {
            Some(MembershipState::NotConnected { last_connected }) => last_connected,
            Some(_) => {
                error!(target: "peerset", "State inconsistency with {}", self.peer_id);
                Instant::now()
            },
            None => {
                error!(
                    target: "peerset",
                    "State inconsistency with {}; not connected after borrow",
                    self.peer_id
                );
                Instant::now()
            },
        }
    }

    /// Tries to mark the peer as connected through an outgoing connection.
    ///
    /// Returns the handle back as `Err` when no outgoing slot is free. No-slot nodes are
    /// never refused and never consume a slot.
    pub fn try_outgoing(self) -> Result<ConnectedPeer<'a>, Self> {
        let slot_exempt = self.state.sets[self.set].no_slot_nodes.contains(&*self.peer_id);
        if !slot_exempt && !self.state.has_free_outgoing_slot(self.set) {
            return Err(self)
        }

        match self.state.nodes.get_mut(&*self.peer_id) {
            Some(node) => {
                node.sets[self.set] = MembershipState::Out;
                if !slot_exempt {
                    self.state.sets[self.set].num_out += 1;
                }
            },
            None => {
                debug_assert!(false, "State inconsistency: try_outgoing on an unknown node");
            },
        }

        Ok(ConnectedPeer { state: self.state, set: self.set, peer_id: self.peer_id })
    }

    /// Tries to mark the peer as connected through an incoming connection.
    ///
    /// Returns the handle back as `Err` when all incoming slots are in use. No-slot nodes
    /// are never refused and never consume a slot.
    pub fn try_accept_incoming(self) -> Result<ConnectedPeer<'a>, Self> {
        let info = &self.state.sets[self.set];
        let slot_exempt = info.no_slot_nodes.contains(&*self.peer_id);
        if !slot_exempt && info.num_in >= info.max_in {
            return Err(self)
        }

        match self.state.nodes.get_mut(&*self.peer_id) {
            Some(node) => {
                node.sets[self.set] = MembershipState::In;
                if !slot_exempt {
                    self.state.sets[self.set].num_in += 1;
                }
            },
            None => {
                debug_assert!(false, "State inconsistency: try_accept_incoming on an unknown node");
            },
        }

        Ok(ConnectedPeer { state: self.state, set: self.set, peer_id: self.peer_id })
    }

    /// Returns the reputation of the peer, or 0 if the peer is not stored.
    pub fn reputation(&self) -> i32 {
        self.state
            .nodes
            .get(&*self.peer_id)
            .map(|node| node.reputation)
            .unwrap_or(0)
    }

    /// Overwrites the reputation of the peer with `value`.
    #[cfg(test)]
    pub fn set_reputation(&mut self, value: i32) {
        match self.state.nodes.get_mut(&*self.peer_id) {
            Some(node) => node.reputation = value,
            None => {
                debug_assert!(false, "State inconsistency: set_reputation on an unknown node");
            },
        }
    }

    /// Removes the peer from the set.
    ///
    /// The node itself is dropped from the state only when its reputation is 0 and it no
    /// longer belongs to any set.
    pub fn forget_peer(self) -> UnknownPeer<'a> {
        match self.state.nodes.get_mut(&*self.peer_id) {
            Some(node) => {
                debug_assert!(!matches!(node.sets[self.set], MembershipState::NotMember));
                node.sets[self.set] = MembershipState::NotMember;

                let nothing_left = node.reputation == 0 &&
                    node.sets.iter().all(|set| matches!(set, MembershipState::NotMember));
                if nothing_left {
                    self.state.nodes.remove(&*self.peer_id);
                }
            },
            None => {
                debug_assert!(false, "State inconsistency: forget_peer on an unknown node");
                error!(
                    target: "peerset",
                    "State inconsistency with {} when forgetting peer",
                    self.peer_id
                );
            },
        }

        UnknownPeer { parent: self.state, set: self.set, peer_id: self.peer_id }
    }
}
/// Handle to a peer that is either not stored or not a member of one set.
pub struct UnknownPeer<'a> {
    /// The state the peer would be stored in.
    parent: &'a mut PeersState,
    /// Index of the set this handle is about.
    set: usize,
    /// Id of the peer.
    peer_id: Cow<'a, PeerId>,
}

impl<'a> UnknownPeer<'a> {
    /// Makes the peer a member of the set, in the `NotConnected` state stamped with the
    /// current time.
    ///
    /// A node is created for the peer if none is stored yet; an existing node keeps its
    /// reputation and its membership in the other sets.
    pub fn discover(self) -> NotConnectedPeer<'a> {
        let Self { parent, set, peer_id } = self;
        let discovered_at = Instant::now();
        let num_sets = parent.sets.len();

        let node = parent.nodes.entry(*peer_id).or_insert_with(|| Node::new(num_sets));
        node.sets[set] = MembershipState::NotConnected { last_connected: discovered_at };

        NotConnectedPeer { state: parent, set, peer_id }
    }
}
/// Access to the reputation of a peer.
///
/// Dropping the handle removes the node from the state if, at that point, its reputation is
/// 0 and it belongs to no set.
pub struct Reputation<'a> {
    /// Map entry of the peer. Wrapped in an `Option` so that `drop` can take ownership of
    /// it; in the visible code it is `None` only after `drop` has run.
    node: Option<OccupiedEntry<'a, PeerId, Node>>,
}

impl<'a> Reputation<'a> {
    /// Shared access to the node behind the entry.
    fn stored(&self) -> &Node {
        self.node.as_ref().unwrap().get()
    }

    /// Exclusive access to the node behind the entry.
    fn stored_mut(&mut self) -> &mut Node {
        self.node.as_mut().unwrap().get_mut()
    }

    /// Returns the reputation of the peer.
    pub fn reputation(&self) -> i32 {
        self.stored().reputation
    }

    /// Overwrites the reputation of the peer with `value`.
    pub fn set_reputation(&mut self, value: i32) {
        self.stored_mut().reputation = value;
    }

    /// Adds `modifier` to the reputation of the peer, saturating at the bounds of `i32`.
    pub fn add_reputation(&mut self, modifier: i32) {
        let node = self.stored_mut();
        node.reputation = node.reputation.saturating_add(modifier);
    }
}

impl<'a> Drop for Reputation<'a> {
    /// Removes the node from the map when it carries no information any more.
    fn drop(&mut self) {
        let entry = match self.node.take() {
            Some(entry) => entry,
            None => return,
        };

        let node = entry.get();
        let carries_nothing = node.reputation == 0 &&
            node.sets.iter().all(|set| matches!(set, MembershipState::NotMember));
        if carries_nothing {
            entry.remove();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{Peer, PeersState, SetConfig};
    use libp2p::PeerId;
    use std::iter;

    /// With a single incoming slot, the second incoming peer must be refused.
    #[test]
    fn full_slots_in() {
        let mut peers_state = PeersState::new(iter::once(SetConfig { in_peers: 1, out_peers: 1 }));
        let id1 = PeerId::random();
        let id2 = PeerId::random();

        // `into_unknown().unwrap()` makes the test fail loudly if the peers are not in the
        // expected state, instead of silently skipping the assertions.
        assert!(peers_state
            .peer(0, &id1)
            .into_unknown()
            .unwrap()
            .discover()
            .try_accept_incoming()
            .is_ok());
        assert!(peers_state
            .peer(0, &id2)
            .into_unknown()
            .unwrap()
            .discover()
            .try_accept_incoming()
            .is_err());
    }

    /// A no-slot node must leave the single incoming slot free for another peer.
    #[test]
    fn no_slot_node_doesnt_use_slot() {
        let mut peers_state = PeersState::new(iter::once(SetConfig { in_peers: 1, out_peers: 1 }));
        let id1 = PeerId::random();
        let id2 = PeerId::random();

        peers_state.add_no_slot_node(0, id1);
        if let Peer::Unknown(p) = peers_state.peer(0, &id1) {
            assert!(p.discover().try_accept_incoming().is_ok());
        } else {
            panic!()
        }

        if let Peer::Unknown(e) = peers_state.peer(0, &id2) {
            assert!(e.discover().try_accept_incoming().is_ok());
        } else {
            panic!()
        }
    }

    /// Disconnecting the peer that holds the only slot must let the other peer in.
    #[test]
    fn disconnecting_frees_slot() {
        let mut peers_state = PeersState::new(iter::once(SetConfig { in_peers: 1, out_peers: 1 }));
        let id1 = PeerId::random();
        let id2 = PeerId::random();

        assert!(peers_state
            .peer(0, &id1)
            .into_unknown()
            .unwrap()
            .discover()
            .try_accept_incoming()
            .is_ok());
        assert!(peers_state
            .peer(0, &id2)
            .into_unknown()
            .unwrap()
            .discover()
            .try_accept_incoming()
            .is_err());

        peers_state.peer(0, &id1).into_connected().unwrap().disconnect();
        assert!(peers_state
            .peer(0, &id2)
            .into_not_connected()
            .unwrap()
            .try_accept_incoming()
            .is_ok());
    }

    /// `highest_not_connected_peer` must follow reputation changes and skip connected peers.
    #[test]
    fn highest_not_connected_peer() {
        let mut peers_state =
            PeersState::new(iter::once(SetConfig { in_peers: 25, out_peers: 25 }));
        let id1 = PeerId::random();
        let id2 = PeerId::random();

        assert!(peers_state.highest_not_connected_peer(0).is_none());

        peers_state.peer(0, &id1).into_unknown().unwrap().discover().set_reputation(50);
        peers_state.peer(0, &id2).into_unknown().unwrap().discover().set_reputation(25);
        assert_eq!(peers_state.highest_not_connected_peer(0).map(|p| p.into_peer_id()), Some(id1));

        peers_state.peer(0, &id2).into_not_connected().unwrap().set_reputation(75);
        assert_eq!(peers_state.highest_not_connected_peer(0).map(|p| p.into_peer_id()), Some(id2));

        // Once `id2` is connected it is no longer a candidate.
        peers_state
            .peer(0, &id2)
            .into_not_connected()
            .unwrap()
            .try_accept_incoming()
            .unwrap();
        assert_eq!(peers_state.highest_not_connected_peer(0).map(|p| p.into_peer_id()), Some(id1));

        peers_state.peer(0, &id1).into_not_connected().unwrap().set_reputation(100);
        peers_state.peer(0, &id2).into_connected().unwrap().disconnect();
        assert_eq!(peers_state.highest_not_connected_peer(0).map(|p| p.into_peer_id()), Some(id1));

        peers_state.peer(0, &id1).into_not_connected().unwrap().set_reputation(-100);
        assert_eq!(peers_state.highest_not_connected_peer(0).map(|p| p.into_peer_id()), Some(id2));
    }

    /// Disconnecting a no-slot node must not touch (and underflow) the slot counters.
    #[test]
    fn disconnect_no_slot_doesnt_panic() {
        let mut peers_state = PeersState::new(iter::once(SetConfig { in_peers: 1, out_peers: 1 }));
        let id = PeerId::random();

        peers_state.add_no_slot_node(0, id);
        let peer = peers_state
            .peer(0, &id)
            .into_unknown()
            .unwrap()
            .discover()
            .try_outgoing()
            .unwrap();
        peer.disconnect();
    }
}