-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Expand file tree
/
Copy pathroute.ts
More file actions
150 lines (136 loc) · 5.02 KB
/
route.ts
File metadata and controls
150 lines (136 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import { db } from '@sim/db'
import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, desc, eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { buildFilterConditions, LogFilterParamsSchema } from '@/lib/logs/filters'
// Module-level logger for this route, created with the name 'LogsExportAPI'.
const logger = createLogger('LogsExportAPI')
// NOTE(review): presumably the Next.js route segment `revalidate` option; 0 is
// intended to keep the export from being served from cache — confirm against
// the Next.js version in use.
export const revalidate = 0
/**
 * Serializes a single value as one CSV field.
 *
 * `null` and `undefined` become an empty field. Every other value is converted
 * with `String(value)`. The field is wrapped in double quotes (with embedded
 * quotes doubled) when it contains a double quote, a comma, a line feed or a
 * carriage return, so that embedded line breaks of either flavour cannot
 * split the record.
 *
 * @param value - The value to serialize; objects are stringified via `String`.
 * @returns The field text, quoted only when required.
 */
function escapeCsv(value: unknown): string {
  if (value === null || value === undefined) return ''
  const str = String(value)
  // A bare CR is a line break for CSV readers just like LF, so it must force quoting too.
  if (/[",\r\n]/.test(str)) {
    return `"${str.replace(/"/g, '""')}"`
  }
  return str
}
/**
 * Formats one selected log row as a CSV line (without the trailing newline).
 *
 * Field order matches the export header: startedAt, level, workflow, trigger,
 * durationMs, costTotal, workflowId, executionId, message, traceSpans.
 */
function formatLogRowAsCsv(r: any): string {
  let message = ''
  let traces: unknown = null
  try {
    const ed = r.executionData
    if (ed) {
      if (ed.finalOutput) {
        message =
          typeof ed.finalOutput === 'string' ? ed.finalOutput : JSON.stringify(ed.finalOutput)
      }
      // An explicit message takes precedence over the serialized final output.
      if (ed.message) message = ed.message
      if (ed.traceSpans) traces = ed.traceSpans
    }
  } catch {
    // Best effort: if executionData cannot be read or serialized, the row is
    // still exported with whatever message/traces were extracted so far.
  }

  return [
    // Use the ISO form when the value exposes toISOString, otherwise the raw value.
    escapeCsv(r.startedAt?.toISOString?.() || r.startedAt),
    escapeCsv(r.level),
    escapeCsv(r.workflowName),
    escapeCsv(r.trigger),
    escapeCsv(r.totalDurationMs ?? ''),
    // Cost total is read from either `cost.total` or `cost.value.total`.
    escapeCsv(r.cost?.total ?? r.cost?.value?.total ?? ''),
    escapeCsv(r.workflowId ?? ''),
    escapeCsv(r.executionId ?? ''),
    escapeCsv(message),
    escapeCsv(traces ? JSON.stringify(traces) : ''),
  ].join(',')
}

/**
 * GET handler that streams workflow execution logs as a CSV attachment.
 *
 * Flow:
 * 1. Requires a session with a user id; otherwise responds 401.
 * 2. Parses the query string with `LogFilterParamsSchema` and builds the
 *    workspace + filter conditions.
 * 3. Streams the header line followed by one CSV line per log row, reading the
 *    database in pages of 1000 rows. Rows are restricted by an inner join on
 *    `permissions` to workspaces the current user has a permission entry for.
 *
 * Pages are fetched from the stream's `pull` callback, so the next page is
 * only queried once the consumer has drained the previous one, instead of
 * buffering the whole export in memory.
 *
 * Any error raised before the response is created (including a schema parse
 * failure) is logged and answered with a 500. Errors raised while streaming
 * are logged and error the stream, since the 200 status has already been sent.
 *
 * NOTE(review): a failing `LogFilterParamsSchema.parse` currently maps to 500;
 * a 400 would be more accurate if the schema's error type can be identified.
 *
 * @param request - Incoming request; only its URL query parameters are read.
 * @returns A streaming `text/csv` response, or a JSON error response (401/500).
 */
export async function GET(request: NextRequest) {
  try {
    const session = await getSession()
    if (!session?.user?.id) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }
    const userId = session.user.id

    const { searchParams } = new URL(request.url)
    const params = LogFilterParamsSchema.parse(Object.fromEntries(searchParams.entries()))

    const selectColumns = {
      id: workflowExecutionLogs.id,
      workflowId: workflowExecutionLogs.workflowId,
      executionId: workflowExecutionLogs.executionId,
      level: workflowExecutionLogs.level,
      trigger: workflowExecutionLogs.trigger,
      startedAt: workflowExecutionLogs.startedAt,
      endedAt: workflowExecutionLogs.endedAt,
      totalDurationMs: workflowExecutionLogs.totalDurationMs,
      cost: workflowExecutionLogs.cost,
      executionData: workflowExecutionLogs.executionData,
      // The workflow row comes from a left join, so fall back to a placeholder name.
      workflowName: sql<string>`COALESCE(${workflow.name}, 'Deleted Workflow')`,
    }

    const workspaceCondition = eq(workflowExecutionLogs.workspaceId, params.workspaceId)
    const filterConditions = buildFilterConditions(params)
    const conditions = filterConditions
      ? and(workspaceCondition, filterConditions)
      : workspaceCondition

    const header = [
      'startedAt',
      'level',
      'workflow',
      'trigger',
      'durationMs',
      'costTotal',
      'workflowId',
      'executionId',
      'message',
      'traceSpans',
    ].join(',')

    const encoder = new TextEncoder()
    const pageSize = 1000
    let offset = 0
    let cancelled = false

    const stream = new ReadableStream<Uint8Array>({
      start(controller) {
        controller.enqueue(encoder.encode(`${header}\n`))
      },
      // Each pull fetches and emits exactly one page; the stream only calls
      // pull again once the consumer has room for more data.
      async pull(controller) {
        try {
          const rows = await db
            .select(selectColumns)
            .from(workflowExecutionLogs)
            .leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
            .innerJoin(
              permissions,
              and(
                eq(permissions.entityType, 'workspace'),
                eq(permissions.entityId, workflowExecutionLogs.workspaceId),
                eq(permissions.userId, userId)
              )
            )
            .where(conditions)
            // OFFSET paging needs a total order: startedAt alone can tie, which
            // would let rows repeat or vanish between pages, so break ties on
            // id (assumed unique per log row).
            .orderBy(desc(workflowExecutionLogs.startedAt), desc(workflowExecutionLogs.id))
            .limit(pageSize)
            .offset(offset)

          // The consumer went away while the query was in flight; nothing to emit.
          if (cancelled) return

          for (const r of rows as any[]) {
            controller.enqueue(encoder.encode(`${formatLogRowAsCsv(r)}\n`))
          }

          // A short (or empty) page means there is nothing left to read.
          if (rows.length < pageSize) {
            controller.close()
            return
          }
          offset += pageSize
        } catch (e: unknown) {
          logger.error('Export stream error', {
            error: e instanceof Error ? e.message : String(e),
          })
          try {
            controller.error(e)
          } catch {
            // The stream may already be closed or errored; nothing more to do.
          }
        }
      },
      cancel() {
        cancelled = true
      },
    })

    // Colons and dots are replaced so the timestamp is safe to use in a filename.
    const ts = new Date().toISOString().replace(/[:.]/g, '-')
    const filename = `logs-${ts}.csv`

    return new NextResponse(stream as any, {
      status: 200,
      headers: {
        'Content-Type': 'text/csv; charset=utf-8',
        'Content-Disposition': `attachment; filename="${filename}"`,
        'Cache-Control': 'no-cache',
      },
    })
  } catch (error: unknown) {
    logger.error('Export error', {
      error: error instanceof Error ? error.message : String(error),
    })
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}