hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::hash::Hash;
4
5use stageleft::{IntoQuotedMut, QuotedWithContext, q};
6
7use super::boundedness::{Bounded, Boundedness, Unbounded};
8use super::keyed_stream::KeyedStream;
9use super::optional::Optional;
10use super::singleton::Singleton;
11use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
12use crate::forward_handle::ForwardRef;
13#[cfg(stageleft_runtime)]
14use crate::forward_handle::{CycleCollection, ReceiverComplete};
15use crate::live_collections::stream::{Ordering, Retries};
16use crate::location::dynamic::LocationId;
17use crate::location::{Atomic, Location, NoTick, Tick};
18use crate::manual_expr::ManualExpr;
19use crate::nondet::{NonDet, nondet};
20
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// removed / changed), this also includes an additional variant [`BoundedValue`], which indicates
/// that entries may be added over time, but once an entry is added it will never be removed and
/// its value will never change.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The bound with the same [`Self::UnderlyingBound`] as this one, but whose value for each
    /// key is immutable (its `ValueBound` is [`Bounded`]).
    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;

    /// The bound with the same [`Self::UnderlyingBound`] as this one, but whose value for each
    /// key may change asynchronously (its `ValueBound` is [`Unbounded`]).
    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
}
39
// `Unbounded`: both the underlying stream and each value are unbounded. Fixing the values
// gives `BoundedValue`; allowing them to change leaves the bound as `Unbounded`.
impl KeyedSingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    type WithBoundedValue = BoundedValue;
    type WithUnboundedValue = Unbounded;
}
46
// `Bounded`: both the underlying stream and each value are bounded. The trait still requires a
// `WithUnboundedValue` that keeps the bounded underlying stream, which is what the hidden
// `UnreachableBound` provides.
impl KeyedSingletonBound for Bounded {
    type UnderlyingBound = Bounded;
    type ValueBound = Bounded;
    type WithBoundedValue = Bounded;
    type WithUnboundedValue = UnreachableBound;
}
53
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// its value is bounded and will never change. The underlying stream of entries is still
/// [`Unbounded`], so new entries may appear asynchronously.
pub struct BoundedValue;
58
// `BoundedValue`: the underlying stream is unbounded (entries keep arriving) while each value is
// bounded. Allowing the values to change turns the bound into plain `Unbounded`.
impl KeyedSingletonBound for BoundedValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Bounded;
    type WithBoundedValue = BoundedValue;
    type WithUnboundedValue = Unbounded;
}
65
// Hidden bound that pairs a bounded underlying stream with unbounded values. It is what
// `Bounded::WithUnboundedValue` resolves to.
// NOTE(review): the name suggests this combination is never expected to be observed in
// practice — confirm before relying on it.
#[doc(hidden)]
pub struct UnreachableBound;
68
// Mirror image of the `Bounded` impl: the underlying stream stays bounded, values are unbounded.
// Fixing the values maps back to `Bounded`.
impl KeyedSingletonBound for UnreachableBound {
    type UnderlyingBound = Bounded;
    type ValueBound = Unbounded;

