// aimx/aim/lock_manager.rs

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, PoisonError, TryLockError};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Result;
use dashmap::DashMap;
use once_cell::sync::Lazy;

use crate::expressions::Reference;

7/// Thread-safe map from workflow `Reference` to per-workflow lock.
8///
9/// Used by higher-level workspace APIs to coordinate exclusive write access.
10/// The `Default` implementation is used for the global singleton.
11#[derive(Debug, Default)]
12pub struct LockManager {
13    /// Concurrent map from workflow reference to `Arc<Mutex<()>>`.
14    workflow_locks: DashMap<Arc<Reference>, Arc<Mutex<()>>>,
15    /// Metrics for lock usage and error tracking
16    metrics: LockMetrics,
17}
18
/// Counters describing how workflow locks have been requested.
///
/// Every field is an independent atomic, so it can be incremented from any
/// thread without extra synchronization. All counters start at zero.
#[derive(Debug, Default)]
pub struct LockMetrics {
    /// Number of lock probes/acquisition attempts made through the manager.
    pub lock_attempts: AtomicU64,
    /// Attempts that found the lock free.
    pub lock_successes: AtomicU64,
    /// Attempts that found the lock held by another owner.
    pub lock_contentions: AtomicU64,
    /// Attempts that found the lock poisoned.
    pub lock_poisonings: AtomicU64,
}

