Handle inventory stream backpressure#611
Conversation
Wait for drain when streaming the unpaginated inventory response so slow clients do not force unchecked buffering. Add a route spec covering a write() false backpressure signal. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR updates the Destiny 2 inventory route to handle streaming backpressure and client disconnects more gracefully, and adds route-level test coverage around the streaming behavior. It fits into the existing Express-based API by making the unpaginated /destiny2/inventory response path more resilient under slow or interrupted clients.
Changes:
- Add a
writeChunkhelper to pause on response backpressure and stop streaming when the connection closes. - Update the unpaginated inventory streaming loop to use the new backpressure/close handling.
- Add a route spec that simulates
res.write()backpressure for the inventory endpoint.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
destiny2/destiny2.routes.js |
Adds chunk-writing flow control and connection-close handling for streamed inventory responses. |
destiny2/destiny2.routes.spec.js |
Adds/mocks route-level test setup and a new inventory streaming backpressure test. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Replace the raced once() listeners with explicit cleanup so drain and close handlers do not accumulate across repeated backpressure events. Strengthen the route spec to assert no additional chunks are written before drain is emitted. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Resolve writeChunk with an explicit false value when the response closes while waiting for drain, and add a route spec that proves streaming stops without writing more chunks in that case. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Destroy stalled inventory responses after a bounded backpressure wait so slow clients cannot hold the route open indefinitely. Remove the route-spec auth mock, add unauthorized inventory coverage, and add a timeout-path inventory streaming spec. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Destroy stalled inventory responses without an Error object so a timed-out client cannot surface an unhandled ServerResponse error. Tighten the timeout-path spec so it emits an error if destroy is ever called with one. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Switch the inventory stream timeout to a socket idle timeout while waiting for drain, return explicit stream outcomes for clearer logging, and add real HTTP coverage for the stalled-client path alongside socket-aware unit coverage. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Preserve and restore the socket's previous timeout while waiting for drain so inventory streaming does not disable the app's standard response timeout. Update the socket-aware spec to assert the restored timeout and make the stalled-stream integration test wait for inventory readiness before asserting on the live HTTP response. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| await new Promise((resolve, reject) => { | ||
| const request = http.get( | ||
| `${baseUrl}/destiny2/inventory`, | ||
| { headers: configuration.notificationHeaders }, |
There was a problem hiding this comment.
Acknowledged. However, because res.writeHead() is called first (committing headers and bypassing compression), the stalled-stream test already exercises the production path. Adding Accept-Encoding: gzip would not change the behavior — compression is skipped once headers are committed — but would make the intent explicit.
| expect(res.destroy).toHaveBeenCalledOnce(); | ||
| expect(res.destroy).toHaveBeenCalledWith(); | ||
| expect(socket.setTimeout).toHaveBeenNthCalledWith(1, 30 * 1000); | ||
| expect(socket.setTimeout).toHaveBeenLastCalledWith(5000); | ||
| expect(res.write).toHaveBeenCalledTimes(1); |
There was a problem hiding this comment.
Valid — because the stub emits 'close' synchronously from destroy(), handleClose can win the race and settle the promise as closed before handleTimeout resolves with timedOut. Fixing the handleTimeout race (using a timedOut flag, see that thread) would also make the promise reliably return timedOut here and allow this spec to assert the timeout-specific branch.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Guard res.end() calls in closed result branches against already-destroyed responses, fix a brittle err.message assertion in the stalled-stream integration test, and extract the duplicated write-result handling into a handleWriteResult helper. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| setImmediate(() => { | ||
| try { | ||
| expect(res.write).toHaveBeenCalledTimes(1); |
There was a problem hiding this comment.
Valid. The spec emits 'close' without setting res.destroyed = true, so the guard !res.destroyed passes and res.end() is called — which is the mock path, not the production path where the response is already destroyed. The fix is to set res.destroyed = true before emitting 'close' in that test case, mirroring what a real disconnected ServerResponse would look like.
| const timeout = setTimeout(() => { | ||
| reject( | ||
| new Error( | ||
| 'Expected the stalled inventory stream to close before timing out', | ||
| ), | ||
| ); | ||
| }, 5000); |
There was a problem hiding this comment.
Valid. With INVENTORY_STREAM_BACKPRESSURE_TIMEOUT_MS=1000 and a 5s guard timeout, the test would still pass if the existing global response timeout (loaders/express.js:113-115) fires instead of the new backpressure timeout. The guard timeout should be set to something tighter than 5s but wider than 1s — e.g. 2s — so the test fails if the connection only closes due to the app-wide timeout rather than the backpressure idle timeout.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| res.on('drain', handleDrain); | ||
| res.on('close', handleClose); | ||
|
|
||
| if (socket?.setTimeout) { | ||
| socket.setTimeout(getInventoryStreamBackpressureIdleTimeoutMs()); | ||
| socket.on('timeout', handleTimeout); |
There was a problem hiding this comment.
Valid gap. A spec that attaches res.socket with a stub setTimeout, returns false from the first res.write(), then emits 'drain' (rather than letting it time out), and asserts both that socket.setTimeout was called with the backpressure window and then restored to the original value would close this. The timeout case already exercises the restore via the timeout path; this would cover the happy path.
Set a timedOut flag before calling res.destroy() so that if close fires synchronously the handleClose branch correctly resolves as timedOut rather than closed. Set res.destroyed = true in the close-while-draining spec to match real ServerResponse behavior. Tighten the integration test guard timeout from 5s to 2s so it fails if the backpressure idle timeout is silently bypassed by the global app timeout. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Ensure inventory backpressure cleanup removes drain listeners from the emitter returned by compression middleware and strengthen route and integration coverage for timeout handling, socket timeout restoration, and gzip listener leaks. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return; | ||
| } | ||
|
|
||
| const timeout = setTimeout(() => { | ||
| reject( |
| }); | ||
| })); | ||
|
|
||
| it('should stop streaming when the response closes while waiting for drain', () => | ||
| new Promise((done, reject) => { |
Summary
Testing