    type WithBoundedValue = Bounded;
    type WithUnboundedValue = UnreachableBound;
}
76
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    /// The entries, stored as an unordered, exactly-once stream of `(key, value)` pairs whose
    /// boundedness is `Bound::UnderlyingBound`.
    pub(crate) underlying: Stream<(K, V), Loc, Bound::UnderlyingBound, NoOrder, ExactlyOnce>,
}
96
97impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
98    for KeyedSingleton<K, V, Loc, Bound>
99{
100    fn clone(&self) -> Self {
101        KeyedSingleton {
102            underlying: self.underlying.clone(),
103        }
104    }
105}
106
107impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
108    for KeyedSingleton<K, V, L, B>
109where
110    L: Location<'a> + NoTick,
111{
112    type Location = L;
113
114    fn create_source(ident: syn::Ident, location: L) -> Self {
115        KeyedSingleton {
116            underlying: Stream::create_source(ident, location),
117        }
118    }
119}
120
121impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
122    for KeyedSingleton<K, V, L, B>
123where
124    L: Location<'a> + NoTick,
125{
126    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
127        self.underlying.complete(ident, expected_location);
128    }
129}
130
131impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
132    KeyedSingleton<K, V, L, B>
133{
134    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
135    ///
    /// The value for each key must be bounded, otherwise the resulting stream elements would be
    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
    /// into the output.
139    ///
140    /// # Example
141    /// ```rust
142    /// # use hydro_lang::prelude::*;
143    /// # use futures::StreamExt;
144    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
145    /// let keyed_singleton = // { 1: 2, 2: 4 }
146    /// # process
147    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
148    /// #     .into_keyed()
149    /// #     .first();
150    /// keyed_singleton.entries()
151    /// # }, |mut stream| async move {
152    /// // (1, 2), (2, 4) in any order
153    /// # let mut results = Vec::new();
154    /// # for _ in 0..2 {
155    /// #     results.push(stream.next().await.unwrap());
156    /// # }
157    /// # results.sort();
158    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
159    /// # }));
160    /// ```
161    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
162        self.underlying
163    }
164
165    /// Flattens the keyed singleton into an unordered stream of just the values.
166    ///
    /// The value for each key must be bounded, otherwise the resulting stream elements would be
    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
    /// into the output.
