@@ -246,36 +246,68 @@ def _get_artifact_type(step_name: str) -> ArtifactType:
246246 container_name = self .application_context .configuration .app_cps_processes ,
247247 )
248248
249- ContentProcess (
250- process_id = self ._current_message_context .data_pipeline .process_id ,
251- processed_file_name = self ._current_message_context .data_pipeline .files [
252- 0
253- ].name ,
254- status = "Error" ,
255- processed_file_mime_type = self ._current_message_context .data_pipeline .files [
256- 0
257- ].mime_type ,
258- last_modified_time = datetime .datetime .now (datetime .UTC ),
259- last_modified_by = step_name ,
260- imported_time = datetime .datetime .strptime (
261- self ._current_message_context .data_pipeline .pipeline_status .creation_time ,
262- "%Y-%m-%dT%H:%M:%S.%fZ" ,
263- ),
264- process_output = [
265- Step_Outputs (
266- step_name = self .handler_name ,
267- step_result = exception_result .result ,
268- )
269- ],
270- ).update_status_to_cosmos (
271- connection_string = self .application_context .configuration .app_cosmos_connstr ,
272- database_name = self .application_context .configuration .app_cosmos_database ,
273- collection_name = self .application_context .configuration .app_cosmos_container_process ,
274- )
249+ # Only mark as terminal "Error" when retries are
250+ # exhausted. While retries remain, use "Retrying"
251+ # so the workflow poller keeps waiting instead of
252+ # treating the first transient failure as final.
253+ has_retries_remaining = queue_message .dequeue_count <= 5
254+
255+ if has_retries_remaining :
256+ # Lightweight status-only update — avoids
257+ # overwriting the document with null result /
258+ # scores that a previous successful step may
259+ # have written.
260+ ContentProcess (
261+ process_id = self ._current_message_context .data_pipeline .process_id ,
262+ processed_file_name = self ._current_message_context .data_pipeline .files [
263+ 0
264+ ].name ,
265+ status = "Retrying" ,
266+ processed_file_mime_type = self ._current_message_context .data_pipeline .files [
267+ 0
268+ ].mime_type ,
269+ last_modified_time = datetime .datetime .now (datetime .UTC ),
270+ last_modified_by = step_name ,
271+ imported_time = datetime .datetime .strptime (
272+ self ._current_message_context .data_pipeline .pipeline_status .creation_time ,
273+ "%Y-%m-%dT%H:%M:%S.%fZ" ,
274+ ),
275+ ).update_process_status_to_cosmos (
276+ connection_string = self .application_context .configuration .app_cosmos_connstr ,
277+ database_name = self .application_context .configuration .app_cosmos_database ,
278+ collection_name = self .application_context .configuration .app_cosmos_container_process ,
279+ )
280+ else :
281+ ContentProcess (
282+ process_id = self ._current_message_context .data_pipeline .process_id ,
283+ processed_file_name = self ._current_message_context .data_pipeline .files [
284+ 0
285+ ].name ,
286+ status = "Error" ,
287+ processed_file_mime_type = self ._current_message_context .data_pipeline .files [
288+ 0
289+ ].mime_type ,
290+ last_modified_time = datetime .datetime .now (datetime .UTC ),
291+ last_modified_by = step_name ,
292+ imported_time = datetime .datetime .strptime (
293+ self ._current_message_context .data_pipeline .pipeline_status .creation_time ,
294+ "%Y-%m-%dT%H:%M:%S.%fZ" ,
295+ ),
296+ process_output = [
297+ Step_Outputs (
298+ step_name = self .handler_name ,
299+ step_result = exception_result .result ,
300+ )
301+ ],
302+ ).update_status_to_cosmos (
303+ connection_string = self .application_context .configuration .app_cosmos_connstr ,
304+ database_name = self .application_context .configuration .app_cosmos_database ,
305+ collection_name = self .application_context .configuration .app_cosmos_container_process ,
306+ )
275307
276308 process_outputs : list [Step_Outputs ] = []
277309
278- if queue_message . dequeue_count > 5 :
310+ if not has_retries_remaining :
279311 logging .info (
280312 "Message will be moved to the Dead Letter Queue."
281313 )
0 commit comments