/*
* Copyright 2026, Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Transform, Readable, TransformCallback } from 'node:stream';
import { createInterface } from 'node:readline';
import { pipeline } from 'node:stream/promises';
import * as fs from 'node:fs';
import { EOL } from 'node:os';
import { fetch } from 'undici';
import { HttpResponse } from '@jsforce/jsforce-node';
import {
IngestJobV2Results,
IngestJobV2SuccessfulResults,
IngestJobV2FailedResults,
IngestJobV2UnprocessedRecords,
QueryJobV2,
QueryJobInfoV2,
} from '@jsforce/jsforce-node/lib/api/bulk2.js';
import { Parser as csvParse } from 'csv-parse';
import type { Schema } from '@jsforce/jsforce-node';
import { Connection, Messages, SfError } from '@salesforce/core';
import type { BulkProcessedRecordV2, BulkRecordsV2 } from './types.js';
Messages.importMessagesDirectoryFromMetaUrl(import.meta.url);
const messages = Messages.loadMessages('@salesforce/plugin-data', 'messages');
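/**
 * Convert jsforce ingest job results into the plugin's `BulkRecordsV2` shape,
 * preserving the raw CSV body in `unparsed` when the results can't be parsed.
 */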
export const transformResults = (results: IngestJobV2Results<Schema>): BulkRecordsV2 => ({
successfulResults: results.successfulResults.map(anyRecordToBulkProcessedRecordV2),
failedResults: results.failedResults.map(anyRecordToBulkProcessedRecordV2),
  // if jsforce can't parse the results CSV, unprocessedRecords is the raw CSV body as a string
...(typeof results.unprocessedRecords === 'string'
? { unprocessedRecords: [], unparsed: results.unprocessedRecords }
: { unprocessedRecords: results.unprocessedRecords.map(anyRecordToBulkProcessedRecordV2) }),
});
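// The jsforce successful/failed/unprocessed record types don't line up exactly
// with BulkProcessedRecordV2, so we go through `unknown` to assert the shape we expect.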
const anyRecordToBulkProcessedRecordV2 = (
record:
| IngestJobV2SuccessfulResults<Schema>[number]
| IngestJobV2UnprocessedRecords<Schema>[number]
| IngestJobV2FailedResults<Schema>[number]
): BulkProcessedRecordV2 => record as unknown as BulkProcessedRecordV2;
/** Call describe on the sObject to verify it exists in the org */
export const validateSobjectType = async (sobjectType: string, connection: Connection): Promise<string> => {
try {
await connection.sobject(sobjectType).describe();
return sobjectType;
} catch (e) {
throw new Error(messages.getMessage('invalidSobject', [sobjectType, (e as Error).message]));
}
};
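/** Column delimiters supported by Bulk API v2 jobs, keyed by the names the API uses. */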
export enum ColumnDelimiter {
BACKQUOTE = '`',
CARET = '^',
COMMA = ',',
PIPE = '|',
SEMICOLON = ';',
  TAB = '\t',
}
export type ColumnDelimiterKeys = keyof typeof ColumnDelimiter;
/**
* Transform stream that skips the first line of CSV data (the header row).
* Used when processing subsequent bulk result pages to avoid duplicate headers.
*
* Optimized to work directly with Buffers without string conversion for better memory efficiency.
*/
export class SkipFirstLineTransform extends Transform {
private firstLineSkipped = false;
private buffer: Buffer = Buffer.alloc(0);
public constructor() {
super();
}
public _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
if (this.firstLineSkipped) {
// After first line is skipped, pass through all subsequent data
callback(null, chunk);
return;
}
// Buffer incoming data until we find the first newline
// Work directly with Buffers to avoid string conversion overhead
this.buffer = Buffer.concat([this.buffer, chunk]);
// Find newline byte (0x0A for \n)
const newlineIndex = this.buffer.indexOf(0x0a);
if (newlineIndex === -1) {
// No newline yet, keep buffering
callback();
return;
}
// Found the newline, skip everything up to and including it
const remainingData = this.buffer.subarray(newlineIndex + 1);
this.firstLineSkipped = true;
this.buffer = Buffer.alloc(0); // Clear buffer to free memory
callback(null, remainingData);
}
public _flush(callback: TransformCallback): void {
    // Stream ended without ever seeing a newline: the buffered data is just the header line, so drop it and finish
this.buffer = Buffer.alloc(0);
callback();
}
}
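/**
 * Perform a streaming GET against a Bulk API endpoint, returning the raw
 * response body as a Node.js Readable along with the response headers
 * (callers need the `Sforce-Locator` header to page through query results).
 */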
async function bulkRequest(
conn: Connection,
url: string
): Promise<{ stream: Readable; headers: HttpResponse['headers'] }> {
// Bypass jsforce entirely and use undici fetch to avoid any buffering.
// jsforce's Transport.httpRequest() adds a 'complete' listener which triggers readAll() buffering.
// Using undici fetch directly gives us the raw response stream without any intermediate buffering.
const normalizedUrl = conn.normalizeUrl(url);
// Prepare request headers with authorization
const headers: { [name: string]: string } = {
    'Content-Type': 'text/csv',
};
// ensure undici gets a valid access token
await conn.refreshAuth();
if (conn.accessToken) {
headers.Authorization = `Bearer ${conn.accessToken}`;
}
const response = await fetch(normalizedUrl, {
method: 'GET',
headers,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
if (!response.body) {
throw new Error('No body was returned');
}
const stream = Readable.fromWeb(response.body);
// Extract headers in the format jsforce expects
const responseHeaders: HttpResponse['headers'] = {};
response.headers.forEach((value: string, key: string) => {
responseHeaders[key] = value;
});
return { stream, headers: responseHeaders };
}
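/**
 * Poll a bulk query job until it completes, then stream each page of results
 * into `outputInfo.filePath` as CSV or JSON, returning the final job info.
 */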
export async function exportRecords(
conn: Connection,
queryJob: QueryJobV2<Schema>,
outputInfo: {
filePath: string;
format: 'csv' | 'json';
columnDelimiter: ColumnDelimiterKeys;
}
): Promise<QueryJobInfoV2> {
let jobInfo: QueryJobInfoV2 | undefined;
queryJob.on('jobComplete', (completedJob: QueryJobInfoV2) => {
jobInfo = completedJob;
});
await queryJob.poll();
if (jobInfo === undefined) {
throw new Error('could not get job info after polling');
}
let locator: string | undefined;
let recordsWritten = 0;
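  // The Bulk API signals the final page by returning the literal string 'null'
  // in the Sforce-Locator response header.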
while (locator !== 'null') {
// we can't parallelize this because we:
// 1. need to get 1 batch to know the locator for the next one
// 2. merge all batches into one csv or json file
//
// eslint-disable-next-line no-await-in-loop
const res = await bulkRequest(
conn,
locator ? `/jobs/query/${jobInfo.id}/results?locator=${locator}` : `/jobs/query/${jobInfo.id}/results`
);
if (outputInfo.format === 'json') {
const jsonWritable = fs.createWriteStream(outputInfo.filePath, {
// Open file for appending. The file is created if it does not exist.
// https://nodejs.org/api/fs.html#file-system-flags
flags: 'a', // append mode
});
const totalRecords = jobInfo.numberRecordsProcessed;
if (!locator) {
// first write, start JSON array
jsonWritable.write(`[${EOL}`);
}
// eslint-disable-next-line no-await-in-loop
await pipeline(
res.stream,
new csvParse({ columns: true, delimiter: ColumnDelimiter[outputInfo.columnDelimiter] }),
new Transform({
objectMode: true,
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
transform(chunk, _encoding, callback) {
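            // The last record overall (recordsWritten spans all pages) closes
            // the JSON array instead of getting a trailing comma.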
if (recordsWritten === totalRecords - 1) {
callback(null, ` ${JSON.stringify(chunk)}${EOL}]`);
} else {
recordsWritten++;
callback(null, ` ${JSON.stringify(chunk)},${EOL}`);
}
},
}),
jsonWritable
);
} else {
// csv
// eslint-disable-next-line no-await-in-loop
await pipeline(
locator
? [
res.stream,
new SkipFirstLineTransform(),
fs.createWriteStream(outputInfo.filePath, {
// Open file for appending. The file is created if it does not exist.
// https://nodejs.org/api/fs.html#file-system-flags
flags: 'a', // append mode
}),
]
: [res.stream, fs.createWriteStream(outputInfo.filePath)]
);
}
locator = res.headers['sforce-locator'];
}
return jobInfo;
}
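/** Read up to the first five lines of a file without loading the rest into memory. */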
async function readFirstFiveLines(filePath: string): Promise<string[]> {
const fileStream = fs.createReadStream(filePath);
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity, // Recognizes both CRLF and LF line endings
});
const lines = [];
for await (const line of rl) {
lines.push(line);
if (lines.length === 5) break;
}
return lines;
}
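/**
 * Heuristically detect a CSV file's column delimiter by counting candidate
 * delimiters (outside quoted values) in the first five lines and picking the
 * most frequent one, defaulting to COMMA for single-column files.
 */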
export async function detectDelimiter(filePath: string): Promise<ColumnDelimiterKeys> {
const delimiterMap = new Map<string, ColumnDelimiterKeys>();
delimiterMap.set('`', 'BACKQUOTE');
delimiterMap.set('^', 'CARET');
delimiterMap.set(',', 'COMMA');
delimiterMap.set('|', 'PIPE');
delimiterMap.set(';', 'SEMICOLON');
  delimiterMap.set('\t', 'TAB');
  const delimiters = ['`', '^', ',', '|', ';', '\t'];
const delimiterCounts: { [key: string]: number } = {};
// Initialize counts
for (const delimiter of delimiters) {
delimiterCounts[delimiter] = 0;
}
// Read the first few lines of the file
const data = await readFirstFiveLines(filePath);
data.forEach((line) => {
// Ignore empty lines
if (line.trim() === '') return;
delimiters.forEach((delimiter) => {
// Use regex to avoid counting delimiters inside quotes
const regexDelimiter = delimiter === '^' || delimiter === '|' ? `\\${delimiter}` : delimiter;
const regex = new RegExp(`(?<=^|[^"']|")${regexDelimiter}(?=(?:(?:[^"]*"[^"]*")*[^"]*$))`, 'g');
const count = (line.match(regex) ?? []).length;
delimiterCounts[delimiter] += count;
});
});
// Find the delimiter with the highest count
let detectedDelimiter: string | undefined;
let maxCount = 0;
for (const [delimiter, count] of Object.entries(delimiterCounts)) {
if (count > maxCount) {
maxCount = count;
detectedDelimiter = delimiter;
}
}
// default to `COMMA` if no delimiter was found in the CSV file (1 column)
const columnDelimiter = delimiterMap.get(detectedDelimiter ?? ',');
if (columnDelimiter === undefined) {
throw new SfError(`Failed to detect column delimiter used in ${filePath}.`);
}
return columnDelimiter;
}