11"""
22Internal Watcher for Task and CapTask Items
3- It converts the task into a Task or CapTask instance and writes a file to the file_watch directory.
3+
4+ Converts the task into a Task or CapTask instance and writes a file to the file_watch directory.
5+ Supports JSON (default) and YAML serialization via pyyaml.
46
57Dependencies:
68 pyyaml, opentelemetry-api, opentelemetry-sdk
911 pip install pyyaml opentelemetry-api opentelemetry-sdk
1012"""
1113
14+ # ---------------------------------------------------------------------------
15+ # Features and design goals
16+ # ---------------------------------------------------------------------------
17+ # - Accept Task or CapTask instances via add() method.
18+ # - Persist objects as files in a watch directory.
19+ # - Support JSON (default) and YAML output formats.
20+ # - Use hash attribute from objects for unique filenames.
21+ # - OpenTelemetry tracing spans for add operation with type-specific metadata.
22+ # - Metrics counters for successful writes and errors per item type.
23+ # - Structured logging at both INFO and DEBUG levels.
24+ # - Automatically create the watch directory if missing.
25+ # - Clear error messages and type validation.
26+ # ---------------------------------------------------------------------------
27+
1228import logging
1329import os
1430import json
15- from typing import Union , TYPE_CHECKING
31+ from typing import Union
1632
33+ import yaml
1734from opentelemetry import trace
1835
1936from scl .meta .task import Task
2037from scl .meta .captask import CapTask
2138from scl .otel .otel import tracer , meter
2239
23- if TYPE_CHECKING :
24- # For type hints without circular imports
25- pass
26-
2740logger = logging .getLogger (__name__ )
2841
2942
3043class InternalWatcher :
3144 """
3245 A simple internal watcher that accepts Task or CapTask instances and writes them as files
3346 into the file_watch directory for unified processing by the FileWatcher.
34-
35- Example usage:
36- watcher = InternalWatcher("/path/to/watch_dir")
37- task = Task(...) # Create a Task instance
38- task_hash = watcher.add(task) # Writes task file to watch_dir/task_hash.json
39-
40- captask = CapTask(cap_name="email", args=["to@example.com"])
41- captask_hash = watcher.add(captask) # Writes captask file to watch_dir/captask_hash.json
4247 """
4348
44- def __init__ (self , watch_path : str ):
49+ def __init__ (self , watch_path : str , output_format : str = "json" ):
4550 """
46- Initialize the internal watcher with the watch directory path.
51+ Initialize the internal watcher with the watch directory path and output format .
4752
48- :param watch_path: Directory where task files will be written (same as file watcher's watch_path)
53+ :param watch_path: Directory where task files will be written (same as file watcher's watch_path).
54+ :param output_format: Serialization format, either "json" (default) or "yaml".
4955 """
56+ if output_format not in ("json" , "yaml" ):
57+ raise ValueError (f"output_format must be 'json' or 'yaml', got '{ output_format } '" )
58+
5059 self .watch_path = watch_path
60+ self .output_format = output_format
5161 self .logger = logging .getLogger (__name__ )
5262
5363 # Ensure watch directory exists
5464 os .makedirs (self .watch_path , exist_ok = True )
5565
56- # Metrics for Task
66+ # Metrics for Task writes
5767 self .internal_task_counter = meter .create_counter (
5868 "internal_task_write" ,
5969 description = "Number of internal Task instances written to file"
@@ -63,7 +73,7 @@ def __init__(self, watch_path: str):
6373 description = "Number of errors while writing internal Task instances to file"
6474 )
6575
66- # Metrics for CapTask
76+ # Metrics for CapTask writes
6777 self .internal_captask_counter = meter .create_counter (
6878 "internal_captask_write" ,
6979 description = "Number of internal CapTask instances written to file"
@@ -73,7 +83,10 @@ def __init__(self, watch_path: str):
7383 description = "Number of errors while writing internal CapTask instances to file"
7484 )
7585
76- self .logger .info (f"InternalWatcher initialized with watch_path: { self .watch_path } " )
86+ self .logger .info (
87+ "InternalWatcher initialized with watch_path=%s, format=%s" ,
88+ self .watch_path , self .output_format
89+ )
7790
7891 @tracer .start_as_current_span ("internal_watcher_add" )
7992 def add (self , item : Union [Task , CapTask ]) -> str :
@@ -83,11 +96,10 @@ def add(self, item: Union[Task, CapTask]) -> str:
8396 :param item: The Task or CapTask object to persist.
8497 :return: The hash string used in the filename.
8598 :raises TypeError: If the provided item is not a Task or CapTask instance.
86- :raises Exception: If file writing fails (logged and re‑ raised).
99+ :raises Exception: If file writing fails (logged and re- raised).
87100 """
88101 current_span = trace .get_current_span ()
89102
90- # Dispatch based on type
91103 if isinstance (item , Task ):
92104 return self ._add_task (item , current_span )
93105 elif isinstance (item , CapTask ):
@@ -101,7 +113,6 @@ def add(self, item: Union[Task, CapTask]) -> str:
101113
102114 def _add_task (self , task : Task , span : trace .Span ) -> str :
103115 """Handle Task instance writing."""
104- # Extract task hash (required for filename)
105116 task_hash = getattr (task , 'hash' , None )
106117 if not task_hash :
107118 error_msg = "Task object missing 'hash' attribute"
@@ -111,31 +122,30 @@ def _add_task(self, task: Task, span: trace.Span) -> str:
111122 self .internal_task_error_counter .add (1 )
112123 raise ValueError (error_msg )
113124
114- # Enrich span with task metadata
115125 task_id = getattr (task , 'id' , 'unknown' )
116126 task_type = getattr (task , 'type' , 'unknown' )
117127 span .set_attribute ("task.id" , str (task_id ))
118128 span .set_attribute ("task.type" , task_type )
119129 span .set_attribute ("task.hash" , str (task_hash ))
120130 span .set_attribute ("item.type" , "Task" )
121131
122- self .logger .debug (f"Internally generated Task received: id={ task_id } , hash={ task_hash } , type={ task_type } " )
132+ self .logger .debug ("Internally generated Task received: id=%s, hash=%s, type=%s" ,
133+ task_id , task_hash , task_type )
123134
124135 try :
125136 file_path = self ._write_item_file (task , task_hash , "Task" )
126137 span .set_attribute ("file.path" , file_path )
127138 self .internal_task_counter .add (1 )
128- self .logger .info (f "Internal Task { task_hash } written to file: { file_path } " )
139+ self .logger .info ("Internal Task %s written to file: %s" , task_hash , file_path )
129140 return task_hash
130141 except Exception as e :
131- self .logger .error (f "Failed to write internal Task { task_hash } to file: { e } " , exc_info = True )
142+ self .logger .error ("Failed to write internal Task %s to file: %s" , task_hash , e , exc_info = True )
132143 span .record_exception (e )
133144 self .internal_task_error_counter .add (1 )
134145 raise
135146
136147 def _add_captask (self , captask : CapTask , span : trace .Span ) -> str :
137148 """Handle CapTask instance writing."""
138- # Extract CapTask hash (required for filename)
139149 captask_hash = getattr (captask , 'hash' , None )
140150 if not captask_hash :
141151 error_msg = "CapTask object missing 'hash' attribute"
@@ -150,39 +160,62 @@ def _add_captask(self, captask: CapTask, span: trace.Span) -> str:
150160 span .set_attribute ("captask.hash" , str (captask_hash ))
151161 span .set_attribute ("item.type" , "CapTask" )
152162
153- self .logger .debug (f"Internally generated CapTask received: cap_name={ cap_name } , hash={ captask_hash } " )
163+ self .logger .debug ("Internally generated CapTask received: cap_name=%s, hash=%s" ,
164+ cap_name , captask_hash )
154165
155166 try :
156167 file_path = self ._write_item_file (captask , captask_hash , "CapTask" )
157168 span .set_attribute ("file.path" , file_path )
158169 self .internal_captask_counter .add (1 )
159- self .logger .info (f "Internal CapTask { captask_hash } written to file: { file_path } " )
170+ self .logger .info ("Internal CapTask %s written to file: %s" , captask_hash , file_path )
160171 return captask_hash
161172 except Exception as e :
162- self .logger .error (f "Failed to write internal CapTask { captask_hash } to file: { e } " , exc_info = True )
173+ self .logger .error ("Failed to write internal CapTask %s to file: %s" , captask_hash , e , exc_info = True )
163174 span .record_exception (e )
164175 self .internal_captask_error_counter .add (1 )
165176 raise
166177
167178 def _write_item_file (self , item : Union [Task , CapTask ], item_hash : str , item_type : str ) -> str :
168179 """
169- Write a Task or CapTask instance to a file in watch_path. Format defaults to JSON.
180+ Write a Task or CapTask instance to a file in watch_path.
181+
182+ Format is determined by self.output_format (json or yaml).
170183
171184 :param item: Task or CapTask object to serialize.
172185 :param item_hash: Hash string used as filename stem.
173186 :param item_type: Descriptive type for logging (not used in filename).
174187 :return: Full path to the written file.
175188 """
176- ext = ".json"
189+ ext = ".json" if self . output_format == "json" else ".yaml"
177190 file_path = os .path .join (self .watch_path , f"{ item_hash } { ext } " )
178191
179- # Serialize item to dict (assuming to_dict() exists, fallback to __dict__)
192+ # Extract serializable dictionary
180193 if hasattr (item , 'to_dict' ):
181194 item_dict = item .to_dict ()
182195 else :
183196 item_dict = item .__dict__
184197
185198 with open (file_path , 'w' , encoding = 'utf-8' ) as f :
186- json .dump (item_dict , f , indent = 2 )
187-
188- return file_path
199+ if self .output_format == "json" :
200+ json .dump (item_dict , f , indent = 2 )
201+ else : # yaml
202+ yaml .dump (item_dict , f , default_flow_style = False , allow_unicode = True )
203+
204+ return file_path
205+
206+
207+ # ---------------------------------------------------------------------------
208+ # Example usage
209+ # ---------------------------------------------------------------------------
210+ #
211+ # # Using JSON (default)
212+ # watcher = InternalWatcher("/path/to/watch_dir")
213+ # task = Task(...) # Create a Task instance
214+ # task_hash = watcher.add(task) # Writes /path/to/watch_dir/<hash>.json
215+ #
216+ # captask = CapTask(cap_name="email", args=["to@example.com"])
217+ # captask_hash = watcher.add(captask) # Writes /path/to/watch_dir/<hash>.json
218+ #
219+ # # Using YAML
220+ # yaml_watcher = InternalWatcher("/path/to/watch_dir", output_format="yaml")
221+ # yaml_watcher.add(task) # Writes /path/to/watch_dir/<hash>.yaml
0 commit comments