feat(core): Implement lifecycle hooks to support streaming responses (no-changelog) (#16391)

Author: Benjamin Schroth
Date: 2025-06-24 15:38:03 +02:00 (committed by GitHub)
Parent: c4a50df824
Commit: 1086914080
19 changed files with 752 additions and 11 deletions
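At a glance, this change threads a new 'sendChunk' lifecycle hook from node execution through to the HTTP response, so nodes can stream partial output while a webhook execution is still running. Below is a minimal sketch of how a node author might use the new execute-function; the node and its input handling are hypothetical, only sendChunk, ChunkType, and the streaming response mode come from this commit.

// Hypothetical streaming-capable node (illustration only, not part of this commit).
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';

export async function execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
  const items = this.getInputData();

  // Announce that streamed output for this node starts.
  await this.sendChunk('begin');

  for (const item of items) {
    // Each call runs the 'sendChunk' lifecycle hook; on the main instance the chunk is
    // written to the HTTP response as one newline-delimited JSON object.
    await this.sendChunk('item', item.json);
  }

  // Signal that no further chunks follow; the response itself is closed in finalizeExecution().
  await this.sendChunk('end');

  return [items];
}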

View File

@@ -1,10 +1,13 @@
import { Logger } from '@n8n/backend-common';
import type { ExecutionRepository } from '@n8n/db';
import type { Response } from 'express';
import { captor, mock } from 'jest-mock-extended';
import type {
IDeferredPromise,
IExecuteResponsePromiseData,
IRun,
IWorkflowExecutionDataProcess,
StructuredChunk,
} from 'n8n-workflow';
import { ExecutionCancelledError, randomInt, sleep } from 'n8n-workflow';
import PCancelable from 'p-cancelable';
@@ -179,6 +182,31 @@ describe('ActiveExecutions', () => {
await expect(postExecutePromise).resolves.toEqual(fullRunData);
});
test('Should close response if it exists', async () => {
executionData.httpResponse = mock<Response>();
const executionId = await activeExecutions.add(executionData);
activeExecutions.finalizeExecution(executionId, fullRunData);
expect(executionData.httpResponse.end).toHaveBeenCalled();
});
test('Should handle error when closing response', async () => {
const logger = mockInstance(Logger);
activeExecutions = new ActiveExecutions(logger, executionRepository, concurrencyControl);
executionData.httpResponse = mock<Response>();
jest.mocked(executionData.httpResponse.end).mockImplementation(() => {
throw new Error('Connection closed');
});
const executionId = await activeExecutions.add(executionData);
activeExecutions.finalizeExecution(executionId, fullRunData);
expect(logger.error).toHaveBeenCalledWith('Error closing streaming response', {
executionId,
error: 'Connection closed',
});
});
});
describe('getPostExecutePromise', () => {
@@ -187,6 +215,40 @@ describe('ActiveExecutions', () => {
});
});
describe('sendChunk', () => {
test('should send chunk to response', async () => {
executionData.httpResponse = mock<Response>();
const executionId = await activeExecutions.add(executionData);
const testChunk: StructuredChunk = {
content: 'test chunk',
type: 'item',
metadata: {
nodeName: 'testNode',
nodeId: uuid(),
timestamp: Date.now(),
},
};
activeExecutions.sendChunk(executionId, testChunk);
expect(executionData.httpResponse.write).toHaveBeenCalledWith(
JSON.stringify(testChunk) + '\n',
);
});
test('should skip sending chunk to response if response is not set', async () => {
const executionId = await activeExecutions.add(executionData);
const testChunk: StructuredChunk = {
content: 'test chunk',
type: 'item',
metadata: {
nodeName: 'testNode',
nodeId: uuid(),
timestamp: Date.now(),
},
};
expect(() => activeExecutions.sendChunk(executionId, testChunk)).not.toThrow();
});
});
describe('stopExecution', () => {
let executionId: string;

View File

@@ -1,6 +1,7 @@
import type { User } from '@n8n/db';
import type { ExecutionEntity } from '@n8n/db';
import { Container, Service } from '@n8n/di';
import type { Response } from 'express';
import { mock } from 'jest-mock-extended';
import { DirectedGraph, WorkflowExecute } from 'n8n-core';
import * as core from 'n8n-core';
@@ -22,6 +23,7 @@ import PCancelable from 'p-cancelable';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import * as ExecutionLifecycleHooks from '@/execution-lifecycle/execution-lifecycle-hooks';
import { CredentialsPermissionChecker } from '@/executions/pre-execution-checks';
import { ManualExecutionService } from '@/manual-execution.service';
import { Telemetry } from '@/telemetry';
@@ -300,3 +302,85 @@ describe('enqueueExecution', () => {
expect(setupQueue).toHaveBeenCalledTimes(1);
});
});
describe('streaming functionality', () => {
it('should setup sendChunk handler when streaming is enabled and execution mode is not manual', async () => {
// ARRANGE
const activeExecutions = Container.get(ActiveExecutions);
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();
const permissionChecker = Container.get(CredentialsPermissionChecker);
jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();
const mockResponse = mock<Response>();
const data = mock<IWorkflowExecutionDataProcess>({
workflowData: { nodes: [] },
executionData: undefined,
executionMode: 'webhook',
streamingEnabled: true,
httpResponse: mockResponse,
});
const mockHooks = mock<core.ExecutionLifecycleHooks>();
jest
.spyOn(ExecutionLifecycleHooks, 'getLifecycleHooksForRegularMain')
.mockReturnValue(mockHooks);
const mockAdditionalData = mock<IWorkflowExecuteAdditionalData>();
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(mockAdditionalData);
const manualExecutionService = Container.get(ManualExecutionService);
jest.spyOn(manualExecutionService, 'runManually').mockReturnValue(
new PCancelable(() => {
return mock<IRun>();
}),
);
// ACT
await runner.run(data);
// ASSERT
expect(mockHooks.addHandler).toHaveBeenCalledWith('sendChunk', expect.any(Function));
});
it('should not setup sendChunk handler when streaming is enabled but execution mode is manual', async () => {
// ARRANGE
const activeExecutions = Container.get(ActiveExecutions);
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();
const permissionChecker = Container.get(CredentialsPermissionChecker);
jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();
const mockResponse = mock<Response>();
const data = mock<IWorkflowExecutionDataProcess>({
workflowData: { nodes: [] },
executionData: undefined,
executionMode: 'manual',
streamingEnabled: true,
httpResponse: mockResponse,
});
const mockHooks = mock<core.ExecutionLifecycleHooks>();
jest
.spyOn(ExecutionLifecycleHooks, 'getLifecycleHooksForRegularMain')
.mockReturnValue(mockHooks);
const mockAdditionalData = mock<IWorkflowExecuteAdditionalData>();
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(mockAdditionalData);
const manualExecutionService = Container.get(ManualExecutionService);
jest.spyOn(manualExecutionService, 'runManually').mockReturnValue(
new PCancelable(() => {
return mock<IRun>();
}),
);
// ACT
await runner.run(data);
// ASSERT
expect(mockHooks.addHandler).not.toHaveBeenCalledWith('sendChunk', expect.any(Function));
});
});

View File

@@ -8,6 +8,7 @@ import type {
IRun,
ExecutionStatus,
IWorkflowExecutionDataProcess,
StructuredChunk,
} from 'n8n-workflow';
import { createDeferredPromise, ExecutionCancelledError, sleep } from 'n8n-workflow';
import { strict as assert } from 'node:assert';
@@ -97,6 +98,7 @@ export class ActiveExecutions {
postExecutePromise,
status: executionStatus,
responsePromise: resumingExecution?.responsePromise,
httpResponse: executionData.httpResponse ?? undefined,
};
this.activeExecutions[executionId] = execution;
@@ -142,6 +144,15 @@ export class ActiveExecutions {
execution?.responsePromise?.resolve(response);
}
/** Used for sending a chunk to a streaming response */
sendChunk(executionId: string, chunkText: StructuredChunk): void {
const execution = this.activeExecutions[executionId];
if (execution?.httpResponse) {
execution?.httpResponse.write(JSON.stringify(chunkText) + '\n');
execution?.httpResponse.flush();
}
}
/** Cancel the execution promise and reject its post-execution promise. */
stopExecution(executionId: string): void {
const execution = this.activeExecutions[executionId];
@@ -166,6 +177,20 @@ export class ActiveExecutions {
finalizeExecution(executionId: string, fullRunData?: IRun) {
if (!this.has(executionId)) return;
const execution = this.getExecutionOrFail(executionId);
// Close response if it exists (for streaming responses)
if (execution.executionData.httpResponse) {
try {
this.logger.debug('Closing response for execution', { executionId });
execution.executionData.httpResponse.end();
} catch (error) {
this.logger.error('Error closing streaming response', {
executionId,
error: (error as Error).message,
});
}
}
execution.postExecutePromise.resolve(fullRunData);
this.logger.debug('Execution finalized', { executionId });
}
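Because sendChunk writes each chunk as JSON.stringify(chunkText) + '\n' and finalizeExecution ends the same response, the webhook caller effectively receives newline-delimited JSON that stops when the execution finishes. A rough consumer-side sketch, assuming a runtime with the WHATWG fetch/streams API (Node 18+ or a browser) and a placeholder URL:

// Rough client sketch (the URL is a placeholder; chunk shape follows the StructuredChunk interface).
async function consumeStream(url: string): Promise<void> {
  const response = await fetch(url, { method: 'POST' });
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // Every complete line is one StructuredChunk serialized by ActiveExecutions.sendChunk().
    let newline = buffer.indexOf('\n');
    while (newline !== -1) {
      const line = buffer.slice(0, newline);
      buffer = buffer.slice(newline + 1);
      if (line.trim() !== '') {
        const chunk = JSON.parse(line) as { type: string; content?: string; metadata: { nodeName: string } };
        console.log(`[${chunk.metadata.nodeName}] ${chunk.type}:`, chunk.content);
      }
      newline = buffer.indexOf('\n');
    }
  }
}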

View File

@@ -294,6 +294,7 @@ describe('Execution Lifecycle Hooks', () => {
expect(handlers.workflowExecuteAfter).toHaveLength(5);
expect(handlers.nodeFetchedData).toHaveLength(1);
expect(handlers.sendResponse).toHaveLength(0);
expect(handlers.sendChunk).toHaveLength(0);
});
describe('nodeExecuteBefore', () => {
@@ -610,6 +611,7 @@ describe('Execution Lifecycle Hooks', () => {
expect(handlers.workflowExecuteAfter).toHaveLength(4);
expect(handlers.nodeFetchedData).toHaveLength(0);
expect(handlers.sendResponse).toHaveLength(0);
expect(handlers.sendChunk).toHaveLength(0);
});
describe('workflowExecuteBefore', () => {
@@ -697,6 +699,7 @@ describe('Execution Lifecycle Hooks', () => {
expect(handlers.workflowExecuteAfter).toHaveLength(4);
expect(handlers.nodeFetchedData).toHaveLength(1);
expect(handlers.sendResponse).toHaveLength(0);
expect(handlers.sendChunk).toHaveLength(0);
});
describe('saving static data', () => {
@@ -794,6 +797,7 @@ describe('Execution Lifecycle Hooks', () => {
expect(handlers.workflowExecuteAfter).toHaveLength(4);
expect(handlers.nodeFetchedData).toHaveLength(1);
expect(handlers.sendResponse).toHaveLength(0);
expect(handlers.sendChunk).toHaveLength(0);
});
});
});

View File

@@ -1,6 +1,6 @@
import type { ICredentialsBase, IExecutionBase, IExecutionDb, ITagBase } from '@n8n/db';
import type { AssignableGlobalRole } from '@n8n/permissions';
import type { Application } from 'express';
import type { Application, Response } from 'express';
import type {
ExecutionError,
ICredentialDataDecryptedObject,
@@ -114,6 +114,8 @@ export interface IExecutingWorkflowData {
startedAt: Date;
/** This promise rejects when the execution is stopped. When the execution finishes (successfully or not), the promise resolves. */
postExecutePromise: IDeferredPromise<IRun | undefined>;
/** HTTPResponse needed for streaming responses */
httpResponse?: Response;
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
workflowExecution?: PCancelable<IRun>;
status: ExecutionStatus;

View File

@@ -6,12 +6,13 @@ import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
import { ApplicationError, ExecutionCancelledError } from 'n8n-workflow';
import type { ActiveExecutions } from '@/active-executions';
import { mockInstance } from '@test/mocking';
import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants';
import type { JobProcessor } from '../job-processor';
import { ScalingService } from '../scaling.service';
-import type { Job, JobData, JobQueue } from '../scaling.types';
+import type { Job, JobData, JobId, JobQueue } from '../scaling.types';
const queue = mock<JobQueue>({
client: { ping: jest.fn() },
@@ -315,4 +316,42 @@ describe('ScalingService', () => {
expect(result).toBe(false);
});
});
describe('message handling', () => {
it('should handle send-chunk messages', async () => {
const activeExecutions = mock<ActiveExecutions>();
scalingService = new ScalingService(
mockLogger(),
mock(),
activeExecutions,
jobProcessor,
globalConfig,
mock(),
instanceSettings,
mock(),
);
await scalingService.setupQueue();
// Simulate receiving a send-chunk message
const messageHandler = queue.on.mock.calls.find(
([event]) => (event as string) === 'global:progress',
)?.[1] as (jobId: JobId, msg: unknown) => void;
expect(messageHandler).toBeDefined();
const sendChunkMessage = {
kind: 'send-chunk',
executionId: 'exec-123',
chunkText: { type: 'item', content: 'test' },
workerId: 'worker-456',
};
messageHandler('job-789', sendChunkMessage);
expect(activeExecutions.sendChunk).toHaveBeenCalledWith('exec-123', {
type: 'item',
content: 'test',
});
});
});
});

View File

@@ -8,6 +8,7 @@ import type {
IExecuteResponsePromiseData,
IRun,
IWorkflowExecutionDataProcess,
StructuredChunk,
} from 'n8n-workflow';
import { BINARY_ENCODING, Workflow, UnexpectedError } from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
@@ -25,6 +26,7 @@ import type {
JobResult,
RespondToWebhookMessage,
RunningJob,
SendChunkMessage,
} from './scaling.types';
/**
@@ -149,6 +151,17 @@ export class JobProcessor {
await job.progress(msg);
});
lifecycleHooks.addHandler('sendChunk', async (chunk: StructuredChunk): Promise<void> => {
const msg: SendChunkMessage = {
kind: 'send-chunk',
executionId,
chunkText: chunk,
workerId: this.instanceSettings.hostId,
};
await job.progress(msg);
});
additionalData.executionId = executionId;
additionalData.setExecutionStatus = (status: ExecutionStatus) => {

View File

@@ -317,6 +317,9 @@ export class ScalingService {
// than natively provided by Bull in `global:completed` and `global:failed` events
switch (msg.kind) {
case 'send-chunk':
this.activeExecutions.sendChunk(msg.executionId, msg.chunkText);
break;
case 'respond-to-webhook':
const decodedResponse = this.decodeWebhookResponse(msg.response);
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);

View File

@@ -1,6 +1,11 @@
import type { RunningJobSummary } from '@n8n/api-types';
import type Bull from 'bull';
import type { ExecutionError, IExecuteResponsePromiseData, IRun } from 'n8n-workflow';
import type {
ExecutionError,
IExecuteResponsePromiseData,
IRun,
StructuredChunk,
} from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
export type JobQueue = Bull.Queue<JobData>;
@@ -35,7 +40,8 @@ export type JobMessage =
| RespondToWebhookMessage
| JobFinishedMessage
| JobFailedMessage
-| AbortJobMessage;
+| AbortJobMessage
+| SendChunkMessage;
/** Message sent by worker to main to respond to a webhook. */
export type RespondToWebhookMessage = {
@@ -52,6 +58,13 @@ export type JobFinishedMessage = {
workerId: string;
};
export type SendChunkMessage = {
kind: 'send-chunk';
executionId: string;
chunkText: StructuredChunk;
workerId: string;
};
/** Message sent by worker to main to report a job has failed. */
export type JobFailedMessage = {
kind: 'job-failed';

View File

@@ -404,7 +404,7 @@ export async function executeWebhook(
'firstEntryJson',
) as WebhookResponseData | string | undefined;
-if (!['onReceived', 'lastNode', 'responseNode', 'formPage'].includes(responseMode)) {
+if (!['onReceived', 'lastNode', 'responseNode', 'formPage', 'streaming'].includes(responseMode)) {
// If the mode is not known we error. That is probably better than silently falling back to
// the default, so people learn as early as possible (probably already during testing)
// that something does not resolve properly.
@@ -563,9 +563,12 @@ export async function executeWebhook(
| undefined;
};
-if (responseHeaders !== undefined && responseHeaders.entries !== undefined) {
-for (const item of responseHeaders.entries) {
-res.setHeader(item.name, item.value);
+if (!res.headersSent) {
+// Only set given headers if they haven't been sent yet, e.g. for streaming
+if (responseHeaders !== undefined && responseHeaders.entries !== undefined) {
+for (const item of responseHeaders.entries) {
+res.setHeader(item.name, item.value);
}
}
}
}
@@ -662,6 +665,17 @@ export async function executeWebhook(
responsePromise,
);
if (responseMode === 'streaming') {
Container.get(Logger).debug(
`Execution of workflow "${workflow.name}" with ID ${executionId} is set to streaming`,
{ executionId },
);
// TODO: Add check for streaming nodes here
runData.httpResponse = res;
runData.streamingEnabled = true;
didSendResponse = true;
}
if (responseMode === 'formPage' && !didSendResponse) {
res.send({ formWaitingUrl: `${additionalData.formWaitingBaseUrl}/${executionId}` });
process.nextTick(() => res.end());

View File

@@ -234,6 +234,9 @@ async function startExecution(
// This one already contains changes to talk to parent process
// and get executionID from `activeExecutions` running on main process
additionalDataIntegrated.executeWorkflow = additionalData.executeWorkflow;
if (additionalData.httpResponse) {
additionalDataIntegrated.httpResponse = additionalData.httpResponse;
}
let subworkflowTimeout = additionalData.executionTimeoutTimestamp;
const workflowSettings = workflowData.settings;

View File

@@ -266,6 +266,15 @@ export class WorkflowRunner {
this.activeExecutions.resolveResponsePromise(executionId, response);
});
if (data.streamingEnabled) {
if (data.executionMode !== 'manual') {
lifecycleHooks.addHandler('sendChunk', (chunk) => {
data.httpResponse?.write(JSON.stringify(chunk) + '\n');
data.httpResponse?.flush?.();
});
}
}
additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({
executionId,
});

View File

@@ -39,6 +39,7 @@ describe('ExecutionLifecycleHooks', () => {
sendResponse: [],
workflowExecuteAfter: [],
workflowExecuteBefore: [],
sendChunk: [],
});
});
});

View File

@@ -38,8 +38,10 @@ import type {
import {
ApplicationError,
createDeferredPromise,
NodeApiError,
NodeConnectionTypes,
NodeHelpers,
NodeOperationError,
Workflow,
} from 'n8n-workflow';
@@ -2110,4 +2112,312 @@ describe('WorkflowExecute', () => {
});
});
});
describe('error chunk handling', () => {
const nodeTypes = mock<INodeTypes>();
let workflowExecute: WorkflowExecute;
let additionalData: IWorkflowExecuteAdditionalData;
let runExecutionData: IRunExecutionData;
let mockHooks: ExecutionLifecycleHooks;
beforeEach(() => {
runExecutionData = {
startData: {},
resultData: { runData: {} },
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {},
waitingExecutionSource: null,
},
};
mockHooks = mock<ExecutionLifecycleHooks>();
additionalData = mock<IWorkflowExecuteAdditionalData>();
additionalData.hooks = mockHooks;
additionalData.currentNodeExecutionIndex = 0;
workflowExecute = new WorkflowExecute(additionalData, 'manual', runExecutionData);
jest.spyOn(mockHooks, 'runHook').mockResolvedValue(undefined);
});
test('should send error chunk when workflow execution fails', async () => {
// ARRANGE
const errorNode: INode = {
id: '1',
name: 'ErrorNode',
type: 'test.error',
typeVersion: 1,
position: [0, 0],
parameters: {},
};
const nodeOperationError = new NodeOperationError(errorNode, 'Node execution failed');
nodeOperationError.description = 'A detailed error description';
const errorNodeType = mock<INodeType>({
description: {
name: 'test.error',
displayName: 'Test Error Node',
defaultVersion: 1,
properties: [],
inputs: [{ type: NodeConnectionTypes.Main }],
outputs: [{ type: NodeConnectionTypes.Main }],
},
async execute() {
throw nodeOperationError;
},
});
nodeTypes.getByNameAndVersion.mockReturnValue(errorNodeType);
const workflow = new Workflow({
id: 'test',
nodes: [errorNode],
connections: {},
active: false,
nodeTypes,
});
const waitPromise = createDeferredPromise<IRun>();
const testAdditionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise);
testAdditionalData.hooks = mockHooks;
// ACT
try {
await workflowExecute.run(workflow, errorNode);
} catch {
// Expected to throw
}
// ASSERT
expect(mockHooks.runHook).toHaveBeenCalledWith('sendChunk', [
{
type: 'error',
content: 'A detailed error description',
},
]);
});
test('should send error chunk when workflow execution fails with NodeApiError', async () => {
// ARRANGE
const errorNode: INode = {
id: 'error-node-id',
name: 'ErrorNode',
type: 'test.error',
typeVersion: 1,
position: [100, 200],
parameters: {},
};
const nodeApiError = new NodeApiError(errorNode, { message: 'API request failed' });
nodeApiError.description = 'The API returned an error';
const errorNodeType = mock<INodeType>({
description: {
name: 'test.error',
displayName: 'Test Error Node',
defaultVersion: 1,
properties: [],
inputs: [{ type: NodeConnectionTypes.Main }],
outputs: [{ type: NodeConnectionTypes.Main }],
},
async execute() {
throw nodeApiError;
},
});
nodeTypes.getByNameAndVersion.mockReturnValue(errorNodeType);
const workflow = new Workflow({
id: 'test',
nodes: [errorNode],
connections: {},
active: false,
nodeTypes,
});
const waitPromise = createDeferredPromise<IRun>();
const testAdditionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise);
testAdditionalData.hooks = mockHooks;
// ACT
try {
await workflowExecute.run(workflow, errorNode);
} catch {
// Expected to throw
}
// ASSERT
expect(mockHooks.runHook).toHaveBeenCalledWith('sendChunk', [
{
type: 'error',
content: 'The API returned an error',
},
]);
});
test('should not send error chunk when workflow execution succeeds', async () => {
// ARRANGE
const successNode: INode = {
id: '1',
name: 'SuccessNode',
type: 'test.success',
typeVersion: 1,
position: [0, 0],
parameters: {},
};
const successNodeType = mock<INodeType>({
description: {
name: 'test.success',
displayName: 'Test Success Node',
defaultVersion: 1,
properties: [],
inputs: [{ type: NodeConnectionTypes.Main }],
outputs: [{ type: NodeConnectionTypes.Main }],
},
async execute() {
return [[{ json: { success: true } }]];
},
});
nodeTypes.getByNameAndVersion.mockReturnValue(successNodeType);
const workflow = new Workflow({
id: 'test',
nodes: [successNode],
connections: {},
active: false,
nodeTypes,
});
const waitPromise = createDeferredPromise<IRun>();
const testAdditionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise);
testAdditionalData.hooks = mockHooks;
// ACT
await workflowExecute.run(workflow, successNode);
// ASSERT
expect(mockHooks.runHook).not.toHaveBeenCalledWith('sendChunk', expect.anything());
});
test('should send error chunk when workflow execution fails with NodeOperationError', async () => {
// ARRANGE
const errorNode: INode = {
id: '1',
name: 'ErrorNode',
type: 'test.error',
typeVersion: 1,
position: [0, 0],
parameters: {},
};
const nodeOperationError = new NodeOperationError(errorNode, 'Operation failed');
nodeOperationError.description = 'Custom error description';
const errorNodeType = mock<INodeType>({
description: {
name: 'test.error',
displayName: 'Test Error Node',
defaultVersion: 1,
properties: [],
inputs: [{ type: NodeConnectionTypes.Main }],
outputs: [{ type: NodeConnectionTypes.Main }],
},
async execute() {
throw nodeOperationError;
},
});
nodeTypes.getByNameAndVersion.mockReturnValue(errorNodeType);
const workflow = new Workflow({
id: 'test',
nodes: [errorNode],
connections: {},
active: false,
nodeTypes,
});
const waitPromise = createDeferredPromise<IRun>();
const testAdditionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise);
testAdditionalData.hooks = mockHooks;
// ACT
try {
await workflowExecute.run(workflow, errorNode);
} catch {
// Expected to throw
}
// ASSERT
expect(mockHooks.runHook).toHaveBeenCalledWith('sendChunk', [
{
type: 'error',
content: 'Custom error description',
},
]);
});
test('should send error chunk with undefined content when error has no description', async () => {
// ARRANGE
const errorNode: INode = {
id: '1',
name: 'ErrorNode',
type: 'test.error',
typeVersion: 1,
position: [0, 0],
parameters: {},
};
const simpleError = new Error('Simple error message');
const errorNodeType = mock<INodeType>({
description: {
name: 'test.error',
displayName: 'Test Error Node',
defaultVersion: 1,
properties: [],
inputs: [{ type: NodeConnectionTypes.Main }],
outputs: [{ type: NodeConnectionTypes.Main }],
},
async execute() {
throw simpleError;
},
});
nodeTypes.getByNameAndVersion.mockReturnValue(errorNodeType);
const workflow = new Workflow({
id: 'test',
nodes: [errorNode],
connections: {},
active: false,
nodeTypes,
});
const waitPromise = createDeferredPromise<IRun>();
const testAdditionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise);
testAdditionalData.hooks = mockHooks;
// ACT
try {
await workflowExecute.run(workflow, errorNode);
} catch {
// Expected to throw
}
// ASSERT
expect(mockHooks.runHook).toHaveBeenCalledWith('sendChunk', [
{
type: 'error',
content: undefined, // When no description is available, content should be undefined
},
]);
});
});
});

View File

@@ -7,6 +7,7 @@ import type {
ITaskData,
ITaskStartedData,
IWorkflowBase,
StructuredChunk,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
@@ -46,6 +47,9 @@ export type ExecutionLifecycleHookHandlers = {
(this: ExecutionLifecycleHooks, response: IExecuteResponsePromiseData) => Promise<void> | void
>;
/** Used by nodes to send chunks to streaming responses */
sendChunk: Array<(this: ExecutionLifecycleHooks, chunk: StructuredChunk) => Promise<void> | void>;
/**
* Executed after a node fetches data
* - For a webhook node, after the node had been run.
@@ -84,6 +88,7 @@ export class ExecutionLifecycleHooks {
sendResponse: [],
workflowExecuteAfter: [],
workflowExecuteBefore: [],
sendChunk: [],
};
constructor(

View File

@@ -16,6 +16,8 @@ import type {
} from 'n8n-workflow';
import { ApplicationError, ExpressionError, NodeConnectionTypes } from 'n8n-workflow';
import type { ExecutionLifecycleHooks } from '@/execution-engine/execution-lifecycle-hooks';
import { describeCommonTests } from './shared-tests';
import { ExecuteContext } from '../execute-context';
import * as validateUtil from '../utils/validate-value-against-schema';
@@ -41,14 +43,20 @@ describe('ExecuteContext', () => {
const nodeTypes = mock<INodeTypes>();
const expression = mock<Expression>();
const workflow = mock<Workflow>({ expression, nodeTypes });
-const node = mock<INode>({
+const node: INode = {
+id: 'test-node-id',
+name: 'Test Node',
+type: 'testNodeType',
+typeVersion: 1,
+position: [0, 0],
credentials: {
[testCredentialType]: {
id: 'testCredentialId',
name: 'testCredential',
},
},
-});
+parameters: {},
+};
node.parameters = {
testParameter: 'testValue',
nullParameter: null,
@@ -259,4 +267,106 @@ describe('ExecuteContext', () => {
sendMessageSpy.mockRestore();
});
});
describe('sendChunk', () => {
test('should call hook with structured chunk', async () => {
const hooksMock: ExecutionLifecycleHooks = mock<ExecutionLifecycleHooks>({
runHook: jest.fn(),
});
const additionalDataWithHooks: IWorkflowExecuteAdditionalData = {
...additionalData,
hooks: hooksMock,
};
const testExecuteContext = new ExecuteContext(
workflow,
node,
additionalDataWithHooks,
'manual',
runExecutionData,
runIndex,
connectionInputData,
inputData,
executeData,
[closeFn],
abortSignal,
);
await testExecuteContext.sendChunk('item', 'test');
expect(hooksMock.runHook).toHaveBeenCalledWith('sendChunk', [
expect.objectContaining({
type: 'item',
content: '"test"',
metadata: expect.objectContaining({
nodeName: 'Test Node',
nodeId: 'test-node-id',
timestamp: expect.any(Number),
}),
}),
]);
});
test('should send chunk without content when content is undefined', async () => {
const hooksMock: ExecutionLifecycleHooks = mock<ExecutionLifecycleHooks>({
runHook: jest.fn(),
});
const additionalDataWithHooks: IWorkflowExecuteAdditionalData = {
...additionalData,
hooks: hooksMock,
};
const testExecuteContext = new ExecuteContext(
workflow,
node,
additionalDataWithHooks,
'manual',
runExecutionData,
runIndex,
connectionInputData,
inputData,
executeData,
[closeFn],
abortSignal,
);
await testExecuteContext.sendChunk('begin');
expect(hooksMock.runHook).toHaveBeenCalledWith('sendChunk', [
expect.objectContaining({
type: 'begin',
content: undefined,
metadata: expect.objectContaining({
nodeName: 'Test Node',
nodeId: 'test-node-id',
timestamp: expect.any(Number),
}),
}),
]);
});
test('should handle when hooks is undefined', async () => {
const additionalDataWithoutHooks = {
...additionalData,
hooks: undefined,
};
const testExecuteContext = new ExecuteContext(
workflow,
node,
additionalDataWithoutHooks,
'manual',
runExecutionData,
runIndex,
connectionInputData,
inputData,
executeData,
[closeFn],
abortSignal,
);
// Should not throw error
await expect(testExecuteContext.sendChunk('item', 'test')).resolves.toBeUndefined();
});
});
});

View File

@@ -1,7 +1,9 @@
import type {
AINodeConnectionType,
CallbackManager,
ChunkType,
CloseFunction,
IDataObject,
IExecuteData,
IExecuteFunctions,
IExecuteResponsePromiseData,
@@ -13,6 +15,7 @@ import type {
IWorkflowExecuteAdditionalData,
NodeExecutionHint,
Result,
StructuredChunk,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
@@ -128,6 +131,23 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
)) as IExecuteFunctions['getNodeParameter'];
}
async sendChunk(type: ChunkType, content?: IDataObject | string): Promise<void> {
const node = this.getNode();
const metadata = {
nodeId: node.id,
nodeName: node.name,
timestamp: Date.now(),
};
const message: StructuredChunk = {
type,
content: content ? JSON.stringify(content) : undefined,
metadata,
};
await this.additionalData.hooks?.runHook('sendChunk', [message]);
}
async startJob<T = unknown, E = unknown>(
jobType: string,
settings: unknown,

View File

@@ -1679,6 +1679,11 @@ export class WorkflowExecute {
taskData.error = executionError;
taskData.executionStatus = 'error';
// Send error to the response if necessary
await hooks?.runHook('sendChunk', [
{ type: 'error', content: executionError.description },
]);
if (
executionData.node.continueOnFail === true ||
['continueRegularOutput', 'continueErrorOutput'].includes(

View File

@@ -919,6 +919,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
putExecutionToWait(waitTill: Date): Promise<void>;
sendMessageToUI(message: any): void;
sendResponse(response: IExecuteResponsePromiseData): void;
sendChunk(type: ChunkType, content?: IDataObject | string): void;
// TODO: Make this one then only available in the new config one
addInputData(
@@ -2094,7 +2095,12 @@ export interface IWebhookResponseData {
}
export type WebhookResponseData = 'allEntries' | 'firstEntryJson' | 'firstEntryBinary' | 'noData';
-export type WebhookResponseMode = 'onReceived' | 'lastNode' | 'responseNode' | 'formPage';
+export type WebhookResponseMode =
+| 'onReceived'
+| 'lastNode'
+| 'responseNode'
+| 'formPage'
+| 'streaming';
export interface INodeTypes {
getByName(nodeType: string): INodeType | IVersionedNodeType;
@@ -2325,6 +2331,8 @@ export interface IWorkflowExecutionDataProcess {
data?: ITaskData;
};
agentRequest?: AiAgentRequest;
httpResponse?: express.Response; // Used for streaming responses
streamingEnabled?: boolean;
}
export interface ExecuteWorkflowOptions {
@@ -2916,3 +2924,14 @@ export type IPersonalizationSurveyAnswersV4 = {
reportedSource?: string | null;
reportedSourceOther?: string | null;
};
export type ChunkType = 'begin' | 'item' | 'end' | 'error';
export interface StructuredChunk {
type: ChunkType;
content?: string;
metadata: {
nodeId: string;
nodeName: string;
timestamp: number;
};
}
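For illustration, here is a StructuredChunk value as ExecuteContext.sendChunk builds it before ActiveExecutions.sendChunk writes JSON.stringify(chunk) + '\n' to the response; all field values are made up.

import type { StructuredChunk } from 'n8n-workflow';

// Illustrative only: field values are invented, the shape follows the interface above.
const exampleChunk: StructuredChunk = {
  type: 'item',
  // ExecuteContext.sendChunk JSON-stringifies object content, so this is a string of JSON.
  content: JSON.stringify({ message: 'partial result' }),
  metadata: {
    nodeId: '1a2b3c4d-0000-0000-0000-000000000000',
    nodeName: 'My Streaming Node',
    timestamp: Date.now(),
  },
};

// On the wire this becomes one line: JSON.stringify(exampleChunk) + '\n'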