mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-20 11:22:15 +00:00
refactor(core): Reduce memory usage in the Webhook node (#4640)
use file streaming to pass webhook binaries around
This commit is contained in:
committed by
GitHub
parent
602b1e56d6
commit
07e4743a3e
@@ -1,8 +1,9 @@
|
||||
import { promises as fs } from 'fs';
|
||||
import fs from 'fs/promises';
|
||||
import { jsonParse } from 'n8n-workflow';
|
||||
import path from 'path';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
|
||||
import { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
|
||||
|
||||
const PREFIX_METAFILE = 'binarymeta';
|
||||
const PREFIX_PERSISTED_METAFILE = 'persistedmeta';
|
||||
@@ -43,17 +44,47 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
|
||||
.then(() => {});
|
||||
}
|
||||
|
||||
async getFileSize(identifier: string): Promise<number> {
|
||||
const stats = await fs.stat(this.getBinaryPath(identifier));
|
||||
return stats.size;
|
||||
}
|
||||
|
||||
async copyBinaryFile(filePath: string, executionId: string): Promise<string> {
|
||||
const binaryDataId = this.generateFileName(executionId);
|
||||
await this.addBinaryIdToPersistMeta(executionId, binaryDataId);
|
||||
await this.copyFileToLocalStorage(filePath, binaryDataId);
|
||||
return binaryDataId;
|
||||
}
|
||||
|
||||
async storeBinaryMetadata(identifier: string, metadata: BinaryMetadata) {
|
||||
await fs.writeFile(this.getMetadataPath(identifier), JSON.stringify(metadata), {
|
||||
encoding: 'utf-8',
|
||||
});
|
||||
}
|
||||
|
||||
async getBinaryMetadata(identifier: string): Promise<BinaryMetadata> {
|
||||
return jsonParse(await fs.readFile(this.getMetadataPath(identifier), { encoding: 'utf-8' }));
|
||||
}
|
||||
|
||||
async storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise<string> {
|
||||
const binaryDataId = this.generateFileName(executionId);
|
||||
return this.addBinaryIdToPersistMeta(executionId, binaryDataId).then(async () =>
|
||||
this.saveToLocalStorage(binaryBuffer, binaryDataId).then(() => binaryDataId),
|
||||
);
|
||||
await this.addBinaryIdToPersistMeta(executionId, binaryDataId);
|
||||
await this.saveToLocalStorage(binaryBuffer, binaryDataId);
|
||||
return binaryDataId;
|
||||
}
|
||||
|
||||
async retrieveBinaryDataByIdentifier(identifier: string): Promise<Buffer> {
|
||||
return this.retrieveFromLocalStorage(identifier);
|
||||
}
|
||||
|
||||
getBinaryPath(identifier: string): string {
|
||||
return path.join(this.storagePath, identifier);
|
||||
}
|
||||
|
||||
getMetadataPath(identifier: string): string {
|
||||
return path.join(this.storagePath, `${identifier}.metadata`);
|
||||
}
|
||||
|
||||
async markDataForDeletionByExecutionId(executionId: string): Promise<void> {
|
||||
const tt = new Date(new Date().getTime() + this.binaryDataTTL * 60000);
|
||||
return fs.writeFile(
|
||||
@@ -180,7 +211,7 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
|
||||
}
|
||||
|
||||
private generateFileName(prefix: string): string {
|
||||
return `${prefix}_${uuid()}`;
|
||||
return [prefix, uuid()].join('');
|
||||
}
|
||||
|
||||
private getBinaryDataMetaPath() {
|
||||
@@ -196,15 +227,19 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
|
||||
}
|
||||
|
||||
private async deleteFromLocalStorage(identifier: string) {
|
||||
return fs.rm(path.join(this.storagePath, identifier));
|
||||
return fs.rm(this.getBinaryPath(identifier));
|
||||
}
|
||||
|
||||
private async copyFileToLocalStorage(source: string, identifier: string): Promise<void> {
|
||||
await fs.cp(source, this.getBinaryPath(identifier));
|
||||
}
|
||||
|
||||
private async saveToLocalStorage(data: Buffer, identifier: string) {
|
||||
await fs.writeFile(path.join(this.storagePath, identifier), data);
|
||||
await fs.writeFile(this.getBinaryPath(identifier), data);
|
||||
}
|
||||
|
||||
private async retrieveFromLocalStorage(identifier: string): Promise<Buffer> {
|
||||
const filePath = path.join(this.storagePath, identifier);
|
||||
const filePath = this.getBinaryPath(identifier);
|
||||
try {
|
||||
return await fs.readFile(filePath);
|
||||
} catch (e) {
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import { IBinaryData, INodeExecutionData } from 'n8n-workflow';
|
||||
import prettyBytes from 'pretty-bytes';
|
||||
import type { IBinaryData, INodeExecutionData } from 'n8n-workflow';
|
||||
import { BINARY_ENCODING } from '../Constants';
|
||||
import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
|
||||
import type { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
|
||||
import { BinaryDataFileSystem } from './FileSystem';
|
||||
import { readFile, stat } from 'fs/promises';
|
||||
|
||||
export class BinaryDataManager {
|
||||
static instance: BinaryDataManager | undefined;
|
||||
@@ -43,31 +45,59 @@ export class BinaryDataManager {
|
||||
return BinaryDataManager.instance;
|
||||
}
|
||||
|
||||
async copyBinaryFile(
|
||||
binaryData: IBinaryData,
|
||||
filePath: string,
|
||||
executionId: string,
|
||||
): Promise<IBinaryData> {
|
||||
// If a manager handles this binary, copy over the binary file and return its reference id.
|
||||
const manager = this.managers[this.binaryDataMode];
|
||||
if (manager) {
|
||||
const identifier = await manager.copyBinaryFile(filePath, executionId);
|
||||
// Add data manager reference id.
|
||||
binaryData.id = this.generateBinaryId(identifier);
|
||||
|
||||
// Prevent preserving data in memory if handled by a data manager.
|
||||
binaryData.data = this.binaryDataMode;
|
||||
|
||||
const fileSize = await manager.getFileSize(identifier);
|
||||
binaryData.fileSize = prettyBytes(fileSize);
|
||||
|
||||
await manager.storeBinaryMetadata(identifier, {
|
||||
fileName: binaryData.fileName,
|
||||
mimeType: binaryData.mimeType,
|
||||
fileSize,
|
||||
});
|
||||
} else {
|
||||
const { size } = await stat(filePath);
|
||||
binaryData.fileSize = prettyBytes(size);
|
||||
binaryData.data = await readFile(filePath, { encoding: BINARY_ENCODING });
|
||||
}
|
||||
|
||||
return binaryData;
|
||||
}
|
||||
|
||||
async storeBinaryData(
|
||||
binaryData: IBinaryData,
|
||||
binaryBuffer: Buffer,
|
||||
executionId: string,
|
||||
): Promise<IBinaryData> {
|
||||
const retBinaryData = binaryData;
|
||||
binaryData.fileSize = prettyBytes(binaryBuffer.length);
|
||||
|
||||
// If a manager handles this binary, return the binary data with it's reference id.
|
||||
if (this.managers[this.binaryDataMode]) {
|
||||
return this.managers[this.binaryDataMode]
|
||||
.storeBinaryData(binaryBuffer, executionId)
|
||||
.then((filename) => {
|
||||
// Add data manager reference id.
|
||||
retBinaryData.id = this.generateBinaryId(filename);
|
||||
// If a manager handles this binary, return the binary data with its reference id.
|
||||
const manager = this.managers[this.binaryDataMode];
|
||||
if (manager) {
|
||||
const identifier = await manager.storeBinaryData(binaryBuffer, executionId);
|
||||
// Add data manager reference id.
|
||||
binaryData.id = this.generateBinaryId(identifier);
|
||||
|
||||
// Prevent preserving data in memory if handled by a data manager.
|
||||
retBinaryData.data = this.binaryDataMode;
|
||||
|
||||
// Short-circuit return to prevent further actions.
|
||||
return retBinaryData;
|
||||
});
|
||||
// Prevent preserving data in memory if handled by a data manager.
|
||||
binaryData.data = this.binaryDataMode;
|
||||
} else {
|
||||
// Else fallback to storing this data in memory.
|
||||
binaryData.data = binaryBuffer.toString(BINARY_ENCODING);
|
||||
}
|
||||
|
||||
// Else fallback to storing this data in memory.
|
||||
retBinaryData.data = binaryBuffer.toString(BINARY_ENCODING);
|
||||
return binaryData;
|
||||
}
|
||||
|
||||
@@ -88,6 +118,24 @@ export class BinaryDataManager {
|
||||
throw new Error('Storage mode used to store binary data not available');
|
||||
}
|
||||
|
||||
getBinaryPath(identifier: string): string {
|
||||
const { mode, id } = this.splitBinaryModeFileId(identifier);
|
||||
if (this.managers[mode]) {
|
||||
return this.managers[mode].getBinaryPath(id);
|
||||
}
|
||||
|
||||
throw new Error('Storage mode used to store binary data not available');
|
||||
}
|
||||
|
||||
async getBinaryMetadata(identifier: string): Promise<BinaryMetadata> {
|
||||
const { mode, id } = this.splitBinaryModeFileId(identifier);
|
||||
if (this.managers[mode]) {
|
||||
return this.managers[mode].getBinaryMetadata(id);
|
||||
}
|
||||
|
||||
throw new Error('Storage mode used to store binary data not available');
|
||||
}
|
||||
|
||||
async markDataForDeletionByExecutionId(executionId: string): Promise<void> {
|
||||
if (this.managers[this.binaryDataMode]) {
|
||||
return this.managers[this.binaryDataMode].markDataForDeletionByExecutionId(executionId);
|
||||
|
||||
@@ -260,6 +260,7 @@ export interface IWebhookFunctions extends IWebhookFunctionsBase {
|
||||
filePath?: string,
|
||||
mimeType?: string,
|
||||
): Promise<IBinaryData>;
|
||||
copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise<IBinaryData>;
|
||||
request: (uriOrObject: string | IDataObject | any, options?: IDataObject) => Promise<any>;
|
||||
requestWithAuthentication(
|
||||
this: IAllExecuteFunctions,
|
||||
@@ -306,10 +307,21 @@ export interface IBinaryDataConfig {
|
||||
persistedBinaryDataTTL: number;
|
||||
}
|
||||
|
||||
export interface BinaryMetadata {
|
||||
fileName?: string;
|
||||
mimeType?: string;
|
||||
fileSize: number;
|
||||
}
|
||||
|
||||
export interface IBinaryDataManager {
|
||||
init(startPurger: boolean): Promise<void>;
|
||||
getFileSize(filePath: string): Promise<number>;
|
||||
copyBinaryFile(filePath: string, executionId: string): Promise<string>;
|
||||
storeBinaryMetadata(identifier: string, metadata: BinaryMetadata): Promise<void>;
|
||||
getBinaryMetadata(identifier: string): Promise<BinaryMetadata>;
|
||||
storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise<string>;
|
||||
retrieveBinaryDataByIdentifier(identifier: string): Promise<Buffer>;
|
||||
getBinaryPath(identifier: string): string;
|
||||
markDataForDeletionByExecutionId(executionId: string): Promise<void>;
|
||||
deleteMarkedFiles(): Promise<unknown>;
|
||||
deleteBinaryDataByIdentifier(identifier: string): Promise<void>;
|
||||
|
||||
@@ -64,6 +64,7 @@ import {
|
||||
NodeExecutionWithMetadata,
|
||||
IPairedItemData,
|
||||
deepCopy,
|
||||
BinaryFileType,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { Agent } from 'https';
|
||||
@@ -77,8 +78,8 @@ import FormData from 'form-data';
|
||||
import path from 'path';
|
||||
import { OptionsWithUri, OptionsWithUrl, RequestCallback, RequiredUriUrl } from 'request';
|
||||
import requestPromise, { RequestPromiseOptions } from 'request-promise-native';
|
||||
import { fromBuffer } from 'file-type';
|
||||
import { lookup } from 'mime-types';
|
||||
import FileType from 'file-type';
|
||||
import { lookup, extension } from 'mime-types';
|
||||
import { IncomingHttpHeaders } from 'http';
|
||||
import axios, {
|
||||
AxiosError,
|
||||
@@ -830,6 +831,13 @@ export async function getBinaryDataBuffer(
|
||||
return BinaryDataManager.getInstance().retrieveBinaryData(binaryData);
|
||||
}
|
||||
|
||||
function fileTypeFromMimeType(mimeType: string): BinaryFileType | undefined {
|
||||
if (mimeType.startsWith('image/')) return 'image';
|
||||
if (mimeType.startsWith('video/')) return 'video';
|
||||
if (mimeType.startsWith('text/') || mimeType.startsWith('application/json')) return 'text';
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store an incoming IBinaryData & related buffer using the configured binary data manager.
|
||||
*
|
||||
@@ -846,10 +854,60 @@ export async function setBinaryDataBuffer(
|
||||
return BinaryDataManager.getInstance().storeBinaryData(data, binaryData, executionId);
|
||||
}
|
||||
|
||||
export async function copyBinaryFile(
|
||||
executionId: string,
|
||||
filePath: string,
|
||||
fileName: string,
|
||||
mimeType?: string,
|
||||
): Promise<IBinaryData> {
|
||||
let fileExtension: string | undefined;
|
||||
if (!mimeType) {
|
||||
// If no mime type is given figure it out
|
||||
|
||||
if (filePath) {
|
||||
// Use file path to guess mime type
|
||||
const mimeTypeLookup = lookup(filePath);
|
||||
if (mimeTypeLookup) {
|
||||
mimeType = mimeTypeLookup;
|
||||
}
|
||||
}
|
||||
|
||||
if (!mimeType) {
|
||||
// read the first bytes of the file to guess mime type
|
||||
const fileTypeData = await FileType.fromFile(filePath);
|
||||
if (fileTypeData) {
|
||||
mimeType = fileTypeData.mime;
|
||||
fileExtension = fileTypeData.ext;
|
||||
}
|
||||
}
|
||||
|
||||
if (!mimeType) {
|
||||
// Fall back to text
|
||||
mimeType = 'text/plain';
|
||||
}
|
||||
} else if (!fileExtension) {
|
||||
fileExtension = extension(mimeType) || undefined;
|
||||
}
|
||||
|
||||
const returnData: IBinaryData = {
|
||||
mimeType,
|
||||
fileType: fileTypeFromMimeType(mimeType),
|
||||
fileExtension,
|
||||
data: '',
|
||||
};
|
||||
|
||||
if (fileName) {
|
||||
returnData.fileName = fileName;
|
||||
} else if (filePath) {
|
||||
returnData.fileName = path.parse(filePath).base;
|
||||
}
|
||||
|
||||
return BinaryDataManager.getInstance().copyBinaryFile(returnData, filePath, executionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes a buffer and converts it into the format n8n uses. It encodes the binary data as
|
||||
* base64 and adds metadata.
|
||||
*
|
||||
*/
|
||||
export async function prepareBinaryData(
|
||||
binaryData: Buffer,
|
||||
@@ -871,7 +929,7 @@ export async function prepareBinaryData(
|
||||
|
||||
if (!mimeType) {
|
||||
// Use buffer to guess mime type
|
||||
const fileTypeData = await fromBuffer(binaryData);
|
||||
const fileTypeData = await FileType.fromBuffer(binaryData);
|
||||
if (fileTypeData) {
|
||||
mimeType = fileTypeData.mime;
|
||||
fileExtension = fileTypeData.ext;
|
||||
@@ -882,10 +940,13 @@ export async function prepareBinaryData(
|
||||
// Fall back to text
|
||||
mimeType = 'text/plain';
|
||||
}
|
||||
} else if (!fileExtension) {
|
||||
fileExtension = extension(mimeType) || undefined;
|
||||
}
|
||||
|
||||
const returnData: IBinaryData = {
|
||||
mimeType,
|
||||
fileType: fileTypeFromMimeType(mimeType),
|
||||
fileExtension,
|
||||
data: '',
|
||||
};
|
||||
@@ -3076,6 +3137,19 @@ export function getExecuteWebhookFunctions(
|
||||
async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
|
||||
return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
|
||||
},
|
||||
async copyBinaryFile(
|
||||
filePath: string,
|
||||
fileName: string,
|
||||
mimeType?: string,
|
||||
): Promise<IBinaryData> {
|
||||
return copyBinaryFile.call(
|
||||
this,
|
||||
additionalData.executionId!,
|
||||
filePath,
|
||||
fileName,
|
||||
mimeType,
|
||||
);
|
||||
},
|
||||
async prepareBinaryData(
|
||||
binaryData: Buffer,
|
||||
filePath?: string,
|
||||
|
||||
Reference in New Issue
Block a user