1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
/// Newtype around a boxed [`syn::Expr`].
///
/// The wrapper exists so the expression can be given custom `Debug` and
/// `Display` implementations elsewhere in this file while still being usable
/// as a plain `syn::Expr` through `Deref`.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39 fn from(expr: syn::Expr) -> Self {
40 Self(Box::new(expr))
41 }
42}
43
44impl Deref for DebugExpr {
45 type Target = syn::Expr;
46
47 fn deref(&self) -> &Self::Target {
48 &self.0
49 }
50}
51
52impl ToTokens for DebugExpr {
53 fn to_tokens(&self, tokens: &mut TokenStream) {
54 self.0.to_tokens(tokens);
55 }
56}
57
58impl Debug for DebugExpr {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 write!(f, "{}", self.0.to_token_stream())
61 }
62}
63
64impl Display for DebugExpr {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 let original = self.0.as_ref().clone();
67 let simplified = simplify_q_macro(original);
68
69 write!(f, "q!({})", quote::quote!(#simplified))
72 }
73}
74
75fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77 let mut simplifier = QMacroSimplifier::new();
80 simplifier.visit_expr_mut(&mut expr);
81
82 if let Some(simplified) = simplifier.simplified_result {
84 simplified
85 } else {
86 expr
87 }
88}
89
/// Mutable syn visitor that records the first closure it can extract from a
/// `stageleft::runtime_support::*_type_hint*` call.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted closure, if one has been found; `None` until then.
    pub simplified_result: Option<syn::Expr>,
}
95
96impl QMacroSimplifier {
97 pub fn new() -> Self {
98 Self::default()
99 }
100}
101
102impl VisitMut for QMacroSimplifier {
103 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104 if self.simplified_result.is_some() {
106 return;
107 }
108
109 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110 && self.is_stageleft_runtime_support_call(&path_expr.path)
112 && let Some(closure) = self.extract_closure_from_args(&call.args)
114 {
115 self.simplified_result = Some(closure);
116 return;
117 }
118
119 syn::visit_mut::visit_expr_mut(self, expr);
122 }
123}
124
125impl QMacroSimplifier {
126 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127 if let Some(last_segment) = path.segments.last() {
129 let fn_name = last_segment.ident.to_string();
130 fn_name.contains("_type_hint")
132 && path.segments.len() > 2
133 && path.segments[0].ident == "stageleft"
134 && path.segments[1].ident == "runtime_support"
135 } else {
136 false
137 }
138 }
139
140 fn extract_closure_from_args(
141 &self,
142 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143 ) -> Option<syn::Expr> {
144 for arg in args {
146 if let syn::Expr::Closure(_) = arg {
147 return Some(arg.clone());
148 }
149 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151 return Some(closure_expr);
152 }
153 }
154 None
155 }
156
157 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158 let mut visitor = ClosureFinder {
159 found_closure: None,
160 prefer_inner_blocks: true,
161 };
162 visitor.visit_expr(expr);
163 visitor.found_closure
164 }
165}
166
/// Read-only syn visitor that records the first closure-bearing expression it
/// encounters.
struct ClosureFinder {
    /// The expression recorded by the visitor; `None` until something is found.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression is first checked for directly nested
    /// block statements that contain a closure, and such a nested block is
    /// recorded as a whole instead of the bare closure.
    prefer_inner_blocks: bool,
}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175 if self.found_closure.is_some() {
177 return;
178 }
179
180 match expr {
181 syn::Expr::Closure(_) => {
182 self.found_closure = Some(expr.clone());
183 }
184 syn::Expr::Block(block) if self.prefer_inner_blocks => {
185 for stmt in &block.block.stmts {
187 if let syn::Stmt::Expr(stmt_expr, _) = stmt
188 && let syn::Expr::Block(_) = stmt_expr
189 {
190 let mut inner_visitor = ClosureFinder {
192 found_closure: None,
193 prefer_inner_blocks: false, };
195 inner_visitor.visit_expr(stmt_expr);
196 if inner_visitor.found_closure.is_some() {
197 self.found_closure = Some(stmt_expr.clone());
199 return;
200 }
201 }
202 }
203
204 visit::visit_expr(self, expr);
206
207 if self.found_closure.is_some() {
210 }
212 }
213 _ => {
214 visit::visit_expr(self, expr);
216 }
217 }
218 }
219}
220
/// Newtype around a boxed [`syn::Type`].
///
/// Mirrors `DebugExpr`: it lets a type be given a custom `Debug`
/// implementation elsewhere in this file while still dereferencing to
/// `syn::Type`.
#[derive(Clone, Hash)]
pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228 fn from(t: syn::Type) -> Self {
229 Self(Box::new(t))
230 }
231}
232
233impl Deref for DebugType {
234 type Target = syn::Type;
235
236 fn deref(&self) -> &Self::Target {
237 &self.0
238 }
239}
240
241impl ToTokens for DebugType {
242 fn to_tokens(&self, tokens: &mut TokenStream) {
243 self.0.to_tokens(tokens);
244 }
245}
246
247impl Debug for DebugType {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 write!(f, "{}", self.0.to_token_stream())
250 }
251}
252
/// Instantiation state of a networked IR element.
pub enum DebugInstantiate {
    /// No sink/source/connect information has been attached yet.
    Building,
    /// Sink and source expressions and the connect callback have been
    /// attached (see `DebugInstantiateFinalized`).
    Finalized(Box<DebugInstantiateFinalized>),
}
257
/// Data attached to a network element once it has been instantiated.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression used as the sending side when emitting DFIR.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// One-shot callback that performs the connection; it is `take`n (and
    /// therefore left as `None`) when `connect_network` runs it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272 fn from(f: DebugInstantiateFinalized) -> Self {
