hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A *nullable* Rust value that can asynchronously change over time.
25///
26/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
27/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
28/// asynchronously change over time, including becoming present of uninhabited.
29///
30/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
31/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this optional (when it is not null)
35/// - `Loc`: the [`Location`] where the optional is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Optional<Type, Loc, Bound: Boundedness> {
38    pub(crate) location: Loc,
39    pub(crate) ir_node: RefCell<HydroNode>,
40
41    _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
45where
46    L: Location<'a>,
47{
48    fn defer_tick(self) -> Self {
49        Optional::defer_tick(self)
50    }
51}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    type Location = Tick<L>;
58
59    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60        Optional::new(
61            location.clone(),
62            HydroNode::CycleSource {
63                ident,
64                metadata: location.new_node_metadata::<T>(),
65            },
66        )
67    }
68}
69
70impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72    L: Location<'a>,
73{
74    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75        assert_eq!(
76            Location::id(&self.location),
77            expected_location,
78            "locations do not match"
79        );
80        self.location
81            .flow_state()
82            .borrow_mut()
83            .push_root(HydroRoot::CycleSink {
84                ident,
85                input: Box::new(self.ir_node.into_inner()),
86                out_location: Location::id(&self.location),
87                op_metadata: HydroIrOpMetadata::new(),
88            });
89    }
90}
91
92impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
93where
94    L: Location<'a>,
95{
96    type Location = Tick<L>;
97
98    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
99        Optional::new(
100            location.clone(),
101            HydroNode::CycleSource {
102                ident,
103                metadata: location.new_node_metadata::<T>(),
104            },
105        )
106    }
107}
108
109impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
110where
111    L: Location<'a>,
112{
113    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114        assert_eq!(
115            Location::id(&self.location),
116            expected_location,
117            "locations do not match"
118        );
119        self.location
120            .flow_state()
121            .borrow_mut()
122            .push_root(HydroRoot::CycleSink {
123                ident,
124                input: Box::new(self.ir_node.into_inner()),
125                out_location: Location::id(&self.location),
126                op_metadata: HydroIrOpMetadata::new(),
127            });
128    }
129}
130
131impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
132where
133    L: Location<'a> + NoTick,
134{
135    type Location = L;
136
137    fn create_source(ident: syn::Ident, location: L) -> Self {
138        Optional::new(
139            location.clone(),
140            HydroNode::CycleSource {
141                ident,
142                metadata: location.new_node_metadata::<T>(),
143            },
144        )
145    }
146}
147
148impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
149where
150    L: Location<'a> + NoTick,
151{
152    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
153        assert_eq!(
154            Location::id(&self.location),
155            expected_location,
156            "locations do not match"
157        );
158        self.location
159            .flow_state()
160            .borrow_mut()
161            .push_root(HydroRoot::CycleSink {
162                ident,
163                input: Box::new(self.ir_node.into_inner()),
164                out_location: Location::id(&self.location),
165                op_metadata: HydroIrOpMetadata::new(),
166            });
167    }
168}
169
170impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
171where
172    L: Location<'a>,
173{
174    fn from(singleton: Optional<T, L, Bounded>) -> Self {
175        Optional::new(singleton.location, singleton.ir_node.into_inner())
176    }
177}
178
179impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
180where
181    L: Location<'a>,
182{
183    fn from(singleton: Singleton<T, L, B>) -> Self {
184        Optional::new(singleton.location, singleton.ir_node.into_inner())
185    }
186}
187
/// Pairs two optionals at the same location using a `CrossSingleton` IR node.
///
/// Checks via `check_matching_location` that both inputs share a location before building
/// the node.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me.location.new_node_metadata::<(T, O)>();
    let crossed = HydroNode::CrossSingleton {
        left: Box::new(me.ir_node.into_inner()),
        right: Box::new(other.ir_node.into_inner()),
        metadata,
    };
    Optional::new(me.location, crossed)
}
204
/// Combines two optionals at the same location using a `ChainFirst` IR node, with `me` as
/// the first input and `other` as the second.
///
/// Checks via `check_matching_location` that both inputs share a location before building
/// the node.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me.location.new_node_metadata::<T>();
    let chained = HydroNode::ChainFirst {
        first: Box::new(me.ir_node.into_inner()),
        second: Box::new(other.ir_node.into_inner()),
        metadata,
    };
    Optional::new(me.location, chained)
}
221
222impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
223where
224    T: Clone,
225    L: Location<'a>,
226{
227    fn clone(&self) -> Self {
228        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
229            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
230            *self.ir_node.borrow_mut() = HydroNode::Tee {
231                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
232                metadata: self.location.new_node_metadata::<T>(),
233            };
234        }
235
236        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
237            Optional {
238                location: self.location.clone(),
239                ir_node: HydroNode::Tee {
240                    inner: TeeNode(inner.0.clone()),
241                    metadata: metadata.clone(),
242                }
243                .into(),
244                _phantom: PhantomData,
245            }
246        } else {
247            unreachable!()
248        }
249    }
250}
251
252impl<'a, T, L, B: Boundedness> Optional<T, L, B>
253where
254    L: Location<'a>,
255{
256    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
257        debug_assert_eq!(Location::id(&location), ir_node.metadata().location_kind);
258        Optional {
259            location,
260            ir_node: RefCell::new(ir_node),
261            _phantom: PhantomData,
262        }
263    }
264
265    /// Transforms the optional value by applying a function `f` to it,
266    /// continuously as the input is updated.
267    ///
268    /// Whenever the optional is empty, the output optional is also empty.
269    ///
270    /// # Example
271    /// ```rust
272    /// # use hydro_lang::prelude::*;
273    /// # use futures::StreamExt;
274    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
275    /// let tick = process.tick();
276    /// let optional = tick.optional_first_tick(q!(1));
277    /// optional.map(q!(|v| v + 1)).all_ticks()
278    /// # }, |mut stream| async move {
279    /// // 2
280    /// # assert_eq!(stream.next().await.unwrap(), 2);
281    /// # }));
282    /// ```
283    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
284    where
285        F: Fn(T) -> U + 'a,
286    {
287        let f = f.splice_fn1_ctx(&self.location).into();
288        Optional::new(
289            self.location.clone(),
290            HydroNode::Map {
291                f,
292                input: Box::new(self.ir_node.into_inner()),
293                metadata: self.location.new_node_metadata::<U>(),
294            },
295        )
296    }
297
298    /// Transforms the optional value by applying a function `f` to it and then flattening
299    /// the result into a stream, preserving the order of elements.
300    ///
301    /// If the optional is empty, the output stream is also empty. If the optional contains
302    /// a value, `f` is applied to produce an iterator, and all items from that iterator
303    /// are emitted in the output stream in deterministic order.
304    ///
305    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
306    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
307    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
308    ///
309    /// # Example
310    /// ```rust
311    /// # use hydro_lang::prelude::*;
312    /// # use futures::StreamExt;
313    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
314    /// let tick = process.tick();
315    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
316    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
317    /// # }, |mut stream| async move {
318    /// // 1, 2, 3
319    /// # for w in vec![1, 2, 3] {
320    /// #     assert_eq!(stream.next().await.unwrap(), w);
321    /// # }
322    /// # }));
323    /// ```
324    pub fn flat_map_ordered<U, I, F>(
325        self,
326        f: impl IntoQuotedMut<'a, F, L>,
327    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
328    where
329        I: IntoIterator<Item = U>,
330        F: Fn(T) -> I + 'a,
331    {
332        let f = f.splice_fn1_ctx(&self.location).into();
333        Stream::new(
334            self.location.clone(),
335            HydroNode::FlatMap {
336                f,
337                input: Box::new(self.ir_node.into_inner()),
338                metadata: self.location.new_node_metadata::<U>(),
339            },
340        )
341    }
342
343    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
344    /// for the output type `I` to produce items in any order.
345    ///
346    /// If the optional is empty, the output stream is also empty. If the optional contains
347    /// a value, `f` is applied to produce an iterator, and all items from that iterator
348    /// are emitted in the output stream in non-deterministic order.
349    ///
350    /// # Example
351    /// ```rust
352    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
353    /// # use futures::StreamExt;
354    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
355    /// let tick = process.tick();
356    /// let optional = tick.optional_first_tick(q!(
357    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
358    /// ));
359    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
360    /// # }, |mut stream| async move {
361    /// // 1, 2, 3, but in no particular order
362    /// # let mut results = Vec::new();
363    /// # for _ in 0..3 {
364    /// #     results.push(stream.next().await.unwrap());
365    /// # }
366    /// # results.sort();
367    /// # assert_eq!(results, vec![1, 2, 3]);
368    /// # }));
369    /// ```
370    pub fn flat_map_unordered<U, I, F>(
371        self,
372        f: impl IntoQuotedMut<'a, F, L>,
373    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
374    where
375        I: IntoIterator<Item = U>,
376        F: Fn(T) -> I + 'a,
377    {
378        let f = f.splice_fn1_ctx(&self.location).into();
379        Stream::new(
380            self.location.clone(),
381            HydroNode::FlatMap {
382                f,
383                input: Box::new(self.ir_node.into_inner()),
384                metadata: self.location.new_node_metadata::<U>(),
385            },
386        )
387    }
388
389    /// Flattens the optional value into a stream, preserving the order of elements.
390    ///
391    /// If the optional is empty, the output stream is also empty. If the optional contains
392    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
393    /// in the output stream in deterministic order.
394    ///
395    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
396    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
397    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
398    ///
399    /// # Example
400    /// ```rust
401    /// # use hydro_lang::prelude::*;
402    /// # use futures::StreamExt;
403    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
404    /// let tick = process.tick();
405    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
406    /// optional.flatten_ordered().all_ticks()
407    /// # }, |mut stream| async move {
408    /// // 1, 2, 3
409    /// # for w in vec![1, 2, 3] {
410    /// #     assert_eq!(stream.next().await.unwrap(), w);
411    /// # }
412    /// # }));
413    /// ```
414    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
415    where
416        T: IntoIterator<Item = U>,
417    {
418        self.flat_map_ordered(q!(|v| v))
419    }
420
421    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
422    /// for the element type `T` to produce items in any order.
423    ///
424    /// If the optional is empty, the output stream is also empty. If the optional contains
425    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
426    /// in the output stream in non-deterministic order.
427    ///
428    /// # Example
429    /// ```rust
430    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
431    /// # use futures::StreamExt;
432    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
433    /// let tick = process.tick();
434    /// let optional = tick.optional_first_tick(q!(
435    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
436    /// ));
437    /// optional.flatten_unordered().all_ticks()
438    /// # }, |mut stream| async move {
439    /// // 1, 2, 3, but in no particular order
440    /// # let mut results = Vec::new();
441    /// # for _ in 0..3 {
442    /// #     results.push(stream.next().await.unwrap());
443    /// # }
444    /// # results.sort();
445    /// # assert_eq!(results, vec![1, 2, 3]);
446    /// # }));
447    /// ```
448    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
449    where
450        T: IntoIterator<Item = U>,
451    {
452        self.flat_map_unordered(q!(|v| v))
453    }
454
455    /// Creates an optional containing only the value if it satisfies a predicate `f`.
456    ///
457    /// If the optional is empty, the output optional is also empty. If the optional contains
458    /// a value and the predicate returns `true`, the output optional contains the same value.
459    /// If the predicate returns `false`, the output optional is empty.
460    ///
461    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
462    /// not modify or take ownership of the value. If you need to modify the value while filtering
463    /// use [`Optional::filter_map`] instead.
464    ///
465    /// # Example
466    /// ```rust
467    /// # use hydro_lang::prelude::*;
468    /// # use futures::StreamExt;
469    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
470    /// let tick = process.tick();
471    /// let optional = tick.optional_first_tick(q!(5));
472    /// optional.filter(q!(|&x| x > 3)).all_ticks()
473    /// # }, |mut stream| async move {
474    /// // 5
475    /// # assert_eq!(stream.next().await.unwrap(), 5);
476    /// # }));
477    /// ```
478    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
479    where
480        F: Fn(&T) -> bool + 'a,
481    {
482        let f = f.splice_fn1_borrow_ctx(&self.location).into();
483        Optional::new(
484            self.location.clone(),
485            HydroNode::Filter {
486                f,
487                input: Box::new(self.ir_node.into_inner()),
488                metadata: self.location.new_node_metadata::<T>(),
489            },
490        )
491    }
492
493    /// An operator that both filters and maps. It yields only the value if the supplied
494    /// closure `f` returns `Some(value)`.
495    ///
496    /// If the optional is empty, the output optional is also empty. If the optional contains
497    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
498    /// If the closure returns `None`, the output optional is empty.
499    ///
500    /// # Example
501    /// ```rust
502    /// # use hydro_lang::prelude::*;
503    /// # use futures::StreamExt;
504    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
505    /// let tick = process.tick();
506    /// let optional = tick.optional_first_tick(q!("42"));
507    /// optional
508    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
509    ///     .all_ticks()
510    /// # }, |mut stream| async move {
511    /// // 42
512    /// # assert_eq!(stream.next().await.unwrap(), 42);
513    /// # }));
514    /// ```
515    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
516    where
517        F: Fn(T) -> Option<U> + 'a,
518    {
519        let f = f.splice_fn1_ctx(&self.location).into();
520        Optional::new(
521            self.location.clone(),
522            HydroNode::FilterMap {
523                f,
524                input: Box::new(self.ir_node.into_inner()),
525                metadata: self.location.new_node_metadata::<U>(),
526            },
527        )
528    }
529
530    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
531    ///
532    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
533    /// non-null. This is useful for combining several pieces of state together.
534    ///
535    /// # Example
536    /// ```rust
537    /// # use hydro_lang::prelude::*;
538    /// # use futures::StreamExt;
539    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
540    /// let tick = process.tick();
541    /// let numbers = process
542    ///   .source_iter(q!(vec![123, 456, 789]))
543    ///   .batch(&tick, nondet!(/** test */));
544    /// let min = numbers.clone().min(); // Optional
545    /// let max = numbers.max(); // Optional
546    /// min.zip(max).all_ticks()
547    /// # }, |mut stream| async move {
548    /// // [(123, 789)]
549    /// # for w in vec![(123, 789)] {
550    /// #     assert_eq!(stream.next().await.unwrap(), w);
551    /// # }
552    /// # }));
553    /// ```
554    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
555    where
556        O: Clone,
557    {
558        let other: Optional<O, L, B> = other.into();
559        check_matching_location(&self.location, &other.location);
560
561        if L::is_top_level()
562            && let Some(tick) = self.location.try_tick()
563        {
564            let out = zip_inside_tick(
565                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
566                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
567            )
568            .latest();
569
570            Optional::new(out.location, out.ir_node.into_inner())
571        } else {
572            zip_inside_tick(self, other)
573        }
574    }
575
576    /// Passes through `self` when it has a value, otherwise passes through `other`.
577    ///
578    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
579    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
580    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
581    ///
582    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
583    /// of the inputs change (including to/from null states).
584    ///
585    /// # Example
586    /// ```rust
587    /// # use hydro_lang::prelude::*;
588    /// # use futures::StreamExt;
589    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
590    /// let tick = process.tick();
591    /// // ticks are lazy by default, forces the second tick to run
592    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
593    ///
594    /// let some_first_tick = tick.optional_first_tick(q!(123));
595    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
596    /// some_first_tick.or(some_second_tick).all_ticks()
597    /// # }, |mut stream| async move {
598    /// // [123 /* first tick */, 456 /* second tick */]
599    /// # for w in vec![123, 456] {
600    /// #     assert_eq!(stream.next().await.unwrap(), w);
601    /// # }
602    /// # }));
603    /// ```
604    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
605        check_matching_location(&self.location, &other.location);
606
607        if L::is_top_level()
608            && let Some(tick) = self.location.try_tick()
609        {
610            let out = or_inside_tick(
611                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
612                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
613            )
614            .latest();
615
616            Optional::new(out.location, out.ir_node.into_inner())
617        } else {
618            Optional::new(
619                self.location.clone(),
620                HydroNode::ChainFirst {
621                    first: Box::new(self.ir_node.into_inner()),
622                    second: Box::new(other.ir_node.into_inner()),
623                    metadata: self.location.new_node_metadata::<T>(),
624                },
625            )
626        }
627    }
628
629    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
630    ///
631    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
632    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
633    ///
634    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
635    /// of the inputs change (including to/from null states).
636    ///
637    /// # Example
638    /// ```rust
639    /// # use hydro_lang::prelude::*;
640    /// # use futures::StreamExt;
641    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
642    /// let tick = process.tick();
643    /// // ticks are lazy by default, forces the later ticks to run
644    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
645    ///
646    /// let some_first_tick = tick.optional_first_tick(q!(123));
647    /// some_first_tick
648    ///     .unwrap_or(tick.singleton(q!(456)))
649    ///     .all_ticks()
650    /// # }, |mut stream| async move {
651    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
652    /// # for w in vec![123, 456, 456, 456] {
653    /// #     assert_eq!(stream.next().await.unwrap(), w);
654    /// # }
655    /// # }));
656    /// ```
657    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
658        let res_option = self.or(other.into());
659        Singleton::new(res_option.location, res_option.ir_node.into_inner())
660    }
661
662    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
663    ///
664    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
665    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
666    /// so that Hydro can skip any computation on null values.
667    ///
668    /// # Example
669    /// ```rust
670    /// # use hydro_lang::prelude::*;
671    /// # use futures::StreamExt;
672    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
673    /// let tick = process.tick();
674    /// // ticks are lazy by default, forces the later ticks to run
675    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
676    ///
677    /// let some_first_tick = tick.optional_first_tick(q!(123));
678    /// some_first_tick.into_singleton().all_ticks()
679    /// # }, |mut stream| async move {
680    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
681    /// # for w in vec![Some(123), None, None, None] {
682    /// #     assert_eq!(stream.next().await.unwrap(), w);
683    /// # }
684    /// # }));
685    /// ```
686    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
687    where
688        T: Clone,
689    {
690        let none: syn::Expr = parse_quote!([::std::option::Option::None]);
691        let core_ir = HydroNode::Source {
692            source: HydroSource::Iter(none.into()),
693            metadata: self.location.new_node_metadata::<Option<T>>(),
694        };
695
696        let none_singleton = if L::is_top_level() {
697            Singleton::new(
698                self.location.clone(),
699                HydroNode::Persist {
700                    inner: Box::new(core_ir),
701                    metadata: self.location.new_node_metadata::<Option<T>>(),
702                },
703            )
704        } else {
705            Singleton::new(self.location.clone(), core_ir)
706        };
707
708        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
709    }
710
711    /// An operator which allows you to "name" a `HydroNode`.
712    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
713    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
714        {
715            let mut node = self.ir_node.borrow_mut();
716            let metadata = node.metadata_mut();
717            metadata.tag = Some(name.to_string());
718        }
719        self
720    }
721}
722
723impl<'a, T, L> Optional<T, L, Bounded>
724where
725    L: Location<'a>,
726{
727    /// Filters this optional, passing through the optional value if it is non-null **and** the
728    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
729    ///
730    /// Useful for conditionally processing, such as only emitting an optional's value outside
731    /// a tick if some other condition is satisfied.
732    ///
733    /// # Example
734    /// ```rust
735    /// # use hydro_lang::prelude::*;
736    /// # use futures::StreamExt;
737    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
738    /// let tick = process.tick();
739    /// // ticks are lazy by default, forces the second tick to run
740    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
741    ///
742    /// let batch_first_tick = process
743    ///   .source_iter(q!(vec![]))
744    ///   .batch(&tick, nondet!(/** test */));
745    /// let batch_second_tick = process
746    ///   .source_iter(q!(vec![456]))
747    ///   .batch(&tick, nondet!(/** test */))
748    ///   .defer_tick(); // appears on the second tick
749    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
750    /// batch_first_tick.chain(batch_second_tick).first()
751    ///   .filter_if_some(some_on_first_tick)
752    ///   .unwrap_or(tick.singleton(q!(789)))
753    ///   .all_ticks()
754    /// # }, |mut stream| async move {
755    /// // [789, 789]
756    /// # for w in vec![789, 789] {
757    /// #     assert_eq!(stream.next().await.unwrap(), w);
758    /// # }
759    /// # }));
760    /// ```
761    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
762        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
763    }
764
765    /// Filters this optional, passing through the optional value if it is non-null **and** the
766    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
767    ///
768    /// Useful for conditionally processing, such as only emitting an optional's value outside
769    /// a tick if some other condition is satisfied.
770    ///
771    /// # Example
772    /// ```rust
773    /// # use hydro_lang::prelude::*;
774    /// # use futures::StreamExt;
775    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
776    /// let tick = process.tick();
777    /// // ticks are lazy by default, forces the second tick to run
778    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
779    ///
780    /// let batch_first_tick = process
781    ///   .source_iter(q!(vec![]))
782    ///   .batch(&tick, nondet!(/** test */));
783    /// let batch_second_tick = process
784    ///   .source_iter(q!(vec![456]))
785    ///   .batch(&tick, nondet!(/** test */))
786    ///   .defer_tick(); // appears on the second tick
787    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
788    /// batch_first_tick.chain(batch_second_tick).first()
789    ///   .filter_if_none(some_on_first_tick)
790    ///   .unwrap_or(tick.singleton(q!(789)))
791    ///   .all_ticks()
792    /// # }, |mut stream| async move {
793    /// // [789, 789]
794    /// # for w in vec![789, 456] {
795    /// #     assert_eq!(stream.next().await.unwrap(), w);
796    /// # }
797    /// # }));
798    /// ```
799    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
800        self.filter_if_some(
801            other
802                .map(q!(|_| ()))
803                .into_singleton()
804                .filter(q!(|o| o.is_none())),
805        )
806    }
807
808    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
809    ///
810    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
811    /// having a value, such as only releasing a piece of state if the node is the leader.
812    ///
813    /// # Example
814    /// ```rust
815    /// # use hydro_lang::prelude::*;
816    /// # use futures::StreamExt;
817    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818    /// let tick = process.tick();
819    /// // ticks are lazy by default, forces the second tick to run
820    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
821    ///
822    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
823    /// some_on_first_tick
824    ///     .if_some_then(tick.singleton(q!(456)))
825    ///     .unwrap_or(tick.singleton(q!(123)))
826    /// # .all_ticks()
827    /// # }, |mut stream| async move {
828    /// // 456 (first tick) ~> 123 (second tick onwards)
829    /// # for w in vec![456, 123, 123] {
830    /// #     assert_eq!(stream.next().await.unwrap(), w);
831    /// # }
832    /// # }));
833    /// ```
834    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
835        value.filter_if_some(self)
836    }
837}
838
839impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
840where
841    L: Location<'a> + NoTick,
842{
843    /// Returns an optional value corresponding to the latest snapshot of the optional
844    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
845    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
846    /// all snapshots of this optional into the atomic-associated tick will observe the
847    /// same value each tick.
848    ///
849    /// # Non-Determinism
850    /// Because this picks a snapshot of a optional whose value is continuously changing,
851    /// the output optional has a non-deterministic value since the snapshot can be at an
852    /// arbitrary point in time.
853    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
854        Optional::new(
855            self.location.clone().tick,
856            HydroNode::Batch {
857                inner: Box::new(self.ir_node.into_inner()),
858                metadata: self.location.new_node_metadata::<T>(),
859            },
860        )
861    }
862
863    /// Returns this optional back into a top-level, asynchronous execution context where updates
864    /// to the value will be asynchronously propagated.
865    pub fn end_atomic(self) -> Optional<T, L, B> {
866        Optional::new(
867            self.location.tick.l.clone(),
868            HydroNode::EndAtomic {
869                inner: Box::new(self.ir_node.into_inner()),
870                metadata: self.location.tick.l.new_node_metadata::<T>(),
871            },
872        )
873    }
874}
875
876impl<'a, T, L, B: Boundedness> Optional<T, L, B>
877where
878    L: Location<'a>,
879{
880    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
881    /// will observe the same version of the value and will be executed synchronously before any
882    /// outputs are yielded (in [`Optional::end_atomic`]).
883    ///
884    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
885    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
886    /// a different version).
887    ///
888    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
889    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
890    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
891    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
892        let out_location = Atomic { tick: tick.clone() };
893        Optional::new(
894            out_location.clone(),
895            HydroNode::BeginAtomic {
896                inner: Box::new(self.ir_node.into_inner()),
897                metadata: out_location.new_node_metadata::<T>(),
898            },
899        )
900    }
901
902    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
903    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
904    /// relevant data that contributed to the snapshot at tick `t`.
905    ///
906    /// # Non-Determinism
907    /// Because this picks a snapshot of a optional whose value is continuously changing,
908    /// the output optional has a non-deterministic value since the snapshot can be at an
909    /// arbitrary point in time.
910    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
911        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
912        Optional::new(
913            tick.clone(),
914            HydroNode::Batch {
915                inner: Box::new(self.ir_node.into_inner()),
916                metadata: tick.new_node_metadata::<T>(),
917            },
918        )
919    }
920
921    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
922    /// with order corresponding to increasing prefixes of data contributing to the optional.
923    ///
924    /// # Non-Determinism
925    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
926    /// to non-deterministic batching and arrival of inputs, the output stream is
927    /// non-deterministic.
928    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
929    where
930        L: NoTick,
931    {
932        let tick = self.location.tick();
933        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
934    }
935
936    /// Given a time interval, returns a stream corresponding to snapshots of the optional
937    /// value taken at various points in time. Because the input optional may be
938    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
939    /// represent the value of the optional given some prefix of the streams leading up to
940    /// it.
941    ///
942    /// # Non-Determinism
943    /// The output stream is non-deterministic in which elements are sampled, since this
944    /// is controlled by a clock.
945    pub fn sample_every(
946        self,
947        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
948        nondet: NonDet,
949    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
950    where
951        L: NoTick + NoAtomic,
952    {
953        let samples = self.location.source_interval(interval, nondet);
954        let tick = self.location.tick();
955
956        self.snapshot(&tick, nondet)
957            .filter_if_some(samples.batch(&tick, nondet).first())
958            .all_ticks()
959            .weakest_retries()
960    }
961}
962
963impl<'a, T, L> Optional<T, Tick<L>, Bounded>
964where
965    L: Location<'a>,
966{
967    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
968    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
969    /// null values).
970    ///
971    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
972    /// producing one element in the output for each (non-null) tick. This is useful for batched
973    /// computations, where the results from each tick must be combined together.
974    ///
975    /// # Example
976    /// ```rust
977    /// # use hydro_lang::prelude::*;
978    /// # use futures::StreamExt;
979    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980    /// # let tick = process.tick();
981    /// # // ticks are lazy by default, forces the second tick to run
982    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
983    /// # let batch_first_tick = process
984    /// #   .source_iter(q!(vec![]))
985    /// #   .batch(&tick, nondet!(/** test */));
986    /// # let batch_second_tick = process
987    /// #   .source_iter(q!(vec![1, 2, 3]))
988    /// #   .batch(&tick, nondet!(/** test */))
989    /// #   .defer_tick(); // appears on the second tick
990    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
991    /// input_batch // first tick: [], second tick: [1, 2, 3]
992    ///     .max()
993    ///     .all_ticks()
994    /// # }, |mut stream| async move {
995    /// // [3]
996    /// # for w in vec![3] {
997    /// #     assert_eq!(stream.next().await.unwrap(), w);
998    /// # }
999    /// # }));
1000    /// ```
1001    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1002        self.into_stream().all_ticks()
1003    }
1004
1005    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1006    /// which will stream the value computed in _each_ tick as a separate stream element.
1007    ///
1008    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1009    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1010    /// optional's [`Tick`] context.
1011    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1012        self.into_stream().all_ticks_atomic()
1013    }
1014
1015    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1016    /// be asynchronously updated with the latest value of the optional inside the tick, including
1017    /// whether the optional is null or not.
1018    ///
1019    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1020    /// tick that tracks the inner value. This is useful for getting the value as of the
1021    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1022    ///
1023    /// # Example
1024    /// ```rust
1025    /// # use hydro_lang::prelude::*;
1026    /// # use futures::StreamExt;
1027    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1028    /// # let tick = process.tick();
1029    /// # // ticks are lazy by default, forces the second tick to run
1030    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1031    /// # let batch_first_tick = process
1032    /// #   .source_iter(q!(vec![]))
1033    /// #   .batch(&tick, nondet!(/** test */));
1034    /// # let batch_second_tick = process
1035    /// #   .source_iter(q!(vec![1, 2, 3]))
1036    /// #   .batch(&tick, nondet!(/** test */))
1037    /// #   .defer_tick(); // appears on the second tick
1038    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1039    /// input_batch // first tick: [], second tick: [1, 2, 3]
1040    ///     .max()
1041    ///     .latest()
1042    /// # .into_singleton()
1043    /// # .sample_eager(nondet!(/** test */))
1044    /// # }, |mut stream| async move {
1045    /// // asynchronously changes from None ~> 3
1046    /// # for w in vec![None, Some(3)] {
1047    /// #     assert_eq!(stream.next().await.unwrap(), w);
1048    /// # }
1049    /// # }));
1050    /// ```
1051    pub fn latest(self) -> Optional<T, L, Unbounded> {
1052        Optional::new(
1053            self.location.outer().clone(),
1054            HydroNode::YieldConcat {
1055                inner: Box::new(self.ir_node.into_inner()),
1056                metadata: self.location.outer().new_node_metadata::<T>(),
1057            },
1058        )
1059    }
1060
1061    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1062    /// be updated with the latest value of the optional inside the tick.
1063    ///
1064    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1065    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1066    /// optional's [`Tick`] context.
1067    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1068        let out_location = Atomic {
1069            tick: self.location.clone(),
1070        };
1071
1072        Optional::new(
1073            out_location.clone(),
1074            HydroNode::YieldConcat {
1075                inner: Box::new(self.ir_node.into_inner()),
1076                metadata: out_location.new_node_metadata::<T>(),
1077            },
1078        )
1079    }
1080
1081    #[expect(missing_docs, reason = "TODO")]
1082    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1083        Optional::new(
1084            self.location.clone(),
1085            HydroNode::DeferTick {
1086                input: Box::new(self.ir_node.into_inner()),
1087                metadata: self.location.new_node_metadata::<T>(),
1088            },
1089        )
1090    }
1091
1092    #[deprecated(note = "use .into_stream().persist()")]
1093    #[expect(missing_docs, reason = "deprecated")]
1094    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1095    where
1096        T: Clone,
1097    {
1098        self.into_stream().persist()
1099    }
1100
1101    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1102    /// non-null. Otherwise, the stream is empty.
1103    ///
1104    /// # Example
1105    /// ```rust
1106    /// # use hydro_lang::prelude::*;
1107    /// # use futures::StreamExt;
1108    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1109    /// # let tick = process.tick();
1110    /// # // ticks are lazy by default, forces the second tick to run
1111    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1112    /// # let batch_first_tick = process
1113    /// #   .source_iter(q!(vec![]))
1114    /// #   .batch(&tick, nondet!(/** test */));
1115    /// # let batch_second_tick = process
1116    /// #   .source_iter(q!(vec![123, 456]))
1117    /// #   .batch(&tick, nondet!(/** test */))
1118    /// #   .defer_tick(); // appears on the second tick
1119    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1120    /// input_batch // first tick: [], second tick: [123, 456]
1121    ///     .clone()
1122    ///     .max()
1123    ///     .into_stream()
1124    ///     .chain(input_batch)
1125    ///     .all_ticks()
1126    /// # }, |mut stream| async move {
1127    /// // [456, 123, 456]
1128    /// # for w in vec![456, 123, 456] {
1129    /// #     assert_eq!(stream.next().await.unwrap(), w);
1130    /// # }
1131    /// # }));
1132    /// ```
1133    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1134        Stream::new(self.location, self.ir_node.into_inner())
1135    }
1136}
1137
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use stageleft::q;

    use super::Optional;
    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    // `or`-ing an inhabited optional with itself must still hold a single value, so counting
    // the elements of the resulting stream is expected to give 1.
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let observer = builder.external::<()>();

        // Build an inhabited optional inside a tick by converting a singleton.
        let tick = process.tick();
        let inhabited: Optional<_, _, _> = tick.singleton(q!(123)).into();

        let either = inhabited.clone().or(inhabited);
        let count_port = either
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&observer);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received_counts = deployed.connect_source_bincode(count_port).await;

        deployment.start().await.unwrap();

        let first_count = received_counts.next().await.unwrap();
        assert_eq!(first_count, 1);
    }
}