@@ -169,9 +169,6 @@ const MAX_RECONNECT_ATTEMPTS = 10
169169const RECONNECT_BASE_DELAY_MS = 1000
170170const RECONNECT_MAX_DELAY_MS = 30_000
171171const RECONNECT_EMPTY_BATCH_DELAY_MS = 500
172- const RECONNECT_BATCH_REQUEST_TIMEOUT_MS = 15_000
173- const RECONNECT_LIVE_TAIL_TIMEOUT_MS = 60_000
174- const RECONNECT_RECOVERY_TIMEOUT_MS = 2 * 60_000
175172
176173const logger = createLogger ( 'useChat' )
177174
@@ -197,43 +194,6 @@ function stringArrayParam(value: unknown): string[] {
197194 return value . filter ( ( item ) : item is string => typeof item === 'string' && item . trim ( ) . length > 0 )
198195}
199196
200- function createTimedAbortSignal (
201- parentSignal : AbortSignal | undefined ,
202- timeoutMs : number ,
203- timeoutMessage : string
204- ) : {
205- signal : AbortSignal
206- didTimeout : ( ) => boolean
207- cleanup : ( ) => void
208- } {
209- const controller = new AbortController ( )
210- let timedOut = false
211-
212- const onAbort = ( ) => {
213- controller . abort ( parentSignal ?. reason )
214- }
215-
216- if ( parentSignal ?. aborted ) {
217- controller . abort ( parentSignal . reason )
218- } else if ( parentSignal ) {
219- parentSignal . addEventListener ( 'abort' , onAbort , { once : true } )
220- }
221-
222- const timeoutId = setTimeout ( ( ) => {
223- timedOut = true
224- controller . abort ( new Error ( timeoutMessage ) )
225- } , timeoutMs )
226-
227- return {
228- signal : controller . signal ,
229- didTimeout : ( ) => timedOut ,
230- cleanup : ( ) => {
231- clearTimeout ( timeoutId )
232- parentSignal ?. removeEventListener ( 'abort' , onAbort )
233- } ,
234- }
235- }
236-
237197function resolveWorkflowNameForDisplay ( workflowId : unknown ) : string | undefined {
238198 const id = stringParam ( workflowId )
239199 if ( ! id ) return undefined
@@ -2340,36 +2300,21 @@ export function useChat(
23402300 afterCursor : string ,
23412301 signal ?: AbortSignal
23422302 ) : Promise < StreamBatchResponse > => {
2343- const timeoutMessage = `Timed out fetching stream batch for ${ streamId } `
2344- const timedAbort = createTimedAbortSignal (
2345- signal ,
2346- RECONNECT_BATCH_REQUEST_TIMEOUT_MS ,
2347- timeoutMessage
2303+ const response = await fetch (
2304+ `/api/mothership/chat/stream?streamId=${ encodeURIComponent ( streamId ) } &after=${ encodeURIComponent ( afterCursor ) } &batch=true` ,
2305+ { signal }
23482306 )
2349- try {
2350- const response = await fetch (
2351- `/api/mothership/chat/stream?streamId= ${ encodeURIComponent ( streamId ) } &after= ${ encodeURIComponent ( afterCursor ) } &batch=true` ,
2352- { signal : timedAbort . signal }
2307+ if ( ! response . ok ) {
2308+ throw await createResumeTransportError (
2309+ response ,
2310+ `Stream resume batch failed: ${ response . status } `
23532311 )
2354- if ( ! response . ok ) {
2355- throw await createResumeTransportError (
2356- response ,
2357- `Stream resume batch failed: ${ response . status } `
2358- )
2359- }
2360- const batch = parseStreamBatchResponse ( await response . json ( ) )
2361- if ( Array . isArray ( batch . previewSessions ) && batch . previewSessions . length > 0 ) {
2362- seedPreviewSessions ( batch . previewSessions )
2363- }
2364- return batch
2365- } catch ( error ) {
2366- if ( timedAbort . didTimeout ( ) && ! signal ?. aborted ) {
2367- throw new Error ( timeoutMessage )
2368- }
2369- throw error
2370- } finally {
2371- timedAbort . cleanup ( )
23722312 }
2313+ const batch = parseStreamBatchResponse ( await response . json ( ) )
2314+ if ( Array . isArray ( batch . previewSessions ) && batch . previewSessions . length > 0 ) {
2315+ seedPreviewSessions ( batch . previewSessions )
2316+ }
2317+ return batch
23732318 } ,
23742319 [ seedPreviewSessions ]
23752320 )
@@ -2387,7 +2332,6 @@ export function useChat(
23872332 let seedEvents = opts . initialBatch ?. events ?? [ ]
23882333 let streamStatus = opts . initialBatch ?. status ?? 'unknown'
23892334 let streamError = opts . initialBatch ?. error
2390- const reconnectStartedAt = Date . now ( )
23912335
23922336 const isStaleReconnect = ( ) =>
23932337 streamGenRef . current !== expectedGen || abortControllerRef . current ?. signal . aborted === true
@@ -2401,15 +2345,6 @@ export function useChat(
24012345
24022346 try {
24032347 while ( streamGenRef . current === expectedGen ) {
2404- if ( Date . now ( ) - reconnectStartedAt >= RECONNECT_RECOVERY_TIMEOUT_MS ) {
2405- logger . warn ( 'Reconnect tail timed out waiting for terminal state' , {
2406- streamId,
2407- afterCursor : latestCursor ,
2408- elapsedMs : Date . now ( ) - reconnectStartedAt ,
2409- } )
2410- throw new Error ( RECONNECT_TAIL_ERROR )
2411- }
2412-
24132348 if ( seedEvents . length > 0 ) {
24142349 const replayResult = await processSSEStreamRef . current (
24152350 buildReplayStream ( seedEvents ) . getReader ( ) ,
@@ -2440,47 +2375,26 @@ export function useChat(
24402375
24412376 logger . info ( 'Opening live stream tail' , { streamId, afterCursor : latestCursor } )
24422377
2443- const tailTimeoutMessage = `Timed out waiting for live reconnect tail for ${ streamId } `
2444- const timedTailAbort = createTimedAbortSignal (
2445- activeAbort . signal ,
2446- RECONNECT_LIVE_TAIL_TIMEOUT_MS ,
2447- tailTimeoutMessage
2378+ const sseRes = await fetch (
2379+ `/api/mothership/chat/stream?streamId=${ encodeURIComponent ( streamId ) } &after=${ encodeURIComponent ( latestCursor ) } ` ,
2380+ { signal : activeAbort . signal }
24482381 )
2449- let liveResult : { sawStreamError : boolean ; sawComplete : boolean }
2450- try {
2451- const sseRes = await fetch (
2452- `/api/mothership/chat/stream?streamId=${ encodeURIComponent ( streamId ) } &after=${ encodeURIComponent ( latestCursor ) } ` ,
2453- { signal : timedTailAbort . signal }
2454- )
2455- if ( ! sseRes . ok || ! sseRes . body ) {
2456- throw await createResumeTransportError ( sseRes , RECONNECT_TAIL_ERROR )
2457- }
2382+ if ( ! sseRes . ok || ! sseRes . body ) {
2383+ throw await createResumeTransportError ( sseRes , RECONNECT_TAIL_ERROR )
2384+ }
24582385
2459- if ( isStaleReconnect ( ) ) {
2460- return { error : false , aborted : true }
2461- }
2386+ if ( isStaleReconnect ( ) ) {
2387+ return { error : false , aborted : true }
2388+ }
24622389
2463- setTransportStreaming ( )
2390+ setTransportStreaming ( )
24642391
2465- liveResult = await processSSEStreamRef . current (
2466- sseRes . body . getReader ( ) ,
2467- assistantId ,
2468- expectedGen ,
2469- { preserveExistingState : true }
2470- )
2471- } catch ( error ) {
2472- if ( timedTailAbort . didTimeout ( ) && ! activeAbort . signal . aborted ) {
2473- logger . warn ( 'Live reconnect tail request timed out' , {
2474- streamId,
2475- afterCursor : latestCursor ,
2476- timeoutMs : RECONNECT_LIVE_TAIL_TIMEOUT_MS ,
2477- } )
2478- throw new Error ( RECONNECT_TAIL_ERROR )
2479- }
2480- throw error
2481- } finally {
2482- timedTailAbort . cleanup ( )
2483- }
2392+ const liveResult = await processSSEStreamRef . current (
2393+ sseRes . body . getReader ( ) ,
2394+ assistantId ,
2395+ expectedGen ,
2396+ { preserveExistingState : true }
2397+ )
24842398
24852399 if ( liveResult . sawStreamError ) {
24862400 return { error : true , aborted : false }
0 commit comments