use backtrace::Backtrace;
use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_network_common::protocol::event::Event;
use std::{
cell::RefCell,
fmt,
pin::Pin,
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
task::{Context, Poll},
};
/// Creates a connected [`Sender`]/[`Receiver`] pair carrying [`Event`]s.
///
/// `name` is stored on both halves and is used as the `name` label value of the metrics
/// and in the log message emitted by [`OutChannels::send`]. `queue_size_warning` is the
/// queue length that `OutChannels::send` compares against before logging an error
/// (at most once per sender).
///
/// The underlying channel is unbounded. Both halves share the same queue-size counter
/// (starting at zero) and the same metrics slot; the slot starts out empty and is only
/// filled in by [`OutChannels::push`]. A backtrace is captured here, unresolved, so it
/// can be printed if the warning ever fires.
pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) {
    let (event_tx, event_rx) = mpsc::unbounded();
    let shared_metrics = Arc::new(Mutex::new(None));
    let shared_queue_size = Arc::new(AtomicI64::new(0));

    let sender = Sender {
        inner: event_tx,
        name,
        queue_size: Arc::clone(&shared_queue_size),
        queue_size_warning,
        warning_fired: false,
        creation_backtrace: Backtrace::new_unresolved(),
        metrics: Arc::clone(&shared_metrics),
    };
    let receiver = Receiver {
        inner: event_rx,
        name,
        queue_size: shared_queue_size,
        metrics: shared_metrics,
    };

    (sender, receiver)
}
/// Sending half of a channel created by [`channel`].
///
/// A `Sender` is meant to be handed to [`OutChannels::push`], which fills in the
/// shared metrics slot; events are then pushed through it by [`OutChannels::send`].
pub struct Sender {
    /// Unbounded channel the events are written to.
    inner: mpsc::UnboundedSender<Event>,
    /// Channel name; used as the `name` metrics label value and in the warning log.
    name: &'static str,
    /// Counter shared with the [`Receiver`]: incremented by `OutChannels::send`,
    /// decremented each time the receiver yields an event.
    queue_size: Arc<AtomicI64>,
    /// Queue length that `OutChannels::send` compares against to decide whether to warn.
    queue_size_warning: i64,
    /// Set to `true` once the warning has been logged, so it is logged only once.
    warning_fired: bool,
    /// Backtrace captured (unresolved) in [`channel`]; resolved only when the warning fires.
    creation_backtrace: Backtrace,
    /// Slot shared with the [`Receiver`]. Outer `None`: not yet pushed to an
    /// [`OutChannels`]. Inner `None`: pushed, but no Prometheus registry was provided.
    metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}
impl fmt::Debug for Sender {
    /// Prints only the type name; none of the fields are included in the output.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("Sender")
    }
}
impl Drop for Sender {
    /// Decrements the `num_channels` gauge for this channel's name.
    ///
    /// Nothing happens if the metrics slot is still empty (the sender was never pushed
    /// to an [`OutChannels`]) or if it holds `None` (no registry was provided).
    fn drop(&mut self) {
        let slot = self.metrics.lock();
        if let Some(Some(registered)) = slot.as_deref() {
            registered.num_channels.with_label_values(&[self.name]).dec();
        }
    }
}
/// Receiving half of a channel created by [`channel`]; implements `Stream<Item = Event>`.
pub struct Receiver {
    /// Unbounded channel the events are read from.
    inner: mpsc::UnboundedReceiver<Event>,
    /// Channel name; used as the `name` metrics label value.
    name: &'static str,
    /// Counter shared with the [`Sender`]; decremented for every event yielded.
    queue_size: Arc<AtomicI64>,
    /// Slot shared with the [`Sender`]. Outer `None`: the sender has not been pushed to
    /// an [`OutChannels`] yet. Inner `None`: no Prometheus registry was provided.
    metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}
impl Stream for Receiver {
    type Item = Event;

