refactor(core): Move execution progress saving to standalone utility (no-changelog) (#7770)

This PR continues the effort of moving logic inside execution lifecycle
hooks into standalone testable functions, as a stepping stone to
refactoring the hooks themselves.
This commit is contained in:
Iván Ovejero
2023-11-27 13:10:43 +01:00
committed by GitHub
parent 2807ddcd0d
commit 7b8532d3a3
3 changed files with 220 additions and 83 deletions

View File

@@ -65,6 +65,7 @@ import {
import { restoreBinaryDataId } from './executionLifecycleHooks/restoreBinaryDataId'; import { restoreBinaryDataId } from './executionLifecycleHooks/restoreBinaryDataId';
import { toSaveSettings } from './executionLifecycleHooks/toSaveSettings'; import { toSaveSettings } from './executionLifecycleHooks/toSaveSettings';
import { Logger } from './Logger'; import { Logger } from './Logger';
import { saveExecutionProgress } from './executionLifecycleHooks/saveExecutionProgress';
const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
@@ -358,89 +359,14 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx
data: ITaskData, data: ITaskData,
executionData: IRunExecutionData, executionData: IRunExecutionData,
): Promise<void> { ): Promise<void> {
const saveSettings = toSaveSettings(this.workflowData.settings); await saveExecutionProgress(
this.workflowData,
if (!saveSettings.progress) return;
try {
logger.debug(
`Save execution progress to database for execution ID ${this.executionId} `,
{ executionId: this.executionId, nodeName },
);
const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution(
this.executionId, this.executionId,
{ nodeName,
includeData: true, data,
unflattenData: true, executionData,
}, this.sessionId,
); );
if (!fullExecutionData) {
// Something went badly wrong if this happens.
// This check is here mostly to make typescript happy.
return;
}
if (fullExecutionData.finished) {
// We already received ´workflowExecuteAfter´ webhook, so this is just an async call
// that was left behind. We skip saving because the other call should have saved everything
// so this one is safe to ignore
return;
}
if (fullExecutionData.data === undefined) {
fullExecutionData.data = {
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack: [],
waitingExecution: {},
waitingExecutionSource: {},
},
};
}
if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) {
// Append data if array exists
fullExecutionData.data.resultData.runData[nodeName].push(data);
} else {
// Initialize array and save data
fullExecutionData.data.resultData.runData[nodeName] = [data];
}
fullExecutionData.data.executionData = executionData.executionData;
// Set last executed node so that it may resume on failure
fullExecutionData.data.resultData.lastNodeExecuted = nodeName;
fullExecutionData.status = 'running';
await Container.get(ExecutionRepository).updateExistingExecution(
this.executionId,
fullExecutionData,
);
} catch (err) {
ErrorReporter.error(err);
// TODO: Improve in the future!
// Errors here might happen because of database access
// For busy machines, we may get "Database is locked" errors.
// We do this to prevent crashes and executions ending in `unknown` state.
logger.error(
`Failed saving execution progress to database for execution ID ${this.executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`,
{
...err,
executionId: this.executionId,
sessionId: this.sessionId,
workflowId: this.workflowData.id,
},
);
}
}, },
], ],
}; };

View File

@@ -0,0 +1,105 @@
import { Container } from 'typedi';
import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow';
import { ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import { Logger } from '@/Logger';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { toSaveSettings } from '@/executionLifecycleHooks/toSaveSettings';
export async function saveExecutionProgress(
workflowData: IWorkflowBase,
executionId: string,
nodeName: string,
data: ITaskData,
executionData: IRunExecutionData,
sessionId?: string,
) {
const saveSettings = toSaveSettings(workflowData.settings);
if (!saveSettings.progress) return;
const logger = Container.get(Logger);
try {
logger.debug(`Save execution progress to database for execution ID ${executionId} `, {
executionId,
nodeName,
});
const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution(
executionId,
{
includeData: true,
unflattenData: true,
},
);
if (!fullExecutionData) {
// Something went badly wrong if this happens.
// This check is here mostly to make typescript happy.
return;
}
if (fullExecutionData.finished) {
// We already received ´workflowExecuteAfter´ webhook, so this is just an async call
// that was left behind. We skip saving because the other call should have saved everything
// so this one is safe to ignore
return;
}
if (fullExecutionData.data === undefined) {
fullExecutionData.data = {
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack: [],
waitingExecution: {},
waitingExecutionSource: {},
},
};
}
if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) {
// Append data if array exists
fullExecutionData.data.resultData.runData[nodeName].push(data);
} else {
// Initialize array and save data
fullExecutionData.data.resultData.runData[nodeName] = [data];
}
fullExecutionData.data.executionData = executionData.executionData;
// Set last executed node so that it may resume on failure
fullExecutionData.data.resultData.lastNodeExecuted = nodeName;
fullExecutionData.status = 'running';
await Container.get(ExecutionRepository).updateExistingExecution(
executionId,
fullExecutionData,
);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
ErrorReporter.error(error);
// TODO: Improve in the future!
// Errors here might happen because of database access
// For busy machines, we may get "Database is locked" errors.
// We do this to prevent crashes and executions ending in `unknown` state.
logger.error(
`Failed saving execution progress to database for execution ID ${executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`,
{
...error,
executionId,
sessionId,
workflowId: workflowData.id,
},
);
}
}

View File

@@ -0,0 +1,106 @@
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { mockInstance } from '../../shared/mocking';
import { Logger } from '@/Logger';
import { saveExecutionProgress } from '@/executionLifecycleHooks/saveExecutionProgress';
import * as fnModule from '@/executionLifecycleHooks/toSaveSettings';
import {
ErrorReporterProxy,
type IRunExecutionData,
type ITaskData,
type IWorkflowBase,
} from 'n8n-workflow';
import type { IExecutionResponse } from '@/Interfaces';
mockInstance(Logger);
const executionRepository = mockInstance(ExecutionRepository);
afterEach(() => {
jest.clearAllMocks();
});
const commonArgs: [IWorkflowBase, string, string, ITaskData, IRunExecutionData, string] = [
{} as IWorkflowBase,
'some-execution-id',
'My Node',
{} as ITaskData,
{} as IRunExecutionData,
'some-session-id',
];
const commonSettings = { error: true, success: true, manual: true };
test('should ignore if save settings say so', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: false,
});
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
});
test('should ignore on leftover async call', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: true,
});
executionRepository.findSingleExecution.mockResolvedValue({
finished: true,
} as IExecutionResponse);
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
});
test('should update execution', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: true,
});
const reporterSpy = jest.spyOn(ErrorReporterProxy, 'error');
executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse);
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', {
data: {
executionData: undefined,
resultData: {
lastNodeExecuted: 'My Node',
runData: {
'My Node': [{}],
},
},
startData: {},
},
status: 'running',
});
expect(reporterSpy).not.toHaveBeenCalled();
});
test('should report error on failure', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: true,
});
const reporterSpy = jest.spyOn(ErrorReporterProxy, 'error');
const error = new Error('Something went wrong');
executionRepository.findSingleExecution.mockImplementation(() => {
throw error;
});
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
expect(reporterSpy).toHaveBeenCalledWith(error);
});