hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use syn::parse_quote;
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, Unbounded};
15use super::keyed_singleton::KeyedSingleton;
16use super::keyed_stream::KeyedStream;
17use super::optional::Optional;
18use super::singleton::Singleton;
19use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::{Atomic, DeferTick, NoAtomic};
26use crate::location::{Location, NoTick, Tick, check_matching_location};
27use crate::nondet::{NonDet, nondet};
28
29pub mod networking;
30
31/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
32#[sealed::sealed]
33pub trait Ordering:
34    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
35{
36}
37
38/// Marks the stream as being totally ordered, which means that there are
39/// no sources of non-determinism (other than intentional ones) that will
40/// affect the order of elements.
41pub enum TotalOrder {}
42
43#[sealed::sealed]
44impl Ordering for TotalOrder {}
45
46/// Marks the stream as having no order, which means that the order of
47/// elements may be affected by non-determinism.
48///
49/// This restricts certain operators, such as `fold` and `reduce`, to only
50/// be used with commutative aggregation functions.
51pub enum NoOrder {}
52
53#[sealed::sealed]
54impl Ordering for NoOrder {}
55
56/// Helper trait for determining the weakest of two orderings.
57#[sealed::sealed]
58pub trait MinOrder<Other: ?Sized> {
59    /// The weaker of the two orderings.
60    type Min: Ordering;
61}
62
63#[sealed::sealed]
64impl MinOrder<NoOrder> for TotalOrder {
65    type Min = NoOrder;
66}
67
68#[sealed::sealed]
69impl MinOrder<TotalOrder> for TotalOrder {
70    type Min = TotalOrder;
71}
72
73#[sealed::sealed]
74impl MinOrder<TotalOrder> for NoOrder {
75    type Min = NoOrder;
76}
77
78#[sealed::sealed]
79impl MinOrder<NoOrder> for NoOrder {
80    type Min = NoOrder;
81}
82
83/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
84#[sealed::sealed]
85pub trait Retries:
86    MinRetries<Self, Min = Self>
87    + MinRetries<ExactlyOnce, Min = Self>
88    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
89{
90}
91
92/// Marks the stream as having deterministic message cardinality, with no
93/// possibility of duplicates.
94pub enum ExactlyOnce {}
95
96#[sealed::sealed]
97impl Retries for ExactlyOnce {}
98
99/// Marks the stream as having non-deterministic message cardinality, which
100/// means that duplicates may occur, but messages will not be dropped.
101pub enum AtLeastOnce {}
102
103#[sealed::sealed]
104impl Retries for AtLeastOnce {}
105
106/// Helper trait for determining the weakest of two retry guarantees.
107#[sealed::sealed]
108pub trait MinRetries<Other: ?Sized> {
109    /// The weaker of the two retry guarantees.
110    type Min: Retries;
111}
112
113#[sealed::sealed]
114impl MinRetries<AtLeastOnce> for ExactlyOnce {
115    type Min = AtLeastOnce;
116}
117
118#[sealed::sealed]
119impl MinRetries<ExactlyOnce> for ExactlyOnce {
120    type Min = ExactlyOnce;
121}
122
123#[sealed::sealed]
124impl MinRetries<ExactlyOnce> for AtLeastOnce {
125    type Min = AtLeastOnce;
126}
127
128#[sealed::sealed]
129impl MinRetries<AtLeastOnce> for AtLeastOnce {
130    type Min = AtLeastOnce;
131}
132
133/// Streaming sequence of elements with type `Type`.
134///
135/// This live collection represents a growing sequence of elements, with new elements being
136/// asynchronously appended to the end of the sequence. This can be used to model the arrival
137/// of network input, such as API requests, or streaming ingestion.
138///
139/// By default, all streams have deterministic ordering and each element is materialized exactly
140/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
141/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
142/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
143///
144/// Type Parameters:
145/// - `Type`: the type of elements in the stream
146/// - `Loc`: the location where the stream is being materialized
147/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
148/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
149///   (default is [`TotalOrder`])
150/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
151///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
152pub struct Stream<
153    Type,
154    Loc,
155    Bound: Boundedness,
156    Order: Ordering = TotalOrder,
157    Retry: Retries = ExactlyOnce,
158> {
159    pub(crate) location: Loc,
160    pub(crate) ir_node: RefCell<HydroNode>,
161
162    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
163}
164
165impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
166    for Stream<T, L, Unbounded, O, R>
167where
168    L: Location<'a>,
169{
170    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
171        Stream {
172            location: stream.location,
173            ir_node: stream.ir_node,
174            _phantom: PhantomData,
175        }
176    }
177}
178
179impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
180    for Stream<T, L, B, NoOrder, R>
181where
182    L: Location<'a>,
183{
184    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
185        Stream {
186            location: stream.location,
187            ir_node: stream.ir_node,
188            _phantom: PhantomData,
189        }
190    }
191}
192
193impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
194    for Stream<T, L, B, O, AtLeastOnce>
195where
196    L: Location<'a>,
197{
198    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
199        Stream {
200            location: stream.location,
201            ir_node: stream.ir_node,
202            _phantom: PhantomData,
203        }
204    }
205}
206
207impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
208where
209    L: Location<'a>,
210{
211    fn defer_tick(self) -> Self {
212        Stream::defer_tick(self)
213    }
214}
215
216impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
217    for Stream<T, Tick<L>, Bounded, O, R>
218where
219    L: Location<'a>,
220{
221    type Location = Tick<L>;
222
223    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
224        Stream::new(
225            location.clone(),
226            HydroNode::CycleSource {
227                ident,
228                metadata: location.new_node_metadata::<T>(),
229            },
230        )
231    }
232}
233
234impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
235    for Stream<T, Tick<L>, Bounded, O, R>
236where
237    L: Location<'a>,
238{
239    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
240        assert_eq!(
241            Location::id(&self.location),
242            expected_location,
243            "locations do not match"
244        );
245        self.location
246            .flow_state()
247            .borrow_mut()
248            .push_root(HydroRoot::CycleSink {
249                ident,
250                input: Box::new(self.ir_node.into_inner()),
251                out_location: Location::id(&self.location),
252                op_metadata: HydroIrOpMetadata::new(),
253            });
254    }
255}
256
257impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
258    for Stream<T, L, B, O, R>
259where
260    L: Location<'a> + NoTick,
261{
262    type Location = L;
263
264    fn create_source(ident: syn::Ident, location: L) -> Self {
265        Stream::new(
266            location.clone(),
267            HydroNode::CycleSource {
268                ident,
269                metadata: location.new_node_metadata::<T>(),
270            },
271        )
272    }
273}
274
275impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
276    for Stream<T, L, B, O, R>
277where
278    L: Location<'a> + NoTick,
279{
280    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
281        assert_eq!(
282            Location::id(&self.location),
283            expected_location,
284            "locations do not match"
285        );
286        self.location
287            .flow_state()
288            .borrow_mut()
289            .push_root(HydroRoot::CycleSink {
290                ident,
291                input: Box::new(self.ir_node.into_inner()),
292                out_location: Location::id(&self.location),
293                op_metadata: HydroIrOpMetadata::new(),
294            });
295    }
296}
297
298impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
299where
300    T: Clone,
301    L: Location<'a>,
302{
303    fn clone(&self) -> Self {
304        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
305            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
306            *self.ir_node.borrow_mut() = HydroNode::Tee {
307                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
308                metadata: self.location.new_node_metadata::<T>(),
309            };
310        }
311
312        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
313            Stream {
314                location: self.location.clone(),
315                ir_node: HydroNode::Tee {
316                    inner: TeeNode(inner.0.clone()),
317                    metadata: metadata.clone(),
318                }
319                .into(),
320                _phantom: PhantomData,
321            }
322        } else {
323            unreachable!()
324        }
325    }
326}
327
328impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
329where
330    L: Location<'a>,
331{
332    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
333        debug_assert_eq!(&Location::id(&location), &ir_node.metadata().location_kind);
334        Stream {
335            location,
336            ir_node: RefCell::new(ir_node),
337            _phantom: PhantomData,
338        }
339    }
340
341    /// Produces a stream based on invoking `f` on each element.
342    /// If you do not want to modify the stream and instead only want to view
343    /// each item use [`Stream::inspect`] instead.
344    ///
345    /// # Example
346    /// ```rust
347    /// # use hydro_lang::prelude::*;
348    /// # use futures::StreamExt;
349    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
350    /// let words = process.source_iter(q!(vec!["hello", "world"]));
351    /// words.map(q!(|x| x.to_uppercase()))
352    /// # }, |mut stream| async move {
353    /// # for w in vec!["HELLO", "WORLD"] {
354    /// #     assert_eq!(stream.next().await.unwrap(), w);
355    /// # }
356    /// # }));
357    /// ```
358    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
359    where
360        F: Fn(T) -> U + 'a,
361    {
362        let f = f.splice_fn1_ctx(&self.location).into();
363        Stream::new(
364            self.location.clone(),
365            HydroNode::Map {
366                f,
367                input: Box::new(self.ir_node.into_inner()),
368                metadata: self.location.new_node_metadata::<U>(),
369            },
370        )
371    }
372
373    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
374    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
375    /// for the output type `U` must produce items in a **deterministic** order.
376    ///
377    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
378    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
379    ///
380    /// # Example
381    /// ```rust
382    /// # use hydro_lang::prelude::*;
383    /// # use futures::StreamExt;
384    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
385    /// process
386    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
387    ///     .flat_map_ordered(q!(|x| x))
388    /// # }, |mut stream| async move {
389    /// // 1, 2, 3, 4
390    /// # for w in (1..5) {
391    /// #     assert_eq!(stream.next().await.unwrap(), w);
392    /// # }
393    /// # }));
394    /// ```
395    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
396    where
397        I: IntoIterator<Item = U>,
398        F: Fn(T) -> I + 'a,
399    {
400        let f = f.splice_fn1_ctx(&self.location).into();
401        Stream::new(
402            self.location.clone(),
403            HydroNode::FlatMap {
404                f,
405                input: Box::new(self.ir_node.into_inner()),
406                metadata: self.location.new_node_metadata::<U>(),
407            },
408        )
409    }
410
411    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
412    /// for the output type `U` to produce items in any order.
413    ///
414    /// # Example
415    /// ```rust
416    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
417    /// # use futures::StreamExt;
418    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
419    /// process
420    ///     .source_iter(q!(vec![
421    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
422    ///         std::collections::HashSet::from_iter(vec![3, 4]),
423    ///     ]))
424    ///     .flat_map_unordered(q!(|x| x))
425    /// # }, |mut stream| async move {
426    /// // 1, 2, 3, 4, but in no particular order
427    /// # let mut results = Vec::new();
428    /// # for w in (1..5) {
429    /// #     results.push(stream.next().await.unwrap());
430    /// # }
431    /// # results.sort();
432    /// # assert_eq!(results, vec![1, 2, 3, 4]);
433    /// # }));
434    /// ```
435    pub fn flat_map_unordered<U, I, F>(
436        self,
437        f: impl IntoQuotedMut<'a, F, L>,
438    ) -> Stream<U, L, B, NoOrder, R>
439    where
440        I: IntoIterator<Item = U>,
441        F: Fn(T) -> I + 'a,
442    {
443        let f = f.splice_fn1_ctx(&self.location).into();
444        Stream::new(
445            self.location.clone(),
446            HydroNode::FlatMap {
447                f,
448                input: Box::new(self.ir_node.into_inner()),
449                metadata: self.location.new_node_metadata::<U>(),
450            },
451        )
452    }
453
454    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
455    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
456    ///
457    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
458    /// not deterministic, use [`Stream::flatten_unordered`] instead.
459    ///
460    /// ```rust
461    /// # use hydro_lang::prelude::*;
462    /// # use futures::StreamExt;
463    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
464    /// process
465    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
466    ///     .flatten_ordered()
467    /// # }, |mut stream| async move {
468    /// // 1, 2, 3, 4
469    /// # for w in (1..5) {
470    /// #     assert_eq!(stream.next().await.unwrap(), w);
471    /// # }
472    /// # }));
473    /// ```
474    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
475    where
476        T: IntoIterator<Item = U>,
477    {
478        self.flat_map_ordered(q!(|d| d))
479    }
480
481    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
482    /// for the element type `T` to produce items in any order.
483    ///
484    /// # Example
485    /// ```rust
486    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
487    /// # use futures::StreamExt;
488    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
489    /// process
490    ///     .source_iter(q!(vec![
491    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
492    ///         std::collections::HashSet::from_iter(vec![3, 4]),
493    ///     ]))
494    ///     .flatten_unordered()
495    /// # }, |mut stream| async move {
496    /// // 1, 2, 3, 4, but in no particular order
497    /// # let mut results = Vec::new();
498    /// # for w in (1..5) {
499    /// #     results.push(stream.next().await.unwrap());
500    /// # }
501    /// # results.sort();
502    /// # assert_eq!(results, vec![1, 2, 3, 4]);
503    /// # }));
504    /// ```
505    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
506    where
507        T: IntoIterator<Item = U>,
508    {
509        self.flat_map_unordered(q!(|d| d))
510    }
511
512    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
513    /// `f`, preserving the order of the elements.
514    ///
515    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
516    /// not modify or take ownership of the values. If you need to modify the values while filtering
517    /// use [`Stream::filter_map`] instead.
518    ///
519    /// # Example
520    /// ```rust
521    /// # use hydro_lang::prelude::*;
522    /// # use futures::StreamExt;
523    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
524    /// process
525    ///     .source_iter(q!(vec![1, 2, 3, 4]))
526    ///     .filter(q!(|&x| x > 2))
527    /// # }, |mut stream| async move {
528    /// // 3, 4
529    /// # for w in (3..5) {
530    /// #     assert_eq!(stream.next().await.unwrap(), w);
531    /// # }
532    /// # }));
533    /// ```
534    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
535    where
536        F: Fn(&T) -> bool + 'a,
537    {
538        let f = f.splice_fn1_borrow_ctx(&self.location).into();
539        Stream::new(
540            self.location.clone(),
541            HydroNode::Filter {
542                f,
543                input: Box::new(self.ir_node.into_inner()),
544                metadata: self.location.new_node_metadata::<T>(),
545            },
546        )
547    }
548
549    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
550    ///
551    /// # Example
552    /// ```rust
553    /// # use hydro_lang::prelude::*;
554    /// # use futures::StreamExt;
555    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
556    /// process
557    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
558    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
559    /// # }, |mut stream| async move {
560    /// // 1, 2
561    /// # for w in (1..3) {
562    /// #     assert_eq!(stream.next().await.unwrap(), w);
563    /// # }
564    /// # }));
565    /// ```
566    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
567    where
568        F: Fn(T) -> Option<U> + 'a,
569    {
570        let f = f.splice_fn1_ctx(&self.location).into();
571        Stream::new(
572            self.location.clone(),
573            HydroNode::FilterMap {
574                f,
575                input: Box::new(self.ir_node.into_inner()),
576                metadata: self.location.new_node_metadata::<U>(),
577            },
578        )
579    }
580
581    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
582    /// where `x` is the final value of `other`, a bounded [`Singleton`].
583    ///
584    /// # Example
585    /// ```rust
586    /// # use hydro_lang::prelude::*;
587    /// # use futures::StreamExt;
588    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
589    /// let tick = process.tick();
590    /// let batch = process
591    ///   .source_iter(q!(vec![1, 2, 3, 4]))
592    ///   .batch(&tick, nondet!(/** test */));
593    /// let count = batch.clone().count(); // `count()` returns a singleton
594    /// batch.cross_singleton(count).all_ticks()
595    /// # }, |mut stream| async move {
596    /// // (1, 4), (2, 4), (3, 4), (4, 4)
597    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
598    /// #     assert_eq!(stream.next().await.unwrap(), w);
599    /// # }
600    /// # }));
601    /// ```
602    pub fn cross_singleton<O2>(
603        self,
604        other: impl Into<Optional<O2, L, Bounded>>,
605    ) -> Stream<(T, O2), L, B, O, R>
606    where
607        O2: Clone,
608    {
609        let other: Optional<O2, L, Bounded> = other.into();
610        check_matching_location(&self.location, &other.location);
611
612        Stream::new(
613            self.location.clone(),
614            HydroNode::CrossSingleton {
615                left: Box::new(self.ir_node.into_inner()),
616                right: Box::new(other.ir_node.into_inner()),
617                metadata: self.location.new_node_metadata::<(T, O2)>(),
618            },
619        )
620    }
621
622    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
623    ///
624    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
625    /// leader of a cluster.
626    ///
627    /// # Example
628    /// ```rust
629    /// # use hydro_lang::prelude::*;
630    /// # use futures::StreamExt;
631    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
632    /// let tick = process.tick();
633    /// // ticks are lazy by default, forces the second tick to run
634    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
635    ///
636    /// let batch_first_tick = process
637    ///   .source_iter(q!(vec![1, 2, 3, 4]))
638    ///   .batch(&tick, nondet!(/** test */));
639    /// let batch_second_tick = process
640    ///   .source_iter(q!(vec![5, 6, 7, 8]))
641    ///   .batch(&tick, nondet!(/** test */))
642    ///   .defer_tick(); // appears on the second tick
643    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
644    /// batch_first_tick.chain(batch_second_tick)
645    ///   .filter_if_some(some_on_first_tick)
646    ///   .all_ticks()
647    /// # }, |mut stream| async move {
648    /// // [1, 2, 3, 4]
649    /// # for w in vec![1, 2, 3, 4] {
650    /// #     assert_eq!(stream.next().await.unwrap(), w);
651    /// # }
652    /// # }));
653    /// ```
654    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
655        self.cross_singleton(signal.map(q!(|_u| ())))
656            .map(q!(|(d, _signal)| d))
657    }
658
659    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
660    ///
661    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
662    /// some local state.
663    ///
664    /// # Example
665    /// ```rust
666    /// # use hydro_lang::prelude::*;
667    /// # use futures::StreamExt;
668    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
669    /// let tick = process.tick();
670    /// // ticks are lazy by default, forces the second tick to run
671    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
672    ///
673    /// let batch_first_tick = process
674    ///   .source_iter(q!(vec![1, 2, 3, 4]))
675    ///   .batch(&tick, nondet!(/** test */));
676    /// let batch_second_tick = process
677    ///   .source_iter(q!(vec![5, 6, 7, 8]))
678    ///   .batch(&tick, nondet!(/** test */))
679    ///   .defer_tick(); // appears on the second tick
680    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
681    /// batch_first_tick.chain(batch_second_tick)
682    ///   .filter_if_none(some_on_first_tick)
683    ///   .all_ticks()
684    /// # }, |mut stream| async move {
685    /// // [5, 6, 7, 8]
686    /// # for w in vec![5, 6, 7, 8] {
687    /// #     assert_eq!(stream.next().await.unwrap(), w);
688    /// # }
689    /// # }));
690    /// ```
691    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
692        self.filter_if_some(
693            other
694                .map(q!(|_| ()))
695                .into_singleton()
696                .filter(q!(|o| o.is_none())),
697        )
698    }
699
700    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
701    /// tupled pairs in a non-deterministic order.
702    ///
703    /// # Example
704    /// ```rust
705    /// # use hydro_lang::prelude::*;
706    /// # use std::collections::HashSet;
707    /// # use futures::StreamExt;
708    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
709    /// let tick = process.tick();
710    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
711    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
712    /// stream1.cross_product(stream2)
713    /// # }, |mut stream| async move {
714    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
715    /// # stream.map(|i| assert!(expected.contains(&i)));
716    /// # }));
717    /// ```
718    pub fn cross_product<T2, O2: Ordering>(
719        self,
720        other: Stream<T2, L, B, O2, R>,
721    ) -> Stream<(T, T2), L, B, NoOrder, R>
722    where
723        T: Clone,
724        T2: Clone,
725    {
726        check_matching_location(&self.location, &other.location);
727
728        Stream::new(
729            self.location.clone(),
730            HydroNode::CrossProduct {
731                left: Box::new(self.ir_node.into_inner()),
732                right: Box::new(other.ir_node.into_inner()),
733                metadata: self.location.new_node_metadata::<(T, T2)>(),
734            },
735        )
736    }
737
738    /// Takes one stream as input and filters out any duplicate occurrences. The output
739    /// contains all unique values from the input.
740    ///
741    /// # Example
742    /// ```rust
743    /// # use hydro_lang::prelude::*;
744    /// # use futures::StreamExt;
745    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
746    /// let tick = process.tick();
747    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
748    /// # }, |mut stream| async move {
749    /// # for w in vec![1, 2, 3, 4] {
750    /// #     assert_eq!(stream.next().await.unwrap(), w);
751    /// # }
752    /// # }));
753    /// ```
754    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
755    where
756        T: Eq + Hash,
757    {
758        Stream::new(
759            self.location.clone(),
760            HydroNode::Unique {
761                input: Box::new(self.ir_node.into_inner()),
762                metadata: self.location.new_node_metadata::<T>(),
763            },
764        )
765    }
766
767    /// Outputs everything in this stream that is *not* contained in the `other` stream.
768    ///
769    /// The `other` stream must be [`Bounded`], since this function will wait until
770    /// all its elements are available before producing any output.
771    /// # Example
772    /// ```rust
773    /// # use hydro_lang::prelude::*;
774    /// # use futures::StreamExt;
775    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
776    /// let tick = process.tick();
777    /// let stream = process
778    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
779    ///   .batch(&tick, nondet!(/** test */));
780    /// let batch = process
781    ///   .source_iter(q!(vec![1, 2]))
782    ///   .batch(&tick, nondet!(/** test */));
783    /// stream.filter_not_in(batch).all_ticks()
784    /// # }, |mut stream| async move {
785    /// # for w in vec![3, 4] {
786    /// #     assert_eq!(stream.next().await.unwrap(), w);
787    /// # }
788    /// # }));
789    /// ```
790    pub fn filter_not_in<O2: Ordering>(
791        self,
792        other: Stream<T, L, Bounded, O2, R>,
793    ) -> Stream<T, L, Bounded, O, R>
794    where
795        T: Eq + Hash,
796    {
797        check_matching_location(&self.location, &other.location);
798
799        Stream::new(
800            self.location.clone(),
801            HydroNode::Difference {
802                pos: Box::new(self.ir_node.into_inner()),
803                neg: Box::new(other.ir_node.into_inner()),
804                metadata: self.location.new_node_metadata::<T>(),
805            },
806        )
807    }
808
809    /// An operator which allows you to "inspect" each element of a stream without
810    /// modifying it. The closure `f` is called on a reference to each item. This is
811    /// mainly useful for debugging, and should not be used to generate side-effects.
812    ///
813    /// # Example
814    /// ```rust
815    /// # use hydro_lang::prelude::*;
816    /// # use futures::StreamExt;
817    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818    /// let nums = process.source_iter(q!(vec![1, 2]));
819    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
820    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
821    /// # }, |mut stream| async move {
822    /// # for w in vec![1, 2] {
823    /// #     assert_eq!(stream.next().await.unwrap(), w);
824    /// # }
825    /// # }));
826    /// ```
827    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
828    where
829        F: Fn(&T) + 'a,
830    {
831        let f = f.splice_fn1_borrow_ctx(&self.location).into();
832
833        Stream::new(
834            self.location.clone(),
835            HydroNode::Inspect {
836                f,
837                input: Box::new(self.ir_node.into_inner()),
838                metadata: self.location.new_node_metadata::<T>(),
839            },
840        )
841    }
842
843    /// An operator which allows you to "name" a `HydroNode`.
844    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
845    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
846        {
847            let mut node = self.ir_node.borrow_mut();
848            let metadata = node.metadata_mut();
849            metadata.tag = Some(name.to_string());
850        }
851        self
852    }
853
854    /// Explicitly "casts" the stream to a type with a different ordering
855    /// guarantee. Useful in unsafe code where the ordering cannot be proven
856    /// by the type-system.
857    ///
858    /// # Non-Determinism
859    /// This function is used as an escape hatch, and any mistakes in the
860    /// provided ordering guarantee will propagate into the guarantees
861    /// for the rest of the program.
862    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
863        Stream::new(self.location, self.ir_node.into_inner())
864    }
865
866    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
867    /// which is always safe because that is the weakest possible guarantee.
868    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
869        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
870        self.assume_ordering::<NoOrder>(nondet)
871    }
872
873    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
874    /// enforcing that `O2` is weaker than the input ordering guarantee.
875    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
876        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
877        self.assume_ordering::<O2>(nondet)
878    }
879
880    /// Explicitly "casts" the stream to a type with a different retries
881    /// guarantee. Useful in unsafe code where the lack of retries cannot
882    /// be proven by the type-system.
883    ///
884    /// # Non-Determinism
885    /// This function is used as an escape hatch, and any mistakes in the
886    /// provided retries guarantee will propagate into the guarantees
887    /// for the rest of the program.
888    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
889        Stream::new(self.location, self.ir_node.into_inner())
890    }
891
892    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
893    /// which is always safe because that is the weakest possible guarantee.
894    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
895        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
896        self.assume_retries::<AtLeastOnce>(nondet)
897    }
898
899    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
900    /// enforcing that `R2` is weaker than the input retries guarantee.
901    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
902        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
903        self.assume_retries::<R2>(nondet)
904    }
905}
906
907impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
908where
909    L: Location<'a>,
910{
911    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
912    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
913    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
914        self.assume_retries(
915            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
916        )
917    }
918}
919
920impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
921where
922    L: Location<'a>,
923{
924    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
925    ///
926    /// # Example
927    /// ```rust
928    /// # use hydro_lang::prelude::*;
929    /// # use futures::StreamExt;
930    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
931    /// process.source_iter(q!(&[1, 2, 3])).cloned()
932    /// # }, |mut stream| async move {
933    /// // 1, 2, 3
934    /// # for w in vec![1, 2, 3] {
935    /// #     assert_eq!(stream.next().await.unwrap(), w);
936    /// # }
937    /// # }));
938    /// ```
939    pub fn cloned(self) -> Stream<T, L, B, O, R>
940    where
941        T: Clone,
942    {
943        self.map(q!(|d| d.clone()))
944    }
945}
946
947impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
948where
949    L: Location<'a>,
950{
951    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
952    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
953    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
954    ///
955    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
956    /// and there may be duplicates.
957    ///
958    /// # Example
959    /// ```rust
960    /// # use hydro_lang::prelude::*;
961    /// # use futures::StreamExt;
962    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
963    /// let tick = process.tick();
964    /// let bools = process.source_iter(q!(vec![false, true, false]));
965    /// let batch = bools.batch(&tick, nondet!(/** test */));
966    /// batch
967    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
968    ///     .all_ticks()
969    /// # }, |mut stream| async move {
970    /// // true
971    /// # assert_eq!(stream.next().await.unwrap(), true);
972    /// # }));
973    /// ```
974    pub fn fold_commutative_idempotent<A, I, F>(
975        self,
976        init: impl IntoQuotedMut<'a, I, L>,
977        comb: impl IntoQuotedMut<'a, F, L>,
978    ) -> Singleton<A, L, B>
979    where
980        I: Fn() -> A + 'a,
981        F: Fn(&mut A, T),
982    {
983        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
984        self.assume_ordering(nondet)
985            .assume_retries(nondet)
986            .fold(init, comb)
987    }
988
989    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
990    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
991    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
992    /// reference, so that it can be modified in place.
993    ///
994    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
995    /// and there may be duplicates.
996    ///
997    /// # Example
998    /// ```rust
999    /// # use hydro_lang::prelude::*;
1000    /// # use futures::StreamExt;
1001    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1002    /// let tick = process.tick();
1003    /// let bools = process.source_iter(q!(vec![false, true, false]));
1004    /// let batch = bools.batch(&tick, nondet!(/** test */));
1005    /// batch
1006    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1007    ///     .all_ticks()
1008    /// # }, |mut stream| async move {
1009    /// // true
1010    /// # assert_eq!(stream.next().await.unwrap(), true);
1011    /// # }));
1012    /// ```
1013    pub fn reduce_commutative_idempotent<F>(
1014        self,
1015        comb: impl IntoQuotedMut<'a, F, L>,
1016    ) -> Optional<T, L, B>
1017    where
1018        F: Fn(&mut T, T) + 'a,
1019    {
1020        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1021        self.assume_ordering(nondet)
1022            .assume_retries(nondet)
1023            .reduce(comb)
1024    }
1025
1026    /// Computes the maximum element in the stream as an [`Optional`], which
1027    /// will be empty until the first element in the input arrives.
1028    ///
1029    /// # Example
1030    /// ```rust
1031    /// # use hydro_lang::prelude::*;
1032    /// # use futures::StreamExt;
1033    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1034    /// let tick = process.tick();
1035    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1036    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1037    /// batch.max().all_ticks()
1038    /// # }, |mut stream| async move {
1039    /// // 4
1040    /// # assert_eq!(stream.next().await.unwrap(), 4);
1041    /// # }));
1042    /// ```
1043    pub fn max(self) -> Optional<T, L, B>
1044    where
1045        T: Ord,
1046    {
1047        self.reduce_commutative_idempotent(q!(|curr, new| {
1048            if new > *curr {
1049                *curr = new;
1050            }
1051        }))
1052    }
1053
1054    /// Computes the maximum element in the stream as an [`Optional`], where the
1055    /// maximum is determined according to the `key` function. The [`Optional`] will
1056    /// be empty until the first element in the input arrives.
1057    ///
1058    /// # Example
1059    /// ```rust
1060    /// # use hydro_lang::prelude::*;
1061    /// # use futures::StreamExt;
1062    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1063    /// let tick = process.tick();
1064    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1065    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1066    /// batch.max_by_key(q!(|x| -x)).all_ticks()
1067    /// # }, |mut stream| async move {
1068    /// // 1
1069    /// # assert_eq!(stream.next().await.unwrap(), 1);
1070    /// # }));
1071    /// ```
1072    pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
1073    where
1074        K: Ord,
1075        F: Fn(&T) -> K + 'a,
1076    {
1077        let f = key.splice_fn1_borrow_ctx(&self.location);
1078
1079        let wrapped: syn::Expr = parse_quote!({
1080            let key_fn = #f;
1081            move |curr, new| {
1082                if key_fn(&new) > key_fn(&*curr) {
1083                    *curr = new;
1084                }
1085            }
1086        });
1087
1088        Optional::new(
1089            self.location.clone(),
1090            HydroNode::Reduce {
1091                f: wrapped.into(),
1092                input: Box::new(self.ir_node.into_inner()),
1093                metadata: self.location.new_node_metadata::<T>(),
1094            },
1095        )
1096    }
1097
1098    /// Computes the minimum element in the stream as an [`Optional`], which
1099    /// will be empty until the first element in the input arrives.
1100    ///
1101    /// # Example
1102    /// ```rust
1103    /// # use hydro_lang::prelude::*;
1104    /// # use futures::StreamExt;
1105    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1106    /// let tick = process.tick();
1107    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1108    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1109    /// batch.min().all_ticks()
1110    /// # }, |mut stream| async move {
1111    /// // 1
1112    /// # assert_eq!(stream.next().await.unwrap(), 1);
1113    /// # }));
1114    /// ```
1115    pub fn min(self) -> Optional<T, L, B>
1116    where
1117        T: Ord,
1118    {
1119        self.reduce_commutative_idempotent(q!(|curr, new| {
1120            if new < *curr {
1121                *curr = new;
1122            }
1123        }))
1124    }
1125}
1126
1127impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1128where
1129    L: Location<'a>,
1130{
1131    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1132    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1133    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1134    ///
1135    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1136    ///
1137    /// # Example
1138    /// ```rust
1139    /// # use hydro_lang::prelude::*;
1140    /// # use futures::StreamExt;
1141    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1142    /// let tick = process.tick();
1143    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1144    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1145    /// batch
1146    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1147    ///     .all_ticks()
1148    /// # }, |mut stream| async move {
1149    /// // 10
1150    /// # assert_eq!(stream.next().await.unwrap(), 10);
1151    /// # }));
1152    /// ```
1153    pub fn fold_commutative<A, I, F>(
1154        self,
1155        init: impl IntoQuotedMut<'a, I, L>,
1156        comb: impl IntoQuotedMut<'a, F, L>,
1157    ) -> Singleton<A, L, B>
1158    where
1159        I: Fn() -> A + 'a,
1160        F: Fn(&mut A, T),
1161    {
1162        let nondet = nondet!(/** the combinator function is commutative */);
1163        self.assume_ordering(nondet).fold(init, comb)
1164    }
1165
1166    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1167    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1168    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1169    /// reference, so that it can be modified in place.
1170    ///
1171    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1172    ///
1173    /// # Example
1174    /// ```rust
1175    /// # use hydro_lang::prelude::*;
1176    /// # use futures::StreamExt;
1177    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1178    /// let tick = process.tick();
1179    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1180    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1181    /// batch
1182    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1183    ///     .all_ticks()
1184    /// # }, |mut stream| async move {
1185    /// // 10
1186    /// # assert_eq!(stream.next().await.unwrap(), 10);
1187    /// # }));
1188    /// ```
1189    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1190    where
1191        F: Fn(&mut T, T) + 'a,
1192    {
1193        let nondet = nondet!(/** the combinator function is commutative */);
1194        self.assume_ordering(nondet).reduce(comb)
1195    }
1196
1197    /// Computes the number of elements in the stream as a [`Singleton`].
1198    ///
1199    /// # Example
1200    /// ```rust
1201    /// # use hydro_lang::prelude::*;
1202    /// # use futures::StreamExt;
1203    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1204    /// let tick = process.tick();
1205    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1206    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1207    /// batch.count().all_ticks()
1208    /// # }, |mut stream| async move {
1209    /// // 4
1210    /// # assert_eq!(stream.next().await.unwrap(), 4);
1211    /// # }));
1212    /// ```
1213    pub fn count(self) -> Singleton<usize, L, B> {
1214        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1215    }
1216}
1217
1218impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1219where
1220    L: Location<'a>,
1221{
1222    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1223    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1224    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1225    ///
1226    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1227    ///
1228    /// # Example
1229    /// ```rust
1230    /// # use hydro_lang::prelude::*;
1231    /// # use futures::StreamExt;
1232    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233    /// let tick = process.tick();
1234    /// let bools = process.source_iter(q!(vec![false, true, false]));
1235    /// let batch = bools.batch(&tick, nondet!(/** test */));
1236    /// batch
1237    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1238    ///     .all_ticks()
1239    /// # }, |mut stream| async move {
1240    /// // true
1241    /// # assert_eq!(stream.next().await.unwrap(), true);
1242    /// # }));
1243    /// ```
1244    pub fn fold_idempotent<A, I, F>(
1245        self,
1246        init: impl IntoQuotedMut<'a, I, L>,
1247        comb: impl IntoQuotedMut<'a, F, L>,
1248    ) -> Singleton<A, L, B>
1249    where
1250        I: Fn() -> A + 'a,
1251        F: Fn(&mut A, T),
1252    {
1253        let nondet = nondet!(/** the combinator function is idempotent */);
1254        self.assume_retries(nondet).fold(init, comb)
1255    }
1256
1257    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1258    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1259    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1260    /// reference, so that it can be modified in place.
1261    ///
1262    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1263    ///
1264    /// # Example
1265    /// ```rust
1266    /// # use hydro_lang::prelude::*;
1267    /// # use futures::StreamExt;
1268    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1269    /// let tick = process.tick();
1270    /// let bools = process.source_iter(q!(vec![false, true, false]));
1271    /// let batch = bools.batch(&tick, nondet!(/** test */));
1272    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1273    /// # }, |mut stream| async move {
1274    /// // true
1275    /// # assert_eq!(stream.next().await.unwrap(), true);
1276    /// # }));
1277    /// ```
1278    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1279    where
1280        F: Fn(&mut T, T) + 'a,
1281    {
1282        let nondet = nondet!(/** the combinator function is idempotent */);
1283        self.assume_retries(nondet).reduce(comb)
1284    }
1285
1286    /// Computes the first element in the stream as an [`Optional`], which
1287    /// will be empty until the first element in the input arrives.
1288    ///
1289    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1290    /// re-ordering of elements may cause the first element to change.
1291    ///
1292    /// # Example
1293    /// ```rust
1294    /// # use hydro_lang::prelude::*;
1295    /// # use futures::StreamExt;
1296    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1297    /// let tick = process.tick();
1298    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1299    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1300    /// batch.first().all_ticks()
1301    /// # }, |mut stream| async move {
1302    /// // 1
1303    /// # assert_eq!(stream.next().await.unwrap(), 1);
1304    /// # }));
1305    /// ```
1306    pub fn first(self) -> Optional<T, L, B> {
1307        self.reduce_idempotent(q!(|_, _| {}))
1308    }
1309
1310    /// Computes the last element in the stream as an [`Optional`], which
1311    /// will be empty until an element in the input arrives.
1312    ///
1313    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1314    /// re-ordering of elements may cause the last element to change.
1315    ///
1316    /// # Example
1317    /// ```rust
1318    /// # use hydro_lang::prelude::*;
1319    /// # use futures::StreamExt;
1320    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1321    /// let tick = process.tick();
1322    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1323    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1324    /// batch.last().all_ticks()
1325    /// # }, |mut stream| async move {
1326    /// // 4
1327    /// # assert_eq!(stream.next().await.unwrap(), 4);
1328    /// # }));
1329    /// ```
1330    pub fn last(self) -> Optional<T, L, B> {
1331        self.reduce_idempotent(q!(|curr, new| *curr = new))
1332    }
1333}
1334
1335impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1336where
1337    L: Location<'a>,
1338{
1339    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1340    ///
1341    /// # Example
1342    /// ```rust
1343    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1344    /// # use futures::StreamExt;
1345    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1346    /// let tick = process.tick();
1347    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1348    /// numbers.enumerate()
1349    /// # }, |mut stream| async move {
1350    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1351    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1352    /// #     assert_eq!(stream.next().await.unwrap(), w);
1353    /// # }
1354    /// # }));
1355    /// ```
1356    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1357        Stream::new(
1358            self.location.clone(),
1359            HydroNode::Enumerate {
1360                input: Box::new(self.ir_node.into_inner()),
1361                metadata: self.location.new_node_metadata::<(usize, T)>(),
1362            },
1363        )
1364    }
1365
1366    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1367    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1368    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1369    ///
1370    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1371    /// to depend on the order of elements in the stream.
1372    ///
1373    /// # Example
1374    /// ```rust
1375    /// # use hydro_lang::prelude::*;
1376    /// # use futures::StreamExt;
1377    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378    /// let tick = process.tick();
1379    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1380    /// let batch = words.batch(&tick, nondet!(/** test */));
1381    /// batch
1382    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1383    ///     .all_ticks()
1384    /// # }, |mut stream| async move {
1385    /// // "HELLOWORLD"
1386    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1387    /// # }));
1388    /// ```
1389    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1390        self,
1391        init: impl IntoQuotedMut<'a, I, L>,
1392        comb: impl IntoQuotedMut<'a, F, L>,
1393    ) -> Singleton<A, L, B> {
1394        let init = init.splice_fn0_ctx(&self.location).into();
1395        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1396
1397        let core = HydroNode::Fold {
1398            init,
1399            acc: comb,
1400            input: Box::new(self.ir_node.into_inner()),
1401            metadata: self.location.new_node_metadata::<A>(),
1402        };
1403
1404        Singleton::new(self.location, core)
1405    }
1406
1407    /// Collects all the elements of this stream into a single [`Vec`] element.
1408    ///
1409    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1410    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1411    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1412    /// the vector at an arbitrary point in time.
1413    ///
1414    /// # Example
1415    /// ```rust
1416    /// # use hydro_lang::prelude::*;
1417    /// # use futures::StreamExt;
1418    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1419    /// let tick = process.tick();
1420    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1421    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1422    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1423    /// # }, |mut stream| async move {
1424    /// // [ vec![1, 2, 3, 4] ]
1425    /// # for w in vec![vec![1, 2, 3, 4]] {
1426    /// #     assert_eq!(stream.next().await.unwrap(), w);
1427    /// # }
1428    /// # }));
1429    /// ```
1430    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1431        self.fold(
1432            q!(|| vec![]),
1433            q!(|acc, v| {
1434                acc.push(v);
1435            }),
1436        )
1437    }
1438
1439    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1440    /// and emitting each intermediate result.
1441    ///
1442    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1443    /// containing all intermediate accumulated values. The scan operation can also terminate early
1444    /// by returning `None`.
1445    ///
1446    /// The function takes a mutable reference to the accumulator and the current element, and returns
1447    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1448    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1449    ///
1450    /// # Examples
1451    ///
1452    /// Basic usage - running sum:
1453    /// ```rust
1454    /// # use hydro_lang::prelude::*;
1455    /// # use futures::StreamExt;
1456    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1457    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1458    ///     q!(|| 0),
1459    ///     q!(|acc, x| {
1460    ///         *acc += x;
1461    ///         Some(*acc)
1462    ///     }),
1463    /// )
1464    /// # }, |mut stream| async move {
1465    /// // Output: 1, 3, 6, 10
1466    /// # for w in vec![1, 3, 6, 10] {
1467    /// #     assert_eq!(stream.next().await.unwrap(), w);
1468    /// # }
1469    /// # }));
1470    /// ```
1471    ///
1472    /// Early termination example:
1473    /// ```rust
1474    /// # use hydro_lang::prelude::*;
1475    /// # use futures::StreamExt;
1476    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1477    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1478    ///     q!(|| 1),
1479    ///     q!(|state, x| {
1480    ///         *state = *state * x;
1481    ///         if *state > 6 {
1482    ///             None // Terminate the stream
1483    ///         } else {
1484    ///             Some(-*state)
1485    ///         }
1486    ///     }),
1487    /// )
1488    /// # }, |mut stream| async move {
1489    /// // Output: -1, -2, -6
1490    /// # for w in vec![-1, -2, -6] {
1491    /// #     assert_eq!(stream.next().await.unwrap(), w);
1492    /// # }
1493    /// # }));
1494    /// ```
1495    pub fn scan<A, U, I, F>(
1496        self,
1497        init: impl IntoQuotedMut<'a, I, L>,
1498        f: impl IntoQuotedMut<'a, F, L>,
1499    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1500    where
1501        I: Fn() -> A + 'a,
1502        F: Fn(&mut A, T) -> Option<U> + 'a,
1503    {
1504        let init = init.splice_fn0_ctx(&self.location).into();
1505        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1506
1507        Stream::new(
1508            self.location.clone(),
1509            HydroNode::Scan {
1510                init,
1511                acc: f,
1512                input: Box::new(self.ir_node.into_inner()),
1513                metadata: self.location.new_node_metadata::<U>(),
1514            },
1515        )
1516    }
1517
1518    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1519    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1520    /// until the first element in the input arrives.
1521    ///
1522    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1523    /// to depend on the order of elements in the stream.
1524    ///
1525    /// # Example
1526    /// ```rust
1527    /// # use hydro_lang::prelude::*;
1528    /// # use futures::StreamExt;
1529    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1530    /// let tick = process.tick();
1531    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1532    /// let batch = words.batch(&tick, nondet!(/** test */));
1533    /// batch
1534    ///     .map(q!(|x| x.to_string()))
1535    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1536    ///     .all_ticks()
1537    /// # }, |mut stream| async move {
1538    /// // "HELLOWORLD"
1539    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1540    /// # }));
1541    /// ```
1542    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1543        self,
1544        comb: impl IntoQuotedMut<'a, F, L>,
1545    ) -> Optional<T, L, B> {
1546        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1547        let core = HydroNode::Reduce {
1548            f,
1549            input: Box::new(self.ir_node.into_inner()),
1550            metadata: self.location.new_node_metadata::<T>(),
1551        };
1552
1553        Optional::new(self.location, core)
1554    }
1555}
1556
1557impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1558    /// Produces a new stream that interleaves the elements of the two input streams.
1559    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1560    ///
1561    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1562    /// [`Bounded`], you can use [`Stream::chain`] instead.
1563    ///
1564    /// # Example
1565    /// ```rust
1566    /// # use hydro_lang::prelude::*;
1567    /// # use futures::StreamExt;
1568    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1569    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1570    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1571    /// # }, |mut stream| async move {
1572    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1573    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1574    /// #     assert_eq!(stream.next().await.unwrap(), w);
1575    /// # }
1576    /// # }));
1577    /// ```
1578    pub fn interleave<O2: Ordering, R2: Retries>(
1579        self,
1580        other: Stream<T, L, Unbounded, O2, R2>,
1581    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1582    where
1583        R: MinRetries<R2>,
1584    {
1585        let tick = self.location.tick();
1586        // Because the outputs are unordered, we can interleave batches from both streams.
1587        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1588        self.batch(&tick, nondet_batch_interleaving)
1589            .weakest_ordering()
1590            .chain(
1591                other
1592                    .batch(&tick, nondet_batch_interleaving)
1593                    .weakest_ordering(),
1594            )
1595            .all_ticks()
1596    }
1597}
1598
1599impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1600where
1601    L: Location<'a>,
1602{
1603    /// Produces a new stream that emits the input elements in sorted order.
1604    ///
1605    /// The input stream can have any ordering guarantee, but the output stream
1606    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1607    /// elements in the input stream are available, so it requires the input stream
1608    /// to be [`Bounded`].
1609    ///
1610    /// # Example
1611    /// ```rust
1612    /// # use hydro_lang::prelude::*;
1613    /// # use futures::StreamExt;
1614    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1615    /// let tick = process.tick();
1616    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1617    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1618    /// batch.sort().all_ticks()
1619    /// # }, |mut stream| async move {
1620    /// // 1, 2, 3, 4
1621    /// # for w in (1..5) {
1622    /// #     assert_eq!(stream.next().await.unwrap(), w);
1623    /// # }
1624    /// # }));
1625    /// ```
1626    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1627    where
1628        T: Ord,
1629    {
1630        Stream::new(
1631            self.location.clone(),
1632            HydroNode::Sort {
1633                input: Box::new(self.ir_node.into_inner()),
1634                metadata: self.location.new_node_metadata::<T>(),
1635            },
1636        )
1637    }
1638
1639    /// Produces a new stream that first emits the elements of the `self` stream,
1640    /// and then emits the elements of the `other` stream. The output stream has
1641    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1642    /// [`TotalOrder`] guarantee.
1643    ///
1644    /// Currently, both input streams must be [`Bounded`]. This operator will block
1645    /// on the first stream until all its elements are available. In a future version,
1646    /// we will relax the requirement on the `other` stream.
1647    ///
1648    /// # Example
1649    /// ```rust
1650    /// # use hydro_lang::prelude::*;
1651    /// # use futures::StreamExt;
1652    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1653    /// let tick = process.tick();
1654    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1655    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1656    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1657    /// # }, |mut stream| async move {
1658    /// // 2, 3, 4, 5, 1, 2, 3, 4
1659    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1660    /// #     assert_eq!(stream.next().await.unwrap(), w);
1661    /// # }
1662    /// # }));
1663    /// ```
1664    pub fn chain<O2: Ordering, R2: Retries>(
1665        self,
1666        other: Stream<T, L, Bounded, O2, R2>,
1667    ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1668    where
1669        O: MinOrder<O2>,
1670        R: MinRetries<R2>,
1671    {
1672        check_matching_location(&self.location, &other.location);
1673
1674        Stream::new(
1675            self.location.clone(),
1676            HydroNode::Chain {
1677                first: Box::new(self.ir_node.into_inner()),
1678                second: Box::new(other.ir_node.into_inner()),
1679                metadata: self.location.new_node_metadata::<T>(),
1680            },
1681        )
1682    }
1683
1684    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1685    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1686    /// because this is compiled into a nested loop.
1687    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1688        self,
1689        other: Stream<T2, L, Bounded, O2, R>,
1690    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1691    where
1692        T: Clone,
1693        T2: Clone,
1694    {
1695        check_matching_location(&self.location, &other.location);
1696
1697        Stream::new(
1698            self.location.clone(),
1699            HydroNode::CrossProduct {
1700                left: Box::new(self.ir_node.into_inner()),
1701                right: Box::new(other.ir_node.into_inner()),
1702                metadata: self.location.new_node_metadata::<(T, T2)>(),
1703            },
1704        )
1705    }
1706
1707    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1708    /// `self` used as the values for *each* key.
1709    ///
1710    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1711    /// values. For example, it can be used to send the same set of elements to several cluster
1712    /// members, if the membership information is available as a [`KeyedSingleton`].
1713    ///
1714    /// # Example
1715    /// ```rust
1716    /// # use hydro_lang::prelude::*;
1717    /// # use futures::StreamExt;
1718    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1719    /// # let tick = process.tick();
1720    /// let keyed_singleton = // { 1: (), 2: () }
1721    /// # process
1722    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
1723    /// #     .into_keyed()
1724    /// #     .batch(&tick, nondet!(/** test */))
1725    /// #     .first();
1726    /// let stream = // [ "a", "b" ]
1727    /// # process
1728    /// #     .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1729    /// #     .batch(&tick, nondet!(/** test */));
1730    /// stream.repeat_with_keys(keyed_singleton)
1731    /// # .entries().all_ticks()
1732    /// # }, |mut stream| async move {
1733    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1734    /// # let mut results = Vec::new();
1735    /// # for _ in 0..4 {
1736    /// #     results.push(stream.next().await.unwrap());
1737    /// # }
1738    /// # results.sort();
1739    /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1740    /// # }));
1741    /// ```
1742    pub fn repeat_with_keys<K, V2>(
1743        self,
1744        keys: KeyedSingleton<K, V2, L, Bounded>,
1745    ) -> KeyedStream<K, T, L, Bounded, O, R>
1746    where
1747        K: Clone,
1748        T: Clone,
1749    {
1750        keys.keys().weaken_retries().cross_product_nested_loop(self).into_keyed().assume_ordering(
1751            nondet!(/** keyed stream does not depend on ordering of keys, cross_product_nested_loop preserves order of values */)
1752        )
1753    }
1754}
1755
1756impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1757where
1758    L: Location<'a>,
1759{
1760    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1761    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1762    /// by equi-joining the two streams on the key attribute `K`.
1763    ///
1764    /// # Example
1765    /// ```rust
1766    /// # use hydro_lang::prelude::*;
1767    /// # use std::collections::HashSet;
1768    /// # use futures::StreamExt;
1769    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1770    /// let tick = process.tick();
1771    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1772    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1773    /// stream1.join(stream2)
1774    /// # }, |mut stream| async move {
1775    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1776    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1777    /// # stream.map(|i| assert!(expected.contains(&i)));
1778    /// # }));
1779    pub fn join<V2, O2: Ordering, R2: Retries>(
1780        self,
1781        n: Stream<(K, V2), L, B, O2, R2>,
1782    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1783    where
1784        K: Eq + Hash,
1785        R: MinRetries<R2>,
1786    {
1787        check_matching_location(&self.location, &n.location);
1788
1789        Stream::new(
1790            self.location.clone(),
1791            HydroNode::Join {
1792                left: Box::new(self.ir_node.into_inner()),
1793                right: Box::new(n.ir_node.into_inner()),
1794                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1795            },
1796        )
1797    }
1798
1799    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1800    /// computes the anti-join of the items in the input -- i.e. returns
1801    /// unique items in the first input that do not have a matching key
1802    /// in the second input.
1803    ///
1804    /// # Example
1805    /// ```rust
1806    /// # use hydro_lang::prelude::*;
1807    /// # use futures::StreamExt;
1808    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1809    /// let tick = process.tick();
1810    /// let stream = process
1811    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1812    ///   .batch(&tick, nondet!(/** test */));
1813    /// let batch = process
1814    ///   .source_iter(q!(vec![1, 2]))
1815    ///   .batch(&tick, nondet!(/** test */));
1816    /// stream.anti_join(batch).all_ticks()
1817    /// # }, |mut stream| async move {
1818    /// # for w in vec![(3, 'c'), (4, 'd')] {
1819    /// #     assert_eq!(stream.next().await.unwrap(), w);
1820    /// # }
1821    /// # }));
1822    pub fn anti_join<O2: Ordering, R2: Retries>(
1823        self,
1824        n: Stream<K, L, Bounded, O2, R2>,
1825    ) -> Stream<(K, V1), L, B, O, R>
1826    where
1827        K: Eq + Hash,
1828    {
1829        check_matching_location(&self.location, &n.location);
1830
1831        Stream::new(
1832            self.location.clone(),
1833            HydroNode::AntiJoin {
1834                pos: Box::new(self.ir_node.into_inner()),
1835                neg: Box::new(n.ir_node.into_inner()),
1836                metadata: self.location.new_node_metadata::<(K, V1)>(),
1837            },
1838        )
1839    }
1840}
1841
1842impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1843    Stream<(K, V), L, B, O, R>
1844{
1845    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1846    /// is used as the key and the second element is added to the entries associated with that key.
1847    ///
1848    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1849    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1850    /// performing grouped aggregations, but also for more precise ordering guarantees such as
1851    /// total ordering _within_ each group but no ordering _across_ groups.
1852    ///
1853    /// # Example
1854    /// ```rust
1855    /// # use hydro_lang::prelude::*;
1856    /// # use futures::StreamExt;
1857    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1858    /// process
1859    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1860    ///     .into_keyed()
1861    /// #   .entries()
1862    /// # }, |mut stream| async move {
1863    /// // { 1: [2, 3], 2: [4] }
1864    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1865    /// #     assert_eq!(stream.next().await.unwrap(), w);
1866    /// # }
1867    /// # }));
1868    /// ```
1869    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1870        KeyedStream {
1871            underlying: self.weakest_ordering(),
1872            _phantom_order: Default::default(),
1873        }
1874    }
1875}
1876
1877impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1878where
1879    K: Eq + Hash,
1880    L: Location<'a>,
1881{
1882    #[deprecated = "use .into_keyed().fold(...) instead"]
1883    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1884    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1885    /// in the second element are accumulated via the `comb` closure.
1886    ///
1887    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1888    /// to depend on the order of elements in the stream.
1889    ///
1890    /// If the input and output value types are the same and do not require initialization then use
1891    /// [`Stream::reduce_keyed`].
1892    ///
1893    /// # Example
1894    /// ```rust
1895    /// # use hydro_lang::prelude::*;
1896    /// # use futures::StreamExt;
1897    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1898    /// let tick = process.tick();
1899    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1900    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1901    /// batch
1902    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1903    ///     .all_ticks()
1904    /// # }, |mut stream| async move {
1905    /// // (1, 5), (2, 7)
1906    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1907    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1908    /// # }));
1909    /// ```
1910    pub fn fold_keyed<A, I, F>(
1911        self,
1912        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1913        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1914    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1915    where
1916        I: Fn() -> A + 'a,
1917        F: Fn(&mut A, V) + 'a,
1918    {
1919        self.into_keyed().fold(init, comb).entries()
1920    }
1921
1922    #[deprecated = "use .into_keyed().reduce(...) instead"]
1923    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1924    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1925    /// in the second element are accumulated via the `comb` closure.
1926    ///
1927    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1928    /// to depend on the order of elements in the stream.
1929    ///
1930    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1931    ///
1932    /// # Example
1933    /// ```rust
1934    /// # use hydro_lang::prelude::*;
1935    /// # use futures::StreamExt;
1936    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1937    /// let tick = process.tick();
1938    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1939    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1940    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1941    /// # }, |mut stream| async move {
1942    /// // (1, 5), (2, 7)
1943    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1944    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1945    /// # }));
1946    /// ```
1947    pub fn reduce_keyed<F>(
1948        self,
1949        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1950    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1951    where
1952        F: Fn(&mut V, V) + 'a,
1953    {
1954        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1955
1956        Stream::new(
1957            self.location.clone(),
1958            HydroNode::ReduceKeyed {
1959                f,
1960                input: Box::new(self.ir_node.into_inner()),
1961                metadata: self.location.new_node_metadata::<(K, V)>(),
1962            },
1963        )
1964    }
1965}
1966
1967impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
1968where
1969    K: Eq + Hash,
1970    L: Location<'a>,
1971{
1972    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
1973    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
1974    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
1975    /// in the second element are accumulated via the `comb` closure.
1976    ///
1977    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1978    /// as there may be non-deterministic duplicates.
1979    ///
1980    /// If the input and output value types are the same and do not require initialization then use
1981    /// [`Stream::reduce_keyed_commutative_idempotent`].
1982    ///
1983    /// # Example
1984    /// ```rust
1985    /// # use hydro_lang::prelude::*;
1986    /// # use futures::StreamExt;
1987    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1988    /// let tick = process.tick();
1989    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
1990    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1991    /// batch
1992    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1993    ///     .all_ticks()
1994    /// # }, |mut stream| async move {
1995    /// // (1, false), (2, true)
1996    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1997    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1998    /// # }));
1999    /// ```
2000    pub fn fold_keyed_commutative_idempotent<A, I, F>(
2001        self,
2002        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2003        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2004    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2005    where
2006        I: Fn() -> A + 'a,
2007        F: Fn(&mut A, V) + 'a,
2008    {
2009        self.into_keyed()
2010            .fold_commutative_idempotent(init, comb)
2011            .entries()
2012    }
2013
2014    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2015    /// # Example
2016    /// ```rust
2017    /// # use hydro_lang::prelude::*;
2018    /// # use futures::StreamExt;
2019    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2020    /// let tick = process.tick();
2021    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2022    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2023    /// batch.keys().all_ticks()
2024    /// # }, |mut stream| async move {
2025    /// // 1, 2
2026    /// # assert_eq!(stream.next().await.unwrap(), 1);
2027    /// # assert_eq!(stream.next().await.unwrap(), 2);
2028    /// # }));
2029    /// ```
2030    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2031        self.into_keyed()
2032            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2033            .keys()
2034    }
2035
2036    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2037    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2038    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2039    /// in the second element are accumulated via the `comb` closure.
2040    ///
2041    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2042    /// as there may be non-deterministic duplicates.
2043    ///
2044    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2045    ///
2046    /// # Example
2047    /// ```rust
2048    /// # use hydro_lang::prelude::*;
2049    /// # use futures::StreamExt;
2050    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2051    /// let tick = process.tick();
2052    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2053    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2054    /// batch
2055    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2056    ///     .all_ticks()
2057    /// # }, |mut stream| async move {
2058    /// // (1, false), (2, true)
2059    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2060    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2061    /// # }));
2062    /// ```
2063    pub fn reduce_keyed_commutative_idempotent<F>(
2064        self,
2065        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2066    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2067    where
2068        F: Fn(&mut V, V) + 'a,
2069    {
2070        self.into_keyed()
2071            .reduce_commutative_idempotent(comb)
2072            .entries()
2073    }
2074}
2075
2076impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2077where
2078    K: Eq + Hash,
2079    L: Location<'a>,
2080{
2081    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2082    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2083    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2084    /// in the second element are accumulated via the `comb` closure.
2085    ///
2086    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2087    ///
2088    /// If the input and output value types are the same and do not require initialization then use
2089    /// [`Stream::reduce_keyed_commutative`].
2090    ///
2091    /// # Example
2092    /// ```rust
2093    /// # use hydro_lang::prelude::*;
2094    /// # use futures::StreamExt;
2095    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2096    /// let tick = process.tick();
2097    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2098    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2099    /// batch
2100    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2101    ///     .all_ticks()
2102    /// # }, |mut stream| async move {
2103    /// // (1, 5), (2, 7)
2104    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2105    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2106    /// # }));
2107    /// ```
2108    pub fn fold_keyed_commutative<A, I, F>(
2109        self,
2110        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2111        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2112    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2113    where
2114        I: Fn() -> A + 'a,
2115        F: Fn(&mut A, V) + 'a,
2116    {
2117        self.into_keyed().fold_commutative(init, comb).entries()
2118    }
2119
2120    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2121    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2122    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2123    /// in the second element are accumulated via the `comb` closure.
2124    ///
2125    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2126    ///
2127    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2128    ///
2129    /// # Example
2130    /// ```rust
2131    /// # use hydro_lang::prelude::*;
2132    /// # use futures::StreamExt;
2133    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2134    /// let tick = process.tick();
2135    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2136    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2137    /// batch
2138    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2139    ///     .all_ticks()
2140    /// # }, |mut stream| async move {
2141    /// // (1, 5), (2, 7)
2142    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2143    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2144    /// # }));
2145    /// ```
2146    pub fn reduce_keyed_commutative<F>(
2147        self,
2148        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2149    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2150    where
2151        F: Fn(&mut V, V) + 'a,
2152    {
2153        self.into_keyed().reduce_commutative(comb).entries()
2154    }
2155}
2156
2157impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2158where
2159    K: Eq + Hash,
2160    L: Location<'a>,
2161{
2162    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2163    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2164    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2165    /// in the second element are accumulated via the `comb` closure.
2166    ///
2167    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2168    ///
2169    /// If the input and output value types are the same and do not require initialization then use
2170    /// [`Stream::reduce_keyed_idempotent`].
2171    ///
2172    /// # Example
2173    /// ```rust
2174    /// # use hydro_lang::prelude::*;
2175    /// # use futures::StreamExt;
2176    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2177    /// let tick = process.tick();
2178    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2179    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2180    /// batch
2181    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2182    ///     .all_ticks()
2183    /// # }, |mut stream| async move {
2184    /// // (1, false), (2, true)
2185    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2186    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2187    /// # }));
2188    /// ```
2189    pub fn fold_keyed_idempotent<A, I, F>(
2190        self,
2191        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2192        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2193    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2194    where
2195        I: Fn() -> A + 'a,
2196        F: Fn(&mut A, V) + 'a,
2197    {
2198        self.into_keyed().fold_idempotent(init, comb).entries()
2199    }
2200
2201    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2202    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2203    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2204    /// in the second element are accumulated via the `comb` closure.
2205    ///
2206    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2207    ///
2208    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2209    ///
2210    /// # Example
2211    /// ```rust
2212    /// # use hydro_lang::prelude::*;
2213    /// # use futures::StreamExt;
2214    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2215    /// let tick = process.tick();
2216    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2217    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2218    /// batch
2219    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2220    ///     .all_ticks()
2221    /// # }, |mut stream| async move {
2222    /// // (1, false), (2, true)
2223    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2224    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2225    /// # }));
2226    /// ```
2227    pub fn reduce_keyed_idempotent<F>(
2228        self,
2229        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2230    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2231    where
2232        F: Fn(&mut V, V) + 'a,
2233    {
2234        self.into_keyed().reduce_idempotent(comb).entries()
2235    }
2236}
2237
2238impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2239where
2240    L: Location<'a> + NoTick,
2241{
2242    /// Returns a stream corresponding to the latest batch of elements being atomically
2243    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2244    /// the order of the input.
2245    ///
2246    /// # Non-Determinism
2247    /// The batch boundaries are non-deterministic and may change across executions.
2248    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2249        Stream::new(
2250            self.location.clone().tick,
2251            HydroNode::Batch {
2252                inner: Box::new(self.ir_node.into_inner()),
2253                metadata: self.location.tick.new_node_metadata::<T>(),
2254            },
2255        )
2256    }
2257
2258    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2259    /// See [`Stream::atomic`] for more details.
2260    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2261        Stream::new(
2262            self.location.tick.l.clone(),
2263            HydroNode::EndAtomic {
2264                inner: Box::new(self.ir_node.into_inner()),
2265                metadata: self.location.tick.l.new_node_metadata::<T>(),
2266            },
2267        )
2268    }
2269
2270    /// Gets the [`Tick`] inside which this stream is synchronously processed. See [`Stream::atomic`].
2271    pub fn atomic_source(&self) -> Tick<L> {
2272        self.location.tick.clone()
2273    }
2274}
2275
2276impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2277where
2278    L: Location<'a>,
2279{
2280    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2281    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2282    ///
2283    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2284    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2285    /// argument that declares where the stream will be atomically processed. Batching a stream into
2286    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2287    /// [`Tick`] will introduce asynchrony.
2288    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2289        let out_location = Atomic { tick: tick.clone() };
2290        Stream::new(
2291            out_location.clone(),
2292            HydroNode::BeginAtomic {
2293                inner: Box::new(self.ir_node.into_inner()),
2294                metadata: out_location.new_node_metadata::<T>(),
2295            },
2296        )
2297    }
2298
2299    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2300    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2301    /// the order of the input. The output stream will execute in the [`Tick`] that was
2302    /// used to create the atomic section.
2303    ///
2304    /// # Non-Determinism
2305    /// The batch boundaries are non-deterministic and may change across executions.
2306    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2307        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2308        Stream::new(
2309            tick.clone(),
2310            HydroNode::Batch {
2311                inner: Box::new(self.ir_node.into_inner()),
2312                metadata: tick.new_node_metadata::<T>(),
2313            },
2314        )
2315    }
2316
2317    /// Given a time interval, returns a stream corresponding to samples taken from the
2318    /// stream roughly at that interval. The output will have elements in the same order
2319    /// as the input, but with arbitrary elements skipped between samples. There is also
2320    /// no guarantee on the exact timing of the samples.
2321    ///
2322    /// # Non-Determinism
2323    /// The output stream is non-deterministic in which elements are sampled, since this
2324    /// is controlled by a clock.
2325    pub fn sample_every(
2326        self,
2327        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2328        nondet: NonDet,
2329    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2330    where
2331        L: NoTick + NoAtomic,
2332    {
2333        let samples = self.location.source_interval(interval, nondet);
2334
2335        let tick = self.location.tick();
2336        self.batch(&tick, nondet)
2337            .filter_if_some(samples.batch(&tick, nondet).first())
2338            .all_ticks()
2339            .weakest_retries()
2340    }
2341
2342    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2343    /// stream has not emitted a value since that duration.
2344    ///
2345    /// # Non-Determinism
2346    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2347    /// samples take place, timeouts may be non-deterministically generated or missed,
2348    /// and the notification of the timeout may be delayed as well. There is also no
2349    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2350    /// detected based on when the next sample is taken.
2351    pub fn timeout(
2352        self,
2353        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2354        nondet: NonDet,
2355    ) -> Optional<(), L, Unbounded>
2356    where
2357        L: NoTick + NoAtomic,
2358    {
2359        let tick = self.location.tick();
2360
2361        let latest_received = self.assume_retries(nondet).fold_commutative(
2362            q!(|| None),
2363            q!(|latest, _| {
2364                *latest = Some(Instant::now());
2365            }),
2366        );
2367
2368        latest_received
2369            .snapshot(&tick, nondet)
2370            .filter_map(q!(move |latest_received| {
2371                if let Some(latest_received) = latest_received {
2372                    if Instant::now().duration_since(latest_received) > duration {
2373                        Some(())
2374                    } else {
2375                        None
2376                    }
2377                } else {
2378                    Some(())
2379                }
2380            }))
2381            .latest()
2382    }
2383}
2384
2385impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2386where
2387    L: Location<'a> + NoTick + NoAtomic,
2388    F: Future<Output = T>,
2389{
2390    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2391    /// Future outputs are produced as available, regardless of input arrival order.
2392    ///
2393    /// # Example
2394    /// ```rust
2395    /// # use std::collections::HashSet;
2396    /// # use futures::StreamExt;
2397    /// # use hydro_lang::prelude::*;
2398    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2399    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2400    ///     .map(q!(|x| async move {
2401    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2402    ///         x
2403    ///     }))
2404    ///     .resolve_futures()
2405    /// #   },
2406    /// #   |mut stream| async move {
2407    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2408    /// #       let mut output = HashSet::new();
2409    /// #       for _ in 1..10 {
2410    /// #           output.insert(stream.next().await.unwrap());
2411    /// #       }
2412    /// #       assert_eq!(
2413    /// #           output,
2414    /// #           HashSet::<i32>::from_iter(1..10)
2415    /// #       );
2416    /// #   },
2417    /// # ));
2418    pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2419        Stream::new(
2420            self.location.clone(),
2421            HydroNode::ResolveFutures {
2422                input: Box::new(self.ir_node.into_inner()),
2423                metadata: self.location.new_node_metadata::<T>(),
2424            },
2425        )
2426    }
2427
2428    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2429    /// Future outputs are produced in the same order as the input stream.
2430    ///
2431    /// # Example
2432    /// ```rust
2433    /// # use std::collections::HashSet;
2434    /// # use futures::StreamExt;
2435    /// # use hydro_lang::prelude::*;
2436    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2437    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2438    ///     .map(q!(|x| async move {
2439    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2440    ///         x
2441    ///     }))
2442    ///     .resolve_futures_ordered()
2443    /// #   },
2444    /// #   |mut stream| async move {
2445    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2446    /// #       let mut output = Vec::new();
2447    /// #       for _ in 1..10 {
2448    /// #           output.push(stream.next().await.unwrap());
2449    /// #       }
2450    /// #       assert_eq!(
2451    /// #           output,
2452    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2453    /// #       );
2454    /// #   },
2455    /// # ));
2456    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2457        Stream::new(
2458            self.location.clone(),
2459            HydroNode::ResolveFuturesOrdered {
2460                input: Box::new(self.ir_node.into_inner()),
2461                metadata: self.location.new_node_metadata::<T>(),
2462            },
2463        )
2464    }
2465}
2466
2467impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2468where
2469    L: Location<'a> + NoTick,
2470{
2471    /// Executes the provided closure for every element in this stream.
2472    ///
2473    /// Because the closure may have side effects, the stream must have deterministic order
2474    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2475    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2476    /// [`Stream::assume_retries`] with an explanation for why this is the case.
2477    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2478        let f = f.splice_fn1_ctx(&self.location).into();
2479        self.location
2480            .flow_state()
2481            .borrow_mut()
2482            .push_root(HydroRoot::ForEach {
2483                input: Box::new(self.ir_node.into_inner()),
2484                f,
2485                op_metadata: HydroIrOpMetadata::new(),
2486            });
2487    }
2488
2489    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2490    /// TCP socket to some other server. You should _not_ use this API for interacting with
2491    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2492    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2493    /// interaction with asynchronous sinks.
2494    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2495    where
2496        S: 'a + futures::Sink<T> + Unpin,
2497    {
2498        self.location
2499            .flow_state()
2500            .borrow_mut()
2501            .push_root(HydroRoot::DestSink {
2502                sink: sink.splice_typed_ctx(&self.location).into(),
2503                input: Box::new(self.ir_node.into_inner()),
2504                op_metadata: HydroIrOpMetadata::new(),
2505            });
2506    }
2507}
2508
2509impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2510where
2511    L: Location<'a>,
2512{
2513    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2514    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2515    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2516        Stream::new(
2517            self.location.outer().clone(),
2518            HydroNode::YieldConcat {
2519                inner: Box::new(self.ir_node.into_inner()),
2520                metadata: self.location.outer().new_node_metadata::<T>(),
2521            },
2522        )
2523    }
2524
2525    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2526    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2527    ///
2528    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2529    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2530    /// stream's [`Tick`] context.
2531    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2532        let out_location = Atomic {
2533            tick: self.location.clone(),
2534        };
2535
2536        Stream::new(
2537            out_location.clone(),
2538            HydroNode::YieldConcat {
2539                inner: Box::new(self.ir_node.into_inner()),
2540                metadata: out_location.new_node_metadata::<T>(),
2541            },
2542        )
2543    }
2544
2545    /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2546    ///
2547    /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2548    /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2549    /// careful when using this operator, as its memory usage will grow linearly over time since it
2550    /// must store its inputs indefinitely.
2551    ///
2552    /// # Example
2553    /// ```rust
2554    /// # use hydro_lang::prelude::*;
2555    /// # use futures::StreamExt;
2556    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2557    /// let tick = process.tick();
2558    /// // ticks are lazy by default, forces the second tick to run
2559    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2560    ///
2561    /// let batch_first_tick = process
2562    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2563    ///   .batch(&tick, nondet!(/** test */));
2564    /// let batch_second_tick = process
2565    ///   .source_iter(q!(vec![5, 6, 7, 8]))
2566    ///   .batch(&tick, nondet!(/** test */))
2567    ///   .defer_tick(); // appears on the second tick
2568    /// batch_first_tick.chain(batch_second_tick)
2569    ///   .persist()
2570    ///   .all_ticks()
2571    /// # }, |mut stream| async move {
2572    /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2573    /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2574    /// #     assert_eq!(stream.next().await.unwrap(), w);
2575    /// # }
2576    /// # }));
2577    /// ```
2578    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2579    where
2580        T: Clone,
2581    {
2582        Stream::new(
2583            self.location.clone(),
2584            HydroNode::Persist {
2585                inner: Box::new(self.ir_node.into_inner()),
2586                metadata: self.location.new_node_metadata::<T>(),
2587            },
2588        )
2589    }
2590
2591    #[expect(missing_docs, reason = "TODO")]
2592    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2593        Stream::new(
2594            self.location.clone(),
2595            HydroNode::DeferTick {
2596                input: Box::new(self.ir_node.into_inner()),
2597                metadata: self.location.new_node_metadata::<T>(),
2598            },
2599        )
2600    }
2601}
2602
// Integration tests: each one builds a small dataflow, deploys it to localhost with
// `hydro_deploy`, and checks the values observed through external bincode ports.
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    mod backtrace_chained_ops;

    // Marker types used as location tags for processes / externals in the tests below.
    struct P1 {}
    struct P2 {}

    /// Payload sent between processes; derives serde traits so it can be sent with bincode.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends `0..10` from one process to a second process and then to an external client,
    /// checking that the external client receives the values in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&second_node)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The external port is connected after `deploy()` and before `start()`.
        let mut external_out = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }

    /// Taking `first()` of a three-element batch and counting the result yields 1.
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect_source_bincode(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    /// `reduce` over an unbounded input accumulates across inputs: sending 1 and then 2
    /// produces the sampled sums 1 and then 3.
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect_sink_bincode(input_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// A fold of `[1, 2, 3]` inside an atomic section is paired with each input batch, and
    /// every batch observes the full folded value 6.
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic(&tick)
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect_sink_bincode(input_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }

    /// `scan` keeps its accumulator across inputs, emitting the running sums 1 and then 3.
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect_sink_bincode(input_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// `enumerate` keeps its counter across inputs, producing indices 0 and then 1.
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect_sink_bincode(input_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }
}