@@ -169,6 +169,9 @@ const MAX_RECONNECT_ATTEMPTS = 10
169169const RECONNECT_BASE_DELAY_MS = 1000
170170const RECONNECT_MAX_DELAY_MS = 30_000
171171const RECONNECT_EMPTY_BATCH_DELAY_MS = 500
172+ const RECONNECT_BATCH_REQUEST_TIMEOUT_MS = 15_000
173+ const RECONNECT_LIVE_TAIL_TIMEOUT_MS = 60_000
174+ const RECONNECT_RECOVERY_TIMEOUT_MS = 2 * 60_000
172175
173176const logger = createLogger ( 'useChat' )
174177
@@ -194,6 +197,43 @@ function stringArrayParam(value: unknown): string[] {
194197 return value . filter ( ( item ) : item is string => typeof item === 'string' && item . trim ( ) . length > 0 )
195198}
196199
/**
 * Derives an AbortSignal that aborts when either `parentSignal` aborts or
 * `timeoutMs` elapses, whichever happens first.
 *
 * @param parentSignal - Optional upstream signal. Its abort (and abort reason)
 *   is forwarded to the derived signal.
 * @param timeoutMs - Delay in milliseconds after which the derived signal is
 *   aborted with `new Error(timeoutMessage)`.
 * @param timeoutMessage - Message of the Error used as the abort reason when
 *   the timeout fires.
 * @returns An object with:
 *   - `signal`: the derived signal to hand to the cancellable operation;
 *   - `didTimeout`: returns `true` only when the timeout is what aborted
 *     `signal` (never when the parent aborted first);
 *   - `cleanup`: clears the pending timer and detaches the parent listener.
 *     Safe to call more than once; call it from a `finally` block.
 */
