|
| 1 | +import { parentPort } from 'worker_threads'; |
| 2 | + |
| 3 | +/** @type {typeof import('@opendatacapture/schemas/core').DEFAULT_GROUP_NAME} */ |
| 4 | +const DEFAULT_GROUP_NAME = 'root'; |
| 5 | + |
| 6 | +/** @type {typeof import( '#runtime/v1/@opendatacapture/runtime-internal/index.js').removeSubjectIdScope} */ |
| 7 | +let removeSubjectIdScope; |
| 8 | + |
| 9 | +// we need to resolve import correctly in development |
| 10 | +try { |
| 11 | + const runtimeInternal = await import('#runtime/v1/@opendatacapture/runtime-internal/index.js'); |
| 12 | + removeSubjectIdScope = runtimeInternal.removeSubjectIdScope; |
| 13 | +} catch { |
| 14 | + const runtimeInternal = await import('@opendatacapture/runtime-v1/@opendatacapture/runtime-internal/index.js'); |
| 15 | + removeSubjectIdScope = runtimeInternal.removeSubjectIdScope; |
| 16 | +} |
| 17 | + |
| 18 | +/** |
| 19 | + * @typedef {Object} SuccessExpand |
| 20 | + * @property {string} measure |
| 21 | + * @property {any} measureValue |
| 22 | + * @property {true} success |
| 23 | + */ |
| 24 | + |
| 25 | +/** |
| 26 | + * @typedef {Object} FailureExpand |
| 27 | + * @property {string} message |
| 28 | + * @property {false} success |
| 29 | + */ |
| 30 | + |
| 31 | +/** @typedef {SuccessExpand | FailureExpand} ExpandDataType */ |
| 32 | + |
| 33 | +/** |
| 34 | + * Flattens nested record array data into a list of expandable data objects. |
| 35 | + * @param {any[]} listEntry - The array of records to expand. |
| 36 | + * @returns {ExpandDataType[]} An array of expanded measure objects. |
| 37 | + * @throws {Error} If the provided listEntry is empty. |
| 38 | + */ |
| 39 | +function expandData(listEntry) { |
| 40 | + /** @type {SuccessExpand[]} */ |
| 41 | + const validRecordArrayList = []; |
| 42 | + if (listEntry.length < 1) { |
| 43 | + throw new Error('Record Array is Empty'); |
| 44 | + } |
| 45 | + for (const objectEntry of Object.values(listEntry)) { |
| 46 | + for (const [dataKey, dataValue] of Object.entries(objectEntry)) { |
| 47 | + validRecordArrayList.push({ |
| 48 | + measure: dataKey, |
| 49 | + measureValue: dataValue, |
| 50 | + success: true |
| 51 | + }); |
| 52 | + } |
| 53 | + } |
| 54 | + return validRecordArrayList; |
| 55 | +} |
| 56 | + |
| 57 | +/** |
| 58 | + * Internal cache for instrument metadata. |
| 59 | + * @type {Map<string | undefined, { edition: number, id: string, name: string }>} |
| 60 | + */ |
| 61 | +let initData; |
| 62 | + |
| 63 | +/** |
| 64 | + * Initializes the worker with instrument metadata. |
| 65 | + * * @param {Array<{id: string, edition: number, name: string}>} data - The initialization payload. |
| 66 | + */ |
| 67 | +function handleInit(data) { |
| 68 | + initData = new Map(data.map((instrument) => [instrument.id, instrument])); |
| 69 | + parentPort?.postMessage({ success: true }); |
| 70 | +} |
| 71 | + |
| 72 | +/** |
| 73 | + * Processes a chunk of records and posts results back to the parent thread. |
| 74 | + * @param {import('./thread-types').BeginChunkProcessingData} _data - The collection of records to process. |
| 75 | + * @throws {Error} If initData is not defined. |
| 76 | + */ |
| 77 | +function handleChunkComplete(_data) { |
| 78 | + if (!initData) { |
| 79 | + throw new Error('Expected init data to be defined'); |
| 80 | + } |
| 81 | + const instrumentsMap = initData; |
| 82 | + |
| 83 | + /** |
| 84 | + * Transforms a single record into an array of exportable rows. |
| 85 | + * @param {import('./thread-types').RecordType} record - The raw instrument record. |
| 86 | + * @returns {import('@opendatacapture/schemas/instrument-records').InstrumentRecordsExport} The processed rows. |
| 87 | + */ |
| 88 | + const processRecord = (record) => { |
| 89 | + const instrument = instrumentsMap.get(record.instrumentId); |
| 90 | + if (!instrument) { |
| 91 | + throw new Error(`Instrument not found for ID: ${record.instrumentId}`); |
| 92 | + } |
| 93 | + |
| 94 | + if (!record.computedMeasures) { |
| 95 | + return []; |
| 96 | + } |
| 97 | + |
| 98 | + const rows = []; |
| 99 | + |
| 100 | + for (const [measureKey, measureValue] of Object.entries(record.computedMeasures)) { |
| 101 | + if (measureValue == null) continue; |
| 102 | + |
| 103 | + if (!Array.isArray(measureValue)) { |
| 104 | + rows.push({ |
| 105 | + groupId: record.groupId ?? DEFAULT_GROUP_NAME, |
| 106 | + instrumentEdition: instrument.edition, |
| 107 | + instrumentName: instrument.name, |
| 108 | + measure: measureKey, |
| 109 | + sessionDate: record.session.date, |
| 110 | + sessionId: record.session.id, |
| 111 | + sessionType: record.session.type, |
| 112 | + subjectAge: record.subject.age, |
| 113 | + subjectId: removeSubjectIdScope(record.subject.id), |
| 114 | + subjectSex: record.subject.sex, |
| 115 | + timestamp: record.date, |
| 116 | + username: record.session.user?.username ?? 'N/A', |
| 117 | + value: measureValue |
| 118 | + }); |
| 119 | + continue; |
| 120 | + } |
| 121 | + |
| 122 | + if (measureValue.length < 1) continue; |
| 123 | + |
| 124 | + const expanded = expandData(measureValue); |
| 125 | + for (const entry of expanded) { |
| 126 | + if (!entry.success) { |
| 127 | + throw new Error(`exportRecords: ${instrument.name}.${measureKey} — ${entry.message}`); |
| 128 | + } |
| 129 | + rows.push({ |
| 130 | + groupId: record.groupId ?? DEFAULT_GROUP_NAME, |
| 131 | + instrumentEdition: instrument.edition, |
| 132 | + instrumentName: instrument.name, |
| 133 | + measure: `${measureKey} - ${entry.measure}`, |
| 134 | + sessionDate: record.session.date, |
| 135 | + sessionId: record.session.id, |
| 136 | + sessionType: record.session.type, |
| 137 | + subjectAge: record.subject.age, |
| 138 | + subjectId: removeSubjectIdScope(record.subject.id), |
| 139 | + subjectSex: record.subject.sex, |
| 140 | + timestamp: record.date, |
| 141 | + username: record.session.user?.username ?? 'N/A', |
| 142 | + value: entry.measureValue |
| 143 | + }); |
| 144 | + } |
| 145 | + } |
| 146 | + return rows; |
| 147 | + }; |
| 148 | + |
| 149 | + try { |
| 150 | + const results = _data.map(processRecord); |
| 151 | + parentPort?.postMessage({ data: results.flat(), success: true }); |
| 152 | + } catch (error) { |
| 153 | + if (error instanceof Error) { |
| 154 | + parentPort?.postMessage({ error: error.message, success: false }); |
| 155 | + } else { |
| 156 | + parentPort?.postMessage({ error: 'Unknown Error', success: false }); |
| 157 | + } |
| 158 | + } |
| 159 | +} |
| 160 | + |
| 161 | +/** |
| 162 | + * Worker Message Router |
| 163 | + * @param {import('./thread-types').ParentMessage} message |
| 164 | + */ |
| 165 | +parentPort?.on('message', (message) => { |
| 166 | + switch (message.type) { |
| 167 | + case 'BEGIN_CHUNK_PROCESSING': |
| 168 | + return handleChunkComplete(message.data); |
| 169 | + case 'INIT': |
| 170 | + return handleInit(message.data); |
| 171 | + default: |
| 172 | + throw new Error(`Unexpected message type: ${message.type}`); |
| 173 | + } |
| 174 | +}); |
0 commit comments