"""
Grep Function Call Module

Implements a grep search capability using `igrep` as the primary backend, with
automatic fallback to the standard Unix `grep` when `igrep` is not available.
The tool provides a structured interface for regex file search, integrating with
the SCL function-call system via the Capability base class.

Features:
- Regex Search
- Path Targeting (defaults to CWD, supports multiple paths)
- Glob Filtering (comma/space separated, brace expansion; translated to
  --include for grep)
- File Type Filter (--type) – **only available with igrep**; raises an error
  when used with grep
- Case-Insensitive Search (-i)
- Multiline Mode (-U --multiline-dotall) – **only available with igrep**;
  raises an error when used with grep
- Output Modes: files_with_matches, content, count
- Context Lines: -A, -B, -C (content mode only; supported by both igrep and
  GNU grep)
- Line Numbers toggle (content mode, default on)
- Pagination: head_limit & offset applied to final output
- Ignored Content: VCS dirs, permission-based ignore patterns – handled through
  igrep's default ignore rules; grep does not automatically respect ignore files

OpenTelemetry: uses tracer, meter and structured logging for full observability.
"""
import logging
import os
import re
import shutil
import subprocess
from itertools import product
from typing import Optional, Dict, Any, List, Union

from opentelemetry import trace

from scl.otel.otel import tracer, meter
from scl.meta.capability import Capability
39+
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Counter instrument for grep executions; GrepFunctionCall.execute adds 1 to it
# (tagged with the capability name) after each successful search.
grep_execution_counter = meter.create_counter(
    "grep_function_call.executed",
    description="Number of times a grep function call was executed"
)
47+
48+
class GrepFunctionCall(Capability):
    """
    Capability that runs a regex file search through an external grep binary.

    `igrep` is preferred; standard `grep` is the fallback when `igrep` is not
    on PATH. File-type filtering and multiline mode are rejected with a
    RuntimeError when only standard `grep` is available.
    """

    # Flags that are known to work with igrep (derived from `igrep --help`).
    # NOTE(review): nothing in this class consults this set, and the flags that
    # _build_command emits for output modes, line numbers, context and
    # multiline (-l, -c, -n, -A/-B/-C, -U) are not listed here -- confirm that
    # igrep really accepts them.
    _IGREP_SUPPORTED_OPTIONS = {
        "-i", "--ignore-case", "-S", "--smart-case",
        "-.", "--hidden", "-L", "--follow", "-w", "--word-regexp",
        "-g", "--glob", "-t", "--type", "-T", "--type-not",
        "--editor", "--custom-command", "--theme", "--context-viewer",
        "--type-list", "-h", "--help", "-V", "--version",
    }

    # Values accepted for the "output_mode" search parameter.
    _OUTPUT_MODES = ("files_with_matches", "content", "count")

    @tracer.start_as_current_span("GrepFunctionCall.__init__")
    def __init__(self,
                 name: str,
                 description: str,
                 original_body: str,
                 llm_description: Optional[str] = None,
                 search_params: Optional[Dict] = None):
        """
        Create the capability.

        Args:
            name: Capability name; also recorded on the current span.
            description: Human-readable description passed to Capability.
            original_body: Original body text passed to Capability.
            llm_description: Optional description passed to Capability.
            search_params: Default search parameters; execute() merges its
                runtime arguments over these.
        """
        current_span = trace.get_current_span()
        current_span.set_attribute("grep.name", name)

        super().__init__(
            name=name,
            type="grep_function_call",
            description=description,
            original_body=original_body,
            llm_description=llm_description
        )

        # Default search parameters (used when not overridden in execute)
        self.search_params = search_params or {}
        logger.debug("GrepFunctionCall '%s' initialized with params: %s", name, self.search_params)
        logger.info("GrepFunctionCall '%s' created", name)

    @tracer.start_as_current_span("GrepFunctionCall.execute")
    def execute(self, args_dict: Dict[str, Any]) -> str:
        """
        Execute the grep search with the provided arguments.

        Args:
            args_dict: Dictionary containing search parameters. Merged with
                default search_params (runtime values win); None is treated as
                an empty dict. Supported keys: pattern, path (str or list of
                str), glob, type, ignore_case, multiline, output_mode
                (files_with_matches, content, count), context_before,
                context_after, context_around, line_numbers, head_limit,
                offset.

        Returns:
            The search output for the selected output_mode, after offset and
            head_limit have been applied line-wise. Empty string when nothing
            matched.

        Raises:
            ValueError: No pattern was given, or a numeric/output_mode
                parameter is invalid.
            RuntimeError: The binary failed, or an igrep-only feature was
                requested while only standard grep is available.
            FileNotFoundError: Neither igrep nor grep is available.
        """
        current_span = trace.get_current_span()
        # Merge defaults with runtime args; runtime args take precedence
        merged_args = {**self.search_params, **(args_dict or {})}

        pattern = merged_args.get("pattern")
        if not pattern:
            error_msg = "No search pattern provided"
            logger.error(error_msg)
            current_span.set_status(trace.Status(trace.StatusCode.ERROR, error_msg))
            raise ValueError(error_msg)

        current_span.set_attribute("grep.pattern", pattern)
        current_span.set_attribute("grep.path", str(merged_args.get("path", os.getcwd())))

        try:
            # Validate pagination before spawning the search so that bad input
            # fails fast instead of after a potentially long run.
            head_limit = merged_args.get("head_limit")
            if head_limit is not None:
                head_limit = self._as_non_negative_int(head_limit, "head_limit")
            # An explicit None offset means "no offset".
            offset = self._as_non_negative_int(merged_args.get("offset") or 0, "offset")

            cmd = self._build_command(merged_args)
            command_line = ' '.join(cmd)
            logger.info("Executing grep command: %s", command_line)
            current_span.set_attribute("grep.command", command_line)

            result = self._run_command(cmd)

            # Apply pagination
            if head_limit is not None or offset > 0:
                lines = result.splitlines()[offset:]
                if head_limit is not None:
                    lines = lines[:head_limit]
                result = "\n".join(lines)

            grep_execution_counter.add(1, {"grep.name": self.name})
            current_span.set_attribute("grep.result_length", len(result))
            logger.info("GrepFunctionCall '%s' executed successfully", self.name)
            return result

        except Exception as e:
            logger.error(f"GrepFunctionCall '{self.name}' execution failed: {e}", exc_info=True)
            current_span.record_exception(e)
            current_span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            raise

    @staticmethod
    def _as_non_negative_int(value: Any, name: str) -> int:
        """
        Convert *value* to an int >= 0 or raise ValueError naming the parameter.

        Accepts ints, integral floats and numeric strings; rejects booleans,
        fractional floats, negatives and anything int() cannot parse.
        """
        error = ValueError(f"{name} must be a non-negative integer, got {value!r}")
        if isinstance(value, bool):
            raise error
        if isinstance(value, float) and not value.is_integer():
            raise error
        try:
            number = int(value)
        except (TypeError, ValueError, OverflowError):
            raise error from None
        if number < 0:
            raise error
        return number

    def _build_command(self, args_dict: Dict[str, Any]) -> List[str]:
        """
        Build the argv list for the selected binary (igrep or grep),
        translating options as needed for compatibility.

        Args:
            args_dict: Merged search parameters; must contain "pattern".

        Returns:
            The command as a list of strings, ready for subprocess.run.

        Raises:
            ValueError: Unknown output_mode or invalid context line count.
            RuntimeError: multiline or type requested with standard grep.
            FileNotFoundError: No usable binary on PATH.
        """
        binary = self._get_grep_binary()
        cmd = [binary]

        # An explicit None falls back to the default mode.
        output_mode = args_dict.get("output_mode") or "content"
        if output_mode not in self._OUTPUT_MODES:
            raise ValueError(
                f"Unsupported output_mode {output_mode!r}; "
                f"expected one of {', '.join(self._OUTPUT_MODES)}"
            )
        ignore_case = args_dict.get("ignore_case", False)
        multiline = args_dict.get("multiline", False)
        line_numbers = args_dict.get("line_numbers", True)  # content mode only
        glob_pattern = args_dict.get("glob")
        file_type = args_dict.get("type")

        if binary == "grep":
            # Standard grep refuses directory operands (exit code 2) unless told
            # to recurse, and the default search path is a directory (CWD).
            # --include also only takes effect while recursing.
            cmd.append("-r")

        # Supported flags common to both igrep and grep
        if ignore_case:
            cmd.append("-i")

        # Multiline mode – only igrep supports this
        if multiline:
            if binary == "grep":
                raise RuntimeError(
                    "Multiline mode (-U --multiline-dotall) is not supported by standard grep. "
                    "Install igrep to use this feature."
                )
            cmd.extend(["-U", "--multiline-dotall"])

        # Output modes
        if output_mode == "files_with_matches":
            cmd.append("-l")
        elif output_mode == "count":
            cmd.append("-c")
        else:  # content
            if line_numbers:
                cmd.append("-n")
            context_flags = (
                ("-B", "context_before"),
                ("-A", "context_after"),
                ("-C", "context_around"),
            )
            for flag, key in context_flags:
                value = args_dict.get(key)
                if value is not None:
                    cmd.extend([flag, str(self._as_non_negative_int(value, key))])

        # Glob filtering – igrep uses -g, grep uses --include
        if glob_pattern:
            glob_flag = "-g" if binary == "igrep" else "--include"
            for g in self._parse_glob(str(glob_pattern)):
                cmd.extend([glob_flag, g])

        # File type filter – only igrep supports --type
        if file_type:
            if binary == "grep":
                raise RuntimeError(
                    "File type filtering (--type) is not supported by standard grep. "
                    "Install igrep to use this feature."
                )
            cmd.extend(["--type", str(file_type)])

        # End-of-options marker: without it a pattern or path that starts with
        # "-" would be parsed as an option by the binary.
        cmd.append("--")

        # Pattern must come before path(s)
        cmd.append(str(args_dict["pattern"]))

        # Path targeting – supports multiple paths. Missing, None or empty
        # values fall back to the CWD so the binary never waits on stdin.
        paths = args_dict.get("path")
        if paths is None:
            targets = []
        elif isinstance(paths, (list, tuple)):
            targets = [str(p) for p in paths if p is not None and str(p)]
        else:
            targets = [str(paths)] if str(paths) else []
        cmd.extend(targets or [os.getcwd()])

        return cmd

    def _get_grep_binary(self) -> str:
        """
        Choose the most capable binary available.

        Returns:
            "igrep" when it is available, otherwise "grep".

        Raises:
            FileNotFoundError: Neither binary is available.
        """
        if self._is_binary_available("igrep"):
            logger.debug("Using 'igrep' for full feature support.")
            return "igrep"

        if self._is_binary_available("grep"):
            logger.debug("Using standard 'grep' (igrep not found).")
            return "grep"

        raise FileNotFoundError(
            "Neither `igrep` nor standard `grep` found. "
            "Please install igrep or ensure grep is in your PATH."
        )

    def _parse_glob(self, glob_pattern: str) -> List[str]:
        """
        Parse glob patterns. Supports comma/space separation and brace expansion.

        For example:
            "*.log, *.txt" -> ["*.log", "*.txt"]
            "*.{js,ts}" -> ["*.js", "*.ts"]
            "*.{js, ts}" -> ["*.js", "*.ts"]
            "*.log,*.{md,rst} src/.*" -> ["*.log", "*.md", "*.rst", "src/.*"]

        Returns:
            The expanded globs; [glob_pattern] when nothing usable remains.
        """
        # Whitespace inside a balanced brace group is padding between the
        # alternatives, not a separator between globs, so drop it up front.
        normalized = re.sub(
            r'\{[^{}]*\}',
            lambda group: re.sub(r'\s+', '', group.group(0)),
            glob_pattern,
        )
        # Commas outside braces separate globs; turn them into whitespace.
        depth = 0
        separated = []
        for ch in normalized:
            if ch == '{':
                depth += 1
            elif ch == '}':
                # Clamp so a stray "}" cannot disable comma splitting afterwards.
                depth = max(depth - 1, 0)
            elif ch == ',' and depth == 0:
                ch = ' '
            separated.append(ch)
        # Split on whitespace, expand braces per token, drop empty results.
        expanded = [
            glob
            for token in ''.join(separated).split()
            for glob in self._expand_braces(token)
            if glob
        ]
        return expanded if expanded else [glob_pattern]

    @staticmethod
    def _expand_braces(text: str) -> List[str]:
        """
        Expand brace groups like "{a,b}" into a list of strings.

        Supports multiple brace groups, e.g. "a{b,c}d{e,f}" -> ["abde", "abdf", "acde", "acdf"].
        No nesting of braces is supported.
        """
        # Find all (innermost) brace groups
        matches = list(re.finditer(r'\{([^{}]*)\}', text))
        if not matches:
            return [text]
        # Comma-separated alternatives of each group, in order of appearance
        option_lists = [m.group(1).split(',') for m in matches]
        results = []
        for combo in product(*option_lists):
            # Rebuild the string, substituting each group with its chosen option
            last_idx = 0
            parts = []
            for match, opt in zip(matches, combo):
                start, end = match.span()
                parts.append(text[last_idx:start])
                parts.append(opt)
                last_idx = end
            parts.append(text[last_idx:])
            results.append(''.join(parts))
        return results

    @staticmethod
    def _is_binary_available(name: str) -> bool:
        """Return True when an executable called *name* can be found on PATH."""
        # A PATH lookup avoids spawning `<name> --version` on every search.
        return shutil.which(name) is not None

    def _run_command(self, cmd: List[str]) -> str:
        """
        Execute the grep command and return the output.

        Returns:
            Captured stdout on exit code 0, "" on exit code 1 (no matches).

        Raises:
            RuntimeError: Any other exit code; the message carries stderr.
            FileNotFoundError: The binary could not be started.
        """
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                stdin=subprocess.DEVNULL,  # never block waiting on inherited stdin
                check=False  # grep returns 1 if no matches found
            )
        except FileNotFoundError:
            logger.error("grep binary not found. Please install igrep or ensure grep is in PATH.")
            raise

        if result.returncode == 0:
            return result.stdout
        if result.returncode == 1:
            # No matches found
            return ""
        error_msg = f"grep failed with code {result.returncode}: {result.stderr}"
        logger.error(error_msg)
        raise RuntimeError(error_msg)

    def __repr__(self) -> str:
        return f"GrepFunctionCall(name='{self.name}', pattern='{self.search_params.get('pattern')}')"
