Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions apps/sim/app/api/copilot/chat/abort/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { NextResponse } from 'next/server'
import { abortActiveStream } from '@/lib/copilot/chat-streaming'
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
import { env } from '@/lib/core/config/env'

export async function POST(request: Request) {
const { userId: authenticatedUserId, isAuthenticated } =
Expand All @@ -17,6 +19,20 @@ export async function POST(request: Request) {
return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
}

try {
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
if (env.COPILOT_API_KEY) {
headers['x-api-key'] = env.COPILOT_API_KEY
}
await fetch(`${SIM_AGENT_API_URL}/api/streams/explicit-abort`, {
method: 'POST',
headers,
body: JSON.stringify({ messageId: streamId }),
})
} catch {
// best effort: local abort should still proceed even if Go marker fails
}

const aborted = abortActiveStream(streamId)
return NextResponse.json({ aborted })
}
16 changes: 16 additions & 0 deletions apps/sim/app/api/copilot/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { getSession } from '@/lib/auth'
import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
import {
acquirePendingChatStream,
createSSEStream,
requestChatTitle,
SSE_RESPONSE_HEADERS,
Expand Down Expand Up @@ -320,6 +321,19 @@ export async function POST(req: NextRequest) {
}
}

if (actualChatId) {
const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse)
if (!acquired) {
return NextResponse.json(
{
error:
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
},
{ status: 409 }
)
}
}

