Skip to content

Commit 215023d

Browse files
committed
fix: Cosmos record race condition and concurrent upsert lost-update
Root cause: submit() enqueued the processing message BEFORE inserting the Cosmos record. ContentProcessor picked up the message instantly, found no existing record, and created its own document. The Workflow then inserted a second document with the same process_id. Subsequent poll_status() always read the Workflow's stale 'processing' copy while ContentProcessor updated the other one to 'Completed'. Fixes: - Reorder submit(): blob upload -> Cosmos insert -> queue enqueue, so ContentProcessor always finds and updates the pre-existing record. - Add asyncio.Lock (_upsert_lock) around all Upsert_Content_Process calls to serialize read-modify-write on the shared Claim_Process document and prevent concurrent tasks from overwriting each other. - Increase document processing semaphore from 2 to 4. - Apply code-quality pass (structured docstrings, comment cleanup).
1 parent 2e228d6 commit 215023d

2 files changed

Lines changed: 103 additions & 72 deletions

File tree

src/ContentProcessorWorkflow/src/services/content_process_service.py

Lines changed: 55 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def _get_queue_client(self) -> QueueClient:
102102
return self._queue_client
103103

104104
# ------------------------------------------------------------------ #
105-
# submit — replaces POST /contentprocessor/submit
105+
# submit
106106
# ------------------------------------------------------------------ #
107107
async def submit(
108108
self,
@@ -112,13 +112,26 @@ async def submit(
112112
schema_id: str,
113113
metadata_id: str,
114114
) -> str:
115-
"""Upload file to blob, enqueue processing message, create Cosmos record.
115+
"""Upload file to blob, insert Cosmos record, and enqueue processing.
116116
117-
Returns the generated process_id.
117+
Steps:
118+
1. Upload the file to blob storage.
119+
2. Insert a Cosmos DB record so ContentProcessor finds it
120+
on pickup (avoids duplicate-document race).
121+
3. Enqueue a processing message to the extract queue.
122+
123+
Args:
124+
file_bytes: Raw file content.
125+
filename: Sanitized file name.
126+
mime_type: Detected MIME type.
127+
schema_id: Schema to apply during extraction.
128+
metadata_id: Associated metadata identifier.
129+
130+
Returns:
131+
The generated process_id (UUID string).
118132
"""
119133
process_id = str(uuid.uuid4())
120134

121-
# 1. Upload file to blob: {cps-processes}/{process_id}/{filename}
122135
container_name = self._config.app_cps_processes
123136
blob_helper = self._get_blob_helper()
124137
await asyncio.to_thread(
@@ -128,7 +141,18 @@ async def submit(
128141
data=file_bytes,
129142
)
130143

131-
# 2. Enqueue processing message
144+
# Insert Cosmos record BEFORE enqueuing so ContentProcessor
145+
# finds this record (not creates a duplicate) when it starts.
146+
record = ContentProcessRecord(
147+
id=process_id,
148+
process_id=process_id,
149+
processed_file_name=filename,
150+
processed_file_mime_type=mime_type,
151+
status="processing",
152+
imported_time=datetime.now(timezone.utc),
153+
)
154+
await self._process_repo.add_async(record)
155+
132156
message = ContentProcessMessage(
133157
process_id=process_id,
134158
files=[
@@ -166,28 +190,21 @@ async def submit(
166190
self._get_queue_client().send_message, message.model_dump_json()
167191
)
168192

169-
# 3. Insert initial Cosmos record via sas-cosmosdb
170-
record = ContentProcessRecord(
171-
id=process_id,
172-
process_id=process_id,
173-
processed_file_name=filename,
174-
processed_file_mime_type=mime_type,
175-
status="processing",
176-
imported_time=datetime.now(timezone.utc),
177-
)
178-
await self._process_repo.add_async(record)
179-
180193
logger.info("Submitted process %s for file %s", process_id, filename)
181194
return process_id
182195

183196
# ------------------------------------------------------------------ #
184-
# get_status — replaces GET /contentprocessor/status/{id}
197+
# get_status
185198
# ------------------------------------------------------------------ #
186199
async def get_status(self, process_id: str) -> dict | None:
187200
"""Query Cosmos for process status.
188201
189-
Returns a dict with keys: status, process_id, file_name.
190-
Returns None if not found.
202+
Args:
203+
process_id: The content process identifier.
204+
205+
Returns:
206+
Dict with keys ``status``, ``process_id``, ``file_name``;
207+
``None`` if the record does not exist.
191208
"""
192209
record = await self._process_repo.get_async(process_id)
193210
if record is None:
@@ -199,25 +216,33 @@ async def get_status(self, process_id: str) -> dict | None:
199216
}
200217

201218
# ------------------------------------------------------------------ #
202-
# get_processed — replaces GET /contentprocessor/processed/{id}
219+
# get_processed
203220
# ------------------------------------------------------------------ #
204221
async def get_processed(self, process_id: str) -> dict | None:
205222
"""Query Cosmos for the full processed content result.
206223
207-
Returns the full document dict, or None if not found.
224+
Args:
225+
process_id: The content process identifier.
226+
227+
Returns:
228+
Full document dict, or ``None`` if not found.
208229
"""
209230
record = await self._process_repo.get_async(process_id)
210231
if record is None:
211232
return None
212233
return record.model_dump(mode="json")
213234

214235
# ------------------------------------------------------------------ #
215-
# get_steps — replaces GET /contentprocessor/processed/{id}/steps
236+
# get_steps
216237
# ------------------------------------------------------------------ #
217238
async def get_steps(self, process_id: str) -> list | None:
218239
"""Download step_outputs.json from blob storage.
219240
220-
Returns parsed list of step output dicts, or None if not found.
241+
Args:
242+
process_id: The content process identifier.
243+
244+
Returns:
245+
Parsed JSON list of step objects, or ``None`` if not found.
221246
"""
222247
container_name = self._config.app_cps_processes
223248
blob_name = f"{process_id}/step_outputs.json"
@@ -234,7 +259,7 @@ async def get_steps(self, process_id: str) -> list | None:
234259
return None
235260

236261
# ------------------------------------------------------------------ #
237-
# poll_status — replaces the HTTP polling loop
262+
# poll_status
238263
# ------------------------------------------------------------------ #
239264
async def poll_status(
240265
self,
@@ -243,16 +268,18 @@ async def poll_status(
243268
timeout_seconds: float = 600.0,
244269
on_poll: Callable[[dict], Awaitable[None] | None] | None = None,
245270
) -> dict:
246-
"""Poll Cosmos for status until terminal state or timeout.
271+
"""Poll Cosmos for status until a terminal state or timeout.
247272
248273
Args:
249274
process_id: The content process ID to poll.
250275
poll_interval_seconds: Delay between poll attempts.
251-
timeout_seconds: Maximum time to wait for a terminal status.
252-
on_poll: Optional callback invoked on each poll iteration with
276+
timeout_seconds: Maximum elapsed time before giving up.
277+
on_poll: Optional callback invoked on each iteration with
253278
the current status dict. Accepts sync or async callables.
254279
255-
Returns the final status dict with keys: status, process_id, file_name.
280+
Returns:
281+
Final status dict with keys ``status``, ``process_id``,
282+
``file_name``, and ``terminal``.
256283
"""
257284
elapsed = 0.0
258285
result: dict | None = None

src/ContentProcessorWorkflow/src/steps/document_process/executor/document_process_executor.py

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,11 @@ async def handle_execute(
157157
)
158158
)
159159