273 Self::Finalized(Box::new(f))
274 }
275}
276
277impl Debug for DebugInstantiate {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 write!(f, "<network instantiate>")
280 }
281}
282
283impl Hash for DebugInstantiate {
284 fn hash<H: Hasher>(&self, _state: &mut H) {
285 }
287}
288
289impl Clone for DebugInstantiate {
290 fn clone(&self) -> Self {
291 match self {
292 DebugInstantiate::Building => DebugInstantiate::Building,
293 DebugInstantiate::Finalized(_) => {
294 panic!("DebugInstantiate::Finalized should not be cloned")
295 }
296 }
297 }
298}
299
/// The kinds of input a `HydroNode::Source` can carry.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression (presumably one evaluating to a
    /// stream — confirm against the emit code).
    Stream(DebugExpr),
    /// Source with no payload; NOTE(review): the name suggests input from an
    /// external network — confirm.
    ExternalNetwork(),
    /// Source described by an expression (presumably one evaluating to an
    /// iterator — confirm against the emit code).
    Iter(DebugExpr),
    /// Source with no payload; semantics are defined where it is emitted.
    Spin(),
}
308
/// Selects what `emit_core` does at each IR element: either add DFIR
/// statements to graph builders, or hand the element to a callback.
///
/// `L` is the callback type for roots, `N` the callback type for nodes; both
/// also receive the running statement id.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Graph builders keyed by a `usize` location id.
    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
    /// Callbacks to invoke instead of building: `(root callback, node callback)`.
    Callback(L, N),
}
318
/// A root of the IR graph: an operator that consumes an `input` node and
/// produces no further IR output.
///
/// Every variant carries its `input` subtree and per-operator metadata.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to an external; emitted as
    /// `input [-> map(serialize_fn)] -> dest_sink(<sink>)`.
    SendExternal {
        /// Key into the `externals` map passed to `compile_network`.
        to_external_id: usize,
        /// Key under which the port is registered on the external.
        to_key: usize,
        /// Selects the "many" sink variant during `compile_network`.
        to_many: bool,
        /// Optional mapping applied before the sink.
        serialize_fn: Option<DebugExpr>,
        /// Filled in by `compile_network`, consumed by `connect_network`.
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `ident = input`, binding the input to a named cycle.
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        /// Location the cycle belongs to; `emit_core` asserts that it matches
        /// the input's location.
        out_location: LocationId,
        op_metadata: HydroIrOpMetadata,
    },
}
350
351impl HydroRoot {
    /// Instantiates every network element reachable from this root.
    ///
    /// Walks the tree bottom-up and, for each `HydroRoot::SendExternal`,
    /// `HydroNode::Network` and `HydroNode::ExternalInput` that is still
    /// `DebugInstantiate::Building`, computes the sink/source expressions and
    /// the connect callback through the deploy provider `D`, then stores them
    /// as `DebugInstantiate::Finalized`.
    ///
    /// * `compile_env` - compile environment forwarded to `D`.
    /// * `extra_stmts` - per-process statement lists; an entry is created for
    ///   the target process of a "many" external input.
    /// * `seen_tees` - tee bookkeeping shared across the traversal.
    /// * `processes`, `clusters`, `externals` - instantiated locations by id.
    ///
    /// # Panics
    ///
    /// Panics if an element is already finalized, if a referenced external or
    /// process id is missing from the maps, or if an external endpoint sits on
    /// a location that is neither a process nor a cluster. Cluster endpoints
    /// for externals hit `todo!()`.
    #[cfg(feature = "build")]
    pub fn compile_network<'a, D>(
        &mut self,
        compile_env: &D::CompileEnv,
        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
        seen_tees: &mut SeenTees,
        processes: &HashMap<usize, D::Process>,
        clusters: &HashMap<usize, D::Cluster>,
        externals: &HashMap<usize, D::External>,
    ) where
        D: Deploy<'a>,
    {
        self.transform_bottom_up(
            // Root handler: only `SendExternal` carries network state.
            &mut |l| {
                if let HydroRoot::SendExternal {
                    input,
                    to_external_id,
                    to_key,
                    to_many,
                    instantiate_fn,
                    ..
                } = l
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let to_node = externals
                                .get(to_external_id)
                                .unwrap_or_else(|| {
                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
                                })
                                .clone();

                            // The sender is wherever the input lives.
                            match input.metadata().location_kind.root() {
                                LocationId::Process(process_id) => {
                                    if *to_many {
                                        // "Many" sinks allocate no ports here and use a
                                        // no-op connect callback.
                                        (
                                            (
                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
                                                parse_quote!(DUMMY),
                                            ),
                                            Box::new(|| {}) as Box<dyn FnOnce()>,
                                        )
                                    } else {
                                        let from_node = processes
                                            .get(process_id)
                                            .unwrap_or_else(|| {
                                                panic!("A process used in the graph was not instantiated: {}", process_id)
                                            })
                                            .clone();

                                        let sink_port = D::allocate_process_port(&from_node);
                                        let source_port = D::allocate_external_port(&to_node);

                                        // Make the external's port discoverable under `to_key`.
                                        to_node.register(*to_key, source_port.clone());

                                        (
                                            (
                                                D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
                                                parse_quote!(DUMMY),
                                            ),
                                            D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
                                        )
                                    }
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            // Node handler: `Network` and `ExternalInput` carry network state.
            &mut |n| {
                if let HydroNode::Network {
                    input,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        // From the input's location to this node's location.
                        DebugInstantiate::Building => instantiate_network::<D>(
                            input.metadata().location_kind.root(),
                            metadata.location_kind.root(),
                            processes,
                            clusters,
                            compile_env,
                        ),

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::ExternalInput {
                    from_external_id,
                    from_key,
                    from_many,
                    codec_type,
                    port_hint,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let from_node = externals
                                .get(from_external_id)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "A external used in the graph was not instantiated: {}",
                                        from_external_id
                                    )
                                })
                                .clone();

                            // The receiver is this node's own location.
                            match metadata.location_kind.root() {
                                LocationId::Process(process_id) => {
                                    let to_node = processes
                                        .get(process_id)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_id)
                                        })
                                        .clone();

                                    let sink_port = D::allocate_external_port(&from_node);
                                    let source_port = D::allocate_process_port(&to_node);

                                    // Make the external's port discoverable under `from_key`.
                                    from_node.register(*from_key, sink_port.clone());

                                    (
                                        (
                                            parse_quote!(DUMMY),
                                            if *from_many {
                                                D::e2o_many_source(
                                                    compile_env,
                                                    extra_stmts.entry(*process_id).or_default(),
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_id, *from_key)
                                                )
                                            } else {
                                                D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
                                            },
                                        ),
                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                    )
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            seen_tees,
            false,
        );
    }
533
534 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
535 self.transform_bottom_up(
536 &mut |l| {
537 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
538 match instantiate_fn {
539 DebugInstantiate::Building => panic!("network not built"),
540
541 DebugInstantiate::Finalized(finalized) => {
542 (finalized.connect_fn.take().unwrap())();
543 }
544 }
545 }
546 },
547 &mut |n| {
548 if let HydroNode::Network { instantiate_fn, .. }
549 | HydroNode::ExternalInput { instantiate_fn, .. } = n
550 {
551 match instantiate_fn {
552 DebugInstantiate::Building => panic!("network not built"),
553
554 DebugInstantiate::Finalized(finalized) => {
555 (finalized.connect_fn.take().unwrap())();
556 }
557 }
558 }
559 },
560 seen_tees,
561 false,
562 );
563 }
564
565 pub fn transform_bottom_up(
566 &mut self,
567 transform_root: &mut impl FnMut(&mut HydroRoot),
568 transform_node: &mut impl FnMut(&mut HydroNode),
569 seen_tees: &mut SeenTees,
570 check_well_formed: bool,
571 ) {
572 self.transform_children(
573 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
574 seen_tees,
575 );
576
577 transform_root(self);
578 }
579
580 pub fn transform_children(
581 &mut self,
582 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
583 seen_tees: &mut SeenTees,
584 ) {
585 match self {
586 HydroRoot::ForEach { input, .. }
587 | HydroRoot::SendExternal { input, .. }
588 | HydroRoot::DestSink { input, .. }
589 | HydroRoot::CycleSink { input, .. } => {
590 transform(input, seen_tees);
591 }
592 }
593 }
594
595 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
596 match self {
597 HydroRoot::ForEach {
598 f,
599 input,
600 op_metadata,
601 } => HydroRoot::ForEach {
602 f: f.clone(),
603 input: Box::new(input.deep_clone(seen_tees)),
604 op_metadata: op_metadata.clone(),
605 },
606 HydroRoot::SendExternal {
607 to_external_id,
608 to_key,
609 to_many,
610 serialize_fn,
611 instantiate_fn,
612 input,
613 op_metadata,
614 } => HydroRoot::SendExternal {
615 to_external_id: *to_external_id,
616 to_key: *to_key,
617 to_many: *to_many,
618 serialize_fn: serialize_fn.clone(),
619 instantiate_fn: instantiate_fn.clone(),
620 input: Box::new(input.deep_clone(seen_tees)),
621 op_metadata: op_metadata.clone(),
622 },
623 HydroRoot::DestSink {
624 sink,
625 input,
626 op_metadata,
627 } => HydroRoot::DestSink {
628 sink: sink.clone(),
629 input: Box::new(input.deep_clone(seen_tees)),
630 op_metadata: op_metadata.clone(),
631 },
632 HydroRoot::CycleSink {
633 ident,
634 input,
635 out_location,
636 op_metadata,
637 } => HydroRoot::CycleSink {
638 ident: ident.clone(),
639 input: Box::new(input.deep_clone(seen_tees)),
640 out_location: out_location.clone(),
641 op_metadata: op_metadata.clone(),
642 },
643 }
644 }
645
646 #[cfg(feature = "build")]
647 pub fn emit(
648 &mut self,
649 graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
650 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
651 next_stmt_id: &mut usize,
652 ) {
653 self.emit_core(
654 &mut BuildersOrCallback::Builders::<
655 fn(&mut HydroRoot, &mut usize),
656 fn(&mut HydroNode, &mut usize),
657 >(graph_builders),
658 built_tees,
659 next_stmt_id,
660 );
661 }
662
663 #[cfg(feature = "build")]
664 pub fn emit_core(
665 &mut self,
666 builders_or_callback: &mut BuildersOrCallback<
667 impl FnMut(&mut HydroRoot, &mut usize),
668 impl FnMut(&mut HydroNode, &mut usize),
669 >,
670 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
671 next_stmt_id: &mut usize,
672 ) {
673 match self {
674 HydroRoot::ForEach { f, input, .. } => {
675 let (input_ident, input_location_id) =
676 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
677
678 match builders_or_callback {
679 BuildersOrCallback::Builders(graph_builders) => {
680 graph_builders
681 .entry(input_location_id)
682 .or_default()
683 .add_dfir(
684 parse_quote! {
685 #input_ident -> for_each(#f);
686 },
687 None,
688 Some(&next_stmt_id.to_string()),
689 );
690 }
691 BuildersOrCallback::Callback(leaf_callback, _) => {
692 leaf_callback(self, next_stmt_id);
693 }
694 }
695
696 *next_stmt_id += 1;
697 }
698
699 HydroRoot::SendExternal {
700 serialize_fn,
701 instantiate_fn,
702 input,
703 ..
704 } => {
705 let (input_ident, input_location_id) =
706 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
707
708 match builders_or_callback {
709 BuildersOrCallback::Builders(graph_builders) => {
710 let (sink_expr, _) = match instantiate_fn {
711 DebugInstantiate::Building => (
712 syn::parse_quote!(DUMMY_SINK),
713 syn::parse_quote!(DUMMY_SOURCE),
714 ),
715
716 DebugInstantiate::Finalized(finalized) => {
717 (finalized.sink.clone(), finalized.source.clone())
718 }
719 };
720
721 let sender_builder = graph_builders.entry(input_location_id).or_default();
722 if let Some(serialize_fn) = serialize_fn {
723 sender_builder.add_dfir(
724 parse_quote! {
725 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
726 },
727 None,
728 Some(&format!("send{}", next_stmt_id)),
730 );
731 } else {
732 sender_builder.add_dfir(
733 parse_quote! {
734 #input_ident -> dest_sink(#sink_expr);
735 },
736 None,
737 Some(&format!("send{}", next_stmt_id)),
738 );
739 }
740 }
741 BuildersOrCallback::Callback(leaf_callback, _) => {
742 leaf_callback(self, next_stmt_id);
743 }
744 }
745
746 *next_stmt_id += 1;
747 }
748
749 HydroRoot::DestSink { sink, input, .. } => {
750 let (input_ident, input_location_id) =
751 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
752
753 match builders_or_callback {
754 BuildersOrCallback::Builders(graph_builders) => {
755 graph_builders
756 .entry(input_location_id)
757 .or_default()
758 .add_dfir(
759 parse_quote! {
760 #input_ident -> dest_sink(#sink);
761 },
762 None,
763 Some(&next_stmt_id.to_string()),
764 );
765 }
766 BuildersOrCallback::Callback(leaf_callback, _) => {
767 leaf_callback(self, next_stmt_id);
768 }
769 }
770
771 *next_stmt_id += 1;
772 }
773
774 HydroRoot::CycleSink {
775 ident,
776 input,
777 out_location,
778 ..
779 } => {
780 let (input_ident, input_location_id) =
781 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
782
783 let location_id = out_location.root().raw_id();
784
785 match builders_or_callback {
786 BuildersOrCallback::Builders(graph_builders) => {
787 assert_eq!(
788 input_location_id, location_id,
789 "cycle_sink location mismatch"
790 );
791
792 graph_builders.entry(location_id).or_default().add_dfir(
793 parse_quote! {
794 #ident = #input_ident;
795 },
796 None,
797 None,
798 );
799 }
800 BuildersOrCallback::Callback(_, _) => {}
802 }
803 }
804 }
805 }
806
807 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
808 match self {
809 HydroRoot::ForEach { op_metadata, .. }
810 | HydroRoot::SendExternal { op_metadata, .. }
811 | HydroRoot::DestSink { op_metadata, .. }
812 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
813 }
814 }
815
816 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
817 match self {
818 HydroRoot::ForEach { op_metadata, .. }
819 | HydroRoot::SendExternal { op_metadata, .. }
820 | HydroRoot::DestSink { op_metadata, .. }
821 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
822 }
823 }
824
825 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
826 match self {
827 HydroRoot::ForEach { input, .. }
828 | HydroRoot::SendExternal { input, .. }
829 | HydroRoot::DestSink { input, .. }
830 | HydroRoot::CycleSink { input, .. } => {
831 vec![input.metadata()]
832 }
833 }
834 }
835
836 pub fn print_root(&self) -> String {
837 match self {
838 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
839 HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
840 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
841 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
842 }
843 }
844
845 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
846 match self {
847 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
848 transform(f);
849 }
850 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
851 }
852 }
853}
854
/// Emits every root in `ir` and returns the resulting graph builders, keyed
/// by location id.
///
/// Tee bookkeeping and the statement-id counter (starting at 0) are shared
/// across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut()
        .for_each(|root| root.emit(&mut builders, &mut built_tees, &mut next_stmt_id));

    builders
}
865
/// Walks `ir` in emission order without building anything, invoking
/// `transform_root` / `transform_node` through `emit_core`'s callback mode.
///
/// Each callback also receives the running statement id, which starts at 0
/// and is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core(&mut callback, &mut built_tees, &mut next_stmt_id);
    }
}
879
880pub fn transform_bottom_up(
881 ir: &mut [HydroRoot],
882 transform_root: &mut impl FnMut(&mut HydroRoot),
883 transform_node: &mut impl FnMut(&mut HydroNode),
884 check_well_formed: bool,
885) {
886 let mut seen_tees = HashMap::new();
887 ir.iter_mut().for_each(|leaf| {
888 leaf.transform_bottom_up(
889 transform_root,
890 transform_node,
891 &mut seen_tees,
892 check_well_formed,
893 );
894 });
895}
896
897pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
898 let mut seen_tees = HashMap::new();
899 ir.iter()
900 .map(|leaf| leaf.deep_clone(&mut seen_tees))
901 .collect()
902}
903
/// Per-thread state used to de-duplicate tee nodes in `Debug` output.
///
/// `None` means de-duplication is off. `Some((next_id, ids))` holds the next
/// id to hand out and the ids already assigned, keyed by the tee's pointer.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts out as `None`; `dbg_dedup_tee` switches it on for the duration
    // of a call.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