170    ///
171    /// # Example
172    /// ```rust
173    /// # use hydro_lang::prelude::*;
174    /// # use futures::StreamExt;
175    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
176    /// let keyed_singleton = // { 1: 2, 2: 4 }
177    /// # process
178    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
179    /// #     .into_keyed()
180    /// #     .first();
181    /// keyed_singleton.values()
182    /// # }, |mut stream| async move {
183    /// // 2, 4 in any order
184    /// # let mut results = Vec::new();
185    /// # for _ in 0..2 {
186    /// #     results.push(stream.next().await.unwrap());
187    /// # }
188    /// # results.sort();
189    /// # assert_eq!(results, vec![2, 4]);
190    /// # }));
191    /// ```
192    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
193        self.entries().map(q!(|(_, v)| v))
194    }
195
196    /// Flattens the keyed singleton into an unordered stream of just the keys.
197    ///
198    /// The value for each key must be bounded, otherwise the removal of keys would result in
199    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
200    /// into the output.
201    ///
202    /// # Example
203    /// ```rust
204    /// # use hydro_lang::prelude::*;
205    /// # use futures::StreamExt;
206    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
207    /// let keyed_singleton = // { 1: 2, 2: 4 }
208    /// # process
209    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
210    /// #     .into_keyed()
211    /// #     .first();
212    /// keyed_singleton.keys()
213    /// # }, |mut stream| async move {
214    /// // 1, 2 in any order
215    /// # let mut results = Vec::new();
216    /// # for _ in 0..2 {
217    /// #     results.push(stream.next().await.unwrap());
218    /// # }
219    /// # results.sort();
220    /// # assert_eq!(results, vec![1, 2]);
221    /// # }));
222    /// ```
223    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
224        self.entries().map(q!(|(k, _)| k))
225    }
226
227    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
228    /// entries whose keys are not in the provided stream.
229    ///
230    /// # Example
231    /// ```rust
232    /// # use hydro_lang::prelude::*;
233    /// # use futures::StreamExt;
234    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
235    /// let tick = process.tick();
236    /// let keyed_singleton = // { 1: 2, 2: 4 }
237    /// # process
238    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
239    /// #     .into_keyed()
240    /// #     .first()
241    /// #     .batch(&tick, nondet!(/** test */));
242    /// let keys_to_remove = process
243    ///     .source_iter(q!(vec![1]))
244    ///     .batch(&tick, nondet!(/** test */));
245    /// keyed_singleton.filter_key_not_in(keys_to_remove)
246    /// #   .entries().all_ticks()
247    /// # }, |mut stream| async move {
248    /// // { 2: 4 }
249    /// # for w in vec![(2, 4)] {
250    /// #     assert_eq!(stream.next().await.unwrap(), w);
251    /// # }
252    /// # }));
253    /// ```
254    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
255        self,
256        other: Stream<K, L, Bounded, O2, R2>,
257    ) -> Self
258    where
259        K: Hash + Eq,
260    {
261        KeyedSingleton {
262            underlying: self.entries().anti_join(other),
263        }
264    }
265
266    /// An operator which allows you to "inspect" each value of a keyed singleton without
267    /// modifying it. The closure `f` is called on a reference to each value. This is
268    /// mainly useful for debugging, and should not be used to generate side-effects.
269    ///
270    /// # Example
271    /// ```rust
272    /// # use hydro_lang::prelude::*;
273    /// # use futures::StreamExt;
274    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
275    /// let keyed_singleton = // { 1: 2, 2: 4 }
276    /// # process
277    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
278    /// #     .into_keyed()
279    /// #     .first();
280    /// keyed_singleton
281    ///     .inspect(q!(|v| println!("{}", v)))
282    /// #   .entries()
283    /// # }, |mut stream| async move {
284    /// // { 1: 2, 2: 4 }
285    /// # for w in vec![(1, 2), (2, 4)] {
286    /// #     assert_eq!(stream.next().await.unwrap(), w);
287    /// # }
288    /// # }));
289    /// ```
290    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
291    where
292        F: Fn(&V) + 'a,
293    {
294        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
295        KeyedSingleton {
296            underlying: self.underlying.inspect(q!({
297                let orig = f;
298                move |(_k, v)| orig(v)
299            })),
300        }
301    }
302
303    /// An operator which allows you to "inspect" each entry of a keyed singleton without
304    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
305    /// mainly useful for debugging, and should not be used to generate side-effects.
306    ///
307    /// # Example
308    /// ```rust
309    /// # use hydro_lang::prelude::*;
310    /// # use futures::StreamExt;
311    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
312    /// let keyed_singleton = // { 1: 2, 2: 4 }
313    /// # process
314    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
315    /// #     .into_keyed()
316    /// #     .first();
317    /// keyed_singleton
318    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
319    /// #   .entries()
320    /// # }, |mut stream| async move {
321    /// // { 1: 2, 2: 4 }
322    /// # for w in vec![(1, 2), (2, 4)] {
323    /// #     assert_eq!(stream.next().await.unwrap(), w);
324    /// # }
325    /// # }));
326    /// ```
327    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> KeyedSingleton<K, V, L, B>
328    where
329        F: Fn(&(K, V)) + 'a,
330    {
331        KeyedSingleton {
332            underlying: self.underlying.inspect(f),
333        }
334    }
335
336    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
337    /// element, the value.
338    ///
339    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
340    ///
341    /// # Example
342    /// ```rust
343    /// # use hydro_lang::prelude::*;
344    /// # use futures::StreamExt;
345    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
346    /// let keyed_singleton = // { 1: 2, 2: 4 }
347    /// # process
348    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
349    /// #     .into_keyed()
350    /// #     .first();
351    /// keyed_singleton
352    ///     .clone()
353    ///     .into_keyed_stream()
354    ///     .interleave(
355    ///         keyed_singleton.into_keyed_stream()
356    ///     )
357    /// #   .entries()
358    /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
360    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
361    /// #     assert_eq!(stream.next().await.unwrap(), w);
362    /// # }
363    /// # }));
364    /// ```
365    pub fn into_keyed_stream(
366        self,
367    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
368        self.underlying
369            .into_keyed()
370            .assume_ordering(nondet!(/** only one element per key */))
371    }
372}
373
// Counts the entries of a fully bounded keyed singleton by counting its underlying stream.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let KeyedSingleton { underlying } = me;
    underlying.count()
}
380
381impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
    /// Transforms each value by invoking `f` on each element, with keys staying the same
    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
    ///
    /// If you do not want to modify the keyed singleton and instead only want to view
    /// each value use [`KeyedSingleton::inspect`] instead.
