futures_util/stream/stream/
zip.rs

1use crate::stream::{Fuse, StreamExt};
2use core::cmp;
3use core::pin::Pin;
4use futures_core::stream::{FusedStream, Stream};
5use futures_core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
pin_project! {
    /// Stream for the [`zip`](super::StreamExt::zip) method.
    ///
    /// Yields `(St1::Item, St2::Item)` pairs built from one item of each
    /// underlying stream.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Zip<St1: Stream, St2: Stream> {
        // First input, wrapped in `Fuse` by `new`.
        #[pin]
        stream1: Fuse<St1>,
        // Second input, wrapped in `Fuse` by `new`.
        #[pin]
        stream2: Fuse<St2>,
        // Item already produced by `stream1` that is waiting for its
        // counterpart from `stream2`; `None` when nothing is buffered.
        queued1: Option<St1::Item>,
        // Item already produced by `stream2` that is waiting for its
        // counterpart from `stream1`; `None` when nothing is buffered.
        queued2: Option<St2::Item>,
    }
}
21
22impl<St1: Stream, St2: Stream> Zip<St1, St2> {
23    pub(super) fn new(stream1: St1, stream2: St2) -> Self {
24        Self { stream1: stream1.fuse(), stream2: stream2.fuse(), queued1: None, queued2: None }
25    }
26
27    /// Acquires a reference to the underlying streams that this combinator is
28    /// pulling from.
29    pub fn get_ref(&self) -> (&St1, &St2) {
30        (self.stream1.get_ref(), self.stream2.get_ref())
31    }
32
33    /// Acquires a mutable reference to the underlying streams that this
34    /// combinator is pulling from.
35    ///
36    /// Note that care must be taken to avoid tampering with the state of the
37    /// stream which may otherwise confuse this combinator.
38    pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
39        (self.stream1.get_mut(), self.stream2.get_mut())
40    }
41
42    /// Acquires a pinned mutable reference to the underlying streams that this
43    /// combinator is pulling from.
44    ///
45    /// Note that care must be taken to avoid tampering with the state of the
46    /// stream which may otherwise confuse this combinator.
47    pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
48        let this = self.project();
49        (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
50    }
51
52    /// Consumes this combinator, returning the underlying streams.
53    ///
54    /// Note that this may discard intermediate state of this combinator, so
55    /// care should be taken to avoid losing resources when this is called.
56    pub fn into_inner(self) -> (St1, St2) {
57        (self.stream1.into_inner(), self.stream2.into_inner())
58    }
59}
60
61impl<St1, St2> FusedStream for Zip<St1, St2>
62where
63    St1: Stream,
64    St2: Stream,
65{
66    fn is_terminated(&self) -> bool {
67        self.stream1.is_terminated() && self.stream2.is_terminated()
68    }
69}
70
71impl<St1, St2> Stream for Zip<St1, St2>
72where
73    St1: Stream,
74    St2: Stream,
75{
76    type Item = (St1::Item, St2::Item);
77
78    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        let mut this = self.project();
80
81        if this.queued1.is_none() {
82            match this.stream1.as_mut().poll_next(cx) {
83                Poll::Ready(Some(item1)) => *this.queued1 = Some(item1),
84                Poll::Ready(None) | Poll::Pending => {}
85            }
86        }
87        if this.queued2.is_none() {
88            match this.stream2.as_mut().poll_next(cx) {
89                Poll::Ready(Some(item2)) => *this.queued2 = Some(item2),
90                Poll::Ready(None) | Poll::Pending => {}
91            }
92        }
93
94        if this.queued1.is_some() && this.queued2.is_some() {
95            let pair = (this.queued1.take().unwrap(), this.queued2.take().unwrap());
96            Poll::Ready(Some(pair))
97        } else if this.stream1.is_done() || this.stream2.is_done() {
98            Poll::Ready(None)
99        } else {
100            Poll::Pending
101        }
102    }
103
104    fn size_hint(&self) -> (usize, Option<usize>) {
105        let queued1_len = usize::from(self.queued1.is_some());
106        let queued2_len = usize::from(self.queued2.is_some());
107        let (stream1_lower, stream1_upper) = self.stream1.size_hint();
108        let (stream2_lower, stream2_upper) = self.stream2.size_hint();
109
110        let stream1_lower = stream1_lower.saturating_add(queued1_len);
111        let stream2_lower = stream2_lower.saturating_add(queued2_len);
112
113        let lower = cmp::min(stream1_lower, stream2_lower);
114
115        let upper = match (stream1_upper, stream2_upper) {
116            (Some(x), Some(y)) => {
117                let x = x.saturating_add(queued1_len);
118                let y = y.saturating_add(queued2_len);
119                Some(cmp::min(x, y))
120            }
121            (Some(x), None) => x.checked_add(queued1_len),
122            (None, Some(y)) => y.checked_add(queued2_len),
123            (None, None) => None,
124        };
125
126        (lower, upper)
127    }
128}