1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::compile::ir::{DebugExpr, HydroNode, HydroRoot, HydroSource};
12use crate::location::dynamic::LocationId;
13
14#[derive(Debug, Clone)]
16pub enum NodeLabel {
17 Static(String),
19 WithExprs {
21 op_name: String,
22 exprs: Vec<DebugExpr>,
23 },
24}
25
26impl NodeLabel {
27 pub fn static_label(s: String) -> Self {
29 Self::Static(s)
30 }
31
32 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
34 Self::WithExprs { op_name, exprs }
35 }
36}
37
38impl std::fmt::Display for NodeLabel {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 match self {
41 Self::Static(s) => write!(f, "{}", s),
42 Self::WithExprs { op_name, exprs } => {
43 if exprs.is_empty() {
44 write!(f, "{}()", op_name)
45 } else {
46 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
47 write!(f, "{}({})", op_name, expr_strs.join(", "))
48 }
49 }
50 }
51 }
52}
53
54pub struct IndentedGraphWriter<W> {
57 pub write: W,
58 pub indent: usize,
59 pub config: HydroWriteConfig,
60}
61
62impl<W> IndentedGraphWriter<W> {
63 pub fn new(write: W) -> Self {
65 Self {
66 write,
67 indent: 0,
68 config: HydroWriteConfig::default(),
69 }
70 }
71
72 pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
74 Self {
75 write,
76 indent: 0,
77 config: config.clone(),
78 }
79 }
80}
81
82impl<W: Write> IndentedGraphWriter<W> {
83 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
85 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
86 }
87}
88
89pub type GraphWriteError = std::fmt::Error;
91
92#[auto_impl(&mut, Box)]
94pub trait HydroGraphWrite {
95 type Err: Error;
97
98 fn write_prologue(&mut self) -> Result<(), Self::Err>;
100
101 fn write_node_definition(
103 &mut self,
104 node_id: usize,
105 node_label: &NodeLabel,
106 node_type: HydroNodeType,
107 location_id: Option<usize>,
108 location_type: Option<&str>,
109 ) -> Result<(), Self::Err>;
110
111 fn write_edge(
113 &mut self,
114 src_id: usize,
115 dst_id: usize,
116 edge_type: HydroEdgeType,
117 label: Option<&str>,
118 ) -> Result<(), Self::Err>;
119
120 fn write_location_start(
122 &mut self,
123 location_id: usize,
124 location_type: &str,
125 ) -> Result<(), Self::Err>;
126
127 fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
129
130 fn write_location_end(&mut self) -> Result<(), Self::Err>;
132
133 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
135}
136
137#[derive(Debug, Clone, Copy)]
139pub enum HydroNodeType {
140 Source,
141 Transform,
142 Join,
143 Aggregation,
144 Network,
145 Sink,
146 Tee,
147}
148
149#[derive(Debug, Clone, Copy)]
151pub enum HydroEdgeType {
152 Stream,
153 Persistent,
154 Network,
155 Cycle,
156}
157
158#[derive(Debug, Clone)]
160pub struct HydroWriteConfig {
161 pub show_metadata: bool,
162 pub show_location_groups: bool,
163 pub use_short_labels: bool,
164 pub process_id_name: Vec<(usize, String)>,
165 pub cluster_id_name: Vec<(usize, String)>,
166 pub external_id_name: Vec<(usize, String)>,
167}
168
169impl Default for HydroWriteConfig {
170 fn default() -> Self {
171 Self {
172 show_metadata: false,
173 show_location_groups: true,
174 use_short_labels: true, process_id_name: vec![],
176 cluster_id_name: vec![],
177 external_id_name: vec![],
178 }
179 }
180}
181
182#[derive(Debug, Default)]
184pub struct HydroGraphStructure {
185 pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>, pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>, pub locations: HashMap<usize, String>, pub next_node_id: usize,
189}
190
191impl HydroGraphStructure {
192 pub fn new() -> Self {
193 Self::default()
194 }
195
196 pub fn add_node(
197 &mut self,
198 label: NodeLabel,
199 node_type: HydroNodeType,
200 location: Option<usize>,
201 ) -> usize {
202 let node_id = self.next_node_id;
203 self.next_node_id += 1;
204 self.nodes.insert(node_id, (label, node_type, location));
205 node_id
206 }
207
208 pub fn add_edge(
209 &mut self,
210 src: usize,
211 dst: usize,
212 edge_type: HydroEdgeType,
213 label: Option<String>,
214 ) {
215 self.edges.push((src, dst, edge_type, label));
216 }
217
218 pub fn add_location(&mut self, location_id: usize, location_type: String) {
219 self.locations.insert(location_id, location_type);
220 }
221}
222
223pub fn extract_op_name(full_label: String) -> String {
225 full_label
226 .split('(')
227 .next()
228 .unwrap_or("unknown")
229 .to_string()
230 .to_lowercase()
231}
232
233pub fn extract_short_label(full_label: &str) -> String {
235 if let Some(op_name) = full_label.split('(').next() {
237 let base_name = op_name.to_lowercase();
238 match base_name.as_str() {
239 "source" => {
241 if full_label.contains("Iter") {
242 "source_iter".to_string()
243 } else if full_label.contains("Stream") {
244 "source_stream".to_string()
245 } else if full_label.contains("ExternalNetwork") {
246 "external_network".to_string()
247 } else if full_label.contains("Spin") {
248 "spin".to_string()
249 } else {
250 "source".to_string()
251 }
252 }
253 "network" => {
254 if full_label.contains("deser") {
255 "network(recv)".to_string()
256 } else if full_label.contains("ser") {
257 "network(send)".to_string()
258 } else {
259 "network".to_string()
260 }
261 }
262 _ => base_name,
264 }
265 } else {
266 if full_label.len() > 20 {
268 format!("{}...", &full_label[..17])
269 } else {
270 full_label.to_string()
271 }
272 }
273}
274
275fn extract_location_id(location_id: &LocationId) -> (Option<usize>, Option<String>) {
277 match location_id.root() {
278 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
279 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
280 _ => panic!("unexpected location type"),
281 }
282}
283
284fn setup_location(
286 structure: &mut HydroGraphStructure,
287 metadata: &crate::compile::ir::HydroIrMetadata,
288) -> Option<usize> {
289 let (location_id, location_type) = extract_location_id(&metadata.location_kind);
290 if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
291 structure.add_location(loc_id, loc_type);
292 }
293 location_id
294}
295
296impl HydroRoot {
297 pub fn write_graph<W>(
299 &self,
300 mut graph_write: W,
301 config: &HydroWriteConfig,
302 ) -> Result<(), W::Err>
303 where
304 W: HydroGraphWrite,
305 {
306 let mut structure = HydroGraphStructure::new();
307 let mut seen_tees = HashMap::new();
308
309 let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
311
312 graph_write.write_prologue()?;
314
315 for (&node_id, (label, node_type, location)) in &structure.nodes {
317 let (location_id, location_type) = if let Some(loc_id) = location {
318 (
319 Some(*loc_id),
320 structure.locations.get(loc_id).map(|s| s.as_str()),
321 )
322 } else {
323 (None, None)
324 };
325
326 graph_write.write_node_definition(
329 node_id,
330 label,
331 *node_type,
332 location_id,
333 location_type,
334 )?;
335 }
336
337 if config.show_location_groups {
339 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
340 for (&node_id, (_, _, location)) in &structure.nodes {
341 if let Some(location_id) = location {
342 nodes_by_location
343 .entry(*location_id)
344 .or_default()
345 .push(node_id);
346 }
347 }
348
349 for (&location_id, node_ids) in &nodes_by_location {
350 if let Some(location_type) = structure.locations.get(&location_id) {
351 graph_write.write_location_start(location_id, location_type)?;
352 for &node_id in node_ids {
353 graph_write.write_node(node_id)?;
354 }
355 graph_write.write_location_end()?;
356 }
357 }
358 }
359
360 for (src_id, dst_id, edge_type, label) in &structure.edges {
362 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
363 }
364
365 graph_write.write_epilogue()?;
366 Ok(())
367 }
368
369 pub fn build_graph_structure(
371 &self,
372 structure: &mut HydroGraphStructure,
373 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
374 config: &HydroWriteConfig,
375 ) -> usize {
376 fn build_sink_node(
378 structure: &mut HydroGraphStructure,
379 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
380 config: &HydroWriteConfig,
381 input: &HydroNode,
382 metadata: Option<&crate::compile::ir::HydroIrMetadata>,
383 label: NodeLabel,
384 edge_type: HydroEdgeType,
385 ) -> usize {
386 let input_id = input.build_graph_structure(structure, seen_tees, config);
387 let location_id = metadata.and_then(|m| setup_location(structure, m));
388 let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
389 structure.add_edge(input_id, sink_id, edge_type, None);
390 sink_id
391 }
392
393 match self {
394 HydroRoot::ForEach { f, input, .. } => build_sink_node(
396 structure,
397 seen_tees,
398 config,
399 input,
400 None,
401 NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
402 HydroEdgeType::Stream,
403 ),
404
405 HydroRoot::SendExternal {
406 to_external_id,
407 to_key,
408 input,
409 ..
410 } => build_sink_node(
411 structure,
412 seen_tees,
413 config,
414 input,
415 None,
416 NodeLabel::with_exprs(
417 format!("send_external({}:{})", to_external_id, to_key),
418 vec![],
419 ),
420 HydroEdgeType::Stream,
421 ),
422
423 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
424 structure,
425 seen_tees,
426 config,
427 input,
428 None,
429 NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
430 HydroEdgeType::Stream,
431 ),
432
433 HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
435 structure,
436 seen_tees,
437 config,
438 input,
439 None,
440 NodeLabel::static_label(format!("cycle_sink({})", ident)),
441 HydroEdgeType::Cycle,
442 ),
443 }
444 }
445}
446
447impl HydroNode {
448 pub fn build_graph_structure(
450 &self,
451 structure: &mut HydroGraphStructure,
452 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
453 config: &HydroWriteConfig,
454 ) -> usize {
455 use crate::location::dynamic::LocationId;
456
457 struct TransformParams<'a> {
461 structure: &'a mut HydroGraphStructure,
462 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
463 config: &'a HydroWriteConfig,
464 input: &'a HydroNode,
465 metadata: &'a crate::compile::ir::HydroIrMetadata,
466 op_name: String,
467 node_type: HydroNodeType,
468 edge_type: HydroEdgeType,
469 }
470
471 fn build_simple_transform(params: TransformParams) -> usize {
473 let input_id = params.input.build_graph_structure(
474 params.structure,
475 params.seen_tees,
476 params.config,
477 );
478 let location_id = setup_location(params.structure, params.metadata);
479 let node_id = params.structure.add_node(
480 NodeLabel::Static(params.op_name.to_string()),
481 params.node_type,
482 location_id,
483 );
484 params
485 .structure
486 .add_edge(input_id, node_id, params.edge_type, None);
487 node_id
488 }
489
490 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
492 let input_id = params.input.build_graph_structure(
493 params.structure,
494 params.seen_tees,
495 params.config,
496 );
497 let location_id = setup_location(params.structure, params.metadata);
498 let node_id = params.structure.add_node(
499 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
500 params.node_type,
501 location_id,
502 );
503 params
504 .structure
505 .add_edge(input_id, node_id, params.edge_type, None);
506 node_id
507 }
508
509 fn build_dual_expr_transform(
511 params: TransformParams,
512 expr1: &DebugExpr,
513 expr2: &DebugExpr,
514 ) -> usize {
515 let input_id = params.input.build_graph_structure(
516 params.structure,
517 params.seen_tees,
518 params.config,
519 );
520 let location_id = setup_location(params.structure, params.metadata);
521 let node_id = params.structure.add_node(
522 NodeLabel::with_exprs(
523 params.op_name.to_string(),
524 vec![expr1.clone(), expr2.clone()],
525 ),
526 params.node_type,
527 location_id,
528 );
529 params
530 .structure
531 .add_edge(input_id, node_id, params.edge_type, None);
532 node_id
533 }
534
535 fn build_source_node(
537 structure: &mut HydroGraphStructure,
538 metadata: &crate::compile::ir::HydroIrMetadata,
539 label: String,
540 ) -> usize {
541 let location_id = setup_location(structure, metadata);
542 structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
543 }
544
545 match self {
546 HydroNode::Placeholder => structure.add_node(
547 NodeLabel::Static("PLACEHOLDER".to_string()),
548 HydroNodeType::Transform,
549 None,
550 ),
551
552 HydroNode::Source {
553 source, metadata, ..
554 } => {
555 let label = match source {
556 HydroSource::Stream(expr) => format!("source_stream({})", expr),
557 HydroSource::ExternalNetwork() => "external_network()".to_string(),
558 HydroSource::Iter(expr) => format!("source_iter({})", expr),
559 HydroSource::Spin() => "spin()".to_string(),
560 };
561 build_source_node(structure, metadata, label)
562 }
563
564 HydroNode::ExternalInput {
565 from_external_id,
566 from_key,
567 metadata,
568 ..
569 } => build_source_node(
570 structure,
571 metadata,
572 format!("external_input({}:{})", from_external_id, from_key),
573 ),
574
575 HydroNode::CycleSource {
576 ident, metadata, ..
577 } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
578
579 HydroNode::Tee { inner, metadata } => {
580 let ptr = inner.as_ptr();
581 if let Some(&existing_id) = seen_tees.get(&ptr) {
582 return existing_id;
583 }
584
585 let input_id = inner
586 .0
587 .borrow()
588 .build_graph_structure(structure, seen_tees, config);
589 let location_id = setup_location(structure, metadata);
590
591 let tee_id = structure.add_node(
592 NodeLabel::Static(extract_op_name(self.print_root())),
593 HydroNodeType::Tee,
594 location_id,
595 );
596
597 seen_tees.insert(ptr, tee_id);
598
599 structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
600
601 tee_id
602 }
603
604 HydroNode::DeferTick {
606 input: inner,
607 metadata,
608 }
609 | HydroNode::Enumerate {
610 input: inner,
611 metadata,
612 ..
613 }
614 | HydroNode::Unique {
615 input: inner,
616 metadata,
617 }
618 | HydroNode::ResolveFutures {
619 input: inner,
620 metadata,
621 }
622 | HydroNode::ResolveFuturesOrdered {
623 input: inner,
624 metadata,
625 } => build_simple_transform(TransformParams {
626 structure,
627 seen_tees,
628 config,
629 input: inner,
630 metadata,
631 op_name: extract_op_name(self.print_root()),
632 node_type: HydroNodeType::Transform,
633 edge_type: HydroEdgeType::Stream,
634 }),
635
636 HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
638 structure,
639 seen_tees,
640 config,
641 input: inner,
642 metadata,
643 op_name: extract_op_name(self.print_root()),
644 node_type: HydroNodeType::Transform,
645 edge_type: HydroEdgeType::Persistent,
646 }),
647
648 HydroNode::Sort {
650 input: inner,
651 metadata,
652 } => build_simple_transform(TransformParams {
653 structure,
654 seen_tees,
655 config,
656 input: inner,
657 metadata,
658 op_name: extract_op_name(self.print_root()),
659 node_type: HydroNodeType::Aggregation,
660 edge_type: HydroEdgeType::Stream,
661 }),
662
663 HydroNode::Map { f, input, metadata }
665 | HydroNode::Filter { f, input, metadata }
666 | HydroNode::FlatMap { f, input, metadata }
667 | HydroNode::FilterMap { f, input, metadata }
668 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
669 TransformParams {
670 structure,
671 seen_tees,
672 config,
673 input,
674 metadata,
675 op_name: extract_op_name(self.print_root()),
676 node_type: HydroNodeType::Transform,
677 edge_type: HydroEdgeType::Stream,
678 },
679 f,
680 ),
681
682 HydroNode::Reduce { f, input, metadata }
684 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
685 TransformParams {
686 structure,
687 seen_tees,
688 config,
689 input,
690 metadata,
691 op_name: extract_op_name(self.print_root()),
692 node_type: HydroNodeType::Aggregation,
693 edge_type: HydroEdgeType::Stream,
694 },
695 f,
696 ),
697
698 HydroNode::Join {
700 left,
701 right,
702 metadata,
703 }
704 | HydroNode::CrossProduct {
705 left,
706 right,
707 metadata,
708 }
709 | HydroNode::CrossSingleton {
710 left,
711 right,
712 metadata,
713 } => {
714 let left_id = left.build_graph_structure(structure, seen_tees, config);
715 let right_id = right.build_graph_structure(structure, seen_tees, config);
716 let location_id = setup_location(structure, metadata);
717 let node_id = structure.add_node(
718 NodeLabel::Static(extract_op_name(self.print_root())),
719 HydroNodeType::Join,
720 location_id,
721 );
722 structure.add_edge(
723 left_id,
724 node_id,
725 HydroEdgeType::Stream,
726 Some("left".to_string()),
727 );
728 structure.add_edge(
729 right_id,
730 node_id,
731 HydroEdgeType::Stream,
732 Some("right".to_string()),
733 );
734 node_id
735 }
736
737 HydroNode::Difference {
739 pos: left,
740 neg: right,
741 metadata,
742 }
743 | HydroNode::AntiJoin {
744 pos: left,
745 neg: right,
746 metadata,
747 } => {
748 let left_id = left.build_graph_structure(structure, seen_tees, config);
749 let right_id = right.build_graph_structure(structure, seen_tees, config);
750 let location_id = setup_location(structure, metadata);
751 let node_id = structure.add_node(
752 NodeLabel::Static(extract_op_name(self.print_root())),
753 HydroNodeType::Join,
754 location_id,
755 );
756 structure.add_edge(
757 left_id,
758 node_id,
759 HydroEdgeType::Stream,
760 Some("pos".to_string()),
761 );
762 structure.add_edge(
763 right_id,
764 node_id,
765 HydroEdgeType::Stream,
766 Some("neg".to_string()),
767 );
768 node_id
769 }
770
771 HydroNode::Fold {
773 init,
774 acc,
775 input,
776 metadata,
777 }
778 | HydroNode::FoldKeyed {
779 init,
780 acc,
781 input,
782 metadata,
783 }
784 | HydroNode::Scan {
785 init,
786 acc,
787 input,
788 metadata,
789 } => {
790 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
793 TransformParams {
794 structure,
795 seen_tees,
796 config,
797 input,
798 metadata,
799 op_name: extract_op_name(self.print_root()),
800 node_type,
801 edge_type: HydroEdgeType::Stream,
802 },
803 init,
804 acc,
805 )
806 }
807
808 HydroNode::ReduceKeyedWatermark {
810 f,
811 input,
812 watermark,
813 metadata,
814 } => {
815 let input_id = input.build_graph_structure(structure, seen_tees, config);
816 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
817 let location_id = setup_location(structure, metadata);
818 let join_node_id = structure.add_node(
819 NodeLabel::Static(extract_op_name(self.print_root())),
820 HydroNodeType::Join,
821 location_id,
822 );
823 structure.add_edge(
824 input_id,
825 join_node_id,
826 HydroEdgeType::Stream,
827 Some("input".to_string()),
828 );
829 structure.add_edge(
830 watermark_id,
831 join_node_id,
832 HydroEdgeType::Stream,
833 Some("watermark".to_string()),
834 );
835
836 let node_id = structure.add_node(
837 NodeLabel::with_exprs(
838 extract_op_name(self.print_root()).to_string(),
839 vec![f.clone()],
840 ),
841 HydroNodeType::Aggregation,
842 location_id,
843 );
844 structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
845 node_id
846 }
847
848 HydroNode::Network {
849 serialize_fn,
850 deserialize_fn,
851 input,
852 metadata,
853 ..
854 } => {
855 let input_id = input.build_graph_structure(structure, seen_tees, config);
856 let _from_location_id = setup_location(structure, metadata);
857
858 let to_location_id = match metadata.location_kind.root() {
859 LocationId::Process(id) => {
860 structure.add_location(*id, "Process".to_string());
861 Some(*id)
862 }
863 LocationId::Cluster(id) => {
864 structure.add_location(*id, "Cluster".to_string());
865 Some(*id)
866 }
867 _ => None,
868 };
869
870 let mut label = "network(".to_string();
871 if serialize_fn.is_some() {
872 label.push_str("ser");
873 }
874 if deserialize_fn.is_some() {
875 if serialize_fn.is_some() {
876 label.push_str(" + ");
877 }
878 label.push_str("deser");
879 }
880 label.push(')');
881
882 let network_id = structure.add_node(
883 NodeLabel::Static(label),
884 HydroNodeType::Network,
885 to_location_id,
886 );
887 structure.add_edge(
888 input_id,
889 network_id,
890 HydroEdgeType::Network,
891 Some(format!("to {:?}", to_location_id)),
892 );
893 network_id
894 }
895
896 HydroNode::Batch { inner, .. } => {
898 inner.build_graph_structure(structure, seen_tees, config)
900 }
901
902 HydroNode::YieldConcat { inner, .. } => {
903 inner.build_graph_structure(structure, seen_tees, config)
905 }
906
907 HydroNode::BeginAtomic { inner, .. } => {
908 inner.build_graph_structure(structure, seen_tees, config)
909 }
910
911 HydroNode::EndAtomic { inner, .. } => {
912 inner.build_graph_structure(structure, seen_tees, config)
913 }
914
915 HydroNode::Chain {
916 first,
917 second,
918 metadata,
919 } => {
920 let first_id = first.build_graph_structure(structure, seen_tees, config);
921 let second_id = second.build_graph_structure(structure, seen_tees, config);
922 let location_id = setup_location(structure, metadata);
923 let chain_id = structure.add_node(
924 NodeLabel::Static(extract_op_name(self.print_root())),
925 HydroNodeType::Transform,
926 location_id,
927 );
928 structure.add_edge(
929 first_id,
930 chain_id,
931 HydroEdgeType::Stream,
932 Some("first".to_string()),
933 );
934 structure.add_edge(
935 second_id,
936 chain_id,
937 HydroEdgeType::Stream,
938 Some("second".to_string()),
939 );
940 chain_id
941 }
942
943 HydroNode::ChainFirst {
944 first,
945 second,
946 metadata,
947 } => {
948 let first_id = first.build_graph_structure(structure, seen_tees, config);
949 let second_id = second.build_graph_structure(structure, seen_tees, config);
950 let location_id = setup_location(structure, metadata);
951 let chain_id = structure.add_node(
952 NodeLabel::Static(extract_op_name(self.print_root())),
953 HydroNodeType::Transform,
954 location_id,
955 );
956 structure.add_edge(
957 first_id,
958 chain_id,
959 HydroEdgeType::Stream,
960 Some("first".to_string()),
961 );
962 structure.add_edge(
963 second_id,
964 chain_id,
965 HydroEdgeType::Stream,
966 Some("second".to_string()),
967 );
968 chain_id
969 }
970
971 HydroNode::Counter {
972 tag: _,
973 prefix: _,
974 duration,
975 input,
976 metadata,
977 } => build_single_expr_transform(
978 TransformParams {
979 structure,
980 seen_tees,
981 config,
982 input,
983 metadata,
984 op_name: extract_op_name(self.print_root()),
985 node_type: HydroNodeType::Transform,
986 edge_type: HydroEdgeType::Stream,
987 },
988 duration,
989 ),
990 }
991 }
992}
993
994macro_rules! render_hydro_ir {
997 ($name:ident, $write_fn:ident) => {
998 pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
999 let mut output = String::new();
1000 $write_fn(&mut output, roots, config).unwrap();
1001 output
1002 }
1003 };
1004}
1005
1006macro_rules! write_hydro_ir {
1008 ($name:ident, $writer_type:ty, $constructor:expr) => {
1009 pub fn $name(
1010 output: impl std::fmt::Write,
1011 roots: &[HydroRoot],
1012 config: &HydroWriteConfig,
1013 ) -> std::fmt::Result {
1014 let mut graph_write: $writer_type = $constructor(output, config);
1015 write_hydro_ir_graph(&mut graph_write, roots, config)
1016 }
1017 };
1018}
1019
1020render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1021write_hydro_ir!(
1022 write_hydro_ir_mermaid,
1023 HydroMermaid<_>,
1024 HydroMermaid::new_with_config
1025);
1026
1027render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1028write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1029
1030render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
1031write_hydro_ir!(
1032 write_hydro_ir_reactflow,
1033 HydroReactFlow<_>,
1034 HydroReactFlow::new
1035);
1036
1037fn write_hydro_ir_graph<W>(
1038 mut graph_write: W,
1039 roots: &[HydroRoot],
1040 config: &HydroWriteConfig,
1041) -> Result<(), W::Err>
1042where
1043 W: HydroGraphWrite,
1044{
1045 let mut structure = HydroGraphStructure::new();
1046 let mut seen_tees = HashMap::new();
1047
1048 for leaf in roots {
1050 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1051 }
1052
1053 graph_write.write_prologue()?;
1055
1056 for (&node_id, (label, node_type, location)) in &structure.nodes {
1057 let (location_id, location_type) = if let Some(loc_id) = location {
1058 (
1059 Some(*loc_id),
1060 structure.locations.get(loc_id).map(|s| s.as_str()),
1061 )
1062 } else {
1063 (None, None)
1064 };
1065 graph_write.write_node_definition(
1066 node_id,
1067 label,
1068 *node_type,
1069 location_id,
1070 location_type,
1071 )?;
1072 }
1073
1074 if config.show_location_groups {
1075 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1076 for (&node_id, (_, _, location)) in &structure.nodes {
1077 if let Some(location_id) = location {
1078 nodes_by_location
1079 .entry(*location_id)
1080 .or_default()
1081 .push(node_id);
1082 }
1083 }
1084
1085 for (&location_id, node_ids) in &nodes_by_location {
1086 if let Some(location_type) = structure.locations.get(&location_id) {
1087 graph_write.write_location_start(location_id, location_type)?;
1088 for &node_id in node_ids {
1089 graph_write.write_node(node_id)?;
1090 }
1091 graph_write.write_location_end()?;
1092 }
1093 }
1094 }
1095
1096 for (src_id, dst_id, edge_type, label) in &structure.edges {
1097 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1098 }
1099
1100 graph_write.write_epilogue()?;
1101 Ok(())
1102}