387    ///
388    /// # Example
389    /// ```rust
390    /// # use hydro_lang::prelude::*;
391    /// # use futures::StreamExt;
392    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
393    /// let keyed_singleton = // { 1: 2, 2: 4 }
394    /// # process
395    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
396    /// #     .into_keyed()
397    /// #     .first();
398    /// keyed_singleton.map(q!(|v| v + 1))
399    /// #   .entries()
400    /// # }, |mut stream| async move {
401    /// // { 1: 3, 2: 5 }
402    /// # let mut results = Vec::new();
403    /// # for _ in 0..2 {
404    /// #     results.push(stream.next().await.unwrap());
405    /// # }
406    /// # results.sort();
407    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
408    /// # }));
409    /// ```
410    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
411    where
412        F: Fn(V) -> U + 'a,
413    {
414        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
415        KeyedSingleton {
416            underlying: self.underlying.map(q!({
417                let orig = f;
418                move |(k, v)| (k, orig(v))
419            })),
420        }
421    }
422
423    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
424    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
425    ///
426    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
427    /// the new value `U`. The key remains unchanged in the output.
428    ///
429    /// # Example
430    /// ```rust
431    /// # use hydro_lang::prelude::*;
432    /// # use futures::StreamExt;
433    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
434    /// let keyed_singleton = // { 1: 2, 2: 4 }
435    /// # process
436    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
437    /// #     .into_keyed()
438    /// #     .first();
439    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
440    /// #   .entries()
441    /// # }, |mut stream| async move {
442    /// // { 1: 3, 2: 6 }
443    /// # let mut results = Vec::new();
444    /// # for _ in 0..2 {
445    /// #     results.push(stream.next().await.unwrap());
446    /// # }
447    /// # results.sort();
448    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
449    /// # }));
450    /// ```
451    pub fn map_with_key<U, F>(
452        self,
453        f: impl IntoQuotedMut<'a, F, L> + Copy,
454    ) -> KeyedSingleton<K, U, L, B>
455    where
456        F: Fn((K, V)) -> U + 'a,
457        K: Clone,
458    {
459        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
460        KeyedSingleton {
461            underlying: self.underlying.map(q!({
462                let orig = f;
463                move |(k, v)| {
464                    let out = orig((k.clone(), v));
465                    (k, out)
466                }
467            })),
468        }
469    }
470
471    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
472    ///
473    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
474    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
475    /// is filtered out.
476    ///
477    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
478    /// not modify or take ownership of the values. If you need to modify the values while filtering
479    /// use [`KeyedSingleton::filter_map`] instead.
480    ///
481    /// # Example
482    /// ```rust
483    /// # use hydro_lang::prelude::*;
484    /// # use futures::StreamExt;
485    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
486    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
487    /// # process
488    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
489    /// #     .into_keyed()
490    /// #     .first();
491    /// keyed_singleton.filter(q!(|&v| v > 1))
492    /// #   .entries()
493    /// # }, |mut stream| async move {
494    /// // { 1: 2, 2: 4 }
495    /// # let mut results = Vec::new();
496    /// # for _ in 0..2 {
497    /// #     results.push(stream.next().await.unwrap());
498    /// # }
499    /// # results.sort();
500    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
501    /// # }));
502    /// ```
503    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
504    where
505        F: Fn(&V) -> bool + 'a,
506    {
507        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
508        KeyedSingleton {
509            underlying: self.underlying.filter(q!({
510                let orig = f;
511                move |(_k, v)| orig(v)
512            })),
513        }
514    }
515
516    /// An operator that both filters and maps values. It yields only the key-value pairs where
517    /// the supplied closure `f` returns `Some(value)`.
518    ///
519    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
520    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
521    /// If it returns `None`, the key-value pair is filtered out.
522    ///
523    /// # Example
524    /// ```rust
525    /// # use hydro_lang::prelude::*;
526    /// # use futures::StreamExt;
527    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
528    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
529    /// # process
530    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
531    /// #     .into_keyed()
532    /// #     .first();
533    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
534    /// #   .entries()
535    /// # }, |mut stream| async move {
536    /// // { 1: 42, 3: 100 }
537    /// # let mut results = Vec::new();
538    /// # for _ in 0..2 {
539    /// #     results.push(stream.next().await.unwrap());
540    /// # }
541    /// # results.sort();
542    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
543    /// # }));
544    /// ```
545    pub fn filter_map<F, U>(
546        self,
547        f: impl IntoQuotedMut<'a, F, L> + Copy,
548    ) -> KeyedSingleton<K, U, L, B>
549    where
550        F: Fn(V) -> Option<U> + 'a,
551    {
552        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
553        KeyedSingleton {
554            underlying: self.underlying.filter_map(q!({
555                let orig = f;
556                move |(k, v)| orig(v).map(|v| (k, v))
557            })),
558        }
559    }
560
561    /// Gets the number of keys in the keyed singleton.
562    ///
563    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
564    /// since keys may be added / removed over time. When the set of keys changes, the count will
565    /// be asynchronously updated.
566    ///
567    /// # Example
568    /// ```rust
569    /// # use hydro_lang::prelude::*;
570    /// # use futures::StreamExt;
571    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
572    /// # let tick = process.tick();
573    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
574    /// # process
575    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
576    /// #     .into_keyed()
577    /// #     .batch(&tick, nondet!(/** test */))
578    /// #     .first();
579    /// keyed_singleton.key_count()
580    /// # .all_ticks()
581    /// # }, |mut stream| async move {
582    /// // 3
583    /// # assert_eq!(stream.next().await.unwrap(), 3);
584    /// # }));
585    /// ```
    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
        // This branch is taken only when `L` reports itself as top-level *and* a tick can be
        // obtained from the location of the underlying stream.
        if L::is_top_level()
            && let Some(tick) = self.underlying.location.try_tick()
        {
            if B::ValueBound::is_bounded() {
                // Values are immutable: re-wrap the same stream under `B::WithBoundedValue`
                // so that `entries()` (which requires a bounded value) is available, then
                // count the resulting pairs.
                let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                    underlying: self.underlying,
                };

                me.entries().count()
            } else {
                // Values may change: re-wrap under `B::WithUnboundedValue`, take a snapshot
                // using `tick`, count the entries of that snapshot, and apply `latest()` to
                // the resulting count.
                let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
                    underlying: self.underlying,
                };

                let out = key_count_inside_tick(
                    me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                )
                .latest();
                // NOTE(review): rebuilding from the location and IR node presumably re-types
                // the singleton to `B::UnderlyingBound` — confirm against `Singleton::new`.
                Singleton::new(out.location, out.ir_node.into_inner())
            }
        } else {
            // Otherwise, count the elements of the underlying stream directly.
            self.underlying.count()
        }
    }
