feat(core): Integrate object store as binary data manager (#7253)

Depends on: #7225 | Story:
[PAY-848](https://linear.app/n8n/issue/PAY-848)

This PR integrates the object store service as a new binary data manager
for Enterprise.
This commit is contained in:
Iván Ovejero
2023-10-05 15:25:17 +02:00
committed by GitHub
parent e5ad1e7e4d
commit 1a661e6d00
28 changed files with 1130 additions and 500 deletions

View File

@@ -1,13 +1,12 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { readFile, stat } from 'fs/promises';
import { readFile, stat } from 'node:fs/promises';
import prettyBytes from 'pretty-bytes';
import { Service } from 'typedi';
import Container, { Service } from 'typedi';
import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow';
import { UnknownBinaryDataManager, InvalidBinaryDataMode } from './errors';
import { LogCatch } from '../decorators/LogCatch.decorator';
import { UnknownBinaryDataManagerError, InvalidBinaryDataModeError } from './errors';
import { areValidModes, toBuffer } from './utils';
import { LogCatch } from '../decorators/LogCatch.decorator';
import type { Readable } from 'stream';
import type { BinaryData } from './types';
@@ -20,16 +19,28 @@ export class BinaryDataService {
private managers: Record<string, BinaryData.Manager> = {};
async init(config: BinaryData.Config) {
if (!areValidModes(config.availableModes)) throw new InvalidBinaryDataMode();
if (!areValidModes(config.availableModes)) {
throw new InvalidBinaryDataModeError();
}
this.mode = config.mode;
if (config.availableModes.includes('filesystem')) {
const { FileSystemManager } = await import('./FileSystem.manager');
this.managers.filesystem = new FileSystemManager(config.localStoragePath);
await this.managers.filesystem.init();
}
if (config.availableModes.includes('s3')) {
const { ObjectStoreManager } = await import('./ObjectStore.manager');
const { ObjectStoreService } = await import('../ObjectStore/ObjectStore.service.ee');
this.managers.s3 = new ObjectStoreManager(Container.get(ObjectStoreService));
await this.managers.s3.init();
}
}
@LogCatch((error) => Logger.error('Failed to copy binary data file', { error }))
@@ -242,6 +253,6 @@ export class BinaryDataService {
if (manager) return manager;
throw new UnknownBinaryDataManager(mode);
throw new UnknownBinaryDataManagerError(mode);
}
}

View File

@@ -1,18 +1,10 @@
/**
* @tech_debt The `workflowId` arguments on write are for compatibility with the
* `BinaryData.Manager` interface. Unused in filesystem mode until we refactor
* how we store binary data files in the `/binaryData` dir.
*/
import { createReadStream } from 'fs';
import fs from 'fs/promises';
import path from 'path';
import { createReadStream } from 'node:fs';
import fs from 'node:fs/promises';
import path from 'node:path';
import { v4 as uuid } from 'uuid';
import { jsonParse } from 'n8n-workflow';
import { rename } from 'node:fs/promises';
import { FileNotFoundError } from '../errors';
import { ensureDirExists } from './utils';
import { FileNotFoundError } from '../errors';
import type { Readable } from 'stream';
import type { BinaryData } from './types';
@@ -27,18 +19,36 @@ export class FileSystemManager implements BinaryData.Manager {
await ensureDirExists(this.storagePath);
}
async store(
workflowId: string,
executionId: string,
bufferOrStream: Buffer | Readable,
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
) {
const fileId = this.toFileId(workflowId, executionId);
const filePath = this.resolvePath(fileId);
await fs.writeFile(filePath, bufferOrStream);
const fileSize = await this.getSize(fileId);
await this.storeMetadata(fileId, { mimeType, fileName, fileSize });
return { fileId, fileSize };
}
getPath(fileId: string) {
return this.resolvePath(fileId);
}
async getAsStream(fileId: string, chunkSize?: number) {
const filePath = this.getPath(fileId);
const filePath = this.resolvePath(fileId);
return createReadStream(filePath, { highWaterMark: chunkSize });
}
async getAsBuffer(fileId: string) {
const filePath = this.getPath(fileId);
const filePath = this.resolvePath(fileId);
try {
return await fs.readFile(filePath);
@@ -53,30 +63,6 @@ export class FileSystemManager implements BinaryData.Manager {
return jsonParse(await fs.readFile(filePath, { encoding: 'utf-8' }));
}
async store(
_workflowId: string,
executionId: string,
bufferOrStream: Buffer | Readable,
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
) {
const fileId = this.toFileId(executionId);
const filePath = this.getPath(fileId);
await fs.writeFile(filePath, bufferOrStream);
const fileSize = await this.getSize(fileId);
await this.storeMetadata(fileId, { mimeType, fileName, fileSize });
return { fileId, fileSize };
}
async deleteOne(fileId: string) {
const filePath = this.getPath(fileId);
return fs.rm(filePath);
}
async deleteMany(ids: BinaryData.IdsForDeletion) {
const executionIds = ids.map((o) => o.executionId);
@@ -95,24 +81,25 @@ export class FileSystemManager implements BinaryData.Manager {
}
async copyByFilePath(
_workflowId: string,
workflowId: string,
executionId: string,
filePath: string,
sourcePath: string,
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
) {
const newFileId = this.toFileId(executionId);
const targetFileId = this.toFileId(workflowId, executionId);
const targetPath = this.resolvePath(targetFileId);
await fs.cp(filePath, this.getPath(newFileId));
await fs.cp(sourcePath, targetPath);
const fileSize = await this.getSize(newFileId);
const fileSize = await this.getSize(targetFileId);
await this.storeMetadata(newFileId, { mimeType, fileName, fileSize });
await this.storeMetadata(targetFileId, { mimeType, fileName, fileSize });
return { fileId: newFileId, fileSize };
return { fileId: targetFileId, fileSize };
}
async copyByFileId(_workflowId: string, executionId: string, sourceFileId: string) {
const targetFileId = this.toFileId(executionId);
async copyByFileId(workflowId: string, executionId: string, sourceFileId: string) {
const targetFileId = this.toFileId(workflowId, executionId);
const sourcePath = this.resolvePath(sourceFileId);
const targetPath = this.resolvePath(targetFileId);
@@ -122,12 +109,12 @@ export class FileSystemManager implements BinaryData.Manager {
}
async rename(oldFileId: string, newFileId: string) {
const oldPath = this.getPath(oldFileId);
const newPath = this.getPath(newFileId);
const oldPath = this.resolvePath(oldFileId);
const newPath = this.resolvePath(newFileId);
await Promise.all([
rename(oldPath, newPath),
rename(`${oldPath}.metadata`, `${newPath}.metadata`),
fs.rename(oldPath, newPath),
fs.rename(`${oldPath}.metadata`, `${newPath}.metadata`),
]);
}
@@ -135,7 +122,12 @@ export class FileSystemManager implements BinaryData.Manager {
// private methods
// ----------------------------------
private toFileId(executionId: string) {
/**
* @tech_debt The `workflowId` argument is for compatibility with the
* `BinaryData.Manager` interface. Unused here until we refactor
* how we store binary data files in the `/binaryData` dir.
*/
private toFileId(_workflowId: string, executionId: string) {
return [executionId, uuid()].join('');
}
@@ -156,7 +148,7 @@ export class FileSystemManager implements BinaryData.Manager {
}
private async getSize(fileId: string) {
const filePath = this.getPath(fileId);
const filePath = this.resolvePath(fileId);
try {
const stats = await fs.stat(filePath);

View File

@@ -0,0 +1,120 @@
import fs from 'node:fs/promises';
import { Service } from 'typedi';
import { v4 as uuid } from 'uuid';
import { toBuffer } from './utils';
import { ObjectStoreService } from '../ObjectStore/ObjectStore.service.ee';
import type { Readable } from 'node:stream';
import type { BinaryData } from './types';
@Service()
export class ObjectStoreManager implements BinaryData.Manager {
constructor(private readonly objectStoreService: ObjectStoreService) {}
async init() {
await this.objectStoreService.checkConnection();
}
async store(
workflowId: string,
executionId: string,
bufferOrStream: Buffer | Readable,
metadata: BinaryData.PreWriteMetadata,
) {
const fileId = this.toFileId(workflowId, executionId);
const buffer = await this.toBuffer(bufferOrStream);
await this.objectStoreService.put(fileId, buffer, metadata);
return { fileId, fileSize: buffer.length };
}
getPath(fileId: string) {
return fileId; // already full path, no transform needed
}
async getAsBuffer(fileId: string) {
return this.objectStoreService.get(fileId, { mode: 'buffer' });
}
async getAsStream(fileId: string) {
return this.objectStoreService.get(fileId, { mode: 'stream' });
}
async getMetadata(fileId: string): Promise<BinaryData.Metadata> {
const {
'content-length': contentLength,
'content-type': contentType,
'x-amz-meta-filename': fileName,
} = await this.objectStoreService.getMetadata(fileId);
const metadata: BinaryData.Metadata = { fileSize: Number(contentLength) };
if (contentType) metadata.mimeType = contentType;
if (fileName) metadata.fileName = fileName;
return metadata;
}
async copyByFileId(workflowId: string, executionId: string, sourceFileId: string) {
const targetFileId = this.toFileId(workflowId, executionId);
const sourceFile = await this.objectStoreService.get(sourceFileId, { mode: 'buffer' });
await this.objectStoreService.put(targetFileId, sourceFile);
return targetFileId;
}
/**
* Copy to object store the temp file written by nodes like Webhook, FTP, and SSH.
*/
async copyByFilePath(
workflowId: string,
executionId: string,
sourcePath: string,
metadata: BinaryData.PreWriteMetadata,
) {
const targetFileId = this.toFileId(workflowId, executionId);
const sourceFile = await fs.readFile(sourcePath);
await this.objectStoreService.put(targetFileId, sourceFile, metadata);
return { fileId: targetFileId, fileSize: sourceFile.length };
}
async deleteMany(ids: BinaryData.IdsForDeletion) {
const prefixes = ids.map(
({ workflowId, executionId }) =>
`workflows/${workflowId}/executions/${executionId}/binary_data/`,
);
await Promise.all(
prefixes.map(async (prefix) => {
await this.objectStoreService.deleteMany(prefix);
}),
);
}
async rename(oldFileId: string, newFileId: string) {
const oldFile = await this.objectStoreService.get(oldFileId, { mode: 'buffer' });
const oldFileMetadata = await this.objectStoreService.getMetadata(oldFileId);
await this.objectStoreService.put(newFileId, oldFile, oldFileMetadata);
await this.objectStoreService.deleteOne(oldFileId);
}
// ----------------------------------
// private methods
// ----------------------------------
private toFileId(workflowId: string, executionId: string) {
if (!executionId) executionId = 'temp'; // missing only in edge case, see PR #7244
return `workflows/${workflowId}/executions/${executionId}/binary_data/${uuid()}`;
}
private async toBuffer(bufferOrStream: Buffer | Readable) {
return toBuffer(bufferOrStream);
}
}

View File

@@ -1,12 +1,10 @@
import { BINARY_DATA_MODES } from './utils';
export class InvalidBinaryDataMode extends Error {
constructor() {
super(`Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`);
}
export class InvalidBinaryDataModeError extends Error {
message = `Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`;
}
export class UnknownBinaryDataManager extends Error {
export class UnknownBinaryDataManagerError extends Error {
constructor(mode: string) {
super(`No binary data manager found for: ${mode}`);
}

View File

@@ -4,8 +4,10 @@ import type { BINARY_DATA_MODES } from './utils';
export namespace BinaryData {
export type Mode = (typeof BINARY_DATA_MODES)[number];
export type NonDefaultMode = Exclude<Mode, 'default'>;
export type Config = {
mode: 'default' | 'filesystem';
mode: Mode;
availableModes: string[];
localStoragePath: string;
};
@@ -37,17 +39,16 @@ export namespace BinaryData {
getAsStream(fileId: string, chunkSize?: number): Promise<Readable>;
getMetadata(fileId: string): Promise<Metadata>;
deleteMany(ids: IdsForDeletion): Promise<void>;
copyByFileId(workflowId: string, executionId: string, sourceFileId: string): Promise<string>;
copyByFilePath(
workflowId: string,
executionId: string,
filePath: string,
sourcePath: string,
metadata: PreWriteMetadata,
): Promise<WriteResult>;
deleteOne(fileId: string): Promise<void>;
deleteMany(ids: IdsForDeletion): Promise<void>;
rename(oldFileId: string, newFileId: string): Promise<void>;
}
}

View File

@@ -4,31 +4,56 @@ import { createHash } from 'node:crypto';
import axios from 'axios';
import { Service } from 'typedi';
import { sign } from 'aws4';
import { isStream, parseXml } from './utils';
import { ExternalStorageRequestFailed } from './errors';
import { isStream, parseXml, writeBlockedMessage } from './utils';
import { LoggerProxy as Logger } from 'n8n-workflow';
import type { AxiosRequestConfig, Method } from 'axios';
import type { AxiosRequestConfig, AxiosResponse, Method } from 'axios';
import type { Request as Aws4Options, Credentials as Aws4Credentials } from 'aws4';
import type { ListPage, ObjectStore, RawListPage } from './types';
import type {
Bucket,
ConfigSchemaCredentials,
ListPage,
RawListPage,
RequestOptions,
} from './types';
import type { Readable } from 'stream';
import type { BinaryData } from '..';
@Service()
export class ObjectStoreService {
private credentials: Aws4Credentials;
private host = '';
private bucket: Bucket = { region: '', name: '' };
private credentials: Aws4Credentials = { accessKeyId: '', secretAccessKey: '' };
private isReady = false;
private isReadOnly = false;
private logger = Logger;
async init(host: string, bucket: Bucket, credentials: ConfigSchemaCredentials) {
this.host = host;
this.bucket.name = bucket.name;
this.bucket.region = bucket.region;
constructor(
private bucket: { region: string; name: string },
credentials: { accountId: string; secretKey: string },
) {
this.credentials = {
accessKeyId: credentials.accountId,
secretAccessKey: credentials.secretKey,
accessKeyId: credentials.accessKey,
secretAccessKey: credentials.accessSecret,
};
await this.checkConnection();
this.setReady(true);
}
get host() {
return `${this.bucket.name}.s3.${this.bucket.region}.amazonaws.com`;
setReadonly(newState: boolean) {
this.isReadOnly = newState;
}
setReady(newState: boolean) {
this.isReady = newState;
}
/**
@@ -37,7 +62,9 @@ export class ObjectStoreService {
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
*/
async checkConnection() {
return this.request('HEAD', this.host);
if (this.isReady) return;
return this.request('HEAD', this.host, this.bucket.name);
}
/**
@@ -46,6 +73,8 @@ export class ObjectStoreService {
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
*/
async put(filename: string, buffer: Buffer, metadata: BinaryData.PreWriteMetadata = {}) {
if (this.isReadOnly) return this.blockWrite(filename);
const headers: Record<string, string | number> = {
'Content-Length': buffer.length,
'Content-MD5': createHash('md5').update(buffer).digest('base64'),
@@ -54,7 +83,9 @@ export class ObjectStoreService {
if (metadata.fileName) headers['x-amz-meta-filename'] = metadata.fileName;
if (metadata.mimeType) headers['Content-Type'] = metadata.mimeType;
return this.request('PUT', this.host, `/${filename}`, { headers, body: buffer });
const path = `/${this.bucket.name}/${filename}`;
return this.request('PUT', this.host, path, { headers, body: buffer });
}
/**
@@ -62,9 +93,11 @@ export class ObjectStoreService {
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
*/
async get(path: string, { mode }: { mode: 'buffer' }): Promise<Buffer>;
async get(path: string, { mode }: { mode: 'stream' }): Promise<Readable>;
async get(path: string, { mode }: { mode: 'stream' | 'buffer' }) {
async get(fileId: string, { mode }: { mode: 'buffer' }): Promise<Buffer>;
async get(fileId: string, { mode }: { mode: 'stream' }): Promise<Readable>;
async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }) {
const path = `${this.bucket.name}/${fileId}`;
const { data } = await this.request('GET', this.host, path, {
responseType: mode === 'buffer' ? 'arraybuffer' : 'stream',
});
@@ -81,27 +114,31 @@ export class ObjectStoreService {
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
*/
async getMetadata(path: string) {
async getMetadata(fileId: string) {
type Response = {
headers: {
'content-length': string;
'content-type'?: string;
'x-amz-meta-filename'?: string;
} & Record<string, string | number>;
} & BinaryData.PreWriteMetadata;
};
const path = `${this.bucket.name}/${fileId}`;
const response: Response = await this.request('HEAD', this.host, path);
return response.headers;
}
/**
* Delete an object in the configured bucket.
* Delete a single object in the configured bucket.
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
*/
async deleteOne(path: string) {
return this.request('DELETE', this.host, `/${encodeURIComponent(path)}`);
async deleteOne(fileId: string) {
const path = `${this.bucket.name}/${fileId}`;
return this.request('DELETE', this.host, path);
}
/**
@@ -122,13 +159,13 @@ export class ObjectStoreService {
'Content-MD5': createHash('md5').update(body).digest('base64'),
};
return this.request('POST', this.host, '/?delete', { headers, body });
const path = `${this.bucket.name}/?delete`;
return this.request('POST', this.host, path, { headers, body });
}
/**
* List objects with a common prefix in the configured bucket.
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
*/
async list(prefix: string) {
const items = [];
@@ -149,16 +186,18 @@ export class ObjectStoreService {
}
/**
* Fetch a page of objects with a common prefix in the configured bucket. Max 1000 per page.
* Fetch a page of objects with a common prefix in the configured bucket.
*
* Max 1000 objects per page - set by AWS.
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
*/
async getListPage(prefix: string, nextPageToken?: string) {
const bucketlessHost = this.host.split('.').slice(1).join('.');
const qs: Record<string, string | number> = { 'list-type': 2, prefix };
if (nextPageToken) qs['continuation-token'] = nextPageToken;
const { data } = await this.request('GET', bucketlessHost, `/${this.bucket.name}`, { qs });
const { data } = await this.request('GET', this.host, this.bucket.name, { qs });
if (typeof data !== 'string') {
throw new TypeError(`Expected XML string but received ${typeof data}`);
@@ -193,11 +232,19 @@ export class ObjectStoreService {
return path.concat(`?${qsParams}`);
}
private async request<T = unknown>(
private async blockWrite(filename: string): Promise<AxiosResponse> {
const logMessage = writeBlockedMessage(filename);
this.logger.warn(logMessage);
return { status: 403, statusText: 'Forbidden', data: logMessage, headers: {}, config: {} };
}
private async request(
method: Method,
host: string,
rawPath = '',
{ qs, headers, body, responseType }: ObjectStore.RequestOptions = {},
{ qs, headers, body, responseType }: RequestOptions = {},
) {
const path = this.toPath(rawPath, qs);
@@ -224,9 +271,17 @@ export class ObjectStoreService {
if (responseType) config.responseType = responseType;
try {
return await axios.request<T>(config);
} catch (error) {
throw new ExternalStorageRequestFailed(error, config);
this.logger.debug('Sending request to S3', { config });
return await axios.request<unknown>(config);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
const message = `Request to S3 failed: ${error.message}`;
this.logger.error(message, { config });
throw new Error(message, { cause: { error, details: config } });
}
}
}

View File

@@ -1,8 +0,0 @@
import { AxiosRequestConfig } from 'axios';
export class ExternalStorageRequestFailed extends Error {
constructor(error: unknown, details: AxiosRequestConfig) {
super('Request to external object storage failed');
this.cause = { error, details };
}
}

View File

@@ -22,11 +22,13 @@ type Item = {
export type ListPage = Omit<RawListPage['listBucketResult'], 'contents'> & { contents: Item[] };
export namespace ObjectStore {
export type RequestOptions = {
qs?: Record<string, string | number>;
headers?: Record<string, string | number>;
body?: string | Buffer;
responseType?: ResponseType;
};
}
export type Bucket = { region: string; name: string };
export type RequestOptions = {
qs?: Record<string, string | number>;
headers?: Record<string, string | number>;
body?: string | Buffer;
responseType?: ResponseType;
};
export type ConfigSchemaCredentials = { accessKey: string; accessSecret: string };

View File

@@ -14,3 +14,7 @@ export async function parseXml<T>(xml: string): Promise<T> {
valueProcessors: [parseNumbers, parseBooleans],
}) as Promise<T>;
}
export function writeBlockedMessage(filename: string) {
return `Request to write file "${filename}" to object storage was blocked because S3 storage is not available with your current license. Please upgrade to a license that supports this feature, or set N8N_DEFAULT_BINARY_DATA_MODE to an option other than "s3".`;
}

View File

@@ -17,3 +17,4 @@ export * from './WorkflowExecute';
export { NodeExecuteFunctions, UserSettings };
export * from './errors';
export { ObjectStoreService } from './ObjectStore/ObjectStore.service.ee';
export { BinaryData } from './BinaryData/types';