aimx/values/
node.rs

1//! Workflow node handle with lazy loading and shared access.
2//!
3//! Provides [`Node`], a clonable handle to a workflow that loads on first use and
4//! shares an immutable snapshot across readers. Mutation is performed on owned
5//! `Workflow` values and committed via [`Node::set_workflow`].
6//! External locking is required for write coordination.
7
8use crate::{
9    aim::{Workflow, WorkflowLike, Writer, WriterLike},
10    expressions::Reference,
11    inference::{Inference, Pattern, parse_inference},
12};
13use nom::IResult;
14use std::{
15    fmt,
16    sync::{Arc, RwLock},
17};
18use anyhow::{Result, anyhow};
19
20/// Internal state for a workflow node.
21///
22/// `Unloaded` stores the [`Reference`]. `Loaded` stores the shared [`Workflow`].
23#[derive(Debug, Clone)]
24enum NodeState {
25    Pending { inference: Arc<Inference> },
26    /// Workflow is not loaded; holds the [`Reference`] used to load it.
27    Unloaded { reference: Arc<Reference>, inference: Arc<Inference> },
28    /// Workflow is loaded; holds the shared [`Workflow`] snapshot.
29    Loaded { workflow: Arc<Workflow>, inference: Arc<Inference> },
30}
31
32/// A clonable handle to a workflow with lazy loading and shared reads.
33#[derive(Debug, Clone)]
34pub struct Node {
35    /// Internal state protected by read-write lock
36    inner: Arc<RwLock<NodeState>>,
37}
38
39impl Default for Node {
40    fn default() -> Self {
41        let inference = Inference::new(Pattern::Evaluate, None);
42        Self::new(Arc::new(inference))
43    }
44}
45
46impl Node {
47    /// Creates a new `Node` in the `Pending` state with the given [`Inference`].
48    pub fn new(inference: Arc<Inference>) -> Self {
49        Node {
50            inner: Arc::new(RwLock::new(NodeState::Pending {
51                inference, 
52            })),
53        }
54    }
55
56    pub fn init_new(reference: Arc<Reference>, inference: Arc<Inference>) -> Self {
57        let node = Self::new(inference);
58        node.init(reference);
59        node
60    }
61
62    pub fn init_default(reference: Arc<Reference>) -> Self {
63        let inference = Inference::new(Pattern::Evaluate, None);
64        let node = Self::new(Arc::new(inference));
65        node.init(reference);
66        node
67    }
68
69    pub fn parse(input: &str) -> Result<Self> {
70        match parse_node(input) {
71            Ok((_, node)) => Ok(node),
72            Err(e) => Err(anyhow!("Parse error: {}", e)),
73        }
74    } 
75
76    /// Initializes the `Node` in the `Unloaded` state with the given [`Reference`].
77    pub fn init(&self, reference: Arc<Reference>) {
78        // Acquire read lock first
79        let read_guard = self.inner.read()
80            .expect("Lock poisoned in init() node read.");
81
82        if let NodeState::Pending { inference: _ } = &*read_guard {
83            // Need to load, so release read lock and acquire write lock
84            drop(read_guard);
85            let mut write_guard = self.inner.write()
86                .expect("Lock poisoned in init() node write.");
87            // Double-check after acquiring write lock
88            if let NodeState::Pending { inference} = &*write_guard {
89                *write_guard = NodeState::Unloaded { 
90                    reference,
91                    inference: inference.clone(),
92                };
93            }
94        }
95    }
96
97pub fn reference(&self) -> Result<Arc<Reference>> {
98       // Acquire read lock first
99         let read_guard = self.inner.read()
100            .expect("Lock poisoned in reference() node read.");
101
102         match &*read_guard {
103             NodeState::Pending { inference: _ } => Err(anyhow!("Node not initialized")),
104             NodeState::Unloaded { reference, inference: _ } => Ok(reference.clone()),
105             NodeState::Loaded { workflow, inference: _ } => Ok(workflow.locator()),
106         }
107    }
108
109    /// Returns the inference characteristic for this workflow.
110    ///
111    /// The model provides an abstraction to decouple inference from reliance on a
112    /// specific underlying model. The model performance characteristic reflects one
113    /// of six specializations:
114    ///
115    /// * Fast - optimized for speed and low latency
116    /// * Standard - balanced performance for general use
117    /// * Thinking - optimized for complex reasoning
118    /// * Extraction - optimized for a large context window
119    /// * Instruct - optimized for closely following instructions
120    /// * Coder - optimized for programming tasks
121    /// 
122    /// The pattern provides an abstraction for different kinds of workflow tasks.
123    /// 
124    /// * Evaluate - non-agentic workflow evaluation
125    /// * Inference - general inference pattern
126    /// * Search - search extraction pattern
127    /// * Summarize - map-reduce pattern
128    /// * Compose - ReAct composer pattern for accurate annotation based document editing (Crate markdown)
129    /// * Debate - debate pattern where two agents each make an assessment; a third judge scores resolution.
130pub fn inference(&self) -> Arc<Inference> {
131       // Acquire read lock first
132         let read_guard = self.inner.read()
133            .expect("Lock poisoned in inference() node read.");
134
135         match &*read_guard {
136             NodeState::Pending { inference }
137             | NodeState::Unloaded { reference: _, inference }
138             | NodeState::Loaded { workflow: _, inference } => {
139                 inference.clone()
140             }
141         }
142    }
143
144    /// Gets the workflow for read-only access, loading it with `Workflow::load_new` if needed.
145    ///
146    /// Uses a double-checked lock: fast `RwLock` read when loaded, upgrades to
147    /// write lock to perform a single load on transition from `Unloaded`.
148    /// Panics on poisoned locks.
149    pub fn get_workflow(&self) -> Result<Arc<Workflow>> {
150        // Acquire read lock first
151        let read_guard = self.inner.read()
152            .expect("Lock poisoned in get_workflow() node read.");
153
154        match &*read_guard {
155            NodeState::Pending { inference: _ } => {
156                Err(anyhow!("Node not initialized"))
157            }
158            NodeState::Loaded { workflow, inference: _ } => {
159                Ok(workflow.clone())
160            }
161            NodeState::Unloaded { reference: _, inference: _ } => {
162                // Need to load, so release read lock and acquire write lock
163                drop(read_guard);
164                let mut write_guard = self.inner.write()
165                    .expect("Lock poisoned in get_workflow() node write.");
166
167                // Double-check after acquiring write lock
168                match &*write_guard {
169                    NodeState::Pending { inference: _ } => {
170                        Err(anyhow!("Node not initialized"))
171                    }
172                    NodeState::Loaded { workflow, inference: _ } => {
173                        Ok(workflow.clone())
174                    }
175                    NodeState::Unloaded { reference, inference } => {
176                        // Load the workflow
177                        let workflow = Workflow::open(reference.clone())?;
178                        let arc_workflow = Arc::new(workflow);
179
180                        // Update state
181                        *write_guard = NodeState::Loaded {
182                            workflow: arc_workflow.clone(),
183                            inference: inference.clone(),
184                        };
185
186                        Ok(arc_workflow)
187                    }
188                }
189            }
190        }
191    }
192
193    /// Returns the workflow as `Arc<dyn WorkflowLike>`.
194    pub fn get_workflow_like(&self) -> Result<Arc<dyn WorkflowLike>> {
195        let workflow_like = self.get_workflow()?;
196        Ok(workflow_like as Arc<dyn WorkflowLike>)
197    }
198
199    /// Returns an owned [`Workflow`] for mutation.
200    pub fn get_workflow_mut(&self) -> Result<Workflow> {
201        // Fork a copy of the read-only workflow
202        let workflow = (*self.get_workflow()?).clone();
203        Ok(workflow)
204    }
205
206    /// Atomically replace the old workflow with the new workflow.
207    ///
208    /// Sets state to `Loaded` with a new shared snapshot. Panics on poisoned lock.
209    pub fn set_workflow(&self, workflow: Workflow) {
210        let mut write_guard = self.inner.write()
211            .expect("Lock poisoned in set_workflow() node write.");
212        let inference: Arc<Inference> = match &*write_guard {
213            NodeState::Pending { inference }
214            | NodeState::Unloaded { reference: _, inference }
215            | NodeState::Loaded { workflow: _, inference } => {
216                inference.clone()
217            }
218        };
219        *write_guard = NodeState::Loaded {
220            workflow: Arc::new(workflow),
221            inference,
222        };
223    }
224
225pub fn compact(&self) {
226        // Check if loaded
227        let read_guard = self.inner.read()
228            .expect("Lock poisoned in compact() node read.");
229
230        if let NodeState::Loaded { workflow: _, inference: _ } = &*read_guard {
231            drop(read_guard);                
232            let mut write_guard = self.inner.write()
233                .expect("Lock poisoned in compact() node write.");
234
235            // Double-check pattern
236            if let NodeState::Loaded { workflow , inference} = &*write_guard {
237                let workflow = workflow.clone();
238                if !workflow.is_touched() {
239                    let reference = workflow.locator();
240                    *write_guard = NodeState::Unloaded {
241                        reference,
242                        inference: inference.clone(),
243                    };
244                }
245            }
246        }
247    }
248
249    /// Recursively saves all workflows and instances within this node's workflow hierarchy.
250    /// 
251    /// This method saves the current workflow if it has pending changes, then recursively
252    /// saves all child nodes and instances. It handles both loaded and unloaded workflows
253    /// to ensure comprehensive state persistence.
254    /// 
255    /// # Returns
256    /// 
257    /// `Ok(())` if all save operations complete successfully, or an error if any
258    /// workflow or instance fails to save.
259    /// 
260    /// # Behavior
261    /// 
262    /// - Saves the current workflow if it has pending changes (`is_touched()`)
263    /// - Recursively saves all child nodes in the workflow
264    /// - Saves all instances within the workflow
265    /// - Handles unloaded workflows by checking if they need saving
266/// - Fails fast on any save error to maintain consistency
267 pub fn save_all(&self) -> Result<()> {
268        // Check if loaded
269        let read_guard = self.inner.read()
270            .expect("Lock poisoned in save_all() node read.");
271
272        if let NodeState::Loaded { workflow, inference } = &*read_guard {
273            let workflow = (*workflow).clone();
274            let inference = inference.clone();
275            
276            // Always save the workflow if it has changes
277            if workflow.is_touched() {
278                let mut mutable_workflow = (*workflow).clone();
279                mutable_workflow.save()?;
280                
281                // Recursively save child nodes and instances
282                for rule in mutable_workflow.iter_rules() {
283                    if let Some(node) = rule.get_node() {
284                        node.save_all()?;
285                    // Also handle instances within this workflow
286                    } else if let Some(instance) = rule.get_instance() {
287                        instance.save_all()?;
288                    }
289                }
290                
291                // Update the node with the saved workflow
292                drop(read_guard);
293                let mut write_guard = self.inner.write()
294                    .expect("Lock poisoned in save_all() node write.");
295                *write_guard = NodeState::Loaded {
296                    workflow: Arc::new(mutable_workflow),
297                    inference,
298                };
299            }
300        }
301        Ok(())
302    }
303
304    pub fn print(&self, writer: &mut Writer) {
305        let read_guard = self.inner.read()
306            .expect("Lock poisoned in print() node read.");
307        match &*read_guard {
308            NodeState::Pending { inference }
309            | NodeState::Unloaded { reference: _, inference }
310            | NodeState::Loaded { workflow: _, inference } => {
311                inference.print(writer);
312            }
313        }
314    }
315}
316
317pub fn parse_node(input: &str) -> IResult<&str, Node> {
318    let (input, opt_inference) = parse_inference(input)?;
319    let inference = match opt_inference {
320        Some(inference) => inference,
321        None => Inference::new(Pattern::Evaluate, None),
322    };
323    Ok((input, Node::new(Arc::new(inference))))
324}
325
326impl PartialEq for Node {
327    /// Equality is based on shared internal `Arc` identity, not workflow content.
328    fn eq(&self, other: &Self) -> bool {
329        Arc::ptr_eq(&self.inner, &other.inner)
330    }
331}
332
333impl WriterLike for Node {
334    fn write(&self, writer: &mut Writer) {
335        self.print(writer);
336    }
337}
338
339impl fmt::Display for Node {
340    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341        write!(f, "{}", self.to_stringized())
342    }
343}