function createTimedAbortSignal(
  parentSignal: AbortSignal | undefined,
  timeoutMs: number,
  timeoutMessage: string
): {
  signal: AbortSignal
  didTimeout: () => boolean
  cleanup: () => void
} {
  const controller = new AbortController()
  let timedOut = false
  let timeoutId: ReturnType<typeof setTimeout> | undefined

  const clearTimer = () => {
    if (timeoutId !== undefined) {
      clearTimeout(timeoutId)
      timeoutId = undefined
    }
  }

  const onAbort = () => {
    // The parent won the race: cancel the timer so it can neither fire later
    // and flip `timedOut`, nor keep this closure alive until `cleanup()`.
    clearTimer()
    controller.abort(parentSignal?.reason)
  }

  if (parentSignal?.aborted) {
    // Already cancelled upstream: abort immediately and never arm the timer,
    // otherwise `didTimeout()` could later report a timeout that did not
    // cause the abort.
    controller.abort(parentSignal.reason)
  } else {
    parentSignal?.addEventListener('abort', onAbort, { once: true })
    timeoutId = setTimeout(() => {
      timeoutId = undefined
      timedOut = true
      // The derived signal is settled now; the parent listener has nothing
      // left to do, so detach it instead of waiting for `cleanup()`.
      parentSignal?.removeEventListener('abort', onAbort)
      controller.abort(new Error(timeoutMessage))
    }, timeoutMs)
  }

  return {
    signal: controller.signal,
    didTimeout: () => timedOut,
    cleanup: () => {
      clearTimer()
      parentSignal?.removeEventListener('abort', onAbort)
    },
  }
}
236+
197237function resolveWorkflowNameForDisplay ( workflowId : unknown ) : string | undefined {
198238 const id = stringParam ( workflowId )
199239 if ( ! id ) return undefined
@@ -2296,21 +2336,36 @@ export function useChat(
22962336 afterCursor : string ,
22972337 signal ?: AbortSignal
22982338 ) : Promise < StreamBatchResponse > => {
2299- const response = await fetch (
2300- `/api/mothership/chat/stream?streamId=${ encodeURIComponent ( streamId ) } &after=${ encodeURIComponent ( afterCursor ) } &batch=true` ,
2301- { signal }
2339+ const timeoutMessage = `Timed out fetching stream batch for ${ streamId } `
2340+ const timedAbort = createTimedAbortSignal (
2341+ signal ,
2342+ RECONNECT_BATCH_REQUEST_TIMEOUT_MS ,
2343+ timeoutMessage
23022344 )
2303- if ( ! response . ok ) {
2304- throw await createResumeTransportError (
2305- response ,
2306- `Stream resume batch failed: ${ response . status } `
2345+ try {
2346+ const response = await fetch (
2347+ `/api/mothership/chat/stream?streamId= ${ encodeURIComponent ( streamId ) } &after= ${ encodeURIComponent ( afterCursor ) } &batch=true` ,
2348+ { signal : timedAbort . signal }
23072349 )
2350+ if ( ! response . ok ) {
2351+ throw await createResumeTransportError (
2352+ response ,
2353+ `Stream resume batch failed: ${ response . status } `
2354+ )
2355+ }
2356+ const batch = parseStreamBatchResponse ( await response . json ( ) )
2357+ if ( Array . isArray ( batch . previewSessions ) && batch . previewSessions . length > 0 ) {
2358+ seedPreviewSessions ( batch . previewSessions )
2359+ }
2360+ return batch
2361+ } catch ( error ) {
2362+ if ( timedAbort . didTimeout ( ) && ! signal ?. aborted ) {
2363+ throw new Error ( timeoutMessage )
2364+ }
2365+ throw error
2366+ } finally {
2367+ timedAbort . cleanup ( )
23082368 }
2309- const batch = parseStreamBatchResponse ( await response . json ( ) )
2310- if ( Array . isArray ( batch . previewSessions ) && batch . previewSessions . length > 0 ) {
2311- seedPreviewSessions ( batch . previewSessions )
2312- }
2313- return batch
23142369 } ,
23152370 [ seedPreviewSessions ]
23162371 )
@@ -2328,6 +2383,7 @@ export function useChat(
23282383 let seedEvents = opts . initialBatch ?. events ?? [ ]
23292384 let streamStatus = opts . initialBatch ?. status ?? 'unknown'
23302385 let streamError = opts . initialBatch ?. error
2386+ const reconnectStartedAt = Date . now ( )
23312387
23322388 const isStaleReconnect = ( ) =>
23332389 streamGenRef . current !== expectedGen || abortControllerRef . current ?. signal . aborted === true
@@ -2341,6 +2397,15 @@ export function useChat(
23412397
23422398 try {
23432399 while ( streamGenRef . current === expectedGen ) {
2400+ if ( Date . now ( ) - reconnectStartedAt >= RECONNECT_RECOVERY_TIMEOUT_MS ) {
2401+ logger . warn ( 'Reconnect tail timed out waiting for terminal state' , {
2402+ streamId,
2403+ afterCursor : latestCursor ,
2404+ elapsedMs : Date . now ( ) - reconnectStartedAt ,
2405+ } )
2406+ throw new Error ( RECONNECT_TAIL_ERROR )
2407+ }
2408+
23442409 if ( seedEvents . length > 0 ) {
23452410 const replayResult = await processSSEStreamRef . current (
23462411 buildReplayStream ( seedEvents ) . getReader ( ) ,
@@ -2371,26 +2436,47 @@ export function useChat(
23712436
23722437 logger . info ( 'Opening live stream tail' , { streamId, afterCursor : latestCursor } )
23732438
2374- const sseRes = await fetch (
2375- `/api/mothership/chat/stream?streamId=${ encodeURIComponent ( streamId ) } &after=${ encodeURIComponent ( latestCursor ) } ` ,
2376- { signal : activeAbort . signal }
2439+ const tailTimeoutMessage = `Timed out waiting for live reconnect tail for ${ streamId } `
2440+ const timedTailAbort = createTimedAbortSignal (
2441+ activeAbort . signal ,
2442+ RECONNECT_LIVE_TAIL_TIMEOUT_MS ,
2443+ tailTimeoutMessage
23772444 )
2378- if ( ! sseRes . ok || ! sseRes . body ) {
2379- throw await createResumeTransportError ( sseRes , RECONNECT_TAIL_ERROR )
2380- }
2445+ let liveResult : { sawStreamError : boolean ; sawComplete : boolean }
2446+ try {
2447+ const sseRes = await fetch (
2448+ `/api/mothership/chat/stream?streamId=${ encodeURIComponent ( streamId ) } &after=${ encodeURIComponent ( latestCursor ) } ` ,
2449+ { signal : timedTailAbort . signal }
2450+ )
2451+ if ( ! sseRes . ok || ! sseRes . body ) {
2452+ throw await createResumeTransportError ( sseRes , RECONNECT_TAIL_ERROR )
2453+ }
23812454
2382- if ( isStaleReconnect ( ) ) {
2383- return { error : false , aborted : true }
2384- }
2455+ if ( isStaleReconnect ( ) ) {
2456+ return { error : false , aborted : true }
2457+ }
23852458
2386- setTransportStreaming ( )
2459+ setTransportStreaming ( )
23872460
2388- const liveResult = await processSSEStreamRef . current (
2389- sseRes . body . getReader ( ) ,
2390- assistantId ,
2391- expectedGen ,
2392- { preserveExistingState : true }
2393- )
2461+ liveResult = await processSSEStreamRef . current (
2462+ sseRes . body . getReader ( ) ,
2463+ assistantId ,
2464+ expectedGen ,
2465+ { preserveExistingState : true }
2466+ )
2467+ } catch ( error ) {
2468+ if ( timedTailAbort . didTimeout ( ) && ! activeAbort . signal . aborted ) {
2469+ logger . warn ( 'Live reconnect tail request timed out' , {
2470+ streamId,
2471+ afterCursor : latestCursor ,
2472+ timeoutMs : RECONNECT_LIVE_TAIL_TIMEOUT_MS ,
2473+ } )
2474+ throw new Error ( RECONNECT_TAIL_ERROR )
2475+ }
2476+ throw error
2477+ } finally {
2478+ timedTailAbort . cleanup ( )
2479+ }
23942480
23952481 if ( liveResult . sawStreamError ) {
23962482 return { error : true , aborted : false }
0 commit comments