611
612    /// An operator which allows you to "name" a `HydroNode`.
613    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
614    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
615        {
616            let mut node = self.underlying.ir_node.borrow_mut();
617            let metadata = node.metadata_mut();
618            metadata.tag = Some(name.to_string());
619        }
620        self
621    }
622}
623
624impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
625    /// Gets the value associated with a specific key from the keyed singleton.
626    ///
627    /// # Example
628    /// ```rust
629    /// # use hydro_lang::prelude::*;
630    /// # use futures::StreamExt;
631    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
632    /// let tick = process.tick();
633    /// let keyed_data = process
634    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
635    ///     .into_keyed()
636    ///     .batch(&tick, nondet!(/** test */))
637    ///     .first();
638    /// let key = tick.singleton(q!(1));
639    /// keyed_data.get(key).all_ticks()
640    /// # }, |mut stream| async move {
641    /// // 2
642    /// # assert_eq!(stream.next().await.unwrap(), 2);
643    /// # }));
644    /// ```
645    pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
646        self.entries()
647            .join(key.into_stream().map(q!(|k| (k, ()))))
648            .map(q!(|(_, (v, _))| v))
649            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
650            .first()
651    }
652
653    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
654    /// is some additional metadata, emits a keyed stream of lookup results where the key
655    /// is the same as before, but the value is a tuple of the lookup result and the metadata
656    /// of the request. If the key is not found, no output will be produced.
657    ///
658    /// # Example
659    /// ```rust
660    /// # use hydro_lang::prelude::*;
661    /// # use futures::StreamExt;
662    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
663    /// let tick = process.tick();
664    /// let keyed_data = process
665    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
666    ///     .into_keyed()
667    ///     .batch(&tick, nondet!(/** test */))
668    ///     .first();
669    /// let other_data = process
670    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
671    ///     .into_keyed()
672    ///     .batch(&tick, nondet!(/** test */));
673    /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
674    /// # }, |mut stream| async move {
675    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
676    /// # let mut results = vec![];
677    /// # for _ in 0..3 {
678    /// #     results.push(stream.next().await.unwrap());
679    /// # }
680    /// # results.sort();
681    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
682    /// # }));
683    /// ```
684    pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
685        self,
686        requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
687    ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
688        self.entries()
689            .weaker_retries::<R2>()
690            .join(requests.entries())
691            .into_keyed()
692    }
693
    /// For each entry in `self`, looks up the entry in `from` whose key matches the
    /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
    /// containing the value from `self` and an option of the value from `from`. If the key is not
    /// present in `from`, the option will be [`None`].
