future_queue/
future_queue.rs

1// Copyright (c) The future-queue Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::{
5    global_weight::GlobalWeight, peekable_fused::PeekableFused, slots::SlotReservations,
6    FutureQueueContext,
7};
8use futures_util::{
9    stream::{Fuse, FuturesUnordered},
10    Future, Stream, StreamExt as _,
11};
12use pin_project_lite::pin_project;
13use std::{
14    fmt,
15    pin::Pin,
16    task::{Context, Poll},
17};
18
19pin_project! {
20    /// Stream for the [`future_queue`](crate::StreamExt::future_queue) method.
21    #[must_use = "streams do nothing unless polled"]
22    pub struct FutureQueue<St>
23    where
24        St: Stream,
25        St::Item: WeightedFuture,
26     {
27        #[pin]
28        stream: PeekableFused<Fuse<St>>,
29        in_progress_queue: FuturesUnordered<FutureWithWeight<<St::Item as WeightedFuture>::Future>>,
30        slots: SlotReservations,
31        global_weight: GlobalWeight,
32    }
33}
34
35impl<St> fmt::Debug for FutureQueue<St>
36where
37    St: Stream + fmt::Debug,
38    St::Item: WeightedFuture,
39{
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.debug_struct("FutureQueue")
42            .field("stream", &self.stream)
43            .field("in_progress_queue", &self.in_progress_queue)
44            .field("slots", &self.slots)
45            .field("global_weight", &self.global_weight)
46            .finish()
47    }
48}
49
50impl<St> FutureQueue<St>
51where
52    St: Stream,
53    St::Item: WeightedFuture,
54{
55    pub(crate) fn new(stream: St, max_weight: usize) -> Self {
56        Self {
57            stream: PeekableFused::new(stream.fuse()),
58            in_progress_queue: FuturesUnordered::new(),
59            slots: SlotReservations::with_capacity(max_weight),
60            global_weight: GlobalWeight::new(max_weight),
61        }
62    }
63
64    /// Sets a mode where extra internal verifications are performed.
65    #[doc(hidden)]
66    pub fn set_extra_verify(&mut self, verify: bool) {
67        self.slots.set_check_reserved(verify);
68    }
69
70    /// Returns the maximum weight of futures allowed to be run by this adaptor.
71    pub fn max_weight(&self) -> usize {
72        self.global_weight.max()
73    }
74
75    /// Returns the currently running weight of futures.
76    pub fn current_weight(&self) -> usize {
77        self.global_weight.current()
78    }
79
80    /// Acquires a reference to the underlying sink or stream that this combinator is
81    /// pulling from.
82    pub fn get_ref(&self) -> &St {
83        self.stream.get_ref().get_ref()
84    }
85
86    /// Acquires a mutable reference to the underlying sink or stream that this
87    /// combinator is pulling from.
88    ///
89    /// Note that care must be taken to avoid tampering with the state of the
90    /// sink or stream which may otherwise confuse this combinator.
91    pub fn get_mut(&mut self) -> &mut St {
92        self.stream.get_mut().get_mut()
93    }
94
95    /// Acquires a pinned mutable reference to the underlying sink or stream that this
96    /// combinator is pulling from.
97    ///
98    /// Note that care must be taken to avoid tampering with the state of the
99    /// sink or stream which may otherwise confuse this combinator.
100    pub fn get_pin_mut(self: Pin<&mut Self>) -> core::pin::Pin<&mut St> {
101        self.project().stream.get_pin_mut().get_pin_mut()
102    }
103
104    /// Consumes this combinator, returning the underlying sink or stream.
105    ///
106    /// Note that this may discard intermediate state of this combinator, so
107    /// care should be taken to avoid losing resources when this is called.
108    pub fn into_inner(self) -> St {
109        self.stream.into_inner().into_inner()
110    }
111}
112
113impl<St> Stream for FutureQueue<St>
114where
115    St: Stream,
116    St::Item: WeightedFuture,
117{
118    type Item = <<St::Item as WeightedFuture>::Future as Future>::Output;
119
120    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121        let mut this = self.project();
122
123        // First up, try to spawn off as many futures as possible by filling up
124        // our queue of futures.
125        while let Poll::Ready(Some(weighted_future)) = this.stream.as_mut().poll_peek(cx) {
126            if !this.global_weight.has_space_for(weighted_future.weight()) {
127                // Global limits would be exceeded, break out of the loop. Consider this
128                // item next time.
129                break;
130            }
131
132            let (weight, future_fn) = match this.stream.as_mut().poll_next(cx) {
133                Poll::Ready(Some(weighted_future)) => weighted_future.into_components(),
134                _ => unreachable!("we just peeked at this item"),
135            };
136            this.global_weight.add_weight(weight);
137            let global_slot = this.slots.reserve();
138
139            let cx = FutureQueueContext {
140                global_slot,
141                group_slot: None,
142            };
143            let future = future_fn(cx);
144
145            this.in_progress_queue
146                .push(FutureWithWeight::new(weight, global_slot, future));
147        }
148
149        // Attempt to pull the next value from the in_progress_queue.
150        match this.in_progress_queue.poll_next_unpin(cx) {
151            Poll::Pending => return Poll::Pending,
152            Poll::Ready(Some((weight, slot, output))) => {
153                this.global_weight.sub_weight(weight);
154                this.slots.release(slot);
155                return Poll::Ready(Some(output));
156            }
157            Poll::Ready(None) => {}
158        }
159
160        // If more values are still coming from the stream, we're not done yet
161        if this.stream.is_done() {
162            Poll::Ready(None)
163        } else {
164            Poll::Pending
165        }
166    }
167
168    fn size_hint(&self) -> (usize, Option<usize>) {
169        let queue_len = self.in_progress_queue.len();
170        let (lower, upper) = self.stream.size_hint();
171        let lower = lower.saturating_add(queue_len);
172        let upper = match upper {
173            Some(x) => x.checked_add(queue_len),
174            None => None,
175        };
176        (lower, upper)
177    }
178}
179
180/// A trait for types which can be converted into a `Future` and a weight.
181///
182/// Provided in case it's necessary. This trait is only implemented for `(usize, impl Future)`.
183pub trait WeightedFuture: private::Sealed {
184    /// The function to obtain the future from
185    type F: FnOnce(FutureQueueContext) -> Self::Future;
186
187    /// The associated `Future` type.
188    type Future: Future;
189
190    /// The weight of the future.
191    fn weight(&self) -> usize;
192
193    /// Turns self into its components.
194    fn into_components(self) -> (usize, Self::F);
195}
196
197mod private {
198    pub trait Sealed {}
199}
200
201impl<F, Fut> private::Sealed for (usize, F)
202where
203    F: FnOnce(FutureQueueContext) -> Fut,
204    Fut: Future,
205{
206}
207
208impl<F, Fut> WeightedFuture for (usize, F)
209where
210    F: FnOnce(FutureQueueContext) -> Fut,
211    Fut: Future,
212{
213    type F = F;
214    type Future = Fut;
215
216    #[inline]
217    fn weight(&self) -> usize {
218        self.0
219    }
220
221    #[inline]
222    fn into_components(self) -> (usize, Self::F) {
223        self
224    }
225}
226
227pin_project! {
228    #[must_use = "futures do nothing unless polled"]
229    struct FutureWithWeight<Fut> {
230        #[pin]
231        future: Fut,
232        weight: usize,
233        slot: u64,
234    }
235}
236
237impl<Fut> FutureWithWeight<Fut> {
238    pub fn new(weight: usize, slot: u64, future: Fut) -> Self {
239        Self {
240            future,
241            weight,
242            slot,
243        }
244    }
245}
246
247impl<Fut> Future for FutureWithWeight<Fut>
248where
249    Fut: Future,
250{
251    type Output = (usize, u64, Fut::Output);
252    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253        let this = self.project();
254
255        match this.future.poll(cx) {
256            Poll::Pending => Poll::Pending,
257            Poll::Ready(output) => Poll::Ready((*this.weight, *this.slot, output)),
258        }
259    }
260}