908
909pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
910 PRINTED_TEES.with(|printed_tees| {
911 let mut printed_tees_mut = printed_tees.borrow_mut();
912 *printed_tees_mut = Some((0, HashMap::new()));
913 drop(printed_tees_mut);
914
915 let ret = f();
916
917 let mut printed_tees_mut = printed_tees.borrow_mut();
918 *printed_tees_mut = None;
919
920 ret
921 })
922}
923
/// A shared, interior-mutable IR node: the common input of a tee, held
/// through a reference-counted `RefCell`.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
925
926impl TeeNode {
927 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
928 Rc::as_ptr(&self.0)
929 }
930}
931
932impl Debug for TeeNode {
933 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
934 PRINTED_TEES.with(|printed_tees| {
935 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
936 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
937
938 if let Some(printed_tees_mut) = printed_tees_mut {
939 if let Some(existing) = printed_tees_mut
940 .1
941 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
942 {
943 write!(f, "<tee {}>", existing)
944 } else {
945 let next_id = printed_tees_mut.0;
946 printed_tees_mut.0 += 1;
947 printed_tees_mut
948 .1
949 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
950 drop(printed_tees_mut_borrow);
951 write!(f, "<tee {}>: ", next_id)?;
952 Debug::fmt(&self.0.borrow(), f)
953 }
954 } else {
955 drop(printed_tees_mut_borrow);
956 write!(f, "<tee>: ")?;
957 Debug::fmt(&self.0.borrow(), f)
958 }
959 })
960 }
961}
962
963impl Hash for TeeNode {
964 fn hash<H: Hasher>(&self, state: &mut H) {
965 self.0.borrow_mut().hash(state);
966 }
967}
968
/// Metadata attached to every IR node.
///
/// Note that the `Hash`, `PartialEq` and `Debug` impls in this file
/// deliberately ignore some or all of these fields.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Where the node lives; its `root()` is used for location checks.
    pub location_kind: LocationId,
    /// The node's output type, when known.
    pub output_type: Option<DebugType>,
    /// Optional cardinality value (NOTE(review): units and producer are not
    /// visible in this part of the file).
    pub cardinality: Option<usize>,
    /// Optional free-form tag.
    pub tag: Option<String>,
    /// Per-operator metadata.
    pub op: HydroIrOpMetadata,
}
977
978impl Hash for HydroIrMetadata {
980 fn hash<H: Hasher>(&self, _: &mut H) {}
981}
982
983impl PartialEq for HydroIrMetadata {
984 fn eq(&self, _: &Self) -> bool {
985 true
986 }
987}
988
// Marker impl: the `PartialEq` impl above always returns `true`, which is
// trivially reflexive, symmetric and transitive.
impl Eq for HydroIrMetadata {}
990
991impl Debug for HydroIrMetadata {
992 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
993 f.debug_struct("HydroIrMetadata")
994 .field("location_kind", &self.location_kind)
995 .field("output_type", &self.output_type)
996 .finish()
997 }
998}
999
/// Metadata attached to a single IR operator.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created.
    pub backtrace: Backtrace,
    /// Optional CPU-usage figure; `None` on construction.
    pub cpu_usage: Option<f64>,
    /// Optional CPU-usage figure for network receive; `None` on construction.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional operator id; `None` on construction.
    pub id: Option<usize>,
}
1009
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and every optional
    /// field set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // Passes a skip count of 1 on top of the fixed offset applied in
        // `new_with_skip`.
        Self::new_with_skip(1)
    }

    /// Like `new`, but passes `2 + skip_count` to `Backtrace::get_backtrace`.
    ///
    /// NOTE(review): the fixed `2` presumably accounts for frames introduced
    /// by this helper and the capture call itself — confirm against
    /// `Backtrace::get_backtrace` before changing the call nesting here.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1028
