Skip to content

Commit dbf1c93

Browse files
committed
feat: adjust threading code to have inital process and chunk processing
1 parent 320772f commit dbf1c93

1 file changed

Lines changed: 28 additions & 2 deletions

File tree

apps/api/src/instrument-records/instrument-records.service.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import type { Model } from '@douglasneuroinformatics/libnest';
1111
import { linearRegression } from '@douglasneuroinformatics/libstats';
1212
import { BadRequestException, Injectable, NotFoundException, UnprocessableEntityException } from '@nestjs/common';
1313
import type { Json, ScalarInstrument } from '@opendatacapture/runtime-core';
14+
// import { DEFAULT_GROUP_NAME } from '@opendatacapture/schemas/core';
1415
import { $RecordArrayFieldValue } from '@opendatacapture/schemas/instrument';
1516
import type {
1617
CreateInstrumentRecordData,
@@ -20,6 +21,7 @@ import type {
2021
LinearRegressionResults,
2122
UploadInstrumentRecordsData
2223
} from '@opendatacapture/schemas/instrument-records';
24+
// import { removeSubjectIdScope } from '@opendatacapture/subject-utils';
2325
import { Prisma } from '@prisma/client';
2426
import type { Session } from '@prisma/client';
2527
import { isNumber, mergeWith, pickBy } from 'lodash-es';
@@ -34,6 +36,8 @@ import { SubjectsService } from '@/subjects/subjects.service';
3436

3537
import { InstrumentMeasuresService } from './instrument-measures.service';
3638

39+
import type { ChunkCompleteData, ChunkCompleteMessage, InitData, InitMessage, ParentMessage } from './thread-types';
40+
3741
type ExpandDataType =
3842
| {
3943
measure: string;
@@ -47,6 +51,14 @@ type ExpandDataType =
4751

4852
type WorkerMessage = { data: InstrumentRecordsExport; success: true } | { error: string; success: false };
4953

54+
// type MainThreadMessage = {
55+
// data: {
56+
57+
// }
58+
// } | {
59+
// error: string; success: false
60+
// }
61+
5062
@Injectable()
5163
export class InstrumentRecordsService {
5264
constructor(
@@ -230,17 +242,31 @@ export class InstrumentRecordsService {
230242

231243
// return results.flat();
232244

233-
const numWorkers = Math.min(cpus().length, Math.ceil(records.length / 100)); // Use up to CPU count, chunk size 100
245+
const numWorkers = 1;
246+
// Math.min(cpus().length, Math.ceil(records.length / 100)); // Use up to CPU count, chunk size 100
234247
const chunkSize = Math.ceil(records.length / numWorkers);
235248
const chunks = [];
236249
for (let i = 0; i < records.length; i += chunkSize) {
237250
chunks.push(records.slice(i, i + chunkSize));
238251
}
252+
const availableInstrumentArray: InitData = instruments
253+
.values()
254+
.toArray()
255+
.map((item) => {
256+
return {
257+
edition: item.internal.edition,
258+
id: item.id!,
259+
name: item.internal.name
260+
};
261+
});
262+
263+
const initWorker = new Worker(join(__dirname, 'export-worker.ts'));
264+
initWorker.postMessage({ data: availableInstrumentArray, type: 'INIT' });
239265

240266
const workerPromises = chunks.map((chunk) => {
241267
return new Promise<InstrumentRecordsExport>((resolve, reject) => {
242268
const worker = new Worker(join(__dirname, 'export-worker.ts'));
243-
worker.postMessage({ instruments: Array.from(instruments.entries()), records: chunk });
269+
worker.postMessage({ data: chunk, type: 'CHUNK_COMPLETE' });
244270
worker.on('message', (message: WorkerMessage) => {
245271
if (message.success) {
246272
resolve(message.data);

0 commit comments

Comments
 (0)