Skip to content

Commit abad5e4

Browse files
authored
W 21683654 improve streaming error handling (#345)
* fix error handling in lambda adapter * update tests to check 502 prevention behaviour when streaming * add changeset for fix
1 parent e81379d commit abad5e4

3 files changed

Lines changed: 135 additions & 67 deletions

File tree

.changeset/hungry-otters-unite.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@salesforce/mrt-utilities': patch
3+
---
4+
5+
Improved error handling in create-lambda-adapter.ts used for streaming support when deployed to Managed Runtime.

packages/mrt-utilities/src/streaming/create-lambda-adapter.ts

Lines changed: 42 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ export function createStreamingLambdaAdapter(
115115

116116
if (isStreamOpen && typeof responseStream.write === 'function') {
117117
const errorMessage = error instanceof Error ? error.message : String(error);
118-
responseStream.write(`HTTP/1.1 500 Internal Server Error\r\n\r\nInternal Server Error: ${errorMessage}`);
118+
responseStream.write(`Internal Server Error: ${errorMessage}`);
119119
} else {
120120
console.error('[error handler] Cannot write error - stream is closed');
121121
}
@@ -163,34 +163,20 @@ async function streamResponse(
163163
}
164164
};
165165

166+
// See events defined for hhtp.serverResponse interface here:
167+
// https://nodejs.org/docs/latest-v24.x/api/http.html#class-httpserverresponse
168+
166169
// Handle response finish
167-
expressResponse.once('finish', () => {
168-
resolveOnce();
169-
});
170+
expressResponse.once('finish', resolveOnce);
171+
// handle response close, either after a 'finish' or if underlying
172+
// connection is closed early
173+
expressResponse.once('close', resolveOnce);
170174

171175
// Handle response errors
172-
expressResponse.once('error', (err: Error) => {
173-
rejectOnce(err);
174-
});
176+
expressResponse.once('error', rejectOnce);
175177

176178
try {
177-
app(expressRequest, expressResponse, (err) => {
178-
if (err) {
179-
console.error('Express app error:', err);
180-
rejectOnce(err);
181-
} else {
182-
// If response has finished, resolveOnce will be called by the finish event
183-
// Otherwise, resolve after a short delay to allow async operations
184-
if (expressResponse.finished) {
185-
resolveOnce();
186-
} else {
187-
// Wait a bit for the response to finish
188-
setTimeout(() => {
189-
resolveOnce();
190-
}, 10);
191-
}
192-
}
193-
});
179+
app(expressRequest, expressResponse);
194180
} catch (error) {
195181
console.error('Error in streamResponse:', error);
196182
rejectOnce(error as Error);
@@ -530,6 +516,7 @@ export function createExpressResponse(
530516

531517
if (!isStreamOpen()) {
532518
console.error('Cannot initialize response - stream is closed');
519+
emitCloseOnce(response);
533520
return;
534521
}
535522

@@ -557,7 +544,7 @@ export function createExpressResponse(
557544
// Create HttpResponseStream with metadata
558545
// This writes the HTTP status and headers to the stream
559546
const metadata: StreamMetadata = {
560-
statusCode,
547+
statusCode: response.statusCode || statusCode,
561548
headers,
562549
};
563550

@@ -570,6 +557,15 @@ export function createExpressResponse(
570557

571558
httpResponseStream = awslambda.HttpResponseStream.from(responseStream, metadata);
572559

560+
// Bind 'finish' and 'close' handlers to emit the same events on the response object
561+
httpResponseStream.on('finish', () => {
562+
response.finished = true;
563+
response.emit('finish');
564+
});
565+
httpResponseStream.on('close', () => {
566+
response.emit('close');
567+
});
568+
573569
// Set up compression stream if compression is enabled
574570
// The compression stream pipes to httpResponseStream, which pipes to responseStream
575571
// 'identity' means no encoding, so we should not initialize compression for it
@@ -670,7 +666,7 @@ export function createExpressResponse(
670666
reasonPhrase = undefined;
671667
}
672668

673-
statusCode = code || statusCode;
669+
statusCode = code || this.statusCode || statusCode;
674670
this.statusCode = statusCode;
675671

676672
// Set statusMessage if provided
@@ -699,6 +695,7 @@ export function createExpressResponse(
699695
res.write = function (chunk: string | Buffer | Uint8Array): boolean {
700696
if (!isStreamOpen()) {
701697
console.error(`Cannot write - stream is closed`);
698+
emitCloseOnce(this);
702699
return false;
703700
}
704701

@@ -729,6 +726,15 @@ export function createExpressResponse(
729726
}
730727
};
731728

729+
let emittedClose = false;
730+
const emitCloseOnce = (response: ExpressResponse) => {
731+
if (emittedClose) {
732+
return;
733+
}
734+
response.emit('close');
735+
emittedClose = true;
736+
};
737+
732738
/**
733739
* Ends the appropriate stream(s) and emits the finish event
734740
* If compression is enabled, ends the compression stream which will automatically
@@ -737,47 +743,29 @@ export function createExpressResponse(
737743
* @param response - The Express response object to emit finish event on
738744
*/
739745
const endStream = (response: ExpressResponse): void => {
740-
if (shouldCompress && compressionStream) {
741-
try {
742-
// Flush compression stream to ensure all buffered data is written
743-
_flush();
744-
// End compression stream - this will automatically end httpResponseStream
745-
// due to the pipe with { end: true } option
746-
compressionStream.end(() => {
747-
response.finished = true;
748-
response.emit('finish');
749-
});
750-
} catch (error) {
751-
console.error(`Error ending compression stream:`, error);
752-
// Still emit finish even if there was an error
753-
response.finished = true;
754-
response.emit('finish');
755-
}
756-
} else if (httpResponseStream && httpResponseStream.writable) {
757-
// No compression, end httpResponseStream directly
746+
const stream = (shouldCompress && compressionStream) || (httpResponseStream?.writable && httpResponseStream);
747+
if (stream) {
758748
try {
759749
_flush();
760-
httpResponseStream.end(() => {
761-
response.finished = true;
762-
response.emit('finish');
763-
});
750+
stream.end();
764751
} catch (error) {
765-
console.error(`Error ending httpResponseStream:`, error);
752+
console.error(`Error ending stream:`, error);
766753
response.finished = true;
767-
response.emit('finish');
754+
emitCloseOnce(response);
768755
}
769756
} else {
770757
console.error(`Cannot call end() - stream is closed`);
771758
// Still emit finish to prevent hanging
772759
response.finished = true;
773-
response.emit('finish');
760+
emitCloseOnce(response);
774761
}
775762
};
776763

777764
// @ts-expect-error - Type signature doesn't match ServerResponse.end exactly, but our implementation is compatible
778765
res.end = function (chunk?: string | Buffer | Uint8Array) {
779766
if (!isStreamOpen()) {
780767
console.error(`Cannot end - stream is already closed`);
768+
emitCloseOnce(this);
781769
return this;
782770
}
783771

@@ -808,9 +796,8 @@ export function createExpressResponse(
808796
// Add Express-specific methods that aren't in ServerResponse
809797
res.status = function (code: number, message?: string) {
810798
this.statusCode = code;
811-
statusCode = code;
812799
if (message !== undefined) {
813-
statusMessage = message;
800+
this.statusMessage = message;
814801
}
815802
return this;
816803
};
@@ -855,6 +842,7 @@ export function createExpressResponse(
855842
if (!responseStarted) {
856843
if (!isStreamOpen()) {
857844
console.error('[res.flushHeaders] Cannot flush headers - stream is closed');
845+
emitCloseOnce(this);
858846
return this;
859847
}
860848

@@ -901,6 +889,7 @@ export function createExpressResponse(
901889
res.flush = function () {
902890
if (!isStreamOpen()) {
903891
console.error(`Cannot flush - stream is closed`);
892+
emitCloseOnce(this);
904893
return this;
905894
}
906895

packages/mrt-utilities/test/streaming/create-lambda-adapter.test.ts

Lines changed: 88 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,22 @@ import {
1717
createExpressResponse,
1818
} from '@salesforce/mrt-utilities/streaming';
1919

20+
type WritableWithChunksAndMetadata = Writable &
21+
EventEmitter & {
22+
chunks: Buffer[];
23+
metadata: {statusCode: number; headers: Record<string, any>; cookies?: string[]};
24+
getWrittenData: (encoding?: BufferEncoding) => Buffer | string;
25+
};
26+
2027
// Mock awslambda global
2128
const mockHttpResponseStream = {
2229
from: sinon
2330
.stub()
2431
.callsFake(
25-
(stream: Writable, _metadata: {statusCode: number; headers: Record<string, any>; cookies?: string[]}) => {
32+
(
33+
stream: WritableWithChunksAndMetadata,
34+
_metadata: {statusCode: number; headers: Record<string, any>; cookies?: string[]}) => {
35+
stream.metadata = _metadata;
2636
return stream;
2737
},
2838
),
@@ -35,7 +45,7 @@ const mockHttpResponseStream = {
3545
};
3646

3747
// Mock stream type with Sinon stubs so assertions (e.g. .called, .calledWith) type-check
38-
type MockWritable = (Writable & EventEmitter) & {
48+
type MockWritable = WritableWithChunksAndMetadata & {
3949
write: sinon.SinonStub;
4050
end: sinon.SinonStub;
4151
destroy: sinon.SinonStub;
@@ -53,6 +63,8 @@ function createMockWritable(): MockWritable {
5363
stream.writableEnded = false;
5464
stream.writableFinished = false;
5565
stream.destroyed = false;
66+
stream.chunks = chunks;
67+
stream.metadata = undefined;
5668

5769
stream.write = sinon.stub().callsFake((chunk: any) => {
5870
if (destroyed || ended) return false;
@@ -84,6 +96,11 @@ function createMockWritable(): MockWritable {
8496
// Mock flush method
8597
});
8698

99+
stream.getWrittenData = function (encoding?: BufferEncoding) {
100+
const data = Buffer.concat(this.chunks);
101+
return encoding ? data.toString(encoding) : data;
102+
};
103+
87104
return stream as MockWritable;
88105
}
89106

@@ -241,7 +258,6 @@ describe('create-lambda-adapter', () => {
241258
mockResponseStream = createMockWritable();
242259
mockApp = express();
243260
mockHttpResponseStream.from.resetHistory();
244-
mockHttpResponseStream.from.callsFake((stream) => stream);
245261
});
246262

247263
afterEach(() => {
@@ -283,7 +299,7 @@ describe('create-lambda-adapter', () => {
283299

284300
await handler(event, context);
285301

286-
expect(mockResponseStream.write.firstCall.args[0]).to.include('500 Internal Server Error');
302+
expect(mockResponseStream.write.firstCall.args[0]).to.include('Error');
287303
expect(mockResponseStream.end.called).to.be.true;
288304
});
289305

@@ -298,7 +314,7 @@ describe('create-lambda-adapter', () => {
298314

299315
await handler(event, context);
300316

301-
expect(mockResponseStream.write.firstCall.args[0]).to.include('500 Internal Server Error');
317+
expect(mockResponseStream.write.firstCall.args[0]).to.include('Error');
302318
expect(mockResponseStream.end.called).to.be.true;
303319
});
304320

@@ -321,22 +337,80 @@ describe('create-lambda-adapter', () => {
321337
expect(closedStream.write.called).to.be.false;
322338
});
323339

324-
it('should handle stream without write method', async () => {
325-
mockApp.get('/test', () => {
326-
throw new Error('Test error');
340+
it('should return status code 400 response for requests to streaming route with invalid path', async () => {
341+
// We need a catch-all route to force the router to try to decode the path
342+
const dummyCatchAllRoute = (_: any, res: any) => {
343+
res.status(200).send('dummy catch-all route response');
344+
};
345+
try {
346+
//express 4 style catch-all route, throws an error when installed
347+
// in express 5, see https://github.com/pillarjs/path-to-regexp#errors
348+
mockApp.get('/*', dummyCatchAllRoute);
349+
} catch (error) {
350+
//express 5 style catch-all route
351+
mockApp.get('/{*splat}', dummyCatchAllRoute);
352+
}
353+
354+
355+
const handler = createStreamingLambdaAdapter(mockApp, mockResponseStream);
356+
const event = createMockEvent({path: '/%80'});
357+
const context = createMockContext();
358+
359+
await handler(event, context);
360+
361+
expect(mockResponseStream.metadata.statusCode).to.equal(400);
362+
expect(mockResponseStream.write.called).to.be.true;
363+
expect(mockResponseStream.end.called).to.be.true;
364+
365+
const responseData = mockResponseStream.getWrittenData('utf-8');
366+
367+
// The response body returned by finalhandler depends on the NODE_ENV environment variable, when it is set to "production", a brief error page with the message "Bad Request" is returned, otherwise it returns a more detailed error page with a stack trace for a URIError exception
368+
expect(responseData).to.contain('<title>Error</title>');
369+
expect(responseData).to.match(/URIError|Bad Request/);
370+
});
371+
372+
it('should return status code 404 response for requests to streaming route that explicitly raises to return a 404', async () => {
373+
// A 404 implemented by throwing
374+
mockApp.get('/intentional404', () => {
375+
type ErrorWithStatus = Error & {status: number};
376+
const err = new Error('Not Found') as ErrorWithStatus;
377+
err.message = 'Not Found';
378+
err.status = 404;
379+
throw err;
327380
});
328381

329-
const streamWithoutWrite = createMockWritable();
330-
delete (streamWithoutWrite as any).write;
382+
const handler = createStreamingLambdaAdapter(mockApp, mockResponseStream);
383+
const event = createMockEvent({path: '/intentional404'});
384+
const context = createMockContext();
385+
386+
await handler(event, context);
331387

332-
const handler = createStreamingLambdaAdapter(mockApp, streamWithoutWrite);
333-
const event = createMockEvent({path: '/test'});
388+
//expect(mockResponseStream.write).toHaveBeenCalled();
389+
expect(mockResponseStream.end.called).to.be.true;
390+
expect(mockResponseStream.metadata.statusCode).to.equal(404);
391+
392+
expect(mockResponseStream.getWrittenData('utf-8')).to.contain('Not Found');
393+
});
394+
395+
it('should stream status code 200 response for requests to an finalhandler style route', async () => {
396+
mockApp.get('/simpleroute', (req, res) => {
397+
// A hanlder that uses the response object in the style of https://github.com/pillarjs/finalhandler/blob/v2.1.0/index.js#L245-L280 which is used by express to handle the final response in case of an error
398+
res.statusCode = 200;
399+
res.statusMessage = 'OK';
400+
const body = 'hello world';
401+
res.setHeader('Content-Type', 'text/plain; charset=utf-8');
402+
res.setHeader('Content-Length', Buffer.byteLength(body, 'utf8'));
403+
res.end(body);
404+
});
405+
406+
const handler = createStreamingLambdaAdapter(mockApp, mockResponseStream);
407+
const event = createMockEvent({path: '/simpleroute'});
334408
const context = createMockContext();
335409

336410
await handler(event, context);
337411

338-
// Should not throw
339-
expect(streamWithoutWrite.end.called).to.be.true;
412+
expect(mockResponseStream.metadata.statusCode).to.equal(200);
413+
expect(mockResponseStream.getWrittenData('utf-8')).to.equal('hello world');
340414
});
341415

342416
it('should handle stream without end method in finally', async () => {

0 commit comments

Comments
 (0)