if (stream) {
const executionId = crypto.randomUUID()
const runId = crypto.randomUUID()
Expand All @@ -337,6 +351,7 @@ export async function POST(req: NextRequest) {
titleProvider: provider,
requestId: tracker.requestId,
workspaceId: resolvedWorkspaceId,
pendingChatStreamAlreadyRegistered: Boolean(actualChatId),
orchestrateOptions: {
userId: authenticatedUserId,
workflowId,
Expand All @@ -348,6 +363,7 @@ export async function POST(req: NextRequest) {
interactive: true,
onComplete: async (result: OrchestratorResult) => {
if (!actualChatId) return
if (!result.success) return

const assistantMessage: Record<string, unknown> = {
id: crypto.randomUUID(),
Expand Down
15 changes: 13 additions & 2 deletions apps/sim/app/api/mothership/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import { getSession } from '@/lib/auth'
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
import {
acquirePendingChatStream,
createSSEStream,
SSE_RESPONSE_HEADERS,
waitForPendingChatStream,
} from '@/lib/copilot/chat-streaming'
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents'
Expand Down Expand Up @@ -253,7 +253,16 @@ export async function POST(req: NextRequest) {
)

if (actualChatId) {
await waitForPendingChatStream(actualChatId)
const acquired = await acquirePendingChatStream(actualChatId, userMessageId)
if (!acquired) {
return NextResponse.json(
{
error:
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
},
{ status: 409 }
)
}
}

const executionId = crypto.randomUUID()
Expand All @@ -271,6 +280,7 @@ export async function POST(req: NextRequest) {
titleModel: 'claude-opus-4-6',
requestId: tracker.requestId,
workspaceId,
pendingChatStreamAlreadyRegistered: Boolean(actualChatId),
orchestrateOptions: {
userId: authenticatedUserId,
workspaceId,
Expand All @@ -282,6 +292,7 @@ export async function POST(req: NextRequest) {
interactive: true,
onComplete: async (result: OrchestratorResult) => {
if (!actualChatId) return
if (!result.success) return

const assistantMessage: Record<string, unknown> = {
id: crypto.randomUUID(),
Expand Down
45 changes: 26 additions & 19 deletions apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const STATE_TO_STATUS: Record<string, ToolCallStatus> = {
const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy'])
const RECONNECT_TAIL_ERROR =
'Live reconnect failed before the stream finished. The latest response may be incomplete.'
const UNEXPECTED_PROVIDER_ERROR_TAG =
'<mothership-error>{"message":"An unexpected provider error occurred"}</mothership-error>'

function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock {
const mapped: ContentBlock = {
Expand Down Expand Up @@ -415,6 +417,8 @@ export function useChat(
setIsReconnecting(false)
setResources([])
setActiveResourceId(null)
setStreamingFile(null)
streamingFileRef.current = null
setMessageQueue([])
}, [initialChatId, queryClient])

Expand All @@ -433,6 +437,8 @@ export function useChat(
setIsReconnecting(false)
setResources([])
setActiveResourceId(null)
setStreamingFile(null)
streamingFileRef.current = null
setMessageQueue([])
}, [isHomePage])

Expand Down Expand Up @@ -497,7 +503,6 @@ export function useChat(
}

if (activeStreamId && !sendingRef.current) {
abortControllerRef.current?.abort()
const gen = ++streamGenRef.current
const abortController = new AbortController()
abortControllerRef.current = abortController
Expand All @@ -508,21 +513,16 @@ export function useChat(
const assistantId = crypto.randomUUID()

const reconnect = async () => {
let reconnectFailed = false
try {
const encoder = new TextEncoder()

const batchEvents = snapshot?.events ?? []
const streamStatus = snapshot?.status ?? ''

if (batchEvents.length === 0 && streamStatus === 'unknown') {
const cid = chatIdRef.current
if (cid) {
fetch(stopPathRef.current, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ chatId: cid, streamId: activeStreamId, content: '' }),
}).catch(() => {})
}
reconnectFailed = true
setError(RECONNECT_TAIL_ERROR)
return
}

Expand Down Expand Up @@ -550,6 +550,7 @@ export function useChat(
{ signal: abortController.signal }
)
if (!sseRes.ok || !sseRes.body) {
reconnectFailed = true
logger.warn('SSE tail reconnect returned no readable body', {
status: sseRes.status,
streamId: activeStreamId,
Expand All @@ -565,6 +566,7 @@ export function useChat(
}
} catch (err) {
if (!(err instanceof Error && err.name === 'AbortError')) {
reconnectFailed = true
logger.warn('SSE tail failed during reconnect', err)
setError(RECONNECT_TAIL_ERROR)
}
Expand All @@ -578,10 +580,11 @@ export function useChat(
await processSSEStreamRef.current(combinedStream.getReader(), assistantId, gen)
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
reconnectFailed = true
} finally {
setIsReconnecting(false)
if (streamGenRef.current === gen) {
finalizeRef.current()
finalizeRef.current(reconnectFailed ? { error: true } : undefined)
}
}
}
Expand Down Expand Up @@ -619,6 +622,17 @@ export function useChat(
return b
}

const appendInlineErrorTag = (tag: string) => {
if (runningText.includes(tag)) return
const tb = ensureTextBlock()
const prefix = runningText.length > 0 && !runningText.endsWith('\n') ? '\n' : ''
tb.content = `${tb.content ?? ''}${prefix}${tag}`
if (activeSubagent) tb.subagent = activeSubagent
runningText += `${prefix}${tag}`
streamingContentRef.current = runningText
flush()
}

const isStale = () => expectedGen !== undefined && streamGenRef.current !== expectedGen

const flush = () => {
Expand All @@ -644,12 +658,9 @@ export function useChat(

try {
while (true) {
if (isStale()) {
reader.cancel().catch(() => {})
break
}
const { done, value } = await reader.read()
if (done) break
if (isStale()) continue

buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
Expand Down Expand Up @@ -1114,14 +1125,11 @@ export function useChat(
}
case 'error': {
setError(parsed.error || 'An error occurred')
appendInlineErrorTag(UNEXPECTED_PROVIDER_ERROR_TAG)
break
Comment thread
Sg312 marked this conversation as resolved.
}
}
}
if (isStale()) {
reader.cancel().catch(() => {})
break
}
}
} finally {
if (streamReaderRef.current === reader) {
Expand Down Expand Up @@ -1500,7 +1508,6 @@ export function useChat(

useEffect(() => {
return () => {
streamReaderRef.current?.cancel().catch(() => {})
Comment thread
Sg312 marked this conversation as resolved.
streamReaderRef.current = null
abortControllerRef.current = null
streamGenRef.current++
Expand Down
18 changes: 16 additions & 2 deletions apps/sim/lib/copilot/async-runs/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ export async function upsertAsyncToolCall(input: {
status?: CopilotAsyncToolStatus
}) {
const existing = await getAsyncToolCall(input.toolCallId)
const incomingStatus = input.status ?? 'pending'
if (
existing &&
(isTerminalAsyncStatus(existing.status) || isDeliveredAsyncStatus(existing.status)) &&
!isTerminalAsyncStatus(incomingStatus) &&
!isDeliveredAsyncStatus(incomingStatus)
) {
logger.info('Ignoring async tool upsert that would downgrade terminal state', {
toolCallId: input.toolCallId,
existingStatus: existing.status,
incomingStatus,
})
return existing
}
const effectiveRunId = input.runId ?? existing?.runId ?? null
if (!effectiveRunId) {
logger.warn('upsertAsyncToolCall missing runId and no existing row', {
Expand All @@ -140,7 +154,7 @@ export async function upsertAsyncToolCall(input: {
toolCallId: input.toolCallId,
toolName: input.toolName,
args: input.args ?? {},
status: input.status ?? 'pending',
status: incomingStatus,
updatedAt: now,
})
.onConflictDoUpdate({
Expand All @@ -150,7 +164,7 @@ export async function upsertAsyncToolCall(input: {
checkpointId: input.checkpointId ?? null,
toolName: input.toolName,
args: input.args ?? {},
status: input.status ?? 'pending',
status: incomingStatus,
updatedAt: now,
},
})
Expand Down
Loading
Loading