hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
32/// Wrapper that displays only the tokens of a parsed expr.
33///
34/// Boxes `syn::Type` which is ~240 bytes.
35#[derive(Clone, Hash)]
36pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39    fn from(expr: syn::Expr) -> Self {
40        Self(Box::new(expr))
41    }
42}
43
44impl Deref for DebugExpr {
45    type Target = syn::Expr;
46
47    fn deref(&self) -> &Self::Target {
48        &self.0
49    }
50}
51
52impl ToTokens for DebugExpr {
53    fn to_tokens(&self, tokens: &mut TokenStream) {
54        self.0.to_tokens(tokens);
55    }
56}
57
58impl Debug for DebugExpr {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.0.to_token_stream())
61    }
62}
63
64impl Display for DebugExpr {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        let original = self.0.as_ref().clone();
67        let simplified = simplify_q_macro(original);
68
69        // For now, just use quote formatting without trying to parse as a statement
70        // This avoids the syn::parse_quote! issues entirely
71        write!(f, "q!({})", quote::quote!(#simplified))
72    }
73}
74
75/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77    // Try to parse the token string as a syn::Expr
78    // Use a visitor to simplify q! macro expansions
79    let mut simplifier = QMacroSimplifier::new();
80    simplifier.visit_expr_mut(&mut expr);
81
82    // If we found and simplified a q! macro, return the simplified version
83    if let Some(simplified) = simplifier.simplified_result {
84        simplified
85    } else {
86        expr
87    }
88}
89
90/// AST visitor that simplifies q! macro expansions
91#[derive(Default)]
92pub struct QMacroSimplifier {
93    pub simplified_result: Option<syn::Expr>,
94}
95
96impl QMacroSimplifier {
97    pub fn new() -> Self {
98        Self::default()
99    }
100}
101
102impl VisitMut for QMacroSimplifier {
103    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104        // Check if we already found a result to avoid further processing
105        if self.simplified_result.is_some() {
106            return;
107        }
108
109        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110            // Look for calls to stageleft::runtime_support::fn*
111            && self.is_stageleft_runtime_support_call(&path_expr.path)
112            // Try to extract the closure from the arguments
113            && let Some(closure) = self.extract_closure_from_args(&call.args)
114        {
115            self.simplified_result = Some(closure);
116            return;
117        }
118
119        // Continue visiting child expressions using the default implementation
120        // Use the default visitor to avoid infinite recursion
121        syn::visit_mut::visit_expr_mut(self, expr);
122    }
123}
124
125impl QMacroSimplifier {
126    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127        // Check if this is a call to stageleft::runtime_support::fn*
128        if let Some(last_segment) = path.segments.last() {
129            let fn_name = last_segment.ident.to_string();
130            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
131            fn_name.contains("_type_hint")
132                && path.segments.len() > 2
133                && path.segments[0].ident == "stageleft"
134                && path.segments[1].ident == "runtime_support"
135        } else {
136            false
137        }
138    }
139
140    fn extract_closure_from_args(
141        &self,
142        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143    ) -> Option<syn::Expr> {
144        // Look through the arguments for a closure expression
145        for arg in args {
146            if let syn::Expr::Closure(_) = arg {
147                return Some(arg.clone());
148            }
149            // Also check for closures nested in other expressions (like blocks)
150            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151                return Some(closure_expr);
152            }
153        }
154        None
155    }
156
157    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158        let mut visitor = ClosureFinder {
159            found_closure: None,
160            prefer_inner_blocks: true,
161        };
162        visitor.visit_expr(expr);
163        visitor.found_closure
164    }
165}
166
167/// Visitor that finds closures in expressions with special block handling
168struct ClosureFinder {
169    found_closure: Option<syn::Expr>,
170    prefer_inner_blocks: bool,
171}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175        // If we already found a closure, don't continue searching
176        if self.found_closure.is_some() {
177            return;
178        }
179
180        match expr {
181            syn::Expr::Closure(_) => {
182                self.found_closure = Some(expr.clone());
183            }
184            syn::Expr::Block(block) if self.prefer_inner_blocks => {
185                // Special handling for blocks - look for inner blocks that contain closures
186                for stmt in &block.block.stmts {
187                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
188                        && let syn::Expr::Block(_) = stmt_expr
189                    {
190                        // Check if this nested block contains a closure
191                        let mut inner_visitor = ClosureFinder {
192                            found_closure: None,
193                            prefer_inner_blocks: false, // Avoid infinite recursion
194                        };
195                        inner_visitor.visit_expr(stmt_expr);
196                        if inner_visitor.found_closure.is_some() {
197                            // Found a closure in an inner block, return that block
198                            self.found_closure = Some(stmt_expr.clone());
199                            return;
200                        }
201                    }
202                }
203
204                // If no inner block with closure found, continue with normal visitation
205                visit::visit_expr(self, expr);
206
207                // If we found a closure, just return the closure itself, not the whole block
208                // unless we're in the special case where we want the containing block
209                if self.found_closure.is_some() {
210                    // The closure was found during visitation, no need to wrap in block
211                }
212            }
213            _ => {
214                // Use default visitor behavior for all other expressions
215                visit::visit_expr(self, expr);
216            }
217        }
218    }
219}
220
221/// Debug displays the type's tokens.
222///
223/// Boxes `syn::Type` which is ~320 bytes.
224#[derive(Clone, Hash)]
225pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228    fn from(t: syn::Type) -> Self {
229        Self(Box::new(t))
230    }
231}
232
233impl Deref for DebugType {
234    type Target = syn::Type;
235
236    fn deref(&self) -> &Self::Target {
237        &self.0
238    }
239}
240
241impl ToTokens for DebugType {
242    fn to_tokens(&self, tokens: &mut TokenStream) {
243        self.0.to_tokens(tokens);
244    }
245}
246
247impl Debug for DebugType {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        write!(f, "{}", self.0.to_token_stream())
250    }
251}
252
253pub enum DebugInstantiate {
254    Building,
255    Finalized(Box<DebugInstantiateFinalized>),
256}
257
258#[cfg_attr(
259    not(feature = "build"),
260    expect(
261        dead_code,
262        reason = "sink, source unused without `feature = \"build\"`."
263    )
264)]
265pub struct DebugInstantiateFinalized {
266    sink: syn::Expr,
267    source: syn::Expr,
268    connect_fn: Option<Box<dyn FnOnce()>>,
269}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272    fn from(f: DebugInstantiateFinalized) -> Self {
273        Self::Finalized(Box::new(f))
274    }
275}
276
277impl Debug for DebugInstantiate {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        write!(f, "<network instantiate>")
280    }
281}
282
283impl Hash for DebugInstantiate {
284    fn hash<H: Hasher>(&self, _state: &mut H) {
285        // Do nothing
286    }
287}
288
289impl Clone for DebugInstantiate {
290    fn clone(&self) -> Self {
291        match self {
292            DebugInstantiate::Building => DebugInstantiate::Building,
293            DebugInstantiate::Finalized(_) => {
294                panic!("DebugInstantiate::Finalized should not be cloned")
295            }
296        }
297    }
298}
299
300/// A source in a Hydro graph, where data enters the graph.
301#[derive(Debug, Hash, Clone)]
302pub enum HydroSource {
303    Stream(DebugExpr),
304    ExternalNetwork(),
305    Iter(DebugExpr),
306    Spin(),
307}
308
/// Destination for `emit_core`.
///
/// Either the DFIR graph builders that generated statements are added to
/// (keyed by location id), or a pair of callbacks invoked with the current
/// statement id instead of generating DFIR.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Add generated DFIR to the builder of each location id.
    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
    /// Callback for roots (`L`) and callback for nodes (`N`).
    Callback(L, N),
}
318
319/// An root in a Hydro graph, which is an pipeline that doesn't emit
320/// any downstream values. Traversals over the dataflow graph and
321/// generating DFIR IR start from roots.
322#[derive(Debug, Hash)]
323pub enum HydroRoot {
324    ForEach {
325        f: DebugExpr,
326        input: Box<HydroNode>,
327        op_metadata: HydroIrOpMetadata,
328    },
329    SendExternal {
330        to_external_id: usize,
331        to_key: usize,
332        to_many: bool,
333        serialize_fn: Option<DebugExpr>,
334        instantiate_fn: DebugInstantiate,
335        input: Box<HydroNode>,
336        op_metadata: HydroIrOpMetadata,
337    },
338    DestSink {
339        sink: DebugExpr,
340        input: Box<HydroNode>,
341        op_metadata: HydroIrOpMetadata,
342    },
343    CycleSink {
344        ident: syn::Ident,
345        input: Box<HydroNode>,
346        out_location: LocationId,
347        op_metadata: HydroIrOpMetadata,
348    },
349}
350
351impl HydroRoot {
352    #[cfg(feature = "build")]
353    pub fn compile_network<'a, D>(
354        &mut self,
355        compile_env: &D::CompileEnv,
356        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
357        seen_tees: &mut SeenTees,
358        processes: &HashMap<usize, D::Process>,
359        clusters: &HashMap<usize, D::Cluster>,
360        externals: &HashMap<usize, D::External>,
361    ) where
362        D: Deploy<'a>,
363    {
364        self.transform_bottom_up(
365            &mut |l| {
366                if let HydroRoot::SendExternal {
367                    input,
368                    to_external_id,
369                    to_key,
370                    to_many,
371                    instantiate_fn,
372                    ..
373                } = l
374                {
375                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
376                        DebugInstantiate::Building => {
377                            let to_node = externals
378                                .get(to_external_id)
379                                .unwrap_or_else(|| {
380                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
381                                })
382                                .clone();
383
384                            match input.metadata().location_kind.root() {
385                                LocationId::Process(process_id) => {
386                                    if *to_many {
387                                        (
388                                            (
389                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
390                                                parse_quote!(DUMMY),
391                                            ),
392                                            Box::new(|| {}) as Box<dyn FnOnce()>,
393                                        )
394                                    } else {
395                                        let from_node = processes
396                                            .get(process_id)
397                                            .unwrap_or_else(|| {
398                                                panic!("A process used in the graph was not instantiated: {}", process_id)
399                                            })
400                                            .clone();
401
402                                        let sink_port = D::allocate_process_port(&from_node);
403                                        let source_port = D::allocate_external_port(&to_node);
404
405                                        to_node.register(*to_key, source_port.clone());
406
407                                        (
408                                            (
409                                                D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
410                                                parse_quote!(DUMMY),
411                                            ),
412                                            D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
413                                        )
414                                    }
415                                }
416                                LocationId::Cluster(_) => todo!(),
417                                _ => panic!()
418                            }
419                        },
420
421                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
422                    };
423
424                    *instantiate_fn = DebugInstantiateFinalized {
425                        sink: sink_expr,
426                        source: source_expr,
427                        connect_fn: Some(connect_fn),
428                    }
429                    .into();
430                }
431            },
432            &mut |n| {
433                if let HydroNode::Network {
434                    input,
435                    instantiate_fn,
436                    metadata,
437                    ..
438                } = n
439                {
440                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
441                        DebugInstantiate::Building => instantiate_network::<D>(
442                            input.metadata().location_kind.root(),
443                            metadata.location_kind.root(),
444                            processes,
445                            clusters,
446                            compile_env,
447                        ),
448
449                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
450                    };
451
452                    *instantiate_fn = DebugInstantiateFinalized {
453                        sink: sink_expr,
454                        source: source_expr,
455                        connect_fn: Some(connect_fn),
456                    }
457                    .into();
458                } else if let HydroNode::ExternalInput {
459                    from_external_id,
460                    from_key,
461                    from_many,
462                    codec_type,
463                    port_hint,
464                    instantiate_fn,
465                    metadata,
466                    ..
467                } = n
468                {
469                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
470                        DebugInstantiate::Building => {
471                            let from_node = externals
472                                .get(from_external_id)
473                                .unwrap_or_else(|| {
474                                    panic!(
475                                        "A external used in the graph was not instantiated: {}",
476                                        from_external_id
477                                    )
478                                })
479                                .clone();
480
481                            match metadata.location_kind.root() {
482                                LocationId::Process(process_id) => {
483                                    let to_node = processes
484                                        .get(process_id)
485                                        .unwrap_or_else(|| {
486                                            panic!("A process used in the graph was not instantiated: {}", process_id)
487                                        })
488                                        .clone();
489
490                                    let sink_port = D::allocate_external_port(&from_node);
491                                    let source_port = D::allocate_process_port(&to_node);
492
493                                    from_node.register(*from_key, sink_port.clone());
494
495                                    (
496                                        (
497                                            parse_quote!(DUMMY),
498                                            if *from_many {
499                                                D::e2o_many_source(
500                                                    compile_env,
501                                                    extra_stmts.entry(*process_id).or_default(),
502                                                    &to_node, &source_port,
503                                                    codec_type.0.as_ref(),
504                                                    format!("{}_{}", *from_external_id, *from_key)
505                                                )
506                                            } else {
507                                                D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
508                                            },
509                                        ),
510                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
511                                    )
512                                }
513                                LocationId::Cluster(_) => todo!(),
514                                _ => panic!()
515                            }
516                        },
517
518                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
519                    };
520
521                    *instantiate_fn = DebugInstantiateFinalized {
522                        sink: sink_expr,
523                        source: source_expr,
524                        connect_fn: Some(connect_fn),
525                    }
526                    .into();
527                }
528            },
529            seen_tees,
530            false,
531        );
532    }
533
534    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
535        self.transform_bottom_up(
536            &mut |l| {
537                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
538                    match instantiate_fn {
539                        DebugInstantiate::Building => panic!("network not built"),
540
541                        DebugInstantiate::Finalized(finalized) => {
542                            (finalized.connect_fn.take().unwrap())();
543                        }
544                    }
545                }
546            },
547            &mut |n| {
548                if let HydroNode::Network { instantiate_fn, .. }
549                | HydroNode::ExternalInput { instantiate_fn, .. } = n
550                {
551                    match instantiate_fn {
552                        DebugInstantiate::Building => panic!("network not built"),
553
554                        DebugInstantiate::Finalized(finalized) => {
555                            (finalized.connect_fn.take().unwrap())();
556                        }
557                    }
558                }
559            },
560            seen_tees,
561            false,
562        );
563    }
564
565    pub fn transform_bottom_up(
566        &mut self,
567        transform_root: &mut impl FnMut(&mut HydroRoot),
568        transform_node: &mut impl FnMut(&mut HydroNode),
569        seen_tees: &mut SeenTees,
570        check_well_formed: bool,
571    ) {
572        self.transform_children(
573            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
574            seen_tees,
575        );
576
577        transform_root(self);
578    }
579
580    pub fn transform_children(
581        &mut self,
582        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
583        seen_tees: &mut SeenTees,
584    ) {
585        match self {
586            HydroRoot::ForEach { input, .. }
587            | HydroRoot::SendExternal { input, .. }
588            | HydroRoot::DestSink { input, .. }
589            | HydroRoot::CycleSink { input, .. } => {
590                transform(input, seen_tees);
591            }
592        }
593    }
594
595    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
596        match self {
597            HydroRoot::ForEach {
598                f,
599                input,
600                op_metadata,
601            } => HydroRoot::ForEach {
602                f: f.clone(),
603                input: Box::new(input.deep_clone(seen_tees)),
604                op_metadata: op_metadata.clone(),
605            },
606            HydroRoot::SendExternal {
607                to_external_id,
608                to_key,
609                to_many,
610                serialize_fn,
611                instantiate_fn,
612                input,
613                op_metadata,
614            } => HydroRoot::SendExternal {
615                to_external_id: *to_external_id,
616                to_key: *to_key,
617                to_many: *to_many,
618                serialize_fn: serialize_fn.clone(),
619                instantiate_fn: instantiate_fn.clone(),
620                input: Box::new(input.deep_clone(seen_tees)),
621                op_metadata: op_metadata.clone(),
622            },
623            HydroRoot::DestSink {
624                sink,
625                input,
626                op_metadata,
627            } => HydroRoot::DestSink {
628                sink: sink.clone(),
629                input: Box::new(input.deep_clone(seen_tees)),
630                op_metadata: op_metadata.clone(),
631            },
632            HydroRoot::CycleSink {
633                ident,
634                input,
635                out_location,
636                op_metadata,
637            } => HydroRoot::CycleSink {
638                ident: ident.clone(),
639                input: Box::new(input.deep_clone(seen_tees)),
640                out_location: out_location.clone(),
641                op_metadata: op_metadata.clone(),
642            },
643        }
644    }
645
646    #[cfg(feature = "build")]
647    pub fn emit(
648        &mut self,
649        graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
650        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
651        next_stmt_id: &mut usize,
652    ) {
653        self.emit_core(
654            &mut BuildersOrCallback::Builders::<
655                fn(&mut HydroRoot, &mut usize),
656                fn(&mut HydroNode, &mut usize),
657            >(graph_builders),
658            built_tees,
659            next_stmt_id,
660        );
661    }
662
663    #[cfg(feature = "build")]
664    pub fn emit_core(
665        &mut self,
666        builders_or_callback: &mut BuildersOrCallback<
667            impl FnMut(&mut HydroRoot, &mut usize),
668            impl FnMut(&mut HydroNode, &mut usize),
669        >,
670        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
671        next_stmt_id: &mut usize,
672    ) {
673        match self {
674            HydroRoot::ForEach { f, input, .. } => {
675                let (input_ident, input_location_id) =
676                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
677
678                match builders_or_callback {
679                    BuildersOrCallback::Builders(graph_builders) => {
680                        graph_builders
681                            .entry(input_location_id)
682                            .or_default()
683                            .add_dfir(
684                                parse_quote! {
685                                    #input_ident -> for_each(#f);
686                                },
687                                None,
688                                Some(&next_stmt_id.to_string()),
689                            );
690                    }
691                    BuildersOrCallback::Callback(leaf_callback, _) => {
692                        leaf_callback(self, next_stmt_id);
693                    }
694                }
695
696                *next_stmt_id += 1;
697            }
698
699            HydroRoot::SendExternal {
700                serialize_fn,
701                instantiate_fn,
702                input,
703                ..
704            } => {
705                let (input_ident, input_location_id) =
706                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
707
708                match builders_or_callback {
709                    BuildersOrCallback::Builders(graph_builders) => {
710                        let (sink_expr, _) = match instantiate_fn {
711                            DebugInstantiate::Building => (
712                                syn::parse_quote!(DUMMY_SINK),
713                                syn::parse_quote!(DUMMY_SOURCE),
714                            ),
715
716                            DebugInstantiate::Finalized(finalized) => {
717                                (finalized.sink.clone(), finalized.source.clone())
718                            }
719                        };
720
721                        let sender_builder = graph_builders.entry(input_location_id).or_default();
722                        if let Some(serialize_fn) = serialize_fn {
723                            sender_builder.add_dfir(
724                                parse_quote! {
725                                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
726                                },
727                                None,
728                                // operator tag separates send and receive, which otherwise have the same next_stmt_id
729                                Some(&format!("send{}", next_stmt_id)),
730                            );
731                        } else {
732                            sender_builder.add_dfir(
733                                parse_quote! {
734                                    #input_ident -> dest_sink(#sink_expr);
735                                },
736                                None,
737                                Some(&format!("send{}", next_stmt_id)),
738                            );
739                        }
740                    }
741                    BuildersOrCallback::Callback(leaf_callback, _) => {
742                        leaf_callback(self, next_stmt_id);
743                    }
744                }
745
746                *next_stmt_id += 1;
747            }
748
749            HydroRoot::DestSink { sink, input, .. } => {
750                let (input_ident, input_location_id) =
751                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
752
753                match builders_or_callback {
754                    BuildersOrCallback::Builders(graph_builders) => {
755                        graph_builders
756                            .entry(input_location_id)
757                            .or_default()
758                            .add_dfir(
759                                parse_quote! {
760                                    #input_ident -> dest_sink(#sink);
761                                },
762                                None,
763                                Some(&next_stmt_id.to_string()),
764                            );
765                    }
766                    BuildersOrCallback::Callback(leaf_callback, _) => {
767                        leaf_callback(self, next_stmt_id);
768                    }
769                }
770
771                *next_stmt_id += 1;
772            }
773
774            HydroRoot::CycleSink {
775                ident,
776                input,
777                out_location,
778                ..
779            } => {
780                let (input_ident, input_location_id) =
781                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
782
783                let location_id = out_location.root().raw_id();
784
785                match builders_or_callback {
786                    BuildersOrCallback::Builders(graph_builders) => {
787                        assert_eq!(
788                            input_location_id, location_id,
789                            "cycle_sink location mismatch"
790                        );
791
792                        graph_builders.entry(location_id).or_default().add_dfir(
793                            parse_quote! {
794                                #ident = #input_ident;
795                            },
796                            None,
797                            None,
798                        );
799                    }
800                    // No ID, no callback
801                    BuildersOrCallback::Callback(_, _) => {}
802                }
803            }
804        }
805    }
806
807    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
808        match self {
809            HydroRoot::ForEach { op_metadata, .. }
810            | HydroRoot::SendExternal { op_metadata, .. }
811            | HydroRoot::DestSink { op_metadata, .. }
812            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
813        }
814    }
815
816    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
817        match self {
818            HydroRoot::ForEach { op_metadata, .. }
819            | HydroRoot::SendExternal { op_metadata, .. }
820            | HydroRoot::DestSink { op_metadata, .. }
821            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
822        }
823    }
824
825    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
826        match self {
827            HydroRoot::ForEach { input, .. }
828            | HydroRoot::SendExternal { input, .. }
829            | HydroRoot::DestSink { input, .. }
830            | HydroRoot::CycleSink { input, .. } => {
831                vec![input.metadata()]
832            }
833        }
834    }
835
836    pub fn print_root(&self) -> String {
837        match self {
838            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
839            HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
840            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
841            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
842        }
843    }
844
845    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
846        match self {
847            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
848                transform(f);
849            }
850            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
851        }
852    }
853}
854
/// Lowers every root of `ir` into per-location DFIR graph builders.
///
/// Roots are emitted in order. All roots share one statement-id counter and one table of
/// already-built tees. The returned map is keyed by each location's raw root id.
///
/// Takes a slice, like [`traverse_dfir`] and [`transform_bottom_up`]. Existing callers
/// that pass `&mut Vec<HydroRoot>` still compile through deref coercion.
#[cfg(feature = "build")]
pub fn emit(ir: &mut [HydroRoot]) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    for leaf in ir.iter_mut() {
        leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
    }
    builders
}
865
/// Drives `emit_core` over every root, but with callbacks instead of graph builders.
///
/// Instead of generating DFIR, each visited operator is passed to `transform_node` (or
/// `transform_root`) together with its statement id. Some roots, such as `CycleSink`,
/// invoke no callback. Tee bookkeeping and the statement counter are shared across all
/// roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut stmt_counter = 0;
    for root in ir.iter_mut() {
        root.emit_core(&mut callback, &mut seen_tees, &mut stmt_counter);
    }
}
879
880pub fn transform_bottom_up(
881    ir: &mut [HydroRoot],
882    transform_root: &mut impl FnMut(&mut HydroRoot),
883    transform_node: &mut impl FnMut(&mut HydroNode),
884    check_well_formed: bool,
885) {
886    let mut seen_tees = HashMap::new();
887    ir.iter_mut().for_each(|leaf| {
888        leaf.transform_bottom_up(
889            transform_root,
890            transform_node,
891            &mut seen_tees,
892            check_well_formed,
893        );
894    });
895}
896
897pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
898    let mut seen_tees = HashMap::new();
899    ir.iter()
900        .map(|leaf| leaf.deep_clone(&mut seen_tees))
901        .collect()
902}
903
/// Thread-local state used by [`dbg_dedup_tee`] and `Debug for TeeNode`.
///
/// It is `None` when no dedup scope is active. Otherwise it holds the next id to hand out,
/// plus the id already assigned to each printed tee, keyed by the address of its shared
/// cell.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
908
909pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
910    PRINTED_TEES.with(|printed_tees| {
911        let mut printed_tees_mut = printed_tees.borrow_mut();
912        *printed_tees_mut = Some((0, HashMap::new()));
913        drop(printed_tees_mut);
914
915        let ret = f();
916
917        let mut printed_tees_mut = printed_tees.borrow_mut();
918        *printed_tees_mut = None;
919
920        ret
921    })
922}
923
924pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
925
926impl TeeNode {
927    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
928        Rc::as_ptr(&self.0)
929    }
930}
931
932impl Debug for TeeNode {
933    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
934        PRINTED_TEES.with(|printed_tees| {
935            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
936            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
937
938            if let Some(printed_tees_mut) = printed_tees_mut {
939                if let Some(existing) = printed_tees_mut
940                    .1
941                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
942                {
943                    write!(f, "<tee {}>", existing)
944                } else {
945                    let next_id = printed_tees_mut.0;
946                    printed_tees_mut.0 += 1;
947                    printed_tees_mut
948                        .1
949                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
950                    drop(printed_tees_mut_borrow);
951                    write!(f, "<tee {}>: ", next_id)?;
952                    Debug::fmt(&self.0.borrow(), f)
953                }
954            } else {
955                drop(printed_tees_mut_borrow);
956                write!(f, "<tee>: ")?;
957                Debug::fmt(&self.0.borrow(), f)
958            }
959        })
960    }
961}
962
963impl Hash for TeeNode {
964    fn hash<H: Hasher>(&self, state: &mut H) {
965        self.0.borrow_mut().hash(state);
966    }
967}
968
/// Metadata describing the output of an IR node, together with the metadata of the
/// operator that produced it.
///
/// This type is deliberately inert for hashing and equality. Its `Hash` impl is a no-op,
/// so it never contributes to the derived hash of a containing node. Its `PartialEq` impl
/// always returns `true`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Where this node's output lives. `HydroNode::transform_bottom_up` (when checking
    /// well-formedness) requires its `root()` to match the roots of the node's inputs,
    /// except for `Network` nodes.
    pub location_kind: LocationId,
    /// The output type, if known.
    /// NOTE(review): presumably `None` until inferred; confirm.
    pub output_type: Option<DebugType>,
    /// NOTE(review): semantics not visible here. Presumably an estimated or observed output
    /// cardinality; confirm at the producer.
    pub cardinality: Option<usize>,
    /// NOTE(review): optional label; meaning not visible here. Confirm at use sites.
    pub tag: Option<String>,
    /// Metadata about the operator itself rather than its output.
    pub op: HydroIrOpMetadata,
}
977
978// HydroIrMetadata shouldn't be used to hash or compare
979impl Hash for HydroIrMetadata {
980    fn hash<H: Hasher>(&self, _: &mut H) {}
981}
982
983impl PartialEq for HydroIrMetadata {
984    fn eq(&self, _: &Self) -> bool {
985        true
986    }
987}
988
989impl Eq for HydroIrMetadata {}
990
991impl Debug for HydroIrMetadata {
992    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
993        f.debug_struct("HydroIrMetadata")
994            .field("location_kind", &self.location_kind)
995            .field("output_type", &self.output_type)
996            .finish()
997    }
998}
999
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
///
/// It does not take part in hashing (its `Hash` impl is a no-op), and its `Debug` output
/// shows only the type name.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created (see `new`).
    pub backtrace: Backtrace,
    /// NOTE(review): presumably measured CPU usage of this operator, filled in by a later
    /// profiling pass; confirm. `None` when created via `new`.
    pub cpu_usage: Option<f64>,
    /// NOTE(review): presumably CPU usage attributed to receiving this operator's network
    /// input; confirm. `None` when created via `new`.
    pub network_recv_cpu_usage: Option<f64>,
    /// Operator id. `None` when created via `new`.
    /// NOTE(review): presumably assigned by a later pass; confirm.
    pub id: Option<usize>,
}
1009
impl HydroIrOpMetadata {
    /// Creates operator metadata and captures a backtrace of the caller.
    ///
    /// `cpu_usage`, `network_recv_cpu_usage` and `id` all start out as `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // Presumably skips `new`'s own frame so the backtrace starts at the caller.
        Self::new_with_skip(1)
    }

    /// Like `new`, but skips `skip_count` additional stack frames when capturing the
    /// backtrace.
    ///
    /// NOTE: the `2 + skip_count` arithmetic depends on the exact call depth through this
    /// function. Adding or removing wrapper calls (or restructuring `new`) shifts which
    /// frame the backtrace starts at. The base of 2 presumably covers this function and the
    /// `get_backtrace` call; confirm in `backtrace.rs`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1028
