future_queue/
peekable_fused.rs1use futures_util::{stream::FusedStream, Stream};
5use pin_project_lite::pin_project;
6use std::{
7 fmt,
8 pin::Pin,
9 task::{Context, Poll},
10};
11
12pin_project! {
13 pub(crate) struct PeekableFused<St: FusedStream> {
21 #[pin]
22 stream: St,
23 peeked: Option<St::Item>,
24 }
25}
26
27impl<St> fmt::Debug for PeekableFused<St>
28where
29 St: FusedStream + fmt::Debug,
30{
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 f.debug_struct("PeekableFused")
33 .field("stream", &self.stream)
34 .field("peeked", &self.peeked.as_ref().map(|_| "..."))
35 .finish()
36 }
37}
38
39#[allow(dead_code)]
40impl<St: FusedStream> PeekableFused<St> {
41 pub(super) fn new(stream: St) -> Self {
42 Self {
43 stream,
44 peeked: None,
45 }
46 }
47
48 pub fn get_ref(&self) -> &St {
51 &self.stream
52 }
53
54 pub fn get_mut(&mut self) -> &mut St {
60 &mut self.stream
61 }
62
63 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
69 self.project().stream
70 }
71
72 pub fn into_inner(self) -> St {
77 self.stream
78 }
79
80 pub fn is_done(&self) -> bool {
82 self.stream.is_terminated()
83 }
84
85 pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> {
92 let mut this = self.project();
93
94 Poll::Ready(loop {
95 if this.peeked.is_some() {
96 break this.peeked.as_ref();
97 } else if let Some(item) = futures_util::ready!(this.stream.as_mut().poll_next(cx)) {
98 *this.peeked = Some(item);
99 } else {
100 break None;
101 }
102 })
103 }
104}
105
106impl<St: FusedStream> Stream for PeekableFused<St> {
107 type Item = St::Item;
108
109 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
110 let this = self.project();
111 if let Some(item) = this.peeked.take() {
112 return Poll::Ready(Some(item));
113 }
114 this.stream.poll_next(cx)
115 }
116
117 fn size_hint(&self) -> (usize, Option<usize>) {
118 let peek_len = usize::from(self.peeked.is_some());
119 let (lower, upper) = self.stream.size_hint();
120 let lower = lower.saturating_add(peek_len);
121 let upper = match upper {
122 Some(x) => x.checked_add(peek_len),
123 None => None,
124 };
125 (lower, upper)
126 }
127}
128
129impl<St: FusedStream> FusedStream for PeekableFused<St> {
130 fn is_terminated(&self) -> bool {
131 self.peeked.is_none() && self.stream.is_terminated()
132 }
133}