hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::collections::HashMap;
4use std::hash::Hash;
5use std::marker::PhantomData;
6
7use stageleft::{IntoQuotedMut, QuotedWithContext, q};
8
9use super::boundedness::{Bounded, Boundedness, Unbounded};
10use super::keyed_singleton::KeyedSingleton;
11use super::optional::Optional;
12use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::HydroNode;
14use crate::forward_handle::ForwardRef;
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::live_collections::stream::{Ordering, Retries};
18use crate::location::dynamic::LocationId;
19use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
20use crate::manual_expr::ManualExpr;
21use crate::nondet::{NonDet, nondet};
22
23pub mod networking;
24
25/// Streaming elements of type `V` grouped by a key of type `K`.
26///
27/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
28/// order of keys is non-deterministic but the order *within* each group may be deterministic.
29///
30/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
31/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
32/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
33///
34/// Type Parameters:
35/// - `K`: the type of the key for each group
36/// - `V`: the type of the elements inside each group
37/// - `Loc`: the [`Location`] where the keyed stream is materialized
38/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
39/// - `Order`: tracks whether the elements within each group have deterministic order
40/// ([`TotalOrder`]) or not ([`NoOrder`])
41/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
42/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
43pub struct KeyedStream<
44 K,
45 V,
46 Loc,
47 Bound: Boundedness,
48 Order: Ordering = TotalOrder,
49 Retry: Retries = ExactlyOnce,
50> {
51 pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retry>,
52 pub(crate) _phantom_order: PhantomData<Order>,
53}
54
55impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
56 for KeyedStream<K, V, L, B, NoOrder, R>
57where
58 L: Location<'a>,
59{
60 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
61 KeyedStream {
62 underlying: stream.underlying,
63 _phantom_order: Default::default(),
64 }
65 }
66}
67
68impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
69 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
70{
71 fn clone(&self) -> Self {
72 KeyedStream {
73 underlying: self.underlying.clone(),
74 _phantom_order: PhantomData,
75 }
76 }
77}
78
79impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
80 for KeyedStream<K, V, L, B, O, R>
81where
82 L: Location<'a> + NoTick,
83{
84 type Location = L;
85
86 fn create_source(ident: syn::Ident, location: L) -> Self {
87 Stream::create_source(ident, location).into_keyed()
88 }
89}
90
91impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
92 for KeyedStream<K, V, L, B, O, R>
93where
94 L: Location<'a> + NoTick,
95{
96 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
97 self.underlying.complete(ident, expected_location);
98 }
99}
100
101impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
102 KeyedStream<K, V, L, B, O, R>
103{
104 /// Explicitly "casts" the keyed stream to a type with a different ordering
105 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
106 /// by the type-system.
107 ///
108 /// # Non-Determinism
109 /// This function is used as an escape hatch, and any mistakes in the
110 /// provided ordering guarantee will propagate into the guarantees
111 /// for the rest of the program.
112 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
113 KeyedStream {
114 underlying: self.underlying,
115 _phantom_order: PhantomData,
116 }
117 }
118
119 /// Explicitly "casts" the keyed stream to a type with a different retries
120 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
121 /// be proven by the type-system.
122 ///
123 /// # Non-Determinism
124 /// This function is used as an escape hatch, and any mistakes in the
125 /// provided retries guarantee will propagate into the guarantees
126 /// for the rest of the program.
127 pub fn assume_retries<R2: Retries>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
128 KeyedStream {
129 underlying: self.underlying.assume_retries::<R2>(nondet),
130 _phantom_order: PhantomData,
131 }
132 }
133
134 /// Flattens the keyed stream into an unordered stream of key-value pairs.
135 ///
136 /// # Example
137 /// ```rust
138 /// # use hydro_lang::prelude::*;
139 /// # use futures::StreamExt;
140 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
141 /// process
142 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
143 /// .into_keyed()
144 /// .entries()
145 /// # }, |mut stream| async move {
146 /// // (1, 2), (1, 3), (2, 4) in any order
147 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
148 /// # assert_eq!(stream.next().await.unwrap(), w);
149 /// # }
150 /// # }));
151 /// ```
152 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
153 self.underlying
154 }
155
156 /// Flattens the keyed stream into an unordered stream of only the values.
157 ///
158 /// # Example
159 /// ```rust
160 /// # use hydro_lang::prelude::*;
161 /// # use futures::StreamExt;
162 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
163 /// process
164 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
165 /// .into_keyed()
166 /// .values()
167 /// # }, |mut stream| async move {
168 /// // 2, 3, 4 in any order
169 /// # for w in vec![2, 3, 4] {
170 /// # assert_eq!(stream.next().await.unwrap(), w);
171 /// # }
172 /// # }));
173 /// ```
174 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
175 self.underlying.map(q!(|(_, v)| v))
176 }
177
178 /// Transforms each value by invoking `f` on each element, with keys staying the same
179 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
180 ///
181 /// If you do not want to modify the stream and instead only want to view
182 /// each item use [`KeyedStream::inspect`] instead.
183 ///
184 /// # Example
185 /// ```rust
186 /// # use hydro_lang::prelude::*;
187 /// # use futures::StreamExt;
188 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
189 /// process
190 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
191 /// .into_keyed()
192 /// .map(q!(|v| v + 1))
193 /// # .entries()
194 /// # }, |mut stream| async move {
195 /// // { 1: [3, 4], 2: [5] }
196 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
197 /// # assert_eq!(stream.next().await.unwrap(), w);
198 /// # }
199 /// # }));
200 /// ```
201 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
202 where
203 F: Fn(V) -> U + 'a,
204 {
205 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
206 KeyedStream {
207 underlying: self.underlying.map(q!({
208 let orig = f;
209 move |(k, v)| (k, orig(v))
210 })),
211 _phantom_order: Default::default(),
212 }
213 }
214
215 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
216 /// re-grouped even they are tuples; instead they will be grouped under the original key.
217 ///
218 /// If you do not want to modify the stream and instead only want to view
219 /// each item use [`KeyedStream::inspect_with_key`] instead.
220 ///
221 /// # Example
222 /// ```rust
223 /// # use hydro_lang::prelude::*;
224 /// # use futures::StreamExt;
225 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
226 /// process
227 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
228 /// .into_keyed()
229 /// .map_with_key(q!(|(k, v)| k + v))
230 /// # .entries()
231 /// # }, |mut stream| async move {
232 /// // { 1: [3, 4], 2: [6] }
233 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
234 /// # assert_eq!(stream.next().await.unwrap(), w);
235 /// # }
236 /// # }));
237 /// ```
238 pub fn map_with_key<U, F>(
239 self,
240 f: impl IntoQuotedMut<'a, F, L> + Copy,
241 ) -> KeyedStream<K, U, L, B, O, R>
242 where
243 F: Fn((K, V)) -> U + 'a,
244 K: Clone,
245 {
246 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
247 KeyedStream {
248 underlying: self.underlying.map(q!({
249 let orig = f;
250 move |(k, v)| {
251 let out = orig((k.clone(), v));
252 (k, out)
253 }
254 })),
255 _phantom_order: Default::default(),
256 }
257 }
258
259 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
260 /// `f`, preserving the order of the elements within the group.
261 ///
262 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
263 /// not modify or take ownership of the values. If you need to modify the values while filtering
264 /// use [`KeyedStream::filter_map`] instead.
265 ///
266 /// # Example
267 /// ```rust
268 /// # use hydro_lang::prelude::*;
269 /// # use futures::StreamExt;
270 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
271 /// process
272 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
273 /// .into_keyed()
274 /// .filter(q!(|&x| x > 2))
275 /// # .entries()
276 /// # }, |mut stream| async move {
277 /// // { 1: [3], 2: [4] }
278 /// # for w in vec![(1, 3), (2, 4)] {
279 /// # assert_eq!(stream.next().await.unwrap(), w);
280 /// # }
281 /// # }));
282 /// ```
283 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
284 where
285 F: Fn(&V) -> bool + 'a,
286 {
287 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
288 KeyedStream {
289 underlying: self.underlying.filter(q!({
290 let orig = f;
291 move |(_k, v)| orig(v)
292 })),
293 _phantom_order: Default::default(),
294 }
295 }
296
297 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
298 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
299 ///
300 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
301 /// not modify or take ownership of the values. If you need to modify the values while filtering
302 /// use [`KeyedStream::filter_map_with_key`] instead.
303 ///
304 /// # Example
305 /// ```rust
306 /// # use hydro_lang::prelude::*;
307 /// # use futures::StreamExt;
308 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
309 /// process
310 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
311 /// .into_keyed()
312 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
313 /// # .entries()
314 /// # }, |mut stream| async move {
315 /// // { 1: [3], 2: [4] }
316 /// # for w in vec![(1, 3), (2, 4)] {
317 /// # assert_eq!(stream.next().await.unwrap(), w);
318 /// # }
319 /// # }));
320 /// ```
321 pub fn filter_with_key<F>(
322 self,
323 f: impl IntoQuotedMut<'a, F, L> + Copy,
324 ) -> KeyedStream<K, V, L, B, O, R>
325 where
326 F: Fn(&(K, V)) -> bool + 'a,
327 {
328 KeyedStream {
329 underlying: self.underlying.filter(f),
330 _phantom_order: Default::default(),
331 }
332 }
333
334 /// An operator that both filters and maps each value, with keys staying the same.
335 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
336 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
337 ///
338 /// # Example
339 /// ```rust
340 /// # use hydro_lang::prelude::*;
341 /// # use futures::StreamExt;
342 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
343 /// process
344 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
345 /// .into_keyed()
346 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
347 /// # .entries()
348 /// # }, |mut stream| async move {
349 /// // { 1: [2], 2: [4] }
350 /// # for w in vec![(1, 2), (2, 4)] {
351 /// # assert_eq!(stream.next().await.unwrap(), w);
352 /// # }
353 /// # }));
354 /// ```
355 pub fn filter_map<U, F>(
356 self,
357 f: impl IntoQuotedMut<'a, F, L> + Copy,
358 ) -> KeyedStream<K, U, L, B, O, R>
359 where
360 F: Fn(V) -> Option<U> + 'a,
361 {
362 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
363 KeyedStream {
364 underlying: self.underlying.filter_map(q!({
365 let orig = f;
366 move |(k, v)| orig(v).map(|o| (k, o))
367 })),
368 _phantom_order: Default::default(),
369 }
370 }
371
372 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
373 /// re-grouped even they are tuples; instead they will be grouped under the original key.
374 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
375 ///
376 /// # Example
377 /// ```rust
378 /// # use hydro_lang::prelude::*;
379 /// # use futures::StreamExt;
380 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
381 /// process
382 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
383 /// .into_keyed()
384 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
385 /// # .entries()
386 /// # }, |mut stream| async move {
387 /// // { 2: [2] }
388 /// # for w in vec![(2, 2)] {
389 /// # assert_eq!(stream.next().await.unwrap(), w);
390 /// # }
391 /// # }));
392 /// ```
393 pub fn filter_map_with_key<U, F>(
394 self,
395 f: impl IntoQuotedMut<'a, F, L> + Copy,
396 ) -> KeyedStream<K, U, L, B, O, R>
397 where
398 F: Fn((K, V)) -> Option<U> + 'a,
399 K: Clone,
400 {
401 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
402 KeyedStream {
403 underlying: self.underlying.filter_map(q!({
404 let orig = f;
405 move |(k, v)| {
406 let out = orig((k.clone(), v));
407 out.map(|o| (k, o))
408 }
409 })),
410 _phantom_order: Default::default(),
411 }
412 }
413
414 /// For each value `v` in each group, transform `v` using `f` and then treat the
415 /// result as an [`Iterator`] to produce values one by one within the same group.
416 /// The implementation for [`Iterator`] for the output type `I` must produce items
417 /// in a **deterministic** order.
418 ///
419 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
420 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
421 ///
422 /// # Example
423 /// ```rust
424 /// # use hydro_lang::prelude::*;
425 /// # use futures::StreamExt;
426 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
427 /// process
428 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
429 /// .into_keyed()
430 /// .flat_map_ordered(q!(|x| x))
431 /// # .entries()
432 /// # }, |mut stream| async move {
433 /// // { 1: [2, 3, 4], 2: [5, 6] }
434 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
435 /// # assert_eq!(stream.next().await.unwrap(), w);
436 /// # }
437 /// # }));
438 /// ```
439 pub fn flat_map_ordered<U, I, F>(
440 self,
441 f: impl IntoQuotedMut<'a, F, L> + Copy,
442 ) -> KeyedStream<K, U, L, B, O, R>
443 where
444 I: IntoIterator<Item = U>,
445 F: Fn(V) -> I + 'a,
446 K: Clone,
447 {
448 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
449 KeyedStream {
450 underlying: self.underlying.flat_map_ordered(q!({
451 let orig = f;
452 move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
453 })),
454 _phantom_order: Default::default(),
455 }
456 }
457
458 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
459 /// for the output type `I` to produce items in any order.
460 ///
461 /// # Example
462 /// ```rust
463 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
464 /// # use futures::StreamExt;
465 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
466 /// process
467 /// .source_iter(q!(vec![
468 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
469 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
470 /// ]))
471 /// .into_keyed()
472 /// .flat_map_unordered(q!(|x| x))
473 /// # .entries()
474 /// # }, |mut stream| async move {
475 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
476 /// # let mut results = Vec::new();
477 /// # for _ in 0..4 {
478 /// # results.push(stream.next().await.unwrap());
479 /// # }
480 /// # results.sort();
481 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
482 /// # }));
483 /// ```
484 pub fn flat_map_unordered<U, I, F>(
485 self,
486 f: impl IntoQuotedMut<'a, F, L> + Copy,
487 ) -> KeyedStream<K, U, L, B, NoOrder, R>
488 where
489 I: IntoIterator<Item = U>,
490 F: Fn(V) -> I + 'a,
491 K: Clone,
492 {
493 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
494 KeyedStream {
495 underlying: self.underlying.flat_map_unordered(q!({
496 let orig = f;
497 move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
498 })),
499 _phantom_order: Default::default(),
500 }
501 }
502
503 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
504 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
505 /// items in a **deterministic** order.
506 ///
507 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
508 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
509 ///
510 /// # Example
511 /// ```rust
512 /// # use hydro_lang::prelude::*;
513 /// # use futures::StreamExt;
514 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
515 /// process
516 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
517 /// .into_keyed()
518 /// .flatten_ordered()
519 /// # .entries()
520 /// # }, |mut stream| async move {
521 /// // { 1: [2, 3, 4], 2: [5, 6] }
522 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
523 /// # assert_eq!(stream.next().await.unwrap(), w);
524 /// # }
525 /// # }));
526 /// ```
527 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
528 where
529 V: IntoIterator<Item = U>,
530 K: Clone,
531 {
532 self.flat_map_ordered(q!(|d| d))
533 }
534
535 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
536 /// for the value type `V` to produce items in any order.
537 ///
538 /// # Example
539 /// ```rust
540 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
541 /// # use futures::StreamExt;
542 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
543 /// process
544 /// .source_iter(q!(vec![
545 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
546 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
547 /// ]))
548 /// .into_keyed()
549 /// .flatten_unordered()
550 /// # .entries()
551 /// # }, |mut stream| async move {
552 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
553 /// # let mut results = Vec::new();
554 /// # for _ in 0..4 {
555 /// # results.push(stream.next().await.unwrap());
556 /// # }
557 /// # results.sort();
558 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
559 /// # }));
560 /// ```
561 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
562 where
563 V: IntoIterator<Item = U>,
564 K: Clone,
565 {
566 self.flat_map_unordered(q!(|d| d))
567 }
568
569 /// An operator which allows you to "inspect" each element of a stream without
570 /// modifying it. The closure `f` is called on a reference to each value. This is
571 /// mainly useful for debugging, and should not be used to generate side-effects.
572 ///
573 /// # Example
574 /// ```rust
575 /// # use hydro_lang::prelude::*;
576 /// # use futures::StreamExt;
577 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
578 /// process
579 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
580 /// .into_keyed()
581 /// .inspect(q!(|v| println!("{}", v)))
582 /// # .entries()
583 /// # }, |mut stream| async move {
584 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
585 /// # assert_eq!(stream.next().await.unwrap(), w);
586 /// # }
587 /// # }));
588 /// ```
589 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
590 where
591 F: Fn(&V) + 'a,
592 {
593 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
594 KeyedStream {
595 underlying: self.underlying.inspect(q!({
596 let orig = f;
597 move |(_k, v)| orig(v)
598 })),
599 _phantom_order: Default::default(),
600 }
601 }
602
603 /// An operator which allows you to "inspect" each element of a stream without
604 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
605 /// mainly useful for debugging, and should not be used to generate side-effects.
606 ///
607 /// # Example
608 /// ```rust
609 /// # use hydro_lang::prelude::*;
610 /// # use futures::StreamExt;
611 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
612 /// process
613 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
614 /// .into_keyed()
615 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
616 /// # .entries()
617 /// # }, |mut stream| async move {
618 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
619 /// # assert_eq!(stream.next().await.unwrap(), w);
620 /// # }
621 /// # }));
622 /// ```
623 pub fn inspect_with_key<F>(
624 self,
625 f: impl IntoQuotedMut<'a, F, L>,
626 ) -> KeyedStream<K, V, L, B, O, R>
627 where
628 F: Fn(&(K, V)) + 'a,
629 {
630 KeyedStream {
631 underlying: self.underlying.inspect(f),
632 _phantom_order: Default::default(),
633 }
634 }
635
636 /// An operator which allows you to "name" a `HydroNode`.
637 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
638 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
639 {
640 let mut node = self.underlying.ir_node.borrow_mut();
641 let metadata = node.metadata_mut();
642 metadata.tag = Some(name.to_string());
643 }
644 self
645 }
646}
647
648impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
649 KeyedStream<K, V, L, Unbounded, O, R>
650{
651 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
652 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
653 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
654 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
655 ///
656 /// Currently, both input streams must be [`Unbounded`].
657 ///
658 /// # Example
659 /// ```rust
660 /// # use hydro_lang::prelude::*;
661 /// # use futures::StreamExt;
662 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
663 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
664 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
665 /// numbers1.interleave(numbers2)
666 /// # .entries()
667 /// # }, |mut stream| async move {
668 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
669 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
670 /// # assert_eq!(stream.next().await.unwrap(), w);
671 /// # }
672 /// # }));
673 /// ```
674 pub fn interleave<O2: Ordering, R2: Retries>(
675 self,
676 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
677 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
678 where
679 R: MinRetries<R2>,
680 {
681 self.entries().interleave(other.entries()).into_keyed()
682 }
683}
684
685/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
686/// control the processing of future elements.
687pub enum Generate<T> {
688 /// Emit the provided element, and keep processing future inputs.
689 Yield(T),
690 /// Emit the provided element as the _final_ element, do not process future inputs.
691 Return(T),
692 /// Do not emit anything, but continue processing future inputs.
693 Continue,
694 /// Do not emit anything, and do not process further inputs.
695 Break,
696}
697
698impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
699where
700 K: Eq + Hash,
701 L: Location<'a>,
702{
703 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
704 ///
705 /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
706 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
707 /// early by returning `None`.
708 ///
709 /// The function takes a mutable reference to the accumulator and the current element, and returns
710 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
711 /// If the function returns `None`, the stream is terminated and no more elements are processed.
712 ///
713 /// # Example
714 /// ```rust
715 /// # use hydro_lang::prelude::*;
716 /// # use futures::StreamExt;
717 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
718 /// process
719 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
720 /// .into_keyed()
721 /// .scan(
722 /// q!(|| 0),
723 /// q!(|acc, x| {
724 /// *acc += x;
725 /// if *acc % 2 == 0 { None } else { Some(*acc) }
726 /// }),
727 /// )
728 /// # .entries()
729 /// # }, |mut stream| async move {
730 /// // Output: { 0: [1], 1: [3, 7] }
731 /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
732 /// # assert_eq!(stream.next().await.unwrap(), w);
733 /// # }
734 /// # }));
735 /// ```
736 pub fn scan<A, U, I, F>(
737 self,
738 init: impl IntoQuotedMut<'a, I, L> + Copy,
739 f: impl IntoQuotedMut<'a, F, L> + Copy,
740 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
741 where
742 K: Clone,
743 I: Fn() -> A + 'a,
744 F: Fn(&mut A, V) -> Option<U> + 'a,
745 {
746 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
747 self.generator(
748 init,
749 q!({
750 let orig = f;
751 move |state, v| {
752 if let Some(out) = orig(state, v) {
753 Generate::Yield(out)
754 } else {
755 Generate::Break
756 }
757 }
758 }),
759 )
760 }
761
762 /// Iteratively processes the elements in each group using a state machine that can yield
763 /// elements as it processes its inputs. This is designed to mirror the unstable generator
764 /// syntax in Rust, without requiring special syntax.
765 ///
766 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
767 /// state for each group. The second argument defines the processing logic, taking in a
768 /// mutable reference to the group's state and the value to be processed. It emits a
769 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
770 /// should be processed.
771 ///
772 /// # Example
773 /// ```rust
774 /// # use hydro_lang::prelude::*;
775 /// # use futures::StreamExt;
776 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
777 /// process
778 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
779 /// .into_keyed()
780 /// .generator(
781 /// q!(|| 0),
782 /// q!(|acc, x| {
783 /// *acc += x;
784 /// if *acc > 100 {
785 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
786 /// "done!".to_string()
787 /// )
788 /// } else if *acc % 2 == 0 {
789 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
790 /// "even".to_string()
791 /// )
792 /// } else {
793 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
794 /// }
795 /// }),
796 /// )
797 /// # .entries()
798 /// # }, |mut stream| async move {
799 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
800 /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
801 /// # assert_eq!(stream.next().await.unwrap(), w);
802 /// # }
803 /// # }));
804 /// ```
805 pub fn generator<A, U, I, F>(
806 self,
807 init: impl IntoQuotedMut<'a, I, L> + Copy,
808 f: impl IntoQuotedMut<'a, F, L> + Copy,
809 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
810 where
811 K: Clone,
812 I: Fn() -> A + 'a,
813 F: Fn(&mut A, V) -> Generate<U> + 'a,
814 {
815 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
816 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
817 let underlying_scanned = self
818 .underlying
819 .assume_ordering(nondet!(
820 /** we do not rely on the order of keys */
821 ))
822 .scan(
823 q!(|| HashMap::new()),
824 q!(move |acc, (k, v)| {
825 let existing_state = acc.entry(k.clone()).or_insert_with(|| Some(init()));
826 if let Some(existing_state_value) = existing_state {
827 match f(existing_state_value, v) {
828 Generate::Yield(out) => Some(Some((k, out))),
829 Generate::Return(out) => {
830 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
831 Some(Some((k, out)))
832 }
833 Generate::Break => {
834 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
835 Some(None)
836 }
837 Generate::Continue => Some(None),
838 }
839 } else {
840 Some(None)
841 }
842 }),
843 )
844 .flatten_ordered();
845
846 KeyedStream {
847 underlying: underlying_scanned.into(),
848 _phantom_order: Default::default(),
849 }
850 }
851
852 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
853 /// in-order across the values in each group. But the aggregation function returns a boolean,
854 /// which when true indicates that the aggregated result is complete and can be released to
855 /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
856 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
857 /// normal stream elements.
858 ///
859 /// # Example
860 /// ```rust
861 /// # use hydro_lang::prelude::*;
862 /// # use futures::StreamExt;
863 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
864 /// process
865 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
866 /// .into_keyed()
867 /// .fold_early_stop(
868 /// q!(|| 0),
869 /// q!(|acc, x| {
870 /// *acc += x;
871 /// x % 2 == 0
872 /// }),
873 /// )
874 /// # .entries()
875 /// # }, |mut stream| async move {
876 /// // Output: { 0: 2, 1: 9 }
877 /// # for w in vec![(0, 2), (1, 9)] {
878 /// # assert_eq!(stream.next().await.unwrap(), w);
879 /// # }
880 /// # }));
881 /// ```
882 pub fn fold_early_stop<A, I, F>(
883 self,
884 init: impl IntoQuotedMut<'a, I, L> + Copy,
885 f: impl IntoQuotedMut<'a, F, L> + Copy,
886 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
887 where
888 K: Clone,
889 I: Fn() -> A + 'a,
890 F: Fn(&mut A, V) -> bool + 'a,
891 {
892 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
893 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
894 let out_without_bound_cast = self
895 .generator(
896 q!(move || Some(init())),
897 q!(move |key_state, v| {
898 if let Some(key_state_value) = key_state.as_mut() {
899 if f(key_state_value, v) {
900 Generate::Return(key_state.take().unwrap())
901 } else {
902 Generate::Continue
903 }
904 } else {
905 unreachable!()
906 }
907 }),
908 )
909 .underlying;
910
911 KeyedSingleton {
912 underlying: out_without_bound_cast,
913 }
914 }
915
916 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
917 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
918 /// otherwise the first element would be non-deterministic.
919 ///
920 /// # Example
921 /// ```rust
922 /// # use hydro_lang::prelude::*;
923 /// # use futures::StreamExt;
924 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
925 /// process
926 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
927 /// .into_keyed()
928 /// .first()
929 /// # .entries()
930 /// # }, |mut stream| async move {
931 /// // Output: { 0: 2, 1: 3 }
932 /// # for w in vec![(0, 2), (1, 3)] {
933 /// # assert_eq!(stream.next().await.unwrap(), w);
934 /// # }
935 /// # }));
936 /// ```
937 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
938 where
939 K: Clone,
940 {
941 self.fold_early_stop(
942 q!(|| None),
943 q!(|acc, v| {
944 *acc = Some(v);
945 true
946 }),
947 )
948 .map(q!(|v| v.unwrap()))
949 }
950
951 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
952 ///
953 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
954 /// to depend on the order of elements in the group.
955 ///
956 /// If the input and output value types are the same and do not require initialization then use
957 /// [`KeyedStream::reduce`].
958 ///
959 /// # Example
960 /// ```rust
961 /// # use hydro_lang::prelude::*;
962 /// # use futures::StreamExt;
963 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
964 /// let tick = process.tick();
965 /// let numbers = process
966 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
967 /// .into_keyed();
968 /// let batch = numbers.batch(&tick, nondet!(/** test */));
969 /// batch
970 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
971 /// .entries()
972 /// .all_ticks()
973 /// # }, |mut stream| async move {
974 /// // (1, 5), (2, 7)
975 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
976 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
977 /// # }));
978 /// ```
979 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
980 self,
981 init: impl IntoQuotedMut<'a, I, L>,
982 comb: impl IntoQuotedMut<'a, F, L>,
983 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
984 let init = init.splice_fn0_ctx(&self.underlying.location).into();
985 let comb = comb
986 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
987 .into();
988
989 let out_ir = HydroNode::FoldKeyed {
990 init,
991 acc: comb,
992 input: Box::new(self.underlying.ir_node.into_inner()),
993 metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
994 };
995
996 KeyedSingleton {
997 underlying: Stream::new(self.underlying.location, out_ir),
998 }
999 }
1000
1001 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1002 ///
1003 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1004 /// to depend on the order of elements in the stream.
1005 ///
1006 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1007 ///
1008 /// # Example
1009 /// ```rust
1010 /// # use hydro_lang::prelude::*;
1011 /// # use futures::StreamExt;
1012 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1013 /// let tick = process.tick();
1014 /// let numbers = process
1015 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1016 /// .into_keyed();
1017 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1018 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1019 /// # }, |mut stream| async move {
1020 /// // (1, 5), (2, 7)
1021 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1022 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1023 /// # }));
1024 /// ```
1025 pub fn reduce<F: Fn(&mut V, V) + 'a>(
1026 self,
1027 comb: impl IntoQuotedMut<'a, F, L>,
1028 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1029 let f = comb
1030 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
1031 .into();
1032
1033 let out_ir = HydroNode::ReduceKeyed {
1034 f,
1035 input: Box::new(self.underlying.ir_node.into_inner()),
1036 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
1037 };
1038
1039 KeyedSingleton {
1040 underlying: Stream::new(self.underlying.location, out_ir),
1041 }
1042 }
1043
1044 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1045 ///
1046 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1047 /// to depend on the order of elements in the stream.
1048 ///
1049 /// # Example
1050 /// ```rust
1051 /// # use hydro_lang::prelude::*;
1052 /// # use futures::StreamExt;
1053 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1054 /// let tick = process.tick();
1055 /// let watermark = tick.singleton(q!(1));
1056 /// let numbers = process
1057 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1058 /// .into_keyed();
1059 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1060 /// batch
1061 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1062 /// .entries()
1063 /// .all_ticks()
1064 /// # }, |mut stream| async move {
1065 /// // (2, 204)
1066 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1067 /// # }));
1068 /// ```
1069 pub fn reduce_watermark<O, F>(
1070 self,
1071 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1072 comb: impl IntoQuotedMut<'a, F, L>,
1073 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1074 where
1075 O: Clone,
1076 F: Fn(&mut V, V) + 'a,
1077 {
1078 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1079 check_matching_location(&self.underlying.location.root(), other.location.outer());
1080 let f = comb
1081 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
1082 .into();
1083
1084 let out_ir = Stream::new(
1085 self.underlying.location.clone(),
1086 HydroNode::ReduceKeyedWatermark {
1087 f,
1088 input: Box::new(self.underlying.ir_node.into_inner()),
1089 watermark: Box::new(other.ir_node.into_inner()),
1090 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
1091 },
1092 );
1093
1094 KeyedSingleton { underlying: out_ir }
1095 }
1096}
1097
1098impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1099where
1100 K: Eq + Hash,
1101 L: Location<'a>,
1102{
1103 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1104 ///
1105 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1106 ///
1107 /// If the input and output value types are the same and do not require initialization then use
1108 /// [`KeyedStream::reduce_commutative`].
1109 ///
1110 /// # Example
1111 /// ```rust
1112 /// # use hydro_lang::prelude::*;
1113 /// # use futures::StreamExt;
1114 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1115 /// let tick = process.tick();
1116 /// let numbers = process
1117 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1118 /// .into_keyed();
1119 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1120 /// batch
1121 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1122 /// .entries()
1123 /// .all_ticks()
1124 /// # }, |mut stream| async move {
1125 /// // (1, 5), (2, 7)
1126 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1127 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1128 /// # }));
1129 /// ```
1130 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1131 self,
1132 init: impl IntoQuotedMut<'a, I, L>,
1133 comb: impl IntoQuotedMut<'a, F, L>,
1134 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1135 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1136 .fold(init, comb)
1137 }
1138
1139 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1140 ///
1141 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1142 ///
1143 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1144 ///
1145 /// # Example
1146 /// ```rust
1147 /// # use hydro_lang::prelude::*;
1148 /// # use futures::StreamExt;
1149 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1150 /// let tick = process.tick();
1151 /// let numbers = process
1152 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1153 /// .into_keyed();
1154 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1155 /// batch
1156 /// .reduce_commutative(q!(|acc, x| *acc += x))
1157 /// .entries()
1158 /// .all_ticks()
1159 /// # }, |mut stream| async move {
1160 /// // (1, 5), (2, 7)
1161 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1162 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1163 /// # }));
1164 /// ```
1165 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1166 self,
1167 comb: impl IntoQuotedMut<'a, F, L>,
1168 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1169 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1170 .reduce(comb)
1171 }
1172
1173 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1174 ///
1175 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1176 ///
1177 /// # Example
1178 /// ```rust
1179 /// # use hydro_lang::prelude::*;
1180 /// # use futures::StreamExt;
1181 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1182 /// let tick = process.tick();
1183 /// let watermark = tick.singleton(q!(1));
1184 /// let numbers = process
1185 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1186 /// .into_keyed();
1187 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1188 /// batch
1189 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1190 /// .entries()
1191 /// .all_ticks()
1192 /// # }, |mut stream| async move {
1193 /// // (2, 204)
1194 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1195 /// # }));
1196 /// ```
1197 pub fn reduce_watermark_commutative<O2, F>(
1198 self,
1199 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1200 comb: impl IntoQuotedMut<'a, F, L>,
1201 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1202 where
1203 O2: Clone,
1204 F: Fn(&mut V, V) + 'a,
1205 {
1206 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1207 .reduce_watermark(other, comb)
1208 }
1209}
1210
1211impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1212where
1213 K: Eq + Hash,
1214 L: Location<'a>,
1215{
1216 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1217 ///
1218 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1219 ///
1220 /// If the input and output value types are the same and do not require initialization then use
1221 /// [`KeyedStream::reduce_idempotent`].
1222 ///
1223 /// # Example
1224 /// ```rust
1225 /// # use hydro_lang::prelude::*;
1226 /// # use futures::StreamExt;
1227 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1228 /// let tick = process.tick();
1229 /// let numbers = process
1230 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1231 /// .into_keyed();
1232 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1233 /// batch
1234 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1235 /// .entries()
1236 /// .all_ticks()
1237 /// # }, |mut stream| async move {
1238 /// // (1, false), (2, true)
1239 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1240 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1241 /// # }));
1242 /// ```
1243 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1244 self,
1245 init: impl IntoQuotedMut<'a, I, L>,
1246 comb: impl IntoQuotedMut<'a, F, L>,
1247 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1248 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1249 .fold(init, comb)
1250 }
1251
1252 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1253 ///
1254 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1255 ///
1256 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1257 ///
1258 /// # Example
1259 /// ```rust
1260 /// # use hydro_lang::prelude::*;
1261 /// # use futures::StreamExt;
1262 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1263 /// let tick = process.tick();
1264 /// let numbers = process
1265 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1266 /// .into_keyed();
1267 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1268 /// batch
1269 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
1270 /// .entries()
1271 /// .all_ticks()
1272 /// # }, |mut stream| async move {
1273 /// // (1, false), (2, true)
1274 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1275 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1276 /// # }));
1277 /// ```
1278 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1279 self,
1280 comb: impl IntoQuotedMut<'a, F, L>,
1281 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1282 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1283 .reduce(comb)
1284 }
1285
1286 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1287 ///
1288 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1289 ///
1290 /// # Example
1291 /// ```rust
1292 /// # use hydro_lang::prelude::*;
1293 /// # use futures::StreamExt;
1294 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1295 /// let tick = process.tick();
1296 /// let watermark = tick.singleton(q!(1));
1297 /// let numbers = process
1298 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1299 /// .into_keyed();
1300 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1301 /// batch
1302 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1303 /// .entries()
1304 /// .all_ticks()
1305 /// # }, |mut stream| async move {
1306 /// // (2, true)
1307 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1308 /// # }));
1309 /// ```
1310 pub fn reduce_watermark_idempotent<O2, F>(
1311 self,
1312 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1313 comb: impl IntoQuotedMut<'a, F, L>,
1314 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1315 where
1316 O2: Clone,
1317 F: Fn(&mut V, V) + 'a,
1318 {
1319 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1320 .reduce_watermark(other, comb)
1321 }
1322}
1323
1324impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1325where
1326 K: Eq + Hash,
1327 L: Location<'a>,
1328{
1329 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1330 ///
1331 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1332 /// as there may be non-deterministic duplicates.
1333 ///
1334 /// If the input and output value types are the same and do not require initialization then use
1335 /// [`KeyedStream::reduce_commutative_idempotent`].
1336 ///
1337 /// # Example
1338 /// ```rust
1339 /// # use hydro_lang::prelude::*;
1340 /// # use futures::StreamExt;
1341 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1342 /// let tick = process.tick();
1343 /// let numbers = process
1344 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1345 /// .into_keyed();
1346 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1347 /// batch
1348 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1349 /// .entries()
1350 /// .all_ticks()
1351 /// # }, |mut stream| async move {
1352 /// // (1, false), (2, true)
1353 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1354 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1355 /// # }));
1356 /// ```
1357 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1358 self,
1359 init: impl IntoQuotedMut<'a, I, L>,
1360 comb: impl IntoQuotedMut<'a, F, L>,
1361 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1362 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1363 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1364 .fold(init, comb)
1365 }
1366
1367 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1368 ///
1369 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1370 /// as there may be non-deterministic duplicates.
1371 ///
1372 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1373 ///
1374 /// # Example
1375 /// ```rust
1376 /// # use hydro_lang::prelude::*;
1377 /// # use futures::StreamExt;
1378 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1379 /// let tick = process.tick();
1380 /// let numbers = process
1381 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1382 /// .into_keyed();
1383 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1384 /// batch
1385 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1386 /// .entries()
1387 /// .all_ticks()
1388 /// # }, |mut stream| async move {
1389 /// // (1, false), (2, true)
1390 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1391 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1392 /// # }));
1393 /// ```
1394 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1395 self,
1396 comb: impl IntoQuotedMut<'a, F, L>,
1397 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1398 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1399 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1400 .reduce(comb)
1401 }
1402
1403 /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1404 ///
1405 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1406 /// as there may be non-deterministic duplicates.
1407 ///
1408 /// # Example
1409 /// ```rust
1410 /// # use hydro_lang::prelude::*;
1411 /// # use futures::StreamExt;
1412 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1413 /// let tick = process.tick();
1414 /// let watermark = tick.singleton(q!(1));
1415 /// let numbers = process
1416 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1417 /// .into_keyed();
1418 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1419 /// batch
1420 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1421 /// .entries()
1422 /// .all_ticks()
1423 /// # }, |mut stream| async move {
1424 /// // (2, true)
1425 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1426 /// # }));
1427 /// ```
1428 pub fn reduce_watermark_commutative_idempotent<O2, F>(
1429 self,
1430 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1431 comb: impl IntoQuotedMut<'a, F, L>,
1432 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1433 where
1434 O2: Clone,
1435 F: Fn(&mut V, V) + 'a,
1436 {
1437 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1438 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1439 .reduce_watermark(other, comb)
1440 }
1441
1442 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1443 /// whose keys are not in the bounded stream.
1444 ///
1445 /// # Example
1446 /// ```rust
1447 /// # use hydro_lang::prelude::*;
1448 /// # use futures::StreamExt;
1449 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1450 /// let tick = process.tick();
1451 /// let keyed_stream = process
1452 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1453 /// .batch(&tick, nondet!(/** test */))
1454 /// .into_keyed();
1455 /// let keys_to_remove = process
1456 /// .source_iter(q!(vec![1, 2]))
1457 /// .batch(&tick, nondet!(/** test */));
1458 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1459 /// # .entries()
1460 /// # }, |mut stream| async move {
1461 /// // { 3: ['c'], 4: ['d'] }
1462 /// # for w in vec![(3, 'c'), (4, 'd')] {
1463 /// # assert_eq!(stream.next().await.unwrap(), w);
1464 /// # }
1465 /// # }));
1466 /// ```
1467 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1468 self,
1469 other: Stream<K, L, Bounded, O2, R2>,
1470 ) -> Self {
1471 KeyedStream {
1472 underlying: self.entries().anti_join(other),
1473 _phantom_order: Default::default(),
1474 }
1475 }
1476}
1477
1478impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1479where
1480 L: Location<'a> + NoTick,
1481{
1482 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1483 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1484 ///
1485 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1486 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1487 /// argument that declares where the stream will be atomically processed. Batching a stream into
1488 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1489 /// [`Tick`] will introduce asynchrony.
1490 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1491 KeyedStream {
1492 underlying: self.underlying.atomic(tick),
1493 _phantom_order: Default::default(),
1494 }
1495 }
1496
1497 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1498 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1499 /// the order of the input.
1500 ///
1501 /// # Non-Determinism
1502 /// The batch boundaries are non-deterministic and may change across executions.
1503 pub fn batch(
1504 self,
1505 tick: &Tick<L>,
1506 nondet: NonDet,
1507 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1508 KeyedStream {
1509 underlying: self.underlying.batch(tick, nondet),
1510 _phantom_order: Default::default(),
1511 }
1512 }
1513}
1514
1515impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1516where
1517 L: Location<'a> + NoTick,
1518{
1519 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1520 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1521 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1522 /// used to create the atomic section.
1523 ///
1524 /// # Non-Determinism
1525 /// The batch boundaries are non-deterministic and may change across executions.
1526 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1527 KeyedStream {
1528 underlying: self.underlying.batch_atomic(nondet),
1529 _phantom_order: Default::default(),
1530 }
1531 }
1532
1533 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1534 /// See [`KeyedStream::atomic`] for more details.
1535 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1536 KeyedStream {
1537 underlying: self.underlying.end_atomic(),
1538 _phantom_order: Default::default(),
1539 }
1540 }
1541}
1542
1543impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1544where
1545 L: Location<'a>,
1546{
1547 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1548 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1549 /// is only present in one of the inputs, its values are passed through as-is). The output has
1550 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1551 ///
1552 /// Currently, both input streams must be [`Bounded`]. This operator will block
1553 /// on the first stream until all its elements are available. In a future version,
1554 /// we will relax the requirement on the `other` stream.
1555 ///
1556 /// # Example
1557 /// ```rust
1558 /// # use hydro_lang::prelude::*;
1559 /// # use futures::StreamExt;
1560 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1561 /// let tick = process.tick();
1562 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1563 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1564 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1565 /// # .entries()
1566 /// # }, |mut stream| async move {
1567 /// // { 0: [2, 1], 1: [4, 3] }
1568 /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1569 /// # assert_eq!(stream.next().await.unwrap(), w);
1570 /// # }
1571 /// # }));
1572 /// ```
1573 pub fn chain<O2: Ordering>(
1574 self,
1575 other: KeyedStream<K, V, L, Bounded, O2, R>,
1576 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, R>
1577 where
1578 O: MinOrder<O2>,
1579 {
1580 KeyedStream {
1581 underlying: self.underlying.chain(other.underlying),
1582 _phantom_order: Default::default(),
1583 }
1584 }
1585}
1586
1587impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1588where
1589 L: Location<'a>,
1590{
1591 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1592 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1593 /// each key.
1594 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1595 KeyedStream {
1596 underlying: self.underlying.all_ticks(),
1597 _phantom_order: Default::default(),
1598 }
1599 }
1600
1601 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1602 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1603 /// each key.
1604 ///
1605 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1606 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1607 /// stream's [`Tick`] context.
1608 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1609 KeyedStream {
1610 underlying: self.underlying.all_ticks(),
1611 _phantom_order: Default::default(),
1612 }
1613 }
1614
1615 #[expect(missing_docs, reason = "TODO")]
1616 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1617 KeyedStream {
1618 underlying: self.underlying.defer_tick(),
1619 _phantom_order: Default::default(),
1620 }
1621 }
1622}
1623
1624#[cfg(test)]
1625mod tests {
1626 use futures::{SinkExt, StreamExt};
1627 use hydro_deploy::Deployment;
1628 use stageleft::q;
1629
1630 use crate::compile::builder::FlowBuilder;
1631 use crate::location::Location;
1632 use crate::nondet::nondet;
1633
1634 #[tokio::test]
1635 async fn reduce_watermark_filter() {
1636 let mut deployment = Deployment::new();
1637
1638 let flow = FlowBuilder::new();
1639 let node = flow.process::<()>();
1640 let external = flow.external::<()>();
1641
1642 let node_tick = node.tick();
1643 let watermark = node_tick.singleton(q!(1));
1644
1645 let sum = node
1646 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1647 .into_keyed()
1648 .reduce_watermark(
1649 watermark,
1650 q!(|acc, v| {
1651 *acc += v;
1652 }),
1653 )
1654 .snapshot(&node_tick, nondet!(/** test */))
1655 .entries()
1656 .all_ticks()
1657 .send_bincode_external(&external);
1658
1659 let nodes = flow
1660 .with_process(&node, deployment.Localhost())
1661 .with_external(&external, deployment.Localhost())
1662 .deploy(&mut deployment);
1663
1664 deployment.deploy().await.unwrap();
1665
1666 let mut out = nodes.connect_source_bincode(sum).await;
1667
1668 deployment.start().await.unwrap();
1669
1670 assert_eq!(out.next().await.unwrap(), (2, 204));
1671 }
1672
1673 #[tokio::test]
1674 async fn reduce_watermark_garbage_collect() {
1675 let mut deployment = Deployment::new();
1676
1677 let flow = FlowBuilder::new();
1678 let node = flow.process::<()>();
1679 let external = flow.external::<()>();
1680 let (tick_send, tick_trigger) = node.source_external_bincode(&external);
1681
1682 let node_tick = node.tick();
1683 let (watermark_complete_cycle, watermark) =
1684 node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
1685 let next_watermark = watermark.clone().map(q!(|v| v + 1));
1686 watermark_complete_cycle.complete_next_tick(next_watermark);
1687
1688 let tick_triggered_input = node
1689 .source_iter(q!([(3, 103)]))
1690 .batch(&node_tick, nondet!(/** test */))
1691 .filter_if_some(
1692 tick_trigger
1693 .clone()
1694 .batch(&node_tick, nondet!(/** test */))
1695 .first(),
1696 )
1697 .all_ticks();
1698
1699 let sum = node
1700 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1701 .interleave(tick_triggered_input)
1702 .into_keyed()
1703 .reduce_watermark_commutative(
1704 watermark,
1705 q!(|acc, v| {
1706 *acc += v;
1707 }),
1708 )
1709 .snapshot(&node_tick, nondet!(/** test */))
1710 .entries()
1711 .all_ticks()
1712 .send_bincode_external(&external);
1713
1714 let nodes = flow
1715 .with_default_optimize()
1716 .with_process(&node, deployment.Localhost())
1717 .with_external(&external, deployment.Localhost())
1718 .deploy(&mut deployment);
1719
1720 deployment.deploy().await.unwrap();
1721
1722 let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
1723 let mut out_recv = nodes.connect_source_bincode(sum).await;
1724
1725 deployment.start().await.unwrap();
1726
1727 assert_eq!(out_recv.next().await.unwrap(), (2, 204));
1728
1729 tick_send.send(()).await.unwrap();
1730
1731 assert_eq!(out_recv.next().await.unwrap(), (3, 103));
1732 }
1733}