Skip to content

Commit 9c32b35

Browse files
poll_status now re-polls on Error to wait for ContentProcessor retries before marking as terminal
1 parent cf78a78 commit 9c32b35

2 files changed

Lines changed: 92 additions & 2 deletions

File tree

src/ContentProcessorWorkflow/src/services/content_process_service.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,22 +267,35 @@ async def poll_status(
267267
poll_interval_seconds: float = 5.0,
268268
timeout_seconds: float = 600.0,
269269
on_poll: Callable[[dict], Awaitable[None] | None] | None = None,
270+
error_confirmation_polls: int = 3,
270271
) -> dict:
271272
"""Poll Cosmos for status until a terminal state or timeout.
272273
274+
When an ``Error`` status is observed, the poller does not return
275+
immediately. Instead it re-polls up to *error_confirmation_polls*
276+
additional times (with the same interval) to confirm the error is
277+
persistent and not a transient state during a ContentProcessor
278+
retry cycle. If the status changes away from ``Error`` (e.g.
279+
back to a step name or ``Retrying``), the normal polling loop
280+
resumes.
281+
273282
Args:
274283
process_id: The content process ID to poll.
275284
poll_interval_seconds: Delay between poll attempts.
276285
timeout_seconds: Maximum elapsed time before giving up.
277286
on_poll: Optional callback invoked on each iteration with
278287
the current status dict. Accepts sync or async callables.
288+
error_confirmation_polls: Number of additional polls to
289+
perform after first observing ``Error`` before accepting
290+
it as terminal. Defaults to 3.
279291
280292
Returns:
281293
Final status dict with keys ``status``, ``process_id``,
282294
``file_name``, and ``terminal``.
283295
"""
284296
elapsed = 0.0
285297
result: dict | None = None
298+
consecutive_error_polls = 0
286299
while elapsed < timeout_seconds:
287300
result = await self.get_status(process_id)
288301
if result is None:
@@ -299,10 +312,27 @@ async def poll_status(
299312
await poll_handler
300313

301314
status = result.get("status", "processing")
302-
if status in ("Completed", "Error"):
315+
if status == "Completed":
303316
result["terminal"] = True
304317
return result
305318

319+
if status == "Error":
320+
consecutive_error_polls += 1
321+
if consecutive_error_polls > error_confirmation_polls:
322+
result["terminal"] = True
323+
return result
324+
logger.info(
325+
"Process %s reported Error (confirmation %d/%d), "
326+
"re-polling to confirm retries are exhausted.",
327+
process_id,
328+
consecutive_error_polls,
329+
error_confirmation_polls,
330+
)
331+
else:
332+
# Status changed away from Error (e.g. retry started),
333+
# reset the confirmation counter.
334+
consecutive_error_polls = 0
335+
306336
await asyncio.sleep(poll_interval_seconds)
307337
elapsed += poll_interval_seconds
308338

src/ContentProcessorWorkflow/tests/unit/services/test_content_process_service.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,69 @@ async def _run():
142142
record.processed_file_name = "test.pdf"
143143
svc._process_repo.get_async.return_value = record
144144

145-
result = await svc.poll_status("p1", poll_interval_seconds=0.01)
145+
result = await svc.poll_status(
146+
"p1",
147+
poll_interval_seconds=0.01,
148+
error_confirmation_polls=3,
149+
)
146150
assert result["status"] == "Error"
147151
assert result["terminal"] is True
152+
# Should have been called 1 (initial) + 3 (confirmation) = 4 times
153+
assert svc._process_repo.get_async.call_count == 4
154+
155+
asyncio.run(_run())
156+
157+
def test_error_recovers_to_retrying(self):
158+
"""When Error is seen but status changes to Retrying, polling continues."""
159+
160+
async def _run():
161+
svc = _make_service()
162+
# Error -> Retrying -> extract -> Completed
163+
statuses = iter(["Error", "Retrying", "extract", "Completed"])
164+
165+
async def _get_async(pid):
166+
s = next(statuses)
167+
rec = MagicMock()
168+
rec.status = s
169+
rec.processed_file_name = "test.pdf"
170+
return rec
171+
172+
svc._process_repo.get_async.side_effect = _get_async
173+
174+
result = await svc.poll_status(
175+
"p1",
176+
poll_interval_seconds=0.01,
177+
error_confirmation_polls=3,
178+
)
179+
assert result["status"] == "Completed"
180+
assert result["terminal"] is True
181+
182+
asyncio.run(_run())
183+
184+
def test_error_recovers_to_step_name(self):
185+
"""When Error is seen but status changes to a step name, polling continues."""
186+
187+
async def _run():
188+
svc = _make_service()
189+
# Error -> map (retry started) -> Completed
190+
statuses = iter(["Error", "map", "Completed"])
191+
192+
async def _get_async(pid):
193+
s = next(statuses)
194+
rec = MagicMock()
195+
rec.status = s
196+
rec.processed_file_name = "test.pdf"
197+
return rec
198+
199+
svc._process_repo.get_async.side_effect = _get_async
200+
201+
result = await svc.poll_status(
202+
"p1",
203+
poll_interval_seconds=0.01,
204+
error_confirmation_polls=3,
205+
)
206+
assert result["status"] == "Completed"
207+
assert result["terminal"] is True
148208

149209
asyncio.run(_run())
150210

0 commit comments

Comments
 (0)