refactor(core): Extract hooks out of workflow-execute-additional-data (no-changelog) (#12749)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2025-01-21 14:47:02 +01:00
committed by GitHub
parent 56c93caae0
commit ee08e9e1fe
19 changed files with 856 additions and 823 deletions

View File

@@ -0,0 +1,617 @@
import { stringify } from 'flatted';
import { mock } from 'jest-mock-extended';
import { BinaryDataService, ErrorReporter, InstanceSettings, Logger } from 'n8n-core';
import { ExpressionError, WorkflowHooks } from 'n8n-workflow';
import type {
IRunExecutionData,
ITaskData,
Workflow,
IDataObject,
IRun,
INode,
IWorkflowBase,
} from 'n8n-workflow';
import config from '@/config';
import type { Project } from '@/databases/entities/project';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { EventService } from '@/events/event.service';
import { ExternalHooks } from '@/external-hooks';
import { Push } from '@/push';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { WorkflowExecutionService } from '@/workflows/workflow-execution.service';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { mockInstance } from '@test/mocking';
import {
getWorkflowHooksMain,
getWorkflowHooksWorkerExecuter,
getWorkflowHooksWorkerMain,
} from '../execution-lifecycle-hooks';
describe('Execution Lifecycle Hooks', () => {
mockInstance(Logger);
mockInstance(InstanceSettings);
const errorReporter = mockInstance(ErrorReporter);
const eventService = mockInstance(EventService);
const executionRepository = mockInstance(ExecutionRepository);
const externalHooks = mockInstance(ExternalHooks);
const push = mockInstance(Push);
const workflowStaticDataService = mockInstance(WorkflowStaticDataService);
const workflowStatisticsService = mockInstance(WorkflowStatisticsService);
const binaryDataService = mockInstance(BinaryDataService);
const ownershipService = mockInstance(OwnershipService);
const workflowExecutionService = mockInstance(WorkflowExecutionService);
const nodeName = 'Test Node';
const node = mock<INode>();
const workflowId = 'test-workflow-id';
const executionId = 'test-execution-id';
const workflowData: IWorkflowBase = {
id: workflowId,
name: 'Test Workflow',
active: true,
connections: {},
nodes: [],
settings: {},
createdAt: new Date(),
updatedAt: new Date(),
};
const workflow = mock<Workflow>();
const staticData = mock<IDataObject>();
const taskData = mock<ITaskData>();
const runExecutionData = mock<IRunExecutionData>();
const successfulRun = mock<IRun>({
status: 'success',
finished: true,
waitTill: undefined,
});
const failedRun = mock<IRun>({
status: 'error',
finished: true,
waitTill: undefined,
});
const waitingRun = mock<IRun>({
finished: true,
status: 'waiting',
waitTill: new Date(),
});
const expressionError = new ExpressionError('Error');
const executionMode = 'manual';
const pushRef = 'test-push-ref';
const retryOf = 'test-retry-of';
const now = new Date('2025-01-13T18:25:50.267Z');
jest.useFakeTimers({ now });
beforeEach(() => {
jest.clearAllMocks();
workflowData.settings = {};
successfulRun.data = {
resultData: {
runData: {},
},
};
failedRun.data = {
resultData: {
runData: {},
error: expressionError,
},
};
});
describe('getWorkflowHooksMain', () => {
let hooks: WorkflowHooks;
beforeEach(() => {
hooks = getWorkflowHooksMain(
{
executionMode,
workflowData,
pushRef,
retryOf,
},
executionId,
);
});
it('should setup the correct set of hooks', () => {
expect(hooks).toBeInstanceOf(WorkflowHooks);
expect(hooks.mode).toBe('manual');
expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toEqual(workflowData);
expect(hooks.pushRef).toEqual('test-push-ref');
expect(hooks.retryOf).toEqual('test-retry-of');
const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(2);
expect(hookFunctions.nodeExecuteAfter).toHaveLength(3);
expect(hookFunctions.workflowExecuteBefore).toHaveLength(2);
expect(hookFunctions.workflowExecuteAfter).toHaveLength(2);
expect(hookFunctions.nodeFetchedData).toHaveLength(1);
expect(hookFunctions.sendResponse).toBeUndefined();
});
describe('nodeExecuteBefore', () => {
it('should send nodeExecuteBefore push event', async () => {
await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]);
expect(push.send).toHaveBeenCalledWith(
{ type: 'nodeExecuteBefore', data: { executionId, nodeName } },
pushRef,
);
});
it('should emit node-pre-execute event', async () => {
await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]);
expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', {
executionId,
workflow: workflowData,
nodeName,
});
});
});
describe('nodeExecuteAfter', () => {
it('should send nodeExecuteAfter push event', async () => {
await hooks.executeHookFunctions('nodeExecuteAfter', [
nodeName,
taskData,
runExecutionData,
]);
expect(push.send).toHaveBeenCalledWith(
{ type: 'nodeExecuteAfter', data: { executionId, nodeName, data: taskData } },
pushRef,
);
});
it('should emit node-post-execute event', async () => {
await hooks.executeHookFunctions('nodeExecuteAfter', [
nodeName,
taskData,
runExecutionData,
]);
expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', {
executionId,
workflow: workflowData,
nodeName,
});
});
it('should save execution progress when enabled', async () => {
workflowData.settings = { saveExecutionProgress: true };
await hooks.executeHookFunctions('nodeExecuteAfter', [
nodeName,
taskData,
runExecutionData,
]);
expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(executionId, {
includeData: true,
unflattenData: true,
});
});
it('should not save execution progress when disabled', async () => {
workflowData.settings = { saveExecutionProgress: false };
await hooks.executeHookFunctions('nodeExecuteAfter', [
nodeName,
taskData,
runExecutionData,
]);
expect(executionRepository.findSingleExecution).not.toHaveBeenCalled();
});
});
describe('workflowExecuteBefore', () => {
it('should send executionStarted push event', async () => {
await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]);
expect(push.send).toHaveBeenCalledWith(
{
type: 'executionStarted',
data: {
executionId,
mode: executionMode,
retryOf,
workflowId: 'test-workflow-id',
workflowName: 'Test Workflow',
startedAt: now,
flattedRunData: '[{}]',
},
},
pushRef,
);
});
it('should not call eventService', async () => {
await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]);
expect(eventService.emit).not.toHaveBeenCalled();
});
it('should run workflow.preExecute external hook', async () => {
await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]);
expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [
workflow,
executionMode,
]);
});
});
describe('workflowExecuteAfter', () => {
it('should send executionFinished push event', async () => {
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(eventService.emit).not.toHaveBeenCalled();
expect(push.send).toHaveBeenCalledWith(
{
type: 'executionFinished',
data: {
executionId,
rawData: stringify(successfulRun.data),
status: 'success',
workflowId: 'test-workflow-id',
},
},
pushRef,
);
});
it('should send executionWaiting push event', async () => {
await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]);
expect(push.send).toHaveBeenCalledWith(
{
type: 'executionWaiting',
data: { executionId },
},
pushRef,
);
});
describe('saving static data', () => {
it('should skip saving static data for manual executions', async () => {
hooks.mode = 'manual';
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]);
expect(workflowStaticDataService.saveStaticDataById).not.toHaveBeenCalled();
});
it('should save static data for prod executions', async () => {
hooks.mode = 'trigger';
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]);
expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith(
workflowId,
staticData,
);
});
it('should handle static data saving errors', async () => {
hooks.mode = 'trigger';
const error = new Error('Static data save failed');
workflowStaticDataService.saveStaticDataById.mockRejectedValueOnce(error);
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]);
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
});
describe('saving execution data', () => {
it('should update execution with proper data', async () => {
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith(
executionId,
expect.objectContaining({
finished: true,
status: 'success',
}),
);
});
it('should handle errors when updating execution', async () => {
const error = new Error('Failed to update execution');
executionRepository.updateExistingExecution.mockRejectedValueOnce(error);
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
it('should not delete unfinished executions', async () => {
const unfinishedRun = mock<IRun>({ finished: false, status: 'running' });
await hooks.executeHookFunctions('workflowExecuteAfter', [unfinishedRun, {}]);
expect(executionRepository.hardDelete).not.toHaveBeenCalled();
});
it('should not delete waiting executions', async () => {
await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]);
expect(executionRepository.hardDelete).not.toHaveBeenCalled();
});
it('should soft delete manual executions when manual saving is disabled', async () => {
hooks.workflowData.settings = { saveManualExecutions: false };
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(executionRepository.softDelete).toHaveBeenCalledWith(executionId);
});
it('should not soft delete manual executions with waitTill', async () => {
hooks.workflowData.settings = { saveManualExecutions: false };
await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]);
expect(executionRepository.softDelete).not.toHaveBeenCalled();
});
});
describe('error workflow', () => {
it('should not execute error workflow for manual executions', async () => {
hooks.mode = 'manual';
await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]);
expect(workflowExecutionService.executeErrorWorkflow).not.toHaveBeenCalled();
});
it('should execute error workflow for failed non-manual executions', async () => {
hooks.mode = 'trigger';
const errorWorkflow = 'error-workflow-id';
workflowData.settings = { errorWorkflow };
const project = mock<Project>();
ownershipService.getWorkflowProjectCached
.calledWith(workflowId)
.mockResolvedValue(project);
await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]);
expect(workflowExecutionService.executeErrorWorkflow).toHaveBeenCalledWith(
errorWorkflow,
{
workflow: {
id: workflowId,
name: workflowData.name,
},
execution: {
id: executionId,
error: expressionError,
mode: 'trigger',
retryOf,
lastNodeExecuted: undefined,
url: `http://localhost:5678/workflow/${workflowId}/executions/${executionId}`,
},
},
project,
);
});
});
it('should restore binary data IDs after workflow execution for webhooks', async () => {
config.set('binaryDataManager.mode', 'filesystem');
hooks.mode = 'webhook';
(successfulRun.data.resultData.runData = {
[nodeName]: [
{
executionTime: 1,
startTime: 1,
source: [],
data: {
main: [
[
{
json: {},
binary: {
data: {
id: `filesystem-v2:workflows/${workflowId}/executions/temp/binary_data/123`,
data: '',
mimeType: 'text/plain',
},
},
},
],
],
},
},
],
}),
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(binaryDataService.rename).toHaveBeenCalledWith(
'workflows/test-workflow-id/executions/temp/binary_data/123',
'workflows/test-workflow-id/executions/test-execution-id/binary_data/123',
);
});
});
describe('statistics events', () => {
it('workflowExecuteAfter should emit workflowExecutionCompleted statistics event', async () => {
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', {
workflowData,
fullRunData: successfulRun,
});
});
it('nodeFetchedData should handle nodeFetchedData statistics event', async () => {
await hooks.executeHookFunctions('nodeFetchedData', [workflowId, node]);
expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', {
workflowId,
node,
});
});
});
});
describe('getWorkflowHooksWorkerMain', () => {
let hooks: WorkflowHooks;
beforeEach(() => {
hooks = getWorkflowHooksWorkerMain(executionMode, executionId, workflowData, {
pushRef,
retryOf,
});
});
it('should setup the correct set of hooks', () => {
expect(hooks).toBeInstanceOf(WorkflowHooks);
expect(hooks.mode).toBe('manual');
expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toEqual(workflowData);
expect(hooks.pushRef).toEqual('test-push-ref');
expect(hooks.retryOf).toEqual('test-retry-of');
const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(0);
expect(hookFunctions.nodeExecuteAfter).toHaveLength(0);
expect(hookFunctions.workflowExecuteBefore).toHaveLength(1);
expect(hookFunctions.workflowExecuteAfter).toHaveLength(1);
});
describe('workflowExecuteBefore', () => {
it('should run the workflow.preExecute external hook', async () => {
await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]);
expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [
workflow,
executionMode,
]);
});
});
describe('workflowExecuteAfter', () => {
it('should delete successful executions when success saving is disabled', async () => {
workflowData.settings = {
saveDataSuccessExecution: 'none',
saveDataErrorExecution: 'all',
};
const hooks = getWorkflowHooksWorkerMain('webhook', executionId, workflowData, {
pushRef,
retryOf,
});
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(executionRepository.hardDelete).toHaveBeenCalledWith({
workflowId,
executionId,
});
});
it('should delete failed executions when error saving is disabled', async () => {
workflowData.settings = {
saveDataSuccessExecution: 'all',
saveDataErrorExecution: 'none',
};
const hooks = getWorkflowHooksWorkerMain('webhook', executionId, workflowData, {
pushRef,
retryOf,
});
await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]);
expect(executionRepository.hardDelete).toHaveBeenCalledWith({
workflowId,
executionId,
});
});
});
});
describe('getWorkflowHooksWorkerExecuter', () => {
let hooks: WorkflowHooks;
beforeEach(() => {
hooks = getWorkflowHooksWorkerExecuter(executionMode, executionId, workflowData, {
pushRef,
retryOf,
});
});
describe('saving static data', () => {
it('should skip saving static data for manual executions', async () => {
hooks.mode = 'manual';
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]);
expect(workflowStaticDataService.saveStaticDataById).not.toHaveBeenCalled();
});
it('should save static data for prod executions', async () => {
hooks.mode = 'trigger';
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]);
expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith(
workflowId,
staticData,
);
});
it('should handle static data saving errors', async () => {
hooks.mode = 'trigger';
const error = new Error('Static data save failed');
workflowStaticDataService.saveStaticDataById.mockRejectedValueOnce(error);
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]);
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
});
describe('error workflow', () => {
it('should not execute error workflow for manual executions', async () => {
hooks.mode = 'manual';
await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]);
expect(workflowExecutionService.executeErrorWorkflow).not.toHaveBeenCalled();
});
it('should execute error workflow for failed non-manual executions', async () => {
hooks.mode = 'trigger';
const errorWorkflow = 'error-workflow-id';
workflowData.settings = { errorWorkflow };
const project = mock<Project>();
ownershipService.getWorkflowProjectCached.calledWith(workflowId).mockResolvedValue(project);
await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]);
expect(workflowExecutionService.executeErrorWorkflow).toHaveBeenCalledWith(
errorWorkflow,
{
workflow: {
id: workflowId,
name: workflowData.name,
},
execution: {
id: executionId,
error: expressionError,
mode: 'trigger',
retryOf,
lastNodeExecuted: undefined,
url: `http://localhost:5678/workflow/${workflowId}/executions/${executionId}`,
},
},
project,
);
});
});
});
});

