1use crate::expressions::Reference;
2use dashmap::DashMap;
3use once_cell::sync::Lazy;
4use std::{collections::HashMap, sync::{Arc, Mutex, TryLockError, atomic::{AtomicU64, Ordering}}, time::Duration};
5use anyhow::Result;
6
7#[derive(Debug, Default)]
12pub struct LockManager {
13 workflow_locks: DashMap<Arc<Reference>, Arc<Mutex<()>>>,
15 metrics: LockMetrics,
17}
18
/// Counters describing the outcomes of workflow-lock probes.
///
/// Each counter is an `AtomicU64` so it can be bumped through a shared
/// reference; code in this module only ever increments them with
/// `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct LockMetrics {
    /// Number of probing lookups that were started.
    pub lock_attempts: AtomicU64,
    /// Number of probes that found the mutex free.
    pub lock_successes: AtomicU64,
    /// Number of probes that gave up because another holder owned the mutex.
    pub lock_contentions: AtomicU64,
    /// Number of probes that found the mutex poisoned.
    pub lock_poisonings: AtomicU64,
}
26
27impl LockManager {
28 fn new() -> Self {
29 Self::default()
30 }
31
32 pub fn get_mutex(reference: Arc<Reference>) -> Arc<Mutex<()>> {
35 let lock_manager = get_lock_manager();
36 lock_manager.workflow_locks
37 .entry(reference)
38 .or_insert_with(|| Arc::new(Mutex::new(())))
39 .value()
40 .clone()
41 }
42
43 pub fn get_mutex_with_error_handling(
46 reference: Arc<Reference>
47 ) -> Result<Arc<Mutex<()>>> {
48 let lock_manager = get_lock_manager();
49 lock_manager.metrics.lock_attempts.fetch_add(1, Ordering::Relaxed);
50
51 let mutex = lock_manager.workflow_locks
52 .entry(reference.clone())
53 .or_insert_with(|| Arc::new(Mutex::new(())))
54 .value()
55 .clone();
56
57 let result = mutex.try_lock();
59 match result {
60 Ok(_) => {
61 lock_manager.metrics.lock_successes.fetch_add(1, Ordering::Relaxed);
62 Ok(mutex.clone())
63 }
64 Err(TryLockError::Poisoned(err)) => {
65 lock_manager.metrics.lock_poisonings.fetch_add(1, Ordering::Relaxed);
66 Err(anyhow::anyhow!(
67 "Workflow '{}' lock poisoned. \
68 Original error: {}",
69 reference, err
70 ))
71 }
72 Err(TryLockError::WouldBlock) => {
73 lock_manager.metrics.lock_contentions.fetch_add(1, Ordering::Relaxed);
74 Err(anyhow::anyhow!(
75 "Workflow '{}' lock is currently held by another thread. \
76 This might indicate a deadlock or long-running operation.",
77 reference
78 ))
79 }
80 }
81 }
82
83 pub fn try_get_mutex_with_timeout(
85 reference: Arc<Reference>,
86 timeout: Duration
87 ) -> Result<Arc<Mutex<()>>> {
88 let lock_manager = get_lock_manager();
89 lock_manager.metrics.lock_attempts.fetch_add(1, Ordering::Relaxed);
90
91 let mutex = lock_manager.workflow_locks
92 .entry(reference.clone())
93 .or_insert_with(|| Arc::new(Mutex::new(())))
94 .value()
95 .clone();
96
97 let result = mutex.try_lock();
99 match result {
100 Ok(_) => {
101 lock_manager.metrics.lock_successes.fetch_add(1, Ordering::Relaxed);
102 Ok(mutex.clone())
103 }
104 Err(TryLockError::Poisoned(err)) => {
105 lock_manager.metrics.lock_poisonings.fetch_add(1, Ordering::Relaxed);
106 Err(anyhow::anyhow!(
107 "Workflow '{}' lock poisoned during timeout attempt: {}",
108 reference, err
109 ))
110 }
111 Err(TryLockError::WouldBlock) => {
112 lock_manager.metrics.lock_contentions.fetch_add(1, Ordering::Relaxed);
113 Err(anyhow::anyhow!(
114 "Workflow '{}' lock timeout after {:?}: Lock is currently held by another thread",
115 reference, timeout
116 ))
117 }
118 }
119 }
120
121 pub fn is_locked(&self, reference: &Arc<Reference>) -> bool {
123 let lock_manager = get_lock_manager();
124 lock_manager.workflow_locks
125 .get(reference)
126 .map(|lock| lock.try_lock().is_err())
127 .unwrap_or(false)
128 }
129
130 pub fn get_lock_metrics(&self) -> HashMap<String, u64> {
132 let mut metrics = HashMap::new();
133 metrics.insert("lock_attempts".to_string(),
134 self.metrics.lock_attempts.load(Ordering::Relaxed));
135 metrics.insert("lock_successes".to_string(),
136 self.metrics.lock_successes.load(Ordering::Relaxed));
137 metrics.insert("lock_contentions".to_string(),
138 self.metrics.lock_contentions.load(Ordering::Relaxed));
139 metrics.insert("lock_poisonings".to_string(),
140 self.metrics.lock_poisonings.load(Ordering::Relaxed));
141 metrics
142 }
143
144 #[cfg(debug_assertions)]
145 pub fn get_mutex_with_debug_error_handling(
146 reference: Arc<Reference>
147 ) -> Result<Arc<Mutex<()>>> {
148 let lock_manager = get_lock_manager();
149 let mutex = lock_manager.workflow_locks
150 .entry(reference.clone())
151 .or_insert_with(|| Arc::new(Mutex::new(())))
152 .value()
153 .clone();
154
155 let result = mutex.try_lock();
156 match result {
157 Ok(_) => Ok(mutex.clone()),
158 Err(TryLockError::Poisoned(err)) => {
159 eprintln!("DEBUG: Lock poisoned for workflow '{}': {:?}", reference, err.get_ref());
160 Err(anyhow::anyhow!("Lock poisoned in debug build: {}", err))
161 }
162 Err(TryLockError::WouldBlock) => {
163 Err(anyhow::anyhow!("Lock contention in debug build for workflow '{}'", reference))
164 }
165 }
166 }
167
168 #[cfg(not(debug_assertions))]
169 pub fn get_mutex_with_release_error_handling(
170 reference: Arc<Reference>
171 ) -> Result<Arc<Mutex<()>>> {
172 let lock_manager = get_lock_manager();
174 let mutex = lock_manager.workflow_locks
175 .entry(reference.clone())
176 .or_insert_with(|| Arc::new(Mutex::new(())))
177 .value()
178 .clone();
179
180 match mutex.try_lock() {
181 Ok(_) => Ok(mutex),
182 Err(TryLockError::Poisoned(err)) => {
183 Err(anyhow::anyhow!("Lock poisoned in release build: {}", err))
185 }
186 Err(TryLockError::WouldBlock) => {
187 Err(anyhow::anyhow!("Lock contention in release build for workflow '{}'", reference))
190 }
191 }
192 }
193}
194
195static GLOBAL_LOCK_MANAGER: Lazy<LockManager> = Lazy::new(LockManager::new);
197
198fn get_lock_manager() -> &'static LockManager {
200 &GLOBAL_LOCK_MANAGER
201}