use futures::{
prelude::*,
ready,
task::{Context, Poll},
};
use libp2p::{core::transport::timeout::TransportTimeout, Transport};
use std::{io, pin::Pin, time::Duration};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
pub(crate) fn initialize_transport() -> Result<WsTrans, io::Error> {
let transport = {
let tcp_transport = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new());
let inner = libp2p::dns::TokioDnsConfig::system(tcp_transport)?;
libp2p::websocket::framed::WsConfig::new(inner).and_then(|connec, _| {
let connec = connec
.with(|item| {
let item = libp2p::websocket::framed::OutgoingData::Binary(item);
future::ready(Ok::<_, io::Error>(item))
})
.try_filter_map(|item| async move {
if let libp2p::websocket::framed::Incoming::Data(data) = item {
Ok(Some(data.into_bytes()))
} else {
Ok(None)
}
});
future::ready(Ok::<_, io::Error>(connec))
})
};
Ok(TransportTimeout::new(
transport.map(|out, _| {
let out = out
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.sink_map_err(|err| io::Error::new(io::ErrorKind::Other, err));
Box::pin(out) as Pin<Box<_>>
}),
CONNECT_TIMEOUT,
)
.boxed())
}
pub(crate) trait StreamAndSink<I>: Stream + Sink<I> {}
impl<T: ?Sized + Stream + Sink<I>, I> StreamAndSink<I> for T {}
pub(crate) type WsTrans = libp2p::core::transport::Boxed<
Pin<
Box<
dyn StreamAndSink<Vec<u8>, Item = Result<Vec<u8>, io::Error>, Error = io::Error> + Send,
>,
>,
>;
#[pin_project::pin_project]
pub(crate) struct StreamSink<T>(#[pin] T, Option<Vec<u8>>);
impl<T> From<T> for StreamSink<T> {
fn from(inner: T) -> StreamSink<T> {
StreamSink(inner, None)
}
}
impl<T: AsyncRead> Stream for StreamSink<T> {
type Item = Result<Vec<u8>, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let mut buf = vec![0; 128];
match ready!(AsyncRead::poll_read(this.0, cx, &mut buf)) {
Ok(0) => Poll::Ready(None),
Ok(n) => {
buf.truncate(n);
Poll::Ready(Some(Ok(buf)))
},
Err(err) => Poll::Ready(Some(Err(err))),
}
}
}
impl<T: AsyncWrite> StreamSink<T> {
fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
let this = self.project();
if let Some(buffer) = this.1 {
if ready!(this.0.poll_write(cx, &buffer[..]))? != buffer.len() {
log::error!(target: "telemetry",
"Detected some internal buffering happening in the telemetry");
let err = io::Error::new(io::ErrorKind::Other, "Internal buffering detected");
return Poll::Ready(Err(err))
}
}
*this.1 = None;
Poll::Ready(Ok(()))
}
}
impl<T: AsyncWrite> Sink<Vec<u8>> for StreamSink<T> {
type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(StreamSink::poll_flush_buffer(self, cx))?;
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let this = self.project();
debug_assert!(this.1.is_none());
*this.1 = Some(item);
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_flush_buffer(cx))?;
let this = self.project();
AsyncWrite::poll_flush(this.0, cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_flush_buffer(cx))?;
let this = self.project();
AsyncWrite::poll_close(this.0, cx)
}
}