View File

@@ -0,0 +1,192 @@
import { BinaryDataService } from 'n8n-core';
import type { IRun } from 'n8n-workflow';
import config from '@/config';
import { restoreBinaryDataId } from '@/execution-lifecycle/restore-binary-data-id';
import { mockInstance } from '@test/mocking';
function toIRun(item?: object) {
return {
data: {
resultData: {
runData: {
myNode: [
{
data: {
main: [[item]],
},
},
],
},
},
},
} as unknown as IRun;
}
function getDataId(run: IRun, kind: 'binary' | 'json') {
// @ts-expect-error The type doesn't have the correct structure
return run.data.resultData.runData.myNode[0].data.main[0][0][kind].data.id;
}
const binaryDataService = mockInstance(BinaryDataService);
for (const mode of ['filesystem-v2', 's3'] as const) {
describe(`on ${mode} mode`, () => {
beforeAll(() => {
config.set('binaryDataManager.mode', mode);
});
afterEach(() => {
jest.clearAllMocks();
});
it('should restore if binary data ID is missing execution ID', async () => {
const workflowId = '6HYhhKmJch2cYxGj';
const executionId = 'temp';
const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
const incorrectFileId = `workflows/${workflowId}/executions/temp/binary_data/${binaryDataFileUuid}`;
const run = toIRun({
binary: {
data: { id: `s3:${incorrectFileId}` },
},
});
await restoreBinaryDataId(run, executionId, 'webhook');
const correctFileId = incorrectFileId.replace('temp', executionId);
const correctBinaryDataId = `s3:${correctFileId}`;
expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId);
expect(getDataId(run, 'binary')).toBe(correctBinaryDataId);
});
it('should do nothing if binary data ID is not missing execution ID', async () => {
const workflowId = '6HYhhKmJch2cYxGj';
const executionId = '999';
const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${binaryDataFileUuid}`;
const binaryDataId = `s3:${fileId}`;
const run = toIRun({
binary: {
data: {
id: binaryDataId,
},
},
});
await restoreBinaryDataId(run, executionId, 'webhook');
expect(binaryDataService.rename).not.toHaveBeenCalled();
expect(getDataId(run, 'binary')).toBe(binaryDataId);
});
it('should do nothing if no binary data ID', async () => {
const executionId = '999';
const dataId = '123';
const run = toIRun({
json: {
data: { id: dataId },
},
});
await restoreBinaryDataId(run, executionId, 'webhook');
expect(binaryDataService.rename).not.toHaveBeenCalled();
expect(getDataId(run, 'json')).toBe(dataId);
});
it('should do nothing on itemless case', async () => {
const executionId = '999';
const promise = restoreBinaryDataId(toIRun(), executionId, 'webhook');
await expect(promise).resolves.not.toThrow();
expect(binaryDataService.rename).not.toHaveBeenCalled();
});
it('should do nothing if data is undefined', async () => {
const executionId = '999';
const run = toIRun({
json: {
data: undefined,
},
});
const promise = restoreBinaryDataId(run, executionId, 'webhook');
await expect(promise).resolves.not.toThrow();
expect(binaryDataService.rename).not.toHaveBeenCalled();
});
it('should do nothing if workflow execution mode is not `webhook`', async () => {
const executionId = '999';
const run = toIRun({
json: {
data: undefined,
},
});
const promise = restoreBinaryDataId(run, executionId, 'internal');
await expect(promise).resolves.not.toThrow();
expect(binaryDataService.rename).not.toHaveBeenCalled();
});
it('should ignore error thrown on renaming', async () => {
const workflowId = '6HYhhKmJch2cYxGj';
const executionId = 'temp';
const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
const incorrectFileId = `workflows/${workflowId}/executions/temp/binary_data/${binaryDataFileUuid}`;
const run = toIRun({
binary: {
data: { id: `s3:${incorrectFileId}` },
},
});
binaryDataService.rename.mockRejectedValueOnce(new Error('ENOENT'));
const promise = restoreBinaryDataId(run, executionId, 'webhook');
await expect(promise).resolves.not.toThrow();
expect(binaryDataService.rename).toHaveBeenCalled();
});
});
}
describe('on default mode', () => {
afterEach(() => {
config.load(config.default);
});
it('should do nothing', async () => {
config.set('binaryDataManager.mode', 'default');
const executionId = '999';
const run = toIRun({
json: {
data: undefined,
},
});
const promise = restoreBinaryDataId(run, executionId, 'internal');
await expect(promise).resolves.not.toThrow();
expect(binaryDataService.rename).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,100 @@
import { ErrorReporter } from 'n8n-core';
import { Logger } from 'n8n-core';
import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/interfaces';
import { mockInstance } from '@test/mocking';
import { saveExecutionProgress } from '../save-execution-progress';
import * as fnModule from '../to-save-settings';
mockInstance(Logger);
const errorReporter = mockInstance(ErrorReporter);
const executionRepository = mockInstance(ExecutionRepository);
afterEach(() => {
jest.clearAllMocks();
});
const commonArgs: [IWorkflowBase, string, string, ITaskData, IRunExecutionData, string] = [
{} as IWorkflowBase,
'some-execution-id',
'My Node',
{} as ITaskData,
{} as IRunExecutionData,
'some-session-id',
];
const commonSettings = { error: true, success: true, manual: true };
test('should ignore if save settings say so', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: false,
});
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
});
test('should ignore on leftover async call', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: true,
});
executionRepository.findSingleExecution.mockResolvedValue({
finished: true,
} as IExecutionResponse);
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
});
test('should update execution when saving progress is enabled', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: true,
});
executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse);
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', {
data: {
executionData: undefined,
resultData: {
lastNodeExecuted: 'My Node',
runData: {
'My Node': [{}],
},
},
startData: {},
},
status: 'running',
});
expect(errorReporter.error).not.toHaveBeenCalled();
});
test('should report error on failure', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: true,
});
const error = new Error('Something went wrong');
executionRepository.findSingleExecution.mockImplementation(() => {
throw error;
});
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
expect(errorReporter.error).toHaveBeenCalledWith(error);
});

View File

@@ -0,0 +1,155 @@
import config from '@/config';
import { toSaveSettings } from '../to-save-settings';
afterEach(() => {
config.load(config.default);
});
describe('failed production executions', () => {
it('should favor workflow settings over defaults', () => {
config.set('executions.saveDataOnError', 'none');
const saveSettings = toSaveSettings({ saveDataErrorExecution: 'all' });
expect(saveSettings.error).toBe(true);
config.set('executions.saveDataOnError', 'all');
const _saveSettings = toSaveSettings({ saveDataErrorExecution: 'none' });
expect(_saveSettings.error).toBe(false);
});
it('should fall back to default if no workflow setting', () => {
config.set('executions.saveDataOnError', 'all');
const saveSettings = toSaveSettings();
expect(saveSettings.error).toBe(true);
config.set('executions.saveDataOnError', 'none');
const _saveSettings = toSaveSettings();
expect(_saveSettings.error).toBe(false);
});
});
describe('successful production executions', () => {
it('should favor workflow settings over defaults', () => {
config.set('executions.saveDataOnSuccess', 'none');
const saveSettings = toSaveSettings({ saveDataSuccessExecution: 'all' });
expect(saveSettings.success).toBe(true);
config.set('executions.saveDataOnSuccess', 'all');
const _saveSettings = toSaveSettings({ saveDataSuccessExecution: 'none' });
expect(_saveSettings.success).toBe(false);
});
it('should fall back to default if no workflow setting', () => {
config.set('executions.saveDataOnSuccess', 'all');
const saveSettings = toSaveSettings();
expect(saveSettings.success).toBe(true);
config.set('executions.saveDataOnSuccess', 'none');
const _saveSettings = toSaveSettings();
expect(_saveSettings.success).toBe(false);
});
});
describe('manual executions', () => {
it('should favor workflow setting over default', () => {
config.set('executions.saveDataManualExecutions', false);
const saveSettings = toSaveSettings({ saveManualExecutions: true });
expect(saveSettings.manual).toBe(true);
config.set('executions.saveDataManualExecutions', true);
const _saveSettings = toSaveSettings({ saveManualExecutions: false });
expect(_saveSettings.manual).toBe(false);
});
it('should favor fall back to default if workflow setting is explicit default', () => {
config.set('executions.saveDataManualExecutions', true);
const saveSettings = toSaveSettings({ saveManualExecutions: 'DEFAULT' });
expect(saveSettings.manual).toBe(true);
config.set('executions.saveDataManualExecutions', false);
const _saveSettings = toSaveSettings({ saveManualExecutions: 'DEFAULT' });
expect(_saveSettings.manual).toBe(false);
});
it('should fall back to default if no workflow setting', () => {
config.set('executions.saveDataManualExecutions', true);
const saveSettings = toSaveSettings();
expect(saveSettings.manual).toBe(true);
config.set('executions.saveDataManualExecutions', false);
const _saveSettings = toSaveSettings();
expect(_saveSettings.manual).toBe(false);
});
});
describe('execution progress', () => {
it('should favor workflow setting over default', () => {
config.set('executions.saveExecutionProgress', false);
const saveSettings = toSaveSettings({ saveExecutionProgress: true });
expect(saveSettings.progress).toBe(true);
config.set('executions.saveExecutionProgress', true);
const _saveSettings = toSaveSettings({ saveExecutionProgress: false });
expect(_saveSettings.progress).toBe(false);
});
it('should favor fall back to default if workflow setting is explicit default', () => {
config.set('executions.saveExecutionProgress', true);
const saveSettings = toSaveSettings({ saveExecutionProgress: 'DEFAULT' });
expect(saveSettings.progress).toBe(true);
config.set('executions.saveExecutionProgress', false);
const _saveSettings = toSaveSettings({ saveExecutionProgress: 'DEFAULT' });
expect(_saveSettings.progress).toBe(false);
});
it('should fall back to default if no workflow setting', () => {
config.set('executions.saveExecutionProgress', true);
const saveSettings = toSaveSettings();
expect(saveSettings.progress).toBe(true);
config.set('executions.saveExecutionProgress', false);
const _saveSettings = toSaveSettings();
expect(_saveSettings.progress).toBe(false);
});
});

View File

@@ -0,0 +1,130 @@
import { GlobalConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import { ErrorReporter, Logger } from 'n8n-core';
import type { IRun, IWorkflowBase, WorkflowExecuteMode } from 'n8n-workflow';
import type { IWorkflowErrorData } from '@/interfaces';
import { OwnershipService } from '@/services/ownership.service';
import { UrlService } from '@/services/url.service';
import { WorkflowExecutionService } from '@/workflows/workflow-execution.service';
/**
* Checks if there was an error and if errorWorkflow or a trigger is defined. If so it collects
* all the data and executes it
*
* @param {IWorkflowBase} workflowData The workflow which got executed
* @param {IRun} fullRunData The run which produced the error
* @param {WorkflowExecuteMode} mode The mode in which the workflow got started in
* @param {string} [executionId] The id the execution got saved as
*/
export function executeErrorWorkflow(
workflowData: IWorkflowBase,
fullRunData: IRun,
mode: WorkflowExecuteMode,
executionId?: string,
retryOf?: string,
): void {
const logger = Container.get(Logger);
// Check if there was an error and if so if an errorWorkflow or a trigger is set
let pastExecutionUrl: string | undefined;
if (executionId !== undefined) {
pastExecutionUrl = `${Container.get(UrlService).getWebhookBaseUrl()}workflow/${
workflowData.id
}/executions/${executionId}`;
}
if (fullRunData.data.resultData.error !== undefined) {
let workflowErrorData: IWorkflowErrorData;
const workflowId = workflowData.id;
if (executionId) {
// The error did happen in an execution
workflowErrorData = {
execution: {
id: executionId,
url: pastExecutionUrl,
error: fullRunData.data.resultData.error,
lastNodeExecuted: fullRunData.data.resultData.lastNodeExecuted!,
mode,
retryOf,
},
workflow: {
id: workflowId,
name: workflowData.name,
},
};
} else {
// The error did happen in a trigger
workflowErrorData = {
trigger: {
error: fullRunData.data.resultData.error,
mode,
},
workflow: {
id: workflowId,
name: workflowData.name,
},
};
}
const { errorTriggerType } = Container.get(GlobalConfig).nodes;
// Run the error workflow
// To avoid an infinite loop do not run the error workflow again if the error-workflow itself failed and it is its own error-workflow.
const { errorWorkflow } = workflowData.settings ?? {};
if (errorWorkflow && !(mode === 'error' && workflowId && errorWorkflow === workflowId)) {
logger.debug('Start external error workflow', {
executionId,
errorWorkflowId: errorWorkflow,
workflowId,
});
// If a specific error workflow is set run only that one
// First, do permission checks.
if (!workflowId) {
// Manual executions do not trigger error workflows
// So this if should never happen. It was added to
// make sure there are no possible security gaps
return;
}
Container.get(OwnershipService)
.getWorkflowProjectCached(workflowId)
.then((project) => {
void Container.get(WorkflowExecutionService).executeErrorWorkflow(
errorWorkflow,
workflowErrorData,
project,
);
})
.catch((error: Error) => {
Container.get(ErrorReporter).error(error);
logger.error(
`Could not execute ErrorWorkflow for execution ID ${executionId} because of error querying the workflow owner`,
{
executionId,
errorWorkflowId: errorWorkflow,
workflowId,
error,
workflowErrorData,
},
);
});
} else if (
mode !== 'error' &&
workflowId !== undefined &&
workflowData.nodes.some((node) => node.type === errorTriggerType)
) {
logger.debug('Start internal error workflow', { executionId, workflowId });
void Container.get(OwnershipService)
.getWorkflowProjectCached(workflowId)
.then((project) => {
void Container.get(WorkflowExecutionService).executeErrorWorkflow(
workflowId,
workflowErrorData,
project,
);
});
}
}
}

View File

@@ -0,0 +1,628 @@
import { Container } from '@n8n/di';
import { stringify } from 'flatted';
import { ErrorReporter, Logger, InstanceSettings } from 'n8n-core';
import { WorkflowHooks } from 'n8n-workflow';
import type {
IDataObject,
INode,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowBase,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
Workflow,
} from 'n8n-workflow';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { EventService } from '@/events/event.service';
import { ExternalHooks } from '@/external-hooks';
import { Push } from '@/push';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { isWorkflowIdValid } from '@/utils';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { executeErrorWorkflow } from './execute-error-workflow';
import { restoreBinaryDataId } from './restore-binary-data-id';
import { saveExecutionProgress } from './save-execution-progress';
import {
determineFinalExecutionStatus,
prepareExecutionDataForDbUpdate,
updateExistingExecution,
} from './shared/shared-hook-functions';
import { toSaveSettings } from './to-save-settings';
/**
* Returns hook functions to push data to Editor-UI
*/
function hookFunctionsPush(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const pushInstance = Container.get(Push);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { pushRef, executionId } = this;
// Push data to session which started workflow before each
// node which starts rendering
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
pushRef,
workflowId: this.workflowData.id,
});
pushInstance.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef);
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
const { pushRef, executionId } = this;
// Push data to session which started workflow after each rendered node
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
pushRef,
workflowId: this.workflowData.id,
});
pushInstance.send(
{ type: 'nodeExecuteAfter', data: { executionId, nodeName, data } },
pushRef,
);
},
],
workflowExecuteBefore: [
async function (this: WorkflowHooks, _workflow, data): Promise<void> {
const { pushRef, executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
pushRef,
workflowId,
});
// Push data to session which started the workflow
if (pushRef === undefined) {
return;
}
pushInstance.send(
{
type: 'executionStarted',
data: {
executionId,
mode: this.mode,
startedAt: new Date(),
retryOf: this.retryOf,
workflowId,
workflowName,
flattedRunData: data?.resultData.runData
? stringify(data.resultData.runData)
: stringify({}),
},
},
pushRef,
);
},
],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
const { pushRef, executionId } = this;
if (pushRef === undefined) return;
const { id: workflowId } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
pushRef,
workflowId,
});
const { status } = fullRunData;
if (status === 'waiting') {
pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef);
} else {
const rawData = stringify(fullRunData.data);
pushInstance.send(
{ type: 'executionFinished', data: { executionId, workflowId, status, rawData } },
pushRef,
);
}
},
],
};
}
function hookFunctionsPreExecute(): IWorkflowExecuteHooks {
const externalHooks = Container.get(ExternalHooks);
return {
workflowExecuteBefore: [
async function (this: WorkflowHooks, workflow: Workflow): Promise<void> {
await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
},
],
nodeExecuteAfter: [
async function (
this: WorkflowHooks,
nodeName: string,
data: ITaskData,
executionData: IRunExecutionData,
): Promise<void> {
await saveExecutionProgress(
this.workflowData,
this.executionId,
nodeName,
data,
executionData,
this.pushRef,
);
},
],
};
}
/**
* Returns hook functions to save workflow execution and call error workflow
*/
function hookFunctionsSave(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
const eventService = Container.get(EventService);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-pre-execute', { executionId, workflow, nodeName });
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-post-execute', { executionId, workflow, nodeName });
},
],
workflowExecuteBefore: [],
workflowExecuteAfter: [
async function (
this: WorkflowHooks,
fullRunData: IRun,
newStaticData: IDataObject,
): Promise<void> {
logger.debug('Executing hook (hookFunctionsSave)', {
executionId: this.executionId,
workflowId: this.workflowData.id,
});
await restoreBinaryDataId(fullRunData, this.executionId, this.mode);
const isManualMode = this.mode === 'manual';
try {
if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
await Container.get(WorkflowStaticDataService).saveStaticDataById(
this.workflowData.id,
newStaticData,
);
} catch (e) {
Container.get(ErrorReporter).error(e);
logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`,
{ executionId: this.executionId, workflowId: this.workflowData.id },
);
}
}
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings);
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
/**
* When manual executions are not being saved, we only soft-delete
* the execution so that the user can access its binary data
* while building their workflow.
*
* The manual execution and its binary data will be hard-deleted
* on the next pruning cycle after the grace period set by
* `EXECUTIONS_DATA_HARD_DELETE_BUFFER`.
*/
await Container.get(ExecutionRepository).softDelete(this.executionId);
return;
}
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
if (shouldNotSave && !fullRunData.waitTill && !isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
return;
}
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const fullExecutionData = prepareExecutionDataForDbUpdate({
runData: fullRunData,
workflowData: this.workflowData,
workflowStatusFinal: executionStatus,
retryOf: this.retryOf,
});
// When going into the waiting state, store the pushRef in the execution-data
if (fullRunData.waitTill && isManualMode) {
fullExecutionData.data.pushRef = this.pushRef;
}
await updateExistingExecution({
executionId: this.executionId,
workflowId: this.workflowData.id,
executionData: fullExecutionData,
});
if (!isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} catch (error) {
Container.get(ErrorReporter).error(error);
logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, {
executionId: this.executionId,
workflowId: this.workflowData.id,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
error,
});
if (!isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} finally {
workflowStatisticsService.emit('workflowExecutionCompleted', {
workflowData: this.workflowData,
fullRunData,
});
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}
/**
* Returns hook functions to save workflow execution and call error workflow
* for running with queues. Manual executions should never run on queues as
* they are always executed in the main process.
*/
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
const eventService = Container.get(EventService);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-pre-execute', { executionId, workflow, nodeName });
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-post-execute', { executionId, workflow, nodeName });
},
],
workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> {
const { executionId, workflowData } = this;
eventService.emit('workflow-pre-execute', { executionId, data: workflowData });
},
],
workflowExecuteAfter: [
async function (
this: WorkflowHooks,
fullRunData: IRun,
newStaticData: IDataObject,
): Promise<void> {
logger.debug('Executing hook (hookFunctionsSaveWorker)', {
executionId: this.executionId,
workflowId: this.workflowData.id,
});
const isManualMode = this.mode === 'manual';
try {
if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
await Container.get(WorkflowStaticDataService).saveStaticDataById(
this.workflowData.id,
newStaticData,
);
} catch (e) {
Container.get(ErrorReporter).error(e);
logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`,
{ pushRef: this.pushRef, workflowId: this.workflowData.id },
);
}
}
const workflowStatusFinal = determineFinalExecutionStatus(fullRunData);
fullRunData.status = workflowStatusFinal;
if (
!isManualMode &&
workflowStatusFinal !== 'success' &&
workflowStatusFinal !== 'waiting'
) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const fullExecutionData = prepareExecutionDataForDbUpdate({
runData: fullRunData,
workflowData: this.workflowData,
workflowStatusFinal,
retryOf: this.retryOf,
});
// When going into the waiting state, store the pushRef in the execution-data
if (fullRunData.waitTill && isManualMode) {
fullExecutionData.data.pushRef = this.pushRef;
}
await updateExistingExecution({
executionId: this.executionId,
workflowId: this.workflowData.id,
executionData: fullExecutionData,
});
} catch (error) {
if (!isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} finally {
workflowStatisticsService.emit('workflowExecutionCompleted', {
workflowData: this.workflowData,
fullRunData,
});
}
},
async function (this: WorkflowHooks, runData: IRun): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('workflow-post-execute', {
workflow,
executionId,
runData,
});
},
async function (this: WorkflowHooks, fullRunData: IRun) {
const externalHooks = Container.get(ExternalHooks);
if (externalHooks.exists('workflow.postExecute')) {
try {
await externalHooks.run('workflow.postExecute', [
fullRunData,
this.workflowData,
this.executionId,
]);
} catch (error) {
Container.get(ErrorReporter).error(error);
Container.get(Logger).error(
'There was a problem running hook "workflow.postExecute"',
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
error,
);
}
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}
/**
* Returns WorkflowHooks instance for running integrated workflows
* (Workflows which get started inside of another workflow)
*/
export function getWorkflowHooksIntegrated(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
): WorkflowHooks {
const hookFunctions = hookFunctionsSave();
const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
const hooks = hookFunctions[key] ?? [];
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
}
/**
* Returns WorkflowHooks instance for worker in scaling mode.
*/
export function getWorkflowHooksWorkerExecuter(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
optionalParameters?: IWorkflowHooksOptionalParameters,
): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsSaveWorker();
const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
const hooks = hookFunctions[key] ?? [];
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
if (mode === 'manual' && Container.get(InstanceSettings).isWorker) {
const pushHooks = hookFunctionsPush();
for (const key of Object.keys(pushHooks)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
// eslint-disable-next-line prefer-spread
hookFunctions[key].push.apply(hookFunctions[key], pushHooks[key]);
}
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for main process if workflow runs via worker
*/
export function getWorkflowHooksWorkerMain(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
optionalParameters?: IWorkflowHooksOptionalParameters,
): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsPreExecute();
// TODO: why are workers pushing to frontend?
// TODO: simplifying this for now to just leave the bare minimum hooks
// const hookFunctions = hookFunctionsPush();
// const preExecuteFunctions = hookFunctionsPreExecute();
// for (const key of Object.keys(preExecuteFunctions)) {
// if (hookFunctions[key] === undefined) {
// hookFunctions[key] = [];
// }
// hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
// }
// When running with worker mode, main process executes
// Only workflowExecuteBefore + workflowExecuteAfter
// So to avoid confusion, we are removing other hooks.
hookFunctions.nodeExecuteBefore = [];
hookFunctions.nodeExecuteAfter = [];
hookFunctions.workflowExecuteAfter = [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
// Don't delete executions before they are finished
if (!fullRunData.finished) return;
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings);
const isManualMode = this.mode === 'manual';
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
/**
* When manual executions are not being saved, we only soft-delete
* the execution so that the user can access its binary data
* while building their workflow.
*
* The manual execution and its binary data will be hard-deleted
* on the next pruning cycle after the grace period set by
* `EXECUTIONS_DATA_HARD_DELETE_BUFFER`.
*/
await Container.get(ExecutionRepository).softDelete(this.executionId);
return;
}
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
if (!isManualMode && shouldNotSave && !fullRunData.waitTill) {
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
}
},
];
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for running the main workflow
*/
export function getWorkflowHooksMain(
data: IWorkflowExecutionDataProcess,
executionId: string,
): WorkflowHooks {
const hookFunctions = hookFunctionsSave();
const pushFunctions = hookFunctionsPush();
for (const key of Object.keys(pushFunctions)) {
const hooks = hookFunctions[key] ?? [];
hooks.push.apply(hookFunctions[key], pushFunctions[key]);
}
const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
const hooks = hookFunctions[key] ?? [];
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
if (!hookFunctions.nodeExecuteBefore) hookFunctions.nodeExecuteBefore = [];
if (!hookFunctions.nodeExecuteAfter) hookFunctions.nodeExecuteAfter = [];
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, {
pushRef: data.pushRef,
retryOf: data.retryOf as string,
});
}

View File

@@ -0,0 +1,75 @@
import { Container } from '@n8n/di';
import type { BinaryData } from 'n8n-core';
import { BinaryDataService, Logger } from 'n8n-core';
import type { IRun, WorkflowExecuteMode } from 'n8n-workflow';
import config from '@/config';
/**
* Whenever the execution ID is not available to the binary data service at the
* time of writing a binary data file, its name is missing the execution ID.
* This function restores the ID in the file name and run data reference.
*
* This edge case can happen only for a Webhook node that accepts binary data,
* when the binary data manager is set to persist this binary data.
*
* ```txt
* filesystem-v2:workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b ->
* filesystem-v2:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b
*
* s3:workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b ->
* s3:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b
* ```
*/
export async function restoreBinaryDataId(
run: IRun,
executionId: string,
workflowExecutionMode: WorkflowExecuteMode,
) {
if (
workflowExecutionMode !== 'webhook' ||
config.getEnv('binaryDataManager.mode') === 'default'
) {
return;
}
try {
const { runData } = run.data.resultData;
const promises = Object.keys(runData).map(async (nodeName) => {
const binaryDataId = runData[nodeName]?.[0]?.data?.main?.[0]?.[0]?.binary?.data?.id;
if (!binaryDataId) return;
const [mode, fileId] = binaryDataId.split(':') as [BinaryData.StoredMode, string];
const isMissingExecutionId = fileId.includes('/temp/');
if (!isMissingExecutionId) return;
const correctFileId = fileId.replace('temp', executionId);
await Container.get(BinaryDataService).rename(fileId, correctFileId);
const correctBinaryDataId = `${mode}:${correctFileId}`;
// @ts-expect-error Validated at the top
run.data.resultData.runData[nodeName][0].data.main[0][0].binary.data.id = correctBinaryDataId;
});
await Promise.all(promises);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
const logger = Container.get(Logger);
if (error.message.includes('ENOENT')) {
logger.warn('Failed to restore binary data ID - No such file or dir', {
executionId,
error,
});
return;
}
logger.error('Failed to restore binary data ID - Unknown error', { executionId, error });
}
}

View File

@@ -0,0 +1,104 @@
import { Container } from '@n8n/di';
import { ErrorReporter, Logger } from 'n8n-core';
import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { toSaveSettings } from './to-save-settings';
export async function saveExecutionProgress(
workflowData: IWorkflowBase,
executionId: string,
nodeName: string,
data: ITaskData,
executionData: IRunExecutionData,
pushRef?: string,
) {
const saveSettings = toSaveSettings(workflowData.settings);
if (!saveSettings.progress) return;
const logger = Container.get(Logger);
try {
logger.debug(`Save execution progress to database for execution ID ${executionId} `, {
executionId,
nodeName,
});
const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution(
executionId,
{
includeData: true,
unflattenData: true,
},
);
if (!fullExecutionData) {
// Something went badly wrong if this happens.
// This check is here mostly to make typescript happy.
return;
}
if (fullExecutionData.finished) {
// We already received ´workflowExecuteAfter´ webhook, so this is just an async call
// that was left behind. We skip saving because the other call should have saved everything
// so this one is safe to ignore
return;
}
if (fullExecutionData.data === undefined) {
fullExecutionData.data = {
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack: [],
waitingExecution: {},
waitingExecutionSource: {},
},
};
}
if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) {
// Append data if array exists
fullExecutionData.data.resultData.runData[nodeName].push(data);
} else {
// Initialize array and save data
fullExecutionData.data.resultData.runData[nodeName] = [data];
}
fullExecutionData.data.executionData = executionData.executionData;
// Set last executed node so that it may resume on failure
fullExecutionData.data.resultData.lastNodeExecuted = nodeName;
fullExecutionData.status = 'running';
await Container.get(ExecutionRepository).updateExistingExecution(
executionId,
fullExecutionData,
);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
Container.get(ErrorReporter).error(error);
// TODO: Improve in the future!
// Errors here might happen because of database access
// For busy machines, we may get "Database is locked" errors.
// We do this to prevent crashes and executions ending in `unknown` state.
logger.error(
`Failed saving execution progress to database for execution ID ${executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`,
{
...error,
executionId,
pushRef,
workflowId: workflowData.id,
},
);
}
}

View File

@@ -0,0 +1,37 @@
import { mock } from 'jest-mock-extended';
import type { IRun } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import { determineFinalExecutionStatus } from '../shared-hook-functions';
describe('determineFinalExecutionStatus', () => {
describe('When waitTill is not set', () => {
test.each(['canceled', 'crashed', 'error', 'success'])('should return "%s"', (status) => {
const runData = { status, data: {} } as IRun;
expect(determineFinalExecutionStatus(runData)).toBe(status);
});
});
it('should return "error" when resultData.error exists', () => {
const runData = {
status: 'running',
data: {
resultData: {
error: new NodeOperationError(mock(), 'An error occurred'),
},
},
} as IRun;
expect(determineFinalExecutionStatus(runData)).toBe('error');
});
it('should return "waiting" when waitTill is defined', () => {
const runData = {
status: 'running',
data: {},
waitTill: new Date('2022-01-01T00:00:00'),
} as IRun;
expect(determineFinalExecutionStatus(runData)).toBe('waiting');
});
});

View File

@@ -0,0 +1,107 @@
import { Container } from '@n8n/di';
import pick from 'lodash/pick';
import { Logger } from 'n8n-core';
import { ensureError, type ExecutionStatus, type IRun, type IWorkflowBase } from 'n8n-workflow';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionDb, UpdateExecutionPayload } from '@/interfaces';
import { ExecutionMetadataService } from '@/services/execution-metadata.service';
import { isWorkflowIdValid } from '@/utils';
export function determineFinalExecutionStatus(runData: IRun): ExecutionStatus {
const workflowHasCrashed = runData.status === 'crashed';
const workflowWasCanceled = runData.status === 'canceled';
const workflowHasFailed = runData.status === 'error';
const workflowDidSucceed =
!runData.data.resultData?.error &&
!workflowHasCrashed &&
!workflowWasCanceled &&
!workflowHasFailed;
let workflowStatusFinal: ExecutionStatus = workflowDidSucceed ? 'success' : 'error';
if (workflowHasCrashed) workflowStatusFinal = 'crashed';
if (workflowWasCanceled) workflowStatusFinal = 'canceled';
if (runData.waitTill) workflowStatusFinal = 'waiting';
return workflowStatusFinal;
}
export function prepareExecutionDataForDbUpdate(parameters: {
runData: IRun;
workflowData: IWorkflowBase;
workflowStatusFinal: ExecutionStatus;
retryOf?: string;
}) {
const { runData, workflowData, workflowStatusFinal, retryOf } = parameters;
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const pristineWorkflowData: IWorkflowBase = pick(workflowData, [
'id',
'name',
'active',
'createdAt',
'updatedAt',
'nodes',
'connections',
'settings',
'staticData',
'pinData',
]);
const fullExecutionData: UpdateExecutionPayload = {
data: runData.data,
mode: runData.mode,
finished: runData.finished ? runData.finished : false,
startedAt: runData.startedAt,
stoppedAt: runData.stoppedAt,
workflowData: pristineWorkflowData,
waitTill: runData.waitTill,
status: workflowStatusFinal,
workflowId: pristineWorkflowData.id,
};
if (retryOf !== undefined) {
fullExecutionData.retryOf = retryOf.toString();
}
const workflowId = workflowData.id;
if (isWorkflowIdValid(workflowId)) {
fullExecutionData.workflowId = workflowId;
}
return fullExecutionData;
}
export async function updateExistingExecution(parameters: {
executionId: string;
workflowId: string;
executionData: Partial<IExecutionDb>;
}) {
const logger = Container.get(Logger);
const { executionId, workflowId, executionData } = parameters;
// Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here
logger.debug(`Save execution data to database for execution ID ${executionId}`, {
executionId,
workflowId,
finished: executionData.finished,
stoppedAt: executionData.stoppedAt,
});
await Container.get(ExecutionRepository).updateExistingExecution(executionId, executionData);
try {
if (executionData.data?.resultData.metadata) {
await Container.get(ExecutionMetadataService).save(
executionId,
executionData.data.resultData.metadata,
);
}
} catch (e) {
const error = ensureError(e);
logger.error(`Failed to save metadata for execution ID ${executionId}`, { error });
}
if (executionData.finished === true && executionData.retryOf !== undefined) {
await Container.get(ExecutionRepository).updateExistingExecution(executionData.retryOf, {
retrySuccessId: executionId,
});
}
}

View File

@@ -0,0 +1,37 @@
import type { IWorkflowSettings } from 'n8n-workflow';
import config from '@/config';
/**
* Return whether a workflow execution is configured to be saved or not:
*
* - `error`: Whether to save failed executions in production.
* - `success`: Whether to successful executions in production.
* - `manual`: Whether to save successful or failed manual executions.
* - `progress`: Whether to save execution progress, i.e. after each node's execution.
*/
export function toSaveSettings(workflowSettings: IWorkflowSettings = {}) {
const DEFAULTS = {
ERROR: config.getEnv('executions.saveDataOnError'),
SUCCESS: config.getEnv('executions.saveDataOnSuccess'),
MANUAL: config.getEnv('executions.saveDataManualExecutions'),
PROGRESS: config.getEnv('executions.saveExecutionProgress'),
};
const {
saveDataErrorExecution = DEFAULTS.ERROR,
saveDataSuccessExecution = DEFAULTS.SUCCESS,
saveManualExecutions = DEFAULTS.MANUAL,
saveExecutionProgress = DEFAULTS.PROGRESS,
} = workflowSettings;
return {
error: saveDataErrorExecution === 'DEFAULT' ? DEFAULTS.ERROR : saveDataErrorExecution === 'all',
success:
saveDataSuccessExecution === 'DEFAULT'
? DEFAULTS.SUCCESS
: saveDataSuccessExecution === 'all',
manual: saveManualExecutions === 'DEFAULT' ? DEFAULTS.MANUAL : saveManualExecutions,
progress: saveExecutionProgress === 'DEFAULT' ? DEFAULTS.PROGRESS : saveExecutionProgress,
};
}