hydro_lang/graph/
render.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8// Re-export specific implementations
9pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::compile::ir::{DebugExpr, HydroNode, HydroRoot, HydroSource};
12use crate::location::dynamic::LocationId;
13
/// Label for a graph node - can be either a static string or contain expressions.
///
/// The `Display` implementation in this file renders `Static` labels verbatim and
/// `WithExprs` labels as `op_name(expr, expr, ...)`.
#[derive(Debug, Clone)]
pub enum NodeLabel {
    /// A static string label, rendered exactly as stored.
    Static(String),
    /// A label with an operation name and expression arguments.
    WithExprs {
        /// Name printed in front of the parenthesized expression list.
        op_name: String,
        /// Expressions printed, in order, as the operation's arguments.
        exprs: Vec<DebugExpr>,
    },
}
25
26impl NodeLabel {
27    /// Create a static label
28    pub fn static_label(s: String) -> Self {
29        Self::Static(s)
30    }
31
32    /// Create a label for an operation with multiple expression
33    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
34        Self::WithExprs { op_name, exprs }
35    }
36}
37
38impl std::fmt::Display for NodeLabel {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        match self {
41            Self::Static(s) => write!(f, "{}", s),
42            Self::WithExprs { op_name, exprs } => {
43                if exprs.is_empty() {
44                    write!(f, "{}()", op_name)
45                } else {
46                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
47                    write!(f, "{}({})", op_name, expr_strs.join(", "))
48                }
49            }
50        }
51    }
52}
53
/// Base struct for text-based graph writers that use indentation.
/// Contains common fields shared by DOT and Mermaid writers.
pub struct IndentedGraphWriter<W> {
    /// Underlying sink that the rendered text is written to.
    pub write: W,
    /// Number of spaces `writeln_indented` emits before each line.
    pub indent: usize,
    /// Rendering configuration held by this writer.
    pub config: HydroWriteConfig,
}
61
62impl<W> IndentedGraphWriter<W> {
63    /// Create a new writer with default configuration.
64    pub fn new(write: W) -> Self {
65        Self {
66            write,
67            indent: 0,
68            config: HydroWriteConfig::default(),
69        }
70    }
71
72    /// Create a new writer with the given configuration.
73    pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
74        Self {
75            write,
76            indent: 0,
77            config: config.clone(),
78        }
79    }
80}
81
82impl<W: Write> IndentedGraphWriter<W> {
83    /// Write an indented line using the current indentation level.
84    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
85        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
86    }
87}
88
/// Common error type used by all graph writers.
///
/// This is an alias for [`std::fmt::Error`].
pub type GraphWriteError = std::fmt::Error;
91
/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
///
/// `HydroRoot::write_graph` drives an implementation in this order: the prologue,
/// one node definition per node, then (when location groups are enabled) a
/// start / node... / end sequence per location, then one call per edge, and
/// finally the epilogue.
#[auto_impl(&mut, Box)]
pub trait HydroGraphWrite {
    /// Error type emitted by writing.
    type Err: Error;

    /// Begin the graph. First method called.
    fn write_prologue(&mut self) -> Result<(), Self::Err>;

    /// Write a node definition with styling.
    ///
    /// `location_id` is the id of the location the node was registered under, if
    /// any; `location_type` is the type string recorded for that location (for
    /// example `"Process"` or `"Cluster"`), when one is known.
    fn write_node_definition(
        &mut self,
        node_id: usize,
        node_label: &NodeLabel,
        node_type: HydroNodeType,
        location_id: Option<usize>,
        location_type: Option<&str>,
    ) -> Result<(), Self::Err>;

    /// Write an edge between nodes with optional labeling.
    ///
    /// The edge runs from `src_id` to `dst_id`; `label` is `None` for unlabeled edges.
    fn write_edge(
        &mut self,
        src_id: usize,
        dst_id: usize,
        edge_type: HydroEdgeType,
        label: Option<&str>,
    ) -> Result<(), Self::Err>;

    /// Begin writing a location grouping (process/cluster).
    ///
    /// Followed by one `write_node` call per member and then `write_location_end`.
    fn write_location_start(
        &mut self,
        location_id: usize,
        location_type: &str,
    ) -> Result<(), Self::Err>;

    /// Write a node within a location.
    ///
    /// Called between `write_location_start` and `write_location_end`.
    fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;

