hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    /// The location this optional is materialized at.
    pub(crate) location: Loc,
    /// The IR node that produces this optional. It lives in a `RefCell` because `clone`
    /// rewrites it in place into a `HydroNode::Tee` through a shared reference.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker tying the type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
43
/// [`DeferTick`] for bounded optionals that live inside a [`Tick`].
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Forwards to `Optional::defer_tick`.
    // NOTE(review): this relies on `Optional::defer_tick` resolving to a method defined
    // elsewhere for this type (not visible in this part of the file) rather than to this
    // trait method itself — confirm.
    fn defer_tick(self) -> Self {
        Optional::defer_tick(self)
    }
}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 type Location = Tick<L>;
58
59 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60 Optional::new(
61 location.clone(),
62 HydroNode::CycleSource {
63 ident,
64 metadata: location.new_node_metadata::<T>(),
65 },
66 )
67 }
68}
69
70impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72 L: Location<'a>,
73{
74 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75 assert_eq!(
76 Location::id(&self.location),
77 expected_location,
78 "locations do not match"
79 );
80 self.location
81 .flow_state()
82 .borrow_mut()
83 .push_root(HydroRoot::CycleSink {
84 ident,
85 input: Box::new(self.ir_node.into_inner()),
86 out_location: Location::id(&self.location),
87 op_metadata: HydroIrOpMetadata::new(),
88 });
89 }
90}
91
92impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
93where
94 L: Location<'a>,
95{
96 type Location = Tick<L>;
97
98 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
99 Optional::new(
100 location.clone(),
101 HydroNode::CycleSource {
102 ident,
103 metadata: location.new_node_metadata::<T>(),
104 },
105 )
106 }
107}
108
109impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
110where
111 L: Location<'a>,
112{
113 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114 assert_eq!(
115 Location::id(&self.location),
116 expected_location,
117 "locations do not match"
118 );
119 self.location
120 .flow_state()
121 .borrow_mut()
122 .push_root(HydroRoot::CycleSink {
123 ident,
124 input: Box::new(self.ir_node.into_inner()),
125 out_location: Location::id(&self.location),
126 op_metadata: HydroIrOpMetadata::new(),
127 });
128 }
129}
130
131impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
132where
133 L: Location<'a> + NoTick,
134{
135 type Location = L;
136
137 fn create_source(ident: syn::Ident, location: L) -> Self {
138 Optional::new(
139 location.clone(),
140 HydroNode::CycleSource {
141 ident,
142 metadata: location.new_node_metadata::<T>(),
143 },
144 )
145 }
146}
147
148impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
149where
150 L: Location<'a> + NoTick,
151{
152 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
153 assert_eq!(
154 Location::id(&self.location),
155 expected_location,
156 "locations do not match"
157 );
158 self.location
159 .flow_state()
160 .borrow_mut()
161 .push_root(HydroRoot::CycleSink {
162 ident,
163 input: Box::new(self.ir_node.into_inner()),
164 out_location: Location::id(&self.location),
165 op_metadata: HydroIrOpMetadata::new(),
166 });
167 }
168}
169
170impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
171where
172 L: Location<'a>,
173{
174 fn from(singleton: Optional<T, L, Bounded>) -> Self {
175 Optional::new(singleton.location, singleton.ir_node.into_inner())
176 }
177}
178
179impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
180where
181 L: Location<'a>,
182{
183 fn from(singleton: Singleton<T, L, B>) -> Self {
184 Optional::new(singleton.location, singleton.ir_node.into_inner())
185 }
186}
187
/// Pairs two optionals at the same location with a [`HydroNode::CrossSingleton`] node.
///
/// `me` feeds the left input and `other` the right; both must be at matching locations.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let Optional {
        location,
        ir_node: left,
        ..
    } = me;

    let crossed = HydroNode::CrossSingleton {
        left: Box::new(left.into_inner()),
        right: Box::new(other.ir_node.into_inner()),
        metadata: location.new_node_metadata::<(T, O)>(),
    };
    Optional::new(location, crossed)
}
204
/// Combines two optionals at the same location with a [`HydroNode::ChainFirst`] node.
///
/// `me` is the `first` input and `other` the `second`; both must be at matching locations.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let Optional {
        location,
        ir_node: first,
        ..
    } = me;

    let chained = HydroNode::ChainFirst {
        first: Box::new(first.into_inner()),
        second: Box::new(other.ir_node.into_inner()),
        metadata: location.new_node_metadata::<T>(),
    };
    Optional::new(location, chained)
}
221
222impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
223where
224 T: Clone,
225 L: Location<'a>,
226{
227 fn clone(&self) -> Self {
228 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
229 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
230 *self.ir_node.borrow_mut() = HydroNode::Tee {
231 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
232 metadata: self.location.new_node_metadata::<T>(),
233 };
234 }
235
236 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
237 Optional {
238 location: self.location.clone(),
239 ir_node: HydroNode::Tee {
240 inner: TeeNode(inner.0.clone()),
241 metadata: metadata.clone(),
242 }
243 .into(),
244 _phantom: PhantomData,
245 }
246 } else {
247 unreachable!()
248 }
249 }
250}
251
252impl<'a, T, L, B: Boundedness> Optional<T, L, B>
253where
254 L: Location<'a>,
255{
256 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
257 debug_assert_eq!(Location::id(&location), ir_node.metadata().location_kind);
258 Optional {
259 location,
260 ir_node: RefCell::new(ir_node),
261 _phantom: PhantomData,
262 }
263 }
264
265 /// Transforms the optional value by applying a function `f` to it,
266 /// continuously as the input is updated.
267 ///
268 /// Whenever the optional is empty, the output optional is also empty.
269 ///
270 /// # Example
271 /// ```rust
272 /// # use hydro_lang::prelude::*;
273 /// # use futures::StreamExt;
274 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
275 /// let tick = process.tick();
276 /// let optional = tick.optional_first_tick(q!(1));
277 /// optional.map(q!(|v| v + 1)).all_ticks()
278 /// # }, |mut stream| async move {
279 /// // 2
280 /// # assert_eq!(stream.next().await.unwrap(), 2);
281 /// # }));
282 /// ```
283 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
284 where
285 F: Fn(T) -> U + 'a,
286 {
287 let f = f.splice_fn1_ctx(&self.location).into();
288 Optional::new(
289 self.location.clone(),
290 HydroNode::Map {
291 f,
292 input: Box::new(self.ir_node.into_inner()),
293 metadata: self.location.new_node_metadata::<U>(),
294 },
295 )
296 }
297
298 /// Transforms the optional value by applying a function `f` to it and then flattening
299 /// the result into a stream, preserving the order of elements.
300 ///
301 /// If the optional is empty, the output stream is also empty. If the optional contains
302 /// a value, `f` is applied to produce an iterator, and all items from that iterator
303 /// are emitted in the output stream in deterministic order.
304 ///
305 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
306 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
307 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
308 ///
309 /// # Example
310 /// ```rust
311 /// # use hydro_lang::prelude::*;
312 /// # use futures::StreamExt;
313 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
314 /// let tick = process.tick();
315 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
316 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
317 /// # }, |mut stream| async move {
318 /// // 1, 2, 3
319 /// # for w in vec![1, 2, 3] {
320 /// # assert_eq!(stream.next().await.unwrap(), w);
321 /// # }
322 /// # }));
323 /// ```
324 pub fn flat_map_ordered<U, I, F>(
325 self,
326 f: impl IntoQuotedMut<'a, F, L>,
327 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
328 where
329 I: IntoIterator<Item = U>,
330 F: Fn(T) -> I + 'a,
331 {
332 let f = f.splice_fn1_ctx(&self.location).into();
333 Stream::new(
334 self.location.clone(),
335 HydroNode::FlatMap {
336 f,
337 input: Box::new(self.ir_node.into_inner()),
338 metadata: self.location.new_node_metadata::<U>(),
339 },
340 )
341 }
342
343 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
344 /// for the output type `I` to produce items in any order.
345 ///
346 /// If the optional is empty, the output stream is also empty. If the optional contains
347 /// a value, `f` is applied to produce an iterator, and all items from that iterator
348 /// are emitted in the output stream in non-deterministic order.
349 ///
350 /// # Example
351 /// ```rust
352 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
353 /// # use futures::StreamExt;
354 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
355 /// let tick = process.tick();
356 /// let optional = tick.optional_first_tick(q!(
357 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
358 /// ));
359 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
360 /// # }, |mut stream| async move {
361 /// // 1, 2, 3, but in no particular order
362 /// # let mut results = Vec::new();
363 /// # for _ in 0..3 {
364 /// # results.push(stream.next().await.unwrap());
365 /// # }
366 /// # results.sort();
367 /// # assert_eq!(results, vec![1, 2, 3]);
368 /// # }));
369 /// ```
370 pub fn flat_map_unordered<U, I, F>(
371 self,
372 f: impl IntoQuotedMut<'a, F, L>,
373 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
374 where
375 I: IntoIterator<Item = U>,
376 F: Fn(T) -> I + 'a,
377 {
378 let f = f.splice_fn1_ctx(&self.location).into();
379 Stream::new(
380 self.location.clone(),
381 HydroNode::FlatMap {
382 f,
383 input: Box::new(self.ir_node.into_inner()),
384 metadata: self.location.new_node_metadata::<U>(),
385 },
386 )
387 }
388
389 /// Flattens the optional value into a stream, preserving the order of elements.
390 ///
391 /// If the optional is empty, the output stream is also empty. If the optional contains
392 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
393 /// in the output stream in deterministic order.
394 ///
395 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
396 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
397 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
398 ///
399 /// # Example
400 /// ```rust
401 /// # use hydro_lang::prelude::*;
402 /// # use futures::StreamExt;
403 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
404 /// let tick = process.tick();
405 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
406 /// optional.flatten_ordered().all_ticks()
407 /// # }, |mut stream| async move {
408 /// // 1, 2, 3
409 /// # for w in vec![1, 2, 3] {
410 /// # assert_eq!(stream.next().await.unwrap(), w);
411 /// # }
412 /// # }));
413 /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the contained value is itself
        // the iterator source.
        self.flat_map_ordered(q!(|v| v))
    }
420
421 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
422 /// for the element type `T` to produce items in any order.
423 ///
424 /// If the optional is empty, the output stream is also empty. If the optional contains
425 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
426 /// in the output stream in non-deterministic order.
427 ///
428 /// # Example
429 /// ```rust
430 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
431 /// # use futures::StreamExt;
432 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
433 /// let tick = process.tick();
434 /// let optional = tick.optional_first_tick(q!(
435 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
436 /// ));
437 /// optional.flatten_unordered().all_ticks()
438 /// # }, |mut stream| async move {
439 /// // 1, 2, 3, but in no particular order
440 /// # let mut results = Vec::new();
441 /// # for _ in 0..3 {
442 /// # results.push(stream.next().await.unwrap());
443 /// # }
444 /// # results.sort();
445 /// # assert_eq!(results, vec![1, 2, 3]);
446 /// # }));
447 /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the contained value is itself
        // the iterator source.
        self.flat_map_unordered(q!(|v| v))
    }
