futures_util/stream/stream/
cycle.rs

1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::{FusedStream, Stream};
4use futures_core::task::{Context, Poll};
5use pin_project_lite::pin_project;
6
7pin_project! {
8    /// Stream for the [`cycle`](super::StreamExt::cycle) method.
9    #[derive(Debug)]
10    #[must_use = "streams do nothing unless polled"]
11    pub struct Cycle<St> {
12        orig: St,
13        #[pin]
14        stream: St,
15    }
16}
17
18impl<St> Cycle<St>
19where
20    St: Clone + Stream,
21{
22    pub(super) fn new(stream: St) -> Self {
23        Self { orig: stream.clone(), stream }
24    }
25}
26
27impl<St> Stream for Cycle<St>
28where
29    St: Clone + Stream,
30{
31    type Item = St::Item;
32
33    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34        let mut this = self.project();
35
36        match ready!(this.stream.as_mut().poll_next(cx)) {
37            None => {
38                this.stream.set(this.orig.clone());
39                this.stream.poll_next(cx)
40            }
41            item => Poll::Ready(item),
42        }
43    }
44
45    fn size_hint(&self) -> (usize, Option<usize>) {
46        // the cycle stream is either empty or infinite
47        match self.orig.size_hint() {
48            size @ (0, Some(0)) => size,
49            (0, _) => (0, None),
50            _ => (usize::MAX, None),
51        }
52    }
53}
54
55impl<St> FusedStream for Cycle<St>
56where
57    St: Clone + Stream,
58{
59    fn is_terminated(&self) -> bool {
60        // the cycle stream is either empty or infinite
61        if let (0, Some(0)) = self.size_hint() {
62            true
63        } else {
64            false
65        }
66    }
67}