27impl LockManager {
28    fn new() -> Self {
29        Self::default()
30    }
31
32    /// Acquires a workflow lock and returns a mutex for transactional use.
33    /// The caller must call `.lock()` on the returned mutex to acquire the actual lock.
34    pub fn get_mutex(reference: Arc<Reference>) -> Arc<Mutex<()>> {
35        let lock_manager = get_lock_manager();
36        lock_manager.workflow_locks
37            .entry(reference)
38            .or_insert_with(|| Arc::new(Mutex::new(())))
39            .value()
40            .clone()
41    }
42
43    /// Enhanced lock acquisition with anyhow-compatible error handling
44    /// Provides detailed error information for both poisoning and contention
45    pub fn get_mutex_with_error_handling(
46        reference: Arc<Reference>
47    ) -> Result<Arc<Mutex<()>>> {
48        let lock_manager = get_lock_manager();
49        lock_manager.metrics.lock_attempts.fetch_add(1, Ordering::Relaxed);
50        
51        let mutex = lock_manager.workflow_locks
52            .entry(reference.clone())
53            .or_insert_with(|| Arc::new(Mutex::new(())))
54            .value()
55            .clone();
56        
57        // Test the lock for poisoning and return anyhow-compatible errors
58        let result = mutex.try_lock();
59        match result {
60            Ok(_) => {
61                lock_manager.metrics.lock_successes.fetch_add(1, Ordering::Relaxed);
62                Ok(mutex.clone())
63            }
64            Err(TryLockError::Poisoned(err)) => {
65                lock_manager.metrics.lock_poisonings.fetch_add(1, Ordering::Relaxed);
66                Err(anyhow::anyhow!(
67                    "Workflow '{}' lock poisoned. \
68                     Original error: {}",
69                    reference, err
70                ))
71            }
72            Err(TryLockError::WouldBlock) => {
73                lock_manager.metrics.lock_contentions.fetch_add(1, Ordering::Relaxed);
74                Err(anyhow::anyhow!(
75                    "Workflow '{}' lock is currently held by another thread. \
76                     This might indicate a deadlock or long-running operation.",
77                    reference
78                ))
79            }
80        }
81    }
82
83    /// Try to acquire lock with timeout
84    pub fn try_get_mutex_with_timeout(
85        reference: Arc<Reference>,
86        timeout: Duration
87    ) -> Result<Arc<Mutex<()>>> {
88        let lock_manager = get_lock_manager();
89        lock_manager.metrics.lock_attempts.fetch_add(1, Ordering::Relaxed);
90        
91        let mutex = lock_manager.workflow_locks
92            .entry(reference.clone())
93            .or_insert_with(|| Arc::new(Mutex::new(())))
94            .value()
95            .clone();
96        
97        // Use try_lock() instead of try_lock_for() since we don't have access to the timeout method
98        let result = mutex.try_lock();
99        match result {
100            Ok(_) => {
101                lock_manager.metrics.lock_successes.fetch_add(1, Ordering::Relaxed);
102                Ok(mutex.clone())
103            }
104            Err(TryLockError::Poisoned(err)) => {
105                lock_manager.metrics.lock_poisonings.fetch_add(1, Ordering::Relaxed);
106                Err(anyhow::anyhow!(
107                    "Workflow '{}' lock poisoned during timeout attempt: {}",
108                    reference, err
109                ))
110            }
111            Err(TryLockError::WouldBlock) => {
112                lock_manager.metrics.lock_contentions.fetch_add(1, Ordering::Relaxed);
113                Err(anyhow::anyhow!(
114                    "Workflow '{}' lock timeout after {:?}: Lock is currently held by another thread",
115                    reference, timeout
116                ))
117            }
118        }
119    }
120
121    /// Checks if a workflow is currently locked.
122    pub fn is_locked(&self, reference: &Arc<Reference>) -> bool {
123        let lock_manager = get_lock_manager();
124        lock_manager.workflow_locks
125            .get(reference)
126            .map(|lock| lock.try_lock().is_err())
127            .unwrap_or(false)
128    }
129
130    /// Get lock metrics for monitoring and debugging
131    pub fn get_lock_metrics(&self) -> HashMap<String, u64> {
132        let mut metrics = HashMap::new();
133        metrics.insert("lock_attempts".to_string(), 
134                      self.metrics.lock_attempts.load(Ordering::Relaxed));
135        metrics.insert("lock_successes".to_string(), 
136                      self.metrics.lock_successes.load(Ordering::Relaxed));
137        metrics.insert("lock_contentions".to_string(), 
138                      self.metrics.lock_contentions.load(Ordering::Relaxed));
139        metrics.insert("lock_poisonings".to_string(), 
140                      self.metrics.lock_poisonings.load(Ordering::Relaxed));
141        metrics
142    }
143
144    #[cfg(debug_assertions)]
145    pub fn get_mutex_with_debug_error_handling(
146        reference: Arc<Reference>
147    ) -> Result<Arc<Mutex<()>>> {
148        let lock_manager = get_lock_manager();
149        let mutex = lock_manager.workflow_locks
150            .entry(reference.clone())
151            .or_insert_with(|| Arc::new(Mutex::new(())))
152            .value()
153            .clone();
154        
155        let result = mutex.try_lock();
156        match result {
157            Ok(_) => Ok(mutex.clone()),
158            Err(TryLockError::Poisoned(err)) => {
159                eprintln!("DEBUG: Lock poisoned for workflow '{}': {:?}", reference, err.get_ref());
160                Err(anyhow::anyhow!("Lock poisoned in debug build: {}", err))
161            }
162            Err(TryLockError::WouldBlock) => {
163                Err(anyhow::anyhow!("Lock contention in debug build for workflow '{}'", reference))
164            }
165        }
166    }
167
168    #[cfg(not(debug_assertions))]
169    pub fn get_mutex_with_release_error_handling(
170        reference: Arc<Reference>
171    ) -> Result<Arc<Mutex<()>>> {
172        // In release, we might want to be more permissive about contention
173        let lock_manager = get_lock_manager();
174        let mutex = lock_manager.workflow_locks
175            .entry(reference.clone())
176            .or_insert_with(|| Arc::new(Mutex::new(())))
177            .value()
178            .clone();
179        
180        match mutex.try_lock() {
181            Ok(_) => Ok(mutex),
182            Err(TryLockError::Poisoned(err)) => {
183                // Still error on poisoning in release (indicates serious bug)
184                Err(anyhow::anyhow!("Lock poisoned in release build: {}", err))
185            }
186            Err(TryLockError::WouldBlock) => {
187                // In release, we might want to wait longer or retry
188                // For now, return error to maintain consistency
189                Err(anyhow::anyhow!("Lock contention in release build for workflow '{}'", reference))
190            }
191        }
192    }
193}
194
195/// Global singleton `LockManager` for process-wide coordination.
196static GLOBAL_LOCK_MANAGER: Lazy<LockManager> = Lazy::new(LockManager::new);
197
198/// Returns a reference to the global `LockManager`.
199fn get_lock_manager() -> &'static LockManager {
200    &GLOBAL_LOCK_MANAGER
201}