    /// Polls the inner channel and, for every event obtained, decrements the shared
    /// queue-size counter and records the event as "received" in the metrics.
    ///
    /// Returns `Poll::Pending` when the inner channel is pending and `Poll::Ready(None)`
    /// once it has ended.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
        let event = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
            Some(event) => event,
            None => return Poll::Ready(None),
        };

        let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed);

        // Clone the `Arc` out of the slot so the lock is released before touching metrics.
        let slot = self.metrics.lock().clone();
        match slot.as_deref() {
            Some(Some(registered)) => registered.event_out(&event, self.name),
            // Sender was pushed, but no registry was configured.
            Some(None) => {},
            None => log::warn!(
                "Inconsistency in out_events: event happened before sender associated"
            ),
        }

        Poll::Ready(Some(event))
    }
}
impl fmt::Debug for Receiver {
    /// Prints only the type name; none of the fields are included in the output.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("Receiver")
    }
}
impl Drop for Receiver {
    /// Drains the events that are immediately available before the receiver goes away.
    ///
    /// Each drained event goes through `poll_next`, so the queue-size counter and the
    /// "received" metrics are updated for it. Draining stops at the first poll that is
    /// pending or reports the end of the stream, and is skipped entirely when the inner
    /// channel is already terminated.
    fn drop(&mut self) {
        if self.inner.is_terminated() {
            return
        }
        loop {
            match self.next().now_or_never() {
                Some(Some(_)) => continue,
                _ => break,
            }
        }
    }
}
/// Collection of [`Sender`]s to which every event passed to [`OutChannels::send`] is
/// broadcast.
pub struct OutChannels {
    /// Senders that are still connected; a sender is removed when sending to it fails.
    event_streams: Vec<Sender>,
    /// Metrics shared with every pushed sender; `None` when no registry was provided.
    metrics: Arc<Option<Metrics>>,
}
impl OutChannels {
    /// Creates an empty collection of senders.
    ///
    /// When `registry` is `Some`, the metrics are registered on it and any registration
    /// failure is returned as the error. With `None`, no metrics are recorded.
    pub fn new(registry: Option<&Registry>) -> Result<Self, PrometheusError> {
        let metrics = registry.map(Metrics::register).transpose()?;
        Ok(Self { event_streams: Vec::new(), metrics: Arc::new(metrics) })
    }

    /// Adds `sender` to the collection.
    ///
    /// The sender's shared metrics slot is filled with this collection's metrics (in
    /// debug builds it is asserted that the slot was still empty), and the
    /// `num_channels` gauge for the sender's name is incremented.
    pub fn push(&mut self, sender: Sender) {
        {
            // Scoped so the lock is released before the sender is moved.
            let mut slot = sender.metrics.lock();
            debug_assert!(slot.is_none());
            *slot = Some(Arc::clone(&self.metrics));
        }

        if let Some(metrics) = &*self.metrics {
            metrics.num_channels.with_label_values(&[sender.name]).inc();
        }

        self.event_streams.push(sender);
    }

    /// Sends a clone of `event` to every sender in the collection.
    ///
    /// Senders whose channel rejects the event are removed. The first time a sender's
    /// queue length, read just before this event is added, equals its
    /// `queue_size_warning`, an error containing the channel's creation backtrace is
    /// logged. The event is then recorded as "sent" for every sender that remains.
    pub fn send(&mut self, event: Event) {
        self.event_streams.retain_mut(|stream| {
            // `fetch_add` yields the length *before* this event is accounted for.
            let queued_before = stream.queue_size.fetch_add(1, Ordering::Relaxed);
            if queued_before == stream.queue_size_warning && !stream.warning_fired {
                stream.warning_fired = true;
                stream.creation_backtrace.resolve();
                error!(
                    "The number of unprocessed events in channel `{}` reached {}.\n\
                     The channel was created at:\n{:?}",
                    stream.name, stream.queue_size_warning, stream.creation_backtrace,
                );
            }

            // A failed send means the sender is no longer usable; drop it.
            stream.inner.unbounded_send(event.clone()).is_ok()
        });

        if let Some(metrics) = &*self.metrics {
            self.event_streams
                .iter()
                .for_each(|stream| metrics.event_in(&event, stream.name));
        }
    }
}
impl fmt::Debug for OutChannels {
    /// Prints the number of registered senders rather than the senders themselves.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let num_channels = self.event_streams.len();
        let mut out = formatter.debug_struct("OutChannels");
        out.field("num_channels", &num_channels);
        out.finish()
    }
}
/// Prometheus metrics recorded for the event channels.
struct Metrics {
    /// Event counter, labelled by `event_name`, `action` and channel `name`.
    events_total: CounterVec<U64>,
    /// Counter of notification sizes, labelled by `protocol`, `action` and channel `name`.
    notifications_sizes: CounterVec<U64>,
    /// Gauge of channels, labelled by channel `name`; incremented in
    /// `OutChannels::push` and decremented when a pushed `Sender` is dropped.
    num_channels: GaugeVec<U64>,
}
thread_local! {
    /// Per-thread scratch buffer reused by `format_label` so that building a label does
    /// not need a fresh allocation on every call.
    static LABEL_BUFFER: RefCell<String> = RefCell::new(String::new());
}