698    ///
699    /// # Example
700    /// ```rust
701    /// # use hydro_lang::prelude::*;
702    /// # use futures::StreamExt;
703    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
704    /// # let tick = process.tick();
705    /// let requests = // { 1: 10, 2: 20 }
706    /// # process
707    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
708    /// #     .into_keyed()
709    /// #     .batch(&tick, nondet!(/** test */))
710    /// #     .first();
711    /// let other_data = // { 10: 100, 11: 101 }
712    /// # process
713    /// #     .source_iter(q!(vec![(10, 100), (11, 101)]))
714    /// #     .into_keyed()
715    /// #     .batch(&tick, nondet!(/** test */))
716    /// #     .first();
717    /// requests.get_from(other_data)
718    /// # .entries().all_ticks()
719    /// # }, |mut stream| async move {
720    /// // { 1: (10, Some(100)), 2: (20, None) }
721    /// # let mut results = vec![];
722    /// # for _ in 0..2 {
723    /// #     results.push(stream.next().await.unwrap());
724    /// # }
725    /// # results.sort();
726    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
727    /// # }));
728    /// ```
729    pub fn get_from<V2: Clone>(
730        self,
731        from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
732    ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
733    where
734        K: Clone,
735        V: Hash + Eq + Clone,
736    {
737        let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
738        let lookup_result = from.get_many_if_present(to_lookup.clone());
739        let missing_values =
740            to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
741        KeyedSingleton {
742            underlying: lookup_result
743                .entries()
744                .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
745                .chain(missing_values.entries().map(q!(|(v, k)| (k, (v, None))))),
746        }
747    }
748}
749
750impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
751where
752    L: Location<'a>,
753{
754    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
755    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
756    ///
757    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
758    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
759    /// argument that declares where the keyed singleton will be atomically processed. Batching a
760    /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
761    /// batching into a different [`Tick`] will introduce asynchrony.
762    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
763        KeyedSingleton {
764            underlying: self.underlying.atomic(tick),
765        }
766    }
767}
768
769impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
770where
771    L: Location<'a> + NoTick,
772{
773    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
774    /// See [`KeyedSingleton::atomic`] for more details.
775    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
776        KeyedSingleton {
777            underlying: self.underlying.end_atomic(),
778        }
779    }
780}
781
782impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
783    /// Asynchronously yields this keyed singleton outside the tick, which will
784    /// be asynchronously updated with the latest set of entries inside the tick.
785    ///
786    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
787    /// tick that tracks the inner value. This is useful for getting the value as of the
788    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
789    ///
790    /// The entire set of entries are propagated on each tick, which means that if a tick
791    /// does not have a key "XYZ" that was present in the previous tick, the entry for "XYZ" will
792    /// also be removed from the output.
793    ///
794    /// # Example
795    /// ```rust
796    /// # use hydro_lang::prelude::*;
797    /// # use futures::StreamExt;
798    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
799    /// let tick = process.tick();
800    /// # // ticks are lazy by default, forces the second tick to run
801    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
802    /// # let batch_first_tick = process
803    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
804    /// #   .batch(&tick, nondet!(/** test */))
805    /// #   .into_keyed();
806    /// # let batch_second_tick = process
807    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
808    /// #   .batch(&tick, nondet!(/** test */))
809    /// #   .into_keyed()
810    /// #   .defer_tick(); // appears on the second tick
811    /// # let input_batch = batch_first_tick.chain(batch_second_tick).first();
812    /// input_batch // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
813    ///     .latest()
814    /// # .snapshot(&tick, nondet!(/** test */))
815    /// # .entries()
816    /// # .all_ticks()
817    /// # }, |mut stream| async move {
818    /// // asynchronously changes from { 1: 2, 2: 3 } ~> { 2: 4, 3: 5 }
819    /// # for w in vec![(1, 2), (2, 3), (2, 4), (3, 5)] {
820    /// #     assert_eq!(stream.next().await.unwrap(), w);
821    /// # }
822    /// # }));
823    /// ```
824    pub fn latest(self) -> KeyedSingleton<K, V, L, Unbounded> {
825        KeyedSingleton {
826            underlying: self.underlying.all_ticks(),
827        }
828    }
829
830    /// Synchronously yields this keyed singleton outside the tick as an unbounded keyed singleton,
831    /// which will be updated with the latest set of entries inside the tick.
832    ///
833    /// Unlike [`KeyedSingleton::latest`], this preserves synchronous execution, as the output
834    /// keyed singleton is emitted in an [`Atomic`] context that will process elements synchronously
835    /// with the input keyed singleton's [`Tick`] context.
836    pub fn latest_atomic(self) -> KeyedSingleton<K, V, Atomic<L>, Unbounded> {
837        KeyedSingleton {
838            underlying: self.underlying.all_ticks_atomic(),
839        }
840    }
841
842    #[expect(missing_docs, reason = "TODO")]
843    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
844        KeyedSingleton {
845            underlying: self.underlying.defer_tick(),
846        }
847    }
848}
849
850impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
851where
852    L: Location<'a>,
853{
854    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
855    /// point in time.
856    ///
857    /// # Non-Determinism
858    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
859    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
860    pub fn snapshot(
861        self,
862        tick: &Tick<L>,
863        nondet: NonDet,
864    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
865        KeyedSingleton {
866            underlying: self.underlying.batch(tick, nondet),
867        }
868    }
869}
870
871impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
872where
873    L: Location<'a> + NoTick,
874{
875    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
876    /// state of the keyed singleton being atomically processed.
877    ///
878    /// # Non-Determinism
879    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
880    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
881    pub fn snapshot_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
882        KeyedSingleton {
883            underlying: self.underlying.batch_atomic(nondet),
884        }
885    }
886}
887
888impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
889where
890    L: Location<'a> + NoTick,
891{
892    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
893    /// arrived since the previous batch was released.
894    ///
895    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
896    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
897    ///
898    /// # Non-Determinism
899    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
900    /// has a non-deterministic set of key-value pairs.
901    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
902        self.atomic(tick).batch_atomic(nondet)
903    }
904}
905
906impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
907where
908    L: Location<'a> + NoTick,
909{
910    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
911    /// atomically processed.
912    ///
913    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
914    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
915    ///
916    /// # Non-Determinism
917    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
918    /// has a non-deterministic set of key-value pairs.
919    pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
920        KeyedSingleton {
921            underlying: self.underlying.batch_atomic(nondet),
922        }
923    }
924}
925
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Checks `key_count` on a keyed singleton built with `first`: the sampled count starts at
    /// zero and grows by one for each new key that is sent.
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let deployed = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut sink = deployed.connect_sink_bincode(input_port).await;
        let mut counts = deployed.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        // The first observed count is zero, before anything has been sent.
        assert_eq!(counts.next().await.unwrap(), 0);

        // Each step sends one entry and then checks the next observed key count.
        for (entry, expected_count) in [((1, 1), 1), ((2, 2), 2)] {
            sink.send(entry).await.unwrap();
            assert_eq!(counts.next().await.unwrap(), expected_count);
        }
    }

    /// Checks `key_count` on a keyed singleton built with `fold`: sending another entry for an
    /// existing key yields the same count again, while a new key increases it.
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let deployed = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut sink = deployed.connect_sink_bincode(input_port).await;
        let mut counts = deployed.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        // The first observed count is zero, before anything has been sent.
        assert_eq!(counts.next().await.unwrap(), 0);

        // Each step sends one entry and then checks the next observed key count.
        let steps = [
            ((1, 1), 1),
            ((1, 2), 1),
            ((2, 2), 2),
            ((1, 1), 2),
            ((3, 1), 3),
        ];
        for (entry, expected_count) in steps {
            sink.send(entry).await.unwrap();
            assert_eq!(counts.next().await.unwrap(), expected_count);
        }
    }
}