awaitCapTasksProcessor module

Design Goals & Features:
- Inherits from BaseQueueProcessor for the common loop/backoff/status/notify behavior.
- Consumes an AwaitingCapTasksQueue as its queue instance.
- Uses a while-true loop to consume Task instances from the queue.
- If the item is not empty, check whether all of its CapTasks are in a completed state.
- If all CapTasks are completed, find the file in the waitingCapTask folder and move it to the file_watch directory.
- If not all CapTasks are completed, put the item back into the queue for retry.
import logging
import os
import shutil
from typing import Optional

from opentelemetry import trace

# Project imports
from scl.queue.awaitingCapTasksQueue import AwaitingCapTasksQueue
from scl.meta.task import Task
from scl.processor.base_queue_processor import BaseQueueProcessor
# Module-level logger, named after this module per project convention.
logger = logging.getLogger(__name__)
3833
39- # Metrics
40- tasks_consumed_counter = meter .create_counter (
41- "await_cap_processor.tasks_consumed" ,
42- description = "Number of Task instances consumed from AwaitingCapTasksQueue"
43- )
44- files_moved_counter = meter .create_counter (
45- "await_cap_processor.files_moved" ,
46- description = "Number of Task files moved from waitingCapTask to file_watch directory"
47- )
48- file_move_errors_counter = meter .create_counter (
49- "await_cap_processor.file_move_errors" ,
50- description = "Number of errors encountered while moving Task files"
51- )
52- tasks_requeued_counter = meter .create_counter (
53- "await_cap_processor.tasks_requeued" ,
54- description = "Number of Task instances put back into AwaitingCapTasksQueue for retry"
55- )
56- idle_status_gauge = meter .create_up_down_counter (
57- "await_cap_processor.idle_status" ,
58- description = "Indicates whether processor is idle (1) or normal (0)"
59- )
6034
61-
class AwaitCapTasksProcessor(BaseQueueProcessor):
    """
    A processor that continuously consumes Task instances from an AwaitingCapTasksQueue,
    checks if all associated CapTasks are in a completed state (Processed or Error),
    and either moves the Task's file into the watched directory or requeues the Task.

    The polling loop, exponential backoff, idle/normal status tracking and the
    notify() wake-up mechanism are provided by BaseQueueProcessor.
    """

    # NOTE(review): the __init__ signature below is reconstructed from the
    # visible parameter docs and body (the def line is elided in the diff
    # view) — confirm against the original file.
    def __init__(
        self,
        source_queue: AwaitingCapTasksQueue,
        waiting_captask_dir: str,
        file_watch_dir: str,
        name: Optional[str] = None,
    ) -> None:
        """
        Args:
            source_queue: Queue of Tasks whose CapTasks are awaiting completion.
            waiting_captask_dir: Directory holding the files of waiting Tasks.
            file_watch_dir: Destination directory for completed Task files (watched by FileWatcher).
            name: Optional name for this processor instance (for logging/metrics).
        """
        super().__init__(name=name or "await-captask-processor")
        self.source_queue = source_queue
        self.waiting_captask_dir = waiting_captask_dir
        self.file_watch_dir = file_watch_dir

        # Ensure both directories exist so later file moves cannot fail on a
        # missing target directory.
        os.makedirs(self.waiting_captask_dir, exist_ok=True)
        os.makedirs(self.file_watch_dir, exist_ok=True)

        # Additional metrics beyond the generic ones created by the base class.
        self.files_moved_counter = meter.create_counter(
            f"{self.name}.files_moved",
            description="Number of Task files moved from waitingCapTask to file_watch directory",
        )
        self.file_move_errors_counter = meter.create_counter(
            f"{self.name}.file_move_errors",
            description="Number of errors encountered while moving Task files",
        )
        self.tasks_requeued_counter = meter.create_counter(
            f"{self.name}.tasks_requeued",
            description="Number of Task instances put back into the queue for retry",
        )

        logger.info("%s initialized with queue %r", self.name, source_queue)