1029impl Debug for HydroIrOpMetadata {
1030 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1031 f.debug_struct("HydroIrOpMetadata").finish()
1032 }
1033}
1034
1035impl Hash for HydroIrOpMetadata {
1036 fn hash<H: Hasher>(&self, _: &mut H) {}
1037}
1038
/// A node of the IR graph.
///
/// Apart from `Placeholder`, every variant carries a `metadata` field.
/// Variants with inputs own them as boxed child nodes, except `Tee`, whose
/// input is shared through a `TeeNode`.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in with no metadata. It is swapped into a tee's cell
    /// while the real node is being transformed or cloned; traversing the
    /// children of a `Placeholder` panics.
    Placeholder,

    /// Leaf: an input described by a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf: reads from the cycle named `ident`.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// One consumer of a shared input node.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    // --- Single-input wrappers whose child field is named `inner` ---
    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // --- Two-input operators ---
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // --- Single-input operators whose child field is named `input` ---
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // Aggregations: `init` and `acc` are the two expressions of a fold-style
    // operator; `f` is the single expression of a reduce-style operator.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with a second child node, `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Moves data from the input's location to this node's location. It is
    /// the only variant exempt from the same-location well-formedness check.
    Network {
        serialize_fn: Option<DebugExpr>,
        /// Filled in by `compile_network`, consumed by `connect_network`.
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf: input arriving from an external.
    ExternalInput {
        /// Key into the `externals` map passed to `compile_network`.
        from_external_id: usize,
        /// Key under which the port is registered on the external.
        from_key: usize,
        /// Selects the "many" source variant during `compile_network`.
        from_many: bool,
        /// Codec type forwarded to the "many" source constructor.
        codec_type: DebugType,
        /// Hint forwarded to the connect step.
        port_hint: NetworkHint,
        /// Filled in by `compile_network`, consumed by `connect_network`.
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator carrying a tag, a duration expression and a
    /// prefix (NOTE(review): presumably a throughput/count reporter — confirm
    /// against its emit code).
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1244
/// Maps the pointer of an original tee cell to the cell that replaces it, so
/// that a shared tee is transformed or cloned only once per traversal.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the pointer of a tee cell to a location id.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1247
1248impl HydroNode {
1249 pub fn transform_bottom_up(
1250 &mut self,
1251 transform: &mut impl FnMut(&mut HydroNode),
1252 seen_tees: &mut SeenTees,
1253 check_well_formed: bool,
1254 ) {
1255 self.transform_children(
1256 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1257 seen_tees,
1258 );
1259
1260 transform(self);
1261
1262 let self_location = self.metadata().location_kind.root();
1263
1264 if check_well_formed {
1265 match &*self {
1266 HydroNode::Network { .. } => {}
1267 _ => {
1268 self.input_metadata().iter().for_each(|i| {
1269 if i.location_kind.root() != self_location {
1270 panic!(
1271 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1272 i,
1273 i.location_kind.root(),
1274 self,
1275 self_location
1276 )
1277 }
1278 });
1279 }
1280 }
1281 }
1282 }
1283
1284 #[inline(always)]
1285 pub fn transform_children(
1286 &mut self,
1287 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1288 seen_tees: &mut SeenTees,
1289 ) {
1290 match self {
1291 HydroNode::Placeholder => {
1292 panic!();
1293 }
1294
1295 HydroNode::Source { .. }
1296 | HydroNode::CycleSource { .. }
1297 | HydroNode::ExternalInput { .. } => {}
1298
1299 HydroNode::Tee { inner, .. } => {
1300 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1301 *inner = TeeNode(transformed.clone());
1302 } else {
1303 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1304 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1305 let mut orig = inner.0.replace(HydroNode::Placeholder);
1306 transform(&mut orig, seen_tees);
1307 *transformed_cell.borrow_mut() = orig;
1308 *inner = TeeNode(transformed_cell);
1309 }
1310 }
1311
1312 HydroNode::Persist { inner, .. }
1313 | HydroNode::BeginAtomic { inner, .. }
1314 | HydroNode::EndAtomic { inner, .. }
1315 | HydroNode::Batch { inner, .. }
1316 | HydroNode::YieldConcat { inner, .. } => {
1317 transform(inner.as_mut(), seen_tees);
1318 }
1319
1320 HydroNode::Chain { first, second, .. } => {
1321 transform(first.as_mut(), seen_tees);
1322 transform(second.as_mut(), seen_tees);
1323 }
1324
1325 HydroNode::ChainFirst { first, second, .. } => {
1326 transform(first.as_mut(), seen_tees);
1327 transform(second.as_mut(), seen_tees);
1328 }
1329
1330 HydroNode::CrossSingleton { left, right, .. }
1331 | HydroNode::CrossProduct { left, right, .. }
1332 | HydroNode::Join { left, right, .. } => {
1333 transform(left.as_mut(), seen_tees);
1334 transform(right.as_mut(), seen_tees);
1335 }
1336
1337 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1338 transform(pos.as_mut(), seen_tees);
1339 transform(neg.as_mut(), seen_tees);
1340 }
1341
1342 HydroNode::ReduceKeyedWatermark {
1343 input, watermark, ..
1344 } => {
1345 transform(input.as_mut(), seen_tees);
1346 transform(watermark.as_mut(), seen_tees);
1347 }
1348
1349 HydroNode::Map { input, .. }
1350 | HydroNode::ResolveFutures { input, .. }
1351 | HydroNode::ResolveFuturesOrdered { input, .. }
1352 | HydroNode::FlatMap { input, .. }
1353 | HydroNode::Filter { input, .. }
1354 | HydroNode::FilterMap { input, .. }
1355 | HydroNode::Sort { input, .. }
1356 | HydroNode::DeferTick { input, .. }
1357 | HydroNode::Enumerate { input, .. }
1358 | HydroNode::Inspect { input, .. }
1359 | HydroNode::Unique { input, .. }
1360 | HydroNode::Network { input, .. }
1361 | HydroNode::Fold { input, .. }
1362 | HydroNode::Scan { input, .. }
1363 | HydroNode::FoldKeyed { input, .. }
1364 | HydroNode::Reduce { input, .. }
1365 | HydroNode::ReduceKeyed { input, .. }
1366 | HydroNode::Counter { input, .. } => {
1367 transform(input.as_mut(), seen_tees);
1368 }
1369 }
1370 }
1371
1372 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1373 match self {
1374 HydroNode::Placeholder => HydroNode::Placeholder,
1375 HydroNode::Source { source, metadata } => HydroNode::Source {
1376 source: source.clone(),
1377 metadata: metadata.clone(),
1378 },
1379 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1380 ident: ident.clone(),
1381 metadata: metadata.clone(),
1382 },
1383 HydroNode::Tee { inner, metadata } => {
1384 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1385 HydroNode::Tee {
1386 inner: TeeNode(transformed.clone()),
1387 metadata: metadata.clone(),
1388 }
1389 } else {
1390 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1391 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1392 let cloned = inner.0.borrow().deep_clone(seen_tees);
1393 *new_rc.borrow_mut() = cloned;
1394 HydroNode::Tee {
1395 inner: TeeNode(new_rc),
1396 metadata: metadata.clone(),
1397 }
1398 }
1399 }
1400 HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1401 inner: Box::new(inner.deep_clone(seen_tees)),
1402 metadata: metadata.clone(),
1403 },
1404 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1405 inner: Box::new(inner.deep_clone(seen_tees)),
1406 metadata: metadata.clone(),
1407 },
1408 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1409 inner: Box::new(inner.deep_clone(seen_tees)),
1410 metadata: metadata.clone(),
1411 },
1412 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1413 inner: Box::new(inner.deep_clone(seen_tees)),
1414 metadata: metadata.clone(),
1415 },
1416 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1417 inner: Box::new(inner.deep_clone(seen_tees)),
1418 metadata: metadata.clone(),
1419 },
1420 HydroNode::Chain {
1421 first,
1422 second,
1423 metadata,
1424 } => HydroNode::Chain {
1425 first: Box::new(first.deep_clone(seen_tees)),
1426 second: Box::new(second.deep_clone(seen_tees)),
1427 metadata: metadata.clone(),
1428 },
1429 HydroNode::ChainFirst {
1430 first,
1431 second,
1432 metadata,
1433 } => HydroNode::ChainFirst {
1434 first: Box::new(first.deep_clone(seen_tees)),
1435 second: Box::new(second.deep_clone(seen_tees)),
1436 metadata: metadata.clone(),
1437 },
1438 HydroNode::CrossProduct {
1439 left,
1440 right,
1441 metadata,
1442 } => HydroNode::CrossProduct {
1443 left: Box::new(left.deep_clone(seen_tees)),
1444 right: Box::new(right.deep_clone(seen_tees)),
1445 metadata: metadata.clone(),
1446 },
1447 HydroNode::CrossSingleton {
1448 left,
1449 right,
1450 metadata,
1451 } => HydroNode::CrossSingleton {
1452 left: Box::new(left.deep_clone(seen_tees)),
1453 right: Box::new(right.deep_clone(seen_tees)),
1454 metadata: metadata.clone(),
1455 },
1456 HydroNode::Join {
1457 left,
1458 right,
1459 metadata,
1460 } => HydroNode::Join {
1461 left: Box::new(left.deep_clone(seen_tees)),
1462 right: Box::new(right.deep_clone(seen_tees)),
1463 metadata: metadata.clone(),
1464 },
1465 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1466 pos: Box::new(pos.deep_clone(seen_tees)),
1467 neg: Box::new(neg.deep_clone(seen_tees)),
1468 metadata: metadata.clone(),
1469 },
1470 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1471 pos: Box::new(pos.deep_clone(seen_tees)),
1472 neg: Box::new(neg.deep_clone(seen_tees)),
1473 metadata: metadata.clone(),
1474 },
1475 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1476 input: Box::new(input.deep_clone(seen_tees)),
1477 metadata: metadata.clone(),
1478 },
1479 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1480 HydroNode::ResolveFuturesOrdered {
1481 input: Box::new(input.deep_clone(seen_tees)),
1482 metadata: metadata.clone(),
1483 }
1484 }
1485 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1486 f: f.clone(),
1487 input: Box::new(input.deep_clone(seen_tees)),
1488 metadata: metadata.clone(),
1489 },
1490 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1491 f: f.clone(),
1492 input: Box::new(input.deep_clone(seen_tees)),
1493 metadata: metadata.clone(),
1494 },
1495 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1496 f: f.clone(),
1497 input: Box::new(input.deep_clone(seen_tees)),
1498 metadata: metadata.clone(),
1499 },
1500 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1501 f: f.clone(),
1502 input: Box::new(input.deep_clone(seen_tees)),
1503 metadata: metadata.clone(),
1504 },
1505 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1506 input: Box::new(input.deep_clone(seen_tees)),
1507 metadata: metadata.clone(),
1508 },
1509 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1510 input: Box::new(input.deep_clone(seen_tees)),
1511 metadata: metadata.clone(),
1512 },
1513 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1514 f: f.clone(),
1515 input: Box::new(input.deep_clone(seen_tees)),
1516 metadata: metadata.clone(),
1517 },
1518 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1519 input: Box::new(input.deep_clone(seen_tees)),
1520 metadata: metadata.clone(),
1521 },
1522 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1523 input: Box::new(input.deep_clone(seen_tees)),
1524 metadata: metadata.clone(),
1525 },
1526 HydroNode::Fold {
1527 init,
1528 acc,
1529 input,
1530 metadata,
1531 } => HydroNode::Fold {
1532 init: init.clone(),
1533 acc: acc.clone(),
1534 input: Box::new(input.deep_clone(seen_tees)),
1535 metadata: metadata.clone(),
1536 },
1537 HydroNode::Scan {
1538 init,
1539 acc,
1540 input,
1541 metadata,
1542 } => HydroNode::Scan {
1543 init: init.clone(),
1544 acc: acc.clone(),
1545 input: Box::new(input.deep_clone(seen_tees)),
1546 metadata: metadata.clone(),
1547 },
1548 HydroNode::FoldKeyed {
1549 init,
1550 acc,
1551 input,
1552 metadata,
1553 } => HydroNode::FoldKeyed {
1554 init: init.clone(),
1555 acc: acc.clone(),
1556 input: Box::new(input.deep_clone(seen_tees)),
1557 metadata: metadata.clone(),
1558 },
1559 HydroNode::ReduceKeyedWatermark {
1560 f,
1561 input,
1562 watermark,
1563 metadata,
1564 } => HydroNode::ReduceKeyedWatermark {
1565 f: f.clone(),
1566 input: Box::new(input.deep_clone(seen_tees)),
1567 watermark: Box::new(watermark.deep_clone(seen_tees)),
1568 metadata: metadata.clone(),
1569 },
1570 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1571 f: f.clone(),
1572 input: Box::new(input.deep_clone(seen_tees)),
1573 metadata: metadata.clone(),
1574 },
1575 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1576 f: f.clone(),
1577 input: Box::new(input.deep_clone(seen_tees)),
1578 metadata: metadata.clone(),
1579 },
1580 HydroNode::Network {
1581 serialize_fn,
1582 instantiate_fn,
1583 deserialize_fn,
1584 input,
1585 metadata,
1586 } => HydroNode::Network {
1587 serialize_fn: serialize_fn.clone(),
1588 instantiate_fn: instantiate_fn.clone(),
1589 deserialize_fn: deserialize_fn.clone(),
1590 input: Box::new(input.deep_clone(seen_tees)),
1591 metadata: metadata.clone(),
1592 },
1593 HydroNode::ExternalInput {
1594 from_external_id,
1595 from_key,
1596 from_many,
1597 codec_type,
1598 port_hint,
1599 instantiate_fn,
1600 deserialize_fn,
1601 metadata,
1602 } => HydroNode::ExternalInput {
1603 from_external_id: *from_external_id,
1604 from_key: *from_key,
1605 from_many: *from_many,
1606 codec_type: codec_type.clone(),
1607 port_hint: *port_hint,
1608 instantiate_fn: instantiate_fn.clone(),
1609 deserialize_fn: deserialize_fn.clone(),
1610 metadata: metadata.clone(),
1611 },
1612 HydroNode::Counter {
1613 tag,
1614 duration,
1615 prefix,
1616 input,
1617 metadata,
1618 } => HydroNode::Counter {
1619 tag: tag.clone(),
1620 duration: duration.clone(),
1621 prefix: prefix.clone(),
1622 input: Box::new(input.deep_clone(seen_tees)),
1623 metadata: metadata.clone(),
1624 },
1625 }
1626 }
1627
1628 #[cfg(feature = "build")]
1629 pub fn emit_core(
1630 &mut self,
1631 builders_or_callback: &mut BuildersOrCallback<
1632 impl FnMut(&mut HydroRoot, &mut usize),
1633 impl FnMut(&mut HydroNode, &mut usize),
1634 >,
1635 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1636 next_stmt_id: &mut usize,
1637 ) -> (syn::Ident, usize) {
1638 match self {
1639 HydroNode::Placeholder => {
1640 panic!()
1641 }
1642
1643 HydroNode::Persist { inner, .. } => {
1644 let (inner_ident, location) =
1645 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1646
1647 let persist_ident =
1648 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1649
1650 match builders_or_callback {
1651 BuildersOrCallback::Builders(graph_builders) => {
1652 let builder = graph_builders.entry(location).or_default();
1653 builder.add_dfir(
1654 parse_quote! {
1655 #persist_ident = #inner_ident -> persist::<'static>();
1656 },
1657 None,
1658 Some(&next_stmt_id.to_string()),
1659 );
1660 }
1661 BuildersOrCallback::Callback(_, node_callback) => {
1662 node_callback(self, next_stmt_id);
1663 }
1664 }
1665
1666 *next_stmt_id += 1;
1667
1668 (persist_ident, location)
1669 }
1670
1671 HydroNode::Batch { inner, .. } => {
1672 let (inner_ident, location) =
1673 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1674
1675 let batch_ident =
1676 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1677
1678 match builders_or_callback {
1679 BuildersOrCallback::Builders(graph_builders) => {
1680 let builder = graph_builders.entry(location).or_default();
1681 builder.add_dfir(
1682 parse_quote! {
1683 #batch_ident = #inner_ident;
1684 },
1685 None,
1686 None,
1687 );
1688 }
1689 BuildersOrCallback::Callback(_, node_callback) => {
1690 node_callback(self, next_stmt_id);
1691 }
1692 }
1693
1694 *next_stmt_id += 1;
1695
1696 (batch_ident, location)
1697 }
1698
1699 HydroNode::YieldConcat { inner, .. } => {
1700 let (inner_ident, location) =
1701 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1702
1703 let yield_ident =
1704 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1705
1706 match builders_or_callback {
1707 BuildersOrCallback::Builders(graph_builders) => {
1708 let builder = graph_builders.entry(location).or_default();
1709 builder.add_dfir(
1710 parse_quote! {
1711 #yield_ident = #inner_ident;
1712 },
1713 None,
1714 None,
1715 );
1716 }
1717 BuildersOrCallback::Callback(_, node_callback) => {
1718 node_callback(self, next_stmt_id);
1719 }
1720 }
1721
1722 *next_stmt_id += 1;
1723
1724 (yield_ident, location)
1725 }
1726
1727 HydroNode::BeginAtomic { inner, .. } => {
1728 inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
1729 }
1730
1731 HydroNode::EndAtomic { inner, .. } => {
1732 inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
1733 }
1734
1735 HydroNode::Source {
1736 source, metadata, ..
1737 } => {
1738 let location_id = metadata.location_kind.root().raw_id();
1739
1740 if let HydroSource::ExternalNetwork() = source {
1741 (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1742 } else {
1743 let source_ident =
1744 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1745
1746 let source_stmt = match source {
1747 HydroSource::Stream(expr) => {
1748 debug_assert!(metadata.location_kind.is_top_level());
1749 parse_quote! {
1750 #source_ident = source_stream(#expr);
1751 }
1752 }
1753
1754 HydroSource::ExternalNetwork() => {
1755 unreachable!()
1756 }
1757
1758 HydroSource::Iter(expr) => {
1759 if metadata.location_kind.is_top_level() {
1760 parse_quote! {
1761 #source_ident = source_iter(#expr);
1762 }
1763 } else {
1764 parse_quote! {
1766 #source_ident = source_iter(#expr) -> persist::<'static>();
1767 }
1768 }
1769 }
1770
1771 HydroSource::Spin() => {
1772 debug_assert!(metadata.location_kind.is_top_level());
1773 parse_quote! {
1774 #source_ident = spin();
1775 }
1776 }
1777 };
1778
1779 match builders_or_callback {
1780 BuildersOrCallback::Builders(graph_builders) => {
1781 let builder = graph_builders.entry(location_id).or_default();
1782 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1783 }
1784 BuildersOrCallback::Callback(_, node_callback) => {
1785 node_callback(self, next_stmt_id);
1786 }
1787 }
1788
1789 *next_stmt_id += 1;
1790
1791 (source_ident, location_id)
1792 }
1793 }
1794
1795 HydroNode::CycleSource {
1796 ident, metadata, ..
1797 } => {
1798 let location_id = metadata.location_kind.root().raw_id();
1799
1800 let ident = ident.clone();
1801
1802 match builders_or_callback {
1803 BuildersOrCallback::Builders(_) => {}
1804 BuildersOrCallback::Callback(_, node_callback) => {
1805 node_callback(self, next_stmt_id);
1806 }
1807 }
1808
1809 *next_stmt_id += 1;
1811
1812 (ident, location_id)
1813 }
1814
1815 HydroNode::Tee { inner, .. } => {
1816 let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1817 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1818 {
1819 match builders_or_callback {
1820 BuildersOrCallback::Builders(_) => {}
1821 BuildersOrCallback::Callback(_, node_callback) => {
1822 node_callback(self, next_stmt_id);
1823 }
1824 }
1825
1826 (teed_from.clone(), *inner_location_id)
1827 } else {
1828 let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1829 builders_or_callback,
1830 built_tees,
1831 next_stmt_id,
1832 );
1833
1834 let tee_ident =
1835 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1836
1837 built_tees.insert(
1838 inner.0.as_ref() as *const RefCell<HydroNode>,
1839 (tee_ident.clone(), inner_location_id),
1840 );
1841
1842 match builders_or_callback {
1843 BuildersOrCallback::Builders(graph_builders) => {
1844 let builder = graph_builders.entry(inner_location_id).or_default();
1845 builder.add_dfir(
1846 parse_quote! {
1847 #tee_ident = #inner_ident -> tee();
1848 },
1849 None,
1850 Some(&next_stmt_id.to_string()),
1851 );
1852 }
1853 BuildersOrCallback::Callback(_, node_callback) => {
1854 node_callback(self, next_stmt_id);
1855 }
1856 }
1857
1858 (tee_ident, inner_location_id)
1859 };
1860
1861 *next_stmt_id += 1;
1865 (ret_ident, inner_location_id)
1866 }
1867
1868 HydroNode::Chain { first, second, .. } => {
1869 let (first_ident, first_location_id) =
1870 first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1871 let (second_ident, second_location_id) =
1872 second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1873
1874 let chain_ident =
1875 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1876
1877 match builders_or_callback {
1878 BuildersOrCallback::Builders(graph_builders) => {
1879 assert_eq!(
1880 first_location_id, second_location_id,
1881 "chain inputs must be in the same location"
1882 );
1883 let builder = graph_builders.entry(first_location_id).or_default();
1884 builder.add_dfir(
1885 parse_quote! {
1886 #chain_ident = chain();
1887 #first_ident -> [0]#chain_ident;
1888 #second_ident -> [1]#chain_ident;
1889 },
1890 None,
1891 Some(&next_stmt_id.to_string()),
1892 );
1893 }
1894 BuildersOrCallback::Callback(_, node_callback) => {
1895 node_callback(self, next_stmt_id);
1896 }
1897 }
1898
1899 *next_stmt_id += 1;
1900
1901 (chain_ident, first_location_id)
1902 }
1903
1904 HydroNode::ChainFirst { first, second, .. } => {
1905 let (first_ident, first_location_id) =
1906 first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1907 let (second_ident, second_location_id) =
1908 second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1909
1910 let chain_ident =
1911 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1912
1913 match builders_or_callback {
1914 BuildersOrCallback::Builders(graph_builders) => {
1915 assert_eq!(
1916 first_location_id, second_location_id,
1917 "chain inputs must be in the same location"
1918 );
1919 let builder = graph_builders.entry(first_location_id).or_default();
1920 builder.add_dfir(
1921 parse_quote! {
1922 #chain_ident = chain_first_n(1);
1923 #first_ident -> [0]#chain_ident;
1924 #second_ident -> [1]#chain_ident;
1925 },
1926 None,
1927 Some(&next_stmt_id.to_string()),
1928 );
1929 }
1930 BuildersOrCallback::Callback(_, node_callback) => {
1931 node_callback(self, next_stmt_id);
1932 }
1933 }
1934
1935 *next_stmt_id += 1;
1936
1937 (chain_ident, first_location_id)
1938 }
1939
1940 HydroNode::CrossSingleton { left, right, .. } => {
1941 let (left_ident, left_location_id) =
1942 left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1943 let (right_ident, right_location_id) =
1944 right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1945
1946 let cross_ident =
1947 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1948
1949 match builders_or_callback {
1950 BuildersOrCallback::Builders(graph_builders) => {
1951 assert_eq!(
1952 left_location_id, right_location_id,
1953 "cross_singleton inputs must be in the same location"
1954 );
1955
1956 let builder = graph_builders.entry(left_location_id).or_default();
1957 builder.add_dfir(
1958 parse_quote! {
1959 #cross_ident = cross_singleton();
1960 #left_ident -> [input]#cross_ident;
1961 #right_ident -> [single]#cross_ident;
1962 },
1963 None,
1964 Some(&next_stmt_id.to_string()),
1965 );
1966 }
1967 BuildersOrCallback::Callback(_, node_callback) => {
1968 node_callback(self, next_stmt_id);
1969 }
1970 }
1971
1972 *next_stmt_id += 1;
1973
1974 (cross_ident, left_location_id)
1975 }
1976
1977 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1978 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1979 parse_quote!(cross_join_multiset)
1980 } else {
1981 parse_quote!(join_multiset)
1982 };
1983
1984 let (HydroNode::CrossProduct { left, right, .. }
1985 | HydroNode::Join { left, right, .. }) = self
1986 else {
1987 unreachable!()
1988 };
1989
1990 let is_top_level = left.metadata().location_kind.is_top_level()
1991 && right.metadata().location_kind.is_top_level();
1992 let (left_inner, left_lifetime) =
1993 if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1994 debug_assert!(!left.metadata().location_kind.is_top_level());
1995 (left, quote!('static))
1996 } else if left.metadata().location_kind.is_top_level() {
1997 (left, quote!('static))
1998 } else {
1999 (left, quote!('tick))
2000 };
2001
2002 let (right_inner, right_lifetime) =
2003 if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
2004 debug_assert!(!right.metadata().location_kind.is_top_level());
2005 (right, quote!('static))
2006 } else if right.metadata().location_kind.is_top_level() {
2007 (right, quote!('static))
2008 } else {
2009 (right, quote!('tick))
2010 };
2011
2012 let (left_ident, left_location_id) =
2013 left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2014 let (right_ident, right_location_id) =
2015 right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2016
2017 let stream_ident =
2018 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2019
2020 match builders_or_callback {
2021 BuildersOrCallback::Builders(graph_builders) => {
2022 assert_eq!(
2023 left_location_id, right_location_id,
2024 "join / cross product inputs must be in the same location"
2025 );
2026
2027 let builder = graph_builders.entry(left_location_id).or_default();
2028 builder.add_dfir(
2029 if is_top_level {
2030 parse_quote! {
2033 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2034 #left_ident -> [0]#stream_ident;
2035 #right_ident -> [1]#stream_ident;
2036 }
2037 } else {
2038 parse_quote! {
2039 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2040 #left_ident -> [0]#stream_ident;
2041 #right_ident -> [1]#stream_ident;
2042 }
2043 }
2044 ,
2045 None,
2046 Some(&next_stmt_id.to_string()),
2047 );
2048 }
2049 BuildersOrCallback::Callback(_, node_callback) => {
2050 node_callback(self, next_stmt_id);
2051 }
2052 }
2053
2054 *next_stmt_id += 1;
2055
2056 (stream_ident, left_location_id)
2057 }
2058
2059 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2060 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2061 parse_quote!(difference_multiset)
2062 } else {
2063 parse_quote!(anti_join_multiset)
2064 };
2065
2066 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2067 self
2068 else {
2069 unreachable!()
2070 };
2071
2072 let (neg, neg_lifetime) =
2073 if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2074 debug_assert!(!neg.metadata().location_kind.is_top_level());
2075 (neg, quote!('static))
2076 } else if neg.metadata().location_kind.is_top_level() {
2077 (neg, quote!('static))
2078 } else {
2079 (neg, quote!('tick))
2080 };
2081
2082 let (pos_ident, pos_location_id) =
2083 pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2084 let (neg_ident, neg_location_id) =
2085 neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2086
2087 let stream_ident =
2088 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2089
2090 match builders_or_callback {
2091 BuildersOrCallback::Builders(graph_builders) => {
2092 assert_eq!(
2093 pos_location_id, neg_location_id,
2094 "difference / anti join inputs must be in the same location"
2095 );
2096
2097 let builder = graph_builders.entry(pos_location_id).or_default();
2098 builder.add_dfir(
2099 parse_quote! {
2100 #stream_ident = #operator::<'tick, #neg_lifetime>();
2101 #pos_ident -> [pos]#stream_ident;
2102 #neg_ident -> [neg]#stream_ident;
2103 },
2104 None,
2105 Some(&next_stmt_id.to_string()),
2106 );
2107 }
2108 BuildersOrCallback::Callback(_, node_callback) => {
2109 node_callback(self, next_stmt_id);
2110 }
2111 }
2112
2113 *next_stmt_id += 1;
2114
2115 (stream_ident, pos_location_id)
2116 }
2117
2118 HydroNode::ResolveFutures { input, .. } => {
2119 let (input_ident, input_location_id) =
2120 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2121
2122 let futures_ident =
2123 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2124
2125 match builders_or_callback {
2126 BuildersOrCallback::Builders(graph_builders) => {
2127 let builder = graph_builders.entry(input_location_id).or_default();
2128 builder.add_dfir(
2129 parse_quote! {
2130 #futures_ident = #input_ident -> resolve_futures();
2131 },
2132 None,
2133 Some(&next_stmt_id.to_string()),
2134 );
2135 }
2136 BuildersOrCallback::Callback(_, node_callback) => {
2137 node_callback(self, next_stmt_id);
2138 }
2139 }
2140
2141 *next_stmt_id += 1;
2142
2143 (futures_ident, input_location_id)
2144 }
2145
2146 HydroNode::ResolveFuturesOrdered { input, .. } => {
2147 let (input_ident, input_location_id) =
2148 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2149
2150 let futures_ident =
2151 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2152
2153 match builders_or_callback {
2154 BuildersOrCallback::Builders(graph_builders) => {
2155 let builder = graph_builders.entry(input_location_id).or_default();
2156 builder.add_dfir(
2157 parse_quote! {
2158 #futures_ident = #input_ident -> resolve_futures_ordered();
2159 },
2160 None,
2161 Some(&next_stmt_id.to_string()),
2162 );
2163 }
2164 BuildersOrCallback::Callback(_, node_callback) => {
2165 node_callback(self, next_stmt_id);
2166 }
2167 }
2168
2169 *next_stmt_id += 1;
2170
2171 (futures_ident, input_location_id)
2172 }
2173
2174 HydroNode::Map { f, input, .. } => {
2175 let (input_ident, input_location_id) =
2176 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2177
2178 let map_ident =
2179 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2180
2181 match builders_or_callback {
2182 BuildersOrCallback::Builders(graph_builders) => {
2183 let builder = graph_builders.entry(input_location_id).or_default();
2184 builder.add_dfir(
2185 parse_quote! {
2186 #map_ident = #input_ident -> map(#f);
2187 },
2188 None,
2189 Some(&next_stmt_id.to_string()),
2190 );
2191 }
2192 BuildersOrCallback::Callback(_, node_callback) => {
2193 node_callback(self, next_stmt_id);
2194 }
2195 }
2196
2197 *next_stmt_id += 1;
2198
2199 (map_ident, input_location_id)
2200 }
2201
2202 HydroNode::FlatMap { f, input, .. } => {
2203 let (input_ident, input_location_id) =
2204 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2205
2206 let flat_map_ident =
2207 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2208
2209 match builders_or_callback {
2210 BuildersOrCallback::Builders(graph_builders) => {
2211 let builder = graph_builders.entry(input_location_id).or_default();
2212 builder.add_dfir(
2213 parse_quote! {
2214 #flat_map_ident = #input_ident -> flat_map(#f);
2215 },
2216 None,
2217 Some(&next_stmt_id.to_string()),
2218 );
2219 }
2220 BuildersOrCallback::Callback(_, node_callback) => {
2221 node_callback(self, next_stmt_id);
2222 }
2223 }
2224
2225 *next_stmt_id += 1;
2226
2227 (flat_map_ident, input_location_id)
2228 }
2229
2230 HydroNode::Filter { f, input, .. } => {
2231 let (input_ident, input_location_id) =
2232 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2233
2234 let filter_ident =
2235 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2236
2237 match builders_or_callback {
2238 BuildersOrCallback::Builders(graph_builders) => {
2239 let builder = graph_builders.entry(input_location_id).or_default();
2240 builder.add_dfir(
2241 parse_quote! {
2242 #filter_ident = #input_ident -> filter(#f);
2243 },
2244 None,
2245 Some(&next_stmt_id.to_string()),
2246 );
2247 }
2248 BuildersOrCallback::Callback(_, node_callback) => {
2249 node_callback(self, next_stmt_id);
2250 }
2251 }
2252
2253 *next_stmt_id += 1;
2254
2255 (filter_ident, input_location_id)
2256 }
2257
2258 HydroNode::FilterMap { f, input, .. } => {
2259 let (input_ident, input_location_id) =
2260 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2261
2262 let filter_map_ident =
2263 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2264
2265 match builders_or_callback {
2266 BuildersOrCallback::Builders(graph_builders) => {
2267 let builder = graph_builders.entry(input_location_id).or_default();
2268 builder.add_dfir(
2269 parse_quote! {
2270 #filter_map_ident = #input_ident -> filter_map(#f);
2271 },
2272 None,
2273 Some(&next_stmt_id.to_string()),
2274 );
2275 }
2276 BuildersOrCallback::Callback(_, node_callback) => {
2277 node_callback(self, next_stmt_id);
2278 }
2279 }
2280
2281 *next_stmt_id += 1;
2282
2283 (filter_map_ident, input_location_id)
2284 }
2285
2286 HydroNode::Sort { input, .. } => {
2287 let (input_ident, input_location_id) =
2288 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2289
2290 let sort_ident =
2291 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2292
2293 match builders_or_callback {
2294 BuildersOrCallback::Builders(graph_builders) => {
2295 let builder = graph_builders.entry(input_location_id).or_default();
2296 builder.add_dfir(
2297 parse_quote! {
2298 #sort_ident = #input_ident -> sort();
2299 },
2300 None,
2301 Some(&next_stmt_id.to_string()),
2302 );
2303 }
2304 BuildersOrCallback::Callback(_, node_callback) => {
2305 node_callback(self, next_stmt_id);
2306 }
2307 }
2308
2309 *next_stmt_id += 1;
2310
2311 (sort_ident, input_location_id)
2312 }
2313
2314 HydroNode::DeferTick { input, .. } => {
2315 let (input_ident, input_location_id) =
2316 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2317
2318 let defer_tick_ident =
2319 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2320
2321 match builders_or_callback {
2322 BuildersOrCallback::Builders(graph_builders) => {
2323 let builder = graph_builders.entry(input_location_id).or_default();
2324 builder.add_dfir(
2325 parse_quote! {
2326 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2327 },
2328 None,
2329 Some(&next_stmt_id.to_string()),
2330 );
2331 }
2332 BuildersOrCallback::Callback(_, node_callback) => {
2333 node_callback(self, next_stmt_id);
2334 }
2335 }
2336
2337 *next_stmt_id += 1;
2338
2339 (defer_tick_ident, input_location_id)
2340 }
2341
2342 HydroNode::Enumerate { input, .. } => {
2343 let (input_ident, input_location_id) =
2344 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2345
2346 let enumerate_ident =
2347 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2348
2349 match builders_or_callback {
2350 BuildersOrCallback::Builders(graph_builders) => {
2351 let builder = graph_builders.entry(input_location_id).or_default();
2352 let lifetime = if input.metadata().location_kind.is_top_level() {
2353 quote!('static)
2354 } else {
2355 quote!('tick)
2356 };
2357 builder.add_dfir(
2358 parse_quote! {
2359 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2360 },
2361 None,
2362 Some(&next_stmt_id.to_string()),
2363 );
2364 }
2365 BuildersOrCallback::Callback(_, node_callback) => {
2366 node_callback(self, next_stmt_id);
2367 }
2368 }
2369
2370 *next_stmt_id += 1;
2371
2372 (enumerate_ident, input_location_id)
2373 }
2374
2375 HydroNode::Inspect { f, input, .. } => {
2376 let (input_ident, input_location_id) =
2377 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2378
2379 let inspect_ident =
2380 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2381
2382 match builders_or_callback {
2383 BuildersOrCallback::Builders(graph_builders) => {
2384 let builder = graph_builders.entry(input_location_id).or_default();
2385 builder.add_dfir(
2386 parse_quote! {
2387 #inspect_ident = #input_ident -> inspect(#f);
2388 },
2389 None,
2390 Some(&next_stmt_id.to_string()),
2391 );
2392 }
2393 BuildersOrCallback::Callback(_, node_callback) => {
2394 node_callback(self, next_stmt_id);
2395 }
2396 }
2397
2398 *next_stmt_id += 1;
2399
2400 (inspect_ident, input_location_id)
2401 }
2402
2403 HydroNode::Unique { input, .. } => {
2404 let (input_ident, input_location_id) =
2405 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2406
2407 let unique_ident =
2408 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2409
2410 match builders_or_callback {
2411 BuildersOrCallback::Builders(graph_builders) => {
2412 let builder = graph_builders.entry(input_location_id).or_default();
2413 builder.add_dfir(
2414 parse_quote! {
2415 #unique_ident = #input_ident -> unique::<'tick>();
2416 },
2417 None,
2418 Some(&next_stmt_id.to_string()),
2419 );
2420 }
2421 BuildersOrCallback::Callback(_, node_callback) => {
2422 node_callback(self, next_stmt_id);
2423 }
2424 }
2425
2426 *next_stmt_id += 1;
2427
2428 (unique_ident, input_location_id)
2429 }
2430
2431 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2432 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2433 parse_quote!(fold)
2434 } else if matches!(self, HydroNode::Scan { .. }) {
2435 parse_quote!(scan)
2436 } else {
2437 parse_quote!(fold_keyed)
2438 };
2439
2440 let (HydroNode::Fold {
2441 init, acc, input, ..
2442 }
2443 | HydroNode::FoldKeyed {
2444 init, acc, input, ..
2445 }
2446 | HydroNode::Scan {
2447 init, acc, input, ..
2448 }) = self
2449 else {
2450 unreachable!()
2451 };
2452
2453 let (input, lifetime) =
2454 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2455 debug_assert!(!input.metadata().location_kind.is_top_level());
2456 (input, quote!('static))
2457 } else if input.metadata().location_kind.is_top_level() {
2458 (input, quote!('static))
2459 } else {
2460 (input, quote!('tick))
2461 };
2462
2463 let (input_ident, input_location_id) =
2464 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2465
2466 let fold_ident =
2467 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2468
2469 match builders_or_callback {
2470 BuildersOrCallback::Builders(graph_builders) => {
2471 let builder = graph_builders.entry(input_location_id).or_default();
2472 builder.add_dfir(
2473 parse_quote! {
2474 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2475 },
2476 None,
2477 Some(&next_stmt_id.to_string()),
2478 );
2479 }
2480 BuildersOrCallback::Callback(_, node_callback) => {
2481 node_callback(self, next_stmt_id);
2482 }
2483 }
2484
2485 *next_stmt_id += 1;
2486
2487 (fold_ident, input_location_id)
2488 }
2489
2490 HydroNode::ReduceKeyedWatermark {
2491 f,
2492 input,
2493 watermark,
2494 ..
2495 } => {
2496 let (input, lifetime) =
2497 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2498 debug_assert!(!input.metadata().location_kind.is_top_level());
2499 (input, quote!('static))
2500 } else if input.metadata().location_kind.is_top_level() {
2501 (input, quote!('static))
2502 } else {
2503 (input, quote!('tick))
2504 };
2505
2506 let (input_ident, input_location_id) =
2507 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2508
2509 let (watermark_ident, watermark_location_id) =
2510 watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
2511
2512 let chain_ident = syn::Ident::new(
2513 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
2514 Span::call_site(),
2515 );
2516
2517 let fold_ident =
2518 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2519
2520 match builders_or_callback {
2521 BuildersOrCallback::Builders(graph_builders) => {
2522 assert_eq!(
2523 input_location_id, watermark_location_id,
2524 "ReduceKeyedWatermark inputs must be in the same location"
2525 );
2526
2527 let builder = graph_builders.entry(input_location_id).or_default();
2528 builder.add_dfir(
2533 parse_quote! {
2534 #chain_ident = chain();
2535 #input_ident
2536 -> map(|x| (Some(x), None))
2537 -> [0]#chain_ident;
2538 #watermark_ident
2539 -> map(|watermark| (None, Some(watermark)))
2540 -> [1]#chain_ident;
2541
2542 #fold_ident = #chain_ident
2543 -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
2544 let __reduce_keyed_fn = #f;
2545 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
2546 if let Some((k, v)) = opt_payload {
2547 if let Some(curr_watermark) = *opt_curr_watermark {
2548 if k <= curr_watermark {
2549 return;
2550 }
2551 }
2552 match map.entry(k) {
2553 ::std::collections::hash_map::Entry::Vacant(e) => {
2554 e.insert(v);
2555 }
2556 ::std::collections::hash_map::Entry::Occupied(mut e) => {
2557 __reduce_keyed_fn(e.get_mut(), v);
2558 }
2559 }
2560 } else {
2561 let watermark = opt_watermark.unwrap();
2562 if let Some(curr_watermark) = *opt_curr_watermark {
2563 if watermark <= curr_watermark {
2564 return;
2565 }
2566 }
2567 *opt_curr_watermark = opt_watermark;
2568 map.retain(|k, _| *k > watermark);
2569 }
2570 }
2571 })
2572 -> flat_map(|(map, _curr_watermark)| map);
2573 },
2574 None,
2575 Some(&next_stmt_id.to_string()),
2576 );
2577 }
2578 BuildersOrCallback::Callback(_, node_callback) => {
2579 node_callback(self, next_stmt_id);
2580 }
2581 }
2582
2583 *next_stmt_id += 1;
2584
2585 (fold_ident, input_location_id)
2586 }
2587
2588 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
2589 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
2590 parse_quote!(reduce)
2591 } else {
2592 parse_quote!(reduce_keyed)
2593 };
2594
2595 let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
2596 self
2597 else {
2598 unreachable!()
2599 };
2600
2601 let (input, lifetime) =
2602 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2603 debug_assert!(!input.metadata().location_kind.is_top_level());
2604 (input, quote!('static))
2605 } else if input.metadata().location_kind.is_top_level() {
2606 (input, quote!('static))
2607 } else {
2608 (input, quote!('tick))
2609 };
2610
2611 let (input_ident, input_location_id) =
2612 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2613
2614 let reduce_ident =
2615 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2616
2617 match builders_or_callback {
2618 BuildersOrCallback::Builders(graph_builders) => {
2619 let builder = graph_builders.entry(input_location_id).or_default();
2620 builder.add_dfir(
2621 parse_quote! {
2622 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
2623 },
2624 None,
2625 Some(&next_stmt_id.to_string()),
2626 );
2627 }
2628 BuildersOrCallback::Callback(_, node_callback) => {
2629 node_callback(self, next_stmt_id);
2630 }
2631 }
2632
2633 *next_stmt_id += 1;
2634
2635 (reduce_ident, input_location_id)
2636 }
2637
2638 HydroNode::Network {
2639 serialize_fn: serialize_pipeline,
2640 instantiate_fn,
2641 deserialize_fn: deserialize_pipeline,
2642 input,
2643 metadata,
2644 ..
2645 } => {
2646 let (input_ident, input_location_id) =
2647 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2648
2649 let to_id = metadata.location_kind.root().raw_id();
2650
2651 let receiver_stream_ident =
2652 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2653
2654 match builders_or_callback {
2655 BuildersOrCallback::Builders(graph_builders) => {
2656 let (sink_expr, source_expr) = match instantiate_fn {
2657 DebugInstantiate::Building => (
2658 syn::parse_quote!(DUMMY_SINK),
2659 syn::parse_quote!(DUMMY_SOURCE),
2660 ),
2661
2662 DebugInstantiate::Finalized(finalized) => {
2663 (finalized.sink.clone(), finalized.source.clone())
2664 }
2665 };
2666
2667 let sender_builder = graph_builders.entry(input_location_id).or_default();
2668 if let Some(serialize_pipeline) = serialize_pipeline {
2669 sender_builder.add_dfir(
2670 parse_quote! {
2671 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
2672 },
2673 None,
2674 Some(&format!("send{}", next_stmt_id)),
2676 );
2677 } else {
2678 sender_builder.add_dfir(
2679 parse_quote! {
2680 #input_ident -> dest_sink(#sink_expr);
2681 },
2682 None,
2683 Some(&format!("send{}", next_stmt_id)),
2684 );
2685 }
2686
2687 let receiver_builder = graph_builders.entry(to_id).or_default();
2688 if let Some(deserialize_pipeline) = deserialize_pipeline {
2689 receiver_builder.add_dfir(parse_quote! {
2690 #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2691 }, None, Some(&format!("recv{}", next_stmt_id)));
2692 } else {
2693 receiver_builder.add_dfir(
2694 parse_quote! {
2695 #receiver_stream_ident = source_stream(#source_expr);
2696 },
2697 None,
2698 Some(&format!("recv{}", next_stmt_id)),
2699 );
2700 }
2701 }
2702 BuildersOrCallback::Callback(_, node_callback) => {
2703 node_callback(self, next_stmt_id);
2704 }
2705 }
2706
2707 *next_stmt_id += 1;
2708
2709 (receiver_stream_ident, to_id)
2710 }
2711
2712 HydroNode::ExternalInput {
2713 instantiate_fn,
2714 deserialize_fn: deserialize_pipeline,
2715 metadata,
2716 ..
2717 } => {
2718 let to_id = metadata.location_kind.root().raw_id();
2719
2720 let receiver_stream_ident =
2721 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2722
2723 match builders_or_callback {
2724 BuildersOrCallback::Builders(graph_builders) => {
2725 let (_, source_expr) = match instantiate_fn {
2726 DebugInstantiate::Building => (
2727 syn::parse_quote!(DUMMY_SINK),
2728 syn::parse_quote!(DUMMY_SOURCE),
2729 ),
2730
2731 DebugInstantiate::Finalized(finalized) => {
2732 (finalized.sink.clone(), finalized.source.clone())
2733 }
2734 };
2735
2736 let receiver_builder = graph_builders.entry(to_id).or_default();
2737 if let Some(deserialize_pipeline) = deserialize_pipeline {
2738 receiver_builder.add_dfir(parse_quote! {
2739 #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2740 }, None, Some(&format!("recv{}", next_stmt_id)));
2741 } else {
2742 receiver_builder.add_dfir(
2743 parse_quote! {
2744 #receiver_stream_ident = source_stream(#source_expr);
2745 },
2746 None,
2747 Some(&format!("recv{}", next_stmt_id)),
2748 );
2749 }
2750 }
2751 BuildersOrCallback::Callback(_, node_callback) => {
2752 node_callback(self, next_stmt_id);
2753 }
2754 }
2755
2756 *next_stmt_id += 1;
2757
2758 (receiver_stream_ident, to_id)
2759 }
2760
2761 HydroNode::Counter {
2762 tag,
2763 duration,
2764 prefix,
2765 input,
2766 ..
2767 } => {
2768 let (input_ident, input_location_id) =
2769 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2770
2771 let counter_ident =
2772 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2773
2774 match builders_or_callback {
2775 BuildersOrCallback::Builders(graph_builders) => {
2776 let builder = graph_builders.entry(input_location_id).or_default();
2777 builder.add_dfir(
2778 parse_quote! {
2779 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
2780 },
2781 None,
2782 Some(&next_stmt_id.to_string()),
2783 );
2784 }
2785 BuildersOrCallback::Callback(_, node_callback) => {
2786 node_callback(self, next_stmt_id);
2787 }
2788 }
2789
2790 *next_stmt_id += 1;
2791
2792 (counter_ident, input_location_id)
2793 }
2794 }
2795 }
2796
2797 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2798 match self {
2799 HydroNode::Placeholder => {
2800 panic!()
2801 }
2802 HydroNode::Source { source, .. } => match source {
2803 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2804 HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2805 },
2806 HydroNode::CycleSource { .. }
2807 | HydroNode::Tee { .. }
2808 | HydroNode::Persist { .. }
2809 | HydroNode::YieldConcat { .. }
2810 | HydroNode::BeginAtomic { .. }
2811 | HydroNode::EndAtomic { .. }
2812 | HydroNode::Batch { .. }
2813 | HydroNode::Chain { .. }
2814 | HydroNode::ChainFirst { .. }
2815 | HydroNode::CrossProduct { .. }
2816 | HydroNode::CrossSingleton { .. }
2817 | HydroNode::ResolveFutures { .. }
2818 | HydroNode::ResolveFuturesOrdered { .. }
2819 | HydroNode::Join { .. }
2820 | HydroNode::Difference { .. }
2821 | HydroNode::AntiJoin { .. }
2822 | HydroNode::DeferTick { .. }
2823 | HydroNode::Enumerate { .. }
2824 | HydroNode::Unique { .. }
2825 | HydroNode::Sort { .. } => {}
2826 HydroNode::Map { f, .. }
2827 | HydroNode::FlatMap { f, .. }
2828 | HydroNode::Filter { f, .. }
2829 | HydroNode::FilterMap { f, .. }
2830 | HydroNode::Inspect { f, .. }
2831 | HydroNode::Reduce { f, .. }
2832 | HydroNode::ReduceKeyed { f, .. }
2833 | HydroNode::ReduceKeyedWatermark { f, .. } => {
2834 transform(f);
2835 }
2836 HydroNode::Fold { init, acc, .. }
2837 | HydroNode::Scan { init, acc, .. }
2838 | HydroNode::FoldKeyed { init, acc, .. } => {
2839 transform(init);
2840 transform(acc);
2841 }
2842 HydroNode::Network {
2843 serialize_fn,
2844 deserialize_fn,
2845 ..
2846 } => {
2847 if let Some(serialize_fn) = serialize_fn {
2848 transform(serialize_fn);
2849 }
2850 if let Some(deserialize_fn) = deserialize_fn {
2851 transform(deserialize_fn);
2852 }
2853 }
2854 HydroNode::ExternalInput { deserialize_fn, .. } => {
2855 if let Some(deserialize_fn) = deserialize_fn {
2856 transform(deserialize_fn);
2857 }
2858 }
2859 HydroNode::Counter { duration, .. } => {
2860 transform(duration);
2861 }
2862 }
2863 }
2864
2865 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
2866 &self.metadata().op
2867 }
2868
2869 pub fn metadata(&self) -> &HydroIrMetadata {
2870 match self {
2871 HydroNode::Placeholder => {
2872 panic!()
2873 }
2874 HydroNode::Source { metadata, .. } => metadata,
2875 HydroNode::CycleSource { metadata, .. } => metadata,
2876 HydroNode::Tee { metadata, .. } => metadata,
2877 HydroNode::Persist { metadata, .. } => metadata,
2878 HydroNode::YieldConcat { metadata, .. } => metadata,
2879 HydroNode::BeginAtomic { metadata, .. } => metadata,
2880 HydroNode::EndAtomic { metadata, .. } => metadata,
2881 HydroNode::Batch { metadata, .. } => metadata,
2882 HydroNode::Chain { metadata, .. } => metadata,
2883 HydroNode::ChainFirst { metadata, .. } => metadata,
2884 HydroNode::CrossProduct { metadata, .. } => metadata,
2885 HydroNode::CrossSingleton { metadata, .. } => metadata,
2886 HydroNode::Join { metadata, .. } => metadata,
2887 HydroNode::Difference { metadata, .. } => metadata,
2888 HydroNode::AntiJoin { metadata, .. } => metadata,
2889 HydroNode::ResolveFutures { metadata, .. } => metadata,
2890 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2891 HydroNode::Map { metadata, .. } => metadata,
2892 HydroNode::FlatMap { metadata, .. } => metadata,
2893 HydroNode::Filter { metadata, .. } => metadata,
2894 HydroNode::FilterMap { metadata, .. } => metadata,
2895 HydroNode::DeferTick { metadata, .. } => metadata,
2896 HydroNode::Enumerate { metadata, .. } => metadata,
2897 HydroNode::Inspect { metadata, .. } => metadata,
2898 HydroNode::Unique { metadata, .. } => metadata,
2899 HydroNode::Sort { metadata, .. } => metadata,
2900 HydroNode::Scan { metadata, .. } => metadata,
2901 HydroNode::Fold { metadata, .. } => metadata,
2902 HydroNode::FoldKeyed { metadata, .. } => metadata,
2903 HydroNode::Reduce { metadata, .. } => metadata,
2904 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2905 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2906 HydroNode::ExternalInput { metadata, .. } => metadata,
2907 HydroNode::Network { metadata, .. } => metadata,
2908 HydroNode::Counter { metadata, .. } => metadata,
2909 }
2910 }
2911
2912 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
2913 &mut self.metadata_mut().op
2914 }
2915
2916 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2917 match self {
2918 HydroNode::Placeholder => {
2919 panic!()
2920 }
2921 HydroNode::Source { metadata, .. } => metadata,
2922 HydroNode::CycleSource { metadata, .. } => metadata,
2923 HydroNode::Tee { metadata, .. } => metadata,
2924 HydroNode::Persist { metadata, .. } => metadata,
2925 HydroNode::YieldConcat { metadata, .. } => metadata,
2926 HydroNode::BeginAtomic { metadata, .. } => metadata,
2927 HydroNode::EndAtomic { metadata, .. } => metadata,
2928 HydroNode::Batch { metadata, .. } => metadata,
2929 HydroNode::Chain { metadata, .. } => metadata,
2930 HydroNode::ChainFirst { metadata, .. } => metadata,
2931 HydroNode::CrossProduct { metadata, .. } => metadata,
2932 HydroNode::CrossSingleton { metadata, .. } => metadata,
2933 HydroNode::Join { metadata, .. } => metadata,
2934 HydroNode::Difference { metadata, .. } => metadata,
2935 HydroNode::AntiJoin { metadata, .. } => metadata,
2936 HydroNode::ResolveFutures { metadata, .. } => metadata,
2937 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2938 HydroNode::Map { metadata, .. } => metadata,
2939 HydroNode::FlatMap { metadata, .. } => metadata,
2940 HydroNode::Filter { metadata, .. } => metadata,
2941 HydroNode::FilterMap { metadata, .. } => metadata,
2942 HydroNode::DeferTick { metadata, .. } => metadata,
2943 HydroNode::Enumerate { metadata, .. } => metadata,
2944 HydroNode::Inspect { metadata, .. } => metadata,
2945 HydroNode::Unique { metadata, .. } => metadata,
2946 HydroNode::Sort { metadata, .. } => metadata,
2947 HydroNode::Scan { metadata, .. } => metadata,
2948 HydroNode::Fold { metadata, .. } => metadata,
2949 HydroNode::FoldKeyed { metadata, .. } => metadata,
2950 HydroNode::Reduce { metadata, .. } => metadata,
2951 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2952 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2953 HydroNode::ExternalInput { metadata, .. } => metadata,
2954 HydroNode::Network { metadata, .. } => metadata,
2955 HydroNode::Counter { metadata, .. } => metadata,
2956 }
2957 }
2958
2959 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
2960 match self {
2961 HydroNode::Placeholder => {
2962 panic!()
2963 }
2964 HydroNode::Source { .. }
2965 | HydroNode::ExternalInput { .. }
2966 | HydroNode::CycleSource { .. } | HydroNode::Tee { .. } => {
2968 vec![]
2969 }
2970 HydroNode::Persist { inner, .. }
2971 | HydroNode::YieldConcat { inner, .. }
2972 | HydroNode::BeginAtomic { inner, .. }
2973 | HydroNode::EndAtomic { inner, .. }
2974 | HydroNode::Batch { inner, .. } => {
2975 vec![inner.metadata()]
2976 }
2977 HydroNode::Chain { first, second, .. } => {
2978 vec![first.metadata(), second.metadata()]
2979 }
2980 HydroNode::ChainFirst { first, second, .. } => {
2981 vec![first.metadata(), second.metadata()]
2982 }
2983 HydroNode::CrossProduct { left, right, .. }
2984 | HydroNode::CrossSingleton { left, right, .. }
2985 | HydroNode::Join { left, right, .. } => {
2986 vec![left.metadata(), right.metadata()]
2987 }
2988 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2989 vec![pos.metadata(), neg.metadata()]
2990 }
2991 HydroNode::Map { input, .. }
2992 | HydroNode::FlatMap { input, .. }
2993 | HydroNode::Filter { input, .. }
2994 | HydroNode::FilterMap { input, .. }
2995 | HydroNode::Sort { input, .. }
2996 | HydroNode::DeferTick { input, .. }
2997 | HydroNode::Enumerate { input, .. }
2998 | HydroNode::Inspect { input, .. }
2999 | HydroNode::Unique { input, .. }
3000 | HydroNode::Network { input, .. }
3001 | HydroNode::Counter { input, .. }
3002 | HydroNode::ResolveFutures { input, .. }
3003 | HydroNode::ResolveFuturesOrdered { input, .. } => {
3004 vec![input.metadata()]
3005 }
3006 HydroNode::Fold { input, .. }
3007 | HydroNode::FoldKeyed { input, .. }
3008 | HydroNode::Reduce { input, .. }
3009 | HydroNode::ReduceKeyed { input, .. }
3010 | HydroNode::Scan { input, .. } => {
3011 if let HydroNode::Persist { inner, .. } = input.as_ref() {
3013 vec![inner.metadata()]
3014 } else {
3015 vec![input.metadata()]
3016 }
3017 }
3018 HydroNode::ReduceKeyedWatermark { input, watermark, .. } => {
3019 if let HydroNode::Persist { inner, .. } = input.as_ref() {
3021 vec![inner.metadata(), watermark.metadata()]
3022 } else {
3023 vec![input.metadata(), watermark.metadata()]
3024 }
3025 }
3026 }
3027 }
3028
3029 pub fn print_root(&self) -> String {
3030 match self {
3031 HydroNode::Placeholder => {
3032 panic!()
3033 }
3034 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3035 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3036 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3037 HydroNode::Persist { .. } => "Persist()".to_string(),
3038 HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3039 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3040 HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3041 HydroNode::Batch { .. } => "Batch()".to_string(),
3042 HydroNode::Chain { first, second, .. } => {
3043 format!("Chain({}, {})", first.print_root(), second.print_root())
3044 }
3045 HydroNode::ChainFirst { first, second, .. } => {
3046 format!(
3047 "ChainFirst({}, {})",
3048 first.print_root(),
3049 second.print_root()
3050 )
3051 }
3052 HydroNode::CrossProduct { left, right, .. } => {
3053 format!(
3054 "CrossProduct({}, {})",
3055 left.print_root(),
3056 right.print_root()
3057 )
3058 }
3059 HydroNode::CrossSingleton { left, right, .. } => {
3060 format!(
3061 "CrossSingleton({}, {})",
3062 left.print_root(),
3063 right.print_root()
3064 )
3065 }
3066 HydroNode::Join { left, right, .. } => {
3067 format!("Join({}, {})", left.print_root(), right.print_root())
3068 }
3069 HydroNode::Difference { pos, neg, .. } => {
3070 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3071 }
3072 HydroNode::AntiJoin { pos, neg, .. } => {
3073 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3074 }
3075 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3076 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3077 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3078 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3079 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3080 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3081 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3082 HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3083 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3084 HydroNode::Unique { .. } => "Unique()".to_string(),
3085 HydroNode::Sort { .. } => "Sort()".to_string(),
3086 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3087 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3088 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3089 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3090 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3091 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3092 HydroNode::Network { .. } => "Network()".to_string(),
3093 HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3094 HydroNode::Counter { tag, duration, .. } => {
3095 format!("Counter({:?}, {:?})", tag, duration)
3096 }
3097 }
3098 }
3099}
3100
/// Builds the sink/source expressions and the connect closure for one network
/// edge between two locations.
///
/// Each endpoint is looked up in `processes` or `clusters` according to its
/// `LocationId` variant, and a port is allocated on each node (sender first).
/// The deploy provider `D` is then asked for the `(sink, source)` expression
/// pair and the connect closure, using the `o2o`, `o2m`, `m2o` or `m2m`
/// function pair that matches the process/cluster combination.
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// * If an endpoint id is missing from `processes` / `clusters`.
/// * If either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Lookup helpers: clone the instantiated node or abort with the offending id.
    let process_node = |id: &usize| {
        processes
            .get(id)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", id))
            .clone()
    };
    let cluster_node = |id: &usize| {
        clusters
            .get(id)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", id))
            .clone()
    };

    let (endpoint_exprs, connect_fn) = match (from_location, to_location) {
        (LocationId::Process(sender_id), LocationId::Process(receiver_id)) => {
            let sender = process_node(sender_id);
            let receiver = process_node(receiver_id);
            let sender_port = D::allocate_process_port(&sender);
            let receiver_port = D::allocate_process_port(&receiver);

            (
                D::o2o_sink_source(compile_env, &sender, &sender_port, &receiver, &receiver_port),
                D::o2o_connect(&sender, &sender_port, &receiver, &receiver_port),
            )
        }
        (LocationId::Process(sender_id), LocationId::Cluster(receiver_id)) => {
            let sender = process_node(sender_id);
            let receiver = cluster_node(receiver_id);
            let sender_port = D::allocate_process_port(&sender);
            let receiver_port = D::allocate_cluster_port(&receiver);

            (
                D::o2m_sink_source(compile_env, &sender, &sender_port, &receiver, &receiver_port),
                D::o2m_connect(&sender, &sender_port, &receiver, &receiver_port),
            )
        }
        (LocationId::Cluster(sender_id), LocationId::Process(receiver_id)) => {
            let sender = cluster_node(sender_id);
            let receiver = process_node(receiver_id);
            let sender_port = D::allocate_cluster_port(&sender);
            let receiver_port = D::allocate_process_port(&receiver);

            (
                D::m2o_sink_source(compile_env, &sender, &sender_port, &receiver, &receiver_port),
                D::m2o_connect(&sender, &sender_port, &receiver, &receiver_port),
            )
        }
        (LocationId::Cluster(sender_id), LocationId::Cluster(receiver_id)) => {
            let sender = cluster_node(sender_id);
            let receiver = cluster_node(receiver_id);
            let sender_port = D::allocate_cluster_port(&sender);
            let receiver_port = D::allocate_cluster_port(&receiver);

            (
                D::m2m_sink_source(compile_env, &sender, &sender_port, &receiver, &receiver_port),
                D::m2m_connect(&sender, &sender_port, &receiver, &receiver_port),
            )
        }
        // Tick and atomic locations are not accepted as network endpoints here.
        (LocationId::Tick(..), _)
        | (_, LocationId::Tick(..))
        | (LocationId::Atomic(..), _)
        | (_, LocationId::Atomic(..)) => panic!(),
    };

    let (sink, source) = endpoint_exprs;
    (sink, source, connect_fn)
}
3208
// Unit tests for the IR types' sizes and for `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins `size_of::<HydroNode>()` at 248 bytes; presumably a guard against
    // accidentally growing the enum. Update the constant when a size change is intended.
    #[test]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 248);
    }

    // Pins `size_of::<HydroRoot>()` at 176 bytes, in the same way as the test above.
    #[test]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 176);
    }

    // An expression parsed from plain Rust source ("x + y") must come back
    // from `simplify_q_macro` equal to the input.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Simplifies the expression spliced from a real `q!` closure and compares
    // its token string against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Same snapshot check for a quoted block that runs a `let` before the
    // closure, so the quoted body does not begin with the closure's `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}