diff --git a/src/messaging/messaging.ts b/src/messaging/messaging.ts index 05367128f4..139369a5bd 100644 --- a/src/messaging/messaging.ts +++ b/src/messaging/messaging.ts @@ -17,8 +17,8 @@ import { App } from '../app'; import { deepCopy } from '../utils/deep-copy'; -import { - ErrorInfo, MessagingClientErrorCode, FirebaseMessagingError, FirebaseMessagingSessionError +import { + ErrorInfo, MessagingClientErrorCode, FirebaseMessagingError } from '../utils/error'; import * as utils from '../utils'; import * as validator from '../utils/validator'; @@ -156,7 +156,7 @@ export class Messaging { } return this.getUrlPath() .then((urlPath) => { - const request: { message: Message; validate_only?: boolean } = { message: copy }; + const request: { message: Message; validate_only?: boolean; } = { message: copy }; if (dryRun) { request.validate_only = true; } @@ -212,57 +212,58 @@ export class Messaging { return this.getUrlPath() .then((urlPath) => { - if (http2SessionHandler) { - let sendResponsePromise: Promise[]>; - return new Promise((resolve: (result: PromiseSettledResult[]) => void, reject) => { - // Start session listeners - http2SessionHandler.invoke().catch((error) => { - const pendingBatchResponse = - sendResponsePromise ? sendResponsePromise.then(this.parseSendResponses) : undefined; - reject(new FirebaseMessagingSessionError(error, undefined, pendingBatchResponse)); - }); - - // Start making requests - const requests: Promise[] = copy.map(async (message) => { - validateMessage(message); - const request: { message: Message; validate_only?: boolean; } = { message }; - if (dryRun) { - request.validate_only = true; - } - return this.messagingRequestHandler.invokeHttp2RequestHandlerForSendResponse( - FCM_SEND_HOST, urlPath, request, http2SessionHandler); - }); - - // Resolve once all requests have completed - sendResponsePromise = Promise.allSettled(requests); - sendResponsePromise.then(resolve); - }); - } else { - const requests: Promise[] = copy.map(async (message) => { - validateMessage(message); - const request: { message: Message; validate_only?: boolean; } = { message }; - if (dryRun) { - request.validate_only = true; - } + const requests: Promise[] = copy.map(async (message) => { + validateMessage(message); + const request: { message: Message; validate_only?: boolean; } = { message }; + if (dryRun) { + request.validate_only = true; + } + if (http2SessionHandler) { + return this.messagingRequestHandler.invokeHttp2RequestHandlerForSendResponse( + FCM_SEND_HOST, urlPath, request, http2SessionHandler); + } else { return this.messagingRequestHandler.invokeHttpRequestHandlerForSendResponse( FCM_SEND_HOST, urlPath, request); - }); - return Promise.allSettled(requests); - } + } + }); + return Promise.allSettled(requests); + }) + .then((results) => { + const sessionErrors = http2SessionHandler ? http2SessionHandler.getErrors() : []; + return this.parseSendResponses(results, sessionErrors); }) - .then(this.parseSendResponses) .finally(() => { http2SessionHandler?.close(); }); } - private parseSendResponses(results: PromiseSettledResult[]): BatchResponse { + private parseSendResponses( + results: PromiseSettledResult[], + sessionErrors: Error[] = [] + ): BatchResponse { const responses: SendResponse[] = []; results.forEach(result => { if (result.status === 'fulfilled') { responses.push(result.value); } else { // rejected - responses.push({ success: false, error: result.reason }); + let error = result.reason; + if (sessionErrors.length > 0) { + // Combine the original stream error and all session errors + const allErrors = [result.reason, ...sessionErrors]; + // TODO: AggregateError is supported in Node 18+ but only included in the ES2021+ + // We use (global as any).AggregateError as a workaround to access it in ES2020. + const cause = new (global as any).AggregateError(allErrors, 'Stream failure and session failures occurred'); + + const streamMessage = result.reason.message || 'Unknown stream error'; + const sessionMessage = `. Session failures: ${sessionErrors.map(e => e.message).join(', ')}`; + + error = new FirebaseMessagingError({ + code: MessagingClientErrorCode.UNKNOWN_ERROR.code, + message: `${streamMessage}${sessionMessage}`, + cause: cause + }); + } + responses.push({ success: false, error }); } }); const successCount: number = responses.filter((resp) => resp.success).length; diff --git a/src/utils/api-request.ts b/src/utils/api-request.ts index ffb8528209..f1603690c4 100644 --- a/src/utils/api-request.ts +++ b/src/utils/api-request.ts @@ -43,7 +43,7 @@ export interface BaseRequestConfig { method: HttpMethod; /** Target URL of the request. Should be a well-formed URL including protocol, hostname, port and path. */ url: string; - headers?: {[key: string]: string}; + headers?: { [key: string]: string; }; data?: string | object | Buffer | null; /** Connect and read timeout (in milliseconds) for the outgoing request. */ timeout?: number; @@ -63,7 +63,7 @@ export interface Http2RequestConfig extends BaseRequestConfig { http2SessionHandler: Http2SessionHandler; } -type RequestConfig = HttpRequestConfig | Http2RequestConfig +type RequestConfig = HttpRequestConfig | Http2RequestConfig; /** * Represents an HTTP or HTTP/2 response received from a remote server. @@ -96,15 +96,15 @@ interface LowLevelHttpResponse extends BaseLowLevelResponse { config: HttpRequestConfig; } -type IncomingHttp2Headers = http2.IncomingHttpHeaders & http2.IncomingHttpStatusHeader +type IncomingHttp2Headers = http2.IncomingHttpHeaders & http2.IncomingHttpStatusHeader; interface LowLevelHttp2Response extends BaseLowLevelResponse { - headers: IncomingHttp2Headers + headers: IncomingHttp2Headers; request: http2.ClientHttp2Stream | null; config: Http2RequestConfig; } -type LowLevelResponse = LowLevelHttpResponse | LowLevelHttp2Response +type LowLevelResponse = LowLevelHttpResponse | LowLevelHttp2Response; interface BaseLowLevelError extends Error { code?: string; @@ -122,7 +122,7 @@ interface LowLevelHttp2Error extends BaseLowLevelError { response?: LowLevelHttp2Response; } -type LowLevelError = LowLevelHttpError | LowLevelHttp2Error +type LowLevelError = LowLevelHttpError | LowLevelHttp2Error; class DefaultRequestResponse implements RequestResponse { @@ -294,7 +294,7 @@ export class RequestClient { constructor(retry: RetryConfig | null = defaultRetryConfig()) { if (retry) { - this.retry = retry + this.retry = retry; validateRetryConfig(this.retry); } } @@ -397,7 +397,7 @@ export class RequestClient { export class HttpClient extends RequestClient { constructor(retry?: RetryConfig | null) { - super(retry) + super(retry); } /** @@ -544,7 +544,7 @@ export function parseHttpResponse( const statusLine: string = headerLines[0]; const status: string = statusLine.trim().split(/\s/)[1]; - const headers: {[key: string]: string} = {}; + const headers: { [key: string]: string; } = {}; headerLines.slice(1).forEach((line) => { const colonPos = line.indexOf(':'); const name = line.substring(0, colonPos).trim().toLowerCase(); @@ -584,7 +584,7 @@ class AsyncRequestCall { protected entity: Buffer | undefined; protected promise: Promise; - constructor(private readonly configImpl: HttpRequestConfigImpl | Http2RequestConfigImpl) {} + constructor(private readonly configImpl: HttpRequestConfigImpl | Http2RequestConfigImpl) { } /** * Extracts multipart boundary from the HTTP header. The content-type header of a multipart @@ -600,13 +600,13 @@ class AsyncRequestCall { } const segments: string[] = contentType.split(';'); - const emptyObject: {[key: string]: string} = {}; + const emptyObject: { [key: string]: string; } = {}; const headerParams = segments.slice(1) .map((segment) => segment.trim().split('=')) .reduce((curr, params) => { // Parse key=value pairs in the content-type header into properties of an object. if (params.length === 2) { - const keyValuePair: {[key: string]: string} = {}; + const keyValuePair: { [key: string]: string; } = {}; keyValuePair[params[0]] = params[1]; return Object.assign(curr, keyValuePair); } @@ -740,7 +740,7 @@ class AsyncHttpCall extends AsyncRequestCall { private constructor(config: HttpRequestConfig) { const httpConfigImpl = new HttpRequestConfigImpl(config); - super(httpConfigImpl) + super(httpConfigImpl); try { this.httpConfigImpl = httpConfigImpl; this.options = this.httpConfigImpl.buildRequestOptions(); @@ -831,7 +831,7 @@ class AsyncHttpCall extends AsyncRequestCall { } class AsyncHttp2Call extends AsyncRequestCall { - private readonly http2ConfigImpl: Http2RequestConfigImpl + private readonly http2ConfigImpl: Http2RequestConfigImpl; /** * Sends an HTTP2 request based on the provided configuration. @@ -842,7 +842,7 @@ class AsyncHttp2Call extends AsyncRequestCall { private constructor(config: Http2RequestConfig) { const http2ConfigImpl = new Http2RequestConfigImpl(config); - super(http2ConfigImpl) + super(http2ConfigImpl); try { this.http2ConfigImpl = http2ConfigImpl; this.options = this.http2ConfigImpl.buildRequestOptions(); @@ -894,7 +894,7 @@ class AsyncHttp2Call extends AsyncRequestCall { req.end(this.entity); } - private handleHttp2Response(headers: IncomingHttp2Headers, stream: http2.ClientHttp2Stream): void{ + private handleHttp2Response(headers: IncomingHttp2Headers, stream: http2.ClientHttp2Stream): void { if (stream.aborted) { return; } @@ -945,7 +945,7 @@ class AsyncHttp2Call extends AsyncRequestCall { class BaseRequestConfigImpl implements BaseRequestConfig { constructor(protected readonly config: RequestConfig) { - this.config = config + this.config = config; } get method(): HttpMethod { @@ -956,7 +956,7 @@ class BaseRequestConfigImpl implements BaseRequestConfig { return this.config.url; } - get headers(): {[key: string]: string} | undefined { + get headers(): { [key: string]: string; } | undefined { return this.config.headers; } @@ -1000,7 +1000,7 @@ class BaseRequestConfigImpl implements BaseRequestConfig { } // Parse URL and append data to query string. const parsedUrl = new url.URL(fullUrl); - const dataObj = this.data as {[key: string]: string}; + const dataObj = this.data as { [key: string]: string; }; for (const key in dataObj) { if (Object.prototype.hasOwnProperty.call(dataObj, key)) { parsedUrl.searchParams.append(key, dataObj[key]); @@ -1033,7 +1033,7 @@ class BaseRequestConfigImpl implements BaseRequestConfig { class HttpRequestConfigImpl extends BaseRequestConfigImpl implements HttpRequestConfig { constructor(private readonly httpConfig: HttpRequestConfig) { - super(httpConfig) + super(httpConfig); } get httpAgent(): http.Agent | undefined { @@ -1067,7 +1067,7 @@ class HttpRequestConfigImpl extends BaseRequestConfigImpl implements HttpRequest class Http2RequestConfigImpl extends BaseRequestConfigImpl implements Http2RequestConfig { constructor(private readonly http2Config: Http2RequestConfig) { - super(http2Config) + super(http2Config); } get http2SessionHandler(): Http2SessionHandler { @@ -1115,7 +1115,7 @@ export class AuthorizedHttpClient extends HttpClient { } if (!requestCopy.headers['X-Goog-Api-Client']) { - requestCopy.headers['X-Goog-Api-Client'] = getMetricsHeader() + requestCopy.headers['X-Goog-Api-Client'] = getMetricsHeader(); } return super.send(requestCopy); @@ -1150,13 +1150,13 @@ export class AuthorizedHttp2Client extends Http2Client { requestCopy.headers['x-goog-user-project'] = quotaProjectId; } - if (!requestCopy.headers['X-Goog-Api-Client']) { - requestCopy.headers['X-Goog-Api-Client'] = getMetricsHeader() + if (!requestCopy.headers['X-Goog-Api-Client']) { + requestCopy.headers['X-Goog-Api-Client'] = getMetricsHeader(); } return super.send(requestCopy); }); - } + } protected getToken(): Promise { return this.app.INTERNAL.getToken() @@ -1257,9 +1257,9 @@ export class ExponentialBackoffPoller extends EventEmitter { private reject: (err: object) => void; constructor( - private readonly initialPollingDelayMillis: number = 1000, - private readonly maxPollingDelayMillis: number = 10000, - private readonly masterTimeoutMillis: number = 60000) { + private readonly initialPollingDelayMillis: number = 1000, + private readonly maxPollingDelayMillis: number = 10000, + private readonly masterTimeoutMillis: number = 60000) { super(); } @@ -1305,7 +1305,7 @@ export class ExponentialBackoffPoller extends EventEmitter { if (!result) { this.repollTimer = - setTimeout(() => this.emit('poll'), this.getPollingDelayMillis()); + setTimeout(() => this.emit('poll'), this.getPollingDelayMillis()); this.numTries++; return; } @@ -1341,72 +1341,63 @@ export class ExponentialBackoffPoller extends EventEmitter { export class Http2SessionHandler { - private http2Session: http2.ClientHttp2Session - protected promise: Promise - protected resolve: () => void; - protected reject: (_: any) => void; + private http2Session: http2.ClientHttp2Session; + private sessionErrors: Error[] = []; - constructor(url: string){ - this.promise = new Promise((resolve, reject) => { - this.resolve = resolve; - this.reject = reject; - this.http2Session = this.createSession(url) - }); + constructor(url: string) { + this.createSession(url); } public createSession(url: string): http2.ClientHttp2Session { - if (!this.http2Session || this.isClosed ) { + if (!this.http2Session || this.isCurrentSessionClosed) { + this.sessionErrors = []; const opts: http2.SecureClientSessionOptions = { // Set local max concurrent stream limit to respect backend limit peerMaxConcurrentStreams: 100, ALPNProtocols: ['h2'] } - const http2Session = http2.connect(url, opts) + this.http2Session = http2.connect(url, opts) - http2Session.on('goaway', (errorCode, _, opaqueData) => { - this.reject(new FirebaseAppError({ + this.http2Session.on('goaway', (errorCode, _, opaqueData) => { + const error = new FirebaseAppError({ code: AppErrorCodes.NETWORK_ERROR, message: `Error while making requests: GOAWAY - ${opaqueData?.toString()}, Error code: ${errorCode}` - })); - }) + }); + this.sessionErrors.push(error); + }); - http2Session.on('error', (error: any) => { + this.http2Session.on('error', (error: any) => { let errorMessage: any; - if (error.name == 'AggregateError' && error.errors) { - errorMessage = `Session error while making requests: ${error.code} - ${error.name}: ` + - `[${error.errors.map((e: any) => e.message).join(', ')}]` + if (error?.name === 'AggregateError' && Array.isArray(error.errors)) { + errorMessage = `Session error while making requests: ${error?.code} - ${error?.name}: ` + + `[${error.errors.map((e: any) => e.message).join(', ')}]`; } else { - errorMessage = `Session error while making requests: ${error.code} - ${error.message} ` + errorMessage = `Session error while making requests: ${error?.code} - ${error?.message} `; } - this.reject(new FirebaseAppError({ + const appError = new FirebaseAppError({ code: AppErrorCodes.NETWORK_ERROR, message: errorMessage, cause: error, - })); - }) - - http2Session.on('close', () => { - // Resolve current promise - this.resolve() + }); + this.sessionErrors.push(appError); }); - return http2Session } - return this.http2Session + return this.http2Session; } - public invoke(): Promise { - return this.promise + public getErrors(): Error[] { + return this.sessionErrors; } get session(): http2.ClientHttp2Session { - return this.http2Session + return this.http2Session; } - get isClosed(): boolean { - return this.http2Session.closed + get isCurrentSessionClosed(): boolean { + return !!this.http2Session?.closed; } public close(): void { - this.http2Session.close() + this.http2Session?.close(); } } \ No newline at end of file diff --git a/src/utils/error.ts b/src/utils/error.ts index f428e28780..a0f8641fbe 100644 --- a/src/utils/error.ts +++ b/src/utils/error.ts @@ -15,7 +15,6 @@ * limitations under the License. */ -import { BatchResponse } from '../messaging/messaging-api'; import { deepCopy } from '../utils/deep-copy'; import { RequestResponseError, RequestResponse } from './api-request'; @@ -426,30 +425,7 @@ export class FirebaseMessagingError extends PrefixedFirebaseError { } } -export class FirebaseMessagingSessionError extends FirebaseMessagingError { - public pendingBatchResponse?: Promise; - /** - * - * @param info - The error code info. - * @param message - The error message. This will override the default message if provided. - * @param pendingBatchResponse - BatchResponse for pending messages when session error occured. - */ - constructor(info: ErrorInfo, message?: string, pendingBatchResponse?: Promise) { - // Override default message if custom message provided. - super(info, message || info.message); - this.pendingBatchResponse = pendingBatchResponse; - - } - /** @returns The object representation of the error. */ - public toJSON(): object { - return { - code: this.code, - message: this.message, - pendingBatchResponse: this.pendingBatchResponse, - }; - } -} /** * Firebase project management error code structure. This extends PrefixedFirebaseError. diff --git a/test/unit/messaging/messaging.spec.ts b/test/unit/messaging/messaging.spec.ts index af4dad1d4d..35763e28aa 100644 --- a/test/unit/messaging/messaging.spec.ts +++ b/test/unit/messaging/messaging.spec.ts @@ -34,7 +34,6 @@ import { import { HttpClient } from '../../../src/utils/api-request'; import { getMetricsHeader, getSdkVersion } from '../../../src/utils/index'; import * as utils from '../utils'; -import { FirebaseMessagingSessionError } from '../../../src/utils/error'; chai.should(); chai.use(sinonChai); @@ -913,7 +912,7 @@ describe('Messaging', () => { }); }); - it('should throw error with BatchResponse promise on session error event using HTTP/2', () => { + it('should be fulfilled with a BatchResponse containing session errors when session fails using HTTP/2', () => { mockedHttp2Responses.push(mockHttp2SendRequestResponse('projects/projec_id/messages/1')) const sessionError = 'MOCK_SESSION_ERROR' mockedHttp2Responses.push(mockHttp2Error( @@ -924,18 +923,63 @@ describe('Messaging', () => { return messaging.sendEach( [validMessage, validMessage], true - ).catch(async (error: FirebaseMessagingSessionError) => { - expect(error.code).to.equal('messaging/app/network-error'); - expect(error.pendingBatchResponse).to.not.be.undefined; - await error.pendingBatchResponse?.then((response: BatchResponse) => { - expect(http2Mocker.requests.length).to.equal(2); - expect(response.failureCount).to.equal(1); - const responses = response.responses; - checkSendResponseSuccess(responses[0], 'projects/projec_id/messages/1'); - checkSendResponseFailure(responses[1], 'app/network-error'); - }) + ).then((response: BatchResponse) => { + expect(http2Mocker.requests.length).to.equal(2); + expect(response.successCount).to.equal(1); + expect(response.failureCount).to.equal(1); + const responses = response.responses; + checkSendResponseSuccess(responses[0], 'projects/projec_id/messages/1'); + checkSendResponseFailure( + responses[1], + 'messaging/unknown-error', + sessionError + ); + expect(responses[1].error!.message).to.contain(`MOCK_STREAM_ERROR caused by ${sessionError}`); + expect(responses[1].error!.cause!.constructor.name).to.equal('AggregateError'); + const cause = responses[1].error!.cause as any; + expect(cause.errors).to.be.an.instanceOf(Array); + expect(cause.errors.length).to.equal(2); + expect(cause.errors[0].message).to.contain('MOCK_STREAM_ERROR'); + expect(cause.errors[1].message).to.contain(sessionError); }); - }) + }); + + it('should be fulfilled with a BatchResponse containing AggregateError when multiple session errors occur' + + ' using HTTP/2', () => { + const sessionError1 = 'MOCK_SESSION_ERROR_1'; + const sessionError2 = 'MOCK_SESSION_ERROR_2'; + + mockedHttp2Responses.push(mockHttp2Error( + new Error('MOCK_STREAM_ERROR_1'), + new Error(sessionError1) + )); + mockedHttp2Responses.push(mockHttp2Error( + new Error('MOCK_STREAM_ERROR_2'), + new Error(sessionError2) + )); + + http2Mocker.http2Stub(mockedHttp2Responses); + + return messaging.sendEach( + [validMessage, validMessage], true + ).then((response: BatchResponse) => { + expect(http2Mocker.requests.length).to.equal(2); + expect(response.failureCount).to.equal(2); + + const failure = response.responses[0]; + expect(failure.success).to.be.false; + expect(failure.error!.code).to.equal('messaging/unknown-error'); + + const cause = failure.error!.cause; + expect(cause).to.not.be.undefined; + expect(cause!.constructor.name).to.equal('AggregateError'); + expect((cause as any).errors).to.be.an.instanceOf(Array); + expect((cause as any).errors.length).to.equal(3); + expect((cause as any).errors[0].message).to.contain('MOCK_STREAM_ERROR'); + expect((cause as any).errors[1].message).to.contain(sessionError1); + expect((cause as any).errors[2].message).to.contain(sessionError2); + }); + }); // This test was added to also verify https://github.com/firebase/firebase-admin-node/issues/1146 it('should be fulfilled when called with different message types using HTTP/2', () => { diff --git a/test/unit/utils/api-request.spec.ts b/test/unit/utils/api-request.spec.ts index 21d7e3ce58..887804545a 100644 --- a/test/unit/utils/api-request.spec.ts +++ b/test/unit/utils/api-request.spec.ts @@ -2520,7 +2520,6 @@ describe('Http2Client', () => { it('should fail on session and stream errors', async () => { const reqData = { request: 'data' }; const streamError = 'Error while making request: test stream error. Error code: AWFUL_STREAM_ERROR'; - const sessionError = 'Session error while making requests: AWFUL_SESSION_ERROR - test session error' mockedHttp2Responses.push(mockHttp2Error( { message: 'test stream error', code: 'AWFUL_STREAM_ERROR' }, { message: 'test session error', code: 'AWFUL_SESSION_ERROR' } @@ -2549,15 +2548,17 @@ describe('Http2Client', () => { expect(http2Mocker.requests[0].headers.authorization).to.equal('Bearer token'); expect(http2Mocker.requests[0].headers['content-type']).to.contain('application/json'); expect(http2Mocker.requests[0].headers['My-Custom-Header']).to.equal('CustomValue'); + + const sessionErrors = http2SessionHandler.getErrors(); + expect(sessionErrors.length).to.equal(1); + const expectedError1 = 'Session error while making requests: AWFUL_SESSION_ERROR - test session error '; + expect(sessionErrors[0].message).to.equal(expectedError1); }); - - await http2SessionHandler.invoke().should.eventually.be.rejectedWith(sessionError) - .and.have.property('code', 'app/network-error') }); it('should unwrap aggregate session errors', async () => { const reqData = { request: 'data' }; - const streamError = { message: 'test stream error', code: 'AWFUL_STREAM_ERROR' } + const streamError = { message: 'test stream error', code: 'AWFUL_STREAM_ERROR' }; const expectedStreamErrorMessage = 'Error while making request: test stream error. Error code: AWFUL_STREAM_ERROR'; const aggregateSessionError = { name: 'AggregateError', @@ -2566,15 +2567,12 @@ describe('Http2Client', () => { { message: 'Error message 1' }, { message: 'Error message 2' }, ] - } - const expectedAggregateErrorMessage = 'Session error while making requests: AWFUL_SESSION_ERROR - ' + - 'AggregateError: [Error message 1, Error message 2]' - + }; mockedHttp2Responses.push(mockHttp2Error(streamError, aggregateSessionError)); http2Mocker.http2Stub(mockedHttp2Responses); const client = new Http2Client(); - http2SessionHandler = new Http2SessionHandler(mockHostUrl) + http2SessionHandler = new Http2SessionHandler(mockHostUrl); await client.send({ method: 'POST', @@ -2595,10 +2593,13 @@ describe('Http2Client', () => { expect(http2Mocker.requests[0].headers.authorization).to.equal('Bearer token'); expect(http2Mocker.requests[0].headers['content-type']).to.contain('application/json'); expect(http2Mocker.requests[0].headers['My-Custom-Header']).to.equal('CustomValue'); - }); - await http2SessionHandler.invoke().should.eventually.be.rejectedWith(expectedAggregateErrorMessage) - .and.have.property('code', 'app/network-error') + const sessionErrors = http2SessionHandler.getErrors(); + expect(sessionErrors.length).to.equal(1); + const expectedError2 = 'Session error while making requests: AWFUL_SESSION_ERROR - AggregateError: ' + + '[Error message 1, Error message 2]'; + expect(sessionErrors[0].message).to.equal(expectedError2); + }); }); });