    /// End writing a location grouping.
    fn write_location_end(&mut self) -> Result<(), Self::Err>;

    /// End the graph. Last method called.
    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
}
136
/// Types of nodes in Hydro IR for styling purposes.
#[derive(Debug, Clone, Copy)]
pub enum HydroNodeType {
    /// Nodes that introduce data (e.g. `Source`, `ExternalInput`, `CycleSource`).
    Source,
    /// Single-input operators such as `Map`, `Filter`, `Unique` or `Persist`.
    Transform,
    /// Two-input operators such as `Join`, `CrossProduct`, `Difference` or `AntiJoin`.
    Join,
    /// Aggregating operators such as `Sort`, `Reduce`, `Fold` or `Scan`.
    Aggregation,
    /// `Network` nodes.
    Network,
    /// Terminal nodes created for `HydroRoot` variants.
    Sink,
    /// `Tee` nodes; a shared tee is added to the graph only once.
    Tee,
}
148
/// Types of edges in Hydro IR.
#[derive(Debug, Clone, Copy)]
pub enum HydroEdgeType {
    /// Ordinary edge; used for most operator inputs.
    Stream,
    /// Edge feeding a `Persist` node.
    Persistent,
    /// Edge feeding a `Network` node.
    Network,
    /// Edge feeding a `CycleSink` root.
    Cycle,
}
157
/// Configuration for graph writing.
#[derive(Debug, Clone)]
pub struct HydroWriteConfig {
    /// NOTE(review): not read in this part of the file; presumably toggles
    /// metadata in the rendered output - confirm against the writers.
    pub show_metadata: bool,
    /// When `true`, `HydroRoot::write_graph` emits a location grouping for every
    /// location that has at least one node.
    pub show_location_groups: bool,
    /// NOTE(review): not read in this part of the file; presumably selects
    /// shortened node labels - confirm against the writers.
    pub use_short_labels: bool,
    /// `(id, name)` pairs; presumably display names for processes - TODO confirm.
    pub process_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs; presumably display names for clusters - TODO confirm.
    pub cluster_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs; presumably display names for externals - TODO confirm.
    pub external_id_name: Vec<(usize, String)>,
}
168
169impl Default for HydroWriteConfig {
170    fn default() -> Self {
171        Self {
172            show_metadata: false,
173            show_location_groups: true,
174            use_short_labels: true, // Default to short labels for all renderers
175            process_id_name: vec![],
176            cluster_id_name: vec![],
177            external_id_name: vec![],
178        }
179    }
180}
181
/// Graph structure tracker for Hydro IR rendering.
///
/// Populated while traversing the IR and then read back by
/// `HydroRoot::write_graph` to emit nodes, location groups and edges.
#[derive(Debug, Default)]
pub struct HydroGraphStructure {
    /// Maps node_id -> (label, type, location id if the node has one).
    pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>,
    /// Edges as (src, dst, edge_type, label), in insertion order.
    pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>,
    /// Maps location_id -> location_type.
    pub locations: HashMap<usize, String>,
    /// Id that the next call to `add_node` will assign.
    pub next_node_id: usize,
}
190
191impl HydroGraphStructure {
192    pub fn new() -> Self {
193        Self::default()
194    }
195
196    pub fn add_node(
197        &mut self,
198        label: NodeLabel,
199        node_type: HydroNodeType,
200        location: Option<usize>,
201    ) -> usize {
202        let node_id = self.next_node_id;
203        self.next_node_id += 1;
204        self.nodes.insert(node_id, (label, node_type, location));
205        node_id
206    }
207
208    pub fn add_edge(
209        &mut self,
210        src: usize,
211        dst: usize,
212        edge_type: HydroEdgeType,
213        label: Option<String>,
214    ) {
215        self.edges.push((src, dst, edge_type, label));
216    }
217
218    pub fn add_location(&mut self, location_id: usize, location_type: String) {
219        self.locations.insert(location_id, location_type);
220    }
221}
222
/// Extracts the lowercase operation name from a `print_root()`-style label.
///
/// The operation name is the text before the first `'('`. A label without any
/// parenthesis is used in full, and an empty label yields an empty string.
pub fn extract_op_name(full_label: String) -> String {
    // Borrow the prefix and lowercase it once, allocating only the returned string.
    let op_name = full_label
        .split_once('(')
        .map_or(full_label.as_str(), |(name, _)| name);
    op_name.to_lowercase()
}
232
/// Extract a short, readable label from the full token stream label using print_root() style naming.
///
/// The text before the first `'('` (or the whole label when there is none) is
/// lowercased and used as the base name. Two base names get special treatment for
/// UI display:
///
/// * `source` is refined to `source_iter`, `source_stream`, `external_network` or
///   `spin` when the full label mentions `Iter`, `Stream`, `ExternalNetwork` or
///   `Spin` (checked in that order).
/// * `network` becomes `network(recv)` when the label mentions `deser`, otherwise
///   `network(send)` when it mentions `ser`.
///
/// Every other label is returned as its lowercase base name.
pub fn extract_short_label(full_label: &str) -> String {
    // A base name always exists: with no '(' present the whole label is the base
    // name, so no separate fallback path is needed.
    let op_name = full_label
        .split_once('(')
        .map_or(full_label, |(name, _)| name);
    let base_name = op_name.to_lowercase();

    let short = match base_name.as_str() {
        "source" => {
            if full_label.contains("Iter") {
                "source_iter"
            } else if full_label.contains("Stream") {
                "source_stream"
            } else if full_label.contains("ExternalNetwork") {
                "external_network"
            } else if full_label.contains("Spin") {
                "spin"
            } else {
                "source"
            }
        }
        "network" => {
            // "deser" has to be tested first because it also contains "ser".
            if full_label.contains("deser") {
                "network(recv)"
            } else if full_label.contains("ser") {
                "network(send)"
            } else {
                "network"
            }
        }
        // For all other cases the lowercase base name is the label.
        _ => return base_name,
    };
    short.to_string()
}
274
275/// Helper function to extract location ID and type from metadata.
276fn extract_location_id(location_id: &LocationId) -> (Option<usize>, Option<String>) {
277    match location_id.root() {
278        LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
279        LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
280        _ => panic!("unexpected location type"),
281    }
282}
283
284/// Helper function to set up location in structure from metadata.
285fn setup_location(
286    structure: &mut HydroGraphStructure,
287    metadata: &crate::compile::ir::HydroIrMetadata,
288) -> Option<usize> {
289    let (location_id, location_type) = extract_location_id(&metadata.location_kind);
290    if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
291        structure.add_location(loc_id, loc_type);
292    }
293    location_id
294}
295
296impl HydroRoot {
297    /// Core graph writing logic that works with any GraphWrite implementation.
298    pub fn write_graph<W>(
299        &self,
300        mut graph_write: W,
301        config: &HydroWriteConfig,
302    ) -> Result<(), W::Err>
303    where
304        W: HydroGraphWrite,
305    {
306        let mut structure = HydroGraphStructure::new();
307        let mut seen_tees = HashMap::new();
308
309        // Build the graph structure by traversing the IR
310        let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
311
312        // Write the graph
313        graph_write.write_prologue()?;
314
315        // Write node definitions
316        for (&node_id, (label, node_type, location)) in &structure.nodes {
317            let (location_id, location_type) = if let Some(loc_id) = location {
318                (
319                    Some(*loc_id),
320                    structure.locations.get(loc_id).map(|s| s.as_str()),
321                )
322            } else {
323                (None, None)
324            };
325
326            // Check if this is a label that came from an expression-containing operation
327            // We can detect this by looking for the pattern "op_name(...)" and checking if we have the original expressions
328            graph_write.write_node_definition(
329                node_id,
330                label,
331                *node_type,
332                location_id,
333                location_type,
334            )?;
335        }
336
337        // Group nodes by location if requested
338        if config.show_location_groups {
339            let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
340            for (&node_id, (_, _, location)) in &structure.nodes {
341                if let Some(location_id) = location {
342                    nodes_by_location
343                        .entry(*location_id)
344                        .or_default()
345                        .push(node_id);
346                }
347            }
348
349            for (&location_id, node_ids) in &nodes_by_location {
350                if let Some(location_type) = structure.locations.get(&location_id) {
351                    graph_write.write_location_start(location_id, location_type)?;
352                    for &node_id in node_ids {
353                        graph_write.write_node(node_id)?;
354                    }
355                    graph_write.write_location_end()?;
356                }
357            }
358        }
359
360        // Write edges
361        for (src_id, dst_id, edge_type, label) in &structure.edges {
362            graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
363        }
364
365        graph_write.write_epilogue()?;
366        Ok(())
367    }
368
369    /// Build the graph structure by traversing the IR tree.
370    pub fn build_graph_structure(
371        &self,
372        structure: &mut HydroGraphStructure,
373        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
374        config: &HydroWriteConfig,
375    ) -> usize {
376        // Helper function for sink nodes to reduce duplication
377        fn build_sink_node(
378            structure: &mut HydroGraphStructure,
379            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
380            config: &HydroWriteConfig,
381            input: &HydroNode,
382            metadata: Option<&crate::compile::ir::HydroIrMetadata>,
383            label: NodeLabel,
384            edge_type: HydroEdgeType,
385        ) -> usize {
386            let input_id = input.build_graph_structure(structure, seen_tees, config);
387            let location_id = metadata.and_then(|m| setup_location(structure, m));
388            let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
389            structure.add_edge(input_id, sink_id, edge_type, None);
390            sink_id
391        }
392
393        match self {
394            // Sink operations with Stream edges - grouped by edge type
395            HydroRoot::ForEach { f, input, .. } => build_sink_node(
396                structure,
397                seen_tees,
398                config,
399                input,
400                None,
401                NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
402                HydroEdgeType::Stream,
403            ),
404
405            HydroRoot::SendExternal {
406                to_external_id,
407                to_key,
408                input,
409                ..
410            } => build_sink_node(
411                structure,
412                seen_tees,
413                config,
414                input,
415                None,
416                NodeLabel::with_exprs(
417                    format!("send_external({}:{})", to_external_id, to_key),
418                    vec![],
419                ),
420                HydroEdgeType::Stream,
421            ),
422
423            HydroRoot::DestSink { sink, input, .. } => build_sink_node(
424                structure,
425                seen_tees,
426                config,
427                input,
428                None,
429                NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
430                HydroEdgeType::Stream,
431            ),
432
433            // Sink operation with Cycle edge - grouped by edge type
434            HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
435                structure,
436                seen_tees,
437                config,
438                input,
439                None,
440                NodeLabel::static_label(format!("cycle_sink({})", ident)),
441                HydroEdgeType::Cycle,
442            ),
443        }
444    }
445}
446
447impl HydroNode {
448    /// Build the graph structure recursively for this node.
449    pub fn build_graph_structure(
450        &self,
451        structure: &mut HydroGraphStructure,
452        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
453        config: &HydroWriteConfig,
454    ) -> usize {
455        use crate::location::dynamic::LocationId;
456
457        // Helper functions to reduce duplication, categorized by input/expression patterns
458
459        /// Common parameters for transform builder functions to reduce argument count
460        struct TransformParams<'a> {
461            structure: &'a mut HydroGraphStructure,
462            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
463            config: &'a HydroWriteConfig,
464            input: &'a HydroNode,
465            metadata: &'a crate::compile::ir::HydroIrMetadata,
466            op_name: String,
467            node_type: HydroNodeType,
468            edge_type: HydroEdgeType,
469        }
470
471        // Single-input transform with no expressions
472        fn build_simple_transform(params: TransformParams) -> usize {
473            let input_id = params.input.build_graph_structure(
474                params.structure,
475                params.seen_tees,
476                params.config,
477            );
478            let location_id = setup_location(params.structure, params.metadata);
479            let node_id = params.structure.add_node(
480                NodeLabel::Static(params.op_name.to_string()),
481                params.node_type,
482                location_id,
483            );
484            params
485                .structure
486                .add_edge(input_id, node_id, params.edge_type, None);
487            node_id
488        }
489
490        // Single-input transform with one expression
491        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
492            let input_id = params.input.build_graph_structure(
493                params.structure,
494                params.seen_tees,
495                params.config,
496            );
497            let location_id = setup_location(params.structure, params.metadata);
498            let node_id = params.structure.add_node(
499                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
500                params.node_type,
501                location_id,
502            );
503            params
504                .structure
505                .add_edge(input_id, node_id, params.edge_type, None);
506            node_id
507        }
508
509        // Single-input transform with two expressions
510        fn build_dual_expr_transform(
511            params: TransformParams,
512            expr1: &DebugExpr,
513            expr2: &DebugExpr,
514        ) -> usize {
515            let input_id = params.input.build_graph_structure(
516                params.structure,
517                params.seen_tees,
518                params.config,
519            );
520            let location_id = setup_location(params.structure, params.metadata);
521            let node_id = params.structure.add_node(
522                NodeLabel::with_exprs(
523                    params.op_name.to_string(),
524                    vec![expr1.clone(), expr2.clone()],
525                ),
526                params.node_type,
527                location_id,
528            );
529            params
530                .structure
531                .add_edge(input_id, node_id, params.edge_type, None);
532            node_id
533        }
534
535        // Helper function for source nodes
536        fn build_source_node(
537            structure: &mut HydroGraphStructure,
538            metadata: &crate::compile::ir::HydroIrMetadata,
539            label: String,
540        ) -> usize {
541            let location_id = setup_location(structure, metadata);
542            structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
543        }
544
545        match self {
546            HydroNode::Placeholder => structure.add_node(
547                NodeLabel::Static("PLACEHOLDER".to_string()),
548                HydroNodeType::Transform,
549                None,
550            ),
551
552            HydroNode::Source {
553                source, metadata, ..
554            } => {
555                let label = match source {
556                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
557                    HydroSource::ExternalNetwork() => "external_network()".to_string(),
558                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
559                    HydroSource::Spin() => "spin()".to_string(),
560                };
561                build_source_node(structure, metadata, label)
562            }
563
564            HydroNode::ExternalInput {
565                from_external_id,
566                from_key,
567                metadata,
568                ..
569            } => build_source_node(
570                structure,
571                metadata,
572                format!("external_input({}:{})", from_external_id, from_key),
573            ),
574
575            HydroNode::CycleSource {
576                ident, metadata, ..
577            } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
578
579            HydroNode::Tee { inner, metadata } => {
580                let ptr = inner.as_ptr();
581                if let Some(&existing_id) = seen_tees.get(&ptr) {
582                    return existing_id;
583                }
584
585                let input_id = inner
586                    .0
587                    .borrow()
588                    .build_graph_structure(structure, seen_tees, config);
589                let location_id = setup_location(structure, metadata);
590
591                let tee_id = structure.add_node(
592                    NodeLabel::Static(extract_op_name(self.print_root())),
593                    HydroNodeType::Tee,
594                    location_id,
595                );
596
597                seen_tees.insert(ptr, tee_id);
598
599                structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
600
601                tee_id
602            }
603
604            // Transform operations with Stream edges - grouped by node/edge type
605            HydroNode::DeferTick {
606                input: inner,
607                metadata,
608            }
609            | HydroNode::Enumerate {
610                input: inner,
611                metadata,
612                ..
613            }
614            | HydroNode::Unique {
615                input: inner,
616                metadata,
617            }
618            | HydroNode::ResolveFutures {
619                input: inner,
620                metadata,
621            }
622            | HydroNode::ResolveFuturesOrdered {
623                input: inner,
624                metadata,
625            } => build_simple_transform(TransformParams {
626                structure,
627                seen_tees,
628                config,
629                input: inner,
630                metadata,
631                op_name: extract_op_name(self.print_root()),
632                node_type: HydroNodeType::Transform,
633                edge_type: HydroEdgeType::Stream,
634            }),
635
636            // Transform operation with Persistent edge - grouped by node/edge type
637            HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
638                structure,
639                seen_tees,
640                config,
641                input: inner,
642                metadata,
643                op_name: extract_op_name(self.print_root()),
644                node_type: HydroNodeType::Transform,
645                edge_type: HydroEdgeType::Persistent,
646            }),
647
648            // Aggregation operation with Stream edge - grouped by node/edge type
649            HydroNode::Sort {
650                input: inner,
651                metadata,
652            } => build_simple_transform(TransformParams {
653                structure,
654                seen_tees,
655                config,
656                input: inner,
657                metadata,
658                op_name: extract_op_name(self.print_root()),
659                node_type: HydroNodeType::Aggregation,
660                edge_type: HydroEdgeType::Stream,
661            }),
662
663            // Single-expression Transform operations - grouped by node type
664            HydroNode::Map { f, input, metadata }
665            | HydroNode::Filter { f, input, metadata }
666            | HydroNode::FlatMap { f, input, metadata }
667            | HydroNode::FilterMap { f, input, metadata }
668            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
669                TransformParams {
670                    structure,
671                    seen_tees,
672                    config,
673                    input,
674                    metadata,
675                    op_name: extract_op_name(self.print_root()),
676                    node_type: HydroNodeType::Transform,
677                    edge_type: HydroEdgeType::Stream,
678                },
679                f,
680            ),
681
682            // Single-expression Aggregation operations - grouped by node type
683            HydroNode::Reduce { f, input, metadata }
684            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
685                TransformParams {
686                    structure,
687                    seen_tees,
688                    config,
689                    input,
690                    metadata,
691                    op_name: extract_op_name(self.print_root()),
692                    node_type: HydroNodeType::Aggregation,
693                    edge_type: HydroEdgeType::Stream,
694                },
695                f,
696            ),
697
698            // Join-like operations with left/right edge labels - grouped by edge labeling
699            HydroNode::Join {
700                left,
701                right,
702                metadata,
703            }
704            | HydroNode::CrossProduct {
705                left,
706                right,
707                metadata,
708            }
709            | HydroNode::CrossSingleton {
710                left,
711                right,
712                metadata,
713            } => {
714                let left_id = left.build_graph_structure(structure, seen_tees, config);
715                let right_id = right.build_graph_structure(structure, seen_tees, config);
716                let location_id = setup_location(structure, metadata);
717                let node_id = structure.add_node(
718                    NodeLabel::Static(extract_op_name(self.print_root())),
719                    HydroNodeType::Join,
720                    location_id,
721                );
722                structure.add_edge(
723                    left_id,
724                    node_id,
725                    HydroEdgeType::Stream,
726                    Some("left".to_string()),
727                );
728                structure.add_edge(
729                    right_id,
730                    node_id,
731                    HydroEdgeType::Stream,
732                    Some("right".to_string()),
733                );
734                node_id
735            }
736
737            // Join-like operations with pos/neg edge labels - grouped by edge labeling
738            HydroNode::Difference {
739                pos: left,
740                neg: right,
741                metadata,
742            }
743            | HydroNode::AntiJoin {
744                pos: left,
745                neg: right,
746                metadata,
747            } => {
748                let left_id = left.build_graph_structure(structure, seen_tees, config);
749                let right_id = right.build_graph_structure(structure, seen_tees, config);
750                let location_id = setup_location(structure, metadata);
751                let node_id = structure.add_node(
752                    NodeLabel::Static(extract_op_name(self.print_root())),
753                    HydroNodeType::Join,
754                    location_id,
755                );
756                structure.add_edge(
757                    left_id,
758                    node_id,
759                    HydroEdgeType::Stream,
760                    Some("pos".to_string()),
761                );
762                structure.add_edge(
763                    right_id,
764                    node_id,
765                    HydroEdgeType::Stream,
766                    Some("neg".to_string()),
767                );
768                node_id
769            }
770
771            // Dual expression transforms - consolidated using pattern matching
772            HydroNode::Fold {
773                init,
774                acc,
775                input,
776                metadata,
777            }
778            | HydroNode::FoldKeyed {
779                init,
780                acc,
781                input,
782                metadata,
783            }
784            | HydroNode::Scan {
785                init,
786                acc,
787                input,
788                metadata,
789            } => {
790                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
791
792                build_dual_expr_transform(
793                    TransformParams {
794                        structure,
795                        seen_tees,
796                        config,
797                        input,
798                        metadata,
799                        op_name: extract_op_name(self.print_root()),
800                        node_type,
801                        edge_type: HydroEdgeType::Stream,
802                    },
803                    init,
804                    acc,
805                )
806            }
807
808            // Combination of join and transform
809            HydroNode::ReduceKeyedWatermark {
810                f,
811                input,
812                watermark,
813                metadata,
814            } => {
815                let input_id = input.build_graph_structure(structure, seen_tees, config);
816                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
817                let location_id = setup_location(structure, metadata);
818                let join_node_id = structure.add_node(
819                    NodeLabel::Static(extract_op_name(self.print_root())),
820                    HydroNodeType::Join,
821                    location_id,
822                );
823                structure.add_edge(
824                    input_id,
825                    join_node_id,
826                    HydroEdgeType::Stream,
827                    Some("input".to_string()),
828                );
829                structure.add_edge(
830                    watermark_id,
831                    join_node_id,
832                    HydroEdgeType::Stream,
833                    Some("watermark".to_string()),
834                );
835
836                let node_id = structure.add_node(
837                    NodeLabel::with_exprs(
838                        extract_op_name(self.print_root()).to_string(),
839                        vec![f.clone()],
840                    ),
841                    HydroNodeType::Aggregation,
842                    location_id,
843                );
844                structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
845                node_id
846            }
847
848            HydroNode::Network {
849                serialize_fn,
850                deserialize_fn,
851                input,
852                metadata,
853                ..
854            } => {
855                let input_id = input.build_graph_structure(structure, seen_tees, config);
856                let _from_location_id = setup_location(structure, metadata);
857
858                let to_location_id = match metadata.location_kind.root() {
859                    LocationId::Process(id) => {
860                        structure.add_location(*id, "Process".to_string());
861                        Some(*id)
862                    }
863                    LocationId::Cluster(id) => {
864                        structure.add_location(*id, "Cluster".to_string());
865                        Some(*id)
866                    }
867                    _ => None,
868                };
869
870                let mut label = "network(".to_string();
871                if serialize_fn.is_some() {
872                    label.push_str("ser");
873                }
874                if deserialize_fn.is_some() {
875                    if serialize_fn.is_some() {
876                        label.push_str(" + ");
877                    }
878                    label.push_str("deser");
879                }
880                label.push(')');
881
882                let network_id = structure.add_node(
883                    NodeLabel::Static(label),
884                    HydroNodeType::Network,
885                    to_location_id,
886                );
887                structure.add_edge(
888                    input_id,
889                    network_id,
890                    HydroEdgeType::Network,
891                    Some(format!("to {:?}", to_location_id)),
892                );
893                network_id
894            }
895
896            // Handle remaining node types
897            HydroNode::Batch { inner, .. } => {
                // Batch adds no node of its own to the rendered graph; pass through to its input
899                inner.build_graph_structure(structure, seen_tees, config)
900            }
901
902            HydroNode::YieldConcat { inner, .. } => {
                // YieldConcat adds no node of its own to the rendered graph; pass through to its input
904                inner.build_graph_structure(structure, seen_tees, config)
905            }
906
907            HydroNode::BeginAtomic { inner, .. } => {
908                inner.build_graph_structure(structure, seen_tees, config)
909            }
910
911            HydroNode::EndAtomic { inner, .. } => {
912                inner.build_graph_structure(structure, seen_tees, config)
913            }
914
915            HydroNode::Chain {
916                first,
917                second,
918                metadata,
919            } => {
920                let first_id = first.build_graph_structure(structure, seen_tees, config);
921                let second_id = second.build_graph_structure(structure, seen_tees, config);
922                let location_id = setup_location(structure, metadata);
923                let chain_id = structure.add_node(
924                    NodeLabel::Static(extract_op_name(self.print_root())),
925                    HydroNodeType::Transform,
926                    location_id,
927                );
928                structure.add_edge(
929                    first_id,
930                    chain_id,
931                    HydroEdgeType::Stream,
932                    Some("first".to_string()),
933                );
934                structure.add_edge(
935                    second_id,
936                    chain_id,
937                    HydroEdgeType::Stream,
938                    Some("second".to_string()),
939                );
940                chain_id
941            }
942
943            HydroNode::ChainFirst {
944                first,
945                second,
946                metadata,
947            } => {
948                let first_id = first.build_graph_structure(structure, seen_tees, config);
949                let second_id = second.build_graph_structure(structure, seen_tees, config);
950                let location_id = setup_location(structure, metadata);
951                let chain_id = structure.add_node(
952                    NodeLabel::Static(extract_op_name(self.print_root())),
953                    HydroNodeType::Transform,
954                    location_id,
955                );
956                structure.add_edge(
957                    first_id,
958                    chain_id,
959                    HydroEdgeType::Stream,
960                    Some("first".to_string()),
961                );
962                structure.add_edge(
963                    second_id,
964                    chain_id,
965                    HydroEdgeType::Stream,
966                    Some("second".to_string()),
967                );
968                chain_id
969            }
970
971            HydroNode::Counter {
972                tag: _,
973                prefix: _,
974                duration,
975                input,
976                metadata,
977            } => build_single_expr_transform(
978                TransformParams {
979                    structure,
980                    seen_tees,
981                    config,
982                    input,
983                    metadata,
984                    op_name: extract_op_name(self.print_root()),
985                    node_type: HydroNodeType::Transform,
986                    edge_type: HydroEdgeType::Stream,
987                },
988                duration,
989            ),
990        }
991    }
992}
993
/// Utility for rendering multiple roots as a single graph, returned as a `String`.
///
/// `render_hydro_ir!(render_fn, write_fn)` expands to a public function
/// `render_fn(roots, config) -> String`. The generated function creates an empty
/// `String`, passes a mutable reference to it (together with `roots` and `config`)
/// to `write_fn`, and returns the filled buffer.
///
/// The generated function panics if `write_fn` returns an error, because the
/// result is unwrapped.
macro_rules! render_hydro_ir {
    ($render_fn:ident, $write_fn:ident) => {
        pub fn $render_fn(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
            let mut rendered = String::new();
            $write_fn(&mut rendered, roots, config).unwrap();
            rendered
        }
    };
}
1005
/// Macro that stamps out the per-format `write_*` entry points.
///
/// `write_hydro_ir!(fn_name, WriterType, make_writer)` expands to a public function
/// `fn_name(output, roots, config) -> std::fmt::Result`. The generated function
/// builds a `WriterType` by calling `make_writer(output, config)` and then passes a
/// mutable reference to that writer, plus `roots` and `config`, to
/// `write_hydro_ir_graph`, returning its result.
macro_rules! write_hydro_ir {
    ($fn_name:ident, $writer_ty:ty, $make_writer:expr) => {
        pub fn $fn_name(
            output: impl std::fmt::Write,
            roots: &[HydroRoot],
            config: &HydroWriteConfig,
        ) -> std::fmt::Result {
            let mut writer: $writer_ty = $make_writer(output, config);
            write_hydro_ir_graph(&mut writer, roots, config)
        }
    };
}
1019
1020render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1021write_hydro_ir!(
1022    write_hydro_ir_mermaid,
1023    HydroMermaid<_>,
1024    HydroMermaid::new_with_config
1025);
1026
1027render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1028write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1029
1030render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
1031write_hydro_ir!(
1032    write_hydro_ir_reactflow,
1033    HydroReactFlow<_>,
1034    HydroReactFlow::new
1035);
1036
1037fn write_hydro_ir_graph<W>(
1038    mut graph_write: W,
1039    roots: &[HydroRoot],
1040    config: &HydroWriteConfig,
1041) -> Result<(), W::Err>
1042where
1043    W: HydroGraphWrite,
1044{
1045    let mut structure = HydroGraphStructure::new();
1046    let mut seen_tees = HashMap::new();
1047
1048    // Build the graph structure for all roots
1049    for leaf in roots {
1050        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1051    }
1052
1053    // Write the graph using the same logic as individual roots
1054    graph_write.write_prologue()?;
1055
1056    for (&node_id, (label, node_type, location)) in &structure.nodes {
1057        let (location_id, location_type) = if let Some(loc_id) = location {
1058            (
1059                Some(*loc_id),
1060                structure.locations.get(loc_id).map(|s| s.as_str()),
1061            )
1062        } else {
1063            (None, None)
1064        };
1065        graph_write.write_node_definition(
1066            node_id,
1067            label,
1068            *node_type,
1069            location_id,
1070            location_type,
1071        )?;
1072    }
1073
1074    if config.show_location_groups {
1075        let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1076        for (&node_id, (_, _, location)) in &structure.nodes {
1077            if let Some(location_id) = location {
1078                nodes_by_location
1079                    .entry(*location_id)
1080                    .or_default()
1081                    .push(node_id);
1082            }
1083        }
1084
1085        for (&location_id, node_ids) in &nodes_by_location {
1086            if let Some(location_type) = structure.locations.get(&location_id) {
1087                graph_write.write_location_start(location_id, location_type)?;
1088                for &node_id in node_ids {
1089                    graph_write.write_node(node_id)?;
1090                }
1091                graph_write.write_location_end()?;
1092            }
1093        }
1094    }
1095
1096    for (src_id, dst_id, edge_type, label) in &structure.edges {
1097        graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1098    }
1099
1100    graph_write.write_epilogue()?;
1101    Ok(())
1102}