refactor(core): Move event and telemetry handling into workers in queue mode (#7138)

# Motivation

In Queue mode, finished executions would cause the main instance to
always pull all execution data from the database, unflatten it and then
use it to send out event log events and telemetry events, as well as
required returns to Respond to Webhook nodes etc.

This could cause OOM errors when the data was large, since it had to be
fully unpacked and transformed on the main instance’s side, using up a
lot of memory (and time).

This PR attempts to limit this behaviour to only happen in those
required cases where the data has to be forwarded to some waiting
webhook, for example.

# Changes

Execution data is only required in cases where the active execution has
a `postExecutePromise` attached to it. These usually forward the data to
some other endpoint (e.g. a listening webhook connection).

By adding a helper `getPostExecutePromiseCount()`, we can decide that in
cases where there is nothing listening at all, there is no reason to
pull the data on the main instance.

Previously, there would always be postExecutePromises because the
telemetry events were called. Now, these have been moved into the
workers, which have been given the various InternalHooks calls to their
hook function arrays, so they themselves issue these telemetry and event
calls.

This results in all event log messages now being logged on the worker’s
event log, and in the worker’s eventbus being the one to send out
the events to destinations. The main event log does…pretty much nothing.

We are not logging executions on the main event log any more, because
this would require all events to be replicated 1:1 from the workers to
the main instance(s) (this IS possible and implemented, see the worker’s
`replicateToRedisEventLogFunction` - but it is not enabled to reduce the
amount of traffic over redis).

Partial events in the main log could confuse the recovery process and
would, ironically, result in the recovery corrupting the execution data
by considering those executions crashed.

# Refactor

I have also used the opportunity to reduce duplicate code and move some
of the hook functionality into
`packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts`
in preparation for a future full refactor of the hooks.
This commit is contained in:
Michael Auerswald
2023-09-14 07:58:15 +02:00
committed by GitHub
parent 07a6417f0f
commit 0c6169ee22
10 changed files with 607 additions and 506 deletions

View File

@@ -36,7 +36,6 @@ import {
WorkflowHooks,
} from 'n8n-workflow';
import pick from 'lodash/pick';
import { Container } from 'typedi';
import type { FindOptionsWhere } from 'typeorm';
import { LessThanOrEqual, In } from 'typeorm';
@@ -66,7 +65,11 @@ import { ExecutionRepository } from '@db/repositories';
import { EventsService } from '@/services/events.service';
import { SecretsHelper } from './SecretsHelpers';
import { OwnershipService } from './services/ownership.service';
import { ExecutionMetadataService } from './services/executionMetadata.service';
import {
determineFinalExecutionStatus,
prepareExecutionDataForDbUpdate,
updateExistingExecution,
} from './executionLifecycleHooks/shared/sharedHookFunctions';
const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
@@ -569,18 +572,11 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
saveDataSuccessExecution;
}
const workflowHasCrashed = fullRunData.status === 'crashed';
const workflowWasCanceled = fullRunData.status === 'canceled';
const workflowDidSucceed =
!fullRunData.data.resultData.error && !workflowHasCrashed && !workflowWasCanceled;
let workflowStatusFinal: ExecutionStatus = workflowDidSucceed ? 'success' : 'failed';
if (workflowHasCrashed) workflowStatusFinal = 'crashed';
if (workflowWasCanceled) workflowStatusFinal = 'canceled';
if (fullRunData.waitTill) workflowStatusFinal = 'waiting';
const workflowStatusFinal = determineFinalExecutionStatus(fullRunData);
if (
(workflowDidSucceed && saveDataSuccessExecution === 'none') ||
(!workflowDidSucceed && saveDataErrorExecution === 'none')
(workflowStatusFinal === 'success' && saveDataSuccessExecution === 'none') ||
(workflowStatusFinal !== 'success' && saveDataErrorExecution === 'none')
) {
if (!fullRunData.waitTill && !isManualMode) {
executeErrorWorkflow(
@@ -599,68 +595,18 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const pristineWorkflowData: IWorkflowBase = pick(this.workflowData, [
'id',
'name',
'active',
'createdAt',
'updatedAt',
'nodes',
'connections',
'settings',
'staticData',
'pinData',
]);
const fullExecutionData: IExecutionDb = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
workflowData: pristineWorkflowData,
waitTill: fullRunData.waitTill,
status: workflowStatusFinal,
};
if (this.retryOf !== undefined) {
fullExecutionData.retryOf = this.retryOf?.toString();
}
const workflowId = this.workflowData.id;
if (isWorkflowIdValid(workflowId)) {
fullExecutionData.workflowId = workflowId;
}
// Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here
Logger.debug(`Save execution data to database for execution ID ${this.executionId}`, {
executionId: this.executionId,
workflowId,
finished: fullExecutionData.finished,
stoppedAt: fullExecutionData.stoppedAt,
const fullExecutionData = prepareExecutionDataForDbUpdate({
runData: fullRunData,
workflowData: this.workflowData,
workflowStatusFinal,
retryOf: this.retryOf,
});
await Container.get(ExecutionRepository).updateExistingExecution(
this.executionId,
fullExecutionData,
);
try {
if (fullRunData.data.resultData.metadata) {
await Container.get(ExecutionMetadataService).save(
this.executionId,
fullRunData.data.resultData.metadata,
);
}
} catch (e) {
Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e);
}
if (fullRunData.finished === true && this.retryOf !== undefined) {
await Container.get(ExecutionRepository).updateExistingExecution(this.retryOf, {
retrySuccessId: this.executionId,
});
}
await updateExistingExecution({
executionId: this.executionId,
workflowId: this.workflowData.id as string,
executionData: fullExecutionData,
});
if (!isManualMode) {
executeErrorWorkflow(
@@ -707,18 +653,40 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
*
*/
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
const internalHooks = Container.get(InternalHooks);
const eventsService = Container.get(EventsService);
return {
nodeExecuteBefore: [],
nodeExecuteAfter: [],
workflowExecuteBefore: [],
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
void internalHooks.onNodeBeforeExecute(this.executionId, this.workflowData, nodeName);
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
void internalHooks.onNodePostExecute(this.executionId, this.workflowData, nodeName);
},
],
workflowExecuteBefore: [
async function (workflow: Workflow, data: IRunExecutionData): Promise<void> {
void internalHooks.onWorkflowBeforeExecute(this.executionId, this.workflowData);
},
],
workflowExecuteAfter: [
async function (
this: WorkflowHooks,
fullRunData: IRun,
newStaticData: IDataObject,
): Promise<void> {
Logger.debug('Executing hook (hookFunctionsSaveWorker)', {
executionId: this.executionId,
workflowId: this.workflowData.id,
});
try {
// Prune old execution data
if (config.getEnv('executions.pruneData')) {
await pruneExecutionData.call(this);
}
if (isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
@@ -735,16 +703,9 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
}
}
const workflowHasCrashed = fullRunData.status === 'crashed';
const workflowWasCanceled = fullRunData.status === 'canceled';
const workflowDidSucceed =
!fullRunData.data.resultData.error && !workflowHasCrashed && !workflowWasCanceled;
let workflowStatusFinal: ExecutionStatus = workflowDidSucceed ? 'success' : 'failed';
if (workflowHasCrashed) workflowStatusFinal = 'crashed';
if (workflowWasCanceled) workflowStatusFinal = 'canceled';
if (fullRunData.waitTill) workflowStatusFinal = 'waiting';
const workflowStatusFinal = determineFinalExecutionStatus(fullRunData);
if (!workflowDidSucceed) {
if (workflowStatusFinal !== 'success') {
executeErrorWorkflow(
this.workflowData,
fullRunData,
@@ -754,54 +715,20 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
);
}
const fullExecutionData: IExecutionDb = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const fullExecutionData = prepareExecutionDataForDbUpdate({
runData: fullRunData,
workflowData: this.workflowData,
waitTill: fullRunData.data.waitTill,
status: workflowStatusFinal,
};
if (this.retryOf !== undefined) {
fullExecutionData.retryOf = this.retryOf.toString();
}
const workflowId = this.workflowData.id;
if (isWorkflowIdValid(workflowId)) {
fullExecutionData.workflowId = workflowId;
}
await Container.get(ExecutionRepository).updateExistingExecution(
this.executionId,
fullExecutionData,
);
// For reasons(tm) the execution status is not updated correctly in the first update, so has to be written again (tbd)
await Container.get(ExecutionRepository).updateExistingExecution(this.executionId, {
status: fullExecutionData.status,
workflowStatusFinal,
retryOf: this.retryOf,
});
try {
if (fullRunData.data.resultData.metadata) {
await Container.get(ExecutionMetadataService).save(
this.executionId,
fullRunData.data.resultData.metadata,
);
}
} catch (e) {
Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e);
}
if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful save the reference it on the original execution
await Container.get(ExecutionRepository).updateExistingExecution(this.retryOf, {
retrySuccessId: this.executionId,
});
}
await updateExistingExecution({
executionId: this.executionId,
workflowId: this.workflowData.id as string,
executionData: fullExecutionData,
});
} catch (error) {
executeErrorWorkflow(
this.workflowData,
@@ -814,6 +741,14 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
eventsService.emit('workflowExecutionCompleted', this.workflowData, fullRunData);
}
},
async function (
this: WorkflowHooks,
fullRunData: IRun,
newStaticData: IDataObject,
): Promise<void> {
// send tracking and event log events, but don't wait for them
void internalHooks.onWorkflowPostExecute(this.executionId, this.workflowData, fullRunData);
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
@@ -1216,14 +1151,19 @@ export function getWorkflowHooksWorkerMain(
optionalParameters?: IWorkflowHooksOptionalParameters,
): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsPush();
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
const hookFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
// TODO: why are workers pushing to frontend?
// TODO: simplifying this for now to just leave the bare minimum hooks
// const hookFunctions = hookFunctionsPush();
// const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
// for (const key of Object.keys(preExecuteFunctions)) {
// if (hookFunctions[key] === undefined) {
// hookFunctions[key] = [];
// }
// hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
// }
// When running with worker mode, main process executes
// Only workflowExecuteBefore + workflowExecuteAfter