From a971ffad63ba4a267135119baf54b560ac3738ae Mon Sep 17 00:00:00 2001 From: Jonathan MASSUCHETTI Date: Tue, 25 Jul 2023 14:15:13 +0200 Subject: [PATCH 1/4] feat: abort all request when a node interceptor is disposed --- .../ClientRequest/NodeClientRequest.test.ts | 47 ++++- .../ClientRequest/NodeClientRequest.ts | 1 + src/interceptors/ClientRequest/http.get.ts | 10 + .../ClientRequest/http.request.ts | 9 + src/interceptors/ClientRequest/index.test.ts | 181 ++++++++++++++---- src/interceptors/ClientRequest/index.ts | 8 +- src/interceptors/fetch/index.test.ts | 158 +++++++++++++++ src/interceptors/fetch/index.ts | 17 +- src/utils/AbortControllerManager.test.ts | 128 +++++++++++++ src/utils/AbortControllerManager.ts | 158 +++++++++++++++ tsconfig.json | 7 +- 11 files changed, 682 insertions(+), 42 deletions(-) create mode 100644 src/interceptors/fetch/index.test.ts create mode 100644 src/utils/AbortControllerManager.test.ts create mode 100644 src/utils/AbortControllerManager.ts diff --git a/src/interceptors/ClientRequest/NodeClientRequest.test.ts b/src/interceptors/ClientRequest/NodeClientRequest.test.ts index 174f41b08..82b992b3f 100644 --- a/src/interceptors/ClientRequest/NodeClientRequest.test.ts +++ b/src/interceptors/ClientRequest/NodeClientRequest.test.ts @@ -27,6 +27,7 @@ const httpServer = new HttpServer((app) => { }) const logger = new Logger('test') +const registerSignal = () => {} beforeAll(async () => { await httpServer.listen() @@ -45,6 +46,7 @@ it('gracefully finishes the request when it has a mocked response', async () => { emitter, logger, + registerSignal } ) @@ -94,6 +96,7 @@ it('responds with a mocked response when requesting an existing hostname', async { emitter, logger, + registerSignal } ) @@ -124,6 +127,7 @@ it('performs the request as-is given resolver returned no mocked response', asyn { emitter, logger, + registerSignal } ) @@ -150,7 +154,7 @@ it('emits the ENOTFOUND error connecting to a non-existing hostname given no moc const emitter = new AsyncEventEmitter() const request = new NodeClientRequest( normalizeClientRequestArgs('http:', 'http://non-existing-url.com'), - { emitter, logger } + { emitter, logger, registerSignal } ) request.end() @@ -171,6 +175,7 @@ it('emits the ECONNREFUSED error connecting to an inactive server given no mocke { emitter, logger, + registerSignal } ) @@ -195,7 +200,7 @@ it('does not emit ENOTFOUND error connecting to an inactive server given mocked const handleError = vi.fn() const request = new NodeClientRequest( normalizeClientRequestArgs('http:', 'http://non-existing-url.com'), - { emitter, logger } + { emitter, logger, registerSignal } ) emitter.on('request', async ({ request }) => { @@ -228,6 +233,7 @@ it('does not emit ECONNREFUSED error connecting to an inactive server given mock { emitter, logger, + registerSignal } ) @@ -264,6 +270,7 @@ it('sends the request body to the server given no mocked response', async () => { emitter, logger, + registerSignal } ) @@ -292,6 +299,42 @@ it('does not send request body to the original server given mocked response', as { emitter, logger, + registerSignal + } + ) + + emitter.on('request', async ({ request }) => { + await sleep(200) + request.respondWith(new Response('mock created!', { status: 301 })) + }) + + request.write('one') + request.write('two') + request.end() + + const responseReceived = new DeferredPromise() + request.on('response', (response) => { + responseReceived.resolve(response) + }) + const response = await responseReceived + + expect(response.statusCode).toBe(301) + + const text = await getIncomingMessageBody(response) + expect(text).toBe('mock created!') +}) + + +it('abort the request when the interceptor is disposed', async () => { + const emitter = new AsyncEventEmitter() + const request = new NodeClientRequest( + normalizeClientRequestArgs('http:', httpServer.http.url('/write'), { + method: 'POST', + }), + { + emitter, + logger, + registerSignal } ) diff --git a/src/interceptors/ClientRequest/NodeClientRequest.ts b/src/interceptors/ClientRequest/NodeClientRequest.ts index ec33b7aeb..3e66f429e 100644 --- a/src/interceptors/ClientRequest/NodeClientRequest.ts +++ b/src/interceptors/ClientRequest/NodeClientRequest.ts @@ -24,6 +24,7 @@ export type Protocol = 'http' | 'https' export interface NodeClientOptions { emitter: ClientRequestEmitter logger: Logger + registerSignal: (signal: AbortSignal) => void } export class NodeClientRequest extends ClientRequest { diff --git a/src/interceptors/ClientRequest/http.get.ts b/src/interceptors/ClientRequest/http.get.ts index 61635371e..d2d85f1db 100644 --- a/src/interceptors/ClientRequest/http.get.ts +++ b/src/interceptors/ClientRequest/http.get.ts @@ -15,6 +15,16 @@ export function get(protocol: Protocol, options: NodeClientOptions) { `${protocol}:`, ...args ) + + const [, requestOptions] = clientRequestArgs; + + if (!requestOptions.signal) { + const abortController = new AbortController(); + requestOptions.signal = abortController.signal; + } + + options.registerSignal(requestOptions.signal); + const request = new NodeClientRequest(clientRequestArgs, options) /** diff --git a/src/interceptors/ClientRequest/http.request.ts b/src/interceptors/ClientRequest/http.request.ts index 1e4c3b535..b1d4e7168 100644 --- a/src/interceptors/ClientRequest/http.request.ts +++ b/src/interceptors/ClientRequest/http.request.ts @@ -20,6 +20,15 @@ export function request(protocol: Protocol, options: NodeClientOptions) { `${protocol}:`, ...args ) + + const [, requestOptions] = clientRequestArgs; + + if (!requestOptions.signal) { + const abortController = new AbortController(); + requestOptions.signal = abortController.signal; + } + + options.registerSignal(requestOptions.signal); return new NodeClientRequest(clientRequestArgs, options) } } diff --git a/src/interceptors/ClientRequest/index.test.ts b/src/interceptors/ClientRequest/index.test.ts index 1df8c5599..5feff0e87 100644 --- a/src/interceptors/ClientRequest/index.test.ts +++ b/src/interceptors/ClientRequest/index.test.ts @@ -1,57 +1,164 @@ -import { it, expect, beforeAll, afterAll } from 'vitest' +import { describe, it, expect, beforeAll, afterAll, beforeEach, afterEach } from 'vitest' import http from 'http' import { HttpServer } from '@open-draft/test-server/http' import { DeferredPromise } from '@open-draft/deferred-promise' import { ClientRequestInterceptor } from '.' -const httpServer = new HttpServer((app) => { - app.get('/', (_req, res) => { - res.status(200).send('/') +describe('ClientRequestInterceptor', () => { + const httpServer = new HttpServer((app) => { + app.get('/', (_req, res) => { + res.status(200).send('/') + }) + app.get('/get', (_req, res) => { + res.status(200).send('/get') + }) }) - app.get('/get', (_req, res) => { - res.status(200).send('/get') + + const interceptor = new ClientRequestInterceptor() + + beforeAll(async () => { + await httpServer.listen() + }) + + afterAll(async () => { + await httpServer.close() + }) + + beforeEach(() => { + interceptor.apply() + }) + + afterEach(() => { + interceptor.dispose() }) -}) -const interceptor = new ClientRequestInterceptor() + it('forbids calling "respondWith" multiple times for the same request', async () => { + const requestUrl = httpServer.http.url('/') -beforeAll(async () => { - interceptor.apply() - await httpServer.listen() -}) + interceptor.on('request', function firstRequestListener({ request }) { + request.respondWith(new Response()) + }) -afterAll(async () => { - interceptor.dispose() - await httpServer.close() -}) + const secondRequestEmitted = new DeferredPromise() + interceptor.on('request', function secondRequestListener({ request }) { + expect(() => + request.respondWith(new Response(null, { status: 301 })) + ).toThrow( + `Failed to respond to "GET ${requestUrl}" request: the "request" event has already been responded to.` + ) -it('forbids calling "respondWith" multiple times for the same request', async () => { - const requestUrl = httpServer.http.url('/') + secondRequestEmitted.resolve() + }) - interceptor.on('request', function firstRequestListener({ request }) { - request.respondWith(new Response()) + const request = http.get(requestUrl) + await secondRequestEmitted + + const responseReceived = new DeferredPromise() + request.on('response', (response) => { + responseReceived.resolve(response) + }) + + const response = await responseReceived + expect(response.statusCode).toBe(200) + expect(response.statusMessage).toBe('') }) - const secondRequestEmitted = new DeferredPromise() - interceptor.on('request', function secondRequestListener({ request }) { - expect(() => - request.respondWith(new Response(null, { status: 301 })) - ).toThrow( - `Failed to respond to "GET ${requestUrl}" request: the "request" event has already been responded to.` - ) + it('add an AbortSignal to the request if missing', async () => { + const requestUrl = httpServer.http.url('/') - secondRequestEmitted.resolve() + const requestEmitted = new DeferredPromise() + interceptor.on('request', function requestListener({ request }) { + expect(request.signal).toBeInstanceOf(AbortSignal) + requestEmitted.resolve() + }) + + http.get(requestUrl) + await requestEmitted }) - const request = http.get(requestUrl) - await secondRequestEmitted + it('keeps the existing AbortSignal if the request had one', async () => { + const requestUrl = httpServer.http.url('/') + const controller = new AbortController() + + /** + * For some reason, controller.signal !== request.signal, some kind of un/wrapping must be happening. + * Because of that, we test that aborting from the user controller aborts the request + */ - const responseReceived = new DeferredPromise() - request.on('response', (response) => { - responseReceived.resolve(response) + const requestEmitted = new DeferredPromise() + interceptor.on('request', function requestListener({ request }) { + expect(request.signal).toBeInstanceOf(AbortSignal) + requestEmitted.resolve() + }) + + const request = http.get(requestUrl, { signal: controller.signal }) + await requestEmitted + + const requestAborted = new DeferredPromise() + request.on('error', (err) => { + expect(err.name).toEqual('AbortError') + requestAborted.resolve() + }) + + controller.abort() + await requestAborted }) - const response = await responseReceived - expect(response.statusCode).toBe(200) - expect(response.statusMessage).toBe('') -}) + it('abort ongoing requests when disposed', async () => { + const requestUrl = httpServer.http.url('/') + + const requestEmitted = new DeferredPromise() + interceptor.on('request', function requestListener() { + requestEmitted.resolve() + }) + + const controller = new AbortController() + const requestWithoutUserController = http.get(requestUrl) + const requestWithUserController = http.get(requestUrl, { signal: controller.signal }) + + const requests = [requestWithoutUserController, requestWithUserController] + + const requestsAborted = requests.map(request => { + const requestAborted = new DeferredPromise() + request.on('error', (err) => { + expect(err.name).toEqual('AbortError') + requestAborted.resolve() + }) + + return requestAborted + }) + + await requestEmitted + interceptor.dispose() + await Promise.all(requestsAborted) + }) + + it('abort upcoming requests when disposed', async () => { + const requestUrl = httpServer.http.url('/') + + interceptor.on('request', function requestListener() { + expect.fail('the request should never be sent, yet intercepted') + }) + + const controller = new AbortController() + const requestWithoutUserController = http.request(requestUrl) + const requestWithUserController = http.request(requestUrl, { signal: controller.signal }) + + const requests = [requestWithoutUserController, requestWithUserController] + + const requestsAborted = requests.map(request => { + const requestAborted = new DeferredPromise() + request.on('error', (err) => { + expect(err.name).toEqual('AbortError') + requestAborted.resolve() + }) + + return requestAborted + }) + + interceptor.dispose() + requests.forEach(request => request.end()) + + await Promise.all(requestsAborted) + }) +}) \ No newline at end of file diff --git a/src/interceptors/ClientRequest/index.ts b/src/interceptors/ClientRequest/index.ts index ac75d4ace..e9ad03832 100644 --- a/src/interceptors/ClientRequest/index.ts +++ b/src/interceptors/ClientRequest/index.ts @@ -2,6 +2,7 @@ import http from 'http' import https from 'https' import { HttpRequestEventMap } from '../../glossary' import { Interceptor } from '../../Interceptor' +import { AbortControllerManager } from '../../utils/AbortControllerManager' import { AsyncEventEmitter } from '../../utils/AsyncEventEmitter' import { get } from './http.get' import { request } from './http.request' @@ -21,7 +22,6 @@ export class ClientRequestInterceptor extends Interceptor { constructor() { super(ClientRequestInterceptor.interceptorSymbol) - this.modules = new Map() this.modules.set('http', http) this.modules.set('https', https) @@ -30,6 +30,11 @@ export class ClientRequestInterceptor extends Interceptor { protected setup(): void { const logger = this.logger.extend('setup') + const controllerManager = new AbortControllerManager() + this.subscriptions.push(() => controllerManager.dispose()) + + controllerManager.decorate() + for (const [protocol, requestModule] of this.modules) { const { request: pureRequest, get: pureGet } = requestModule @@ -43,6 +48,7 @@ export class ClientRequestInterceptor extends Interceptor { const options: NodeClientOptions = { emitter: this.emitter, logger: this.logger, + registerSignal: (signal) => controllerManager.registerSignal(signal), } // @ts-ignore diff --git a/src/interceptors/fetch/index.test.ts b/src/interceptors/fetch/index.test.ts new file mode 100644 index 000000000..e193660aa --- /dev/null +++ b/src/interceptors/fetch/index.test.ts @@ -0,0 +1,158 @@ +import { describe, it, expect, beforeAll, afterAll, beforeEach, afterEach } from 'vitest' +import { HttpServer } from '@open-draft/test-server/http' +import { DeferredPromise } from '@open-draft/deferred-promise' +import { FetchInterceptor } from './index' + +describe('FetchInterceptor', () => { + const noop = () => {} + const httpServer = new HttpServer((app) => { + app.get('/', (_req, res) => { + res.status(200).send('/') + }) + app.get('/get', (_req, res) => { + res.status(200).send('/get') + }) + }) + + const interceptor = new FetchInterceptor() + + beforeAll(async () => { + await httpServer.listen() + }) + + afterAll(async () => { + await httpServer.close() + }) + + beforeEach(() => { + interceptor.apply() + }) + + afterEach(() => { + interceptor.dispose() + }) + + it('add an AbortSignal to the request if missing', async () => { + const requestUrl = httpServer.http.url('/') + + const requestEmitted = new DeferredPromise() + interceptor.on('request', function requestListener({ request }) { + expect(request.signal).toBeInstanceOf(AbortSignal) + requestEmitted.resolve() + }) + + fetch(requestUrl).catch(noop) + await requestEmitted + }) + + it('keeps the existing AbortSignal if the request had one', async () => { + const requestUrl = httpServer.http.url('/') + const controller = new AbortController() + + /** + * For some reason, controller.signal !== request.signal, some kind of un/wrapping must be happening. + * Because of that, we test that aborting from the user controller aborts the request + */ + + const requestEmitted = new DeferredPromise() + interceptor.on('request', function requestListener({ request }) { + expect(request.signal).toBeInstanceOf(AbortSignal) + requestEmitted.resolve() + }) + + const requestAborted = new DeferredPromise() + fetch(requestUrl, { signal: controller.signal }).catch((err) => { + expect(err.name).toEqual('AbortError') + requestAborted.resolve() + }) + + await requestEmitted + controller.abort() + await requestAborted + }) + + it('abort ongoing requests when disposed', async () => { + const requestUrl = httpServer.http.url('/') + + const requestEmitted = new DeferredPromise() + interceptor.on('request', function requestListener() { + requestEmitted.resolve() + }) + + const controller = new AbortController() + const requestWithoutUserController = fetch(requestUrl) + const requestWithUserController = fetch(requestUrl, { signal: controller.signal }) + + const requests = [requestWithoutUserController, requestWithUserController] + + const requestsAborted = requests.map((request) => { + const requestAborted = new DeferredPromise() + + request.catch((err) => { + expect(err.name).toEqual('AbortError') + requestAborted.resolve() + }) + + return requestAborted + }) + + await requestEmitted + interceptor.dispose() + await Promise.all(requestsAborted) + }) + + it('abort upcoming requests when disposed', async () => { + const requestUrl = httpServer.http.url('/') + + const stream = { + open: true + } + + function createBodyStream() { + return new ReadableStream({ + pull: function(controller) { + if (!stream.open) { + controller.close() + return + } + console.log('pull called!') + controller.enqueue('Some data...') + } + }) + } + + const abortController = new AbortController() + const requestWithoutUserController = fetch(requestUrl, { + method: 'POST', + // @ts-ignore + duplex: 'half', + body: createBodyStream(), + }) + const requestWithUserController = fetch(requestUrl, { + method: 'POST', + // @ts-ignore + duplex: 'half', + body: createBodyStream(), + signal: abortController.signal + }) + + const requests = [requestWithoutUserController, requestWithUserController] + + const requestsAborted = requests.map((request) => { + const requestAborted = new DeferredPromise() + + request.catch((err) => { + expect(err.name).toEqual('AbortError') + requestAborted.resolve() + }) + + return requestAborted + }) + + interceptor.dispose() + stream.open = false + // requests.forEach(request => request.end()) + + await Promise.all(requestsAborted) + }) +}) \ No newline at end of file diff --git a/src/interceptors/fetch/index.ts b/src/interceptors/fetch/index.ts index d8166463d..78162283c 100644 --- a/src/interceptors/fetch/index.ts +++ b/src/interceptors/fetch/index.ts @@ -2,6 +2,7 @@ import { invariant } from 'outvariant' import { until } from '@open-draft/until' import { HttpRequestEventMap, IS_PATCHED_MODULE } from '../../glossary' import { Interceptor } from '../../Interceptor' +import { AbortControllerManager } from '../../utils/AbortControllerManager' import { uuidv4 } from '../../utils/uuid' import { toInteractiveRequest } from '../../utils/toInteractiveRequest' @@ -27,9 +28,23 @@ export class FetchInterceptor extends Interceptor { 'Failed to patch the "fetch" module: already patched.' ) + const controllerManager = new AbortControllerManager() + this.subscriptions.push(() => controllerManager.dispose()) + + controllerManager.decorate() + globalThis.fetch = async (input, init) => { + const augmentedInit = { ...init } + + if (!augmentedInit.signal) { + const abortController = new AbortController(); + augmentedInit.signal = abortController.signal; + } + + controllerManager.registerSignal(augmentedInit.signal); + const requestId = uuidv4() - const request = new Request(input, init) + const request = new Request(input, augmentedInit) this.logger.info('[%s] %s', request.method, request.url) diff --git a/src/utils/AbortControllerManager.test.ts b/src/utils/AbortControllerManager.test.ts new file mode 100644 index 000000000..502fa40b1 --- /dev/null +++ b/src/utils/AbortControllerManager.test.ts @@ -0,0 +1,128 @@ +import { afterEach, describe, expect, test } from 'vitest' +import { AbortControllerManager } from './AbortControllerManager' + +describe('AbortControllerManager', () => { + const manager = new AbortControllerManager() + + afterEach(() => { + if (manager) { + manager.dispose() + } + }) + + test('global AbortController is not decorated if patch is not applied yet', () => { + const pureAbortController = AbortController + + expect(AbortController).toBe(pureAbortController) + expect(manager.isDecorated()).toBeFalsy() + }) + + test('global AbortController is decorated if patch is applied', () => { + const pureAbortController = AbortController + manager.decorate() + + expect(AbortController).not.toBe(pureAbortController) + expect(manager.isDecorated()).toBeTruthy() + }) + + test('new AbortControllers are referenced if patch is applied', () => { + manager.decorate() + + const controller = new AbortController() + + expect(manager.isReferenced(controller)).toBeTruthy() + expect(manager.isRegistered(controller)).toBeFalsy() + }) + + test('registering a AbortSignal add the controller into the request map', () => { + manager.decorate() + + const controller = new AbortController() + manager.registerSignal(controller.signal) + + expect(manager.isRegistered(controller)).toBeTruthy() + }) + + test('global AbortController is restored if restoreAbortController is called', () => { + manager.decorate() + + const controller = new AbortController() + + manager.restore() + + const controller2 = new AbortController() + + expect(manager.isReferenced(controller)).toBeTruthy() + expect(manager.isReferenced(controller2)).toBeFalsy() + }) + + test('restoring AbortController do not clear the maps', () => { + manager.decorate() + + const controller = new AbortController() + + manager.registerSignal(controller.signal) + manager.restore() + + expect(manager.isReferenced(controller)).toBeTruthy() + expect(manager.isRegistered(controller)).toBeTruthy() + }) + + test('calling dispose restore the AbortController and clear the maps', () => { + manager.decorate() + + const controller = new AbortController() + manager.registerSignal(controller.signal) + + expect(manager.isReferenced(controller)).toBeTruthy() + expect(manager.isRegistered(controller)).toBeTruthy() + + manager.dispose() + + expect(manager.isReferenced(controller)).toBeFalsy() + expect(manager.isRegistered(controller)).toBeFalsy() + + const controller2 = new AbortController() + + expect(manager.isReferenced(controller2)).toBeFalsy() + }) + + test('creating a new instance of the manager returns the existing one', () => { + const manager2 = new AbortControllerManager() + expect(manager).toBe(manager2) + }) + + test('calling abortAll abort all registered controllers', () => { + manager.decorate() + + const controller = new AbortController() + const controller2 = new AbortController() + const controller3 = new AbortController() + + manager.registerSignal(controller.signal) + manager.registerSignal(controller2.signal) + + manager.abortAll() + + expect(controller.signal.aborted).toBeTruthy() + expect(controller2.signal.aborted).toBeTruthy() + expect(controller3.signal.aborted).toBeFalsy() + }) + + test('calling dispose abort all registered controllers', () => { + manager.decorate() + + const controller = new AbortController() + const controller2 = new AbortController() + const controller3 = new AbortController() + + manager.registerSignal(controller.signal) + manager.registerSignal(controller2.signal) + + manager.dispose() + + expect(controller.signal.aborted).toBeTruthy() + expect(controller2.signal.aborted).toBeTruthy() + expect(controller3.signal.aborted).toBeFalsy() + }) +}) \ No newline at end of file diff --git a/src/utils/AbortControllerManager.ts b/src/utils/AbortControllerManager.ts new file mode 100644 index 000000000..8c25b2e3f --- /dev/null +++ b/src/utils/AbortControllerManager.ts @@ -0,0 +1,158 @@ +import { Logger } from '@open-draft/logger' + +export const instanceSymbolAbortControllerManager: unique symbol = Symbol('AbortControllerManager'); +export const decoratorSymbol: unique symbol = Symbol('DecoratedAbortController'); + +export class AbortControllerManager { + private logger: Logger + private pureAbortController: typeof AbortController | undefined + private referencedAbortControllers= new WeakMap>(); + private registeredAbortControllers = new Map(); + + constructor() { + this.logger = new Logger('AbortControllerManager'); + + const runningInstance = this.getRunningInstance() + + if (runningInstance) { + this.logger.debug("returning the existing instance"); + return runningInstance + } + + this.pureAbortController = AbortController + + Object.defineProperty(globalThis, instanceSymbolAbortControllerManager, { + enumerable: true, + configurable: true, + value: this, + }) + } + + private getRunningInstance(): AbortControllerManager | undefined { + // @ts-ignore + return globalThis[instanceSymbolAbortControllerManager] + } + + private getReferencedAbortControllers() { + return this.referencedAbortControllers + } + + private getPureAbortController(): typeof AbortController { + return this.pureAbortController ?? globalThis.AbortController + } + + decorate(): boolean { + if (this.isDecorated()) { + this.logger.debug('already decorated') + return false + } + + const pureAbortController = this.getPureAbortController() + const logger = this.logger + const getGlobalAbortControllers = () => this.getReferencedAbortControllers() + + const decorator = class CustomAbortController { + constructor() { + const abortController = new pureAbortController() + getGlobalAbortControllers().set(abortController.signal, new WeakRef(abortController)) + logger.debug('AbortController registered') + return abortController + } + } + + Object.defineProperty(decorator, decoratorSymbol, { + enumerable: true, + configurable: true, + value: true, + }) + + Object.defineProperty(globalThis, 'AbortController', { + enumerable: true, + configurable: true, + value: decorator, + }) + + this.logger.info('native "AbortController" patched!') + return true + } + + restore(): boolean { + if (!this.isDecorated()) { + this.logger.info('nothing to restore.') + return false + } + + Object.defineProperty(globalThis, 'AbortController', { + enumerable: true, + configurable: true, + value: this.getPureAbortController(), + }) + + this.logger.info('native "AbortController" restored!') + return true + } + + registerSignal(signal: AbortSignal): boolean { + const controllerWeakRef = this.referencedAbortControllers.get(signal) + + /** + * If the controller is not found, it means one of two things : + * 1) it has been created before the interceptor setup + * 2) it has been garbage collected + * + * Case (1) means this controller is outside of the scope of MSW, + * therefore it is not the responsibility of the interceptor to handle its behavior during shutdown. + * + * Case (2) means the last ref to AbortController.signal has been dropped before the request was sent. + * This indicates that the test might be flaky and the user may benefit from a warning by + * the test runner about open handles. + * For this reason it is correct to not handle its behavior during shutdown. + */ + if (controllerWeakRef === undefined) { + this.logger.debug('AbortController not found in the global map') + return false + } + + const controller = controllerWeakRef.deref() + + if (controller === undefined) { + this.logger.debug('AbortController has been garbage collected before it could be registered') + return false + } + + this.registeredAbortControllers.set(signal, controller) + return true + } + + isDecorated() { + // @ts-ignore + return globalThis.AbortController[decoratorSymbol] === true + } + + isReferenced(controller: AbortController) { + return this.referencedAbortControllers.has(controller.signal); + } + + isRegistered(controller: AbortController) { + return this.registeredAbortControllers.has(controller.signal); + } + + abortAll(): number { + let i = 0 + this.registeredAbortControllers.forEach(c => { + if (c.signal.aborted) return + i++ + c.abort() + }) + + return i; + } + + dispose() { + this.logger.debug('dispose') + this.restore() + this.abortAll() + this.referencedAbortControllers = new WeakMap() + this.registeredAbortControllers.clear() + } +} diff --git a/tsconfig.json b/tsconfig.json index c2f7edd1a..d3dcae77b 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -10,7 +10,12 @@ "removeComments": false, "esModuleInterop": true, "downlevelIteration": true, - "lib": ["dom", "dom.iterable", "ES2018.AsyncGenerator"] + "lib": [ + "dom", + "dom.iterable", + "ES2018.AsyncGenerator", + "es2021.weakref" + ] }, "include": ["src/**/*.ts"], "exclude": ["node_modules", "**/*.test.*"] From 6b8cc38d007c05bc6879eb028acc1c67e2db79ec Mon Sep 17 00:00:00 2001 From: Jonathan MASSUCHETTI Date: Tue, 25 Jul 2023 15:17:17 +0200 Subject: [PATCH 2/4] feat: free controller once the request ends --- .../ClientRequest/NodeClientRequest.test.ts | 13 ++--------- .../ClientRequest/NodeClientRequest.ts | 22 ++++++++++++++++-- src/interceptors/ClientRequest/http.get.ts | 9 -------- .../ClientRequest/http.request.ts | 8 ------- src/interceptors/ClientRequest/index.test.ts | 23 +++++++++++++++++++ src/interceptors/ClientRequest/index.ts | 1 - src/interceptors/fetch/index.test.ts | 18 +++++++++++++++ src/interceptors/fetch/index.ts | 7 +++++- src/utils/AbortControllerManager.test.ts | 23 ++++++++++++++++--- src/utils/AbortControllerManager.ts | 7 +++++- 10 files changed, 95 insertions(+), 36 deletions(-) diff --git a/src/interceptors/ClientRequest/NodeClientRequest.test.ts b/src/interceptors/ClientRequest/NodeClientRequest.test.ts index 82b992b3f..3f289a948 100644 --- a/src/interceptors/ClientRequest/NodeClientRequest.test.ts +++ b/src/interceptors/ClientRequest/NodeClientRequest.test.ts @@ -27,7 +27,6 @@ const httpServer = new HttpServer((app) => { }) const logger = new Logger('test') -const registerSignal = () => {} beforeAll(async () => { await httpServer.listen() @@ -46,7 +45,6 @@ it('gracefully finishes the request when it has a mocked response', async () => { emitter, logger, - registerSignal } ) @@ -96,7 +94,6 @@ it('responds with a mocked response when requesting an existing hostname', async { emitter, logger, - registerSignal } ) @@ -127,7 +124,6 @@ it('performs the request as-is given resolver returned no mocked response', asyn { emitter, logger, - registerSignal } ) @@ -154,7 +150,7 @@ it('emits the ENOTFOUND error connecting to a non-existing hostname given no moc const emitter = new AsyncEventEmitter() const request = new NodeClientRequest( normalizeClientRequestArgs('http:', 'http://non-existing-url.com'), - { emitter, logger, registerSignal } + { emitter, logger } ) request.end() @@ -175,7 +171,6 @@ it('emits the ECONNREFUSED error connecting to an inactive server given no mocke { emitter, logger, - registerSignal } ) @@ -200,7 +195,7 @@ it('does not emit ENOTFOUND error connecting to an inactive server given mocked const handleError = vi.fn() const request = new NodeClientRequest( normalizeClientRequestArgs('http:', 'http://non-existing-url.com'), - { emitter, logger, registerSignal } + { emitter, logger } ) emitter.on('request', async ({ request }) => { @@ -233,7 +228,6 @@ it('does not emit ECONNREFUSED error connecting to an inactive server given mock { emitter, logger, - registerSignal } ) @@ -270,7 +264,6 @@ it('sends the request body to the server given no mocked response', async () => { emitter, logger, - registerSignal } ) @@ -299,7 +292,6 @@ it('does not send request body to the original server given mocked response', as { emitter, logger, - registerSignal } ) @@ -334,7 +326,6 @@ it('abort the request when the interceptor is disposed', async () => { { emitter, logger, - registerSignal } ) diff --git a/src/interceptors/ClientRequest/NodeClientRequest.ts b/src/interceptors/ClientRequest/NodeClientRequest.ts index 3e66f429e..5c85d8e21 100644 --- a/src/interceptors/ClientRequest/NodeClientRequest.ts +++ b/src/interceptors/ClientRequest/NodeClientRequest.ts @@ -1,7 +1,9 @@ import { ClientRequest, IncomingMessage } from 'http' import type { Logger } from '@open-draft/logger' import { until } from '@open-draft/until' +import { invariant } from 'outvariant' import type { ClientRequestEmitter } from '.' +import { AbortControllerManager } from '../../utils/AbortControllerManager' import { ClientRequestEndCallback, ClientRequestEndChunk, @@ -24,7 +26,6 @@ export type Protocol = 'http' | 'https' export interface NodeClientOptions { emitter: ClientRequestEmitter logger: Logger - registerSignal: (signal: AbortSignal) => void } export class NodeClientRequest extends ClientRequest { @@ -39,6 +40,7 @@ export class NodeClientRequest extends ClientRequest { 'EAI_AGAIN', ] + private forgetSignal: () => void private response: IncomingMessage private emitter: ClientRequestEmitter private logger: Logger @@ -56,7 +58,22 @@ export class NodeClientRequest extends ClientRequest { [url, requestOptions, callback]: NormalizedClientRequestArgs, options: NodeClientOptions ) { - super(requestOptions, callback) + const augmentedRequestOptions = { ...requestOptions } + + if (!augmentedRequestOptions.signal) { + const abortController = new AbortController() + augmentedRequestOptions.signal = abortController.signal + } + + super(augmentedRequestOptions, callback) + + const controllerManager = new AbortControllerManager() + + const { signal } = augmentedRequestOptions + invariant(signal, "Missing AbortSignal") + + controllerManager.registerSignal(signal); + this.forgetSignal = () => queueMicrotask(() => controllerManager.forgetSignal(signal)) this.logger = options.logger.extend( `request ${requestOptions.method} ${url.href}` @@ -236,6 +253,7 @@ export class NodeClientRequest extends ClientRequest { }) this.logger.info('request (mock) is completed') + this.forgetSignal() return this } diff --git a/src/interceptors/ClientRequest/http.get.ts b/src/interceptors/ClientRequest/http.get.ts index d2d85f1db..77b25f433 100644 --- a/src/interceptors/ClientRequest/http.get.ts +++ b/src/interceptors/ClientRequest/http.get.ts @@ -16,15 +16,6 @@ export function get(protocol: Protocol, options: NodeClientOptions) { ...args ) - const [, requestOptions] = clientRequestArgs; - - if (!requestOptions.signal) { - const abortController = new AbortController(); - requestOptions.signal = abortController.signal; - } - - options.registerSignal(requestOptions.signal); - const request = new NodeClientRequest(clientRequestArgs, options) /** diff --git a/src/interceptors/ClientRequest/http.request.ts b/src/interceptors/ClientRequest/http.request.ts index b1d4e7168..e1228f29c 100644 --- a/src/interceptors/ClientRequest/http.request.ts +++ b/src/interceptors/ClientRequest/http.request.ts @@ -21,14 +21,6 @@ export function request(protocol: Protocol, options: NodeClientOptions) { ...args ) - const [, requestOptions] = clientRequestArgs; - - if (!requestOptions.signal) { - const abortController = new AbortController(); - requestOptions.signal = abortController.signal; - } - - options.registerSignal(requestOptions.signal); return new NodeClientRequest(clientRequestArgs, options) } } diff --git a/src/interceptors/ClientRequest/index.test.ts b/src/interceptors/ClientRequest/index.test.ts index 5feff0e87..11aebe3b5 100644 --- a/src/interceptors/ClientRequest/index.test.ts +++ b/src/interceptors/ClientRequest/index.test.ts @@ -3,6 +3,7 @@ import http from 'http' import { HttpServer } from '@open-draft/test-server/http' import { DeferredPromise } from '@open-draft/deferred-promise' import { ClientRequestInterceptor } from '.' +import { AbortControllerManager } from '../../utils/AbortControllerManager' describe('ClientRequestInterceptor', () => { const httpServer = new HttpServer((app) => { @@ -161,4 +162,26 @@ describe('ClientRequestInterceptor', () => { await Promise.all(requestsAborted) }) + + it('signal is forgotten when the request ends', async () => { + const requestUrl = httpServer.http.url('/') + + interceptor.on('request', function requestListener({ request }) { + request.respondWith(new Response()) + }) + + const controller = new AbortController() + const request = http.get(requestUrl, { signal: controller.signal }) + + const responseReceived = new DeferredPromise() + request.on('response', (response) => { + responseReceived.resolve(response) + }) + + await responseReceived + + const manager = new AbortControllerManager() + expect(manager.isRegistered(controller)).toBeFalsy() + expect(manager.isReferenced(controller)).toBeFalsy() + }) }) \ No newline at end of file diff --git a/src/interceptors/ClientRequest/index.ts b/src/interceptors/ClientRequest/index.ts index e9ad03832..a577f7418 100644 --- a/src/interceptors/ClientRequest/index.ts +++ b/src/interceptors/ClientRequest/index.ts @@ -48,7 +48,6 @@ export class ClientRequestInterceptor extends Interceptor { const options: NodeClientOptions = { emitter: this.emitter, logger: this.logger, - registerSignal: (signal) => controllerManager.registerSignal(signal), } // @ts-ignore diff --git a/src/interceptors/fetch/index.test.ts b/src/interceptors/fetch/index.test.ts index e193660aa..7560eb22e 100644 --- a/src/interceptors/fetch/index.test.ts +++ b/src/interceptors/fetch/index.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect, beforeAll, afterAll, beforeEach, afterEach } from 'vitest' import { HttpServer } from '@open-draft/test-server/http' import { DeferredPromise } from '@open-draft/deferred-promise' +import { AbortControllerManager } from '../../utils/AbortControllerManager' import { FetchInterceptor } from './index' describe('FetchInterceptor', () => { @@ -155,4 +156,21 @@ describe('FetchInterceptor', () => { await Promise.all(requestsAborted) }) + + it('signal is forgotten when the request ends', async () => { + const requestUrl = httpServer.http.url('/') + + interceptor.on('request', function requestListener({ request }) { + request.respondWith(new Response()) + }) + + const abortController = new AbortController() + const response = fetch(requestUrl, { signal: abortController.signal }) + + await response + + const manager = new AbortControllerManager() + expect(manager.isRegistered(abortController)).toBeFalsy() + expect(manager.isReferenced(abortController)).toBeFalsy() + }) }) \ No newline at end of file diff --git a/src/interceptors/fetch/index.ts b/src/interceptors/fetch/index.ts index 78162283c..398cbaecd 100644 --- a/src/interceptors/fetch/index.ts +++ b/src/interceptors/fetch/index.ts @@ -41,7 +41,10 @@ export class FetchInterceptor extends Interceptor { augmentedInit.signal = abortController.signal; } - controllerManager.registerSignal(augmentedInit.signal); + const { signal } = augmentedInit + invariant(signal, "Missing AbortSignal") + + controllerManager.registerSignal(signal); const requestId = uuidv4() const request = new Request(input, augmentedInit) @@ -76,6 +79,8 @@ export class FetchInterceptor extends Interceptor { return mockedResponse }) + queueMicrotask(() => controllerManager.forgetSignal(signal)) + if (resolverResult.error) { const error = Object.assign(new TypeError('Failed to fetch'), { cause: resolverResult.error, diff --git a/src/utils/AbortControllerManager.test.ts b/src/utils/AbortControllerManager.test.ts index 502fa40b1..889a30693 100644 --- a/src/utils/AbortControllerManager.test.ts +++ b/src/utils/AbortControllerManager.test.ts @@ -68,7 +68,7 @@ describe('AbortControllerManager', () => { expect(manager.isRegistered(controller)).toBeTruthy() }) - test('calling dispose restore the AbortController and clear the maps', () => { + test('calling dispose() restore the AbortController and clear the maps', () => { manager.decorate() const controller = new AbortController() @@ -92,7 +92,7 @@ describe('AbortControllerManager', () => { expect(manager).toBe(manager2) }) - test('calling abortAll abort all registered controllers', () => { + test('calling abortAll() abort all registered controllers', () => { manager.decorate() const controller = new AbortController() @@ -109,7 +109,7 @@ describe('AbortControllerManager', () => { expect(controller3.signal.aborted).toBeFalsy() }) - test('calling dispose abort all registered controllers', () => { + test('calling dispose() abort all registered controllers', () => { manager.decorate() const controller = new AbortController() @@ -125,4 +125,21 @@ describe('AbortControllerManager', () => { expect(controller2.signal.aborted).toBeTruthy() expect(controller3.signal.aborted).toBeFalsy() }) + + + test('calling forget() removes the controller from the references map and registration map', () => { + manager.decorate() + + const controller = new AbortController() + + manager.registerSignal(controller.signal) + + expect(manager.isReferenced(controller)).toBeTruthy() + expect(manager.isRegistered(controller)).toBeTruthy() + + manager.forgetSignal(controller.signal) + + expect(manager.isReferenced(controller)).toBeFalsy() + expect(manager.isRegistered(controller)).toBeFalsy() + }) }) \ No newline at end of file diff --git a/src/utils/AbortControllerManager.ts b/src/utils/AbortControllerManager.ts index 8c25b2e3f..68fdad44c 100644 --- a/src/utils/AbortControllerManager.ts +++ b/src/utils/AbortControllerManager.ts @@ -103,7 +103,7 @@ export class AbortControllerManager { * Case (1) means this controller is outside of the scope of MSW, * therefore it is not the responsibility of the interceptor to handle its behavior during shutdown. * - * Case (2) means the last ref to AbortController.signal has been dropped before the request was sent. + * Case (2) means the last ref to AbortController.signal has been dropped before the request was intercepted. * This indicates that the test might be flaky and the user may benefit from a warning by * the test runner about open handles. * For this reason it is correct to not handle its behavior during shutdown. @@ -124,6 +124,11 @@ export class AbortControllerManager { return true } + forgetSignal(signal: AbortSignal) { + this.referencedAbortControllers.delete(signal); + this.registeredAbortControllers.delete(signal); + } + isDecorated() { // @ts-ignore return globalThis.AbortController[decoratorSymbol] === true From 8bb4fc47f66da0a2497960683d579e05518df33d Mon Sep 17 00:00:00 2001 From: Jonathan MASSUCHETTI Date: Tue, 25 Jul 2023 15:27:17 +0200 Subject: [PATCH 3/4] fix: remove usage of micro task --- src/interceptors/ClientRequest/NodeClientRequest.ts | 2 +- src/interceptors/fetch/index.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/interceptors/ClientRequest/NodeClientRequest.ts b/src/interceptors/ClientRequest/NodeClientRequest.ts index 5c85d8e21..8248474d1 100644 --- a/src/interceptors/ClientRequest/NodeClientRequest.ts +++ b/src/interceptors/ClientRequest/NodeClientRequest.ts @@ -73,7 +73,7 @@ export class NodeClientRequest extends ClientRequest { invariant(signal, "Missing AbortSignal") controllerManager.registerSignal(signal); - this.forgetSignal = () => queueMicrotask(() => controllerManager.forgetSignal(signal)) + this.forgetSignal = () => controllerManager.forgetSignal(signal) this.logger = options.logger.extend( `request ${requestOptions.method} ${url.href}` diff --git a/src/interceptors/fetch/index.ts b/src/interceptors/fetch/index.ts index 398cbaecd..e111cb710 100644 --- a/src/interceptors/fetch/index.ts +++ b/src/interceptors/fetch/index.ts @@ -79,7 +79,7 @@ export class FetchInterceptor extends Interceptor { return mockedResponse }) - queueMicrotask(() => controllerManager.forgetSignal(signal)) + controllerManager.forgetSignal(signal) if (resolverResult.error) { const error = Object.assign(new TypeError('Failed to fetch'), { From 19c1f9aa761390292e04889675dfd2e6ab202c16 Mon Sep 17 00:00:00 2001 From: Jonathan MASSUCHETTI Date: Tue, 25 Jul 2023 15:32:14 +0200 Subject: [PATCH 4/4] fix: forget signal once we have the resolver result --- src/interceptors/ClientRequest/NodeClientRequest.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/interceptors/ClientRequest/NodeClientRequest.ts b/src/interceptors/ClientRequest/NodeClientRequest.ts index 8248474d1..f94ee4bf7 100644 --- a/src/interceptors/ClientRequest/NodeClientRequest.ts +++ b/src/interceptors/ClientRequest/NodeClientRequest.ts @@ -200,6 +200,7 @@ export class NodeClientRequest extends ClientRequest { return mockedResponse }).then((resolverResult) => { this.logger.info('the listeners promise awaited!') + this.forgetSignal() /** * @fixme We are in the "end()" method that still executes in parallel @@ -253,8 +254,6 @@ export class NodeClientRequest extends ClientRequest { }) this.logger.info('request (mock) is completed') - this.forgetSignal() - return this }