158- def stop (self ) -> None :
116+ # ------------------------------------------------------------------ Abstract method overrides
117+ @tracer .start_as_current_span ("AwaitCapTasksProcessor._get_item" )
118+ def _get_item (self ) -> Optional [Task ]:
159119 """
160- Stop the consumption loop gracefully.
120+ Fetch one Task from the AwaitingCapTasksQueue.
121+ Must return None if no item is available (empty queue).
161122 """
162- if not self ._running :
163- return
164-
165- self ._running = False
166- self ._wakeup_event .set () # interrupt any ongoing sleep
167- if self ._thread :
168- self ._thread .join (timeout = 2.0 )
169- logger .info (f"Processor '{ self .name } ' stopped" )
170-
171- @tracer .start_as_current_span ("AwaitCapTasksProcessor._consume_loop" )
172- def _consume_loop (self ) -> None :
173- """Main loop: fetch tasks, check completion, move file or requeue."""
174- current_span = trace .get_current_span ()
175- current_span .set_attribute ("processor.name" , self .name )
176-
177- while self ._running :
178- # Try to get a task from the source queue
179- task = self ._get_task ()
180-
181- if task is None :
182- # Queue empty: double wait time, capped at max
183- self ._wait_time = min (self ._wait_time * 2 , self ._max_wait )
184- logger .debug (
185- f"Processor '{ self .name } ': queue empty, wait time increased to { self ._wait_time } s"
186- )
187- self ._update_idle_metric ()
188- # Sleep with interrupt capability
189- self ._wakeup_event .wait (timeout = self ._wait_time )
190- self ._wakeup_event .clear ()
191- else :
192- # Process the task: check CapTasks completion and route accordingly
193- self ._process_task (task )
194- # Reset wait time to minimum after successful consumption
195- self ._wait_time = 1.0
196- self ._update_idle_metric ()
197- # Immediately proceed to next iteration
198-
199- logger .debug (f"Consume loop for '{ self .name } ' exited" )
200-
201- @tracer .start_as_current_span ("AwaitCapTasksProcessor._get_task" )
202- def _get_task (self ) -> Optional [Task ]:
203- """Fetch one Task from the source AwaitingCapTasksQueue."""
204123 current_span = trace .get_current_span ()
205124 current_span .set_attribute ("processor.name" , self .name )
206125 try :
207126 task = self .source_queue .pop ()
208127 if task :
209128 current_span .set_attribute ("task.available" , True )
210129 current_span .set_attribute ("task.hash" , task .hash )
211- tasks_consumed_counter .add (1 , {"processor.name" : self .name })
212- logger .debug (f"Processor '{ self .name } ' consumed Task { task .hash } " )
130+ logger .debug ("%s: consumed Task %s from source queue" , self .name , task .hash )
213131 else :
214132 current_span .set_attribute ("task.available" , False )
215133 return task
216134 except Exception as e :
217- logger .error (f"Error consuming task from source queue: { e } " )
135+ logger .error ("%s: error consuming task from source queue: %s" , self . name , e )
218136 current_span .record_exception (e )
219137 return None
220138
221- def _all_captasks_completed (self , task : Task ) -> bool :
139+ @tracer .start_as_current_span ("AwaitCapTasksProcessor._process_item" )
140+ def _process_item (self , item : Task ) -> None :
222141 """
223- Check whether all CapTasks of the given Task are in a completed state.
224- Completed states are 'Processed' or 'Error'.
225- """
226- for cap in task .cap_tasks :
227- if cap .status not in ("Processed" , "Error" ):
228- return False
229- return True
230-
231- @tracer .start_as_current_span ("AwaitCapTasksProcessor._process_task" )
232- def _process_task (self , task : Task ) -> None :
233- """
234- Process a consumed Task: if all its CapTasks are completed, move its file
235- from waiting_captask_dir to file_watch_dir; otherwise, put it back into the
236- source AwaitingCapTasksQueue.
142+ Process a Task: if all CapTasks are completed move its file, else requeue.
143+ The base class loop increments the generic items_consumed counter.
237144 """
238145 current_span = trace .get_current_span ()
239146 current_span .set_attribute ("processor.name" , self .name )
240- current_span .set_attribute ("task.hash" , task .hash )
147+ current_span .set_attribute ("task.hash" , item .hash )
241148
242149 try :
243- if self ._all_captasks_completed (task ):
150+ if self ._all_captasks_completed (item ):
244151 # All CapTasks done: move file to file_watch_dir
245- self ._move_completed_file (task .hash , current_span )
152+ self ._move_completed_file (item .hash , current_span )
246153 current_span .set_attribute ("task.completed" , True )
247154 else :
248155 # Not all CapTasks completed: requeue for later retry
249- self .source_queue .push (task )
250- tasks_requeued_counter .add (1 , {"processor.name" : self .name })
156+ self .source_queue .push (item )
157+ self . tasks_requeued_counter .add (1 , {"processor.name" : self .name })
251158 logger .debug (
252- f"Processor ' { self . name } ' requeued Task { task . hash } (CapTasks not all completed)"
159+ "%s: requeued Task %s (CapTasks not all completed)", self . name , item . hash
253160 )
254161 current_span .set_attribute ("task.requeued" , True )
255162 except Exception as e :
256- logger .error (f"Failed to process Task { task . hash } : { e } " , exc_info = True )
163+ logger .error ("%s: failed to process Task %s: %s" , self . name , item . hash , e , exc_info = True )
257164 current_span .record_exception (e )
258165 current_span .set_status (trace .Status (trace .StatusCode .ERROR , "Task processing failed" ))
259166 # Attempt to put back into source queue to avoid losing the task
260167 try :
261- self .source_queue .push (task )
262- logger .warning (f" Task { task . hash } put back into source queue after processing error" )
168+ self .source_queue .push (item )
169+ logger .warning ("%s: Task %s put back into source queue after processing error", self . name , item . hash )
263170 except Exception as push_error :
264- logger .critical (f"Failed to requeue Task { task .hash } after error: { push_error } " )
171+ logger .critical ("%s: failed to requeue Task %s after error: %s" , self .name , item .hash , push_error )
172+
173+ # ------------------------------------------------------------------ Helper methods
174+ def _all_captasks_completed (self , task : Task ) -> bool :
175+ """
176+ Check whether all CapTasks of the given Task are in a completed state.
177+ Completed states are 'Processed' or 'Error'.
178+ """
179+ for cap in task .cap_tasks :
180+ if cap .status not in ("Processed" , "Error" ):
181+ return False
182+ return True
265183
266184 def _move_completed_file (self , task_hash : str , span : trace .Span ) -> None :
267185 """
@@ -277,49 +195,30 @@ def _move_completed_file(self, task_hash: str, span: trace.Span) -> None:
277195
278196 if not os .path .exists (src_path ):
279197 error_msg = f"Expected file { src_path } not found for completed Task { task_hash } "
280- logger .error (error_msg )
198+ logger .error ("%s: %s" , self . name , error_msg )
281199 span .set_status (trace .Status (trace .StatusCode .ERROR , error_msg ))
282- file_move_errors_counter .add (1 , {"processor.name" : self .name , "error" : "file_not_found" })
283- # We still consider the task processed, but the file is missing.
200+ self . file_move_errors_counter .add (1 , {"processor.name" : self .name , "error" : "file_not_found" })
201+ # Task considered processed but file is missing
284202 return
285203
286204 try :
287205 shutil .move (src_path , dst_path )
288- files_moved_counter .add (1 , {"processor.name" : self .name })
206+ self . files_moved_counter .add (1 , {"processor.name" : self .name })
289207 logger .info (
290- f"Processor ' { self . name } ' moved completed Task file { filename } "
291- f"from { self .waiting_captask_dir } to { self .file_watch_dir } "
208+ "%s: moved completed Task file %s from %s to %s" ,
209+ self . name , filename , self .waiting_captask_dir , self .file_watch_dir
292210 )
293211 span .set_attribute ("file.moved" , True )
294212 except Exception as e :
295- logger .error (f"Failed to move file { src_path } to { dst_path } : { e } " )
213+ logger .error ("%s: failed to move file %s to %s: %s" , self . name , src_path , dst_path , e )
296214 span .record_exception (e )
297- file_move_errors_counter .add (1 , {"processor.name" : self .name , "error" : "move_failed" })
215+ self . file_move_errors_counter .add (1 , {"processor.name" : self .name , "error" : "move_failed" })
298216 raise
299217
300- def notify (self ) -> None :
301- """
302- External notification that new tasks may be available.
303- - If current status is 'idle', wake up immediately to fetch new tasks.
304- - If status is 'normal', do nothing (already actively processing).
305- """
306- current_status = self .status
307- logger .debug (f"Notify called on processor '{ self .name } '. Current status: { current_status } " )
308- if current_status == "idle" :
309- logger .info (f"Processor '{ self .name } ' is idle; waking up to consume new tasks" )
310- self ._wakeup_event .set ()
311- else :
312- logger .debug (f"Processor '{ self .name } ' is normal; ignoring notification" )
313-
314- def _update_idle_metric (self ) -> None :
315- """Update the idle gauge metric based on current status."""
316- value = 1 if self .status == "idle" else 0
317- idle_status_gauge .add (value , {"processor.name" : self .name })
318-
319218
# Missing / Future Features (kept as comments for open-source tracking):
# - Support for other file extensions (e.g., .yaml) if needed.
# - Configurable wait parameters (initial wait, max wait, idle threshold) - currently fixed in base class.
# - Dead-letter handling for files that repeatedly fail to move.
# - Batch processing support.
# - Integration with external health checks.