Skip to content

Commit 5d45e64

Browse files
committed
feat: adjust functions to have chunk processing and initiation map separate
1 parent dbf1c93 commit 5d45e64

1 file changed

Lines changed: 176 additions & 85 deletions

File tree

Lines changed: 176 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,18 @@
11
import { parentPort } from 'worker_threads';
22

33
import { yearsPassed } from '@douglasneuroinformatics/libjs';
4-
import type { ScalarInstrument } from '@opendatacapture/runtime-core';
4+
import type { InstrumentMeasureValue } from '@opendatacapture/runtime-core';
55
import { DEFAULT_GROUP_NAME } from '@opendatacapture/schemas/core';
66
import { $RecordArrayFieldValue } from '@opendatacapture/schemas/instrument';
77
import type { InstrumentRecordsExport } from '@opendatacapture/schemas/instrument-records';
88
import { removeSubjectIdScope } from '@opendatacapture/subject-utils';
99

10-
type RecordType = {
11-
computedMeasures: null | { [key: string]: unknown };
12-
date: Date;
13-
id: string;
14-
instrumentId: string;
15-
session: {
16-
date: Date;
17-
id: string;
18-
type: 'IN_PERSON' | 'REMOTE' | 'RETROSPECTIVE';
19-
user: null | {
20-
username: string;
21-
};
22-
};
23-
subject: {
24-
dateOfBirth: Date | null;
25-
groupIds: string[];
26-
id: string;
27-
sex: string;
28-
};
29-
};
10+
import type { ChunkCompleteData, InitData, ParentMessage, RecordType } from './thread-types';
3011

