use std::fmt::{Debug, Display};
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_channel::mpsc;
use futures_channel::oneshot;
use futures_util::future::Future;
use futures_util::ready;
use futures_util::stream::{Fuse, Peekable, Stream, StreamExt};
use tracing::{debug, warn};
use crate::error::*;
use crate::Time;
mod dns_exchange;
pub mod dns_handle;
pub mod dns_multiplexer;
pub mod dns_request;
pub mod dns_response;
#[cfg(feature = "dnssec")]
#[cfg_attr(docsrs, doc(cfg(feature = "dnssec")))]
pub mod dnssec_dns_handle;
pub mod retry_dns_handle;
mod serial_message;
pub use self::dns_exchange::{
DnsExchange, DnsExchangeBackground, DnsExchangeConnect, DnsExchangeSend,
};
pub use self::dns_handle::{DnsHandle, DnsStreamHandle};
pub use self::dns_multiplexer::{DnsMultiplexer, DnsMultiplexerConnect};
pub use self::dns_request::{DnsRequest, DnsRequestOptions};
pub use self::dns_response::{DnsResponse, DnsResponseStream};
#[cfg(feature = "dnssec")]
#[cfg_attr(docsrs, doc(cfg(feature = "dnssec")))]
pub use self::dnssec_dns_handle::DnssecDnsHandle;
pub use self::retry_dns_handle::RetryDnsHandle;
pub use self::serial_message::SerialMessage;
fn ignore_send<M, T>(result: Result<M, mpsc::TrySendError<T>>) {
if let Err(error) = result {
if error.is_disconnected() {
debug!("ignoring send error on disconnected stream");
return;
}
warn!("error notifying wait, possible future leak: {:?}", error);
}
}
pub trait DnsClientStream:
Stream<Item = Result<SerialMessage, ProtoError>> + Display + Send
{
type Time: Time;
fn name_server_addr(&self) -> SocketAddr;
}
pub type StreamReceiver = Peekable<Fuse<mpsc::Receiver<SerialMessage>>>;
const CHANNEL_BUFFER_SIZE: usize = 32;
#[derive(Clone)]
pub struct BufDnsStreamHandle {
remote_addr: SocketAddr,
sender: mpsc::Sender<SerialMessage>,
}
impl BufDnsStreamHandle {
pub fn new(remote_addr: SocketAddr) -> (Self, StreamReceiver) {
let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let receiver = receiver.fuse().peekable();
let this = Self {
remote_addr,
sender,
};
(this, receiver)
}
pub fn with_remote_addr(&self, remote_addr: SocketAddr) -> Self {
Self {
remote_addr,
sender: self.sender.clone(),
}
}
}
impl DnsStreamHandle for BufDnsStreamHandle {
fn send(&mut self, buffer: SerialMessage) -> Result<(), ProtoError> {
let remote_addr: SocketAddr = self.remote_addr;
let sender: &mut _ = &mut self.sender;
sender
.try_send(SerialMessage::new(buffer.into_parts().0, remote_addr))
.map_err(|e| ProtoError::from(format!("mpsc::SendError {}", e)))
}
}
pub trait DnsRequestSender: Stream<Item = Result<(), ProtoError>> + Send + Unpin + 'static {
fn send_message(&mut self, message: DnsRequest) -> DnsResponseStream;
fn shutdown(&mut self);
fn is_shutdown(&self) -> bool;
}
#[derive(Clone)]
pub struct BufDnsRequestStreamHandle {
sender: mpsc::Sender<OneshotDnsRequest>,
}
macro_rules! try_oneshot {
($expr:expr) => {{
use std::result::Result;
match $expr {
Result::Ok(val) => val,
Result::Err(err) => return DnsResponseReceiver::Err(Some(ProtoError::from(err))),
}
}};
($expr:expr,) => {
$expr?
};
}
impl DnsHandle for BufDnsRequestStreamHandle {
type Response = DnsResponseReceiver;
type Error = ProtoError;
fn send<R: Into<DnsRequest>>(&mut self, request: R) -> Self::Response {
let request: DnsRequest = request.into();
debug!(
"enqueueing message:{}:{:?}",
request.op_code(),
request.queries()
);
let (request, oneshot) = OneshotDnsRequest::oneshot(request);
try_oneshot!(self.sender.try_send(request).map_err(|_| {
debug!("unable to enqueue message");
ProtoError::from(ProtoErrorKind::Busy)
}));
DnsResponseReceiver::Receiver(oneshot)
}
}
pub struct OneshotDnsRequest {
dns_request: DnsRequest,
sender_for_response: oneshot::Sender<DnsResponseStream>,
}
impl OneshotDnsRequest {
fn oneshot(dns_request: DnsRequest) -> (Self, oneshot::Receiver<DnsResponseStream>) {
let (sender_for_response, receiver) = oneshot::channel();
(
Self {
dns_request,
sender_for_response,
},
receiver,
)
}
fn into_parts(self) -> (DnsRequest, OneshotDnsResponse) {
(
self.dns_request,
OneshotDnsResponse(self.sender_for_response),
)
}
}
struct OneshotDnsResponse(oneshot::Sender<DnsResponseStream>);
impl OneshotDnsResponse {
fn send_response(self, serial_response: DnsResponseStream) -> Result<(), DnsResponseStream> {
self.0.send(serial_response)
}
}
pub enum DnsResponseReceiver {
Receiver(oneshot::Receiver<DnsResponseStream>),
Received(DnsResponseStream),
Err(Option<ProtoError>),
}
impl Stream for DnsResponseReceiver {
type Item = Result<DnsResponse, ProtoError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
*self = match *self.as_mut() {
Self::Receiver(ref mut receiver) => {
let receiver = Pin::new(receiver);
let future = ready!(receiver
.poll(cx)
.map_err(|_| ProtoError::from("receiver was canceled")))?;
Self::Received(future)
}
Self::Received(ref mut stream) => {
return stream.poll_next_unpin(cx);
}
Self::Err(ref mut err) => return Poll::Ready(err.take().map(Err)),
};
}
}
}
pub trait FirstAnswer<T, E: From<ProtoError>>: Stream<Item = Result<T, E>> + Unpin + Sized {
fn first_answer(self) -> FirstAnswerFuture<Self> {
FirstAnswerFuture { stream: Some(self) }
}
}
impl<E, S, T> FirstAnswer<T, E> for S
where
S: Stream<Item = Result<T, E>> + Unpin + Sized,
E: From<ProtoError>,
{
}
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FirstAnswerFuture<S> {
stream: Option<S>,
}
impl<E, S: Stream<Item = Result<T, E>> + Unpin, T> Future for FirstAnswerFuture<S>
where
S: Stream<Item = Result<T, E>> + Unpin + Sized,
E: From<ProtoError>,
{
type Output = S::Item;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let s = self
.stream
.as_mut()
.expect("polling FirstAnswerFuture twice");
let item = match ready!(s.poll_next_unpin(cx)) {
Some(r) => r,
None => Err(ProtoError::from(ProtoErrorKind::Timeout).into()),
};
self.stream.take();
Poll::Ready(item)
}
}