340+
341+
"""
Example usage:
--------------
from scl.capabilities.grep_function_call import GrepFunctionCall

# Create a grep capability
grep_cap = GrepFunctionCall(
    name="error_search",
    description="Search for ERROR patterns in log files",
    original_body="Searches for ERROR in log files",
    search_params={
        "glob": "*.log",
        "output_mode": "content",
        "ignore_case": False
    }
)

# Execute with a specific pattern
result = grep_cap.execute({"pattern": "ERROR", "path": "/var/log/"})
print(result)

# Search with context lines and pagination
result = grep_cap.execute({
    "pattern": "timeout",
    "path": ".",
    "output_mode": "content",
    "context_after": 2,
    "head_limit": 10,
    "offset": 5
})
print(result)

# List files of a specific type that contain the pattern (only works with igrep)
result = grep_cap.execute({
    "pattern": "def",
    "path": ".",
    "output_mode": "files_with_matches",
    "type": "python"
})
print(result)

# Use brace expansion in glob: "*.{md,rst}" will be expanded to "*.md" and "*.rst"
grep_cap.search_params["glob"] = "*.{md,rst}"
result = grep_cap.execute({"pattern": "TODO"})
print(result)

# Search multiple directories by passing a list of paths
result = grep_cap.execute({
    "pattern": "FIXME",
    "path": ["/var/log", "/home/user/project"],
    "output_mode": "content"
})
print(result)
"""