Skip to content

Commit b152a3d

Browse files
authored
Merge pull request #1257 from david-roper/add-threading
Add threading
2 parents fc3e112 + 4bb1edf commit b152a3d

5 files changed

Lines changed: 557 additions & 122 deletions

File tree

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import type { Model } from '@douglasneuroinformatics/libnest';
2+
import { getModelToken } from '@douglasneuroinformatics/libnest';
3+
import { MockFactory } from '@douglasneuroinformatics/libnest/testing';
4+
import type { MockedInstance } from '@douglasneuroinformatics/libnest/testing';
5+
import { NotFoundException } from '@nestjs/common';
6+
import { Test } from '@nestjs/testing';
7+
import type { InstrumentRecord, Session, Subject, User } from '@prisma/client';
8+
import { beforeEach, describe, expect, it } from 'vitest';
9+
10+
import { GroupsService } from '../../groups/groups.service';
11+
import { InstrumentsService } from '../../instruments/instruments.service';
12+
import { SessionsService } from '../../sessions/sessions.service';
13+
import { SubjectsService } from '../../subjects/subjects.service';
14+
import { InstrumentMeasuresService } from '../instrument-measures.service';
15+
import { InstrumentRecordsService } from '../instrument-records.service';
16+
17+
describe('InstrumentRecordsService', () => {
18+
let instrumentRecordsService: InstrumentRecordsService;
19+
let instrumentRecordModel: MockedInstance<Model<'InstrumentRecord'>>;
20+
// let _groupsService: MockedInstance<GroupsService>;
21+
// let _instrumentMeasuresService: MockedInstance<InstrumentMeasuresService>;
22+
let instrumentsService: MockedInstance<InstrumentsService>;
23+
// let _sessionsService: MockedInstance<SessionsService>;
24+
// let _subjectsService: MockedInstance<SubjectsService>;
25+
26+
beforeEach(async () => {
27+
const moduleRef = await Test.createTestingModule({
28+
providers: [
29+
InstrumentRecordsService,
30+
MockFactory.createForModelToken(getModelToken('InstrumentRecord')),
31+
MockFactory.createForService(GroupsService),
32+
MockFactory.createForService(InstrumentMeasuresService),
33+
MockFactory.createForService(InstrumentsService),
34+
MockFactory.createForService(SessionsService),
35+
MockFactory.createForService(SubjectsService)
36+
]
37+
}).compile();
38+
39+
instrumentRecordModel = moduleRef.get(getModelToken('InstrumentRecord'));
40+
instrumentRecordsService = moduleRef.get(InstrumentRecordsService);
41+
// _groupsService = moduleRef.get(GroupsService);
42+
// _instrumentMeasuresService = moduleRef.get(InstrumentMeasuresService);
43+
instrumentsService = moduleRef.get(InstrumentsService);
44+
// _sessionsService = moduleRef.get(SessionsService);
45+
// _subjectsService = moduleRef.get(SubjectsService);
46+
});
47+
48+
describe('findById', () => {
49+
it('should throw a NotFoundException if no record is found', async () => {
50+
instrumentRecordModel.findFirst.mockResolvedValueOnce(null);
51+
await expect(instrumentRecordsService.findById('nonexistent-id')).rejects.toBeInstanceOf(NotFoundException);
52+
});
53+
54+
it('should return the instrument record with the correct shape', async () => {
55+
const mockRecord = {
56+
data: { test: 'data' },
57+
date: new Date(),
58+
id: 'test-record-id',
59+
instrumentId: 'test-instrument-id',
60+
sessionId: 'test-session-id',
61+
subjectId: 'test-subject-id'
62+
};
63+
64+
instrumentRecordModel.findFirst.mockResolvedValueOnce(mockRecord);
65+
66+
const result = await instrumentRecordsService.findById('test-record-id');
67+
68+
expect(result).toMatchObject({
69+
data: { test: 'data' },
70+
id: 'test-record-id',
71+
instrumentId: 'test-instrument-id',
72+
sessionId: 'test-session-id',
73+
subjectId: 'test-subject-id'
74+
});
75+
expect(result.date).toBeInstanceOf(Date);
76+
});
77+
});
78+
79+
describe('exportRecords', () => {
80+
it('should return an array of export records with correct shape', async () => {
81+
const mockRecords = [
82+
{
83+
assignmentId: '123',
84+
computedMeasures: { score: 85 },
85+
createdAt: new Date(),
86+
data: {
87+
score: 85
88+
},
89+
date: new Date('2023-01-01'),
90+
groupId: '123',
91+
id: 'record-1',
92+
instrumentId: 'instrument-1',
93+
session: {
94+
date: new Date('2023-01-01'),
95+
id: 'session-1',
96+
type: 'IN_PERSON' as const,
97+
user: { username: 'testuser' }
98+
},
99+
sessionId: '123',
100+
subject: {
101+
dateOfBirth: new Date('1990-01-01'),
102+
groupIds: ['group-1'],
103+
id: 'subject-1',
104+
sex: 'MALE' as const
105+
},
106+
subjectId: '123',
107+
updatedAt: new Date()
108+
}
109+
] satisfies (InstrumentRecord & {
110+
session: Partial<Session> & { user: Partial<User> };
111+
subject: Partial<Subject>;
112+
})[];
113+
114+
const mockInstruments = [
115+
{
116+
id: 'instrument-1',
117+
internal: { edition: 1, name: 'Test Instrument' }
118+
}
119+
];
120+
121+
instrumentRecordModel.aggregateRaw.mockResolvedValueOnce(mockRecords);
122+
instrumentsService.findById.mockResolvedValueOnce(mockInstruments[0]);
123+
124+
const ability = { can: () => true } as any;
125+
126+
const result = await instrumentRecordsService.exportRecords({}, { ability });
127+
128+
expect(Array.isArray(result)).toBe(true);
129+
expect(result.length).toBeGreaterThan(0);
130+
expect(result[0]).toMatchObject({
131+
groupId: '123',
132+
instrumentEdition: 1,
133+
instrumentName: 'Test Instrument',
134+
measure: 'score',
135+
sessionId: 'session-1',
136+
sessionType: 'IN_PERSON',
137+
subjectId: expect.any(String),
138+
timestamp: expect.any(String),
139+
username: 'testuser',
140+
value: 85
141+
});
142+
});
143+
});
144+
});
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import { parentPort } from 'worker_threads';
2+
3+
import type { FormTypes, InstrumentMeasureValue } from '@opendatacapture/runtime-core';
4+
import { DEFAULT_GROUP_NAME } from '@opendatacapture/schemas/core';
5+
import type { InstrumentRecordsExport } from '@opendatacapture/schemas/instrument-records';
6+
import { removeSubjectIdScope } from '@opendatacapture/subject-utils';
7+
8+
import type { ChunkCompleteData, InitData, ParentMessage, RecordType } from './thread-types';
9+
10+
type ExpandDataType =
11+
| {
12+
measure: string;
13+
measureValue: FormTypes.RecordArrayFieldValue | InstrumentMeasureValue;
14+
success: true;
15+
}
16+
| {
17+
message: string;
18+
success: false;
19+
};
20+
21+
function expandData(listEntry: any[]): ExpandDataType[] {
22+
const validRecordArrayList: ExpandDataType[] = [];
23+
if (listEntry.length < 1) {
24+
throw new Error('Record Array is Empty');
25+
}
26+
for (const objectEntry of Object.values(listEntry)) {
27+
for (const [dataKey, dataValue] of Object.entries(
28+
objectEntry as { [key: string]: FormTypes.RecordArrayFieldValue }
29+
)) {
30+
validRecordArrayList.push({
31+
measure: dataKey,
32+
measureValue: dataValue,
33+
success: true
34+
});
35+
}
36+
}
37+
return validRecordArrayList;
38+
}
39+
40+
let initData: Map<
41+
string | undefined,
42+
{
43+
edition: number;
44+
id: string;
45+
name: string;
46+
}
47+
>;
48+
49+
function handleInit(data: InitData) {
50+
initData = new Map(data.map((instrument) => [instrument.id, instrument]));
51+
52+
parentPort?.postMessage({ success: true });
53+
}
54+
55+
function handleChunkComplete(_data: ChunkCompleteData) {
56+
if (!initData) {
57+
throw new Error('Expected init data to be defined');
58+
}
59+
const instrumentsMap = initData;
60+
61+
const processRecord = (record: RecordType): InstrumentRecordsExport => {
62+
const instrument = instrumentsMap.get(record.instrumentId)!;
63+
64+
if (!record.computedMeasures) return [];
65+
66+
//const instrument = instrumentsMap.get(record.instrumentId)!;
67+
const rows: InstrumentRecordsExport = [];
68+
69+
for (const [measureKey, measureValue] of Object.entries(record.computedMeasures)) {
70+
if (measureValue == null) continue;
71+
72+
if (!Array.isArray(measureValue)) {
73+
rows.push({
74+
groupId: record.groupId ?? DEFAULT_GROUP_NAME,
75+
instrumentEdition: instrument.edition,
76+
instrumentName: instrument.name,
77+
measure: measureKey,
78+
sessionDate: record.session.date,
79+
sessionId: record.session.id,
80+
sessionType: record.session.type,
81+
subjectAge: record.subject.age,
82+
subjectId: removeSubjectIdScope(record.subject.id),
83+
subjectSex: record.subject.sex,
84+
timestamp: record.date,
85+
username: record.session.user?.username ?? 'N/A',
86+
value: measureValue as InstrumentMeasureValue
87+
});
88+
continue;
89+
}
90+
91+
if (measureValue.length < 1) continue;
92+
93+
const expanded = expandData(measureValue);
94+
for (const entry of expanded) {
95+
if (!entry.success) {
96+
throw new Error(`exportRecords: ${instrument.name}.${measureKey}${entry.message}`);
97+
}
98+
rows.push({
99+
groupId: record.groupId ?? DEFAULT_GROUP_NAME,
100+
instrumentEdition: instrument.edition,
101+
instrumentName: instrument.name,
102+
measure: `${measureKey} - ${entry.measure}`,
103+
sessionDate: record.session.date,
104+
sessionId: record.session.id,
105+
sessionType: record.session.type,
106+
subjectAge: record.subject.age,
107+
subjectId: removeSubjectIdScope(record.subject.id),
108+
subjectSex: record.subject.sex,
109+
timestamp: record.date,
110+
username: record.session.user?.username ?? 'N/A',
111+
value: entry.measureValue
112+
});
113+
}
114+
}
115+
return rows;
116+
};
117+
118+
try {
119+
const results = _data.map(processRecord);
120+
parentPort?.postMessage({ data: results.flat(), success: true });
121+
} catch (error) {
122+
parentPort?.postMessage({ error: (error as Error).message, success: false });
123+
}
124+
}
125+
126+
parentPort!.on('message', (message: ParentMessage) => {
127+
switch (message.type) {
128+
case 'CHUNK_COMPLETE':
129+
return handleChunkComplete(message.data);
130+
case 'INIT':
131+
return handleInit(message.data);
132+
default:
133+
throw new Error(`Unexpected message type: ${(message satisfies never as { [key: string]: any }).type}`);
134+
}
135+
});

0 commit comments

Comments
 (0)