1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
// This file is part of Substrate.
// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
peer_info,
protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
request_responses,
};
use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{
core::{Multiaddr, PeerId, PublicKey},
identify::Info as IdentifyInfo,
kad::record,
swarm::NetworkBehaviour,
};
use sc_network_common::{
protocol::{
event::DhtEvent,
role::{ObservedRole, Roles},
ProtocolName,
},
request_responses::{IfDisconnected, ProtocolConfig, RequestFailure},
};
use sc_peerset::{PeersetHandle, ReputationChange};
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::Block as BlockT;
use std::{collections::HashSet, time::Duration};
pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, ResponseFailure};
/// General behaviour of the network. Combines all protocols together.
///
/// The `NetworkBehaviour` derive composes the sub-behaviours stored in the fields below and
/// declares `BehaviourOut` as the combined output event type. The `From` impls in this file
/// convert each sub-behaviour's event into a `BehaviourOut`.
///
/// NOTE(review): the derive presumably drives the fields in declaration order; confirm
/// against the libp2p version in use before reordering them.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut")]
pub struct Behaviour<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + 'static,
{
/// All the substrate-specific protocols.
substrate: Protocol<B, Client>,
/// Periodically pings and identifies the nodes we are connected to, and stores the
/// information in a cache.
peer_info: peer_info::PeerInfoBehaviour,
/// Discovers nodes of the network.
discovery: DiscoveryBehaviour,
/// Generic request-response protocols.
request_responses: request_responses::RequestResponsesBehaviour,
}
/// Event generated by `Behaviour`.
///
/// Every sub-behaviour event is mapped onto one of these variants by the `From` impls in this
/// file.
pub enum BehaviourOut {
/// Started a random iterative Kademlia discovery query.
RandomKademliaStarted,
/// We have received a request from a peer and answered it.
///
/// This event is generated for statistics purposes.
InboundRequest {
/// Peer which sent us a request.
peer: PeerId,
/// Protocol name of the request.
protocol: ProtocolName,
/// If `Ok`, contains the time elapsed between when we received the request and when we
/// sent back the response. If `Err`, the error that happened.
result: Result<Duration, ResponseFailure>,
},
/// A request has succeeded or failed.
///
/// This event is generated for statistics purposes.
RequestFinished {
/// Peer that we sent the request to.
peer: PeerId,
/// Name of the protocol in question.
protocol: ProtocolName,
/// Duration the request took.
duration: Duration,
/// Result of the request.
result: Result<(), RequestFailure>,
},
/// A request protocol handler issued reputation changes for the given peer.
///
/// `peer` is the peer the changes apply to and `changes` is the list of reputation changes
/// that were issued.
ReputationChanges { peer: PeerId, changes: Vec<ReputationChange> },
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
protocol: ProtocolName,
/// If the negotiation didn't use the main name of the protocol (the one in
/// `notifications_protocol`), then this field contains which name has actually been
/// used.
/// See also [`crate::Event::NotificationStreamOpened`].
negotiated_fallback: Option<ProtocolName>,
/// Object that permits sending notifications to the peer.
notifications_sink: NotificationsSink,
/// Role of the remote, derived from the roles it reported.
role: ObservedRole,
},
/// The [`NotificationsSink`] object used to send notifications to the given peer must be
/// replaced with a new one.
///
/// This event is typically emitted when a transport-level connection is closed and we fall
/// back to a secondary connection.
NotificationStreamReplaced {
/// Id of the peer we are connected to.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
protocol: ProtocolName,
/// Replacement for the previous [`NotificationsSink`].
notifications_sink: NotificationsSink,
},
/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
protocol: ProtocolName,
},
/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message, one pair per received message.
messages: Vec<(ProtocolName, Bytes)>,
},
/// Now connected to a new peer for syncing purposes.
SyncConnected(PeerId),
/// No longer connected to a peer for syncing purposes.
SyncDisconnected(PeerId),
/// We have obtained identity information from a peer, including the addresses it is listening
/// on.
PeerIdentify {
/// Id of the peer that has been identified.
peer_id: PeerId,
/// Information about the peer.
info: IdentifyInfo,
},
/// We have learned about the existence of a node on the default set.
Discovered(PeerId),
/// Events generated by a DHT as a response to get_value or put_value requests as well as the
/// request duration.
Dht(DhtEvent, Duration),
/// Ignored event generated by lower layers.
///
/// The `From` impls in this file produce it for `CustomMessageOutcome::PeerNewBest`,
/// `CustomMessageOutcome::None` and `DiscoveryOut::UnroutablePeer`.
None,
}
impl<B, Client> Behaviour<B, Client>
where
    B: BlockT,
    Client: HeaderBackend<B> + 'static,
{
    /// Assembles a `Behaviour` from an already-built substrate protocol plus freshly
    /// constructed peer-info, discovery and request-response sub-behaviours.
    ///
    /// # Errors
    ///
    /// Returns the [`request_responses::RegisterError`] reported when the request-response
    /// behaviour cannot be created from `request_response_protocols`.
    pub fn new(
        substrate: Protocol<B, Client>,
        user_agent: String,
        local_public_key: PublicKey,
        disco_config: DiscoveryConfig,
        request_response_protocols: Vec<ProtocolConfig>,
        peerset: PeersetHandle,
    ) -> Result<Self, request_responses::RegisterError> {
        // Built in the same order as the struct fields; only the last step is fallible.
        let peer_info_behaviour = peer_info::PeerInfoBehaviour::new(user_agent, local_public_key);
        let discovery_behaviour = disco_config.finish();
        let request_response_behaviour = request_responses::RequestResponsesBehaviour::new(
            request_response_protocols.into_iter(),
            peerset,
        )?;

        Ok(Self {
            substrate,
            peer_info: peer_info_behaviour,
            discovery: discovery_behaviour,
            request_responses: request_response_behaviour,
        })
    }

    /// Returns the set of peers the discovery behaviour knows to exist in the network.
    pub fn known_peers(&mut self) -> HashSet<PeerId> {
        self.discovery.known_peers()
    }

    /// Registers a hard-coded address for `peer_id` that never expires.
    pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
        self.discovery.add_known_address(peer_id, addr);
    }

    /// Returns how many nodes sit in each Kademlia kbucket.
    ///
    /// Each kbucket is identified by the base 2 logarithm of its lower bound.
    pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
        self.discovery.num_entries_per_kbucket()
    }

    /// Returns how many records the Kademlia record stores hold.
    pub fn num_kademlia_records(&mut self) -> Option<usize> {
        self.discovery.num_kademlia_records()
    }

    /// Returns the combined size, in bytes, of every record in the Kademlia record stores.
    pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
        self.discovery.kademlia_records_total_size()
    }

    /// Borrows `self` and hands out a view onto what is known about a node.
    ///
    /// `None` means nothing is known about the node. Nodes we are connected to always yield
    /// `Some`, so a `None` also implies that we are not connected to that node.
    pub fn node(&self, peer_id: &PeerId) -> Option<peer_info::Node> {
        self.peer_info.node(peer_id)
    }

    /// Starts sending a request through the request-response behaviour.
    ///
    /// All arguments are forwarded unchanged; the outcome is delivered through
    /// `pending_response`.
    pub fn send_request(
        &mut self,
        target: &PeerId,
        protocol: &str,
        request: Vec<u8>,
        pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
        connect: IfDisconnected,
    ) {
        self.request_responses.send_request(
            target,
            protocol,
            request,
            pending_response,
            connect,
        );
    }

    /// Shared access to the user protocol.
    pub fn user_protocol(&self) -> &Protocol<B, Client> {
        &self.substrate
    }

    /// Exclusive access to the user protocol.
    pub fn user_protocol_mut(&mut self) -> &mut Protocol<B, Client> {
        &mut self.substrate
    }

    /// Adds an address that a remote peer reported about itself to the k-buckets of the DHTs
    /// it supports (`supported_protocols`).
    pub fn add_self_reported_address_to_dht(
        &mut self,
        peer_id: &PeerId,
        supported_protocols: &[impl AsRef<[u8]>],
        addr: Multiaddr,
    ) {
        self.discovery.add_self_reported_address(peer_id, supported_protocols, addr);
    }

    /// Begins a DHT lookup for `key`. A `ValueFound` or `ValueNotFound` event is produced
    /// later on.
    pub fn get_value(&mut self, key: record::Key) {
        self.discovery.get_value(key);
    }

    /// Begins storing `value` under `key` in the DHT. A `ValuePut` or `ValuePutFailed` event
    /// is produced later on.
    pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) {
        self.discovery.put_value(key, value);
    }
}
/// Maps the roles a peer reported onto the single [`ObservedRole`] exposed in events.
///
/// The authority check takes precedence over the full-node check; anything that is neither is
/// treated as a light node.
fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole {
    match roles {
        reported if reported.is_authority() => ObservedRole::Authority,
        reported if reported.is_full() => ObservedRole::Full,
        _ => ObservedRole::Light,
    }
}
impl<B: BlockT> From<CustomMessageOutcome<B>> for BehaviourOut {
    /// Converts a `CustomMessageOutcome` into the matching `BehaviourOut` variant.
    ///
    /// Notification and sync events are forwarded field by field, with the reported roles
    /// reduced to an `ObservedRole`. `PeerNewBest` and `None` both collapse to
    /// `BehaviourOut::None`.
    fn from(event: CustomMessageOutcome<B>) -> Self {
        match event {
            CustomMessageOutcome::NotificationStreamOpened {
                remote,
                protocol,
                negotiated_fallback,
                roles,
                notifications_sink,
            } => {
                let role = reported_roles_to_observed_role(roles);
                Self::NotificationStreamOpened {
                    remote,
                    protocol,
                    negotiated_fallback,
                    notifications_sink,
                    role,
                }
            },
            CustomMessageOutcome::NotificationStreamReplaced {
                remote,
                protocol,
                notifications_sink,
            } => Self::NotificationStreamReplaced { remote, protocol, notifications_sink },
            CustomMessageOutcome::NotificationStreamClosed { remote, protocol } => {
                Self::NotificationStreamClosed { remote, protocol }
            },
            CustomMessageOutcome::NotificationsReceived { remote, messages } => {
                Self::NotificationsReceived { remote, messages }
            },
            CustomMessageOutcome::SyncConnected(peer) => Self::SyncConnected(peer),
            CustomMessageOutcome::SyncDisconnected(peer) => Self::SyncDisconnected(peer),
            // Neither of these is surfaced to the outside.
            CustomMessageOutcome::PeerNewBest(..) | CustomMessageOutcome::None => Self::None,
        }
    }
}
impl From<request_responses::Event> for BehaviourOut {
fn from(event: request_responses::Event) -> Self {
match event {
request_responses::Event::InboundRequest { peer, protocol, result } =>
BehaviourOut::InboundRequest { peer, protocol, result },
request_responses::Event::RequestFinished { peer, protocol, duration, result } =>
BehaviourOut::RequestFinished { peer, protocol, duration, result },
request_responses::Event::ReputationChanges { peer, changes } =>
BehaviourOut::ReputationChanges { peer, changes },
}
}
}
impl From<peer_info::PeerInfoEvent> for BehaviourOut {
fn from(event: peer_info::PeerInfoEvent) -> Self {
let peer_info::PeerInfoEvent::Identified { peer_id, info } = event;
BehaviourOut::PeerIdentify { peer_id, info }
}
}
impl From<DiscoveryOut> for BehaviourOut {
    /// Converts a discovery event into a `BehaviourOut`.
    ///
    /// Non-DHT events map directly onto their own variants. The four DHT outcomes are wrapped
    /// in `BehaviourOut::Dht` together with the duration reported alongside them.
    fn from(event: DiscoveryOut) -> Self {
        let (dht_event, elapsed) = match event {
            // Obtaining and reporting listen addresses for unroutable peers back to
            // Kademlia is handled by the `Identify` protocol, part of the
            // `PeerInfoBehaviour`. See the `From<peer_info::PeerInfoEvent>`
            // implementation.
            DiscoveryOut::UnroutablePeer(_) => return Self::None,
            DiscoveryOut::Discovered(peer) => return Self::Discovered(peer),
            DiscoveryOut::RandomKademliaStarted => return Self::RandomKademliaStarted,
            DiscoveryOut::ValueFound(results, elapsed) => (DhtEvent::ValueFound(results), elapsed),
            DiscoveryOut::ValueNotFound(key, elapsed) => (DhtEvent::ValueNotFound(key), elapsed),
            DiscoveryOut::ValuePut(key, elapsed) => (DhtEvent::ValuePut(key), elapsed),
            DiscoveryOut::ValuePutFailed(key, elapsed) => {
                (DhtEvent::ValuePutFailed(key), elapsed)
            },
        };

        Self::Dht(dht_event, elapsed)
    }
}