160-
# Limit concurrency to avoid overwhelming the service
161-
max_concurrency = 2
160+
# Limit concurrency; serialize Cosmos writes via _upsert_lock to
161+
# prevent lost-update races on the shared Claim_Process document.
162+
max_concurrency = 4
162163
semaphore = asyncio.Semaphore(max_concurrency)
164+
_upsert_lock = asyncio.Lock()
163165

164166
async def _process_one(item) -> dict:
165167
async with semaphore:
@@ -184,7 +186,7 @@ async def _process_one(item) -> dict:
184186
schema_id,
185187
)
186188

187-
# Direct submit: blob upload + queue enqueue + cosmos insert
189+
# Direct submit: blob upload + cosmos insert + queue enqueue
188190
process_id = await content_process_service.submit(
189191
file_bytes=file_bytes,
190192
filename=filename,
@@ -193,21 +195,21 @@ async def _process_one(item) -> dict:
193195
metadata_id=metadata_id,
194196
)
195197

196-
# Upsert initial status to claim process
197-
await claim_process_repository.Upsert_Content_Process(
198-
process_id=claim_id,
199-
content_process=Content_Process(
200-
process_id=process_id,
201-
file_name=str(item.file_name),
202-
mime_type=content_type or "application/octet-stream",
203-
status="processing",
204-
),
205-
)
198+
# Upsert initial "processing" status to claim process
199+
async with _upsert_lock:
200+
await claim_process_repository.Upsert_Content_Process(
201+
process_id=claim_id,
202+
content_process=Content_Process(
203+
process_id=process_id,
204+
file_name=str(item.file_name),
205+
mime_type=content_type or "application/octet-stream",
206+
status="processing",
207+
),
208+
)
206209

