futures_util/stream/stream/
cycle.rs1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::{FusedStream, Stream};
4use futures_core::task::{Context, Poll};
5use pin_project_lite::pin_project;
6
7pin_project! {
8 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Cycle<St> {
12 orig: St,
13 #[pin]
14 stream: St,
15 }
16}
17
18impl<St> Cycle<St>
19where
20 St: Clone + Stream,
21{
22 pub(super) fn new(stream: St) -> Self {
23 Self { orig: stream.clone(), stream }
24 }
25}
26
27impl<St> Stream for Cycle<St>
28where
29 St: Clone + Stream,
30{
31 type Item = St::Item;
32
33 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34 let mut this = self.project();
35
36 match ready!(this.stream.as_mut().poll_next(cx)) {
37 None => {
38 this.stream.set(this.orig.clone());
39 this.stream.poll_next(cx)
40 }
41 item => Poll::Ready(item),
42 }
43 }
44
45 fn size_hint(&self) -> (usize, Option<usize>) {
46 match self.orig.size_hint() {
48 size @ (0, Some(0)) => size,
49 (0, _) => (0, None),
50 _ => (usize::MAX, None),
51 }
52 }
53}
54
55impl<St> FusedStream for Cycle<St>
56where
57 St: Clone + Stream,
58{
59 fn is_terminated(&self) -> bool {
60 if let (0, Some(0)) = self.size_hint() {
62 true
63 } else {
64 false
65 }
66 }
67}