future_queue/
future_queue.rs1use crate::{
5 global_weight::GlobalWeight, peekable_fused::PeekableFused, slots::SlotReservations,
6 FutureQueueContext,
7};
8use futures_util::{
9 stream::{Fuse, FuturesUnordered},
10 Future, Stream, StreamExt as _,
11};
12use pin_project_lite::pin_project;
13use std::{
14 fmt,
15 pin::Pin,
16 task::{Context, Poll},
17};
18
19pin_project! {
20 #[must_use = "streams do nothing unless polled"]
22 pub struct FutureQueue<St>
23 where
24 St: Stream,
25 St::Item: WeightedFuture,
26 {
27 #[pin]
28 stream: PeekableFused<Fuse<St>>,
29 in_progress_queue: FuturesUnordered<FutureWithWeight<<St::Item as WeightedFuture>::Future>>,
30 slots: SlotReservations,
31 global_weight: GlobalWeight,
32 }
33}
34
35impl<St> fmt::Debug for FutureQueue<St>
36where
37 St: Stream + fmt::Debug,
38 St::Item: WeightedFuture,
39{
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("FutureQueue")
42 .field("stream", &self.stream)
43 .field("in_progress_queue", &self.in_progress_queue)
44 .field("slots", &self.slots)
45 .field("global_weight", &self.global_weight)
46 .finish()
47 }
48}
49
50impl<St> FutureQueue<St>
51where
52 St: Stream,
53 St::Item: WeightedFuture,
54{
55 pub(crate) fn new(stream: St, max_weight: usize) -> Self {
56 Self {
57 stream: PeekableFused::new(stream.fuse()),
58 in_progress_queue: FuturesUnordered::new(),
59 slots: SlotReservations::with_capacity(max_weight),
60 global_weight: GlobalWeight::new(max_weight),
61 }
62 }
63
64 #[doc(hidden)]
66 pub fn set_extra_verify(&mut self, verify: bool) {
67 self.slots.set_check_reserved(verify);
68 }
69
70 pub fn max_weight(&self) -> usize {
72 self.global_weight.max()
73 }
74
75 pub fn current_weight(&self) -> usize {
77 self.global_weight.current()
78 }
79
80 pub fn get_ref(&self) -> &St {
83 self.stream.get_ref().get_ref()
84 }
85
86 pub fn get_mut(&mut self) -> &mut St {
92 self.stream.get_mut().get_mut()
93 }
94
95 pub fn get_pin_mut(self: Pin<&mut Self>) -> core::pin::Pin<&mut St> {
101 self.project().stream.get_pin_mut().get_pin_mut()
102 }
103
104 pub fn into_inner(self) -> St {
109 self.stream.into_inner().into_inner()
110 }
111}
112
113impl<St> Stream for FutureQueue<St>
114where
115 St: Stream,
116 St::Item: WeightedFuture,
117{
118 type Item = <<St::Item as WeightedFuture>::Future as Future>::Output;
119
120 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121 let mut this = self.project();
122
123 while let Poll::Ready(Some(weighted_future)) = this.stream.as_mut().poll_peek(cx) {
126 if !this.global_weight.has_space_for(weighted_future.weight()) {
127 break;
130 }
131
132 let (weight, future_fn) = match this.stream.as_mut().poll_next(cx) {
133 Poll::Ready(Some(weighted_future)) => weighted_future.into_components(),
134 _ => unreachable!("we just peeked at this item"),
135 };
136 this.global_weight.add_weight(weight);
137 let global_slot = this.slots.reserve();
138
139 let cx = FutureQueueContext {
140 global_slot,
141 group_slot: None,
142 };
143 let future = future_fn(cx);
144
145 this.in_progress_queue
146 .push(FutureWithWeight::new(weight, global_slot, future));
147 }
148
149 match this.in_progress_queue.poll_next_unpin(cx) {
151 Poll::Pending => return Poll::Pending,
152 Poll::Ready(Some((weight, slot, output))) => {
153 this.global_weight.sub_weight(weight);
154 this.slots.release(slot);
155 return Poll::Ready(Some(output));
156 }
157 Poll::Ready(None) => {}
158 }
159
160 if this.stream.is_done() {
162 Poll::Ready(None)
163 } else {
164 Poll::Pending
165 }
166 }
167
168 fn size_hint(&self) -> (usize, Option<usize>) {
169 let queue_len = self.in_progress_queue.len();
170 let (lower, upper) = self.stream.size_hint();
171 let lower = lower.saturating_add(queue_len);
172 let upper = match upper {
173 Some(x) => x.checked_add(queue_len),
174 None => None,
175 };
176 (lower, upper)
177 }
178}
179
180pub trait WeightedFuture: private::Sealed {
184 type F: FnOnce(FutureQueueContext) -> Self::Future;
186
187 type Future: Future;
189
190 fn weight(&self) -> usize;
192
193 fn into_components(self) -> (usize, Self::F);
195}
196
/// Private home of the sealing marker trait.
///
/// The module itself is not `pub`, so the trait below can only be named from inside the
/// parent module.
mod private {
    /// Marker trait with no items; used as a supertrait bound to seal other traits.
    pub trait Sealed {}
}
200
201impl<F, Fut> private::Sealed for (usize, F)
202where
203 F: FnOnce(FutureQueueContext) -> Fut,
204 Fut: Future,
205{
206}
207
208impl<F, Fut> WeightedFuture for (usize, F)
209where
210 F: FnOnce(FutureQueueContext) -> Fut,
211 Fut: Future,
212{
213 type F = F;
214 type Future = Fut;
215
216 #[inline]
217 fn weight(&self) -> usize {
218 self.0
219 }
220
221 #[inline]
222 fn into_components(self) -> (usize, Self::F) {
223 self
224 }
225}
226
227pin_project! {
228 #[must_use = "futures do nothing unless polled"]
229 struct FutureWithWeight<Fut> {
230 #[pin]
231 future: Fut,
232 weight: usize,
233 slot: u64,
234 }
235}
236
237impl<Fut> FutureWithWeight<Fut> {
238 pub fn new(weight: usize, slot: u64, future: Fut) -> Self {
239 Self {
240 future,
241 weight,
242 slot,
243 }
244 }
245}
246
247impl<Fut> Future for FutureWithWeight<Fut>
248where
249 Fut: Future,
250{
251 type Output = (usize, u64, Fut::Output);
252 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253 let this = self.project();
254
255 match this.future.poll(cx) {
256 Poll::Pending => Poll::Pending,
257 Poll::Ready(output) => Poll::Ready((*this.weight, *this.slot, output)),
258 }
259 }
260}