1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7#[cfg(stageleft_runtime)]
8use super::dynamic::DynLocation;
9use super::{Cluster, Location, LocationId, Process};
10use crate::compile::builder::FlowState;
11use crate::compile::ir::{HydroNode, HydroSource};
12#[cfg(stageleft_runtime)]
13use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
14use crate::forward_handle::{ForwardHandle, ForwardRef, TickCycle, TickCycleHandle};
15use crate::live_collections::boundedness::Bounded;
16use crate::live_collections::optional::Optional;
17use crate::live_collections::singleton::Singleton;
18use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
19use crate::nondet::nondet;
20
21#[sealed]
22pub trait NoTick {}
23#[sealed]
24impl<T> NoTick for Process<'_, T> {}
25#[sealed]
26impl<T> NoTick for Cluster<'_, T> {}
27
28#[sealed]
29pub trait NoAtomic {}
30#[sealed]
31impl<T> NoAtomic for Process<'_, T> {}
32#[sealed]
33impl<T> NoAtomic for Cluster<'_, T> {}
34#[sealed]
35impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
36
37#[derive(Clone)]
38pub struct Atomic<Loc> {
39 pub(crate) tick: Tick<Loc>,
40}
41
42impl<L: DynLocation> DynLocation for Atomic<L> {
43 fn id(&self) -> LocationId {
44 LocationId::Atomic(Box::new(self.tick.id()))
45 }
46
47 fn flow_state(&self) -> &FlowState {
48 self.tick.flow_state()
49 }
50
51 fn is_top_level() -> bool {
52 L::is_top_level()
53 }
54}
55
56impl<'a, L> Location<'a> for Atomic<L>
57where
58 L: Location<'a>,
59{
60 type Root = L::Root;
61
62 fn root(&self) -> Self::Root {
63 self.tick.root()
64 }
65}
66
67#[sealed]
68impl<L> NoTick for Atomic<L> {}
69
/// Types that can be converted into a deferred version of themselves.
///
/// [`Tick::cycle`] requires this bound and calls [`DeferTick::defer_tick`] on
/// the cycle source it has just created.
pub trait DeferTick {
    /// Consumes `self` and returns the deferred value of the same type.
    ///
    /// NOTE(review): presumably this delays the contents by one tick; confirm
    /// against the implementors, which are not visible in this file section.
    fn defer_tick(self) -> Self;
}
73
/// A tick location nested inside an outer location `L`.
///
/// Its location id is `LocationId::Tick(id, <outer id>)`; flow state and root
/// are taken from the outer location.
#[derive(Clone)]
pub struct Tick<L> {
    /// Numeric identifier carried as the first component of `LocationId::Tick`.
    pub(crate) id: usize,
    /// The enclosing location, exposed through [`Tick::outer`].
    pub(crate) l: L,
}
80
81impl<L: DynLocation> DynLocation for Tick<L> {
82 fn id(&self) -> LocationId {
83 LocationId::Tick(self.id, Box::new(self.l.id()))
84 }
85
86 fn flow_state(&self) -> &FlowState {
87 self.l.flow_state()
88 }
89
90 fn is_top_level() -> bool {
91 false
92 }
93}
94
95impl<'a, L> Location<'a> for Tick<L>
96where
97 L: Location<'a>,
98{
99 type Root = L::Root;
100
101 fn root(&self) -> Self::Root {
102 self.l.root()
103 }
104}
105
106impl<'a, L> Tick<L>
107where
108 L: Location<'a>,
109{
110 pub fn outer(&self) -> &L {
111 &self.l
112 }
113
114 pub fn spin_batch(
115 &self,
116 batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
117 ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
118 where
119 L: NoTick,
120 {
121 let out = self
122 .l
123 .spin()
124 .flat_map_ordered(q!(move |_| 0..batch_size))
125 .map(q!(|_| ()));
126
127 out.batch(self, nondet!())
128 }
129
130 pub fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, L>) -> Singleton<T, Self, Bounded>
131 where
132 T: Clone,
133 L: NoTick,
134 {
135 self.outer().singleton(e).snapshot(
136 self,
137 nondet!(),
138 )
139 }
140
141 pub fn optional_first_tick<T: Clone>(
165 &self,
166 e: impl QuotedWithContext<'a, T, Tick<L>>,
167 ) -> Optional<T, Self, Bounded> {
168 let e_arr = q!([e]);
169 let e = e_arr.splice_untyped_ctx(self);
170
171 Optional::new(
172 self.clone(),
173 HydroNode::Batch {
174 inner: Box::new(HydroNode::Source {
175 source: HydroSource::Iter(e.into()),
176 metadata: self.outer().new_node_metadata::<T>(),
177 }),
178 metadata: self.new_node_metadata::<T>(),
179 },
180 )
181 }
182
183 #[expect(
184 private_bounds,
185 reason = "only Hydro collections can implement ReceiverComplete"
186 )]
187 pub fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
188 where
189 S: CycleCollection<'a, ForwardRef, Location = Self>,
190 L: NoTick,
191 {
192 let next_id = self.flow_state().borrow_mut().next_cycle_id();
193 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
194
195 (
196 ForwardHandle {
197 completed: false,
198 ident: ident.clone(),
199 expected_location: Location::id(self),
200 _phantom: PhantomData,
201 },
202 S::create_source(ident, self.clone()),
203 )
204 }
205
206 #[expect(
207 private_bounds,
208 reason = "only Hydro collections can implement ReceiverComplete"
209 )]
210 pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
211 where
212 S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
213 L: NoTick,
214 {
215 let next_id = self.flow_state().borrow_mut().next_cycle_id();
216 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
217
218 (
219 TickCycleHandle {
220 completed: false,
221 ident: ident.clone(),
222 expected_location: Location::id(self),
223 _phantom: PhantomData,
224 },
225 S::create_source(ident, self.clone()).defer_tick(),
226 )
227 }
228
229 #[expect(
230 private_bounds,
231 reason = "only Hydro collections can implement ReceiverComplete"
232 )]
233 pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
234 where
235 S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
236 L: NoTick,
237 {
238 let next_id = self.flow_state().borrow_mut().next_cycle_id();
239 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
240
241 (
242 TickCycleHandle {
243 completed: false,
244 ident: ident.clone(),
245 expected_location: Location::id(self),
246 _phantom: PhantomData,
247 },
248 S::create_source_with_initial(ident, initial, self.clone()),
250 )
251 }
252}