/// Builds the label `<prefix>"<protocol>"` (the protocol wrapped in double quotes) in a
/// reusable thread-local buffer and passes it to `callback`.
///
/// The buffer is moved out of the thread-local slot while `callback` runs, so no
/// `RefCell` borrow is held across the call and `callback` may itself call
/// `format_label` without triggering a "already borrowed" panic.
fn format_label(prefix: &str, protocol: &str, callback: impl FnOnce(&str)) {
    // Take ownership of the buffer (leaving an empty, allocation-free `String` behind);
    // the borrow ends at the end of this statement.
    let mut label = LABEL_BUFFER.with(|slot| std::mem::take(&mut *slot.borrow_mut()));

    label.clear();
    // `+ 2` accounts for the two quote characters.
    label.reserve(prefix.len() + protocol.len() + 2);
    label.push_str(prefix);
    label.push('"');
    label.push_str(protocol);
    label.push('"');

    callback(&label);

    // Hand the allocation back so the next call on this thread can reuse it.
    LABEL_BUFFER.with(|slot| *slot.borrow_mut() = label);
}
impl Metrics {
    /// Creates the three metric families and registers them on `registry`, in the order
    /// `events_total`, `notifications_sizes`, `num_channels`.
    ///
    /// Returns the first error raised while creating or registering a metric.
    fn register(registry: &Registry) -> Result<Self, PrometheusError> {
        let events_total = register(
            CounterVec::new(
                Opts::new(
                    "substrate_sub_libp2p_out_events_events_total",
                    "Number of broadcast network events that have been sent or received across all \
                     channels",
                ),
                &["event_name", "action", "name"],
            )?,
            registry,
        )?;

        let notifications_sizes = register(
            CounterVec::new(
                Opts::new(
                    "substrate_sub_libp2p_out_events_notifications_sizes",
                    "Size of notification events that have been sent or received across all \
                     channels",
                ),
                &["protocol", "action", "name"],
            )?,
            registry,
        )?;

        let num_channels = register(
            GaugeVec::new(
                Opts::new(
                    "substrate_sub_libp2p_out_events_num_channels",
                    "Number of internal active channels that broadcast network events",
                ),
                &["name"],
            )?,
            registry,
        )?;

        Ok(Self { events_total, notifications_sizes, num_channels })
    }

    /// Records `event` as "sent" on the channel called `name`.
    fn event_in(&self, event: &Event, name: &str) {
        self.record_event(event, "sent", name);
    }

    /// Records `event` as "received" on the channel called `name`.
    fn event_out(&self, event: &Event, name: &str) {
        self.record_event(event, "received", name);
    }

    /// Shared implementation of `event_in` and `event_out`; `action` is the value of the
    /// `action` label.
    ///
    /// Every event increments `events_total` once, except `NotificationsReceived`,
    /// which increments it once per contained message and additionally adds each
    /// message's length to `notifications_sizes`.
    fn record_event(&self, event: &Event, action: &str, name: &str) {
        let count_event = |event_name: &str| {
            self.events_total.with_label_values(&[event_name, action, name]).inc();
        };

        match event {
            Event::Dht(_) => count_event("dht"),
            Event::SyncConnected { .. } => count_event("sync-connected"),
            Event::SyncDisconnected { .. } => count_event("sync-disconnected"),
            Event::NotificationStreamOpened { protocol, .. } =>
                format_label("notif-open-", protocol, count_event),
            Event::NotificationStreamClosed { protocol, .. } =>
                format_label("notif-closed-", protocol, count_event),
            Event::NotificationsReceived { messages, .. } =>
                for (protocol, message) in messages {
                    format_label("notif-", protocol, count_event);
                    // Saturate rather than fail if the length does not fit in a `u64`.
                    self.notifications_sizes
                        .with_label_values(&[protocol, action, name])
                        .inc_by(u64::try_from(message.len()).unwrap_or(u64::MAX));
                },
        }
    }
}