Skip to content

Commit 67f98fa

Browse files
committed
feat: testing cline worker thread implementation
1 parent f06098f commit 67f98fa

2 files changed

Lines changed: 228 additions & 58 deletions

File tree

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import { parentPort } from 'worker_threads';
2+
3+
import { yearsPassed } from '@douglasneuroinformatics/libjs';
4+
import type { ScalarInstrument } from '@opendatacapture/runtime-core';
5+
import { DEFAULT_GROUP_NAME } from '@opendatacapture/schemas/core';
6+
import { $RecordArrayFieldValue } from '@opendatacapture/schemas/instrument';
7+
import type { InstrumentRecordsExport } from '@opendatacapture/schemas/instrument-records';
8+
import { removeSubjectIdScope } from '@opendatacapture/subject-utils';
9+
10+
type RecordType = {
11+
computedMeasures: null | { [key: string]: unknown };
12+
date: Date;
13+
id: string;
14+
instrumentId: string;
15+
session: {
16+
date: Date;
17+
id: string;
18+
type: 'IN_PERSON' | 'REMOTE' | 'RETROSPECTIVE';
19+
user: null | {
20+
username: string;
21+
};
22+
};
23+
subject: {
24+
dateOfBirth: Date | null;
25+
groupIds: string[];
26+
id: string;
27+
sex: string;
28+
};
29+
};
30+
31+
type ExpandDataType =
32+
| {
33+
measure: string;
34+
measureValue: boolean | Date | number | string | undefined;
35+
success: true;
36+
}
37+
| {
38+
message: string;
39+
success: false;
40+
};
41+
42+
function expandData(listEntry: any[]): ExpandDataType[] {
43+
const validRecordArrayList: ExpandDataType[] = [];
44+
if (listEntry.length < 1) {
45+
throw new Error('Record Array is Empty');
46+
}
47+
for (const objectEntry of Object.values(listEntry)) {
48+
for (const [dataKey, dataValue] of Object.entries(objectEntry as { [key: string]: any })) {
49+
const parseResult = $RecordArrayFieldValue.safeParse(dataValue);
50+
if (!parseResult.success) {
51+
validRecordArrayList.push({
52+
message: `Error interpreting value ${dataValue} and record array key ${dataKey}`,
53+
success: false
54+
});
55+
}
56+
validRecordArrayList.push({
57+
measure: dataKey,
58+
measureValue: parseResult.data,
59+
success: true
60+
});
61+
}
62+
}
63+
return validRecordArrayList;
64+
}
65+
66+
parentPort?.on(
67+
'message',
68+
({ instruments, records }: { instruments: [string, ScalarInstrument][]; records: RecordType[] }) => {
69+
const instrumentsMap = new Map(instruments);
70+
71+
const processRecord = (record: RecordType): InstrumentRecordsExport => {
72+
if (!record.computedMeasures) return [];
73+
74+
const instrument = instrumentsMap.get(record.instrumentId)!;
75+
const rows: InstrumentRecordsExport = [];
76+
77+
for (const [measureKey, measureValue] of Object.entries(record.computedMeasures)) {
78+
if (measureValue == null) continue;
79+
80+
if (!Array.isArray(measureValue)) {
81+
rows.push({
82+
groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
83+
instrumentEdition: instrument.internal.edition,
84+
instrumentName: instrument.internal.name,
85+
measure: measureKey,
86+
sessionDate: record.session.date.toISOString(),
87+
sessionId: record.session.id,
88+
sessionType: record.session.type,
89+
subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
90+
subjectId: removeSubjectIdScope(record.subject.id),
91+
subjectSex: record.subject.sex,
92+
timestamp: record.date.toISOString(),
93+
username: record.session.user?.username ?? 'N/A',
94+
value: measureValue as string
95+
});
96+
continue;
97+
}
98+
99+
if (measureValue.length < 1) continue;
100+
101+
const expanded = expandData(measureValue);
102+
for (const entry of expanded) {
103+
if (!entry.success) {
104+
throw new Error(`exportRecords: ${instrument.internal.name}.${measureKey}${entry.message}`);
105+
}
106+
rows.push({
107+
groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
108+
instrumentEdition: instrument.internal.edition,
109+
instrumentName: instrument.internal.name,
110+
measure: `${measureKey} - ${entry.measure}`,
111+
sessionDate: record.session.date.toISOString(),
112+
sessionId: record.session.id,
113+
sessionType: record.session.type,
114+
subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
115+
subjectId: removeSubjectIdScope(record.subject.id),
116+
subjectSex: record.subject.sex,
117+
timestamp: record.date.toISOString(),
118+
username: record.session.user?.username ?? 'N/A',
119+
value: entry.measureValue
120+
});
121+
}
122+
}
123+
124+
return rows;
125+
};
126+
127+
try {
128+
const results = records.map(processRecord);
129+
parentPort?.postMessage({ data: results.flat(), success: true });
130+
} catch (error) {
131+
parentPort?.postMessage({ error: (error as Error).message, success: false });
132+
}
133+
}
134+
);