207-
# Poll Cosmos directly until terminal status,
208-
# upserting intermediate status changes to the claim process.
209-
# Track the last status to avoid redundant writes that
210-
# create race conditions with concurrent tasks.
210+
# Poll until terminal status, upserting intermediate
211+
# changes. Skip duplicate and terminal statuses to
212+
# avoid clobbering the caller's richer final upsert.
211213
_last_polled_status: str | None = None
212214

213215
async def _on_poll(poll_data: dict) -> None:
@@ -216,19 +218,20 @@ async def _on_poll(poll_data: dict) -> None:
216218
if polled_status == _last_polled_status:
217219
return
218220
_last_polled_status = polled_status
219-
# Skip the final "Completed"/"Error" upsert here;
220-
# the caller does a richer upsert with scores.
221+
# Terminal statuses are handled by the caller with scores.
221222
if polled_status in ("Completed", "Error"):
222223
return
223-
await claim_process_repository.Upsert_Content_Process(
224-
process_id=claim_id,
225-
content_process=Content_Process(
226-
process_id=process_id,
227-
file_name=str(item.file_name),
228-
mime_type=content_type or "application/octet-stream",
229-
status=polled_status,
230-
),
231-
)
224+
async with _upsert_lock:
225+
await claim_process_repository.Upsert_Content_Process(
226+
process_id=claim_id,
227+
content_process=Content_Process(
228+
process_id=process_id,
229+
file_name=str(item.file_name),
230+
mime_type=content_type
231+
or "application/octet-stream",
232+
status=polled_status,
233+
),
234+
)
232235

233236
poll_result = await content_process_service.poll_status(
234237
process_id=process_id,
@@ -239,7 +242,6 @@ async def _on_poll(poll_data: dict) -> None:
239242

240243
status_text = poll_result.get("status", "Failed")
241244

242-
# Fetch final processed result for scores
243245
schema_score_f = 0.0
244246
entity_score_f = 0.0
245247
processed_time = ""
@@ -271,21 +273,23 @@ async def _on_poll(poll_data: dict) -> None:
271273
processed_time = ""
272274
result_payload = final_payload
273275

274-
# Final cosmos upsert with scores
275-
await claim_process_repository.Upsert_Content_Process(
276-
process_id=claim_id,
277-
content_process=Content_Process(
278-
process_id=process_id,
279-
file_name=str(item.file_name),
280-
mime_type=content_type or "application/octet-stream",
281-
status=status_text,
282-
schema_score=schema_score_f,
283-
entity_score=entity_score_f,
284-
processed_time=processed_time,
285-
),
286-
)
287-
288-
# Map status to HTTP-like code for downstream compatibility
276+
# Final upsert with scores
277+
async with _upsert_lock:
278+
await claim_process_repository.Upsert_Content_Process(
279+
process_id=claim_id,
280+
content_process=Content_Process(
281+
process_id=process_id,
282+
file_name=str(item.file_name),
283+
mime_type=content_type
284+
or "application/octet-stream",
285+
status=status_text,
286+
schema_score=schema_score_f,
287+
entity_score=entity_score_f,
288+
processed_time=processed_time,
289+
),
290+
)
291+
292+
# Map to HTTP-like code for downstream compatibility
289293
if status_text == "Completed":
290294
status_code = 302
291295
elif status_text in ("Error", "Failed"):

0 commit comments

Comments
 (0)