diff --git a/packages/cli/src/__tests__/active-executions.test.ts b/packages/cli/src/__tests__/active-executions.test.ts index 98e7d1d099..cd30aeb015 100644 --- a/packages/cli/src/__tests__/active-executions.test.ts +++ b/packages/cli/src/__tests__/active-executions.test.ts @@ -85,12 +85,12 @@ describe('ActiveExecutions', () => { test('Should attach and resolve response promise to existing execution', async () => { const newExecution = mockExecutionData(); await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); - const deferredPromise = await mockDeferredPromise(); + const deferredPromise = mockDeferredPromise(); activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise); const fakeResponse = { data: { resultData: { runData: {} } } }; activeExecutions.resolveResponsePromise(FAKE_EXECUTION_ID, fakeResponse); - await expect(deferredPromise.promise()).resolves.toEqual(fakeResponse); + await expect(deferredPromise.promise).resolves.toEqual(fakeResponse); }); test('Should remove an existing execution', async () => { @@ -163,5 +163,5 @@ function mockFullRunData(): IRun { // eslint-disable-next-line @typescript-eslint/promise-function-async const mockCancelablePromise = () => new PCancelable((resolve) => resolve()); -// eslint-disable-next-line @typescript-eslint/promise-function-async + const mockDeferredPromise = () => createDeferredPromise(); diff --git a/packages/cli/src/active-executions.ts b/packages/cli/src/active-executions.ts index c22d1843f4..2e8b42edc5 100644 --- a/packages/cli/src/active-executions.ts +++ b/packages/cli/src/active-executions.ts @@ -184,9 +184,9 @@ export class ActiveExecutions { */ async getPostExecutePromise(executionId: string): Promise { // Create the promise which will be resolved when the execution finished - const waitPromise = await createDeferredPromise(); + const waitPromise = createDeferredPromise(); this.getExecution(executionId).postExecutePromises.push(waitPromise); - return await waitPromise.promise(); + return await waitPromise.promise; } /** diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index 03b39d4417..3b5ffa3466 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -437,9 +437,8 @@ export async function executeWebhook( let responsePromise: IDeferredPromise | undefined; if (responseMode === 'responseNode') { - responsePromise = await createDeferredPromise(); - responsePromise - .promise() + responsePromise = createDeferredPromise(); + responsePromise.promise .then(async (response: IN8nHttpFullResponse) => { if (didSendResponse) { return; @@ -550,7 +549,7 @@ export async function executeWebhook( // in `responseNode` mode `responseCallback` is called by `responsePromise` if (responseMode === 'responseNode' && responsePromise) { - await Promise.allSettled([responsePromise.promise()]); + await Promise.allSettled([responsePromise.promise]); return undefined; } diff --git a/packages/core/test/WorkflowExecute.test.ts b/packages/core/test/WorkflowExecute.test.ts index 0d2b6a735c..d14a4e3fd1 100644 --- a/packages/core/test/WorkflowExecute.test.ts +++ b/packages/core/test/WorkflowExecute.test.ts @@ -30,7 +30,7 @@ describe('WorkflowExecute', () => { }, }); - const waitPromise = await createDeferredPromise(); + const waitPromise = createDeferredPromise(); const nodeExecutionOrder: string[] = []; const additionalData = Helpers.WorkflowExecuteAdditionalData( waitPromise, @@ -41,7 +41,7 @@ describe('WorkflowExecute', () => { const executionData = await workflowExecute.run(workflowInstance); - const result = await waitPromise.promise(); + const result = await waitPromise.promise; // Check if the data from WorkflowExecute is identical to data received // by the webhooks @@ -93,7 +93,7 @@ describe('WorkflowExecute', () => { }, }); - const waitPromise = await createDeferredPromise(); + const waitPromise = createDeferredPromise(); const nodeExecutionOrder: string[] = []; const additionalData = Helpers.WorkflowExecuteAdditionalData( waitPromise, @@ -104,7 +104,7 @@ describe('WorkflowExecute', () => { const executionData = await workflowExecute.run(workflowInstance); - const result = await waitPromise.promise(); + const result = await waitPromise.promise; // Check if the data from WorkflowExecute is identical to data received // by the webhooks @@ -160,7 +160,7 @@ describe('WorkflowExecute', () => { settings: testData.input.workflowData.settings, }); - const waitPromise = await createDeferredPromise(); + const waitPromise = createDeferredPromise(); const nodeExecutionOrder: string[] = []; const additionalData = Helpers.WorkflowExecuteAdditionalData( waitPromise, @@ -171,7 +171,7 @@ describe('WorkflowExecute', () => { const executionData = await workflowExecute.run(workflowInstance); - const result = await waitPromise.promise(); + const result = await waitPromise.promise; // Check if the data from WorkflowExecute is identical to data received // by the webhooks diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index 33037c3dd8..249391fd9d 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -208,11 +208,11 @@ export class AmqpTrigger implements INodeType { let responsePromise: IDeferredPromise | undefined = undefined; if (!parallelProcessing) { - responsePromise = await this.helpers.createDeferredPromise(); + responsePromise = this.helpers.createDeferredPromise(); } if (responsePromise) { this.emit([this.helpers.returnJsonArray([data as any])], undefined, responsePromise); - await responsePromise.promise(); + await responsePromise.promise; } else { this.emit([this.helpers.returnJsonArray([data as any])]); } diff --git a/packages/nodes-base/nodes/EmailReadImap/v1/EmailReadImapV1.node.ts b/packages/nodes-base/nodes/EmailReadImap/v1/EmailReadImapV1.node.ts index 9cd9d849af..419191e17e 100644 --- a/packages/nodes-base/nodes/EmailReadImap/v1/EmailReadImapV1.node.ts +++ b/packages/nodes-base/nodes/EmailReadImap/v1/EmailReadImapV1.node.ts @@ -507,7 +507,7 @@ export class EmailReadImapV1 implements INodeType { return newEmails; }; - const returnedPromise = await this.helpers.createDeferredPromise(); + const returnedPromise = this.helpers.createDeferredPromise(); const establishConnection = async (): Promise => { let searchCriteria = ['UNSEEN'] as Array; @@ -560,7 +560,7 @@ export class EmailReadImapV1 implements INodeType { }); // Wait with resolving till the returnedPromise got resolved, else n8n will be unhappy // if it receives an error before the workflow got activated - await returnedPromise.promise().then(() => { + await returnedPromise.promise.then(() => { this.emitError(error as Error); }); } diff --git a/packages/nodes-base/nodes/EmailReadImap/v2/EmailReadImapV2.node.ts b/packages/nodes-base/nodes/EmailReadImap/v2/EmailReadImapV2.node.ts index afe4104d65..796a269e88 100644 --- a/packages/nodes-base/nodes/EmailReadImap/v2/EmailReadImapV2.node.ts +++ b/packages/nodes-base/nodes/EmailReadImap/v2/EmailReadImapV2.node.ts @@ -535,7 +535,7 @@ export class EmailReadImapV2 implements INodeType { return newEmails; }; - const returnedPromise = await this.helpers.createDeferredPromise(); + const returnedPromise = this.helpers.createDeferredPromise(); const establishConnection = async (): Promise => { let searchCriteria = ['UNSEEN'] as Array; @@ -590,7 +590,7 @@ export class EmailReadImapV2 implements INodeType { }); // Wait with resolving till the returnedPromise got resolved, else n8n will be unhappy // if it receives an error before the workflow got activated - await returnedPromise.promise().then(() => { + await returnedPromise.promise.then(() => { this.emitError(error as Error); }); } diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index a088c27b19..631566ffbd 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -11,7 +11,7 @@ import type { ITriggerResponse, IRun, } from 'n8n-workflow'; -import { createDeferredPromise, NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; export class KafkaTrigger implements INodeType { description: INodeTypeDescription = { @@ -281,13 +281,13 @@ export class KafkaTrigger implements INodeType { } let responsePromise = undefined; if (!parallelProcessing && (options.nodeVersion as number) > 1) { - responsePromise = await createDeferredPromise(); + responsePromise = this.helpers.createDeferredPromise(); this.emit([this.helpers.returnJsonArray([data])], undefined, responsePromise); } else { this.emit([this.helpers.returnJsonArray([data])]); } if (responsePromise) { - await responsePromise.promise(); + await responsePromise.promise; } }, }); diff --git a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts index 6a79e682aa..e9cea7412c 100644 --- a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts +++ b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts @@ -140,11 +140,11 @@ export class MqttTrigger implements INodeType { if (this.getMode() === 'trigger') { const donePromise = !options.parallelProcessing - ? await this.helpers.createDeferredPromise() + ? this.helpers.createDeferredPromise() : undefined; client.on('message', async (topic, payload) => { this.emit(parsePayload(topic, payload), undefined, donePromise); - await donePromise?.promise(); + await donePromise?.promise; }); await client.subscribeAsync(topicsQoS); } diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index a12bd54f2a..c57659b2c7 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -283,10 +283,9 @@ export class RabbitMQTrigger implements INodeType { let responsePromiseHook: IDeferredPromise | undefined = undefined; if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') { - responsePromise = await this.helpers.createDeferredPromise(); + responsePromise = this.helpers.createDeferredPromise(); } else if (acknowledgeMode === 'laterMessageNode') { - responsePromiseHook = - await this.helpers.createDeferredPromise(); + responsePromiseHook = this.helpers.createDeferredPromise(); } if (responsePromiseHook) { this.emit([[item]], responsePromiseHook, undefined); @@ -295,7 +294,7 @@ export class RabbitMQTrigger implements INodeType { } if (responsePromise && acknowledgeMode !== 'laterMessageNode') { // Acknowledge message after the execution finished - await responsePromise.promise().then(async (data: IRun) => { + await responsePromise.promise.then(async (data: IRun) => { if (data.data.resultData.error) { // The execution did fail if (acknowledgeMode === 'executionFinishesSuccessfully') { @@ -308,7 +307,7 @@ export class RabbitMQTrigger implements INodeType { messageTracker.answered(message); }); } else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') { - await responsePromiseHook.promise().then(() => { + await responsePromiseHook.promise.then(() => { channel.ack(message); messageTracker.answered(message); }); diff --git a/packages/nodes-base/test/nodes/ExecuteWorkflow.ts b/packages/nodes-base/test/nodes/ExecuteWorkflow.ts index 0f36360fcd..ffb79a78ed 100644 --- a/packages/nodes-base/test/nodes/ExecuteWorkflow.ts +++ b/packages/nodes-base/test/nodes/ExecuteWorkflow.ts @@ -22,7 +22,7 @@ export async function executeWorkflow(testData: WorkflowTestData, nodeTypes: INo nodeTypes, settings: testData.input.workflowData.settings, }); - const waitPromise = await createDeferredPromise(); + const waitPromise = createDeferredPromise(); const nodeExecutionOrder: string[] = []; const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder); @@ -50,6 +50,6 @@ export async function executeWorkflow(testData: WorkflowTestData, nodeTypes: INo const workflowExecute = new WorkflowExecute(additionalData, executionMode, runExecutionData); executionData = await workflowExecute.processRunExecutionData(workflowInstance); - const result = await waitPromise.promise(); + const result = await waitPromise.promise; return { executionData, result, nodeExecutionOrder }; } diff --git a/packages/workflow/src/DeferredPromise.ts b/packages/workflow/src/DeferredPromise.ts index 1feededead..a91a5f1189 100644 --- a/packages/workflow/src/DeferredPromise.ts +++ b/packages/workflow/src/DeferredPromise.ts @@ -1,14 +1,17 @@ -// From: https://gist.github.com/compulim/8b49b0a744a3eeb2205e2b9506201e50 +type ResolveFn = (result: T | PromiseLike) => void; +type RejectFn = (error: Error) => void; + export interface IDeferredPromise { - promise: () => Promise; - reject: (error: Error) => void; - resolve: (result: T) => void; + promise: Promise; + resolve: ResolveFn; + reject: RejectFn; } -export async function createDeferredPromise(): Promise> { - return await new Promise>((resolveCreate) => { - const promise = new Promise((resolve, reject) => { - resolveCreate({ promise: async () => await promise, resolve, reject }); - }); +export function createDeferredPromise(): IDeferredPromise { + const deferred: Partial> = {}; + deferred.promise = new Promise((resolve, reject) => { + deferred.resolve = resolve; + deferred.reject = reject; }); + return deferred as IDeferredPromise; } diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 082490dba4..1ca35089b7 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -728,7 +728,7 @@ export interface ICredentialTestFunctions { } interface BaseHelperFunctions { - createDeferredPromise: () => Promise>; + createDeferredPromise: () => IDeferredPromise; } interface JsonHelperFunctions { diff --git a/packages/workflow/test/DeferredPromise.test.ts b/packages/workflow/test/DeferredPromise.test.ts new file mode 100644 index 0000000000..5766dada56 --- /dev/null +++ b/packages/workflow/test/DeferredPromise.test.ts @@ -0,0 +1,22 @@ +import { createDeferredPromise } from '@/DeferredPromise'; + +describe('DeferredPromise', () => { + it('should resolve the promise with the correct value', async () => { + let done = false; + const deferred = createDeferredPromise(); + void deferred.promise.finally(() => { + done = true; + }); + expect(done).toBe(false); + deferred.resolve('test'); + await expect(deferred.promise).resolves.toBe('test'); + expect(done).toBe(true); + }); + + it('should reject the promise with the correct error', async () => { + const deferred = createDeferredPromise(); + const error = new Error('test error'); + deferred.reject(error); + await expect(deferred.promise).rejects.toThrow(error); + }); +});