apps/api/src/instrument-records/instrument-records.service.ts

Lines changed: 94 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1-
import { replacer, reviver, yearsPassed } from '@douglasneuroinformatics/libjs';
1+
import { cpus } from 'os';
2+
import { dirname, join } from 'path';
3+
import { fileURLToPath } from 'url';
4+
import { Worker } from 'worker_threads';
5+
6+
const __dirname = dirname(fileURLToPath(import.meta.url));
7+
8+
import { replacer, reviver } from '@douglasneuroinformatics/libjs';
29
import { InjectModel } from '@douglasneuroinformatics/libnest';
310
import type { Model } from '@douglasneuroinformatics/libnest';
411
import { linearRegression } from '@douglasneuroinformatics/libstats';
512
import { BadRequestException, Injectable, NotFoundException, UnprocessableEntityException } from '@nestjs/common';
613
import type { Json, ScalarInstrument } from '@opendatacapture/runtime-core';
7-
import { DEFAULT_GROUP_NAME } from '@opendatacapture/schemas/core';
814
import { $RecordArrayFieldValue } from '@opendatacapture/schemas/instrument';
915
import type {
1016
CreateInstrumentRecordData,
@@ -14,7 +20,6 @@ import type {
1420
LinearRegressionResults,
1521
UploadInstrumentRecordsData
1622
} from '@opendatacapture/schemas/instrument-records';
17-
import { removeSubjectIdScope } from '@opendatacapture/subject-utils';
1823
import { Prisma } from '@prisma/client';
1924
import type { Session } from '@prisma/client';
2025
import { isNumber, mergeWith, pickBy } from 'lodash-es';
@@ -40,6 +45,8 @@ type ExpandDataType =
4045
success: false;
4146
};
4247

