hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, DeferTick, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::{NonDet, nondet};
22
23/// A single Rust value that can asynchronously change over time.
24///
25/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
26/// [`Unbounded`], the value will asynchronously change over time.
27///
28/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
29/// a single number that will asynchronously change as events are processed. Singletons also appear
30/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
31/// such as getting the length of a batch of requests.
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this singleton
35/// - `Loc`: the [`Location`] where the singleton is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Singleton<Type, Loc, Bound: Boundedness> {
38 pub(crate) location: Loc,
39 pub(crate) ir_node: RefCell<HydroNode>,
40
41 _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46 L: Location<'a>,
47{
48 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49 Singleton::new(singleton.location, singleton.ir_node.into_inner())
50 }
51}
52
53impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 fn defer_tick(self) -> Self {
58 Singleton::defer_tick(self)
59 }
60}
61
62impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
63where
64 L: Location<'a>,
65{
66 type Location = Tick<L>;
67
68 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
69 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
70 location.clone(),
71 HydroNode::DeferTick {
72 input: Box::new(HydroNode::CycleSource {
73 ident,
74 metadata: location.new_node_metadata::<T>(),
75 }),
76 metadata: location.new_node_metadata::<T>(),
77 },
78 );
79
80 from_previous_tick.unwrap_or(initial)
81 }
82}
83
84impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
85where
86 L: Location<'a>,
87{
88 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
89 assert_eq!(
90 Location::id(&self.location),
91 expected_location,
92 "locations do not match"
93 );
94 self.location
95 .flow_state()
96 .borrow_mut()
97 .push_root(HydroRoot::CycleSink {
98 ident,
99 input: Box::new(self.ir_node.into_inner()),
100 out_location: Location::id(&self.location),
101 op_metadata: HydroIrOpMetadata::new(),
102 });
103 }
104}
105
106impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
107where
108 L: Location<'a>,
109{
110 type Location = Tick<L>;
111
112 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
113 Singleton::new(
114 location.clone(),
115 HydroNode::CycleSource {
116 ident,
117 metadata: location.new_node_metadata::<T>(),
118 },
119 )
120 }
121}
122
123impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
124where
125 L: Location<'a>,
126{
127 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
128 assert_eq!(
129 Location::id(&self.location),
130 expected_location,
131 "locations do not match"
132 );
133 self.location
134 .flow_state()
135 .borrow_mut()
136 .push_root(HydroRoot::CycleSink {
137 ident,
138 input: Box::new(self.ir_node.into_inner()),
139 out_location: Location::id(&self.location),
140 op_metadata: HydroIrOpMetadata::new(),
141 });
142 }
143}
144
145impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
146where
147 L: Location<'a> + NoTick,
148{
149 type Location = L;
150
151 fn create_source(ident: syn::Ident, location: L) -> Self {
152 Singleton::new(
153 location.clone(),
154 HydroNode::CycleSource {
155 ident,
156 metadata: location.new_node_metadata::<T>(),
157 },
158 )
159 }
160}
161
162impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
163where
164 L: Location<'a> + NoTick,
165{
166 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
167 assert_eq!(
168 Location::id(&self.location),
169 expected_location,
170 "locations do not match"
171 );
172 self.location
173 .flow_state()
174 .borrow_mut()
175 .push_root(HydroRoot::CycleSink {
176 ident,
177 input: Box::new(self.ir_node.into_inner()),
178 out_location: Location::id(&self.location),
179 op_metadata: HydroIrOpMetadata::new(),
180 });
181 }
182}
183
184impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
185where
186 T: Clone,
187 L: Location<'a>,
188{
189 fn clone(&self) -> Self {
190 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
191 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
192 *self.ir_node.borrow_mut() = HydroNode::Tee {
193 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
194 metadata: self.location.new_node_metadata::<T>(),
195 };
196 }
197
198 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
199 Singleton {
200 location: self.location.clone(),
201 ir_node: HydroNode::Tee {
202 inner: TeeNode(inner.0.clone()),
203 metadata: metadata.clone(),
204 }
205 .into(),
206 _phantom: PhantomData,
207 }
208 } else {
209 unreachable!()
210 }
211 }
212}
213
214#[cfg(stageleft_runtime)]
215fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
216 me: Singleton<T, Tick<L>, B>,
217 other: O,
218) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
219where
220 Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
221{
222 check_matching_location(
223 &me.location,
224 &Singleton::<T, Tick<L>, B>::other_location(&other),
225 );
226
227 Singleton::<T, Tick<L>, B>::make(
228 me.location.clone(),
229 HydroNode::CrossSingleton {
230 left: Box::new(me.ir_node.into_inner()),
231 right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
232 metadata: me
233 .location
234 .new_node_metadata::<<Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType>(),
235 },
236 )
237}
238
239impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
240where
241 L: Location<'a>,
242{
243 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
244 debug_assert_eq!(Location::id(&location), ir_node.metadata().location_kind);
245 Singleton {
246 location,
247 ir_node: RefCell::new(ir_node),
248 _phantom: PhantomData,
249 }
250 }
251
252 /// Transforms the singleton value by applying a function `f` to it,
253 /// continuously as the input is updated.
254 ///
255 /// # Example
256 /// ```rust
257 /// # use hydro_lang::prelude::*;
258 /// # use futures::StreamExt;
259 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
260 /// let tick = process.tick();
261 /// let singleton = tick.singleton(q!(5));
262 /// singleton.map(q!(|v| v * 2)).all_ticks()
263 /// # }, |mut stream| async move {
264 /// // 10
265 /// # assert_eq!(stream.next().await.unwrap(), 10);
266 /// # }));
267 /// ```
268 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
269 where
270 F: Fn(T) -> U + 'a,
271 {
272 let f = f.splice_fn1_ctx(&self.location).into();
273 Singleton::new(
274 self.location.clone(),
275 HydroNode::Map {
276 f,
277 input: Box::new(self.ir_node.into_inner()),
278 metadata: self.location.new_node_metadata::<U>(),
279 },
280 )
281 }
282
283 /// Transforms the singleton value by applying a function `f` to it and then flattening
284 /// the result into a stream, preserving the order of elements.
285 ///
286 /// The function `f` is applied to the singleton value to produce an iterator, and all items
287 /// from that iterator are emitted in the output stream in deterministic order.
288 ///
289 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
290 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
291 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
292 ///
293 /// # Example
294 /// ```rust
295 /// # use hydro_lang::prelude::*;
296 /// # use futures::StreamExt;
297 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
298 /// let tick = process.tick();
299 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
300 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
301 /// # }, |mut stream| async move {
302 /// // 1, 2, 3
303 /// # for w in vec![1, 2, 3] {
304 /// # assert_eq!(stream.next().await.unwrap(), w);
305 /// # }
306 /// # }));
307 /// ```
308 pub fn flat_map_ordered<U, I, F>(
309 self,
310 f: impl IntoQuotedMut<'a, F, L>,
311 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
312 where
313 I: IntoIterator<Item = U>,
314 F: Fn(T) -> I + 'a,
315 {
316 let f = f.splice_fn1_ctx(&self.location).into();
317 Stream::new(
318 self.location.clone(),
319 HydroNode::FlatMap {
320 f,
321 input: Box::new(self.ir_node.into_inner()),
322 metadata: self.location.new_node_metadata::<U>(),
323 },
324 )
325 }
326
327 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
328 /// for the output type `I` to produce items in any order.
329 ///
330 /// The function `f` is applied to the singleton value to produce an iterator, and all items
331 /// from that iterator are emitted in the output stream in non-deterministic order.
332 ///
333 /// # Example
334 /// ```rust
335 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
336 /// # use futures::StreamExt;
337 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
338 /// let tick = process.tick();
339 /// let singleton = tick.singleton(q!(
340 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
341 /// ));
342 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
343 /// # }, |mut stream| async move {
344 /// // 1, 2, 3, but in no particular order
345 /// # let mut results = Vec::new();
346 /// # for _ in 0..3 {
347 /// # results.push(stream.next().await.unwrap());
348 /// # }
349 /// # results.sort();
350 /// # assert_eq!(results, vec![1, 2, 3]);
351 /// # }));
352 /// ```
353 pub fn flat_map_unordered<U, I, F>(
354 self,
355 f: impl IntoQuotedMut<'a, F, L>,
356 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
357 where
358 I: IntoIterator<Item = U>,
359 F: Fn(T) -> I + 'a,
360 {
361 let f = f.splice_fn1_ctx(&self.location).into();
362 Stream::new(
363 self.location.clone(),
364 HydroNode::FlatMap {
365 f,
366 input: Box::new(self.ir_node.into_inner()),
367 metadata: self.location.new_node_metadata::<U>(),
368 },
369 )
370 }
371
372 /// Flattens the singleton value into a stream, preserving the order of elements.
373 ///
374 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
375 /// are emitted in the output stream in deterministic order.
376 ///
377 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
378 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
379 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
380 ///
381 /// # Example
382 /// ```rust
383 /// # use hydro_lang::prelude::*;
384 /// # use futures::StreamExt;
385 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
386 /// let tick = process.tick();
387 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
388 /// singleton.flatten_ordered().all_ticks()
389 /// # }, |mut stream| async move {
390 /// // 1, 2, 3
391 /// # for w in vec![1, 2, 3] {
392 /// # assert_eq!(stream.next().await.unwrap(), w);
393 /// # }
394 /// # }));
395 /// ```
396 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
397 where
398 T: IntoIterator<Item = U>,
399 {
400 self.flat_map_ordered(q!(|x| x))
401 }
402
403 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
404 /// for the element type `T` to produce items in any order.
405 ///
406 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
407 /// are emitted in the output stream in non-deterministic order.
408 ///
409 /// # Example
410 /// ```rust
411 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
412 /// # use futures::StreamExt;
413 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
414 /// let tick = process.tick();
415 /// let singleton = tick.singleton(q!(
416 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
417 /// ));
418 /// singleton.flatten_unordered().all_ticks()
419 /// # }, |mut stream| async move {
420 /// // 1, 2, 3, but in no particular order
421 /// # let mut results = Vec::new();
422 /// # for _ in 0..3 {
423 /// # results.push(stream.next().await.unwrap());
424 /// # }
425 /// # results.sort();
426 /// # assert_eq!(results, vec![1, 2, 3]);
427 /// # }));
428 /// ```
429 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
430 where
431 T: IntoIterator<Item = U>,
432 {
433 self.flat_map_unordered(q!(|x| x))
434 }
435
436 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
437 ///
438 /// If the predicate returns `true`, the output optional contains the same value.
439 /// If the predicate returns `false`, the output optional is empty.
440 ///
441 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
442 /// not modify or take ownership of the value. If you need to modify the value while filtering
443 /// use [`Singleton::filter_map`] instead.
444 ///
445 /// # Example
446 /// ```rust
447 /// # use hydro_lang::prelude::*;
448 /// # use futures::StreamExt;
449 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
450 /// let tick = process.tick();
451 /// let singleton = tick.singleton(q!(5));
452 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
453 /// # }, |mut stream| async move {
454 /// // 5
455 /// # assert_eq!(stream.next().await.unwrap(), 5);
456 /// # }));
457 /// ```
458 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
459 where
460 F: Fn(&T) -> bool + 'a,
461 {
462 let f = f.splice_fn1_borrow_ctx(&self.location).into();
463 Optional::new(
464 self.location.clone(),
465 HydroNode::Filter {
466 f,
467 input: Box::new(self.ir_node.into_inner()),
468 metadata: self.location.new_node_metadata::<T>(),
469 },
470 )
471 }
472
473 /// An operator that both filters and maps. It yields the value only if the supplied
474 /// closure `f` returns `Some(value)`.
475 ///
476 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
477 /// If the closure returns `None`, the output optional is empty.
478 ///
479 /// # Example
480 /// ```rust
481 /// # use hydro_lang::prelude::*;
482 /// # use futures::StreamExt;
483 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
484 /// let tick = process.tick();
485 /// let singleton = tick.singleton(q!("42"));
486 /// singleton
487 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
488 /// .all_ticks()
489 /// # }, |mut stream| async move {
490 /// // 42
491 /// # assert_eq!(stream.next().await.unwrap(), 42);
492 /// # }));
493 /// ```
494 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
495 where
496 F: Fn(T) -> Option<U> + 'a,
497 {
498 let f = f.splice_fn1_ctx(&self.location).into();
499 Optional::new(
500 self.location.clone(),
501 HydroNode::FilterMap {
502 f,
503 input: Box::new(self.ir_node.into_inner()),
504 metadata: self.location.new_node_metadata::<U>(),
505 },
506 )
507 }
508
509 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
510 ///
511 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
512 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
513 /// non-null. This is useful for combining several pieces of state together.
514 ///
515 /// # Example
516 /// ```rust
517 /// # use hydro_lang::prelude::*;
518 /// # use futures::StreamExt;
519 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
520 /// let tick = process.tick();
521 /// let numbers = process
522 /// .source_iter(q!(vec![123, 456]))
523 /// .batch(&tick, nondet!(/** test */));
524 /// let count = numbers.clone().count(); // Singleton
525 /// let max = numbers.max(); // Optional
526 /// count.zip(max).all_ticks()
527 /// # }, |mut stream| async move {
528 /// // [(2, 456)]
529 /// # for w in vec![(2, 456)] {
530 /// # assert_eq!(stream.next().await.unwrap(), w);
531 /// # }
532 /// # }));
533 /// ```
534 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
535 where
536 Self: ZipResult<'a, O, Location = L>,
537 {
538 check_matching_location(&self.location, &Self::other_location(&other));
539
540 if L::is_top_level()
541 && let Some(tick) = self.location.try_tick()
542 {
543 let out = zip_inside_tick(
544 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
545 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
546 Self::other_location(&other),
547 Self::other_ir_node(other),
548 )
549 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
550 )
551 .latest();
552
553 Self::make(out.location, out.ir_node.into_inner())
554 } else {
555 Self::make(
556 self.location.clone(),
557 HydroNode::CrossSingleton {
558 left: Box::new(self.ir_node.into_inner()),
559 right: Box::new(Self::other_ir_node(other)),
560 metadata: self
561 .location
562 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
563 },
564 )
565 }
566 }
567
568 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
569 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
570 ///
571 /// Useful for conditionally processing, such as only emitting a singleton's value outside
572 /// a tick if some other condition is satisfied.
573 ///
574 /// # Example
575 /// ```rust
576 /// # use hydro_lang::prelude::*;
577 /// # use futures::StreamExt;
578 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
579 /// let tick = process.tick();
580 /// // ticks are lazy by default, forces the second tick to run
581 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
582 ///
583 /// let batch_first_tick = process
584 /// .source_iter(q!(vec![1]))
585 /// .batch(&tick, nondet!(/** test */));
586 /// let batch_second_tick = process
587 /// .source_iter(q!(vec![1, 2, 3]))
588 /// .batch(&tick, nondet!(/** test */))
589 /// .defer_tick(); // appears on the second tick
590 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
591 /// batch_first_tick.chain(batch_second_tick).count()
592 /// .filter_if_some(some_on_first_tick)
593 /// .all_ticks()
594 /// # }, |mut stream| async move {
595 /// // [1]
596 /// # for w in vec![1] {
597 /// # assert_eq!(stream.next().await.unwrap(), w);
598 /// # }
599 /// # }));
600 /// ```
601 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
602 self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
603 .map(q!(|(d, _signal)| d))
604 }
605
606 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
607 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
608 ///
609 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
610 /// the condition.
611 ///
612 /// # Example
613 /// ```rust
614 /// # use hydro_lang::prelude::*;
615 /// # use futures::StreamExt;
616 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
617 /// let tick = process.tick();
618 /// // ticks are lazy by default, forces the second tick to run
619 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
620 ///
621 /// let batch_first_tick = process
622 /// .source_iter(q!(vec![1]))
623 /// .batch(&tick, nondet!(/** test */));
624 /// let batch_second_tick = process
625 /// .source_iter(q!(vec![1, 2, 3]))
626 /// .batch(&tick, nondet!(/** test */))
627 /// .defer_tick(); // appears on the second tick
628 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
629 /// batch_first_tick.chain(batch_second_tick).count()
630 /// .filter_if_none(some_on_first_tick)
631 /// .all_ticks()
632 /// # }, |mut stream| async move {
633 /// // [3]
634 /// # for w in vec![3] {
635 /// # assert_eq!(stream.next().await.unwrap(), w);
636 /// # }
637 /// # }));
638 /// ```
639 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
640 self.filter_if_some(
641 other
642 .map(q!(|_| ()))
643 .into_singleton()
644 .filter(q!(|o| o.is_none())),
645 )
646 }
647
648 /// An operator which allows you to "name" a `HydroNode`.
649 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
650 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
651 {
652 let mut node = self.ir_node.borrow_mut();
653 let metadata = node.metadata_mut();
654 metadata.tag = Some(name.to_string());
655 }
656 self
657 }
658}
659
660impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
661where
662 L: Location<'a> + NoTick,
663{
664 /// Returns a singleton value corresponding to the latest snapshot of the singleton
665 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
666 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
667 /// all snapshots of this singleton into the atomic-associated tick will observe the
668 /// same value each tick.
669 ///
670 /// # Non-Determinism
671 /// Because this picks a snapshot of a singleton whose value is continuously changing,
672 /// the output singleton has a non-deterministic value since the snapshot can be at an
673 /// arbitrary point in time.
674 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
675 Singleton::new(
676 self.location.clone().tick,
677 HydroNode::Batch {
678 inner: Box::new(self.ir_node.into_inner()),
679 metadata: self.location.tick.new_node_metadata::<T>(),
680 },
681 )
682 }
683
684 /// Returns this singleton back into a top-level, asynchronous execution context where updates
685 /// to the value will be asynchronously propagated.
686 pub fn end_atomic(self) -> Singleton<T, L, B> {
687 Singleton::new(
688 self.location.tick.l.clone(),
689 HydroNode::EndAtomic {
690 inner: Box::new(self.ir_node.into_inner()),
691 metadata: self.location.tick.l.new_node_metadata::<T>(),
692 },
693 )
694 }
695}
696
697impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
698where
699 L: Location<'a>,
700{
701 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
702 /// will observe the same version of the value and will be executed synchronously before any
703 /// outputs are yielded (in [`Optional::end_atomic`]).
704 ///
705 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
706 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
707 /// a different version).
708 ///
709 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
710 /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
711 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
712 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
713 let out_location = Atomic { tick: tick.clone() };
714 Singleton::new(
715 out_location.clone(),
716 HydroNode::BeginAtomic {
717 inner: Box::new(self.ir_node.into_inner()),
718 metadata: out_location.new_node_metadata::<T>(),
719 },
720 )
721 }
722
723 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
724 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
725 /// relevant data that contributed to the snapshot at tick `t`.
726 ///
727 /// # Non-Determinism
728 /// Because this picks a snapshot of a singleton whose value is continuously changing,
729 /// the output singleton has a non-deterministic value since the snapshot can be at an
730 /// arbitrary point in time.
731 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
732 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
733 Singleton::new(
734 tick.clone(),
735 HydroNode::Batch {
736 inner: Box::new(self.ir_node.into_inner()),
737 metadata: tick.new_node_metadata::<T>(),
738 },
739 )
740 }
741
742 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
743 /// with order corresponding to increasing prefixes of data contributing to the singleton.
744 ///
745 /// # Non-Determinism
746 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
747 /// to non-deterministic batching and arrival of inputs, the output stream is
748 /// non-deterministic.
749 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
750 where
751 L: NoTick,
752 {
753 let tick = self.location.tick();
754 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
755 }
756
757 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
758 /// value taken at various points in time. Because the input singleton may be
759 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
760 /// represent the value of the singleton given some prefix of the streams leading up to
761 /// it.
762 ///
763 /// # Non-Determinism
764 /// The output stream is non-deterministic in which elements are sampled, since this
765 /// is controlled by a clock.
766 pub fn sample_every(
767 self,
768 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
769 nondet: NonDet,
770 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
771 where
772 L: NoTick + NoAtomic,
773 {
774 let samples = self.location.source_interval(interval, nondet);
775 let tick = self.location.tick();
776
777 self.snapshot(&tick, nondet)
778 .filter_if_some(samples.batch(&tick, nondet).first())
779 .all_ticks()
780 .weakest_retries()
781 }
782}
783
784impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
785where
786 L: Location<'a>,
787{
788 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
789 /// which will stream the value computed in _each_ tick as a separate stream element.
790 ///
791 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
792 /// producing one element in the output for each tick. This is useful for batched computations,
793 /// where the results from each tick must be combined together.
794 ///
795 /// # Example
796 /// ```rust
797 /// # use hydro_lang::prelude::*;
798 /// # use futures::StreamExt;
799 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
800 /// let tick = process.tick();
801 /// # // ticks are lazy by default, forces the second tick to run
802 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
803 /// # let batch_first_tick = process
804 /// # .source_iter(q!(vec![1]))
805 /// # .batch(&tick, nondet!(/** test */));
806 /// # let batch_second_tick = process
807 /// # .source_iter(q!(vec![1, 2, 3]))
808 /// # .batch(&tick, nondet!(/** test */))
809 /// # .defer_tick(); // appears on the second tick
810 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
811 /// input_batch // first tick: [1], second tick: [1, 2, 3]
812 /// .count()
813 /// .all_ticks()
814 /// # }, |mut stream| async move {
815 /// // [1, 3]
816 /// # for w in vec![1, 3] {
817 /// # assert_eq!(stream.next().await.unwrap(), w);
818 /// # }
819 /// # }));
820 /// ```
821 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
822 self.into_stream().all_ticks()
823 }
824
825 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
826 /// which will stream the value computed in _each_ tick as a separate stream element.
827 ///
828 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
829 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
830 /// singleton's [`Tick`] context.
831 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
832 self.into_stream().all_ticks_atomic()
833 }
834
835 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
836 /// be asynchronously updated with the latest value of the singleton inside the tick.
837 ///
838 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
839 /// tick that tracks the inner value. This is useful for getting the value as of the
840 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
841 ///
842 /// # Example
843 /// ```rust
844 /// # use hydro_lang::prelude::*;
845 /// # use futures::StreamExt;
846 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
847 /// let tick = process.tick();
848 /// # // ticks are lazy by default, forces the second tick to run
849 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
850 /// # let batch_first_tick = process
851 /// # .source_iter(q!(vec![1]))
852 /// # .batch(&tick, nondet!(/** test */));
853 /// # let batch_second_tick = process
854 /// # .source_iter(q!(vec![1, 2, 3]))
855 /// # .batch(&tick, nondet!(/** test */))
856 /// # .defer_tick(); // appears on the second tick
857 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
858 /// input_batch // first tick: [1], second tick: [1, 2, 3]
859 /// .count()
860 /// .latest()
861 /// # .sample_eager(nondet!(/** test */))
862 /// # }, |mut stream| async move {
863 /// // asynchronously changes from 1 ~> 3
864 /// # for w in vec![1, 3] {
865 /// # assert_eq!(stream.next().await.unwrap(), w);
866 /// # }
867 /// # }));
868 /// ```
869 pub fn latest(self) -> Singleton<T, L, Unbounded> {
870 Singleton::new(
871 self.location.outer().clone(),
872 HydroNode::YieldConcat {
873 inner: Box::new(self.ir_node.into_inner()),
874 metadata: self.location.outer().new_node_metadata::<T>(),
875 },
876 )
877 }
878
879 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
880 /// be updated with the latest value of the singleton inside the tick.
881 ///
882 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
883 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
884 /// singleton's [`Tick`] context.
885 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
886 let out_location = Atomic {
887 tick: self.location.clone(),
888 };
889 Singleton::new(
890 out_location.clone(),
891 HydroNode::YieldConcat {
892 inner: Box::new(self.ir_node.into_inner()),
893 metadata: out_location.new_node_metadata::<T>(),
894 },
895 )
896 }
897
898 #[expect(missing_docs, reason = "TODO")]
899 pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
900 Singleton::new(
901 self.location.clone(),
902 HydroNode::DeferTick {
903 input: Box::new(self.ir_node.into_inner()),
904 metadata: self.location.new_node_metadata::<T>(),
905 },
906 )
907 }
908
909 #[deprecated(note = "use .into_stream().persist()")]
910 #[expect(missing_docs, reason = "deprecated")]
911 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
912 Stream::new(
913 self.location.clone(),
914 HydroNode::Persist {
915 inner: Box::new(self.ir_node.into_inner()),
916 metadata: self.location.new_node_metadata::<T>(),
917 },
918 )
919 }
920
921 /// Converts this singleton into a [`Stream`] containing a single element, the value.
922 ///
923 /// # Example
924 /// ```rust
925 /// # use hydro_lang::prelude::*;
926 /// # use futures::StreamExt;
927 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
928 /// let tick = process.tick();
929 /// let batch_input = process
930 /// .source_iter(q!(vec![123, 456]))
931 /// .batch(&tick, nondet!(/** test */));
932 /// batch_input.clone().chain(
933 /// batch_input.count().into_stream()
934 /// ).all_ticks()
935 /// # }, |mut stream| async move {
936 /// // [123, 456, 2]
937 /// # for w in vec![123, 456, 2] {
938 /// # assert_eq!(stream.next().await.unwrap(), w);
939 /// # }
940 /// # }));
941 /// ```
942 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
943 Stream::new(self.location, self.ir_node.into_inner())
944 }
945}
946
947#[doc(hidden)]
948/// Helper trait that determines the output collection type for [`Singleton::zip`].
949///
950/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
951/// [`Singleton`].
952#[sealed::sealed]
953pub trait ZipResult<'a, Other> {
954 /// The output collection type.
955 type Out;
956 /// The type of the tupled output value.
957 type ElementType;
958 /// The type of the other collection's value.
959 type OtherType;
960 /// The location where the tupled result will be materialized.
961 type Location: Location<'a>;
962
963 /// The location of the second input to the `zip`.
964 fn other_location(other: &Other) -> Self::Location;
965 /// The IR node of the second input to the `zip`.
966 fn other_ir_node(other: Other) -> HydroNode;
967
968 /// Constructs the output live collection given an IR node containing the zip result.
969 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
970}
971
972#[sealed::sealed]
973impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
974where
975 L: Location<'a>,
976{
977 type Out = Singleton<(T, U), L, B>;
978 type ElementType = (T, U);
979 type OtherType = U;
980 type Location = L;
981
982 fn other_location(other: &Singleton<U, L, B>) -> L {
983 other.location.clone()
984 }
985
986 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
987 other.ir_node.into_inner()
988 }
989
990 fn make(location: L, ir_node: HydroNode) -> Self::Out {
991 Singleton::new(location, ir_node)
992 }
993}
994
995#[sealed::sealed]
996impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
997where
998 L: Location<'a>,
999{
1000 type Out = Optional<(T, U), L, B>;
1001 type ElementType = (T, U);
1002 type OtherType = U;
1003 type Location = L;
1004
1005 fn other_location(other: &Optional<U, L, B>) -> L {
1006 other.location.clone()
1007 }
1008
1009 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1010 other.ir_node.into_inner()
1011 }
1012
1013 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1014 Optional::new(location, ir_node)
1015 }
1016}
1017
1018#[cfg(test)]
1019mod tests {
1020 use futures::{SinkExt, StreamExt};
1021 use hydro_deploy::Deployment;
1022 use stageleft::q;
1023
1024 use crate::compile::builder::FlowBuilder;
1025 use crate::location::Location;
1026 use crate::nondet::nondet;
1027
1028 #[tokio::test]
1029 async fn tick_cycle_cardinality() {
1030 let mut deployment = Deployment::new();
1031
1032 let flow = FlowBuilder::new();
1033 let node = flow.process::<()>();
1034 let external = flow.external::<()>();
1035
1036 let (input_send, input) = node.source_external_bincode(&external);
1037
1038 let node_tick = node.tick();
1039 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1040 let counts = singleton
1041 .clone()
1042 .into_stream()
1043 .count()
1044 .filter_if_some(input.batch(&node_tick, nondet!(/** testing */)).first())
1045 .all_ticks()
1046 .send_bincode_external(&external);
1047 complete_cycle.complete_next_tick(singleton);
1048
1049 let nodes = flow
1050 .with_process(&node, deployment.Localhost())
1051 .with_external(&external, deployment.Localhost())
1052 .deploy(&mut deployment);
1053
1054 deployment.deploy().await.unwrap();
1055
1056 let mut tick_trigger = nodes.connect_sink_bincode(input_send).await;
1057 let mut external_out = nodes.connect_source_bincode(counts).await;
1058
1059 deployment.start().await.unwrap();
1060
1061 tick_trigger.send(()).await.unwrap();
1062
1063 assert_eq!(external_out.next().await.unwrap(), 1);
1064
1065 tick_trigger.send(()).await.unwrap();
1066
1067 assert_eq!(external_out.next().await.unwrap(), 1);
1068 }
1069}