future_queue/
peekable_fused.rs

1// Copyright (c) The buffer-unordered-weighted Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use futures_util::{stream::FusedStream, Stream};
5use pin_project_lite::pin_project;
6use std::{
7    fmt,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12pin_project! {
13    /// A variant on `Peekable` that only works on fused streams.
14    ///
15    /// The standard `Peekable` doesn't work on examples like `FuturesUnordered`, because the standard
16    /// `Peekable` uses `Fuse<St>` internally, and `FuturesUnordered` can have new elements added to it.
17    ///
18    /// See [issue #2678](https://github.com/rust-lang/futures-rs/issues/2678) on the futures-rs
19    /// repo for more information.
20    pub(crate) struct PeekableFused<St: FusedStream> {
21        #[pin]
22        stream: St,
23        peeked: Option<St::Item>,
24    }
25}
26
27impl<St> fmt::Debug for PeekableFused<St>
28where
29    St: FusedStream + fmt::Debug,
30{
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        f.debug_struct("PeekableFused")
33            .field("stream", &self.stream)
34            .field("peeked", &self.peeked.as_ref().map(|_| "..."))
35            .finish()
36    }
37}
38
39#[allow(dead_code)]
40impl<St: FusedStream> PeekableFused<St> {
41    pub(super) fn new(stream: St) -> Self {
42        Self {
43            stream,
44            peeked: None,
45        }
46    }
47
48    /// Acquires a reference to the underlying sink or stream that this combinator is
49    /// pulling from.
50    pub fn get_ref(&self) -> &St {
51        &self.stream
52    }
53
54    /// Acquires a mutable reference to the underlying sink or stream that this
55    /// combinator is pulling from.
56    ///
57    /// Note that care must be taken to avoid tampering with the state of the
58    /// sink or stream which may otherwise confuse this combinator.
59    pub fn get_mut(&mut self) -> &mut St {
60        &mut self.stream
61    }
62
63    /// Acquires a pinned mutable reference to the underlying sink or stream that this
64    /// combinator is pulling from.
65    ///
66    /// Note that care must be taken to avoid tampering with the state of the
67    /// sink or stream which may otherwise confuse this combinator.
68    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
69        self.project().stream
70    }
71
72    /// Consumes this combinator, returning the underlying sink or stream.
73    ///
74    /// Note that this may discard intermediate state of this combinator, so
75    /// care should be taken to avoid losing resources when this is called.
76    pub fn into_inner(self) -> St {
77        self.stream
78    }
79
80    /// Returns whether the combinator is done.
81    pub fn is_done(&self) -> bool {
82        self.stream.is_terminated()
83    }
84
85    // TODO: implement the rest of the API if necessary.
86
87    /// Peek retrieves a reference to the next item in the stream.
88    ///
89    /// This method polls the underlying stream and return either a reference
90    /// to the next item if the stream is ready or passes through any errors.
91    pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> {
92        let mut this = self.project();
93
94        Poll::Ready(loop {
95            if this.peeked.is_some() {
96                break this.peeked.as_ref();
97            } else if let Some(item) = futures_util::ready!(this.stream.as_mut().poll_next(cx)) {
98                *this.peeked = Some(item);
99            } else {
100                break None;
101            }
102        })
103    }
104}
105
106impl<St: FusedStream> Stream for PeekableFused<St> {
107    type Item = St::Item;
108
109    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
110        let this = self.project();
111        if let Some(item) = this.peeked.take() {
112            return Poll::Ready(Some(item));
113        }
114        this.stream.poll_next(cx)
115    }
116
117    fn size_hint(&self) -> (usize, Option<usize>) {
118        let peek_len = usize::from(self.peeked.is_some());
119        let (lower, upper) = self.stream.size_hint();
120        let lower = lower.saturating_add(peek_len);
121        let upper = match upper {
122            Some(x) => x.checked_add(peek_len),
123            None => None,
124        };
125        (lower, upper)
126    }
127}
128
129impl<St: FusedStream> FusedStream for PeekableFused<St> {
130    fn is_terminated(&self) -> bool {
131        self.peeked.is_none() && self.stream.is_terminated()
132    }
133}