hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, DeferTick, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::{NonDet, nondet};
22
23/// A single Rust value that can asynchronously change over time.
24///
25/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
26/// [`Unbounded`], the value will asynchronously change over time.
27///
28/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
29/// a single number that will asynchronously change as events are processed. Singletons also appear
30/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
31/// such as getting the length of a batch of requests.
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this singleton
35/// - `Loc`: the [`Location`] where the singleton is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Singleton<Type, Loc, Bound: Boundedness> {
38    pub(crate) location: Loc,
39    pub(crate) ir_node: RefCell<HydroNode>,
40
41    _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46    L: Location<'a>,
47{
48    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49        Singleton::new(singleton.location, singleton.ir_node.into_inner())
50    }
51}
52
53impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    fn defer_tick(self) -> Self {
58        Singleton::defer_tick(self)
59    }
60}
61
62impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
63where
64    L: Location<'a>,
65{
66    type Location = Tick<L>;
67
68    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
69        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
70            location.clone(),
71            HydroNode::DeferTick {
72                input: Box::new(HydroNode::CycleSource {
73                    ident,
74                    metadata: location.new_node_metadata::<T>(),
75                }),
76                metadata: location.new_node_metadata::<T>(),
77            },
78        );
79
80        from_previous_tick.unwrap_or(initial)
81    }
82}
83
84impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
85where
86    L: Location<'a>,
87{
88    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
89        assert_eq!(
90            Location::id(&self.location),
91            expected_location,
92            "locations do not match"
93        );
94        self.location
95            .flow_state()
96            .borrow_mut()
97            .push_root(HydroRoot::CycleSink {
98                ident,
99                input: Box::new(self.ir_node.into_inner()),
100                out_location: Location::id(&self.location),
101                op_metadata: HydroIrOpMetadata::new(),
102            });
103    }
104}
105
106impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
107where
108    L: Location<'a>,
109{
110    type Location = Tick<L>;
111
112    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
113        Singleton::new(
114            location.clone(),
115            HydroNode::CycleSource {
116                ident,
117                metadata: location.new_node_metadata::<T>(),
118            },
119        )
120    }
121}
122
123impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
124where
125    L: Location<'a>,
126{
127    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
128        assert_eq!(
129            Location::id(&self.location),
130            expected_location,
131            "locations do not match"
132        );
133        self.location
134            .flow_state()
135            .borrow_mut()
136            .push_root(HydroRoot::CycleSink {
137                ident,
138                input: Box::new(self.ir_node.into_inner()),
139                out_location: Location::id(&self.location),
140                op_metadata: HydroIrOpMetadata::new(),
141            });
142    }
143}
144
145impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
146where
147    L: Location<'a> + NoTick,
148{
149    type Location = L;
150
151    fn create_source(ident: syn::Ident, location: L) -> Self {
152        Singleton::new(
153            location.clone(),
154            HydroNode::CycleSource {
155                ident,
156                metadata: location.new_node_metadata::<T>(),
157            },
158        )
159    }
160}
161
162impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
163where
164    L: Location<'a> + NoTick,
165{
166    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
167        assert_eq!(
168            Location::id(&self.location),
169            expected_location,
170            "locations do not match"
171        );
172        self.location
173            .flow_state()
174            .borrow_mut()
175            .push_root(HydroRoot::CycleSink {
176                ident,
177                input: Box::new(self.ir_node.into_inner()),
178                out_location: Location::id(&self.location),
179                op_metadata: HydroIrOpMetadata::new(),
180            });
181    }
182}
183
184impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
185where
186    T: Clone,
187    L: Location<'a>,
188{
189    fn clone(&self) -> Self {
190        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
191            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
192            *self.ir_node.borrow_mut() = HydroNode::Tee {
193                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
194                metadata: self.location.new_node_metadata::<T>(),
195            };
196        }
197
198        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
199            Singleton {
200                location: self.location.clone(),
201                ir_node: HydroNode::Tee {
202                    inner: TeeNode(inner.0.clone()),
203                    metadata: metadata.clone(),
204                }
205                .into(),
206                _phantom: PhantomData,
207            }
208        } else {
209            unreachable!()
210        }
211    }
212}
213
/// Zips a singleton living inside a tick with `other` by building a `CrossSingleton` node over
/// the two IR nodes, after checking that both sides are at matching locations.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: O,
) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
where
    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
{
    let other_location = Singleton::<T, Tick<L>, B>::other_location(&other);
    check_matching_location(&me.location, &other_location);

    let out_location = me.location.clone();
    let left = Box::new(me.ir_node.into_inner());
    let right = Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other));
    let metadata = me
        .location
        .new_node_metadata::<<Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType>();

    Singleton::<T, Tick<L>, B>::make(
        out_location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
238
239impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
240where
241    L: Location<'a>,
242{
243    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
244        debug_assert_eq!(Location::id(&location), ir_node.metadata().location_kind);
245        Singleton {
246            location,
247            ir_node: RefCell::new(ir_node),
248            _phantom: PhantomData,
249        }
250    }
251
252    /// Transforms the singleton value by applying a function `f` to it,
253    /// continuously as the input is updated.
254    ///
255    /// # Example
256    /// ```rust
257    /// # use hydro_lang::prelude::*;
258    /// # use futures::StreamExt;
259    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
260    /// let tick = process.tick();
261    /// let singleton = tick.singleton(q!(5));
262    /// singleton.map(q!(|v| v * 2)).all_ticks()
263    /// # }, |mut stream| async move {
264    /// // 10
265    /// # assert_eq!(stream.next().await.unwrap(), 10);
266    /// # }));
267    /// ```
268    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
269    where
270        F: Fn(T) -> U + 'a,
271    {
272        let f = f.splice_fn1_ctx(&self.location).into();
273        Singleton::new(
274            self.location.clone(),
275            HydroNode::Map {
276                f,
277                input: Box::new(self.ir_node.into_inner()),
278                metadata: self.location.new_node_metadata::<U>(),
279            },
280        )
281    }
282
283    /// Transforms the singleton value by applying a function `f` to it and then flattening
284    /// the result into a stream, preserving the order of elements.
285    ///
286    /// The function `f` is applied to the singleton value to produce an iterator, and all items
287    /// from that iterator are emitted in the output stream in deterministic order.
288    ///
289    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
290    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
291    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
292    ///
293    /// # Example
294    /// ```rust
295    /// # use hydro_lang::prelude::*;
296    /// # use futures::StreamExt;
297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
298    /// let tick = process.tick();
299    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
300    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
301    /// # }, |mut stream| async move {
302    /// // 1, 2, 3
303    /// # for w in vec![1, 2, 3] {
304    /// #     assert_eq!(stream.next().await.unwrap(), w);
305    /// # }
306    /// # }));
307    /// ```
308    pub fn flat_map_ordered<U, I, F>(
309        self,
310        f: impl IntoQuotedMut<'a, F, L>,
311    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
312    where
313        I: IntoIterator<Item = U>,
314        F: Fn(T) -> I + 'a,
315    {
316        let f = f.splice_fn1_ctx(&self.location).into();
317        Stream::new(
318            self.location.clone(),
319            HydroNode::FlatMap {
320                f,
321                input: Box::new(self.ir_node.into_inner()),
322                metadata: self.location.new_node_metadata::<U>(),
323            },
324        )
325    }
326
327    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
328    /// for the output type `I` to produce items in any order.
329    ///
330    /// The function `f` is applied to the singleton value to produce an iterator, and all items
331    /// from that iterator are emitted in the output stream in non-deterministic order.
332    ///
333    /// # Example
334    /// ```rust
335    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
336    /// # use futures::StreamExt;
337    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
338    /// let tick = process.tick();
339    /// let singleton = tick.singleton(q!(
340    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
341    /// ));
342    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
343    /// # }, |mut stream| async move {
344    /// // 1, 2, 3, but in no particular order
345    /// # let mut results = Vec::new();
346    /// # for _ in 0..3 {
347    /// #     results.push(stream.next().await.unwrap());
348    /// # }
349    /// # results.sort();
350    /// # assert_eq!(results, vec![1, 2, 3]);
351    /// # }));
352    /// ```
353    pub fn flat_map_unordered<U, I, F>(
354        self,
355        f: impl IntoQuotedMut<'a, F, L>,
356    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
357    where
358        I: IntoIterator<Item = U>,
359        F: Fn(T) -> I + 'a,
360    {
361        let f = f.splice_fn1_ctx(&self.location).into();
362        Stream::new(
363            self.location.clone(),
364            HydroNode::FlatMap {
365                f,
366                input: Box::new(self.ir_node.into_inner()),
367                metadata: self.location.new_node_metadata::<U>(),
368            },
369        )
370    }
371
372    /// Flattens the singleton value into a stream, preserving the order of elements.
373    ///
374    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
375    /// are emitted in the output stream in deterministic order.
376    ///
377    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
378    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
379    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
380    ///
381    /// # Example
382    /// ```rust
383    /// # use hydro_lang::prelude::*;
384    /// # use futures::StreamExt;
385    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
386    /// let tick = process.tick();
387    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
388    /// singleton.flatten_ordered().all_ticks()
389    /// # }, |mut stream| async move {
390    /// // 1, 2, 3
391    /// # for w in vec![1, 2, 3] {
392    /// #     assert_eq!(stream.next().await.unwrap(), w);
393    /// # }
394    /// # }));
395    /// ```
396    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
397    where
398        T: IntoIterator<Item = U>,
399    {
400        self.flat_map_ordered(q!(|x| x))
401    }
402
403    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
404    /// for the element type `T` to produce items in any order.
405    ///
406    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
407    /// are emitted in the output stream in non-deterministic order.
408    ///
409    /// # Example
410    /// ```rust
411    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
412    /// # use futures::StreamExt;
413    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
414    /// let tick = process.tick();
415    /// let singleton = tick.singleton(q!(
416    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
417    /// ));
418    /// singleton.flatten_unordered().all_ticks()
419    /// # }, |mut stream| async move {
420    /// // 1, 2, 3, but in no particular order
421    /// # let mut results = Vec::new();
422    /// # for _ in 0..3 {
423    /// #     results.push(stream.next().await.unwrap());
424    /// # }
425    /// # results.sort();
426    /// # assert_eq!(results, vec![1, 2, 3]);
427    /// # }));
428    /// ```
429    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
430    where
431        T: IntoIterator<Item = U>,
432    {
433        self.flat_map_unordered(q!(|x| x))
434    }
435
436    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
437    ///
438    /// If the predicate returns `true`, the output optional contains the same value.
439    /// If the predicate returns `false`, the output optional is empty.
440    ///
441    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
442    /// not modify or take ownership of the value. If you need to modify the value while filtering
443    /// use [`Singleton::filter_map`] instead.
444    ///
445    /// # Example
446    /// ```rust
447    /// # use hydro_lang::prelude::*;
448    /// # use futures::StreamExt;
449    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
450    /// let tick = process.tick();
451    /// let singleton = tick.singleton(q!(5));
452    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
453    /// # }, |mut stream| async move {
454    /// // 5
455    /// # assert_eq!(stream.next().await.unwrap(), 5);
456    /// # }));
457    /// ```
458    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
459    where
460        F: Fn(&T) -> bool + 'a,
461    {
462        let f = f.splice_fn1_borrow_ctx(&self.location).into();
463        Optional::new(
464            self.location.clone(),
465            HydroNode::Filter {
466                f,
467                input: Box::new(self.ir_node.into_inner()),
468                metadata: self.location.new_node_metadata::<T>(),
469            },
470        )
471    }
472
473    /// An operator that both filters and maps. It yields the value only if the supplied
474    /// closure `f` returns `Some(value)`.
475    ///
476    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
477    /// If the closure returns `None`, the output optional is empty.
478    ///
479    /// # Example
480    /// ```rust
481    /// # use hydro_lang::prelude::*;
482    /// # use futures::StreamExt;
483    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
484    /// let tick = process.tick();
485    /// let singleton = tick.singleton(q!("42"));
486    /// singleton
487    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
488    ///     .all_ticks()
489    /// # }, |mut stream| async move {
490    /// // 42
491    /// # assert_eq!(stream.next().await.unwrap(), 42);
492    /// # }));
493    /// ```
494    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
495    where
496        F: Fn(T) -> Option<U> + 'a,
497    {
498        let f = f.splice_fn1_ctx(&self.location).into();
499        Optional::new(
500            self.location.clone(),
501            HydroNode::FilterMap {
502                f,
503                input: Box::new(self.ir_node.into_inner()),
504                metadata: self.location.new_node_metadata::<U>(),
505            },
506        )
507    }
508
509    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
510    ///
511    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
512    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
513    /// non-null. This is useful for combining several pieces of state together.
514    ///
515    /// # Example
516    /// ```rust
517    /// # use hydro_lang::prelude::*;
518    /// # use futures::StreamExt;
519    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
520    /// let tick = process.tick();
521    /// let numbers = process
522    ///   .source_iter(q!(vec![123, 456]))
523    ///   .batch(&tick, nondet!(/** test */));
524    /// let count = numbers.clone().count(); // Singleton
525    /// let max = numbers.max(); // Optional
526    /// count.zip(max).all_ticks()
527    /// # }, |mut stream| async move {
528    /// // [(2, 456)]
529    /// # for w in vec![(2, 456)] {
530    /// #     assert_eq!(stream.next().await.unwrap(), w);
531    /// # }
532    /// # }));
533    /// ```
534    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
535    where
536        Self: ZipResult<'a, O, Location = L>,
537    {
538        check_matching_location(&self.location, &Self::other_location(&other));
539
540        if L::is_top_level()
541            && let Some(tick) = self.location.try_tick()
542        {
543            let out = zip_inside_tick(
544                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
545                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
546                    Self::other_location(&other),
547                    Self::other_ir_node(other),
548                )
549                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
550            )
551            .latest();
552
553            Self::make(out.location, out.ir_node.into_inner())
554        } else {
555            Self::make(
556                self.location.clone(),
557                HydroNode::CrossSingleton {
558                    left: Box::new(self.ir_node.into_inner()),
559                    right: Box::new(Self::other_ir_node(other)),
560                    metadata: self
561                        .location
562                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
563                },
564            )
565        }
566    }
567
568    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
569    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
570    ///
571    /// Useful for conditionally processing, such as only emitting a singleton's value outside
572    /// a tick if some other condition is satisfied.
573    ///
574    /// # Example
575    /// ```rust
576    /// # use hydro_lang::prelude::*;
577    /// # use futures::StreamExt;
578    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
579    /// let tick = process.tick();
580    /// // ticks are lazy by default, forces the second tick to run
581    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
582    ///
583    /// let batch_first_tick = process
584    ///   .source_iter(q!(vec![1]))
585    ///   .batch(&tick, nondet!(/** test */));
586    /// let batch_second_tick = process
587    ///   .source_iter(q!(vec![1, 2, 3]))
588    ///   .batch(&tick, nondet!(/** test */))
589    ///   .defer_tick(); // appears on the second tick
590    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
591    /// batch_first_tick.chain(batch_second_tick).count()
592    ///   .filter_if_some(some_on_first_tick)
593    ///   .all_ticks()
594    /// # }, |mut stream| async move {
595    /// // [1]
596    /// # for w in vec![1] {
597    /// #     assert_eq!(stream.next().await.unwrap(), w);
598    /// # }
599    /// # }));
600    /// ```
601    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
602        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
603            .map(q!(|(d, _signal)| d))
604    }
605
606    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
607    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
608    ///
609    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
610    /// the condition.
611    ///
612    /// # Example
613    /// ```rust
614    /// # use hydro_lang::prelude::*;
615    /// # use futures::StreamExt;
616    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
617    /// let tick = process.tick();
618    /// // ticks are lazy by default, forces the second tick to run
619    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
620    ///
621    /// let batch_first_tick = process
622    ///   .source_iter(q!(vec![1]))
623    ///   .batch(&tick, nondet!(/** test */));
624    /// let batch_second_tick = process
625    ///   .source_iter(q!(vec![1, 2, 3]))
626    ///   .batch(&tick, nondet!(/** test */))
627    ///   .defer_tick(); // appears on the second tick
628    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
629    /// batch_first_tick.chain(batch_second_tick).count()
630    ///   .filter_if_none(some_on_first_tick)
631    ///   .all_ticks()
632    /// # }, |mut stream| async move {
633    /// // [3]
634    /// # for w in vec![3] {
635    /// #     assert_eq!(stream.next().await.unwrap(), w);
636    /// # }
637    /// # }));
638    /// ```
639    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
640        self.filter_if_some(
641            other
642                .map(q!(|_| ()))
643                .into_singleton()
644                .filter(q!(|o| o.is_none())),
645        )
646    }
647
648    /// An operator which allows you to "name" a `HydroNode`.
649    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
650    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
651        {
652            let mut node = self.ir_node.borrow_mut();
653            let metadata = node.metadata_mut();
654            metadata.tag = Some(name.to_string());
655        }
656        self
657    }
658}
659
660impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
661where
662    L: Location<'a> + NoTick,
663{
664    /// Returns a singleton value corresponding to the latest snapshot of the singleton
665    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
666    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
667    /// all snapshots of this singleton into the atomic-associated tick will observe the
668    /// same value each tick.
669    ///
670    /// # Non-Determinism
671    /// Because this picks a snapshot of a singleton whose value is continuously changing,
672    /// the output singleton has a non-deterministic value since the snapshot can be at an
673    /// arbitrary point in time.
674    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
675        Singleton::new(
676            self.location.clone().tick,
677            HydroNode::Batch {
678                inner: Box::new(self.ir_node.into_inner()),
679                metadata: self.location.tick.new_node_metadata::<T>(),
680            },
681        )
682    }
683
684    /// Returns this singleton back into a top-level, asynchronous execution context where updates
685    /// to the value will be asynchronously propagated.
686    pub fn end_atomic(self) -> Singleton<T, L, B> {
687        Singleton::new(
688            self.location.tick.l.clone(),
689            HydroNode::EndAtomic {
690                inner: Box::new(self.ir_node.into_inner()),
691                metadata: self.location.tick.l.new_node_metadata::<T>(),
692            },
693        )
694    }
695}
696
697impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
698where
699    L: Location<'a>,
700{
701    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
702    /// will observe the same version of the value and will be executed synchronously before any
703    /// outputs are yielded (in [`Optional::end_atomic`]).
704    ///
705    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
706    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
707    /// a different version).
708    ///
709    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
710    /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
711    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
712    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
713        let out_location = Atomic { tick: tick.clone() };
714        Singleton::new(
715            out_location.clone(),
716            HydroNode::BeginAtomic {
717                inner: Box::new(self.ir_node.into_inner()),
718                metadata: out_location.new_node_metadata::<T>(),
719            },
720        )
721    }
722
723    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
724    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
725    /// relevant data that contributed to the snapshot at tick `t`.
726    ///
727    /// # Non-Determinism
728    /// Because this picks a snapshot of a singleton whose value is continuously changing,
729    /// the output singleton has a non-deterministic value since the snapshot can be at an
730    /// arbitrary point in time.
731    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
732        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
733        Singleton::new(
734            tick.clone(),
735            HydroNode::Batch {
736                inner: Box::new(self.ir_node.into_inner()),
737                metadata: tick.new_node_metadata::<T>(),
738            },
739        )
740    }
741
742    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
743    /// with order corresponding to increasing prefixes of data contributing to the singleton.
744    ///
745    /// # Non-Determinism
746    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
747    /// to non-deterministic batching and arrival of inputs, the output stream is
748    /// non-deterministic.
749    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
750    where
751        L: NoTick,
752    {
753        let tick = self.location.tick();
754        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
755    }
756
757    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
758    /// value taken at various points in time. Because the input singleton may be
759    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
760    /// represent the value of the singleton given some prefix of the streams leading up to
761    /// it.
762    ///
763    /// # Non-Determinism
764    /// The output stream is non-deterministic in which elements are sampled, since this
765    /// is controlled by a clock.
766    pub fn sample_every(
767        self,
768        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
769        nondet: NonDet,
770    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
771    where
772        L: NoTick + NoAtomic,
773    {
774        let samples = self.location.source_interval(interval, nondet);
775        let tick = self.location.tick();
776
777        self.snapshot(&tick, nondet)
778            .filter_if_some(samples.batch(&tick, nondet).first())
779            .all_ticks()
780            .weakest_retries()
781    }
782}
783
784impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
785where
786    L: Location<'a>,
787{
788    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
789    /// which will stream the value computed in _each_ tick as a separate stream element.
790    ///
791    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
792    /// producing one element in the output for each tick. This is useful for batched computations,
793    /// where the results from each tick must be combined together.
794    ///
795    /// # Example
796    /// ```rust
797    /// # use hydro_lang::prelude::*;
798    /// # use futures::StreamExt;
799    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
800    /// let tick = process.tick();
801    /// # // ticks are lazy by default, forces the second tick to run
802    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
803    /// # let batch_first_tick = process
804    /// #   .source_iter(q!(vec![1]))
805    /// #   .batch(&tick, nondet!(/** test */));
806    /// # let batch_second_tick = process
807    /// #   .source_iter(q!(vec![1, 2, 3]))
808    /// #   .batch(&tick, nondet!(/** test */))
809    /// #   .defer_tick(); // appears on the second tick
810    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
811    /// input_batch // first tick: [1], second tick: [1, 2, 3]
812    ///     .count()
813    ///     .all_ticks()
814    /// # }, |mut stream| async move {
815    /// // [1, 3]
816    /// # for w in vec![1, 3] {
817    /// #     assert_eq!(stream.next().await.unwrap(), w);
818    /// # }
819    /// # }));
820    /// ```
821    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
822        self.into_stream().all_ticks()
823    }
824
825    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
826    /// which will stream the value computed in _each_ tick as a separate stream element.
827    ///
828    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
829    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
830    /// singleton's [`Tick`] context.
831    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
832        self.into_stream().all_ticks_atomic()
833    }
834
835    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
836    /// be asynchronously updated with the latest value of the singleton inside the tick.
837    ///
838    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
839    /// tick that tracks the inner value. This is useful for getting the value as of the
840    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
841    ///
842    /// # Example
843    /// ```rust
844    /// # use hydro_lang::prelude::*;
845    /// # use futures::StreamExt;
846    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
847    /// let tick = process.tick();
848    /// # // ticks are lazy by default, forces the second tick to run
849    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
850    /// # let batch_first_tick = process
851    /// #   .source_iter(q!(vec![1]))
852    /// #   .batch(&tick, nondet!(/** test */));
853    /// # let batch_second_tick = process
854    /// #   .source_iter(q!(vec![1, 2, 3]))
855    /// #   .batch(&tick, nondet!(/** test */))
856    /// #   .defer_tick(); // appears on the second tick
857    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
858    /// input_batch // first tick: [1], second tick: [1, 2, 3]
859    ///     .count()
860    ///     .latest()
861    /// # .sample_eager(nondet!(/** test */))
862    /// # }, |mut stream| async move {
863    /// // asynchronously changes from 1 ~> 3
864    /// # for w in vec![1, 3] {
865    /// #     assert_eq!(stream.next().await.unwrap(), w);
866    /// # }
867    /// # }));
868    /// ```
869    pub fn latest(self) -> Singleton<T, L, Unbounded> {
870        Singleton::new(
871            self.location.outer().clone(),
872            HydroNode::YieldConcat {
873                inner: Box::new(self.ir_node.into_inner()),
874                metadata: self.location.outer().new_node_metadata::<T>(),
875            },
876        )
877    }
878
879    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
880    /// be updated with the latest value of the singleton inside the tick.
881    ///
882    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
883    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
884    /// singleton's [`Tick`] context.
885    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
886        let out_location = Atomic {
887            tick: self.location.clone(),
888        };
889        Singleton::new(
890            out_location.clone(),
891            HydroNode::YieldConcat {
892                inner: Box::new(self.ir_node.into_inner()),
893                metadata: out_location.new_node_metadata::<T>(),
894            },
895        )
896    }
897
898    #[expect(missing_docs, reason = "TODO")]
899    pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
900        Singleton::new(
901            self.location.clone(),
902            HydroNode::DeferTick {
903                input: Box::new(self.ir_node.into_inner()),
904                metadata: self.location.new_node_metadata::<T>(),
905            },
906        )
907    }
908
909    #[deprecated(note = "use .into_stream().persist()")]
910    #[expect(missing_docs, reason = "deprecated")]
911    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
912        Stream::new(
913            self.location.clone(),
914            HydroNode::Persist {
915                inner: Box::new(self.ir_node.into_inner()),
916                metadata: self.location.new_node_metadata::<T>(),
917            },
918        )
919    }
920
921    /// Converts this singleton into a [`Stream`] containing a single element, the value.
922    ///
923    /// # Example
924    /// ```rust
925    /// # use hydro_lang::prelude::*;
926    /// # use futures::StreamExt;
927    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
928    /// let tick = process.tick();
929    /// let batch_input = process
930    ///   .source_iter(q!(vec![123, 456]))
931    ///   .batch(&tick, nondet!(/** test */));
932    /// batch_input.clone().chain(
933    ///   batch_input.count().into_stream()
934    /// ).all_ticks()
935    /// # }, |mut stream| async move {
936    /// // [123, 456, 2]
937    /// # for w in vec![123, 456, 2] {
938    /// #     assert_eq!(stream.next().await.unwrap(), w);
939    /// # }
940    /// # }));
941    /// ```
942    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
943        Stream::new(self.location, self.ir_node.into_inner())
944    }
945}
946
947#[doc(hidden)]
948/// Helper trait that determines the output collection type for [`Singleton::zip`].
949///
950/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
951/// [`Singleton`].
952#[sealed::sealed]
953pub trait ZipResult<'a, Other> {
954    /// The output collection type.
955    type Out;
956    /// The type of the tupled output value.
957    type ElementType;
958    /// The type of the other collection's value.
959    type OtherType;
960    /// The location where the tupled result will be materialized.
961    type Location: Location<'a>;
962
963    /// The location of the second input to the `zip`.
964    fn other_location(other: &Other) -> Self::Location;
965    /// The IR node of the second input to the `zip`.
966    fn other_ir_node(other: Other) -> HydroNode;
967
968    /// Constructs the output live collection given an IR node containing the zip result.
969    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
970}
971
972#[sealed::sealed]
973impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
974where
975    L: Location<'a>,
976{
977    type Out = Singleton<(T, U), L, B>;
978    type ElementType = (T, U);
979    type OtherType = U;
980    type Location = L;
981
982    fn other_location(other: &Singleton<U, L, B>) -> L {
983        other.location.clone()
984    }
985
986    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
987        other.ir_node.into_inner()
988    }
989
990    fn make(location: L, ir_node: HydroNode) -> Self::Out {
991        Singleton::new(location, ir_node)
992    }
993}
994
995#[sealed::sealed]
996impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
997where
998    L: Location<'a>,
999{
1000    type Out = Optional<(T, U), L, B>;
1001    type ElementType = (T, U);
1002    type OtherType = U;
1003    type Location = L;
1004
1005    fn other_location(other: &Optional<U, L, B>) -> L {
1006        other.location.clone()
1007    }
1008
1009    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1010        other.ir_node.into_inner()
1011    }
1012
1013    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1014        Optional::new(location, ir_node)
1015    }
1016}
1017
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// A singleton fed back through a tick cycle must still contain exactly one value per
    /// tick: counting its elements has to yield `1` in each externally triggered tick.
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (trigger_port, triggers) = node.source_external_bincode(&external);

        let node_tick = node.tick();
        let (cycle_handle, cycled) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));

        // Count the elements of the cycled singleton, but only report the count in ticks
        // that received a trigger message from the external process.
        let cardinality = cycled.clone().into_stream().count();
        let trigger_seen = triggers.batch(&node_tick, nondet!(/** testing */)).first();
        let counts_port = cardinality
            .filter_if_some(trigger_seen)
            .all_ticks()
            .send_bincode_external(&external);
        cycle_handle.complete_next_tick(cycled);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_trigger = nodes.connect_sink_bincode(trigger_port).await;
        let mut external_out = nodes.connect_source_bincode(counts_port).await;

        deployment.start().await.unwrap();

        // Two consecutive triggered ticks must each observe a single value.
        for _ in 0..2 {
            tick_trigger.send(()).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }
}