// Copyright (c) The future-queue Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

#![warn(missing_docs)]

//! `future_queue` provides ways to run several futures:
//!
//! * concurrently
//! * in the order they're spawned
//! * with global limits
//! * and with an optional group specified for each future, with its own limits.
//!
//! This crate is part of the [nextest organization](https://github.com/nextest-rs) on GitHub, and
//! is designed to serve the needs of [cargo-nextest](https://nexte.st).
//!
//! # Motivation
//!
//! Async programming in Rust often uses an adaptor called
//! [`buffer_unordered`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.buffer_unordered):
//! this adaptor takes a stream of futures[^1], and executes the futures concurrently, up to a
//! maximum amount of concurrency.
//!
//! * Futures are started in the order the stream returns them in.
//! * Once started, futures are polled simultaneously, and completed future outputs are returned in
//!   arbitrary order (hence the `unordered`).
//!
//! Common use cases for `buffer_unordered` include:
//!
//! * Sending network requests concurrently, but limiting the amount of concurrency to avoid
//!   overwhelming the remote server.
//! * Running tests with a tool like [cargo-nextest](https://nexte.st).
//!
//! `buffer_unordered` works well for many use cases. However, one issue with it is that it treats
//! all futures as equally taxing: there's no way to say that some futures consume more resources
//! than others, or that some subsets of futures should be mutually excluded from others.
//!
//! For nextest in particular, some tests can be much heavier than others, and fewer of those tests
//! should be run simultaneously. Also, some tests need to be mutually excluded from others, or
//! need other concurrency limits placed on them.
//!
//! [^1]: This adaptor takes a stream of futures for maximum generality. In practice this is often
//!     an *iterator* of futures, converted over using
//!     [`stream::iter`](https://docs.rs/futures/latest/futures/stream/fn.iter.html).
//!
//! # About this crate
//!
//! This crate provides two adaptors on streams.
//!
//! ## 1. The `future_queue` adaptor
//!
//! The [`future_queue`](StreamExt::future_queue) adaptor can run several futures simultaneously,
//! limiting the concurrency to a maximum *weight*.
//!
//! Rather than taking a stream of futures, this adaptor takes a stream of
//! `(usize, F)` pairs, where the `usize` indicates the weight of each future,
//! and `F` is `FnOnce(FutureQueueContext) -> impl Future`. This adaptor will
//! schedule and buffer futures to be run until queueing the next future would
//! exceed the maximum weight.
//!
//! * The maximum weight is never exceeded while futures are being run.
//! * If the weight of an individual future is greater than the maximum weight, its weight will be
//!   set to the maximum weight.
//!
//! Once all possible futures are scheduled, this adaptor will wait until some of the currently
//! executing futures complete, and the current weight of running futures drops below the maximum
//! weight, before scheduling new futures.
//!
//! The weight of a future can be zero, in which case it doesn't count towards the maximum weight.
//!
//! If all weights are 1, then `future_queue` is exactly the same as `buffer_unordered`.
//!
//! ### Examples
//!
//! ```rust
//! # futures::executor::block_on(async {
//! use futures::{channel::oneshot, stream, StreamExt as _};
//! use future_queue::StreamExt as _;
//!
//! let (send_one, recv_one) = oneshot::channel();
//! let (send_two, recv_two) = oneshot::channel();
//!
//! let stream_of_futures = stream::iter(
//!     vec![(1, recv_one), (2, recv_two)],
//! ).map(|(weight, future)| {
//!     (weight, move |_cx| future)
//! });
//! let mut queue = stream_of_futures.future_queue(10);
//!
//! send_two.send("hello")?;
//! assert_eq!(queue.next().await, Some(Ok("hello")));
//!
//! send_one.send("world")?;
//! assert_eq!(queue.next().await, Some(Ok("world")));
//!
//! assert_eq!(queue.next().await, None);
//! # Ok::<(), &'static str>(()) }).unwrap();
//! ```
//!
//! ## 2. The `future_queue_grouped` adaptor
//!
//! The [`future_queue_grouped`](StreamExt::future_queue_grouped) adaptor is like `future_queue`,
//! except it is possible to specify an optional *group* for each future. Each group has a maximum
//! weight, and a future will only be scheduled if both the maximum weight and the group weight
//! aren't exceeded.
//!
//! The adaptor is as fair as possible under the given constraints: it will schedule futures in the
//! order they're returned by the stream, without doing any reordering based on weight. When a
//! future from a group completes, queued up futures in this group will be preferentially scheduled
//! before any other futures from the provided stream.
//!
//! Like with [`future_queue`](StreamExt::future_queue):
//!
//! * The maximum global and group weights are never exceeded while futures are being run.
//! * While accounting against global weights, if the weight of an individual future is greater than
//!   the maximum weight, its weight will be set to the maximum weight.
//! * *If a future belongs to a group:* While accounting against the group weight, if its weight is
//!   greater than the maximum group weight, its weight will be set to the maximum group weight.
//!
//! ### Examples
//!
//! ```rust
//! # futures::executor::block_on(async {
//! use futures::{channel::oneshot, stream, StreamExt as _};
//! use future_queue::{FutureQueueContext, StreamExt as _};
//!
//! let (send_one, recv_one) = oneshot::channel();
//! let (send_two, recv_two) = oneshot::channel();
//!
//! let stream_of_futures = stream::iter(
//!     vec![
//!         (1, Some("group1"), recv_one),
//!         (2, None, recv_two),
//!     ],
//! ).map(|(weight, group, future)| {
//!     (weight, group, move |_cx| future)
//! });
//! let mut queue = stream_of_futures.future_queue_grouped(10, [("group1", 5)]);
//!
//! send_two.send("hello")?;
//! assert_eq!(queue.next().await, Some(Ok("hello")));
//!
//! send_one.send("world")?;
//! assert_eq!(queue.next().await, Some(Ok("world")));
//!
//! assert_eq!(queue.next().await, None);
//! # Ok::<(), &'static str>(()) }).unwrap();
//! ```
//!
//! # Minimum supported Rust version (MSRV)
//!
//! The minimum supported Rust version is **Rust 1.70.** At any time, at least the last six months
//! of Rust stable releases are supported.
//!
//! While this crate is a pre-release (0.x.x) it may have its MSRV bumped in a patch release. Once
//! this crate has reached 1.x, any MSRV bump will be accompanied by a new minor version.
//!
//! # Notes
//!
//! This crate used to be called `buffer-unordered-weighted`. It was renamed to `future-queue` to be
//! more descriptive about what the crate does rather than how it's implemented.

mod future_queue;
mod future_queue_grouped;
mod global_weight;
mod peekable_fused;
mod slots;

pub use crate::future_queue::FutureQueue;
pub use future_queue_grouped::FutureQueueGrouped;

/// Traits to aid in type definitions.
///
/// These traits are normally not required by end-user code, but may be necessary for some generic
/// code.
pub mod traits {
    pub use crate::{future_queue::WeightedFuture, future_queue_grouped::GroupedWeightedFuture};
}

use futures_util::{Future, Stream};
use std::{borrow::Borrow, hash::Hash};

impl<T: ?Sized> StreamExt for T where T: Stream {}

/// An extension trait for `Stream`s that provides
/// [`future_queue`](StreamExt::future_queue) and
/// [`future_queue_grouped`](StreamExt::future_queue_grouped).
pub trait StreamExt: Stream {
    /// An adaptor for creating a queue of pending futures (unordered), where each future has a
    /// different weight.
    ///
    /// This stream must return values of type `(usize, F)`, where the `usize` indicates the weight
    /// of each future, and `F` is `FnOnce(FutureQueueContext) -> impl Future`. This adaptor will
    /// buffer futures up to weight `max_weight`, and then return the outputs in the order in which
    /// they complete.
    ///
    /// * The maximum weight is never exceeded while futures are being run.
    /// * If the weight of an individual future is greater than the maximum weight, its weight will
    ///   be set to the maximum weight.
    ///
    /// The adaptor will schedule futures in the order they're returned by the stream, without doing
    /// any reordering based on weight.
    ///
    /// The weight of a future can be zero, in which case it will not count towards the total weight.
    ///
    /// The returned stream will be a stream of each future's output.
    ///
    /// # Examples
    ///
    /// See [the crate documentation](crate#examples) for an example.
    fn future_queue<F, Fut>(self, max_weight: usize) -> FutureQueue<Self>
    where
        Self: Sized + Stream<Item = (usize, F)>,
        F: FnOnce(FutureQueueContext) -> Fut,
        Fut: Future,
    {
        assert_stream::<Fut::Output, _>(FutureQueue::new(self, max_weight))
    }

    /// An adaptor for creating a queue of pending futures, where each future has a different weight
    /// and optional group.
    ///
    /// This method accepts a maximum global weight, as well as a set of *groups* of type `K`. Each
    /// group has a defined maximum weight. This stream must return values of type `(usize,
    /// Option<Q>, F)`, where `K` is `Borrow<Q>`, and `F` is `FnOnce(FutureQueueContext) -> impl Future`.
    ///
    /// This adaptor will buffer futures up to weight `max_weight`. If the optional group is
    /// specified for a future, it will also check that the weight of futures in that group does not
    /// exceed the specified limit. Any futures that exceed the group's weight limit will be queued
    /// up, but not scheduled until the weight of futures in that group falls below the limit.
    ///
    /// Like with [`future_queue`](Self::future_queue):
    ///
    /// * The maximum global and group weights are never exceeded while futures are being run.
    /// * While accounting against global weights, if the weight of an individual future is greater
    ///   than the maximum weight, its weight will be set to the maximum weight.
    /// * *If a future belongs to a group:* While accounting against the group weight, if its weight
    ///   is greater than the maximum group weight, its weight will be set to the maximum group
    ///   weight.
    ///
    /// The adaptor is as fair as possible under the given constraints: it will schedule futures in
    /// the order they're returned by the stream, without doing any reordering based on weight. When
    /// a future from a group completes, queued up futures in this group will be preferentially
    /// scheduled before any other futures from the provided stream.
    ///
    /// The weight of a future can be zero, in which case it will not count towards the total weight.
    ///
    /// The returned stream will be a stream of each future's output.
    ///
    /// # Panics
    ///
    /// The stream panics if the optional group provided by a stream element isn't in the set of
    /// known groups.
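    ///
    /// # Examples
    ///
    /// A minimal sketch of group limits, assuming a hypothetical group named `db` with a maximum
    /// weight of 1. The group name, weights, and future bodies are illustrative only. Each future
    /// returns its group slot, which is expected to be set because every future here belongs to a
    /// group.
    ///
    /// ```rust
    /// # futures::executor::block_on(async {
    /// use futures::{stream, StreamExt as _};
    /// use future_queue::{FutureQueueContext, StreamExt as _};
    ///
    /// // Three futures of weight 1, all in the hypothetical "db" group.
    /// let mut queue = stream::iter(0..3usize)
    ///     .map(|i| {
    ///         (1usize, Some("db"), move |cx: FutureQueueContext| async move {
    ///             (i, cx.group_slot())
    ///         })
    ///     })
    ///     // Global limit of 10, and a limit of 1 for the "db" group.
    ///     .future_queue_grouped(10, [("db", 1)]);
    ///
    /// let mut completed = 0;
    /// while let Some((_i, group_slot)) = queue.next().await {
    ///     // Grouped futures carry a group slot.
    ///     assert!(group_slot.is_some());
    ///     completed += 1;
    /// }
    /// assert_eq!(completed, 3);
    /// # });
    /// ```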
    fn future_queue_grouped<F, Fut, I, K, Q>(
        self,
        max_global_weight: usize,
        groups: I,
    ) -> FutureQueueGrouped<Self, K>
    where
        I: IntoIterator<Item = (K, usize)>,
        K: Eq + Hash + Borrow<Q> + std::fmt::Debug,
        Q: Eq + Hash + std::fmt::Debug,
        Self: Sized + Stream<Item = (usize, Option<Q>, F)>,
        F: FnOnce(FutureQueueContext) -> Fut,
        Fut: Future,
    {
        assert_stream::<Fut::Output, _>(FutureQueueGrouped::new(self, max_global_weight, groups))
    }
}

/// Context for a function in a [`FutureQueue`] or [`FutureQueueGrouped`].
#[derive(Clone, Debug)]
pub struct FutureQueueContext {
    global_slot: u64,
    group_slot: Option<u64>,
}

impl FutureQueueContext {
    /// Returns a global slot number: an integer that is unique for the lifetime
    /// of the future, within the context of the [`FutureQueue`] or
    /// [`FutureQueueGrouped`] it is running in.
    ///
    /// The slot number is *compact*: it starts from 0, and is always the
    /// smallest possible number that could be assigned to the future at the
    /// moment the function is called.
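    ///
    /// # Examples
    ///
    /// A minimal sketch of reading the slot from inside a queued future. The
    /// stream contents and weights are illustrative assumptions. With every
    /// weight set to 1 and a maximum weight of 2, at most two futures run at
    /// once, so one would expect the compact slot numbers to stay small; the
    /// sketch prints them rather than asserting exact values.
    ///
    /// ```rust
    /// # futures::executor::block_on(async {
    /// use futures::{stream, StreamExt as _};
    /// use future_queue::{FutureQueueContext, StreamExt as _};
    ///
    /// let mut queue = stream::iter(0..4usize)
    ///     .map(|i| {
    ///         // Each future has weight 1 and returns the slot it was given.
    ///         (1usize, move |cx: FutureQueueContext| async move {
    ///             (i, cx.global_slot())
    ///         })
    ///     })
    ///     .future_queue(2);
    ///
    /// let mut completed = 0;
    /// while let Some((i, slot)) = queue.next().await {
    ///     println!("future {i} ran in global slot {slot}");
    ///     completed += 1;
    /// }
    /// assert_eq!(completed, 4);
    /// # });
    /// ```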
    #[inline]
    pub fn global_slot(&self) -> u64 {
        self.global_slot
    }

    /// Returns a group slot number: an integer that is unique for the lifetime
    /// of the future within a group.
    ///
    /// The slot number is *compact*: it starts from 0, and is always the
    /// smallest possible number that could be assigned to the future at the
    /// moment the function is called.
    ///
    /// Only set in case [`FutureQueueGrouped`] is used, and the future is part
    /// of a group.
    #[inline]
    pub fn group_slot(&self) -> Option<u64> {
        self.group_slot
    }
}

pub(crate) fn assert_stream<T, S>(stream: S) -> S
where
    S: Stream<Item = T>,
{
    stream
}