Skip to content
114 changes: 112 additions & 2 deletions destiny2/destiny2.routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,99 @@ import toTemporalInstant from '../helpers/to-temporal-instant.js';

import configuration from '../helpers/config.js';

const inventoryStreamBackpressureIdleTimeoutMs = 30 * 1000;
Comment thread
chrispaskvan marked this conversation as resolved.
const inventoryStreamWriteResult = Object.freeze({
closed: 'closed',
ok: 'ok',
timedOut: 'timed_out',
});

function getInventoryStreamBackpressureIdleTimeoutMs() {
const value = Number.parseInt(process.env.INVENTORY_STREAM_BACKPRESSURE_TIMEOUT_MS, 10);

return Number.isFinite(value) && value > 0 ? value : inventoryStreamBackpressureIdleTimeoutMs;
}

async function writeChunk(res, chunk) {
if (res.writableEnded || res.destroyed) {
return inventoryStreamWriteResult.closed;
}

if (res.write(chunk)) {
return inventoryStreamWriteResult.ok;
}

const drained = await new Promise(resolve => {
Comment thread
chrispaskvan marked this conversation as resolved.
const socket = res.socket;
const previousSocketTimeout = socket?.timeout;
let drainEmitter;
let timeout;
const handleDrain = () => {
resolveWith(
res.writableEnded || res.destroyed
? inventoryStreamWriteResult.closed
: inventoryStreamWriteResult.ok,
);
};
let timedOut = false;
const handleClose = () => {
resolveWith(
timedOut ? inventoryStreamWriteResult.timedOut : inventoryStreamWriteResult.closed,
);
};
const handleTimeout = () => {
timedOut = true;

if (typeof res.destroy === 'function' && !res.destroyed) {
res.destroy();
}

resolveWith(inventoryStreamWriteResult.timedOut);
Comment thread
chrispaskvan marked this conversation as resolved.
Comment thread
chrispaskvan marked this conversation as resolved.
Comment thread
chrispaskvan marked this conversation as resolved.
Comment thread
chrispaskvan marked this conversation as resolved.
};
const cleanup = () => {
drainEmitter.off('drain', handleDrain);
res.off('close', handleClose);

if (socket?.setTimeout) {
socket.off('timeout', handleTimeout);
socket.setTimeout(previousSocketTimeout ?? 0);
} else if (timeout) {
clearTimeout(timeout);
}
};
const resolveWith = result => {
cleanup();
resolve(result);
};

drainEmitter = res.on('drain', handleDrain);
res.on('close', handleClose);
Comment thread
chrispaskvan marked this conversation as resolved.

if (socket?.setTimeout) {
socket.setTimeout(getInventoryStreamBackpressureIdleTimeoutMs());
socket.on('timeout', handleTimeout);
Comment thread
chrispaskvan marked this conversation as resolved.
} else {
timeout = setTimeout(handleTimeout, getInventoryStreamBackpressureIdleTimeoutMs());
timeout.unref?.();
}
});
Comment thread
chrispaskvan marked this conversation as resolved.

return drained;
}

function handleWriteResult(req, res, writeResult, context) {
if (writeResult === inventoryStreamWriteResult.ok) return false;

if (writeResult === inventoryStreamWriteResult.timedOut) {
log.warn(`${req.method} ${req.url} inventory stream timed out ${context}`);
} else {
log.info(`${req.method} ${req.url} request closed ${context}`);
if (!res.writableEnded && !res.destroyed) res.end();
}

return true;
}

/**
* @openapi
* components:
Expand Down Expand Up @@ -124,13 +217,30 @@ const routes = ({ authenticationController, destiny2Controller }) => {
return res.end();
}

res.write(first ? `[${JSON.stringify(item)}` : `,${JSON.stringify(item)}`);
const writeResult = await writeChunk(
res,
first ? `[${JSON.stringify(item)}` : `,${JSON.stringify(item)}`,
);

if (
handleWriteResult(
req,
res,
writeResult,
`while streaming item ${index} of ${items.length}`,
)
)
return;
first = false;

// Introduce a delay to allow the event loop to process the 'close' event
await new Promise(resolve => setImmediate(resolve));
}
res.write(']');

const writeResult = await writeChunk(res, ']');

if (handleWriteResult(req, res, writeResult, 'before inventory completed')) return;

res.end();
} else {
if (Number.isNaN(page)) page = 1;
Expand Down
Loading