use futures::{prelude::*, stream::FusedStream};
use std::{
pin::Pin,
task::{Context, Poll, Waker},
};
#[derive(Debug)]
pub(crate) struct Pausable<S> {
paused: bool,
stream: S,
waker: Option<Waker>,
}
impl<S: Stream + Unpin> Pausable<S> {
pub(crate) fn new(stream: S) -> Self {
Pausable {
paused: false,
stream,
waker: None,
}
}
pub(crate) fn is_paused(&mut self) -> bool {
self.paused
}
pub(crate) fn pause(&mut self) {
self.paused = true
}
pub(crate) fn unpause(&mut self) {
self.paused = false;
if let Some(w) = self.waker.take() {
w.wake()
}
}
pub(crate) fn stream(&mut self) -> &mut S {
&mut self.stream
}
}
impl<S: Stream + Unpin> Stream for Pausable<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if !self.paused {
return self.stream.poll_next_unpin(cx);
}
self.waker = Some(cx.waker().clone());
Poll::Pending
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
impl<S: FusedStream + Unpin> FusedStream for Pausable<S> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}
#[cfg(test)]
mod tests {
use super::Pausable;
use futures::prelude::*;
#[test]
fn pause_unpause() {
let mut stream = Pausable::new(futures::stream::iter(&[1, 2, 3, 4]));
assert_eq!(Some(Some(&1)), stream.next().now_or_never());
assert_eq!(Some(Some(&2)), stream.next().now_or_never());
stream.pause();
assert_eq!(None, stream.next().now_or_never());
stream.unpause();
assert_eq!(Some(Some(&3)), stream.next().now_or_never());
assert_eq!(Some(Some(&4)), stream.next().now_or_never());
assert_eq!(Some(None), stream.next().now_or_never()) }
}