|
| 1 | +import { parentPort } from 'worker_threads'; |
| 2 | + |
| 3 | +import type { FormTypes, InstrumentMeasureValue } from '@opendatacapture/runtime-core'; |
| 4 | +import { DEFAULT_GROUP_NAME } from '@opendatacapture/schemas/core'; |
| 5 | +import type { InstrumentRecordsExport } from '@opendatacapture/schemas/instrument-records'; |
| 6 | +import { removeSubjectIdScope } from '@opendatacapture/subject-utils'; |
| 7 | + |
| 8 | +import type { BeginChunkProcessingData, InitData, ParentMessage, RecordType, WorkerMessage } from './thread-types'; |
| 9 | + |
| 10 | +type ExpandDataType = |
| 11 | + | { |
| 12 | + measure: string; |
| 13 | + measureValue: FormTypes.RecordArrayFieldValue | InstrumentMeasureValue; |
| 14 | + success: true; |
| 15 | + } |
| 16 | + | { |
| 17 | + message: string; |
| 18 | + success: false; |
| 19 | + }; |
| 20 | + |
| 21 | +function expandData(listEntry: any[]): ExpandDataType[] { |
| 22 | + const validRecordArrayList: ExpandDataType[] = []; |
| 23 | + if (listEntry.length < 1) { |
| 24 | + throw new Error('Record Array is Empty'); |
| 25 | + } |
| 26 | + for (const objectEntry of Object.values(listEntry)) { |
| 27 | + for (const [dataKey, dataValue] of Object.entries( |
| 28 | + objectEntry as { [key: string]: FormTypes.RecordArrayFieldValue } |
| 29 | + )) { |
| 30 | + validRecordArrayList.push({ |
| 31 | + measure: dataKey, |
| 32 | + measureValue: dataValue, |
| 33 | + success: true |
| 34 | + }); |
| 35 | + } |
| 36 | + } |
| 37 | + return validRecordArrayList; |
| 38 | +} |
| 39 | + |
| 40 | +let initData: Map< |
| 41 | + string | undefined, |
| 42 | + { |
| 43 | + edition: number; |
| 44 | + id: string; |
| 45 | + name: string; |
| 46 | + } |
| 47 | +>; |
| 48 | + |
| 49 | +function handleInit(data: InitData) { |
| 50 | + initData = new Map(data.map((instrument) => [instrument.id, instrument])); |
| 51 | + |
| 52 | + parentPort?.postMessage({ success: true }); |
| 53 | +} |
| 54 | + |
| 55 | +function handleChunkComplete(_data: BeginChunkProcessingData) { |
| 56 | + if (!initData) { |
| 57 | + throw new Error('Expected init data to be defined'); |
| 58 | + } |
| 59 | + const instrumentsMap = initData; |
| 60 | + |
| 61 | + const processRecord = (record: RecordType): InstrumentRecordsExport => { |
| 62 | + const instrument = instrumentsMap.get(record.instrumentId)!; |
| 63 | + |
| 64 | + if (!record.computedMeasures) return []; |
| 65 | + |
| 66 | + //const instrument = instrumentsMap.get(record.instrumentId)!; |
| 67 | + const rows: InstrumentRecordsExport = []; |
| 68 | + |
| 69 | + for (const [measureKey, measureValue] of Object.entries(record.computedMeasures)) { |
| 70 | + if (measureValue == null) continue; |
| 71 | + |
| 72 | + if (!Array.isArray(measureValue)) { |
| 73 | + rows.push({ |
| 74 | + groupId: record.groupId ?? DEFAULT_GROUP_NAME, |
| 75 | + instrumentEdition: instrument.edition, |
| 76 | + instrumentName: instrument.name, |
| 77 | + measure: measureKey, |
| 78 | + sessionDate: record.session.date, |
| 79 | + sessionId: record.session.id, |
| 80 | + sessionType: record.session.type, |
| 81 | + subjectAge: record.subject.age, |
| 82 | + subjectId: removeSubjectIdScope(record.subject.id), |
| 83 | + subjectSex: record.subject.sex, |
| 84 | + timestamp: record.date, |
| 85 | + username: record.session.user?.username ?? 'N/A', |
| 86 | + value: measureValue as InstrumentMeasureValue |
| 87 | + }); |
| 88 | + continue; |
| 89 | + } |
| 90 | + |
| 91 | + if (measureValue.length < 1) continue; |
| 92 | + |
| 93 | + const expanded = expandData(measureValue); |
| 94 | + for (const entry of expanded) { |
| 95 | + if (!entry.success) { |
| 96 | + throw new Error(`exportRecords: ${instrument.name}.${measureKey} — ${entry.message}`); |
| 97 | + } |
| 98 | + rows.push({ |
| 99 | + groupId: record.groupId ?? DEFAULT_GROUP_NAME, |
| 100 | + instrumentEdition: instrument.edition, |
| 101 | + instrumentName: instrument.name, |
| 102 | + measure: `${measureKey} - ${entry.measure}`, |
| 103 | + sessionDate: record.session.date, |
| 104 | + sessionId: record.session.id, |
| 105 | + sessionType: record.session.type, |
| 106 | + subjectAge: record.subject.age, |
| 107 | + subjectId: removeSubjectIdScope(record.subject.id), |
| 108 | + subjectSex: record.subject.sex, |
| 109 | + timestamp: record.date, |
| 110 | + username: record.session.user?.username ?? 'N/A', |
| 111 | + value: entry.measureValue |
| 112 | + }); |
| 113 | + } |
| 114 | + } |
| 115 | + return rows; |
| 116 | + }; |
| 117 | + |
| 118 | + try { |
| 119 | + const results = _data.map(processRecord); |
| 120 | + parentPort?.postMessage({ data: results.flat(), success: true } satisfies WorkerMessage); |
| 121 | + } catch (error) { |
| 122 | + parentPort?.postMessage({ error: (error as Error).message, success: false } satisfies WorkerMessage); |
| 123 | + } |
| 124 | +} |
| 125 | + |
| 126 | +parentPort!.on('message', (message: ParentMessage) => { |
| 127 | + switch (message.type) { |
| 128 | + case 'BEGIN_CHUNK_PROCESSING': |
| 129 | + return handleChunkComplete(message.data); |
| 130 | + case 'INIT': |
| 131 | + return handleInit(message.data); |
| 132 | + default: |
| 133 | + throw new Error(`Unexpected message type: ${(message satisfies never as { [key: string]: any }).type}`); |
| 134 | + } |
| 135 | +}); |
0 commit comments