1029impl Debug for HydroIrOpMetadata {
1030    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1031        f.debug_struct("HydroIrOpMetadata").finish()
1032    }
1033}
1034
1035impl Hash for HydroIrOpMetadata {
1036    fn hash<H: Hasher>(&self, _: &mut H) {}
1037}
1038
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// NOTE: the derived `Hash` includes the variant discriminant, so reordering variants
/// changes hash values. The `metadata` fields contribute nothing to the hash, because
/// `HydroIrMetadata`'s `Hash` impl is a no-op.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in, e.g. while a tee's shared contents are moved out to be
    /// transformed or cloned. `transform_children` and `emit_core` panic on it.
    Placeholder,

    /// Leaf input described by a `HydroSource`. During emission an `ExternalNetwork`
    /// source produces no statement (a `DUMMY` identifier is returned). A `Stream` source
    /// becomes `source_stream(..)`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf input that reads from the cycle named `ident`.
    /// NOTE(review): presumably paired with a `HydroRoot::CycleSink` of the same ident;
    /// confirm.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// Reference to a subgraph that may be shared with other `Tee` nodes (see
    /// [`TeeNode`]). Traversals use the cell address to visit each shared subgraph once.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `persist::<'static>()` applied to `inner`.
    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emits no DFIR statement of its own; `emit_core` forwards `inner`'s output as is.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emits no DFIR statement of its own; `emit_core` forwards `inner`'s output as is.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as a plain identity binding of `inner` (`stream_N = inner;`).
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as a plain identity binding of `inner` (`stream_N = inner;`).
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Same `f`/`input` fields as `ReduceKeyed`, plus a second child, `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Moves data between locations. It is the only node exempt from the input/output
    /// location check in `transform_bottom_up`.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf input with no IR children.
    /// NOTE(review): presumably data arriving from an external process; confirm.
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1244
/// Maps the address of an original tee cell to its replacement cell.
///
/// Every `Tee` that shared one subgraph before a rewrite or clone therefore shares the same
/// new subgraph afterwards.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a [`LocationId`].
/// NOTE(review): not used in this part of the file. Presumably records where each shared
/// subgraph lives; confirm at use sites.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1247
1248impl HydroNode {
1249    pub fn transform_bottom_up(
1250        &mut self,
1251        transform: &mut impl FnMut(&mut HydroNode),
1252        seen_tees: &mut SeenTees,
1253        check_well_formed: bool,
1254    ) {
1255        self.transform_children(
1256            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1257            seen_tees,
1258        );
1259
1260        transform(self);
1261
1262        let self_location = self.metadata().location_kind.root();
1263
1264        if check_well_formed {
1265            match &*self {
1266                HydroNode::Network { .. } => {}
1267                _ => {
1268                    self.input_metadata().iter().for_each(|i| {
1269                        if i.location_kind.root() != self_location {
1270                            panic!(
1271                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1272                                i,
1273                                i.location_kind.root(),
1274                                self,
1275                                self_location
1276                            )
1277                        }
1278                    });
1279                }
1280            }
1281        }
1282    }
1283
1284    #[inline(always)]
1285    pub fn transform_children(
1286        &mut self,
1287        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1288        seen_tees: &mut SeenTees,
1289    ) {
1290        match self {
1291            HydroNode::Placeholder => {
1292                panic!();
1293            }
1294
1295            HydroNode::Source { .. }
1296            | HydroNode::CycleSource { .. }
1297            | HydroNode::ExternalInput { .. } => {}
1298
1299            HydroNode::Tee { inner, .. } => {
1300                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1301                    *inner = TeeNode(transformed.clone());
1302                } else {
1303                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1304                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1305                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1306                    transform(&mut orig, seen_tees);
1307                    *transformed_cell.borrow_mut() = orig;
1308                    *inner = TeeNode(transformed_cell);
1309                }
1310            }
1311
1312            HydroNode::Persist { inner, .. }
1313            | HydroNode::BeginAtomic { inner, .. }
1314            | HydroNode::EndAtomic { inner, .. }
1315            | HydroNode::Batch { inner, .. }
1316            | HydroNode::YieldConcat { inner, .. } => {
1317                transform(inner.as_mut(), seen_tees);
1318            }
1319
1320            HydroNode::Chain { first, second, .. } => {
1321                transform(first.as_mut(), seen_tees);
1322                transform(second.as_mut(), seen_tees);
1323            }
1324
1325            HydroNode::ChainFirst { first, second, .. } => {
1326                transform(first.as_mut(), seen_tees);
1327                transform(second.as_mut(), seen_tees);
1328            }
1329
1330            HydroNode::CrossSingleton { left, right, .. }
1331            | HydroNode::CrossProduct { left, right, .. }
1332            | HydroNode::Join { left, right, .. } => {
1333                transform(left.as_mut(), seen_tees);
1334                transform(right.as_mut(), seen_tees);
1335            }
1336
1337            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1338                transform(pos.as_mut(), seen_tees);
1339                transform(neg.as_mut(), seen_tees);
1340            }
1341
1342            HydroNode::ReduceKeyedWatermark {
1343                input, watermark, ..
1344            } => {
1345                transform(input.as_mut(), seen_tees);
1346                transform(watermark.as_mut(), seen_tees);
1347            }
1348
1349            HydroNode::Map { input, .. }
1350            | HydroNode::ResolveFutures { input, .. }
1351            | HydroNode::ResolveFuturesOrdered { input, .. }
1352            | HydroNode::FlatMap { input, .. }
1353            | HydroNode::Filter { input, .. }
1354            | HydroNode::FilterMap { input, .. }
1355            | HydroNode::Sort { input, .. }
1356            | HydroNode::DeferTick { input, .. }
1357            | HydroNode::Enumerate { input, .. }
1358            | HydroNode::Inspect { input, .. }
1359            | HydroNode::Unique { input, .. }
1360            | HydroNode::Network { input, .. }
1361            | HydroNode::Fold { input, .. }
1362            | HydroNode::Scan { input, .. }
1363            | HydroNode::FoldKeyed { input, .. }
1364            | HydroNode::Reduce { input, .. }
1365            | HydroNode::ReduceKeyed { input, .. }
1366            | HydroNode::Counter { input, .. } => {
1367                transform(input.as_mut(), seen_tees);
1368            }
1369        }
1370    }
1371
1372    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1373        match self {
1374            HydroNode::Placeholder => HydroNode::Placeholder,
1375            HydroNode::Source { source, metadata } => HydroNode::Source {
1376                source: source.clone(),
1377                metadata: metadata.clone(),
1378            },
1379            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1380                ident: ident.clone(),
1381                metadata: metadata.clone(),
1382            },
1383            HydroNode::Tee { inner, metadata } => {
1384                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1385                    HydroNode::Tee {
1386                        inner: TeeNode(transformed.clone()),
1387                        metadata: metadata.clone(),
1388                    }
1389                } else {
1390                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1391                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1392                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1393                    *new_rc.borrow_mut() = cloned;
1394                    HydroNode::Tee {
1395                        inner: TeeNode(new_rc),
1396                        metadata: metadata.clone(),
1397                    }
1398                }
1399            }
1400            HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1401                inner: Box::new(inner.deep_clone(seen_tees)),
1402                metadata: metadata.clone(),
1403            },
1404            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1405                inner: Box::new(inner.deep_clone(seen_tees)),
1406                metadata: metadata.clone(),
1407            },
1408            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1409                inner: Box::new(inner.deep_clone(seen_tees)),
1410                metadata: metadata.clone(),
1411            },
1412            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1413                inner: Box::new(inner.deep_clone(seen_tees)),
1414                metadata: metadata.clone(),
1415            },
1416            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1417                inner: Box::new(inner.deep_clone(seen_tees)),
1418                metadata: metadata.clone(),
1419            },
1420            HydroNode::Chain {
1421                first,
1422                second,
1423                metadata,
1424            } => HydroNode::Chain {
1425                first: Box::new(first.deep_clone(seen_tees)),
1426                second: Box::new(second.deep_clone(seen_tees)),
1427                metadata: metadata.clone(),
1428            },
1429            HydroNode::ChainFirst {
1430                first,
1431                second,
1432                metadata,
1433            } => HydroNode::ChainFirst {
1434                first: Box::new(first.deep_clone(seen_tees)),
1435                second: Box::new(second.deep_clone(seen_tees)),
1436                metadata: metadata.clone(),
1437            },
1438            HydroNode::CrossProduct {
1439                left,
1440                right,
1441                metadata,
1442            } => HydroNode::CrossProduct {
1443                left: Box::new(left.deep_clone(seen_tees)),
1444                right: Box::new(right.deep_clone(seen_tees)),
1445                metadata: metadata.clone(),
1446            },
1447            HydroNode::CrossSingleton {
1448                left,
1449                right,
1450                metadata,
1451            } => HydroNode::CrossSingleton {
1452                left: Box::new(left.deep_clone(seen_tees)),
1453                right: Box::new(right.deep_clone(seen_tees)),
1454                metadata: metadata.clone(),
1455            },
1456            HydroNode::Join {
1457                left,
1458                right,
1459                metadata,
1460            } => HydroNode::Join {
1461                left: Box::new(left.deep_clone(seen_tees)),
1462                right: Box::new(right.deep_clone(seen_tees)),
1463                metadata: metadata.clone(),
1464            },
1465            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1466                pos: Box::new(pos.deep_clone(seen_tees)),
1467                neg: Box::new(neg.deep_clone(seen_tees)),
1468                metadata: metadata.clone(),
1469            },
1470            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1471                pos: Box::new(pos.deep_clone(seen_tees)),
1472                neg: Box::new(neg.deep_clone(seen_tees)),
1473                metadata: metadata.clone(),
1474            },
1475            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1476                input: Box::new(input.deep_clone(seen_tees)),
1477                metadata: metadata.clone(),
1478            },
1479            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1480                HydroNode::ResolveFuturesOrdered {
1481                    input: Box::new(input.deep_clone(seen_tees)),
1482                    metadata: metadata.clone(),
1483                }
1484            }
1485            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1486                f: f.clone(),
1487                input: Box::new(input.deep_clone(seen_tees)),
1488                metadata: metadata.clone(),
1489            },
1490            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1491                f: f.clone(),
1492                input: Box::new(input.deep_clone(seen_tees)),
1493                metadata: metadata.clone(),
1494            },
1495            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1496                f: f.clone(),
1497                input: Box::new(input.deep_clone(seen_tees)),
1498                metadata: metadata.clone(),
1499            },
1500            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1501                f: f.clone(),
1502                input: Box::new(input.deep_clone(seen_tees)),
1503                metadata: metadata.clone(),
1504            },
1505            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1506                input: Box::new(input.deep_clone(seen_tees)),
1507                metadata: metadata.clone(),
1508            },
1509            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1510                input: Box::new(input.deep_clone(seen_tees)),
1511                metadata: metadata.clone(),
1512            },
1513            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1514                f: f.clone(),
1515                input: Box::new(input.deep_clone(seen_tees)),
1516                metadata: metadata.clone(),
1517            },
1518            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1519                input: Box::new(input.deep_clone(seen_tees)),
1520                metadata: metadata.clone(),
1521            },
1522            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1523                input: Box::new(input.deep_clone(seen_tees)),
1524                metadata: metadata.clone(),
1525            },
1526            HydroNode::Fold {
1527                init,
1528                acc,
1529                input,
1530                metadata,
1531            } => HydroNode::Fold {
1532                init: init.clone(),
1533                acc: acc.clone(),
1534                input: Box::new(input.deep_clone(seen_tees)),
1535                metadata: metadata.clone(),
1536            },
1537            HydroNode::Scan {
1538                init,
1539                acc,
1540                input,
1541                metadata,
1542            } => HydroNode::Scan {
1543                init: init.clone(),
1544                acc: acc.clone(),
1545                input: Box::new(input.deep_clone(seen_tees)),
1546                metadata: metadata.clone(),
1547            },
1548            HydroNode::FoldKeyed {
1549                init,
1550                acc,
1551                input,
1552                metadata,
1553            } => HydroNode::FoldKeyed {
1554                init: init.clone(),
1555                acc: acc.clone(),
1556                input: Box::new(input.deep_clone(seen_tees)),
1557                metadata: metadata.clone(),
1558            },
1559            HydroNode::ReduceKeyedWatermark {
1560                f,
1561                input,
1562                watermark,
1563                metadata,
1564            } => HydroNode::ReduceKeyedWatermark {
1565                f: f.clone(),
1566                input: Box::new(input.deep_clone(seen_tees)),
1567                watermark: Box::new(watermark.deep_clone(seen_tees)),
1568                metadata: metadata.clone(),
1569            },
1570            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1571                f: f.clone(),
1572                input: Box::new(input.deep_clone(seen_tees)),
1573                metadata: metadata.clone(),
1574            },
1575            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1576                f: f.clone(),
1577                input: Box::new(input.deep_clone(seen_tees)),
1578                metadata: metadata.clone(),
1579            },
1580            HydroNode::Network {
1581                serialize_fn,
1582                instantiate_fn,
1583                deserialize_fn,
1584                input,
1585                metadata,
1586            } => HydroNode::Network {
1587                serialize_fn: serialize_fn.clone(),
1588                instantiate_fn: instantiate_fn.clone(),
1589                deserialize_fn: deserialize_fn.clone(),
1590                input: Box::new(input.deep_clone(seen_tees)),
1591                metadata: metadata.clone(),
1592            },
1593            HydroNode::ExternalInput {
1594                from_external_id,
1595                from_key,
1596                from_many,
1597                codec_type,
1598                port_hint,
1599                instantiate_fn,
1600                deserialize_fn,
1601                metadata,
1602            } => HydroNode::ExternalInput {
1603                from_external_id: *from_external_id,
1604                from_key: *from_key,
1605                from_many: *from_many,
1606                codec_type: codec_type.clone(),
1607                port_hint: *port_hint,
1608                instantiate_fn: instantiate_fn.clone(),
1609                deserialize_fn: deserialize_fn.clone(),
1610                metadata: metadata.clone(),
1611            },
1612            HydroNode::Counter {
1613                tag,
1614                duration,
1615                prefix,
1616                input,
1617                metadata,
1618            } => HydroNode::Counter {
1619                tag: tag.clone(),
1620                duration: duration.clone(),
1621                prefix: prefix.clone(),
1622                input: Box::new(input.deep_clone(seen_tees)),
1623                metadata: metadata.clone(),
1624            },
1625        }
1626    }
1627
1628    #[cfg(feature = "build")]
1629    pub fn emit_core(
1630        &mut self,
1631        builders_or_callback: &mut BuildersOrCallback<
1632            impl FnMut(&mut HydroRoot, &mut usize),
1633            impl FnMut(&mut HydroNode, &mut usize),
1634        >,
1635        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1636        next_stmt_id: &mut usize,
1637    ) -> (syn::Ident, usize) {
1638        match self {
1639            HydroNode::Placeholder => {
1640                panic!()
1641            }
1642
1643            HydroNode::Persist { inner, .. } => {
1644                let (inner_ident, location) =
1645                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1646
1647                let persist_ident =
1648                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1649
1650                match builders_or_callback {
1651                    BuildersOrCallback::Builders(graph_builders) => {
1652                        let builder = graph_builders.entry(location).or_default();
1653                        builder.add_dfir(
1654                            parse_quote! {
1655                                #persist_ident = #inner_ident -> persist::<'static>();
1656                            },
1657                            None,
1658                            Some(&next_stmt_id.to_string()),
1659                        );
1660                    }
1661                    BuildersOrCallback::Callback(_, node_callback) => {
1662                        node_callback(self, next_stmt_id);
1663                    }
1664                }
1665
1666                *next_stmt_id += 1;
1667
1668                (persist_ident, location)
1669            }
1670
1671            HydroNode::Batch { inner, .. } => {
1672                let (inner_ident, location) =
1673                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1674
1675                let batch_ident =
1676                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1677
1678                match builders_or_callback {
1679                    BuildersOrCallback::Builders(graph_builders) => {
1680                        let builder = graph_builders.entry(location).or_default();
1681                        builder.add_dfir(
1682                            parse_quote! {
1683                                #batch_ident = #inner_ident;
1684                            },
1685                            None,
1686                            None,
1687                        );
1688                    }
1689                    BuildersOrCallback::Callback(_, node_callback) => {
1690                        node_callback(self, next_stmt_id);
1691                    }
1692                }
1693
1694                *next_stmt_id += 1;
1695
1696                (batch_ident, location)
1697            }
1698
1699            HydroNode::YieldConcat { inner, .. } => {
1700                let (inner_ident, location) =
1701                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1702
1703                let yield_ident =
1704                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1705
1706                match builders_or_callback {
1707                    BuildersOrCallback::Builders(graph_builders) => {
1708                        let builder = graph_builders.entry(location).or_default();
1709                        builder.add_dfir(
1710                            parse_quote! {
1711                                #yield_ident = #inner_ident;
1712                            },
1713                            None,
1714                            None,
1715                        );
1716                    }
1717                    BuildersOrCallback::Callback(_, node_callback) => {
1718                        node_callback(self, next_stmt_id);
1719                    }
1720                }
1721
1722                *next_stmt_id += 1;
1723
1724                (yield_ident, location)
1725            }
1726
1727            HydroNode::BeginAtomic { inner, .. } => {
1728                inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
1729            }
1730
1731            HydroNode::EndAtomic { inner, .. } => {
1732                inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
1733            }
1734
1735            HydroNode::Source {
1736                source, metadata, ..
1737            } => {
1738                let location_id = metadata.location_kind.root().raw_id();
1739
1740                if let HydroSource::ExternalNetwork() = source {
1741                    (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1742                } else {
1743                    let source_ident =
1744                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1745
1746                    let source_stmt = match source {
1747                        HydroSource::Stream(expr) => {
1748                            debug_assert!(metadata.location_kind.is_top_level());
1749                            parse_quote! {
1750                                #source_ident = source_stream(#expr);
1751                            }
1752                        }
1753
1754                        HydroSource::ExternalNetwork() => {
1755                            unreachable!()
1756                        }
1757
1758                        HydroSource::Iter(expr) => {
1759                            if metadata.location_kind.is_top_level() {
1760                                parse_quote! {
1761                                    #source_ident = source_iter(#expr);
1762                                }
1763                            } else {
1764                                // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
1765                                parse_quote! {
1766                                    #source_ident = source_iter(#expr) -> persist::<'static>();
1767                                }
1768                            }
1769                        }
1770
1771                        HydroSource::Spin() => {
1772                            debug_assert!(metadata.location_kind.is_top_level());
1773                            parse_quote! {
1774                                #source_ident = spin();
1775                            }
1776                        }
1777                    };
1778
1779                    match builders_or_callback {
1780                        BuildersOrCallback::Builders(graph_builders) => {
1781                            let builder = graph_builders.entry(location_id).or_default();
1782                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1783                        }
1784                        BuildersOrCallback::Callback(_, node_callback) => {
1785                            node_callback(self, next_stmt_id);
1786                        }
1787                    }
1788
1789                    *next_stmt_id += 1;
1790
1791                    (source_ident, location_id)
1792                }
1793            }
1794
1795            HydroNode::CycleSource {
1796                ident, metadata, ..
1797            } => {
1798                let location_id = metadata.location_kind.root().raw_id();
1799
1800                let ident = ident.clone();
1801
1802                match builders_or_callback {
1803                    BuildersOrCallback::Builders(_) => {}
1804                    BuildersOrCallback::Callback(_, node_callback) => {
1805                        node_callback(self, next_stmt_id);
1806                    }
1807                }
1808
1809                // consume a stmt id even though we did not emit anything so that we can instrument this
1810                *next_stmt_id += 1;
1811
1812                (ident, location_id)
1813            }
1814
1815            HydroNode::Tee { inner, .. } => {
1816                let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1817                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1818                {
1819                    match builders_or_callback {
1820                        BuildersOrCallback::Builders(_) => {}
1821                        BuildersOrCallback::Callback(_, node_callback) => {
1822                            node_callback(self, next_stmt_id);
1823                        }
1824                    }
1825
1826                    (teed_from.clone(), *inner_location_id)
1827                } else {
1828                    let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1829                        builders_or_callback,
1830                        built_tees,
1831                        next_stmt_id,
1832                    );
1833
1834                    let tee_ident =
1835                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1836
1837                    built_tees.insert(
1838                        inner.0.as_ref() as *const RefCell<HydroNode>,
1839                        (tee_ident.clone(), inner_location_id),
1840                    );
1841
1842                    match builders_or_callback {
1843                        BuildersOrCallback::Builders(graph_builders) => {
1844                            let builder = graph_builders.entry(inner_location_id).or_default();
1845                            builder.add_dfir(
1846                                parse_quote! {
1847                                    #tee_ident = #inner_ident -> tee();
1848                                },
1849                                None,
1850                                Some(&next_stmt_id.to_string()),
1851                            );
1852                        }
1853                        BuildersOrCallback::Callback(_, node_callback) => {
1854                            node_callback(self, next_stmt_id);
1855                        }
1856                    }
1857
1858                    (tee_ident, inner_location_id)
1859                };
1860
1861                // we consume a stmt id regardless of if we emit the tee() operator,
1862                // so that during rewrites we touch all recipients of the tee()
1863
1864                *next_stmt_id += 1;
1865                (ret_ident, inner_location_id)
1866            }
1867
1868            HydroNode::Chain { first, second, .. } => {
1869                let (first_ident, first_location_id) =
1870                    first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1871                let (second_ident, second_location_id) =
1872                    second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1873
1874                let chain_ident =
1875                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1876
1877                match builders_or_callback {
1878                    BuildersOrCallback::Builders(graph_builders) => {
1879                        assert_eq!(
1880                            first_location_id, second_location_id,
1881                            "chain inputs must be in the same location"
1882                        );
1883                        let builder = graph_builders.entry(first_location_id).or_default();
1884                        builder.add_dfir(
1885                            parse_quote! {
1886                                #chain_ident = chain();
1887                                #first_ident -> [0]#chain_ident;
1888                                #second_ident -> [1]#chain_ident;
1889                            },
1890                            None,
1891                            Some(&next_stmt_id.to_string()),
1892                        );
1893                    }
1894                    BuildersOrCallback::Callback(_, node_callback) => {
1895                        node_callback(self, next_stmt_id);
1896                    }
1897                }
1898
1899                *next_stmt_id += 1;
1900
1901                (chain_ident, first_location_id)
1902            }
1903
1904            HydroNode::ChainFirst { first, second, .. } => {
1905                let (first_ident, first_location_id) =
1906                    first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1907                let (second_ident, second_location_id) =
1908                    second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1909
1910                let chain_ident =
1911                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1912
1913                match builders_or_callback {
1914                    BuildersOrCallback::Builders(graph_builders) => {
1915                        assert_eq!(
1916                            first_location_id, second_location_id,
1917                            "chain inputs must be in the same location"
1918                        );
1919                        let builder = graph_builders.entry(first_location_id).or_default();
1920                        builder.add_dfir(
1921                            parse_quote! {
1922                                #chain_ident = chain_first_n(1);
1923                                #first_ident -> [0]#chain_ident;
1924                                #second_ident -> [1]#chain_ident;
1925                            },
1926                            None,
1927                            Some(&next_stmt_id.to_string()),
1928                        );
1929                    }
1930                    BuildersOrCallback::Callback(_, node_callback) => {
1931                        node_callback(self, next_stmt_id);
1932                    }
1933                }
1934
1935                *next_stmt_id += 1;
1936
1937                (chain_ident, first_location_id)
1938            }
1939
1940            HydroNode::CrossSingleton { left, right, .. } => {
1941                let (left_ident, left_location_id) =
1942                    left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1943                let (right_ident, right_location_id) =
1944                    right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1945
1946                let cross_ident =
1947                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1948
1949                match builders_or_callback {
1950                    BuildersOrCallback::Builders(graph_builders) => {
1951                        assert_eq!(
1952                            left_location_id, right_location_id,
1953                            "cross_singleton inputs must be in the same location"
1954                        );
1955
1956                        let builder = graph_builders.entry(left_location_id).or_default();
1957                        builder.add_dfir(
1958                            parse_quote! {
1959                                #cross_ident = cross_singleton();
1960                                #left_ident -> [input]#cross_ident;
1961                                #right_ident -> [single]#cross_ident;
1962                            },
1963                            None,
1964                            Some(&next_stmt_id.to_string()),
1965                        );
1966                    }
1967                    BuildersOrCallback::Callback(_, node_callback) => {
1968                        node_callback(self, next_stmt_id);
1969                    }
1970                }
1971
1972                *next_stmt_id += 1;
1973
1974                (cross_ident, left_location_id)
1975            }
1976
1977            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1978                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1979                    parse_quote!(cross_join_multiset)
1980                } else {
1981                    parse_quote!(join_multiset)
1982                };
1983
1984                let (HydroNode::CrossProduct { left, right, .. }
1985                | HydroNode::Join { left, right, .. }) = self
1986                else {
1987                    unreachable!()
1988                };
1989
1990                let is_top_level = left.metadata().location_kind.is_top_level()
1991                    && right.metadata().location_kind.is_top_level();
1992                let (left_inner, left_lifetime) =
1993                    if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1994                        debug_assert!(!left.metadata().location_kind.is_top_level());
1995                        (left, quote!('static))
1996                    } else if left.metadata().location_kind.is_top_level() {
1997                        (left, quote!('static))
1998                    } else {
1999                        (left, quote!('tick))
2000                    };
2001
2002                let (right_inner, right_lifetime) =
2003                    if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
2004                        debug_assert!(!right.metadata().location_kind.is_top_level());
2005                        (right, quote!('static))
2006                    } else if right.metadata().location_kind.is_top_level() {
2007                        (right, quote!('static))
2008                    } else {
2009                        (right, quote!('tick))
2010                    };
2011
2012                let (left_ident, left_location_id) =
2013                    left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2014                let (right_ident, right_location_id) =
2015                    right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2016
2017                let stream_ident =
2018                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2019
2020                match builders_or_callback {
2021                    BuildersOrCallback::Builders(graph_builders) => {
2022                        assert_eq!(
2023                            left_location_id, right_location_id,
2024                            "join / cross product inputs must be in the same location"
2025                        );
2026
2027                        let builder = graph_builders.entry(left_location_id).or_default();
2028                        builder.add_dfir(
2029                            if is_top_level {
2030                                // if both inputs are root, the output is expected to have streamy semantics, so we need
2031                                // a multiset_delta() to negate the replay behavior
2032                                parse_quote! {
2033                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2034                                    #left_ident -> [0]#stream_ident;
2035                                    #right_ident -> [1]#stream_ident;
2036                                }
2037                            } else {
2038                                parse_quote! {
2039                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2040                                    #left_ident -> [0]#stream_ident;
2041                                    #right_ident -> [1]#stream_ident;
2042                                }
2043                            }
2044                            ,
2045                            None,
2046                            Some(&next_stmt_id.to_string()),
2047                        );
2048                    }
2049                    BuildersOrCallback::Callback(_, node_callback) => {
2050                        node_callback(self, next_stmt_id);
2051                    }
2052                }
2053
2054                *next_stmt_id += 1;
2055
2056                (stream_ident, left_location_id)
2057            }
2058
2059            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2060                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2061                    parse_quote!(difference_multiset)
2062                } else {
2063                    parse_quote!(anti_join_multiset)
2064                };
2065
2066                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2067                    self
2068                else {
2069                    unreachable!()
2070                };
2071
2072                let (neg, neg_lifetime) =
2073                    if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2074                        debug_assert!(!neg.metadata().location_kind.is_top_level());
2075                        (neg, quote!('static))
2076                    } else if neg.metadata().location_kind.is_top_level() {
2077                        (neg, quote!('static))
2078                    } else {
2079                        (neg, quote!('tick))
2080                    };
2081
2082                let (pos_ident, pos_location_id) =
2083                    pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2084                let (neg_ident, neg_location_id) =
2085                    neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2086
2087                let stream_ident =
2088                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2089
2090                match builders_or_callback {
2091                    BuildersOrCallback::Builders(graph_builders) => {
2092                        assert_eq!(
2093                            pos_location_id, neg_location_id,
2094                            "difference / anti join inputs must be in the same location"
2095                        );
2096
2097                        let builder = graph_builders.entry(pos_location_id).or_default();
2098                        builder.add_dfir(
2099                            parse_quote! {
2100                                #stream_ident = #operator::<'tick, #neg_lifetime>();
2101                                #pos_ident -> [pos]#stream_ident;
2102                                #neg_ident -> [neg]#stream_ident;
2103                            },
2104                            None,
2105                            Some(&next_stmt_id.to_string()),
2106                        );
2107                    }
2108                    BuildersOrCallback::Callback(_, node_callback) => {
2109                        node_callback(self, next_stmt_id);
2110                    }
2111                }
2112
2113                *next_stmt_id += 1;
2114
2115                (stream_ident, pos_location_id)
2116            }
2117
2118            HydroNode::ResolveFutures { input, .. } => {
2119                let (input_ident, input_location_id) =
2120                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2121
2122                let futures_ident =
2123                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2124
2125                match builders_or_callback {
2126                    BuildersOrCallback::Builders(graph_builders) => {
2127                        let builder = graph_builders.entry(input_location_id).or_default();
2128                        builder.add_dfir(
2129                            parse_quote! {
2130                                #futures_ident = #input_ident -> resolve_futures();
2131                            },
2132                            None,
2133                            Some(&next_stmt_id.to_string()),
2134                        );
2135                    }
2136                    BuildersOrCallback::Callback(_, node_callback) => {
2137                        node_callback(self, next_stmt_id);
2138                    }
2139                }
2140
2141                *next_stmt_id += 1;
2142
2143                (futures_ident, input_location_id)
2144            }
2145
2146            HydroNode::ResolveFuturesOrdered { input, .. } => {
2147                let (input_ident, input_location_id) =
2148                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2149
2150                let futures_ident =
2151                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2152
2153                match builders_or_callback {
2154                    BuildersOrCallback::Builders(graph_builders) => {
2155                        let builder = graph_builders.entry(input_location_id).or_default();
2156                        builder.add_dfir(
2157                            parse_quote! {
2158                                #futures_ident = #input_ident -> resolve_futures_ordered();
2159                            },
2160                            None,
2161                            Some(&next_stmt_id.to_string()),
2162                        );
2163                    }
2164                    BuildersOrCallback::Callback(_, node_callback) => {
2165                        node_callback(self, next_stmt_id);
2166                    }
2167                }
2168
2169                *next_stmt_id += 1;
2170
2171                (futures_ident, input_location_id)
2172            }
2173
2174            HydroNode::Map { f, input, .. } => {
2175                let (input_ident, input_location_id) =
2176                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2177
2178                let map_ident =
2179                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2180
2181                match builders_or_callback {
2182                    BuildersOrCallback::Builders(graph_builders) => {
2183                        let builder = graph_builders.entry(input_location_id).or_default();
2184                        builder.add_dfir(
2185                            parse_quote! {
2186                                #map_ident = #input_ident -> map(#f);
2187                            },
2188                            None,
2189                            Some(&next_stmt_id.to_string()),
2190                        );
2191                    }
2192                    BuildersOrCallback::Callback(_, node_callback) => {
2193                        node_callback(self, next_stmt_id);
2194                    }
2195                }
2196
2197                *next_stmt_id += 1;
2198
2199                (map_ident, input_location_id)
2200            }
2201
2202            HydroNode::FlatMap { f, input, .. } => {
2203                let (input_ident, input_location_id) =
2204                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2205
2206                let flat_map_ident =
2207                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2208
2209                match builders_or_callback {
2210                    BuildersOrCallback::Builders(graph_builders) => {
2211                        let builder = graph_builders.entry(input_location_id).or_default();
2212                        builder.add_dfir(
2213                            parse_quote! {
2214                                #flat_map_ident = #input_ident -> flat_map(#f);
2215                            },
2216                            None,
2217                            Some(&next_stmt_id.to_string()),
2218                        );
2219                    }
2220                    BuildersOrCallback::Callback(_, node_callback) => {
2221                        node_callback(self, next_stmt_id);
2222                    }
2223                }
2224
2225                *next_stmt_id += 1;
2226
2227                (flat_map_ident, input_location_id)
2228            }
2229
2230            HydroNode::Filter { f, input, .. } => {
2231                let (input_ident, input_location_id) =
2232                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2233
2234                let filter_ident =
2235                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2236
2237                match builders_or_callback {
2238                    BuildersOrCallback::Builders(graph_builders) => {
2239                        let builder = graph_builders.entry(input_location_id).or_default();
2240                        builder.add_dfir(
2241                            parse_quote! {
2242                                #filter_ident = #input_ident -> filter(#f);
2243                            },
2244                            None,
2245                            Some(&next_stmt_id.to_string()),
2246                        );
2247                    }
2248                    BuildersOrCallback::Callback(_, node_callback) => {
2249                        node_callback(self, next_stmt_id);
2250                    }
2251                }
2252
2253                *next_stmt_id += 1;
2254
2255                (filter_ident, input_location_id)
2256            }
2257
2258            HydroNode::FilterMap { f, input, .. } => {
2259                let (input_ident, input_location_id) =
2260                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2261
2262                let filter_map_ident =
2263                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2264
2265                match builders_or_callback {
2266                    BuildersOrCallback::Builders(graph_builders) => {
2267                        let builder = graph_builders.entry(input_location_id).or_default();
2268                        builder.add_dfir(
2269                            parse_quote! {
2270                                #filter_map_ident = #input_ident -> filter_map(#f);
2271                            },
2272                            None,
2273                            Some(&next_stmt_id.to_string()),
2274                        );
2275                    }
2276                    BuildersOrCallback::Callback(_, node_callback) => {
2277                        node_callback(self, next_stmt_id);
2278                    }
2279                }
2280
2281                *next_stmt_id += 1;
2282
2283                (filter_map_ident, input_location_id)
2284            }
2285
2286            HydroNode::Sort { input, .. } => {
2287                let (input_ident, input_location_id) =
2288                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2289
2290                let sort_ident =
2291                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2292
2293                match builders_or_callback {
2294                    BuildersOrCallback::Builders(graph_builders) => {
2295                        let builder = graph_builders.entry(input_location_id).or_default();
2296                        builder.add_dfir(
2297                            parse_quote! {
2298                                #sort_ident = #input_ident -> sort();
2299                            },
2300                            None,
2301                            Some(&next_stmt_id.to_string()),
2302                        );
2303                    }
2304                    BuildersOrCallback::Callback(_, node_callback) => {
2305                        node_callback(self, next_stmt_id);
2306                    }
2307                }
2308
2309                *next_stmt_id += 1;
2310
2311                (sort_ident, input_location_id)
2312            }
2313
2314            HydroNode::DeferTick { input, .. } => {
2315                let (input_ident, input_location_id) =
2316                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2317
2318                let defer_tick_ident =
2319                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2320
2321                match builders_or_callback {
2322                    BuildersOrCallback::Builders(graph_builders) => {
2323                        let builder = graph_builders.entry(input_location_id).or_default();
2324                        builder.add_dfir(
2325                            parse_quote! {
2326                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
2327                            },
2328                            None,
2329                            Some(&next_stmt_id.to_string()),
2330                        );
2331                    }
2332                    BuildersOrCallback::Callback(_, node_callback) => {
2333                        node_callback(self, next_stmt_id);
2334                    }
2335                }
2336
2337                *next_stmt_id += 1;
2338
2339                (defer_tick_ident, input_location_id)
2340            }
2341
2342            HydroNode::Enumerate { input, .. } => {
2343                let (input_ident, input_location_id) =
2344                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2345
2346                let enumerate_ident =
2347                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2348
2349                match builders_or_callback {
2350                    BuildersOrCallback::Builders(graph_builders) => {
2351                        let builder = graph_builders.entry(input_location_id).or_default();
2352                        let lifetime = if input.metadata().location_kind.is_top_level() {
2353                            quote!('static)
2354                        } else {
2355                            quote!('tick)
2356                        };
2357                        builder.add_dfir(
2358                            parse_quote! {
2359                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2360                            },
2361                            None,
2362                            Some(&next_stmt_id.to_string()),
2363                        );
2364                    }
2365                    BuildersOrCallback::Callback(_, node_callback) => {
2366                        node_callback(self, next_stmt_id);
2367                    }
2368                }
2369
2370                *next_stmt_id += 1;
2371
2372                (enumerate_ident, input_location_id)
2373            }
2374
2375            HydroNode::Inspect { f, input, .. } => {
2376                let (input_ident, input_location_id) =
2377                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2378
2379                let inspect_ident =
2380                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2381
2382                match builders_or_callback {
2383                    BuildersOrCallback::Builders(graph_builders) => {
2384                        let builder = graph_builders.entry(input_location_id).or_default();
2385                        builder.add_dfir(
2386                            parse_quote! {
2387                                #inspect_ident = #input_ident -> inspect(#f);
2388                            },
2389                            None,
2390                            Some(&next_stmt_id.to_string()),
2391                        );
2392                    }
2393                    BuildersOrCallback::Callback(_, node_callback) => {
2394                        node_callback(self, next_stmt_id);
2395                    }
2396                }
2397
2398                *next_stmt_id += 1;
2399
2400                (inspect_ident, input_location_id)
2401            }
2402
2403            HydroNode::Unique { input, .. } => {
2404                let (input_ident, input_location_id) =
2405                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2406
2407                let unique_ident =
2408                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2409
2410                match builders_or_callback {
2411                    BuildersOrCallback::Builders(graph_builders) => {
2412                        let builder = graph_builders.entry(input_location_id).or_default();
2413                        builder.add_dfir(
2414                            parse_quote! {
2415                                #unique_ident = #input_ident -> unique::<'tick>();
2416                            },
2417                            None,
2418                            Some(&next_stmt_id.to_string()),
2419                        );
2420                    }
2421                    BuildersOrCallback::Callback(_, node_callback) => {
2422                        node_callback(self, next_stmt_id);
2423                    }
2424                }
2425
2426                *next_stmt_id += 1;
2427
2428                (unique_ident, input_location_id)
2429            }
2430
2431            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2432                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2433                    parse_quote!(fold)
2434                } else if matches!(self, HydroNode::Scan { .. }) {
2435                    parse_quote!(scan)
2436                } else {
2437                    parse_quote!(fold_keyed)
2438                };
2439
2440                let (HydroNode::Fold {
2441                    init, acc, input, ..
2442                }
2443                | HydroNode::FoldKeyed {
2444                    init, acc, input, ..
2445                }
2446                | HydroNode::Scan {
2447                    init, acc, input, ..
2448                }) = self
2449                else {
2450                    unreachable!()
2451                };
2452
2453                let (input, lifetime) =
2454                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2455                        debug_assert!(!input.metadata().location_kind.is_top_level());
2456                        (input, quote!('static))
2457                    } else if input.metadata().location_kind.is_top_level() {
2458                        (input, quote!('static))
2459                    } else {
2460                        (input, quote!('tick))
2461                    };
2462
2463                let (input_ident, input_location_id) =
2464                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2465
2466                let fold_ident =
2467                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2468
2469                match builders_or_callback {
2470                    BuildersOrCallback::Builders(graph_builders) => {
2471                        let builder = graph_builders.entry(input_location_id).or_default();
2472                        builder.add_dfir(
2473                            parse_quote! {
2474                                #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2475                            },
2476                            None,
2477                            Some(&next_stmt_id.to_string()),
2478                        );
2479                    }
2480                    BuildersOrCallback::Callback(_, node_callback) => {
2481                        node_callback(self, next_stmt_id);
2482                    }
2483                }
2484
2485                *next_stmt_id += 1;
2486
2487                (fold_ident, input_location_id)
2488            }
2489
2490            HydroNode::ReduceKeyedWatermark {
2491                f,
2492                input,
2493                watermark,
2494                ..
2495            } => {
2496                let (input, lifetime) =
2497                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2498                        debug_assert!(!input.metadata().location_kind.is_top_level());
2499                        (input, quote!('static))
2500                    } else if input.metadata().location_kind.is_top_level() {
2501                        (input, quote!('static))
2502                    } else {
2503                        (input, quote!('tick))
2504                    };
2505
2506                let (input_ident, input_location_id) =
2507                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2508
2509                let (watermark_ident, watermark_location_id) =
2510                    watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
2511
2512                let chain_ident = syn::Ident::new(
2513                    &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
2514                    Span::call_site(),
2515                );
2516
2517                let fold_ident =
2518                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2519
2520                match builders_or_callback {
2521                    BuildersOrCallback::Builders(graph_builders) => {
2522                        assert_eq!(
2523                            input_location_id, watermark_location_id,
2524                            "ReduceKeyedWatermark inputs must be in the same location"
2525                        );
2526
2527                        let builder = graph_builders.entry(input_location_id).or_default();
2528                        // 1. Don't allow any values to be added to the map if the key <=the watermark
2529                        // 2. If the entry didn't exist in the BTreeMap, add it. Otherwise, call f.
2530                        //    If the watermark changed, delete all BTreeMap entries with a key < the watermark.
2531                        // 3. Convert the BTreeMap back into a stream of (k, v)
2532                        builder.add_dfir(
2533                            parse_quote! {
2534                                #chain_ident = chain();
2535                                #input_ident
2536                                    -> map(|x| (Some(x), None))
2537                                    -> [0]#chain_ident;
2538                                #watermark_ident
2539                                    -> map(|watermark| (None, Some(watermark)))
2540                                    -> [1]#chain_ident;
2541
2542                                #fold_ident = #chain_ident
2543                                    -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
2544                                        let __reduce_keyed_fn = #f;
2545                                        move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
2546                                            if let Some((k, v)) = opt_payload {
2547                                                if let Some(curr_watermark) = *opt_curr_watermark {
2548                                                    if k <= curr_watermark {
2549                                                        return;
2550                                                    }
2551                                                }
2552                                                match map.entry(k) {
2553                                                    ::std::collections::hash_map::Entry::Vacant(e) => {
2554                                                        e.insert(v);
2555                                                    }
2556                                                    ::std::collections::hash_map::Entry::Occupied(mut e) => {
2557                                                        __reduce_keyed_fn(e.get_mut(), v);
2558                                                    }
2559                                                }
2560                                            } else {
2561                                                let watermark = opt_watermark.unwrap();
2562                                                if let Some(curr_watermark) = *opt_curr_watermark {
2563                                                    if watermark <= curr_watermark {
2564                                                        return;
2565                                                    }
2566                                                }
2567                                                *opt_curr_watermark = opt_watermark;
2568                                                map.retain(|k, _| *k > watermark);
2569                                            }
2570                                        }
2571                                    })
2572                                    -> flat_map(|(map, _curr_watermark)| map);
2573                            },
2574                            None,
2575                            Some(&next_stmt_id.to_string()),
2576                        );
2577                    }
2578                    BuildersOrCallback::Callback(_, node_callback) => {
2579                        node_callback(self, next_stmt_id);
2580                    }
2581                }
2582
2583                *next_stmt_id += 1;
2584
2585                (fold_ident, input_location_id)
2586            }
2587
2588            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
2589                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
2590                    parse_quote!(reduce)
2591                } else {
2592                    parse_quote!(reduce_keyed)
2593                };
2594
2595                let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
2596                    self
2597                else {
2598                    unreachable!()
2599                };
2600
2601                let (input, lifetime) =
2602                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2603                        debug_assert!(!input.metadata().location_kind.is_top_level());
2604                        (input, quote!('static))
2605                    } else if input.metadata().location_kind.is_top_level() {
2606                        (input, quote!('static))
2607                    } else {
2608                        (input, quote!('tick))
2609                    };
2610
2611                let (input_ident, input_location_id) =
2612                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2613
2614                let reduce_ident =
2615                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2616
2617                match builders_or_callback {
2618                    BuildersOrCallback::Builders(graph_builders) => {
2619                        let builder = graph_builders.entry(input_location_id).or_default();
2620                        builder.add_dfir(
2621                            parse_quote! {
2622                                #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
2623                            },
2624                            None,
2625                            Some(&next_stmt_id.to_string()),
2626                        );
2627                    }
2628                    BuildersOrCallback::Callback(_, node_callback) => {
2629                        node_callback(self, next_stmt_id);
2630                    }
2631                }
2632
2633                *next_stmt_id += 1;
2634
2635                (reduce_ident, input_location_id)
2636            }
2637
2638            HydroNode::Network {
2639                serialize_fn: serialize_pipeline,
2640                instantiate_fn,
2641                deserialize_fn: deserialize_pipeline,
2642                input,
2643                metadata,
2644                ..
2645            } => {
2646                let (input_ident, input_location_id) =
2647                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2648
2649                let to_id = metadata.location_kind.root().raw_id();
2650
2651                let receiver_stream_ident =
2652                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2653
2654                match builders_or_callback {
2655                    BuildersOrCallback::Builders(graph_builders) => {
2656                        let (sink_expr, source_expr) = match instantiate_fn {
2657                            DebugInstantiate::Building => (
2658                                syn::parse_quote!(DUMMY_SINK),
2659                                syn::parse_quote!(DUMMY_SOURCE),
2660                            ),
2661
2662                            DebugInstantiate::Finalized(finalized) => {
2663                                (finalized.sink.clone(), finalized.source.clone())
2664                            }
2665                        };
2666
2667                        let sender_builder = graph_builders.entry(input_location_id).or_default();
2668                        if let Some(serialize_pipeline) = serialize_pipeline {
2669                            sender_builder.add_dfir(
2670                                parse_quote! {
2671                                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
2672                                },
2673                                None,
2674                                // operator tag separates send and receive, which otherwise have the same next_stmt_id
2675                                Some(&format!("send{}", next_stmt_id)),
2676                            );
2677                        } else {
2678                            sender_builder.add_dfir(
2679                                parse_quote! {
2680                                    #input_ident -> dest_sink(#sink_expr);
2681                                },
2682                                None,
2683                                Some(&format!("send{}", next_stmt_id)),
2684                            );
2685                        }
2686
2687                        let receiver_builder = graph_builders.entry(to_id).or_default();
2688                        if let Some(deserialize_pipeline) = deserialize_pipeline {
2689                            receiver_builder.add_dfir(parse_quote! {
2690                                #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2691                            }, None, Some(&format!("recv{}", next_stmt_id)));
2692                        } else {
2693                            receiver_builder.add_dfir(
2694                                parse_quote! {
2695                                    #receiver_stream_ident = source_stream(#source_expr);
2696                                },
2697                                None,
2698                                Some(&format!("recv{}", next_stmt_id)),
2699                            );
2700                        }
2701                    }
2702                    BuildersOrCallback::Callback(_, node_callback) => {
2703                        node_callback(self, next_stmt_id);
2704                    }
2705                }
2706
2707                *next_stmt_id += 1;
2708
2709                (receiver_stream_ident, to_id)
2710            }
2711
2712            HydroNode::ExternalInput {
2713                instantiate_fn,
2714                deserialize_fn: deserialize_pipeline,
2715                metadata,
2716                ..
2717            } => {
2718                let to_id = metadata.location_kind.root().raw_id();
2719
2720                let receiver_stream_ident =
2721                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2722
2723                match builders_or_callback {
2724                    BuildersOrCallback::Builders(graph_builders) => {
2725                        let (_, source_expr) = match instantiate_fn {
2726                            DebugInstantiate::Building => (
2727                                syn::parse_quote!(DUMMY_SINK),
2728                                syn::parse_quote!(DUMMY_SOURCE),
2729                            ),
2730
2731                            DebugInstantiate::Finalized(finalized) => {
2732                                (finalized.sink.clone(), finalized.source.clone())
2733                            }
2734                        };
2735
2736                        let receiver_builder = graph_builders.entry(to_id).or_default();
2737                        if let Some(deserialize_pipeline) = deserialize_pipeline {
2738                            receiver_builder.add_dfir(parse_quote! {
2739                                #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2740                            }, None, Some(&format!("recv{}", next_stmt_id)));
2741                        } else {
2742                            receiver_builder.add_dfir(
2743                                parse_quote! {
2744                                    #receiver_stream_ident = source_stream(#source_expr);
2745                                },
2746                                None,
2747                                Some(&format!("recv{}", next_stmt_id)),
2748                            );
2749                        }
2750                    }
2751                    BuildersOrCallback::Callback(_, node_callback) => {
2752                        node_callback(self, next_stmt_id);
2753                    }
2754                }
2755
2756                *next_stmt_id += 1;
2757
2758                (receiver_stream_ident, to_id)
2759            }
2760
2761            HydroNode::Counter {
2762                tag,
2763                duration,
2764                prefix,
2765                input,
2766                ..
2767            } => {
2768                let (input_ident, input_location_id) =
2769                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2770
2771                let counter_ident =
2772                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2773
2774                match builders_or_callback {
2775                    BuildersOrCallback::Builders(graph_builders) => {
2776                        let builder = graph_builders.entry(input_location_id).or_default();
2777                        builder.add_dfir(
2778                            parse_quote! {
2779                                #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
2780                            },
2781                            None,
2782                            Some(&next_stmt_id.to_string()),
2783                        );
2784                    }
2785                    BuildersOrCallback::Callback(_, node_callback) => {
2786                        node_callback(self, next_stmt_id);
2787                    }
2788                }
2789
2790                *next_stmt_id += 1;
2791
2792                (counter_ident, input_location_id)
2793            }
2794        }
2795    }
2796
2797    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2798        match self {
2799            HydroNode::Placeholder => {
2800                panic!()
2801            }
2802            HydroNode::Source { source, .. } => match source {
2803                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2804                HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2805            },
2806            HydroNode::CycleSource { .. }
2807            | HydroNode::Tee { .. }
2808            | HydroNode::Persist { .. }
2809            | HydroNode::YieldConcat { .. }
2810            | HydroNode::BeginAtomic { .. }
2811            | HydroNode::EndAtomic { .. }
2812            | HydroNode::Batch { .. }
2813            | HydroNode::Chain { .. }
2814            | HydroNode::ChainFirst { .. }
2815            | HydroNode::CrossProduct { .. }
2816            | HydroNode::CrossSingleton { .. }
2817            | HydroNode::ResolveFutures { .. }
2818            | HydroNode::ResolveFuturesOrdered { .. }
2819            | HydroNode::Join { .. }
2820            | HydroNode::Difference { .. }
2821            | HydroNode::AntiJoin { .. }
2822            | HydroNode::DeferTick { .. }
2823            | HydroNode::Enumerate { .. }
2824            | HydroNode::Unique { .. }
2825            | HydroNode::Sort { .. } => {}
2826            HydroNode::Map { f, .. }
2827            | HydroNode::FlatMap { f, .. }
2828            | HydroNode::Filter { f, .. }
2829            | HydroNode::FilterMap { f, .. }
2830            | HydroNode::Inspect { f, .. }
2831            | HydroNode::Reduce { f, .. }
2832            | HydroNode::ReduceKeyed { f, .. }
2833            | HydroNode::ReduceKeyedWatermark { f, .. } => {
2834                transform(f);
2835            }
2836            HydroNode::Fold { init, acc, .. }
2837            | HydroNode::Scan { init, acc, .. }
2838            | HydroNode::FoldKeyed { init, acc, .. } => {
2839                transform(init);
2840                transform(acc);
2841            }
2842            HydroNode::Network {
2843                serialize_fn,
2844                deserialize_fn,
2845                ..
2846            } => {
2847                if let Some(serialize_fn) = serialize_fn {
2848                    transform(serialize_fn);
2849                }
2850                if let Some(deserialize_fn) = deserialize_fn {
2851                    transform(deserialize_fn);
2852                }
2853            }
2854            HydroNode::ExternalInput { deserialize_fn, .. } => {
2855                if let Some(deserialize_fn) = deserialize_fn {
2856                    transform(deserialize_fn);
2857                }
2858            }
2859            HydroNode::Counter { duration, .. } => {
2860                transform(duration);
2861            }
2862        }
2863    }
2864
2865    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
2866        &self.metadata().op
2867    }
2868
2869    pub fn metadata(&self) -> &HydroIrMetadata {
2870        match self {
2871            HydroNode::Placeholder => {
2872                panic!()
2873            }
2874            HydroNode::Source { metadata, .. } => metadata,
2875            HydroNode::CycleSource { metadata, .. } => metadata,
2876            HydroNode::Tee { metadata, .. } => metadata,
2877            HydroNode::Persist { metadata, .. } => metadata,
2878            HydroNode::YieldConcat { metadata, .. } => metadata,
2879            HydroNode::BeginAtomic { metadata, .. } => metadata,
2880            HydroNode::EndAtomic { metadata, .. } => metadata,
2881            HydroNode::Batch { metadata, .. } => metadata,
2882            HydroNode::Chain { metadata, .. } => metadata,
2883            HydroNode::ChainFirst { metadata, .. } => metadata,
2884            HydroNode::CrossProduct { metadata, .. } => metadata,
2885            HydroNode::CrossSingleton { metadata, .. } => metadata,
2886            HydroNode::Join { metadata, .. } => metadata,
2887            HydroNode::Difference { metadata, .. } => metadata,
2888            HydroNode::AntiJoin { metadata, .. } => metadata,
2889            HydroNode::ResolveFutures { metadata, .. } => metadata,
2890            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2891            HydroNode::Map { metadata, .. } => metadata,
2892            HydroNode::FlatMap { metadata, .. } => metadata,
2893            HydroNode::Filter { metadata, .. } => metadata,
2894            HydroNode::FilterMap { metadata, .. } => metadata,
2895            HydroNode::DeferTick { metadata, .. } => metadata,
2896            HydroNode::Enumerate { metadata, .. } => metadata,
2897            HydroNode::Inspect { metadata, .. } => metadata,
2898            HydroNode::Unique { metadata, .. } => metadata,
2899            HydroNode::Sort { metadata, .. } => metadata,
2900            HydroNode::Scan { metadata, .. } => metadata,
2901            HydroNode::Fold { metadata, .. } => metadata,
2902            HydroNode::FoldKeyed { metadata, .. } => metadata,
2903            HydroNode::Reduce { metadata, .. } => metadata,
2904            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2905            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2906            HydroNode::ExternalInput { metadata, .. } => metadata,
2907            HydroNode::Network { metadata, .. } => metadata,
2908            HydroNode::Counter { metadata, .. } => metadata,
2909        }
2910    }
2911
2912    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
2913        &mut self.metadata_mut().op
2914    }
2915
2916    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2917        match self {
2918            HydroNode::Placeholder => {
2919                panic!()
2920            }
2921            HydroNode::Source { metadata, .. } => metadata,
2922            HydroNode::CycleSource { metadata, .. } => metadata,
2923            HydroNode::Tee { metadata, .. } => metadata,
2924            HydroNode::Persist { metadata, .. } => metadata,
2925            HydroNode::YieldConcat { metadata, .. } => metadata,
2926            HydroNode::BeginAtomic { metadata, .. } => metadata,
2927            HydroNode::EndAtomic { metadata, .. } => metadata,
2928            HydroNode::Batch { metadata, .. } => metadata,
2929            HydroNode::Chain { metadata, .. } => metadata,
2930            HydroNode::ChainFirst { metadata, .. } => metadata,
2931            HydroNode::CrossProduct { metadata, .. } => metadata,
2932            HydroNode::CrossSingleton { metadata, .. } => metadata,
2933            HydroNode::Join { metadata, .. } => metadata,
2934            HydroNode::Difference { metadata, .. } => metadata,
2935            HydroNode::AntiJoin { metadata, .. } => metadata,
2936            HydroNode::ResolveFutures { metadata, .. } => metadata,
2937            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2938            HydroNode::Map { metadata, .. } => metadata,
2939            HydroNode::FlatMap { metadata, .. } => metadata,
2940            HydroNode::Filter { metadata, .. } => metadata,
2941            HydroNode::FilterMap { metadata, .. } => metadata,
2942            HydroNode::DeferTick { metadata, .. } => metadata,
2943            HydroNode::Enumerate { metadata, .. } => metadata,
2944            HydroNode::Inspect { metadata, .. } => metadata,
2945            HydroNode::Unique { metadata, .. } => metadata,
2946            HydroNode::Sort { metadata, .. } => metadata,
2947            HydroNode::Scan { metadata, .. } => metadata,
2948            HydroNode::Fold { metadata, .. } => metadata,
2949            HydroNode::FoldKeyed { metadata, .. } => metadata,
2950            HydroNode::Reduce { metadata, .. } => metadata,
2951            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2952            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2953            HydroNode::ExternalInput { metadata, .. } => metadata,
2954            HydroNode::Network { metadata, .. } => metadata,
2955            HydroNode::Counter { metadata, .. } => metadata,
2956        }
2957    }
2958
2959    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
2960        match self {
2961            HydroNode::Placeholder => {
2962                panic!()
2963            }
2964            HydroNode::Source { .. }
2965            | HydroNode::ExternalInput { .. }
2966            | HydroNode::CycleSource { .. } // CycleSource and Tee should calculate input metadata in separate special ways
2967            | HydroNode::Tee { .. } => {
2968                vec![]
2969            }
2970            HydroNode::Persist { inner, .. }
2971            | HydroNode::YieldConcat { inner, .. }
2972            | HydroNode::BeginAtomic { inner, .. }
2973            | HydroNode::EndAtomic { inner, .. }
2974            | HydroNode::Batch { inner, .. } => {
2975                vec![inner.metadata()]
2976            }
2977            HydroNode::Chain { first, second, .. } => {
2978                vec![first.metadata(), second.metadata()]
2979            }
2980            HydroNode::ChainFirst { first, second, .. } => {
2981                vec![first.metadata(), second.metadata()]
2982            }
2983            HydroNode::CrossProduct { left, right, .. }
2984            | HydroNode::CrossSingleton { left, right, .. }
2985            | HydroNode::Join { left, right, .. } => {
2986                vec![left.metadata(), right.metadata()]
2987            }
2988            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2989                vec![pos.metadata(), neg.metadata()]
2990            }
2991            HydroNode::Map { input, .. }
2992            | HydroNode::FlatMap { input, .. }
2993            | HydroNode::Filter { input, .. }
2994            | HydroNode::FilterMap { input, .. }
2995            | HydroNode::Sort { input, .. }
2996            | HydroNode::DeferTick { input, .. }
2997            | HydroNode::Enumerate { input, .. }
2998            | HydroNode::Inspect { input, .. }
2999            | HydroNode::Unique { input, .. }
3000            | HydroNode::Network { input, .. }
3001            | HydroNode::Counter { input, .. }
3002            | HydroNode::ResolveFutures { input, .. }
3003            | HydroNode::ResolveFuturesOrdered { input, .. } => {
3004                vec![input.metadata()]
3005            }
3006            HydroNode::Fold { input, .. }
3007            | HydroNode::FoldKeyed { input, .. }
3008            | HydroNode::Reduce { input, .. }
3009            | HydroNode::ReduceKeyed { input, .. }
3010            | HydroNode::Scan { input, .. } => {
3011                // Skip persist before fold/reduce
3012                if let HydroNode::Persist { inner, .. } = input.as_ref() {
3013                    vec![inner.metadata()]
3014                } else {
3015                    vec![input.metadata()]
3016                }
3017            }
3018            HydroNode::ReduceKeyedWatermark { input, watermark, .. } => {
3019                // Skip persist before fold/reduce
3020                if let HydroNode::Persist { inner, .. } = input.as_ref() {
3021                    vec![inner.metadata(), watermark.metadata()]
3022                } else {
3023                    vec![input.metadata(), watermark.metadata()]
3024                }
3025            }
3026        }
3027    }
3028
3029    pub fn print_root(&self) -> String {
3030        match self {
3031            HydroNode::Placeholder => {
3032                panic!()
3033            }
3034            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3035            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3036            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3037            HydroNode::Persist { .. } => "Persist()".to_string(),
3038            HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3039            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3040            HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3041            HydroNode::Batch { .. } => "Batch()".to_string(),
3042            HydroNode::Chain { first, second, .. } => {
3043                format!("Chain({}, {})", first.print_root(), second.print_root())
3044            }
3045            HydroNode::ChainFirst { first, second, .. } => {
3046                format!(
3047                    "ChainFirst({}, {})",
3048                    first.print_root(),
3049                    second.print_root()
3050                )
3051            }
3052            HydroNode::CrossProduct { left, right, .. } => {
3053                format!(
3054                    "CrossProduct({}, {})",
3055                    left.print_root(),
3056                    right.print_root()
3057                )
3058            }
3059            HydroNode::CrossSingleton { left, right, .. } => {
3060                format!(
3061                    "CrossSingleton({}, {})",
3062                    left.print_root(),
3063                    right.print_root()
3064                )
3065            }
3066            HydroNode::Join { left, right, .. } => {
3067                format!("Join({}, {})", left.print_root(), right.print_root())
3068            }
3069            HydroNode::Difference { pos, neg, .. } => {
3070                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3071            }
3072            HydroNode::AntiJoin { pos, neg, .. } => {
3073                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3074            }
3075            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3076            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3077            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3078            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3079            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3080            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3081            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3082            HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3083            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3084            HydroNode::Unique { .. } => "Unique()".to_string(),
3085            HydroNode::Sort { .. } => "Sort()".to_string(),
3086            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3087            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3088            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3089            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3090            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3091            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3092            HydroNode::Network { .. } => "Network()".to_string(),
3093            HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3094            HydroNode::Counter { tag, duration, .. } => {
3095                format!("Counter({:?}, {:?})", tag, duration)
3096            }
3097        }
3098    }
3099}
3100
/// Allocates ports for a network channel from `from_location` to `to_location`. It then
/// asks the deployment `D` for the expressions and wiring callback that implement it.
///
/// Both endpoints must be top-level locations: a process or a cluster. The deploy hooks
/// used (`o2o_*`, `o2m_*`, `m2o_*` or `m2m_*`) are chosen by the process/cluster
/// combination of the two endpoints. A port is allocated on the `from` node first, then on
/// the `to` node.
///
/// Returns `(sink, source, connect_fn)`:
/// - `sink` and `source` are the two expressions returned by the deploy's `*_sink_source`
///   hook.
/// - `connect_fn` is the callback returned by the deploy's `*_connect` hook.
///
/// # Panics
///
/// - If an endpoint id is missing from `processes` / `clusters`, meaning the location is
///   used in the graph but was never instantiated.
/// - If either endpoint is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    /// Looks up an instantiated location by id.
    ///
    /// `kind` ("process" / "cluster") is used only in the panic message raised when the id
    /// is missing.
    fn lookup<'m, V>(nodes: &'m HashMap<usize, V>, id: &usize, kind: &str) -> &'m V {
        nodes.get(id).unwrap_or_else(|| {
            panic!("A {} used in the graph was not instantiated: {}", kind, id)
        })
    }

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = lookup(processes, from, "process").clone();
            let to_node = lookup(processes, to, "process").clone();

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = lookup(processes, from, "process").clone();
            let to_node = lookup(clusters, to, "cluster").clone();

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = lookup(clusters, from, "cluster").clone();
            let to_node = lookup(processes, to, "process").clone();

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = lookup(clusters, from, "cluster").clone();
            let to_node = lookup(clusters, to, "cluster").clone();

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only top-level locations can be network endpoints. Report which side was wrong
        // instead of a bare "explicit panic".
        (LocationId::Tick(_, _), _) => {
            panic!("Network source must be a process or cluster, but got a tick location")
        }
        (_, LocationId::Tick(_, _)) => {
            panic!("Network destination must be a process or cluster, but got a tick location")
        }
        (LocationId::Atomic(_), _) => {
            panic!("Network source must be a process or cluster, but got an atomic location")
        }
        (_, LocationId::Atomic(_)) => {
            panic!(
                "Network destination must be a process or cluster, but got an atomic location"
            )
        }
    };
    (sink, source, connect_fn)
}
3208
// Unit tests for this module. They cover:
// - size checks that pin the in-memory size of the IR types `HydroNode` and `HydroRoot`;
// - snapshot tests for `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins the exact size of `HydroNode`. Any change that grows or shrinks the enum makes
    // this fail, so the size change has to be acknowledged by updating the constant.
    // NOTE(review): the expected value presumably assumes a 64-bit target — confirm.
    #[test]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 248);
    }

    // Same size pin as `hydro_node_size`, for `HydroRoot`.
    // NOTE(review): likewise presumably assumes a 64-bit target — confirm.
    #[test]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 176);
    }

    #[test]
    fn test_simplify_q_macro_basic() {
        // A plain expression with no stageleft-generated code must be returned unchanged.
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Splice a real `q!` closure the way stageleft does, then snapshot the simplified
        // tokens.
        // NOTE(review): snapshot files may be keyed by this test's name — check before
        // renaming it.
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // This should be processed by our visitor and simplified to q!(...)
        // since we detect the stageleft::runtime_support::fn_* pattern
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    #[test]
    fn test_closure_no_pipe_at_start() {
        // The quoted expression is a block (a `let` followed by a `move` closure), so its
        // tokens do not begin with the closure's `|`. Snapshot the simplified output.
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}