use crate::{
error::{Error, Result},
interval::ExpIncInterval,
ServicetoWorkerMsg, WorkerConfig,
};
use std::{
collections::{HashMap, HashSet},
marker::PhantomData,
sync::Arc,
time::Duration,
};
use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt};
use addr_cache::AddrCache;
use codec::Decode;
use ip_network::IpNetwork;
use libp2p::{
core::multiaddr,
multihash::{Multihash, MultihashDigest},
Multiaddr, PeerId,
};
use log::{debug, error, log_enabled};
use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
use sc_network_common::{
protocol::event::DhtEvent,
service::{KademliaKey, NetworkDHTProvider, NetworkSigner, NetworkStateInfo, Signature},
};
use sp_api::{ApiError, ProvideRuntimeApi};
use sp_authority_discovery::{
AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature,
};
use sp_blockchain::HeaderBackend;
use sp_core::crypto::{key_types, CryptoTypePublicPair, Pair};
use sp_keystore::CryptoStore;
use sp_runtime::{generic::BlockId, traits::Block as BlockT};
// Address cache used by the worker; provides the `AddrCache` type.
mod addr_cache;
// Record types (`AuthorityRecord`, `SignedAuthorityRecord`, `PeerSignature`) included from a
// source file that is generated into `OUT_DIR` at build time.
mod schema {
#[cfg(test)]
mod tests;
include!(concat!(env!("OUT_DIR"), "/authority_discovery_v2.rs"));
}
// Tests for this module; only compiled in test builds.
#[cfg(test)]
pub mod tests;
/// Log target used by every log statement in this module.
const LOG_TARGET: &str = "sub-authority-discovery";
/// Maximum number of addresses kept for one authority when handling a single `ValueFound` event.
const MAX_ADDRESSES_PER_AUTHORITY: usize = 10;
/// Maximum number of DHT lookups that are allowed to be in flight at the same time.
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;
/// Role an authority discovery [`Worker`] can run as.
pub enum Role {
/// Publish own addresses, signed with keys from the given key store, and discover the
/// addresses of other authorities.
PublishAndDiscover(Arc<dyn CryptoStore>),
/// Only discover the addresses of other authorities; publishing is skipped.
Discover,
}
/// Authority discovery worker: publishes the local node's addresses on the DHT and looks up the
/// addresses of other authorities.
pub struct Worker<Client, Network, Block, DhtEventStream> {
/// Receiver for requests coming from the service side; fused in `new`.
from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,
/// Client used to read the best block hash and the authority set.
client: Arc<Client>,
/// Network handle used for DHT get/put, the local peer id, external addresses and signing.
network: Arc<Network>,
/// Stream of DHT events processed in `run`.
dht_event_rx: DhtEventStream,
/// Interval that triggers an unconditional publication of the own addresses.
publish_interval: ExpIncInterval,
/// Interval that triggers a publication only if the set of own authority keys has changed.
publish_if_changed_interval: ExpIncInterval,
/// Keys under which addresses were published at the most recent publication; compared against
/// the current keys to detect changes.
latest_published_keys: HashSet<CryptoTypePublicPair>,
/// Copied from `WorkerConfig`. When `false`, addresses containing a non-global IP are not
/// published.
publish_non_global_ips: bool,
/// Copied from `WorkerConfig`. When `true`, records without a peer signature are rejected.
strict_record_validation: bool,
/// Interval at which the queue of pending lookups is refilled.
query_interval: ExpIncInterval,
/// Authorities whose lookup has not been handed to the network yet.
pending_lookups: Vec<AuthorityId>,
/// Lookups handed to the network that are awaiting a DHT response, keyed by Kademlia key.
in_flight_lookups: HashMap<KademliaKey, AuthorityId>,
/// Cache of the addresses discovered per authority.
addr_cache: addr_cache::AddrCache,
/// Prometheus metrics; `None` if no registry was supplied or registration failed.
metrics: Option<Metrics>,
/// Whether this worker publishes and discovers, or only discovers.
role: Role,
/// Marker for the otherwise unused `Block` type parameter.
phantom: PhantomData<Block>,
}
/// Source of the current authority set, as needed by the [`Worker`].
#[async_trait::async_trait]
pub trait AuthorityDiscovery<Block: BlockT> {
/// Returns the authority ids as seen at the block with hash `at`, or the API error raised
/// while retrieving them.
async fn authorities(&self, at: Block::Hash)
-> std::result::Result<Vec<AuthorityId>, ApiError>;
}
/// Blanket implementation: every type that exposes the `AuthorityDiscoveryApi` runtime API can
/// serve as an [`AuthorityDiscovery`] source.
#[async_trait::async_trait]
impl<Block, T> AuthorityDiscovery<Block> for T
where
    T: ProvideRuntimeApi<Block> + Send + Sync,
    T::Api: AuthorityDiscoveryApi<Block>,
    Block: BlockT,
{
    /// Asks the runtime API for the authorities at the block identified by hash `at`.
    async fn authorities(
        &self,
        at: Block::Hash,
    ) -> std::result::Result<Vec<AuthorityId>, ApiError> {
        let block_id = BlockId::Hash(at);
        self.runtime_api().authorities(&block_id)
    }
}
impl<Client, Network, Block, DhtEventStream> Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
Client: AuthorityDiscovery<Block> + HeaderBackend<Block> + 'static,
DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
/// Construct a [`Worker`].
///
/// * `from_service` - receiver for requests issued by the service side; it is fused here.
/// * `client` / `network` - handles used to query the authority set and to talk to the DHT.
/// * `dht_event_rx` - stream of DHT events consumed by `run`.
/// * `role` - whether the worker also publishes its own addresses.
/// * `prometheus_registry` - when present, metrics are registered with it. A registration
///   failure is logged and the worker simply runs without metrics.
/// * `config` - supplies the interval bounds and the two validation/publication flags.
pub(crate) fn new(
    from_service: mpsc::Receiver<ServicetoWorkerMsg>,
    client: Arc<Client>,
    network: Arc<Network>,
    dht_event_rx: DhtEventStream,
    role: Role,
    prometheus_registry: Option<prometheus_endpoint::Registry>,
    config: WorkerConfig,
) -> Self {
    // Publishing and querying are both built from a two second duration and the respective
    // maximum taken from the configuration.
    let publish_interval =
        ExpIncInterval::new(Duration::from_secs(2), config.max_publish_interval);
    let query_interval = ExpIncInterval::new(Duration::from_secs(2), config.max_query_interval);

    // Both arguments are the keystore refresh interval.
    // NOTE(review): presumably this makes the interval tick at a constant rate - confirm
    // against `ExpIncInterval`.
    let publish_if_changed_interval =
        ExpIncInterval::new(config.keystore_refresh_interval, config.keystore_refresh_interval);

    let addr_cache = AddrCache::new();

    // Metrics are optional: no registry, or a failed registration, both end up as `None`.
    let metrics = prometheus_registry.and_then(|registry| match Metrics::register(&registry) {
        Ok(metrics) => Some(metrics),
        Err(e) => {
            error!(target: LOG_TARGET, "Failed to register metrics: {}", e);
            None
        },
    });

    Worker {
        from_service: from_service.fuse(),
        client,
        network,
        dht_event_rx,
        publish_interval,
        publish_if_changed_interval,
        latest_published_keys: HashSet::new(),
        publish_non_global_ips: config.publish_non_global_ips,
        strict_record_validation: config.strict_record_validation,
        query_interval,
        pending_lookups: Vec::new(),
        in_flight_lookups: HashMap::new(),
        addr_cache,
        role,
        metrics,
        phantom: PhantomData,
    }
}
/// Run the worker until the DHT event stream ends.
///
/// Each iteration first starts as many pending lookups as allowed and then reacts to whichever
/// of the following becomes ready: a DHT event, a request from the service, one of the two
/// publish intervals, or the query interval.
pub async fn run(mut self) {
    loop {
        self.start_new_lookups();

        futures::select! {
            // DHT events; the worker stops once the stream yields `None`.
            dht_event = self.dht_event_rx.next().fuse() => {
                match dht_event {
                    Some(dht_event) => self.handle_dht_event(dht_event).await,
                    None => return,
                }
            },
            // Requests from the service side. `select_next_some` only resolves with actual
            // messages.
            request = self.from_service.select_next_some() => {
                self.process_message_from_service(request);
            },
            // Publishing: `false` when the regular publish interval fired, `true` when the
            // "only if changed" interval fired.
            only_if_changed = future::select(
                self.publish_interval.next().map(|_| false),
                self.publish_if_changed_interval.next().map(|_| true)
            ).map(|either| either.factor_first().0).fuse() => {
                let published = self.publish_ext_addresses(only_if_changed).await;
                if let Err(e) = published {
                    error!(
                        target: LOG_TARGET,
                        "Failed to publish external addresses: {}", e,
                    );
                }
            },
            // Querying: refill the queue of authorities to look up.
            _ = self.query_interval.next().fuse() => {
                let refilled = self.refill_pending_lookups_queue().await;
                if let Err(e) = refilled {
                    error!(
                        target: LOG_TARGET,
                        "Failed to request addresses of authorities: {}", e,
                    );
                }
            },
        }
    }
}
/// Answer a request from the service side out of the address cache.
///
/// The lookup result is cloned and sent back through the sender carried by the message. The
/// outcome of the send is deliberately ignored.
fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
    match msg {
        ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
            let addresses = self.addr_cache.get_addresses_by_authority_id(&authority).cloned();
            let _ = sender.send(addresses);
        },
        ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
            let authority_ids = self.addr_cache.get_authority_ids_by_peer_id(&peer_id).cloned();
            let _ = sender.send(authority_ids);
        },
    }
}
/// Iterator over the external addresses of the local node that should be published.
///
/// Unless `publish_non_global_ips` is set, every address containing an IPv4 or IPv6 component
/// that is not global is dropped. Addresses that carry no `/p2p/...` component get the local
/// peer id appended.
fn addresses_to_publish(&self) -> impl Iterator<Item = Multiaddr> {
    let peer_id: Multihash = self.network.local_peer_id().into();
    let publish_non_global_ips = self.publish_non_global_ips;

    self.network
        .external_addresses()
        .into_iter()
        .filter(move |address| {
            publish_non_global_ips ||
                address.iter().all(|protocol| match protocol {
                    multiaddr::Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
                    multiaddr::Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
                    _ => true,
                })
        })
        .map(move |address| {
            let has_peer_id =
                address.iter().any(|protocol| matches!(protocol, multiaddr::Protocol::P2p(_)));
            if has_peer_id {
                address
            } else {
                address.with(multiaddr::Protocol::P2p(peer_id))
            }
        })
}
async fn publish_ext_addresses(&mut self, only_if_changed: bool) -> Result<()> {
let key_store = match &self.role {
Role::PublishAndDiscover(key_store) => key_store,
Role::Discover => return Ok(()),
};
let keys = Worker::<Client, Network, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
key_store.clone(),
self.client.as_ref(),
).await?.into_iter().map(Into::into).collect::<HashSet<_>>();
if only_if_changed && keys == self.latest_published_keys {
return Ok(())
}
let addresses = serialize_addresses(self.addresses_to_publish());
if let Some(metrics) = &self.metrics {
metrics.publish.inc();
metrics
.amount_addresses_last_published
.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
}
let serialized_record = serialize_authority_record(addresses)?;
let peer_signature = sign_record_with_peer_id(&serialized_record, self.network.as_ref())?;
let keys_vec = keys.iter().cloned().collect::<Vec<_>>();
let kv_pairs = sign_record_with_authority_ids(
serialized_record,
Some(peer_signature),
key_store.as_ref(),
keys_vec,
)
.await?;
for (key, value) in kv_pairs.into_iter() {
self.network.put_value(key, value);
}
self.latest_published_keys = keys;
Ok(())
}
async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
let best_hash = self.client.info().best_hash;
let local_keys = match &self.role {
Role::PublishAndDiscover(key_store) => key_store
.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
.await
.into_iter()
.collect::<HashSet<_>>(),
Role::Discover => HashSet::new(),
};
let mut authorities = self
.client
.authorities(best_hash)
.await
.map_err(|e| Error::CallingRuntime(e.into()))?
.into_iter()
.filter(|id| !local_keys.contains(id.as_ref()))
.collect::<Vec<_>>();
self.addr_cache.retain_ids(&authorities);
authorities.shuffle(&mut thread_rng());
self.pending_lookups = authorities;
self.in_flight_lookups.clear();
if let Some(metrics) = &self.metrics {
metrics
.requests_pending
.set(self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX));
}
Ok(())
}
/// Move authorities from the pending queue to the network until either the queue is empty or
/// `MAX_IN_FLIGHT_LOOKUPS` lookups are in flight.
fn start_new_lookups(&mut self) {
    loop {
        if self.in_flight_lookups.len() >= MAX_IN_FLIGHT_LOOKUPS {
            return
        }

        let authority_id = match self.pending_lookups.pop() {
            Some(next) => next,
            None => return,
        };

        // The lookup is remembered under the key it was requested with.
        let key = hash_authority_id(authority_id.as_ref());
        self.network.get_value(&key);
        self.in_flight_lookups.insert(key, authority_id);

        if let Some(metrics) = &self.metrics {
            let pending = self.pending_lookups.len().try_into().unwrap_or(u64::MAX);
            metrics.requests.inc();
            metrics.requests_pending.set(pending);
        }
    }
}
/// React to a single DHT event.
///
/// Every event increments the `dht_event_received` metric under its own label. In addition:
/// found values are handed to `handle_dht_value_found_event`, a `ValueNotFound` removes the
/// matching in-flight lookup, and a `ValuePut` sets the publish interval to its maximum.
async fn handle_dht_event(&mut self, event: DhtEvent) {
    match event {
        DhtEvent::ValueFound(records) => {
            if let Some(metrics) = &self.metrics {
                metrics.dht_event_received.with_label_values(&["value_found"]).inc();
            }

            // Collecting the hashes is only worth it when the message is actually emitted.
            if log_enabled!(log::Level::Debug) {
                let hashes: Vec<_> = records.iter().map(|(hash, _value)| hash.clone()).collect();
                debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", hashes);
            }

            let outcome = self.handle_dht_value_found_event(records);
            if let Err(e) = outcome {
                if let Some(metrics) = &self.metrics {
                    metrics.handle_value_found_event_failure.inc();
                }

                debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e);
            }
        },
        DhtEvent::ValueNotFound(hash) => {
            if let Some(metrics) = &self.metrics {
                metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
            }

            match self.in_flight_lookups.remove(&hash) {
                Some(_) => {
                    debug!(target: LOG_TARGET, "Value for hash '{:?}' not found on Dht.", hash)
                },
                None => debug!(
                    target: LOG_TARGET,
                    "Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
                ),
            }
        },
        DhtEvent::ValuePut(hash) => {
            self.publish_interval.set_to_max();

            if let Some(metrics) = &self.metrics {
                metrics.dht_event_received.with_label_values(&["value_put"]).inc();
            }

            debug!(target: LOG_TARGET, "Successfully put hash '{:?}' on Dht.", hash)
        },
        DhtEvent::ValuePutFailed(hash) => {
            if let Some(metrics) = &self.metrics {
                metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
            }

            debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash)
        },
    }
}
fn handle_dht_value_found_event(&mut self, values: Vec<(KademliaKey, Vec<u8>)>) -> Result<()> {
let remote_key = single(values.iter().map(|(key, _)| key.clone()))
.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentKeys)?
.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;
let authority_id: AuthorityId = self
.in_flight_lookups
.remove(&remote_key)
.ok_or(Error::ReceivingUnexpectedRecord)?;
let local_peer_id = self.network.local_peer_id();
let remote_addresses: Vec<Multiaddr> = values
.into_iter()
.map(|(_k, v)| {
let schema::SignedAuthorityRecord { record, auth_signature, peer_signature } =
schema::SignedAuthorityRecord::decode(v.as_slice())
.map_err(Error::DecodingProto)?;
let auth_signature = AuthoritySignature::decode(&mut &auth_signature[..])
.map_err(Error::EncodingDecodingScale)?;
if !AuthorityPair::verify(&auth_signature, &record, &authority_id) {
return Err(Error::VerifyingDhtPayload)
}
let addresses: Vec<Multiaddr> = schema::AuthorityRecord::decode(record.as_slice())
.map(|a| a.addresses)
.map_err(Error::DecodingProto)?
.into_iter()
.map(|a| a.try_into())
.collect::<std::result::Result<_, _>>()
.map_err(Error::ParsingMultiaddress)?;
let get_peer_id = |a: &Multiaddr| match a.iter().last() {
Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
_ => None,
};
let addresses: Vec<Multiaddr> = addresses
.into_iter()
.filter(|a| get_peer_id(a).filter(|p| *p != local_peer_id).is_some())
.collect();
let remote_peer_id = single(addresses.iter().map(get_peer_id))
.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? .flatten()
.ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; if let Some(peer_signature) = peer_signature {
let public_key = libp2p::identity::PublicKey::from_protobuf_encoding(
&peer_signature.public_key,
)
.map_err(Error::ParsingLibp2pIdentity)?;
let signature = Signature { public_key, bytes: peer_signature.signature };
if !signature.verify(record, &remote_peer_id) {
return Err(Error::VerifyingDhtPayload)
}
} else if self.strict_record_validation {
return Err(Error::MissingPeerIdSignature)
} else {
debug!(
target: LOG_TARGET,
"Received unsigned authority discovery record from {}", authority_id
);
}
Ok(addresses)
})
.collect::<Result<Vec<Vec<Multiaddr>>>>()?
.into_iter()
.flatten()
.take(MAX_ADDRESSES_PER_AUTHORITY)
.collect();
if !remote_addresses.is_empty() {
self.addr_cache.insert(authority_id, remote_addresses);
if let Some(metrics) = &self.metrics {
metrics
.known_authorities_count
.set(self.addr_cache.num_authority_ids().try_into().unwrap_or(std::u64::MAX));
}
}
Ok(())
}
/// Determine which of the key store's authority discovery keys are part of the authority set at
/// the best block.
///
/// # Errors
///
/// Returns `Error::CallingRuntime` if the authority set cannot be retrieved.
async fn get_own_public_keys_within_authority_set(
    key_store: Arc<dyn CryptoStore>,
    client: &Client,
) -> Result<HashSet<AuthorityId>> {
    let local_pub_keys: HashSet<_> = key_store
        .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
        .await
        .into_iter()
        .collect();

    let best_hash = client.info().best_hash;
    let authorities: HashSet<_> = client
        .authorities(best_hash)
        .await
        .map_err(|e| Error::CallingRuntime(e.into()))?
        .into_iter()
        .map(Into::into)
        .collect();

    // Own keys that are also authorities, converted back into `AuthorityId`s.
    Ok(local_pub_keys.intersection(&authorities).cloned().map(Into::into).collect())
}
}
/// Umbrella trait bundling the network capabilities required by the worker: DHT access
/// (`NetworkDHTProvider`), network state information (`NetworkStateInfo`) and signing
/// (`NetworkSigner`).
pub trait NetworkProvider: NetworkDHTProvider + NetworkStateInfo + NetworkSigner {}
// Implemented automatically for every type that provides all three capabilities.
impl<T> NetworkProvider for T where T: NetworkDHTProvider + NetworkStateInfo + NetworkSigner {}
/// Derive the Kademlia key for an authority: the SHA2-256 digest of the given id bytes.
fn hash_authority_id(id: &[u8]) -> KademliaKey {
    let multihash = libp2p::multihash::Code::Sha2_256.digest(id);
    KademliaKey::new(&multihash.digest())
}
/// Reduce `values` to the one value they all agree on.
///
/// Returns `Ok(None)` for an empty input, `Ok(Some(v))` when every item equals the first item
/// `v`, and `Err(())` as soon as an item differs from the first one.
fn single<T>(values: impl IntoIterator<Item = T>) -> std::result::Result<Option<T>, ()>
where
    T: PartialEq<T>,
{
    let mut remaining = values.into_iter();

    let first = match remaining.next() {
        Some(first) => first,
        None => return Ok(None),
    };

    // Stops at the first mismatch, just like a short-circuiting fold would.
    if remaining.any(|item| first != item) {
        Err(())
    } else {
        Ok(Some(first))
    }
}
/// Turn every address into its byte representation, preserving the order of the input.
fn serialize_addresses(addresses: impl Iterator<Item = Multiaddr>) -> Vec<Vec<u8>> {
    let mut serialized = Vec::new();
    serialized.extend(addresses.map(|address| address.to_vec()));
    serialized
}
fn serialize_authority_record(addresses: Vec<Vec<u8>>) -> Result<Vec<u8>> {
let mut serialized_record = vec![];
schema::AuthorityRecord { addresses }
.encode(&mut serialized_record)
.map_err(Error::EncodingProto)?;
Ok(serialized_record)
}
fn sign_record_with_peer_id(
serialized_record: &[u8],
network: &impl NetworkSigner,
) -> Result<schema::PeerSignature> {
let signature = network
.sign_with_local_identity(serialized_record)
.map_err(|_| Error::Signing)?;
let public_key = signature.public_key.to_protobuf_encoding();
let signature = signature.bytes;
Ok(schema::PeerSignature { signature, public_key })
}
async fn sign_record_with_authority_ids(
serialized_record: Vec<u8>,
peer_signature: Option<schema::PeerSignature>,
key_store: &dyn CryptoStore,
keys: Vec<CryptoTypePublicPair>,
) -> Result<Vec<(KademliaKey, Vec<u8>)>> {
let signatures = key_store
.sign_with_all(key_types::AUTHORITY_DISCOVERY, keys.clone(), &serialized_record)
.await
.map_err(|_| Error::Signing)?;
let mut result = vec![];
for (sign_result, key) in signatures.into_iter().zip(keys.iter()) {
let mut signed_record = vec![];
let auth_signature =
sign_result.ok().flatten().ok_or_else(|| Error::MissingSignature(key.clone()))?;
schema::SignedAuthorityRecord {
record: serialized_record.clone(),
auth_signature,
peer_signature: peer_signature.clone(),
}
.encode(&mut signed_record)
.map_err(Error::EncodingProto)?;
result.push((hash_authority_id(key.1.as_ref()), signed_record));
}
Ok(result)
}
/// Prometheus metrics maintained by the [`Worker`].
#[derive(Clone)]
pub(crate) struct Metrics {
/// Number of times external addresses have been published.
publish: Counter<U64>,
/// Number of external addresses contained in the most recent publication.
amount_addresses_last_published: Gauge<U64>,
/// Number of times the addresses of a single authority have been requested.
requests: Counter<U64>,
/// Number of authority address requests still pending.
requests_pending: Gauge<U64>,
/// Number of DHT events received, labelled by event name.
dht_event_received: CounterVec<U64>,
/// Number of times handling a DHT value found event failed.
handle_value_found_event_failure: Counter<U64>,
/// Number of authorities currently held in the address cache.
known_authorities_count: Gauge<U64>,
}
impl Metrics {
pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
Ok(Self {
publish: register(
Counter::new(
"substrate_authority_discovery_times_published_total",
"Number of times authority discovery has published external addresses.",
)?,
registry,
)?,
amount_addresses_last_published: register(
Gauge::new(
"substrate_authority_discovery_amount_external_addresses_last_published",
"Number of external addresses published when authority discovery last \
published addresses.",
)?,
registry,
)?,
requests: register(
Counter::new(
"substrate_authority_discovery_authority_addresses_requested_total",
"Number of times authority discovery has requested external addresses of a \
single authority.",
)?,
registry,
)?,
requests_pending: register(
Gauge::new(
"substrate_authority_discovery_authority_address_requests_pending",
"Number of pending authority address requests.",
)?,
registry,
)?,
dht_event_received: register(
CounterVec::new(
Opts::new(
"substrate_authority_discovery_dht_event_received",
"Number of dht events received by authority discovery.",
),
&["name"],
)?,
registry,
)?,
handle_value_found_event_failure: register(
Counter::new(
"substrate_authority_discovery_handle_value_found_event_failure",
"Number of times handling a dht value found event failed.",
)?,
registry,
)?,
known_authorities_count: register(
Gauge::new(
"substrate_authority_discovery_known_authorities_count",
"Number of authorities known by authority discovery.",
)?,
registry,
)?,
})
}
}
// Helpers that are only available in test builds.
#[cfg(test)]
impl<Block, Client, Network, DhtEventStream> Worker<Client, Network, Block, DhtEventStream> {
/// Insert `addresses` for `authority` directly into the worker's address cache.
pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
self.addr_cache.insert(authority, addresses);
}
}