Introduce binary data management (#2059)

* introduce binary data management

* merge fixes

* fixes

* init binary data manager for other modes

* improve binary manager

* improve binary manager

* delete binary data on executions delete

* lazy delete non-saved executions binary data

* merge fixes + error handing

* improve structure

* leftovers and cleanups

* formatting

* fix config description

* fixes

* fix races

* duplicate binary data for execute workflow node

* clean up and cr

* update mode name, add binary mode to diagnostics

* update mode name, add prefix to filename

* update filename

* allow multiple modes, backward compatibility

* improve file and id naming

* use execution id for binary data storage

* delete binary data by execution id

* add meta for persisted binary data

* delete marked persisted files

* mark deletion by executionid

* add env var for persisted binary data ttl

* improvements

* lint fix

* fix env var description

* cleanup

* cleanup

*  Minor improvements

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Ahsan Virani
2021-12-23 22:29:04 +01:00
committed by GitHub
parent 416e15cdb6
commit 1e42effc3a
22 changed files with 743 additions and 40 deletions

View File

@@ -6,7 +6,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import * as localtunnel from 'localtunnel';
import { TUNNEL_SUBDOMAIN_ENV, UserSettings } from 'n8n-core';
import { BinaryDataManager, IBinaryDataConfig, TUNNEL_SUBDOMAIN_ENV, UserSettings } from 'n8n-core';
import { Command, flags } from '@oclif/command';
// eslint-disable-next-line import/no-extraneous-dependencies
import * as Redis from 'ioredis';
@@ -305,6 +305,9 @@ export class Start extends Command {
const { cli } = await GenericHelpers.getVersions();
InternalHooksManager.init(instanceId, cli);
const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig;
await BinaryDataManager.init(binaryDataConfig, true);
await Server.start();
// Start to get active workflows and run their triggers

View File

@@ -3,7 +3,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/unbound-method */
import { UserSettings } from 'n8n-core';
import { BinaryDataManager, IBinaryDataConfig, UserSettings } from 'n8n-core';
import { Command, flags } from '@oclif/command';
// eslint-disable-next-line import/no-extraneous-dependencies
import * as Redis from 'ioredis';
@@ -152,6 +152,9 @@ export class Webhook extends Command {
const { cli } = await GenericHelpers.getVersions();
InternalHooksManager.init(instanceId, cli);
const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig;
await BinaryDataManager.init(binaryDataConfig);
if (config.get('executions.mode') === 'queue') {
const redisHost = config.get('queue.bull.redis.host');
const redisPassword = config.get('queue.bull.redis.password');

View File

@@ -10,7 +10,7 @@
import * as PCancelable from 'p-cancelable';
import { Command, flags } from '@oclif/command';
import { UserSettings, WorkflowExecute } from 'n8n-core';
import { BinaryDataManager, IBinaryDataConfig, UserSettings, WorkflowExecute } from 'n8n-core';
import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow';
@@ -274,6 +274,9 @@ export class Worker extends Command {
const versions = await GenericHelpers.getVersions();
const instanceId = await UserSettings.getInstanceId();
const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig;
await BinaryDataManager.init(binaryDataConfig);
InternalHooksManager.init(instanceId, versions.cli);
console.info('\nn8n worker is now ready');

View File

@@ -650,6 +650,39 @@ const config = convict({
},
},
binaryDataManager: {
availableModes: {
format: String,
default: 'filesystem',
env: 'N8N_AVAILABLE_BINARY_DATA_MODES',
doc: 'Available modes of binary data storage, as comma separated strings',
},
mode: {
format: String,
default: 'default',
env: 'N8N_DEFAULT_BINARY_DATA_MODE',
doc: 'Storage mode for binary data, default | filesystem',
},
localStoragePath: {
format: String,
default: path.join(core.UserSettings.getUserN8nFolderPath(), 'binaryData'),
env: 'N8N_BINARY_DATA_STORAGE_PATH',
doc: 'Path for binary data storage in "filesystem" mode',
},
binaryDataTTL: {
format: Number,
default: 60,
env: 'N8N_BINARY_DATA_TTL',
doc: 'TTL for binary data of unsaved executions in minutes',
},
persistedBinaryDataTTL: {
format: Number,
default: 1440,
env: 'N8N_PERSISTED_BINARY_DATA_TTL',
doc: 'TTL for persisted binary data in minutes (binary data gets deleted if not persisted before TTL expires)',
},
},
deployment: {
type: {
format: String,

View File

@@ -310,6 +310,7 @@ export interface IDiagnosticInfo {
[key: string]: string | number | undefined;
};
deploymentType: string;
binaryDataMode: string;
}
export interface IInternalHooksClass {
@@ -322,7 +323,11 @@ export interface IInternalHooksClass {
onWorkflowCreated(workflow: IWorkflowBase): Promise<void>;
onWorkflowDeleted(workflowId: string): Promise<void>;
onWorkflowSaved(workflow: IWorkflowBase): Promise<void>;
onWorkflowPostExecute(workflow: IWorkflowBase, runData?: IRun): Promise<void>;
onWorkflowPostExecute(
executionId: string,
workflow: IWorkflowBase,
runData?: IRun,
): Promise<void>;
}
export interface IN8nConfig {

View File

@@ -1,4 +1,5 @@
/* eslint-disable import/no-cycle */
import { BinaryDataManager } from 'n8n-core';
import { IDataObject, IRun, TelemetryHelpers } from 'n8n-workflow';
import {
IDiagnosticInfo,
@@ -28,6 +29,7 @@ export class InternalHooksClass implements IInternalHooksClass {
system_info: diagnosticInfo.systemInfo,
execution_variables: diagnosticInfo.executionVariables,
n8n_deployment_type: diagnosticInfo.deploymentType,
n8n_binary_data_mode: diagnosticInfo.binaryDataMode,
};
return Promise.all([
@@ -76,7 +78,11 @@ export class InternalHooksClass implements IInternalHooksClass {
});
}
async onWorkflowPostExecute(workflow: IWorkflowBase, runData?: IRun): Promise<void> {
async onWorkflowPostExecute(
executionId: string,
workflow: IWorkflowBase,
runData?: IRun,
): Promise<void> {
const properties: IDataObject = {
workflow_id: workflow.id,
is_manual: false,
@@ -120,7 +126,10 @@ export class InternalHooksClass implements IInternalHooksClass {
}
}
return this.telemetry.trackWorkflowExecution(properties);
return Promise.all([
BinaryDataManager.getInstance().persistBinaryDataForExecutionId(executionId),
this.telemetry.trackWorkflowExecution(properties),
]).then(() => {});
}
async onN8nStop(): Promise<void> {

View File

@@ -49,7 +49,9 @@ import { compare } from 'bcryptjs';
import * as promClient from 'prom-client';
import {
BinaryDataManager,
Credentials,
IBinaryDataConfig,
ICredentialTestFunctions,
LoadNodeParameterOptions,
NodeExecuteFunctions,
@@ -2449,12 +2451,27 @@ class App {
const filters = {
startedAt: LessThanOrEqual(deleteData.deleteBefore),
};
if (deleteData.filters !== undefined) {
Object.assign(filters, deleteData.filters);
}
const execs = await Db.collections.Execution!.find({ ...filters, select: ['id'] });
await Promise.all(
execs.map(async (item) =>
BinaryDataManager.getInstance().deleteBinaryDataByExecutionId(item.id.toString()),
),
);
await Db.collections.Execution!.delete(filters);
} else if (deleteData.ids !== undefined) {
await Promise.all(
deleteData.ids.map(async (id) =>
BinaryDataManager.getInstance().deleteBinaryDataByExecutionId(id),
),
);
// Deletes all executions with the given ids
await Db.collections.Execution!.delete(deleteData.ids);
} else {
@@ -2650,6 +2667,23 @@ class App {
}),
);
// ----------------------------------------
// Binary data
// ----------------------------------------
// Returns binary buffer
this.app.get(
`/${this.restEndpoint}/data/:path`,
ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<string> => {
const dataPath = req.params.path;
return BinaryDataManager.getInstance()
.retrieveBinaryDataByIdentifier(dataPath)
.then((buffer: Buffer) => {
return buffer.toString('base64');
});
}),
);
// ----------------------------------------
// Settings
// ----------------------------------------
@@ -2917,6 +2951,7 @@ export async function start(): Promise<void> {
await app.externalHooks.run('n8n.ready', [app]);
const cpus = os.cpus();
const binarDataConfig = config.get('binaryDataManager') as IBinaryDataConfig;
const diagnosticInfo: IDiagnosticInfo = {
basicAuthActive: config.get('security.basicAuth.active') as boolean,
databaseType: (await GenericHelpers.getConfigValue('database.type')) as DatabaseType,
@@ -2950,6 +2985,7 @@ export async function start(): Promise<void> {
executions_data_prune_timeout: config.get('executions.pruneDataTimeout'),
},
deploymentType: config.get('deployment.type'),
binaryDataMode: binarDataConfig.mode,
};
void Db.collections

View File

@@ -16,7 +16,7 @@ import * as express from 'express';
// eslint-disable-next-line import/no-extraneous-dependencies
import { get } from 'lodash';
import { BINARY_ENCODING, NodeExecuteFunctions } from 'n8n-core';
import { BINARY_ENCODING, BinaryDataManager, NodeExecuteFunctions } from 'n8n-core';
import {
createDeferredPromise,
@@ -37,6 +37,7 @@ import {
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import {
GenericHelpers,
@@ -447,7 +448,7 @@ export async function executeWebhook(
IExecutionDb | undefined
>;
executePromise
.then((data) => {
.then(async (data) => {
if (data === undefined) {
if (!didSendResponse) {
responseCallback(null, {
@@ -611,7 +612,10 @@ export async function executeWebhook(
if (!didSendResponse) {
// Send the webhook response manually
res.setHeader('Content-Type', binaryData.mimeType);
res.end(Buffer.from(binaryData.data, BINARY_ENCODING));
const binaryDataBuffer = await BinaryDataManager.getInstance().retrieveBinaryData(
binaryData,
);
res.end(binaryDataBuffer);
responseCallback(null, {
noWebhookResponse: true,

View File

@@ -14,7 +14,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable func-names */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { UserSettings, WorkflowExecute } from 'n8n-core';
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
import {
IDataObject,
@@ -481,8 +481,11 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) {
// Data is always saved, so we remove from database
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await Db.collections.Execution!.delete(this.executionId);
await BinaryDataManager.getInstance().markDataForDeletionByExecutionId(
this.executionId,
);
return;
}
@@ -515,6 +518,10 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
}
// Data is always saved, so we remove from database
await Db.collections.Execution!.delete(this.executionId);
await BinaryDataManager.getInstance().markDataForDeletionByExecutionId(
this.executionId,
);
return;
}
}
@@ -836,6 +843,8 @@ export async function executeWorkflow(
workflowData,
{ parentProcessMode: additionalData.hooks!.mode },
);
additionalDataIntegrated.executionId = executionId;
// Make sure we pass on the original executeWorkflow function we received
// This one already contains changes to talk to parent process
// and get executionID from `activeExecutions` running on main process
@@ -910,7 +919,7 @@ export async function executeWorkflow(
}
await externalHooks.run('workflow.postExecute', [data, workflowData]);
void InternalHooksManager.getInstance().onWorkflowPostExecute(workflowData, data);
void InternalHooksManager.getInstance().onWorkflowPostExecute(executionId, workflowData, data);
if (data.finished === true) {
// Workflow did finish successfully

View File

@@ -11,7 +11,7 @@
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
/* eslint-disable import/no-cycle */
/* eslint-disable @typescript-eslint/no-unused-vars */
import { IProcessMessage, WorkflowExecute } from 'n8n-core';
import { BinaryDataManager, IProcessMessage, WorkflowExecute } from 'n8n-core';
import {
ExecutionError,
@@ -174,6 +174,7 @@ export class WorkflowRunner {
postExecutePromise
.then(async (executionData) => {
void InternalHooksManager.getInstance().onWorkflowPostExecute(
executionId!,
data.workflowData,
executionData,
);
@@ -539,6 +540,7 @@ export class WorkflowRunner {
(!workflowDidSucceed && saveDataErrorExecution === 'none')
) {
await Db.collections.Execution!.delete(executionId);
await BinaryDataManager.getInstance().markDataForDeletionByExecutionId(executionId);
}
// eslint-disable-next-line id-denylist
} catch (err) {

View File

@@ -5,7 +5,13 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/no-use-before-define */
/* eslint-disable @typescript-eslint/unbound-method */
import { IProcessMessage, UserSettings, WorkflowExecute } from 'n8n-core';
import {
BinaryDataManager,
IBinaryDataConfig,
IProcessMessage,
UserSettings,
WorkflowExecute,
} from 'n8n-core';
import {
ExecutionError,
@@ -141,6 +147,9 @@ export class WorkflowRunnerProcess {
const { cli } = await GenericHelpers.getVersions();
InternalHooksManager.init(instanceId, cli);
const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig;
await BinaryDataManager.init(binaryDataConfig);
// Credentials should now be loaded from database.
// We check if any node uses credentials. If it does, then
// init database.
@@ -260,7 +269,11 @@ export class WorkflowRunnerProcess {
const { workflow } = executeWorkflowFunctionOutput;
result = await workflowExecute.processRunExecutionData(workflow);
await externalHooks.run('workflow.postExecute', [result, workflowData]);
void InternalHooksManager.getInstance().onWorkflowPostExecute(workflowData, result);
void InternalHooksManager.getInstance().onWorkflowPostExecute(
executionId,
workflowData,
result,
);
await sendToParentProcess('finishExecution', { executionId, result });
delete this.childExecutions[executionId];
} catch (e) {