454
455 /// Creates an optional containing only the value if it satisfies a predicate `f`.
456 ///
457 /// If the optional is empty, the output optional is also empty. If the optional contains
458 /// a value and the predicate returns `true`, the output optional contains the same value.
459 /// If the predicate returns `false`, the output optional is empty.
460 ///
461 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
462 /// not modify or take ownership of the value. If you need to modify the value while filtering
463 /// use [`Optional::filter_map`] instead.
464 ///
465 /// # Example
466 /// ```rust
467 /// # use hydro_lang::prelude::*;
468 /// # use futures::StreamExt;
469 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
470 /// let tick = process.tick();
471 /// let optional = tick.optional_first_tick(q!(5));
472 /// optional.filter(q!(|&x| x > 3)).all_ticks()
473 /// # }, |mut stream| async move {
474 /// // 5
475 /// # assert_eq!(stream.next().await.unwrap(), 5);
476 /// # }));
477 /// ```
478 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
479 where
480 F: Fn(&T) -> bool + 'a,
481 {
482 let f = f.splice_fn1_borrow_ctx(&self.location).into();
483 Optional::new(
484 self.location.clone(),
485 HydroNode::Filter {
486 f,
487 input: Box::new(self.ir_node.into_inner()),
488 metadata: self.location.new_node_metadata::<T>(),
489 },
490 )
491 }
492
493 /// An operator that both filters and maps. It yields only the value if the supplied
494 /// closure `f` returns `Some(value)`.
495 ///
496 /// If the optional is empty, the output optional is also empty. If the optional contains
497 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
498 /// If the closure returns `None`, the output optional is empty.
499 ///
500 /// # Example
501 /// ```rust
502 /// # use hydro_lang::prelude::*;
503 /// # use futures::StreamExt;
504 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
505 /// let tick = process.tick();
506 /// let optional = tick.optional_first_tick(q!("42"));
507 /// optional
508 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
509 /// .all_ticks()
510 /// # }, |mut stream| async move {
511 /// // 42
512 /// # assert_eq!(stream.next().await.unwrap(), 42);
513 /// # }));
514 /// ```
515 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
516 where
517 F: Fn(T) -> Option<U> + 'a,
518 {
519 let f = f.splice_fn1_ctx(&self.location).into();
520 Optional::new(
521 self.location.clone(),
522 HydroNode::FilterMap {
523 f,
524 input: Box::new(self.ir_node.into_inner()),
525 metadata: self.location.new_node_metadata::<U>(),
526 },
527 )
528 }
529
530 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
531 ///
532 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
533 /// non-null. This is useful for combining several pieces of state together.
534 ///
535 /// # Example
536 /// ```rust
537 /// # use hydro_lang::prelude::*;
538 /// # use futures::StreamExt;
539 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
540 /// let tick = process.tick();
541 /// let numbers = process
542 /// .source_iter(q!(vec![123, 456, 789]))
543 /// .batch(&tick, nondet!(/** test */));
544 /// let min = numbers.clone().min(); // Optional
545 /// let max = numbers.max(); // Optional
546 /// min.zip(max).all_ticks()
547 /// # }, |mut stream| async move {
548 /// // [(123, 789)]
549 /// # for w in vec![(123, 789)] {
550 /// # assert_eq!(stream.next().await.unwrap(), w);
551 /// # }
552 /// # }));
553 /// ```
554 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
555 where
556 O: Clone,
557 {
558 let other: Optional<O, L, B> = other.into();
559 check_matching_location(&self.location, &other.location);
560
561 if L::is_top_level()
562 && let Some(tick) = self.location.try_tick()
563 {
564 let out = zip_inside_tick(
565 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
566 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
567 )
568 .latest();
569
570 Optional::new(out.location, out.ir_node.into_inner())
571 } else {
572 zip_inside_tick(self, other)
573 }
574 }
575
576 /// Passes through `self` when it has a value, otherwise passes through `other`.
577 ///
578 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
579 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
580 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
581 ///
582 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
583 /// of the inputs change (including to/from null states).
584 ///
585 /// # Example
586 /// ```rust
587 /// # use hydro_lang::prelude::*;
588 /// # use futures::StreamExt;
589 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
590 /// let tick = process.tick();
591 /// // ticks are lazy by default, forces the second tick to run
592 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
593 ///
594 /// let some_first_tick = tick.optional_first_tick(q!(123));
595 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
596 /// some_first_tick.or(some_second_tick).all_ticks()
597 /// # }, |mut stream| async move {
598 /// // [123 /* first tick */, 456 /* second tick */]
599 /// # for w in vec![123, 456] {
600 /// # assert_eq!(stream.next().await.unwrap(), w);
601 /// # }
602 /// # }));
603 /// ```
604 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
605 check_matching_location(&self.location, &other.location);
606
607 if L::is_top_level()
608 && let Some(tick) = self.location.try_tick()
609 {
610 let out = or_inside_tick(
611 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
612 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
613 )
614 .latest();
615
616 Optional::new(out.location, out.ir_node.into_inner())
617 } else {
618 Optional::new(
619 self.location.clone(),
620 HydroNode::ChainFirst {
621 first: Box::new(self.ir_node.into_inner()),
622 second: Box::new(other.ir_node.into_inner()),
623 metadata: self.location.new_node_metadata::<T>(),
624 },
625 )
626 }
627 }
628
629 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
630 ///
631 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
632 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
633 ///
634 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
635 /// of the inputs change (including to/from null states).
636 ///
637 /// # Example
638 /// ```rust
639 /// # use hydro_lang::prelude::*;
640 /// # use futures::StreamExt;
641 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
642 /// let tick = process.tick();
643 /// // ticks are lazy by default, forces the later ticks to run
644 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
645 ///
646 /// let some_first_tick = tick.optional_first_tick(q!(123));
647 /// some_first_tick
648 /// .unwrap_or(tick.singleton(q!(456)))
649 /// .all_ticks()
650 /// # }, |mut stream| async move {
651 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
652 /// # for w in vec![123, 456, 456, 456] {
653 /// # assert_eq!(stream.next().await.unwrap(), w);
654 /// # }
655 /// # }));
656 /// ```
657 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
658 let res_option = self.or(other.into());
659 Singleton::new(res_option.location, res_option.ir_node.into_inner())
660 }
661
662 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
663 ///
664 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
665 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
666 /// so that Hydro can skip any computation on null values.
667 ///
668 /// # Example
669 /// ```rust
670 /// # use hydro_lang::prelude::*;
671 /// # use futures::StreamExt;
672 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
673 /// let tick = process.tick();
674 /// // ticks are lazy by default, forces the later ticks to run
675 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
676 ///
677 /// let some_first_tick = tick.optional_first_tick(q!(123));
678 /// some_first_tick.into_singleton().all_ticks()
679 /// # }, |mut stream| async move {
680 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
681 /// # for w in vec![Some(123), None, None, None] {
682 /// # assert_eq!(stream.next().await.unwrap(), w);
683 /// # }
684 /// # }));
685 /// ```
686 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
687 where
688 T: Clone,
689 {
690 let none: syn::Expr = parse_quote!([::std::option::Option::None]);
691 let core_ir = HydroNode::Source {
692 source: HydroSource::Iter(none.into()),
693 metadata: self.location.new_node_metadata::<Option<T>>(),
694 };
695
696 let none_singleton = if L::is_top_level() {
697 Singleton::new(
698 self.location.clone(),
699 HydroNode::Persist {
700 inner: Box::new(core_ir),
701 metadata: self.location.new_node_metadata::<Option<T>>(),
702 },
703 )
704 } else {
705 Singleton::new(self.location.clone(), core_ir)
706 };
707
708 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
709 }
710
711 /// An operator which allows you to "name" a `HydroNode`.
712 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
713 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
714 {
715 let mut node = self.ir_node.borrow_mut();
716 let metadata = node.metadata_mut();
717 metadata.tag = Some(name.to_string());
718 }
719 self
720 }
721}
722
723impl<'a, T, L> Optional<T, L, Bounded>
724where
725 L: Location<'a>,
726{
727 /// Filters this optional, passing through the optional value if it is non-null **and** the
728 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
729 ///
730 /// Useful for conditionally processing, such as only emitting an optional's value outside
731 /// a tick if some other condition is satisfied.
732 ///
733 /// # Example
734 /// ```rust
735 /// # use hydro_lang::prelude::*;
736 /// # use futures::StreamExt;
737 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
738 /// let tick = process.tick();
739 /// // ticks are lazy by default, forces the second tick to run
740 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
741 ///
742 /// let batch_first_tick = process
743 /// .source_iter(q!(vec![]))
744 /// .batch(&tick, nondet!(/** test */));
745 /// let batch_second_tick = process
746 /// .source_iter(q!(vec![456]))
747 /// .batch(&tick, nondet!(/** test */))
748 /// .defer_tick(); // appears on the second tick
749 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
750 /// batch_first_tick.chain(batch_second_tick).first()
751 /// .filter_if_some(some_on_first_tick)
752 /// .unwrap_or(tick.singleton(q!(789)))
753 /// .all_ticks()
754 /// # }, |mut stream| async move {
755 /// // [789, 789]
756 /// # for w in vec![789, 789] {
757 /// # assert_eq!(stream.next().await.unwrap(), w);
758 /// # }
759 /// # }));
760 /// ```
761 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
762 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
763 }
764
765 /// Filters this optional, passing through the optional value if it is non-null **and** the
766 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
767 ///
768 /// Useful for conditionally processing, such as only emitting an optional's value outside
769 /// a tick if some other condition is satisfied.
770 ///
771 /// # Example
772 /// ```rust
773 /// # use hydro_lang::prelude::*;
774 /// # use futures::StreamExt;
775 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
776 /// let tick = process.tick();
777 /// // ticks are lazy by default, forces the second tick to run
778 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
779 ///
780 /// let batch_first_tick = process
781 /// .source_iter(q!(vec![]))
782 /// .batch(&tick, nondet!(/** test */));
783 /// let batch_second_tick = process
784 /// .source_iter(q!(vec![456]))
785 /// .batch(&tick, nondet!(/** test */))
786 /// .defer_tick(); // appears on the second tick
787 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
788 /// batch_first_tick.chain(batch_second_tick).first()
789 /// .filter_if_none(some_on_first_tick)
790 /// .unwrap_or(tick.singleton(q!(789)))
791 /// .all_ticks()
792 /// # }, |mut stream| async move {
793 /// // [789, 789]
794 /// # for w in vec![789, 456] {
795 /// # assert_eq!(stream.next().await.unwrap(), w);
796 /// # }
797 /// # }));
798 /// ```
799 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
800 self.filter_if_some(
801 other
802 .map(q!(|_| ()))
803 .into_singleton()
804 .filter(q!(|o| o.is_none())),
805 )
806 }
807
808 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
809 ///
810 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
811 /// having a value, such as only releasing a piece of state if the node is the leader.
812 ///
813 /// # Example
814 /// ```rust
815 /// # use hydro_lang::prelude::*;
816 /// # use futures::StreamExt;
817 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818 /// let tick = process.tick();
819 /// // ticks are lazy by default, forces the second tick to run
820 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
821 ///
822 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
823 /// some_on_first_tick
824 /// .if_some_then(tick.singleton(q!(456)))
825 /// .unwrap_or(tick.singleton(q!(123)))
826 /// # .all_ticks()
827 /// # }, |mut stream| async move {
828 /// // 456 (first tick) ~> 123 (second tick onwards)
829 /// # for w in vec![456, 123, 123] {
830 /// # assert_eq!(stream.next().await.unwrap(), w);
831 /// # }
832 /// # }));
833 /// ```
834 pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
835 value.filter_if_some(self)
836 }
837}
838
839impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
840where
841 L: Location<'a> + NoTick,
842{
843 /// Returns an optional value corresponding to the latest snapshot of the optional
844 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
845 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
846 /// all snapshots of this optional into the atomic-associated tick will observe the
847 /// same value each tick.
848 ///
849 /// # Non-Determinism
850 /// Because this picks a snapshot of a optional whose value is continuously changing,
851 /// the output optional has a non-deterministic value since the snapshot can be at an
852 /// arbitrary point in time.
853 pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
854 Optional::new(
855 self.location.clone().tick,
856 HydroNode::Batch {
857 inner: Box::new(self.ir_node.into_inner()),
858 metadata: self.location.new_node_metadata::<T>(),
859 },
860 )
861 }
862
863 /// Returns this optional back into a top-level, asynchronous execution context where updates
864 /// to the value will be asynchronously propagated.
865 pub fn end_atomic(self) -> Optional<T, L, B> {
866 Optional::new(
867 self.location.tick.l.clone(),
868 HydroNode::EndAtomic {
869 inner: Box::new(self.ir_node.into_inner()),
870 metadata: self.location.tick.l.new_node_metadata::<T>(),
871 },
872 )
873 }
874}
875
876impl<'a, T, L, B: Boundedness> Optional<T, L, B>
877where
878 L: Location<'a>,
879{
880 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
881 /// will observe the same version of the value and will be executed synchronously before any
882 /// outputs are yielded (in [`Optional::end_atomic`]).
883 ///
884 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
885 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
886 /// a different version).
887 ///
888 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
889 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
890 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
891 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
892 let out_location = Atomic { tick: tick.clone() };
893 Optional::new(
894 out_location.clone(),
895 HydroNode::BeginAtomic {
896 inner: Box::new(self.ir_node.into_inner()),
897 metadata: out_location.new_node_metadata::<T>(),
898 },
899 )
900 }
901
902 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
903 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
904 /// relevant data that contributed to the snapshot at tick `t`.
905 ///
906 /// # Non-Determinism
907 /// Because this picks a snapshot of a optional whose value is continuously changing,
908 /// the output optional has a non-deterministic value since the snapshot can be at an
909 /// arbitrary point in time.
910 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
911 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
912 Optional::new(
913 tick.clone(),
914 HydroNode::Batch {
915 inner: Box::new(self.ir_node.into_inner()),
916 metadata: tick.new_node_metadata::<T>(),
917 },
918 )
919 }
920
921 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
922 /// with order corresponding to increasing prefixes of data contributing to the optional.
923 ///
924 /// # Non-Determinism
925 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
926 /// to non-deterministic batching and arrival of inputs, the output stream is
927 /// non-deterministic.
928 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
929 where
930 L: NoTick,
931 {
932 let tick = self.location.tick();
933 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
934 }
935
936 /// Given a time interval, returns a stream corresponding to snapshots of the optional
937 /// value taken at various points in time. Because the input optional may be
938 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
939 /// represent the value of the optional given some prefix of the streams leading up to
940 /// it.
941 ///
942 /// # Non-Determinism
943 /// The output stream is non-deterministic in which elements are sampled, since this
944 /// is controlled by a clock.
945 pub fn sample_every(
946 self,
947 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
948 nondet: NonDet,
949 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
950 where
951 L: NoTick + NoAtomic,
952 {
953 let samples = self.location.source_interval(interval, nondet);
954 let tick = self.location.tick();
955
956 self.snapshot(&tick, nondet)
957 .filter_if_some(samples.batch(&tick, nondet).first())
958 .all_ticks()
959 .weakest_retries()
960 }
961}
962
963impl<'a, T, L> Optional<T, Tick<L>, Bounded>
964where
965 L: Location<'a>,
966{
967 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
968 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
969 /// null values).
970 ///
971 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
972 /// producing one element in the output for each (non-null) tick. This is useful for batched
973 /// computations, where the results from each tick must be combined together.
974 ///
975 /// # Example
976 /// ```rust
977 /// # use hydro_lang::prelude::*;
978 /// # use futures::StreamExt;
979 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980 /// # let tick = process.tick();
981 /// # // ticks are lazy by default, forces the second tick to run
982 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
983 /// # let batch_first_tick = process
984 /// # .source_iter(q!(vec![]))
985 /// # .batch(&tick, nondet!(/** test */));
986 /// # let batch_second_tick = process
987 /// # .source_iter(q!(vec![1, 2, 3]))
988 /// # .batch(&tick, nondet!(/** test */))
989 /// # .defer_tick(); // appears on the second tick
990 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
991 /// input_batch // first tick: [], second tick: [1, 2, 3]
992 /// .max()
993 /// .all_ticks()
994 /// # }, |mut stream| async move {
995 /// // [3]
996 /// # for w in vec![3] {
997 /// # assert_eq!(stream.next().await.unwrap(), w);
998 /// # }
999 /// # }));
1000 /// ```
1001 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1002 self.into_stream().all_ticks()
1003 }
1004
1005 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1006 /// which will stream the value computed in _each_ tick as a separate stream element.
1007 ///
1008 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1009 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1010 /// optional's [`Tick`] context.
1011 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1012 self.into_stream().all_ticks_atomic()
1013 }
1014
1015 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1016 /// be asynchronously updated with the latest value of the optional inside the tick, including
1017 /// whether the optional is null or not.
1018 ///
1019 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1020 /// tick that tracks the inner value. This is useful for getting the value as of the
1021 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1022 ///
1023 /// # Example
1024 /// ```rust
1025 /// # use hydro_lang::prelude::*;
1026 /// # use futures::StreamExt;
1027 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1028 /// # let tick = process.tick();
1029 /// # // ticks are lazy by default, forces the second tick to run
1030 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1031 /// # let batch_first_tick = process
1032 /// # .source_iter(q!(vec![]))
1033 /// # .batch(&tick, nondet!(/** test */));
1034 /// # let batch_second_tick = process
1035 /// # .source_iter(q!(vec![1, 2, 3]))
1036 /// # .batch(&tick, nondet!(/** test */))
1037 /// # .defer_tick(); // appears on the second tick
1038 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1039 /// input_batch // first tick: [], second tick: [1, 2, 3]
1040 /// .max()
1041 /// .latest()
1042 /// # .into_singleton()
1043 /// # .sample_eager(nondet!(/** test */))
1044 /// # }, |mut stream| async move {
1045 /// // asynchronously changes from None ~> 3
1046 /// # for w in vec![None, Some(3)] {
1047 /// # assert_eq!(stream.next().await.unwrap(), w);
1048 /// # }
1049 /// # }));
1050 /// ```
1051 pub fn latest(self) -> Optional<T, L, Unbounded> {
1052 Optional::new(
1053 self.location.outer().clone(),
1054 HydroNode::YieldConcat {
1055 inner: Box::new(self.ir_node.into_inner()),
1056 metadata: self.location.outer().new_node_metadata::<T>(),
1057 },
1058 )
1059 }
1060
1061 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1062 /// be updated with the latest value of the optional inside the tick.
1063 ///
1064 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1065 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1066 /// optional's [`Tick`] context.
1067 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1068 let out_location = Atomic {
1069 tick: self.location.clone(),
1070 };
1071
1072 Optional::new(
1073 out_location.clone(),
1074 HydroNode::YieldConcat {
1075 inner: Box::new(self.ir_node.into_inner()),
1076 metadata: out_location.new_node_metadata::<T>(),
1077 },
1078 )
1079 }
1080
1081 #[expect(missing_docs, reason = "TODO")]
1082 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1083 Optional::new(
1084 self.location.clone(),
1085 HydroNode::DeferTick {
1086 input: Box::new(self.ir_node.into_inner()),
1087 metadata: self.location.new_node_metadata::<T>(),
1088 },
1089 )
1090 }
1091
1092 #[deprecated(note = "use .into_stream().persist()")]
1093 #[expect(missing_docs, reason = "deprecated")]
1094 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1095 where
1096 T: Clone,
1097 {
1098 self.into_stream().persist()
1099 }
1100
1101 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1102 /// non-null. Otherwise, the stream is empty.
1103 ///
1104 /// # Example
1105 /// ```rust
1106 /// # use hydro_lang::prelude::*;
1107 /// # use futures::StreamExt;
1108 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1109 /// # let tick = process.tick();
1110 /// # // ticks are lazy by default, forces the second tick to run
1111 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1112 /// # let batch_first_tick = process
1113 /// # .source_iter(q!(vec![]))
1114 /// # .batch(&tick, nondet!(/** test */));
1115 /// # let batch_second_tick = process
1116 /// # .source_iter(q!(vec![123, 456]))
1117 /// # .batch(&tick, nondet!(/** test */))
1118 /// # .defer_tick(); // appears on the second tick
1119 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1120 /// input_batch // first tick: [], second tick: [123, 456]
1121 /// .clone()
1122 /// .max()
1123 /// .into_stream()
1124 /// .chain(input_batch)
1125 /// .all_ticks()
1126 /// # }, |mut stream| async move {
1127 /// // [456, 123, 456]
1128 /// # for w in vec![456, 123, 456] {
1129 /// # assert_eq!(stream.next().await.unwrap(), w);
1130 /// # }
1131 /// # }));
1132 /// ```
1133 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1134 Stream::new(self.location, self.ir_node.into_inner())
1135 }
1136}
1137
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use stageleft::q;

    use super::Optional;
    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    /// Combining an inhabited optional with itself via `or` must still yield exactly one
    /// value, which is checked by counting the elements of the resulting stream.
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        // Build an inhabited optional inside a tick and `or` it with a copy of itself.
        let tick = process.tick();
        let inhabited: Optional<_, _, _> = tick.singleton(q!(123)).into();
        let either = inhabited.clone().or(inhabited);

        // Count the elements per tick and ship the counts to the external observer.
        let counts = either
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut observed_counts = nodes.connect_source_bincode(counts).await;

        deployment.start().await.unwrap();

        let first_count = observed_counts.next().await.unwrap();
        assert_eq!(first_count, 1);
    }
}