48+
type WorkerMessage = { data: InstrumentRecordsExport; success: true } | { error: string; success: false };
49+
4350
@Injectable()
4451
export class InstrumentRecordsService {
4552
constructor(
@@ -133,7 +140,6 @@ export class InstrumentRecordsService {
133140
}
134141

135142
async exportRecords({ groupId }: { groupId?: string } = {}, { ability }: EntityOperationOptions = {}) {
136-
const data: InstrumentRecordsExport = [];
137143
const records = await this.instrumentRecordModel.findMany({
138144
include: {
139145
session: {
@@ -164,66 +170,96 @@ export class InstrumentRecordsService {
164170

165171
const instruments = new Map(instrumentsArray.map((instrument) => [instrument.id, instrument]));
166172

167-
const processRecord = (record: (typeof records)[number]) => {
168-
if (!record.computedMeasures) return [];
169-
170-
const instrument = instruments.get(record.instrumentId)!;
171-
const rows: InstrumentRecordsExport = [];
172-
173-
for (const [measureKey, measureValue] of Object.entries(record.computedMeasures)) {
174-
if (measureValue == null) continue;
175-
176-
if (!Array.isArray(measureValue)) {
177-
rows.push({
178-
groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
179-
instrumentEdition: instrument.internal.edition,
180-
instrumentName: instrument.internal.name,
181-
measure: measureKey,
182-
sessionDate: record.session.date.toISOString(),
183-
sessionId: record.session.id,
184-
sessionType: record.session.type,
185-
subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
186-
subjectId: removeSubjectIdScope(record.subject.id),
187-
subjectSex: record.subject.sex,
188-
timestamp: record.date.toISOString(),
189-
username: record.session.user?.username ?? 'N/A',
190-
value: measureValue
191-
});
192-
continue;
193-
}
173+
// const processRecord = (record: (typeof records)[number]) => {
174+
// if (!record.computedMeasures) return [];
194175

195-
if (measureValue.length < 1) continue;
176+
// const instrument = instruments.get(record.instrumentId)!;
177+
// const rows: InstrumentRecordsExport = [];
196178

197-
const expanded = this.expandData(measureValue);
198-
for (const entry of expanded) {
199-
if (!entry.success) {
200-
throw new Error(`exportRecords: ${instrument.internal.name}.${measureKey}${entry.message}`);
201-
}
202-
rows.push({
203-
groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
204-
instrumentEdition: instrument.internal.edition,
205-
instrumentName: instrument.internal.name,
206-
measure: `${measureKey} - ${entry.measure}`,
207-
sessionDate: record.session.date.toISOString(),
208-
sessionId: record.session.id,
209-
sessionType: record.session.type,
210-
subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
211-
subjectId: removeSubjectIdScope(record.subject.id),
212-
subjectSex: record.subject.sex,
213-
timestamp: record.date.toISOString(),
214-
username: record.session.user?.username ?? 'N/A',
215-
value: entry.measureValue
216-
});
217-
}
218-
}
179+
// for (const [measureKey, measureValue] of Object.entries(record.computedMeasures)) {
180+
// if (measureValue == null) continue;
219181

220-
return rows;
221-
};
182+
// if (!Array.isArray(measureValue)) {
183+
// rows.push({
184+
// groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
185+
// instrumentEdition: instrument.internal.edition,
186+
// instrumentName: instrument.internal.name,
187+
// measure: measureKey,
188+
// sessionDate: record.session.date.toISOString(),
189+
// sessionId: record.session.id,
190+
// sessionType: record.session.type,
191+
// subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
192+
// subjectId: removeSubjectIdScope(record.subject.id),
193+
// subjectSex: record.subject.sex,
194+
// timestamp: record.date.toISOString(),
195+
// username: record.session.user?.username ?? 'N/A',
196+
// value: measureValue
197+
// });
198+
// continue;
199+
// }
200+
201+
// if (measureValue.length < 1) continue;
222202

223-
const results = await Promise.all(records.map((record) => processRecord(record)));
203+
// const expanded = this.expandData(measureValue);
204+
// for (const entry of expanded) {
205+
// if (!entry.success) {
206+
// throw new Error(`exportRecords: ${instrument.internal.name}.${measureKey} — ${entry.message}`);
207+
// }
208+
// rows.push({
209+
// groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
210+
// instrumentEdition: instrument.internal.edition,
211+
// instrumentName: instrument.internal.name,
212+
// measure: `${measureKey} - ${entry.measure}`,
213+
// sessionDate: record.session.date.toISOString(),
214+
// sessionId: record.session.id,
215+
// sessionType: record.session.type,
216+
// subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
217+
// subjectId: removeSubjectIdScope(record.subject.id),
218+
// subjectSex: record.subject.sex,
219+
// timestamp: record.date.toISOString(),
220+
// username: record.session.user?.username ?? 'N/A',
221+
// value: entry.measureValue
222+
// });
223+
// }
224+
// }
225+
226+
// return rows;
227+
// };
228+
229+
// const results = await Promise.all(records.map((record) => processRecord(record)));
230+
231+
// return results.flat();
232+
233+
const numWorkers = Math.min(cpus().length, Math.ceil(records.length / 100)); // Use up to CPU count, chunk size 100
234+
const chunkSize = Math.ceil(records.length / numWorkers);
235+
const chunks = [];
236+
for (let i = 0; i < records.length; i += chunkSize) {
237+
chunks.push(records.slice(i, i + chunkSize));
238+
}
239+
240+
const workerPromises = chunks.map((chunk) => {
241+
return new Promise<InstrumentRecordsExport>((resolve, reject) => {
242+
const worker = new Worker(join(__dirname, 'export-worker.ts'));
243+
worker.postMessage({ instruments: Array.from(instruments.entries()), records: chunk });
244+
worker.on('message', (message: WorkerMessage) => {
245+
if (message.success) {
246+
resolve(message.data);
247+
} else {
248+
reject(new Error(message.error));
249+
}
250+
void worker.terminate();
251+
});
252+
worker.on('error', (error) => {
253+
reject(error);
254+
void worker.terminate();
255+
});
256+
});
257+
});
224258

259+
const results = await Promise.all(workerPromises);
225260
return results.flat();
226261

262+
// const data: InstrumentRecordsExport = [];
227263
// for (const record of records) {
228264
// if (!record.computedMeasures) {
229265
// continue;
@@ -281,7 +317,7 @@ export class InstrumentRecordsService {
281317
// }
282318
// }
283319

284-
return data;
320+
// return data;
285321
}
286322

287323
async find(

0 commit comments

Comments
 (0)