hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21use crate::staging_util::get_this_crate;
22
23// same as the one in `hydro_std`, but internal use only
24fn track_membership<'a, C, L: Location<'a> + NoTick>(
25    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
26) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
27    membership
28        .fold(
29            q!(|| false),
30            q!(|present, event| {
31                match event {
32                    MembershipEvent::Joined => *present = true,
33                    MembershipEvent::Left => *present = false,
34                }
35            }),
36        )
37        .filter_map(q!(|v| if v { Some(()) } else { None }))
38}
39
40fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
41    let root = get_this_crate();
42
43    if is_demux {
44        parse_quote! {
45            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::location::MemberId<_>, #t_type), _>(
46                |(id, data)| {
47                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
48                }
49            )
50        }
51    } else {
52        parse_quote! {
53            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
54                |data| {
55                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
56                }
57            )
58        }
59    }
60}
61
62pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
63    serialize_bincode_with_type(is_demux, &quote_type::<T>())
64}
65
66fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
67    let root = get_this_crate();
68
69    if let Some(c_type) = tagged {
70        parse_quote! {
71            |res| {
72                let (id, b) = res.unwrap();
73                (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
74            }
75        }
76    } else {
77        parse_quote! {
78            |res| {
79                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
80            }
81        }
82    }
83}
84
85pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
86    deserialize_bincode_with_type(tagged, &quote_type::<T>())
87}
88
89impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
90    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
91    /// using [`bincode`] to serialize/deserialize messages.
92    ///
93    /// The returned stream captures the elements received at the destination, where values will
94    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
95    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
96    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
97    /// dropped no further messages will be sent.
98    ///
99    /// # Example
100    /// ```rust
101    /// # use hydro_lang::prelude::*;
102    /// # use futures::StreamExt;
103    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
104    /// let p1 = flow.process::<()>();
105    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
106    /// let p2 = flow.process::<()>();
107    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
108    /// // 1, 2, 3
109    /// # on_p2.send_bincode(&p_out)
110    /// # }, |mut stream| async move {
111    /// # for w in 1..=3 {
112    /// #     assert_eq!(stream.next().await, Some(w));
113    /// # }
114    /// # }));
115    /// ```
116    pub fn send_bincode<L2>(
117        self,
118        other: &Process<'a, L2>,
119    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
120    where
121        T: Serialize + DeserializeOwned,
122    {
123        let serialize_pipeline = Some(serialize_bincode::<T>(false));
124
125        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
126
127        Stream::new(
128            other.clone(),
129            HydroNode::Network {
130                serialize_fn: serialize_pipeline.map(|e| e.into()),
131                instantiate_fn: DebugInstantiate::Building,
132                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
133                input: Box::new(self.ir_node.into_inner()),
134                metadata: other.new_node_metadata::<T>(),
135            },
136        )
137    }
138
139    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
140    /// using [`bincode`] to serialize/deserialize messages.
141    ///
142    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
143    /// membership information. This is a common pattern in distributed systems for broadcasting data to
144    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
145    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
146    /// each element to all cluster members.
147    ///
148    /// # Non-Determinism
149    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
150    /// to the current cluster members _at that point in time_. Depending on when we are notified of
151    /// membership changes, we will broadcast each element to different members.
152    ///
153    /// # Example
154    /// ```rust
155    /// # use hydro_lang::prelude::*;
156    /// # use futures::StreamExt;
157    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
158    /// let p1 = flow.process::<()>();
159    /// let workers: Cluster<()> = flow.cluster::<()>();
160    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
161    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
162    /// # on_worker.send_bincode(&p2).entries()
163    /// // if there are 4 members in the cluster, each receives one element
164    /// // - MemberId::<()>(0): [123]
165    /// // - MemberId::<()>(1): [123]
166    /// // - MemberId::<()>(2): [123]
167    /// // - MemberId::<()>(3): [123]
168    /// # }, |mut stream| async move {
169    /// # let mut results = Vec::new();
170    /// # for w in 0..4 {
171    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
172    /// # }
173    /// # results.sort();
174    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
175    /// # }));
176    /// ```
177    pub fn broadcast_bincode<L2: 'a>(
178        self,
179        other: &Cluster<'a, L2>,
180        nondet_membership: NonDet,
181    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
182    where
183        T: Clone + Serialize + DeserializeOwned,
184    {
185        let ids = track_membership(self.location.source_cluster_members(other));
186        let join_tick = self.location.tick();
187        let current_members = ids.snapshot(&join_tick, nondet_membership);
188
189        self.batch(&join_tick, nondet_membership)
190            .repeat_with_keys(current_members)
191            .all_ticks()
192            .demux_bincode(other)
193    }
194
195    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
196    /// serialization. The external process can receive these elements by establishing a TCP
197    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
198    ///
199    /// # Example
200    /// ```rust
201    /// # use hydro_lang::prelude::*;
202    /// # use futures::StreamExt;
203    /// # tokio_test::block_on(async move {
204    /// let flow = FlowBuilder::new();
205    /// let process = flow.process::<()>();
206    /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
207    /// let external = flow.external::<()>();
208    /// let external_handle = numbers.send_bincode_external(&external);
209    ///
210    /// let mut deployment = hydro_deploy::Deployment::new();
211    /// let nodes = flow
212    ///     .with_process(&process, deployment.Localhost())
213    ///     .with_external(&external, deployment.Localhost())
214    ///     .deploy(&mut deployment);
215    ///
216    /// deployment.deploy().await.unwrap();
217    /// // establish the TCP connection
218    /// let mut external_recv_stream = nodes.connect_source_bincode(external_handle).await;
219    /// deployment.start().await.unwrap();
220    ///
221    /// for w in 1..=3 {
222    ///     assert_eq!(external_recv_stream.next().await, Some(w));
223    /// }
224    /// # });
225    /// ```
226    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
227    where
228        T: Serialize + DeserializeOwned,
229    {
230        let serialize_pipeline = Some(serialize_bincode::<T>(false));
231
232        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
233
234        let external_key = flow_state_borrow.next_external_out;
235        flow_state_borrow.next_external_out += 1;
236
237        flow_state_borrow.push_root(HydroRoot::SendExternal {
238            to_external_id: other.id,
239            to_key: external_key,
240            to_many: false,
241            serialize_fn: serialize_pipeline.map(|e| e.into()),
242            instantiate_fn: DebugInstantiate::Building,
243            input: Box::new(self.ir_node.into_inner()),
244            op_metadata: HydroIrOpMetadata::new(),
245        });
246
247        ExternalBincodeStream {
248            process_id: other.id,
249            port_id: external_key,
250            _phantom: PhantomData,
251        }
252    }
253}
254
255impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
256    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
257{
258    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
259    /// using [`bincode`] to serialize/deserialize messages.
260    ///
261    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
262    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
263    /// this API allows precise targeting of specific cluster members rather than broadcasting to
264    /// all members.
265    ///
266    /// # Example
267    /// ```rust
268    /// # use hydro_lang::prelude::*;
269    /// # use futures::StreamExt;
270    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
271    /// let p1 = flow.process::<()>();
272    /// let workers: Cluster<()> = flow.cluster::<()>();
273    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
274    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
275    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
276    ///     .demux_bincode(&workers);
277    /// # on_worker.send_bincode(&p2).entries()
278    /// // if there are 4 members in the cluster, each receives one element
279    /// // - MemberId::<()>(0): [0]
280    /// // - MemberId::<()>(1): [1]
281    /// // - MemberId::<()>(2): [2]
282    /// // - MemberId::<()>(3): [3]
283    /// # }, |mut stream| async move {
284    /// # let mut results = Vec::new();
285    /// # for w in 0..4 {
286    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
287    /// # }
288    /// # results.sort();
289    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
290    /// # }));
291    /// ```
292    pub fn demux_bincode(
293        self,
294        other: &Cluster<'a, L2>,
295    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
296    where
297        T: Serialize + DeserializeOwned,
298    {
299        self.into_keyed().demux_bincode(other)
300    }
301}
302
303impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
304    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
305    /// [`bincode`] to serialize/deserialize messages.
306    ///
307    /// This provides load balancing by evenly distributing work across cluster members. The
308    /// distribution is deterministic based on element order - the first element goes to member 0,
309    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
310    ///
311    /// # Non-Determinism
312    /// The set of cluster members may asynchronously change over time. Each element is distributed
313    /// based on the current cluster membership _at that point in time_. Depending on when cluster
314    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
315    /// membership is stable, the order of members in the round-robin pattern may change across runs.
316    ///
317    /// # Ordering Requirements
318    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
319    /// order of messages and retries affects the round-robin pattern.
320    ///
321    /// # Example
322    /// ```rust
323    /// # use hydro_lang::prelude::*;
324    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
325    /// # use futures::StreamExt;
326    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
327    /// let p1 = flow.process::<()>();
328    /// let workers: Cluster<()> = flow.cluster::<()>();
329    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
330    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
331    /// on_worker.send_bincode(&p2)
332    /// # .first().values() // we use first to assert that each member gets one element
333    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
334    /// // - MemberId::<()>(?): [1]
335    /// // - MemberId::<()>(?): [2]
336    /// // - MemberId::<()>(?): [3]
337    /// // - MemberId::<()>(?): [4]
338    /// # }, |mut stream| async move {
339    /// # let mut results = Vec::new();
340    /// # for w in 0..4 {
341    /// #     results.push(stream.next().await.unwrap());
342    /// # }
343    /// # results.sort();
344    /// # assert_eq!(results, vec![1, 2, 3, 4]);
345    /// # }));
346    /// ```
347    pub fn round_robin_bincode<L2: 'a>(
348        self,
349        other: &Cluster<'a, L2>,
350        nondet_membership: NonDet,
351    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
352    where
353        T: Serialize + DeserializeOwned,
354    {
355        let ids = track_membership(self.location.source_cluster_members(other));
356        let join_tick = self.location.tick();
357        let current_members = ids
358            .snapshot(&join_tick, nondet_membership)
359            .keys()
360            .assume_ordering(nondet_membership)
361            .collect_vec();
362
363        self.enumerate()
364            .batch(&join_tick, nondet_membership)
365            .cross_singleton(current_members)
366            .map(q!(|(data, members)| (
367                members[data.0 % members.len()],
368                data.1
369            )))
370            .all_ticks()
371            .demux_bincode(other)
372    }
373}
374
375impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
376    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
377    /// using [`bincode`] to serialize/deserialize messages.
378    ///
379    /// Each cluster member sends its local stream elements, and they are collected at the destination
380    /// as a [`KeyedStream`] where keys identify the source cluster member.
381    ///
382    /// # Example
383    /// ```rust
384    /// # use hydro_lang::prelude::*;
385    /// # use futures::StreamExt;
386    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
387    /// let workers: Cluster<()> = flow.cluster::<()>();
388    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
389    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
390    /// # all_received.entries()
391    /// # }, |mut stream| async move {
392    /// // if there are 4 members in the cluster, we should receive 4 elements
393    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
394    /// # let mut results = Vec::new();
395    /// # for w in 0..4 {
396    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
397    /// # }
398    /// # results.sort();
399    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
400    /// # }));
401    /// ```
402    ///
403    /// If you don't need to know the source for each element, you can use `.values()`
404    /// to get just the data:
405    /// ```rust
406    /// # use hydro_lang::prelude::*;
407    /// # use hydro_lang::live_collections::stream::NoOrder;
408    /// # use futures::StreamExt;
409    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
410    /// # let workers: Cluster<()> = flow.cluster::<()>();
411    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
412    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
413    /// # values
414    /// # }, |mut stream| async move {
415    /// # let mut results = Vec::new();
416    /// # for w in 0..4 {
417    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
418    /// # }
419    /// # results.sort();
420    /// // if there are 4 members in the cluster, we should receive 4 elements
421    /// // 1, 1, 1, 1
422    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
423    /// # }));
424    /// ```
425    pub fn send_bincode<L2>(
426        self,
427        other: &Process<'a, L2>,
428    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
429    where
430        T: Serialize + DeserializeOwned,
431    {
432        let serialize_pipeline = Some(serialize_bincode::<T>(false));
433
434        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
435
436        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
437            other.clone(),
438            HydroNode::Network {
439                serialize_fn: serialize_pipeline.map(|e| e.into()),
440                instantiate_fn: DebugInstantiate::Building,
441                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
442                input: Box::new(self.ir_node.into_inner()),
443                metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
444            },
445        );
446
447        raw_stream.into_keyed()
448    }
449
450    /// Broadcasts elements of this stream at each source member to all members of a destination
451    /// cluster, using [`bincode`] to serialize/deserialize messages.
452    ///
453    /// Each source member sends each of its stream elements to **every** member of the cluster
454    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
455    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
456    /// **only data elements** and sends each element to all cluster members.
457    ///
458    /// # Non-Determinism
459    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
460    /// to the current cluster members known _at that point in time_ at the source member. Depending
461    /// on when each source member is notified of membership changes, it will broadcast each element
462    /// to different members.
463    ///
464    /// # Example
465    /// ```rust
466    /// # use hydro_lang::prelude::*;
467    /// # use hydro_lang::location::MemberId;
468    /// # use futures::StreamExt;
469    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
470    /// # type Source = ();
471    /// # type Destination = ();
472    /// let source: Cluster<Source> = flow.cluster::<Source>();
473    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
474    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
475    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
476    /// # on_destination.entries().send_bincode(&p2).entries()
477    /// // if there are 4 members in the desination, each receives one element from each source member
478    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
479    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
480    /// // - ...
481    /// # }, |mut stream| async move {
482    /// # let mut results = Vec::new();
483    /// # for w in 0..16 {
484    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
485    /// # }
486    /// # results.sort();
487    /// # assert_eq!(results, vec![
488    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
489    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
490    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
491    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
492    /// # ]);
493    /// # }));
494    /// ```
495    pub fn broadcast_bincode<L2: 'a>(
496        self,
497        other: &Cluster<'a, L2>,
498        nondet_membership: NonDet,
499    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
500    where
501        T: Clone + Serialize + DeserializeOwned,
502    {
503        let ids = track_membership(self.location.source_cluster_members(other));
504        let join_tick = self.location.tick();
505        let current_members = ids.snapshot(&join_tick, nondet_membership);
506
507        self.batch(&join_tick, nondet_membership)
508            .repeat_with_keys(current_members)
509            .all_ticks()
510            .demux_bincode(other)
511    }
512}
513
514impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
515    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
516{
517    /// Sends elements of this stream at each source member to specific members of a destination
518    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
519    ///
520    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
521    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
522    /// this API allows precise targeting of specific cluster members rather than broadcasting to
523    /// all members.
524    ///
525    /// Each cluster member sends its local stream elements, and they are collected at each
526    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
527    ///
528    /// # Example
529    /// ```rust
530    /// # use hydro_lang::prelude::*;
531    /// # use futures::StreamExt;
532    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
533    /// # type Source = ();
534    /// # type Destination = ();
535    /// let source: Cluster<Source> = flow.cluster::<Source>();
536    /// let to_send: Stream<_, Cluster<_>, _> = source
537    ///     .source_iter(q!(vec![0, 1, 2, 3]))
538    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
539    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
540    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
541    /// # all_received.entries().send_bincode(&p2).entries()
542    /// # }, |mut stream| async move {
543    /// // if there are 4 members in the destination cluster, each receives one message from each source member
544    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
545    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
546    /// // - ...
547    /// # let mut results = Vec::new();
548    /// # for w in 0..16 {
549    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
550    /// # }
551    /// # results.sort();
552    /// # assert_eq!(results, vec![
553    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
554    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
555    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
556    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
557    /// # ]);
558    /// # }));
559    /// ```
560    pub fn demux_bincode(
561        self,
562        other: &Cluster<'a, L2>,
563    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
564    where
565        T: Serialize + DeserializeOwned,
566    {
567        self.into_keyed().demux_bincode(other)
568    }
569}