1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd.
//
// Licensed under the Apache License, Version 2.0 or MIT license, at your option.
//
// A copy of the Apache License, Version 2.0 is included in the software as
// LICENSE-APACHE and a copy of the MIT license is included in the software
// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use super::ControlCommand;
use crate::{error::ConnectionError, Stream};
use futures::{
    channel::{mpsc, oneshot},
    prelude::*,
    ready,
};
use std::{
    pin::Pin,
    task::{Context, Poll},
};

type Result<T> = std::result::Result<T, ConnectionError>;

/// The Yamux `Connection` controller.
///
/// While a Yamux connection makes progress via its `next_stream` method,
/// this controller can be used to concurrently direct the connection,
/// e.g. to open a new stream to the remote or to close the connection.
///
/// The possible operations are implemented as async methods and redundantly
/// as poll-based variants which may be useful inside of other poll based
/// environments such as certain trait implementations.
#[derive(Debug)]
pub struct Control {
    /// Command channel to `Connection`.
    sender: mpsc::Sender<ControlCommand>,
    /// Pending state of `poll_open_stream`.
    pending_open: Option<oneshot::Receiver<Result<Stream>>>,
    /// Pending state of `poll_close`.
    pending_close: Option<oneshot::Receiver<()>>,
}

impl Clone for Control {
    fn clone(&self) -> Self {
        Control {
            sender: self.sender.clone(),
            pending_open: None,
            pending_close: None,
        }
    }
}

impl Control {
    pub(crate) fn new(sender: mpsc::Sender<ControlCommand>) -> Self {
        Control {
            sender,
            pending_open: None,
            pending_close: None,
        }
    }

    /// Open a new stream to the remote.
    pub async fn open_stream(&mut self) -> Result<Stream> {
        let (tx, rx) = oneshot::channel();
        self.sender.send(ControlCommand::OpenStream(tx)).await?;
        rx.await?
    }

    /// Close the connection.
    pub async fn close(&mut self) -> Result<()> {
        let (tx, rx) = oneshot::channel();
        if self
            .sender
            .send(ControlCommand::CloseConnection(tx))
            .await
            .is_err()
        {
            // The receiver is closed which means the connection is already closed.
            return Ok(());
        }
        // A dropped `oneshot::Sender` means the `Connection` is gone,
        // so we do not treat receive errors differently here.
        let _ = rx.await;
        Ok(())
    }

    /// [`Poll`] based alternative to [`Control::open_stream`].
    pub fn poll_open_stream(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Stream>> {
        loop {
            match self.pending_open.take() {
                None => {
                    ready!(self.sender.poll_ready(cx)?);
                    let (tx, rx) = oneshot::channel();
                    self.sender.start_send(ControlCommand::OpenStream(tx))?;
                    self.pending_open = Some(rx)
                }
                Some(mut rx) => match rx.poll_unpin(cx)? {
                    Poll::Ready(result) => return Poll::Ready(result),
                    Poll::Pending => {
                        self.pending_open = Some(rx);
                        return Poll::Pending;
                    }
                },
            }
        }
    }

    /// Abort an ongoing open stream operation started by `poll_open_stream`.
    pub fn abort_open_stream(&mut self) {
        self.pending_open = None
    }

    /// [`Poll`] based alternative to [`Control::close`].
    pub fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
        loop {
            match self.pending_close.take() {
                None => {
                    if ready!(self.sender.poll_ready(cx)).is_err() {
                        // The receiver is closed which means the connection is already closed.
                        return Poll::Ready(Ok(()));
                    }
                    let (tx, rx) = oneshot::channel();
                    if let Err(e) = self.sender.start_send(ControlCommand::CloseConnection(tx)) {
                        if e.is_full() {
                            continue;
                        }
                        debug_assert!(e.is_disconnected());
                        // The receiver is closed which means the connection is already closed.
                        return Poll::Ready(Ok(()));
                    }
                    self.pending_close = Some(rx)
                }
                Some(mut rx) => match rx.poll_unpin(cx) {
                    Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
                    Poll::Ready(Err(oneshot::Canceled)) => {
                        // A dropped `oneshot::Sender` means the `Connection` is gone,
                        // which is `Ok`ay for us here.
                        return Poll::Ready(Ok(()));
                    }
                    Poll::Pending => {
                        self.pending_close = Some(rx);
                        return Poll::Pending;
                    }
                },
            }
        }
    }
}