Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { tasks } from '@trigger.dev/sdk'
import { type NextRequest, NextResponse } from 'next/server'
import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { env, isTruthy } from '@/lib/core/config/env'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
Expand All @@ -17,6 +20,7 @@ import {
} from '@/lib/workflows/persistence/utils'
import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
Expand Down Expand Up @@ -217,6 +221,64 @@ function resolveOutputIds(
})
}

/**
 * Parameters required to queue a workflow execution as a background job.
 * Assembled in the POST handler and passed to handleAsyncExecution.
 */
type AsyncExecutionParams = {
// Correlation ID included in every log line for this request.
requestId: string
workflowId: string
// ID of the acting user on whose behalf the workflow runs.
userId: string
// Raw trigger input forwarded verbatim to the background job.
// NOTE(review): typed `any` — shape varies by trigger; consider `unknown` if the payload type allows.
input: any
// Origin of the execution request; mirrors the logging trigger types.
triggerType: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
}

/**
 * Handles async workflow execution by queueing a background job.
 *
 * Returns immediately with a 202 Accepted response containing the job ID and
 * a status URL. Responds 400 when async mode is requested but
 * TRIGGER_DEV_ENABLED is false, and 500 when queueing the job fails.
 */
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
const { requestId, workflowId, userId, input, triggerType } = params
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)

if (!useTrigger) {
logger.warn(`[${requestId}] Async mode requested but TRIGGER_DEV_ENABLED is false`)
return NextResponse.json(
{ error: 'Async execution is not enabled. Set TRIGGER_DEV_ENABLED=true to use async mode.' },
{ status: 400 }
)
}

const payload: WorkflowExecutionPayload = {
workflowId,
userId,
input,
triggerType,
}

try {
// Queue the background job; the handle carries the job ID used for status polling.
const handle = await tasks.trigger('workflow-execution', payload)

logger.info(`[${requestId}] Queued async workflow execution`, {
workflowId,
jobId: handle.id,
})

return NextResponse.json(
{
success: true,
async: true,
jobId: handle.id,
message: 'Workflow execution queued',
statusUrl: `${getBaseUrl()}/api/jobs/${handle.id}`,
},
{ status: 202 }
)
} catch (error: unknown) {
// Narrow before reading .message — thrown values are not guaranteed to be
// Error instances; `error.message` on a non-Error would surface "undefined".
const message = error instanceof Error ? error.message : String(error)
logger.error(`[${requestId}] Failed to queue async execution`, error)
return NextResponse.json(
{ error: `Failed to queue async execution: ${message}` },
{ status: 500 }
)
}
}

/**
* POST /api/workflows/[id]/execute
*
Expand Down Expand Up @@ -291,6 +353,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:

const streamHeader = req.headers.get('X-Stream-Response') === 'true'
const enableSSE = streamHeader || streamParam === true
const executionModeHeader = req.headers.get('X-Execution-Mode')
const isAsyncMode = executionModeHeader === 'async'

logger.info(`[${requestId}] Starting server-side execution`, {
workflowId,
Expand All @@ -301,6 +365,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
streamParam,
streamHeader,
enableSSE,
isAsyncMode,
})

const executionId = uuidv4()
Expand Down Expand Up @@ -349,6 +414,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workspaceId: workflow.workspaceId,
})

if (isAsyncMode) {
return handleAsyncExecution({
requestId,
workflowId,
userId: actorUserId,
input,
triggerType: loggingTriggerType,
})
}

let cachedWorkflowData: {
blocks: Record<string, any>
edges: any[]
Expand Down
7 changes: 6 additions & 1 deletion apps/sim/background/schedule-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import {
Expand All @@ -22,6 +23,7 @@ import {
getSubBlockValue,
} from '@/lib/workflows/schedules/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionResult } from '@/executor/types'
import { mergeSubblockState } from '@/stores/workflows/server-utils'

const logger = createLogger('TriggerScheduleExecution')
Expand Down Expand Up @@ -293,14 +295,17 @@ async function runWorkflowExecution({
)

try {
const executionResult = (earlyError as any)?.executionResult as ExecutionResult | undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }

await loggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed: ${
earlyError instanceof Error ? earlyError.message : String(earlyError)
}`,
stackTrace: earlyError instanceof Error ? earlyError.stack : undefined,
},
traceSpans: [],
traceSpans,
})
} catch (loggingError) {
logger.error(`[${requestId}] Failed to complete log entry for schedule failure`, loggingError)
Expand Down
10 changes: 4 additions & 6 deletions apps/sim/lib/logs/execution/logging-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,10 @@ export function calculateCostSummary(traceSpans: any[]): {
totalCost += span.cost.total || 0
totalInputCost += span.cost.input || 0
totalOutputCost += span.cost.output || 0
// Tokens are at span.tokens, not span.cost.tokens
totalTokens += span.tokens?.total || 0
totalPromptTokens += span.tokens?.prompt || 0
totalCompletionTokens += span.tokens?.completion || 0
totalPromptTokens += span.tokens?.input ?? span.tokens?.prompt ?? 0
totalCompletionTokens += span.tokens?.output ?? span.tokens?.completion ?? 0

// Aggregate model-specific costs - model is at span.model, not span.cost.model
if (span.model) {
const model = span.model
if (!models[model]) {
Expand All @@ -138,8 +136,8 @@ export function calculateCostSummary(traceSpans: any[]): {
models[model].input += span.cost.input || 0
models[model].output += span.cost.output || 0
models[model].total += span.cost.total || 0
models[model].tokens.prompt += span.tokens?.prompt || 0
models[model].tokens.completion += span.tokens?.completion || 0
models[model].tokens.prompt += span.tokens?.input ?? span.tokens?.prompt ?? 0
models[model].tokens.completion += span.tokens?.output ?? span.tokens?.completion ?? 0
models[model].tokens.total += span.tokens?.total || 0
}
}
Expand Down
28 changes: 15 additions & 13 deletions apps/sim/lib/logs/execution/logging-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,23 @@ export class LoggingSession {
const durationMs = typeof totalDurationMs === 'number' ? totalDurationMs : 0
const startTime = new Date(endTime.getTime() - Math.max(1, durationMs))

const costSummary = {
totalCost: BASE_EXECUTION_CHARGE,
totalInputCost: 0,
totalOutputCost: 0,
totalTokens: 0,
totalPromptTokens: 0,
totalCompletionTokens: 0,
baseExecutionCharge: BASE_EXECUTION_CHARGE,
modelCost: 0,
models: {},
}
const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0

const message = error?.message || 'Execution failed before starting blocks'
const costSummary = hasProvidedSpans
? calculateCostSummary(traceSpans)
: {
totalCost: BASE_EXECUTION_CHARGE,
totalInputCost: 0,
totalOutputCost: 0,
totalTokens: 0,
totalPromptTokens: 0,
totalCompletionTokens: 0,
baseExecutionCharge: BASE_EXECUTION_CHARGE,
modelCost: 0,
models: {},
}

const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0
const message = error?.message || 'Execution failed before starting blocks'

const errorSpan: TraceSpan = {
id: 'workflow-error-root',
Expand Down