3112
type ExpandDataType =
3213
| {
3314
measure: string;
34-
measureValue: boolean | Date | number | string | undefined;
15+
measureValue: InstrumentMeasureValue;
3516
success: true;
3617
}
3718
| {
@@ -63,72 +44,182 @@ function expandData(listEntry: any[]): ExpandDataType[] {
6344
return validRecordArrayList;
6445
}
6546

66-
parentPort?.on(
67-
'message',
68-
({ instruments, records }: { instruments: [string, ScalarInstrument][]; records: RecordType[] }) => {
69-
const instrumentsMap = new Map(instruments);
70-
71-
const processRecord = (record: RecordType): InstrumentRecordsExport => {
72-
if (!record.computedMeasures) return [];
73-
74-
const instrument = instrumentsMap.get(record.instrumentId)!;
75-
const rows: InstrumentRecordsExport = [];
76-
77-
for (const [measureKey, measureValue] of Object.entries(record.computedMeasures)) {
78-
if (measureValue == null) continue;
79-
80-
if (!Array.isArray(measureValue)) {
81-
rows.push({
82-
groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
83-
instrumentEdition: instrument.internal.edition,
84-
instrumentName: instrument.internal.name,
85-
measure: measureKey,
86-
sessionDate: record.session.date.toISOString(),
87-
sessionId: record.session.id,
88-
sessionType: record.session.type,
89-
subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
90-
subjectId: removeSubjectIdScope(record.subject.id),
91-
subjectSex: record.subject.sex,
92-
timestamp: record.date.toISOString(),
93-
username: record.session.user?.username ?? 'N/A',
94-
value: measureValue as string
95-
});
96-
continue;
97-
}
47+
// parentPort?.on(
48+
// 'message',
49+
// ({ instruments, records }: { instruments: [string, ScalarInstrument][]; records: RecordType[] }) => {
50+
// const instrumentsMap = new Map(instruments);
9851

99-
if (measureValue.length < 1) continue;
100-
101-
const expanded = expandData(measureValue);
102-
for (const entry of expanded) {
103-
if (!entry.success) {
104-
throw new Error(`exportRecords: ${instrument.internal.name}.${measureKey}${entry.message}`);
105-
}
106-
rows.push({
107-
groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
108-
instrumentEdition: instrument.internal.edition,
109-
instrumentName: instrument.internal.name,
110-
measure: `${measureKey} - ${entry.measure}`,
111-
sessionDate: record.session.date.toISOString(),
112-
sessionId: record.session.id,
113-
sessionType: record.session.type,
114-
subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
115-
subjectId: removeSubjectIdScope(record.subject.id),
116-
subjectSex: record.subject.sex,
117-
timestamp: record.date.toISOString(),
118-
username: record.session.user?.username ?? 'N/A',
119-
value: entry.measureValue
120-
});
121-
}
52+
// const processRecord = (record: RecordType): InstrumentRecordsExport => {
53+
// if (!record.computedMeasures) return [];
54+
55+
// const instrument = instrumentsMap.get(record.instrumentId)!;
56+
// const rows: InstrumentRecordsExport = [];
57+
58+
// for (const [measureKey, measureValue] of Object.entries(record.computedMeasures)) {
59+
// if (measureValue == null) continue;
60+
61+
// if (!Array.isArray(measureValue)) {
62+
// rows.push({
63+
// groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
64+
// instrumentEdition: instrument.internal.edition,
65+
// instrumentName: instrument.internal.name,
66+
// measure: measureKey,
67+
// sessionDate: record.session.date.toISOString(),
68+
// sessionId: record.session.id,
69+
// sessionType: record.session.type,
70+
// subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
71+
// subjectId: removeSubjectIdScope(record.subject.id),
72+
// subjectSex: record.subject.sex,
73+
// timestamp: record.date.toISOString(),
74+
// username: record.session.user?.username ?? 'N/A',
75+
// value: measureValue as InstrumentMeasureValue
76+
// });
77+
// continue;
78+
// }
79+
80+
// if (measureValue.length < 1) continue;
81+
82+
// const expanded = expandData(measureValue);
83+
// for (const entry of expanded) {
84+
// if (!entry.success) {
85+
// throw new Error(`exportRecords: ${instrument.internal.name}.${measureKey} — ${entry.message}`);
86+
// }
87+
// rows.push({
88+
// groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
89+
// instrumentEdition: instrument.internal.edition,
90+
// instrumentName: instrument.internal.name,
91+
// measure: `${measureKey} - ${entry.measure}`,
92+
// sessionDate: record.session.date.toISOString(),
93+
// sessionId: record.session.id,
94+
// sessionType: record.session.type,
95+
// subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
96+
// subjectId: removeSubjectIdScope(record.subject.id),
97+
// subjectSex: record.subject.sex,
98+
// timestamp: record.date.toISOString(),
99+
// username: record.session.user?.username ?? 'N/A',
100+
// value: entry.measureValue
101+
// });
102+
// }
103+
// }
104+
105+
// return rows;
106+
// };
107+
108+
// try {
109+
// const results = records.map(processRecord);
110+
// parentPort?.postMessage({ data: results.flat(), success: true });
111+
// } catch (error) {
112+
// parentPort?.postMessage({ error: (error as Error).message, success: false });
113+
// }
114+
// }
115+
// );
116+
117+
// type InitData = string
118+
119+
// type InitMessage = {
120+
// data: InitData
121+
// type: 'INIT'
122+
// }
123+
124+
// type ChunkCompleteData = number
125+
126+
// type ChunkCompleteMessage = {
127+
// data: ChunkCompleteData
128+
// type: 'CHUNK_COMPLETE'
129+
// }
130+
131+
// type ParentMessage = ChunkCompleteMessage | InitMessage
132+
133+
let initData: Map<
134+
string | undefined,
135+
{
136+
edition: number;
137+
id: string;
138+
name: string;
139+
}
140+
>;
141+
142+
function handleInit(data: InitData) {
143+
initData = new Map(data.map((instrument) => [instrument.id, instrument]));
144+
}
145+
146+
function handleChunkComplete(_data: ChunkCompleteData) {
147+
if (!initData) {
148+
throw new Error('Expected init data to be defined');
149+
}
150+
const instrumentsMap = initData;
151+
152+
const processRecord = (record: RecordType): InstrumentRecordsExport => {
153+
if (!record.computedMeasures) return [];
154+
155+
const instrument = instrumentsMap.get(record.instrumentId)!;
156+
const rows: InstrumentRecordsExport = [];
157+
158+
for (const [measureKey, measureValue] of Object.entries(record.computedMeasures)) {
159+
if (measureValue == null) continue;
160+
161+
if (!Array.isArray(measureValue)) {
162+
rows.push({
163+
groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
164+
instrumentEdition: instrument.edition,
165+
instrumentName: instrument.name,
166+
measure: measureKey,
167+
sessionDate: record.session.date.toISOString(),
168+
sessionId: record.session.id,
169+
sessionType: record.session.type,
170+
subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
171+
subjectId: removeSubjectIdScope(record.subject.id),
172+
subjectSex: record.subject.sex,
173+
timestamp: record.date.toISOString(),
174+
username: record.session.user?.username ?? 'N/A',
175+
value: measureValue as InstrumentMeasureValue
176+
});
177+
continue;
122178
}
123179

124-
return rows;
125-
};
180+
if (measureValue.length < 1) continue;
126181

127-
try {
128-
const results = records.map(processRecord);
129-
parentPort?.postMessage({ data: results.flat(), success: true });
130-
} catch (error) {
131-
parentPort?.postMessage({ error: (error as Error).message, success: false });
182+
const expanded = expandData(measureValue);
183+
for (const entry of expanded) {
184+
if (!entry.success) {
185+
throw new Error(`exportRecords: ${instrument.name}.${measureKey}${entry.message}`);
186+
}
187+
rows.push({
188+
groupId: record.subject.groupIds[0] ?? DEFAULT_GROUP_NAME,
189+
instrumentEdition: instrument.edition,
190+
instrumentName: instrument.name,
191+
measure: `${measureKey} - ${entry.measure}`,
192+
sessionDate: record.session.date.toISOString(),
193+
sessionId: record.session.id,
194+
sessionType: record.session.type,
195+
subjectAge: record.subject.dateOfBirth ? yearsPassed(record.subject.dateOfBirth) : null,
196+
subjectId: removeSubjectIdScope(record.subject.id),
197+
subjectSex: record.subject.sex,
198+
timestamp: record.date.toISOString(),
199+
username: record.session.user?.username ?? 'N/A',
200+
value: entry.measureValue
201+
});
202+
}
132203
}
204+
205+
return rows;
206+
};
207+
208+
try {
209+
const results = _data.map(processRecord);
210+
parentPort?.postMessage({ data: results.flat(), success: true });
211+
} catch (error) {
212+
parentPort?.postMessage({ error: (error as Error).message, success: false });
213+
}
214+
}
215+
216+
parentPort!.on('message', (message: ParentMessage) => {
217+
switch (message.type) {
218+
case 'CHUNK_COMPLETE':
219+
return handleChunkComplete(message.data);
220+
case 'INIT':
221+
return handleInit(message.data);
222+
default:
223+
throw new Error(`Unexpected message type: ${(message satisfies never as { [key: string]: any }).type}`);
133224
}
134-
);
225+
});

0 commit comments

Comments
 (0)