hydro_lang/location/
tick.rs

1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7#[cfg(stageleft_runtime)]
8use super::dynamic::DynLocation;
9use super::{Cluster, Location, LocationId, Process};
10use crate::compile::builder::FlowState;
11use crate::compile::ir::{HydroNode, HydroSource};
12#[cfg(stageleft_runtime)]
13use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
14use crate::forward_handle::{ForwardHandle, ForwardRef, TickCycle, TickCycleHandle};
15use crate::live_collections::boundedness::Bounded;
16use crate::live_collections::optional::Optional;
17use crate::live_collections::singleton::Singleton;
18use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
19use crate::nondet::nondet;
20
21#[sealed]
22pub trait NoTick {}
23#[sealed]
24impl<T> NoTick for Process<'_, T> {}
25#[sealed]
26impl<T> NoTick for Cluster<'_, T> {}
27
28#[sealed]
29pub trait NoAtomic {}
30#[sealed]
31impl<T> NoAtomic for Process<'_, T> {}
32#[sealed]
33impl<T> NoAtomic for Cluster<'_, T> {}
34#[sealed]
35impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
36
37#[derive(Clone)]
38pub struct Atomic<Loc> {
39    pub(crate) tick: Tick<Loc>,
40}
41
42impl<L: DynLocation> DynLocation for Atomic<L> {
43    fn id(&self) -> LocationId {
44        LocationId::Atomic(Box::new(self.tick.id()))
45    }
46
47    fn flow_state(&self) -> &FlowState {
48        self.tick.flow_state()
49    }
50
51    fn is_top_level() -> bool {
52        L::is_top_level()
53    }
54}
55
56impl<'a, L> Location<'a> for Atomic<L>
57where
58    L: Location<'a>,
59{
60    type Root = L::Root;
61
62    fn root(&self) -> Self::Root {
63        self.tick.root()
64    }
65}
66
67#[sealed]
68impl<L> NoTick for Atomic<L> {}
69
70pub trait DeferTick {
71    fn defer_tick(self) -> Self;
72}
73
74/// Marks the stream as being inside the single global clock domain.
75#[derive(Clone)]
76pub struct Tick<L> {
77    pub(crate) id: usize,
78    pub(crate) l: L,
79}
80
81impl<L: DynLocation> DynLocation for Tick<L> {
82    fn id(&self) -> LocationId {
83        LocationId::Tick(self.id, Box::new(self.l.id()))
84    }
85
86    fn flow_state(&self) -> &FlowState {
87        self.l.flow_state()
88    }
89
90    fn is_top_level() -> bool {
91        false
92    }
93}
94
95impl<'a, L> Location<'a> for Tick<L>
96where
97    L: Location<'a>,
98{
99    type Root = L::Root;
100
101    fn root(&self) -> Self::Root {
102        self.l.root()
103    }
104}
105
106impl<'a, L> Tick<L>
107where
108    L: Location<'a>,
109{
110    pub fn outer(&self) -> &L {
111        &self.l
112    }
113
114    pub fn spin_batch(
115        &self,
116        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
117    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
118    where
119        L: NoTick,
120    {
121        let out = self
122            .l
123            .spin()
124            .flat_map_ordered(q!(move |_| 0..batch_size))
125            .map(q!(|_| ()));
126
127        out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */))
128    }
129
130    pub fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, L>) -> Singleton<T, Self, Bounded>
131    where
132        T: Clone,
133        L: NoTick,
134    {
135        self.outer().singleton(e).snapshot(
136            self,
137            nondet!(/** a top-level singleton produces the same value each tick */),
138        )
139    }
140
141    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
142    /// null on all subsequent ticks.
143    ///
144    /// This is useful for bootstrapping stateful computations which need an initial value.
145    ///
146    /// # Example
147    /// ```rust
148    /// # use hydro_lang::prelude::*;
149    /// # use futures::StreamExt;
150    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
151    /// let tick = process.tick();
152    /// // ticks are lazy by default, forces the second tick to run
153    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
154    /// let optional = tick.optional_first_tick(q!(5));
155    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
156    /// # }, |mut stream| async move {
157    /// // 5, 123, 123, 123, ...
158    /// # assert_eq!(stream.next().await.unwrap(), 5);
159    /// # assert_eq!(stream.next().await.unwrap(), 123);
160    /// # assert_eq!(stream.next().await.unwrap(), 123);
161    /// # assert_eq!(stream.next().await.unwrap(), 123);
162    /// # }));
163    /// ```
164    pub fn optional_first_tick<T: Clone>(
165        &self,
166        e: impl QuotedWithContext<'a, T, Tick<L>>,
167    ) -> Optional<T, Self, Bounded> {
168        let e_arr = q!([e]);
169        let e = e_arr.splice_untyped_ctx(self);
170
171        Optional::new(
172            self.clone(),
173            HydroNode::Batch {
174                inner: Box::new(HydroNode::Source {
175                    source: HydroSource::Iter(e.into()),
176                    metadata: self.outer().new_node_metadata::<T>(),
177                }),
178                metadata: self.new_node_metadata::<T>(),
179            },
180        )
181    }
182
183    #[expect(
184        private_bounds,
185        reason = "only Hydro collections can implement ReceiverComplete"
186    )]
187    pub fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
188    where
189        S: CycleCollection<'a, ForwardRef, Location = Self>,
190        L: NoTick,
191    {
192        let next_id = self.flow_state().borrow_mut().next_cycle_id();
193        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
194
195        (
196            ForwardHandle {
197                completed: false,
198                ident: ident.clone(),
199                expected_location: Location::id(self),
200                _phantom: PhantomData,
201            },
202            S::create_source(ident, self.clone()),
203        )
204    }
205
206    #[expect(
207        private_bounds,
208        reason = "only Hydro collections can implement ReceiverComplete"
209    )]
210    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
211    where
212        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
213        L: NoTick,
214    {
215        let next_id = self.flow_state().borrow_mut().next_cycle_id();
216        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
217
218        (
219            TickCycleHandle {
220                completed: false,
221                ident: ident.clone(),
222                expected_location: Location::id(self),
223                _phantom: PhantomData,
224            },
225            S::create_source(ident, self.clone()).defer_tick(),
226        )
227    }
228
229    #[expect(
230        private_bounds,
231        reason = "only Hydro collections can implement ReceiverComplete"
232    )]
233    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
234    where
235        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
236        L: NoTick,
237    {
238        let next_id = self.flow_state().borrow_mut().next_cycle_id();
239        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
240
241        (
242            TickCycleHandle {
243                completed: false,
244                ident: ident.clone(),
245                expected_location: Location::id(self),
246                _phantom: PhantomData,
247            },
248            // no need to defer_tick, create_source_with_initial does it for us
249            S::create_source_with_initial(ident, initial, self.clone()),
250        )
251    }
252}