refactor(core): Reorganize n8n-core and enforce file-name casing (no-changelog) (#12667)

कारतोफ्फेलस्क्रिप्ट™
2025-01-17 15:17:25 +01:00
committed by GitHub
parent e7f00bcb7f
commit 05858c2153
132 changed files with 459 additions and 441 deletions


@@ -0,0 +1,208 @@
import fs from 'node:fs';
import fsp from 'node:fs/promises';
import { tmpdir } from 'node:os';
import path from 'node:path';
import { FileSystemManager } from '@/binary-data/file-system.manager';
import { isStream } from '@/binary-data/object-store/utils';
import { toFileId, toStream } from '@test/utils';
jest.mock('fs');
jest.mock('fs/promises');
const storagePath = tmpdir();
const fsManager = new FileSystemManager(storagePath);
const toFullFilePath = (fileId: string) => path.join(storagePath, fileId);
const workflowId = 'ObogjVbqpNOQpiyV';
const executionId = '999';
const fileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb32';
const fileId = toFileId(workflowId, executionId, fileUuid);
const otherWorkflowId = 'FHio8ftV6SrCAfPJ';
const otherExecutionId = '888';
const otherFileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb33';
const otherFileId = toFileId(otherWorkflowId, otherExecutionId, otherFileUuid);
const mockBuffer = Buffer.from('Test data');
const mockStream = toStream(mockBuffer);
afterAll(() => {
jest.restoreAllMocks();
});
describe('store()', () => {
it('should store a buffer', async () => {
const metadata = { mimeType: 'text/plain' };
const result = await fsManager.store(workflowId, executionId, mockBuffer, metadata);
expect(result.fileSize).toBe(mockBuffer.length);
});
});
describe('getPath()', () => {
it('should return a path', async () => {
const filePath = fsManager.getPath(fileId);
expect(filePath).toBe(toFullFilePath(fileId));
});
});
describe('getAsBuffer()', () => {
it('should return a buffer', async () => {
fsp.readFile = jest.fn().mockResolvedValue(mockBuffer);
fsp.access = jest.fn().mockImplementation(async () => {});
const result = await fsManager.getAsBuffer(fileId);
expect(Buffer.isBuffer(result)).toBe(true);
expect(fsp.readFile).toHaveBeenCalledWith(toFullFilePath(fileId));
});
});
describe('getAsStream()', () => {
it('should return a stream', async () => {
fs.createReadStream = jest.fn().mockReturnValue(mockStream);
fsp.access = jest.fn().mockImplementation(async () => {});
const stream = await fsManager.getAsStream(fileId);
expect(isStream(stream)).toBe(true);
expect(fs.createReadStream).toHaveBeenCalledWith(toFullFilePath(fileId), {
highWaterMark: undefined,
});
});
});
describe('getMetadata()', () => {
it('should return metadata', async () => {
const mimeType = 'text/plain';
const fileName = 'file.txt';
fsp.readFile = jest.fn().mockResolvedValue(
JSON.stringify({
fileSize: 1,
mimeType,
fileName,
}),
);
const metadata = await fsManager.getMetadata(fileId);
expect(metadata).toEqual(expect.objectContaining({ fileSize: 1, mimeType, fileName }));
});
});
describe('copyByFileId()', () => {
it('should copy by file ID and return the file ID', async () => {
fsp.copyFile = jest.fn().mockResolvedValue(undefined);
// @ts-expect-error - private method
jest.spyOn(fsManager, 'toFileId').mockReturnValue(otherFileId);
const targetFileId = await fsManager.copyByFileId(workflowId, executionId, fileId);
const sourcePath = toFullFilePath(fileId);
const targetPath = toFullFilePath(targetFileId);
expect(fsp.copyFile).toHaveBeenCalledWith(sourcePath, targetPath);
});
});
describe('copyByFilePath()', () => {
test('should copy by file path and return the file ID and size', async () => {
const sourceFilePath = tmpdir();
const metadata = { mimeType: 'text/plain' };
// @ts-expect-error - private method
jest.spyOn(fsManager, 'toFileId').mockReturnValue(otherFileId);
// @ts-expect-error - private method
jest.spyOn(fsManager, 'getSize').mockReturnValue(mockBuffer.length);
const targetPath = toFullFilePath(otherFileId);
fsp.cp = jest.fn().mockResolvedValue(undefined);
fsp.writeFile = jest.fn().mockResolvedValue(undefined);
const result = await fsManager.copyByFilePath(
workflowId,
executionId,
sourceFilePath,
metadata,
);
expect(fsp.cp).toHaveBeenCalledWith(sourceFilePath, targetPath);
expect(fsp.writeFile).toHaveBeenCalledWith(
`${toFullFilePath(otherFileId)}.metadata`,
JSON.stringify({ ...metadata, fileSize: mockBuffer.length }),
{ encoding: 'utf-8' },
);
expect(result.fileSize).toBe(mockBuffer.length);
});
});
describe('deleteMany()', () => {
const rmOptions = {
force: true,
recursive: true,
};
it('should delete many files by workflow ID and execution ID', async () => {
const ids = [
{ workflowId, executionId },
{ workflowId: otherWorkflowId, executionId: otherExecutionId },
];
fsp.rm = jest.fn().mockResolvedValue(undefined);
const promise = fsManager.deleteMany(ids);
await expect(promise).resolves.not.toThrow();
expect(fsp.rm).toHaveBeenCalledTimes(2);
expect(fsp.rm).toHaveBeenNthCalledWith(
1,
`${storagePath}/workflows/${workflowId}/executions/${executionId}`,
rmOptions,
);
expect(fsp.rm).toHaveBeenNthCalledWith(
2,
`${storagePath}/workflows/${otherWorkflowId}/executions/${otherExecutionId}`,
rmOptions,
);
});
it('should suppress error on non-existing filepath', async () => {
const ids = [{ workflowId: 'does-not-exist', executionId: 'does-not-exist' }];
fsp.rm = jest.fn().mockResolvedValue(undefined);
const promise = fsManager.deleteMany(ids);
await expect(promise).resolves.not.toThrow();
expect(fsp.rm).toHaveBeenCalledTimes(1);
});
});
describe('rename()', () => {
it('should rename a file', async () => {
fsp.rename = jest.fn().mockResolvedValue(undefined);
fsp.rm = jest.fn().mockResolvedValue(undefined);
const promise = fsManager.rename(fileId, otherFileId);
const oldPath = toFullFilePath(fileId);
const newPath = toFullFilePath(otherFileId);
await expect(promise).resolves.not.toThrow();
expect(fsp.rename).toHaveBeenCalledTimes(2);
expect(fsp.rename).toHaveBeenCalledWith(oldPath, newPath);
expect(fsp.rename).toHaveBeenCalledWith(`${oldPath}.metadata`, `${newPath}.metadata`);
});
});


@@ -0,0 +1,134 @@
import { mock } from 'jest-mock-extended';
import fs from 'node:fs/promises';
import { ObjectStoreService } from '@/binary-data/object-store/object-store.service.ee';
import type { MetadataResponseHeaders } from '@/binary-data/object-store/types';
import { isStream } from '@/binary-data/object-store/utils';
import { ObjectStoreManager } from '@/binary-data/object-store.manager';
import { mockInstance, toFileId, toStream } from '@test/utils';
jest.mock('fs/promises');
const objectStoreService = mockInstance(ObjectStoreService);
const objectStoreManager = new ObjectStoreManager(objectStoreService);
const workflowId = 'ObogjVbqpNOQpiyV';
const executionId = '999';
const fileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb32';
const fileId = toFileId(workflowId, executionId, fileUuid);
const prefix = `workflows/${workflowId}/executions/${executionId}/binary_data/`;
const otherWorkflowId = 'FHio8ftV6SrCAfPJ';
const otherExecutionId = '888';
const otherFileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb33';
const otherFileId = toFileId(otherWorkflowId, otherExecutionId, otherFileUuid);
const mockBuffer = Buffer.from('Test data');
const mockStream = toStream(mockBuffer);
beforeAll(() => {
jest.restoreAllMocks();
});
describe('store()', () => {
it('should store a buffer', async () => {
const metadata = { mimeType: 'text/plain' };
const result = await objectStoreManager.store(workflowId, executionId, mockBuffer, metadata);
expect(result.fileId.startsWith(prefix)).toBe(true);
expect(result.fileSize).toBe(mockBuffer.length);
});
});
describe('getPath()', () => {
it('should return a path', async () => {
const path = objectStoreManager.getPath(fileId);
expect(path).toBe(fileId);
});
});
describe('getAsBuffer()', () => {
it('should return a buffer', async () => {
// @ts-expect-error Overload signature seemingly causing the return type to be misinferred
objectStoreService.get.mockResolvedValue(mockBuffer);
const result = await objectStoreManager.getAsBuffer(fileId);
expect(Buffer.isBuffer(result)).toBe(true);
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'buffer' });
});
});
describe('getAsStream()', () => {
it('should return a stream', async () => {
objectStoreService.get.mockResolvedValue(mockStream);
const stream = await objectStoreManager.getAsStream(fileId);
expect(isStream(stream)).toBe(true);
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'stream' });
});
});
describe('getMetadata()', () => {
it('should return metadata', async () => {
const mimeType = 'text/plain';
const fileName = 'file.txt';
objectStoreService.getMetadata.mockResolvedValue(
mock<MetadataResponseHeaders>({
'content-length': '1',
'content-type': mimeType,
'x-amz-meta-filename': fileName,
}),
);
const metadata = await objectStoreManager.getMetadata(fileId);
expect(metadata).toEqual(expect.objectContaining({ fileSize: 1, mimeType, fileName }));
expect(objectStoreService.getMetadata).toHaveBeenCalledWith(fileId);
});
});
describe('copyByFileId()', () => {
it('should copy by file ID and return the file ID', async () => {
const targetFileId = await objectStoreManager.copyByFileId(workflowId, executionId, fileId);
expect(targetFileId.startsWith(prefix)).toBe(true);
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'buffer' });
});
});
describe('copyByFilePath()', () => {
test('should copy by file path and return the file ID and size', async () => {
const sourceFilePath = 'path/to/file/in/filesystem';
const metadata = { mimeType: 'text/plain' };
fs.readFile = jest.fn().mockResolvedValue(mockBuffer);
const result = await objectStoreManager.copyByFilePath(
workflowId,
executionId,
sourceFilePath,
metadata,
);
expect(result.fileId.startsWith(prefix)).toBe(true);
expect(fs.readFile).toHaveBeenCalledWith(sourceFilePath);
expect(result.fileSize).toBe(mockBuffer.length);
});
});
describe('rename()', () => {
it('should rename a file', async () => {
const promise = objectStoreManager.rename(fileId, otherFileId);
await expect(promise).resolves.not.toThrow();
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'buffer' });
expect(objectStoreService.getMetadata).toHaveBeenCalledWith(fileId);
expect(objectStoreService.deleteOne).toHaveBeenCalledWith(fileId);
});
});


@@ -0,0 +1,34 @@
import { Readable } from 'node:stream';
import { createGunzip } from 'node:zlib';
import { binaryToBuffer } from '@/binary-data/utils';
describe('BinaryData/utils', () => {
describe('binaryToBuffer', () => {
it('should handle buffer objects', async () => {
const body = Buffer.from('test');
expect((await binaryToBuffer(body)).toString()).toEqual('test');
});
it('should handle valid uncompressed Readable streams', async () => {
const body = Readable.from(Buffer.from('test'));
expect((await binaryToBuffer(body)).toString()).toEqual('test');
});
it('should handle valid compressed Readable streams', async () => {
const gunzip = createGunzip();
const body = Readable.from(
Buffer.from('1f8b08000000000000032b492d2e01000c7e7fd804000000', 'hex'),
).pipe(gunzip);
expect((await binaryToBuffer(body)).toString()).toEqual('test');
});
it('should throw on invalid compressed Readable streams', async () => {
const gunzip = createGunzip();
const body = Readable.from(Buffer.from('0001f8b080000000000000000', 'hex')).pipe(gunzip);
await expect(binaryToBuffer(body)).rejects.toThrow(
new Error('Failed to decompress response'),
);
});
});
});
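
The compressed fixture above is just gzip-compressed 'test' rendered as hex. A sketch of how such a fixture could be regenerated (illustration only, not part of this commit):

import { gzipSync } from 'node:zlib';

// gzip 'test' and print the bytes as hex; the exact output can vary by one
// OS-id byte in the gzip header, but it always decompresses back to 'test'.
console.log(gzipSync(Buffer.from('test')).toString('hex'));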


@@ -0,0 +1,247 @@
import { Container, Service } from '@n8n/di';
import { BINARY_ENCODING } from 'n8n-workflow';
import type { INodeExecutionData, IBinaryData } from 'n8n-workflow';
import { readFile, stat } from 'node:fs/promises';
import prettyBytes from 'pretty-bytes';
import type { Readable } from 'stream';
import type { BinaryData } from './types';
import { areConfigModes, binaryToBuffer } from './utils';
import { InvalidManagerError } from '../errors/invalid-manager.error';
import { InvalidModeError } from '../errors/invalid-mode.error';
@Service()
export class BinaryDataService {
private mode: BinaryData.ServiceMode = 'default';
private managers: Record<string, BinaryData.Manager> = {};
async init(config: BinaryData.Config) {
if (!areConfigModes(config.availableModes)) throw new InvalidModeError();
this.mode = config.mode === 'filesystem' ? 'filesystem-v2' : config.mode;
if (config.availableModes.includes('filesystem')) {
const { FileSystemManager } = await import('./file-system.manager');
this.managers.filesystem = new FileSystemManager(config.localStoragePath);
this.managers['filesystem-v2'] = this.managers.filesystem;
await this.managers.filesystem.init();
}
if (config.availableModes.includes('s3')) {
const { ObjectStoreManager } = await import('./object-store.manager');
const { ObjectStoreService } = await import('./object-store/object-store.service.ee');
this.managers.s3 = new ObjectStoreManager(Container.get(ObjectStoreService));
await this.managers.s3.init();
}
}
async copyBinaryFile(
workflowId: string,
executionId: string,
binaryData: IBinaryData,
filePath: string,
) {
const manager = this.managers[this.mode];
if (!manager) {
const { size } = await stat(filePath);
binaryData.fileSize = prettyBytes(size);
binaryData.data = await readFile(filePath, { encoding: BINARY_ENCODING });
return binaryData;
}
const metadata = {
fileName: binaryData.fileName,
mimeType: binaryData.mimeType,
};
const { fileId, fileSize } = await manager.copyByFilePath(
workflowId,
executionId,
filePath,
metadata,
);
binaryData.id = this.createBinaryDataId(fileId);
binaryData.fileSize = prettyBytes(fileSize);
binaryData.data = this.mode; // clear binary data from memory
return binaryData;
}
async store(
workflowId: string,
executionId: string,
bufferOrStream: Buffer | Readable,
binaryData: IBinaryData,
) {
const manager = this.managers[this.mode];
if (!manager) {
const buffer = await binaryToBuffer(bufferOrStream);
binaryData.data = buffer.toString(BINARY_ENCODING);
binaryData.fileSize = prettyBytes(buffer.length);
return binaryData;
}
const metadata = {
fileName: binaryData.fileName,
mimeType: binaryData.mimeType,
};
const { fileId, fileSize } = await manager.store(
workflowId,
executionId,
bufferOrStream,
metadata,
);
binaryData.id = this.createBinaryDataId(fileId);
binaryData.fileSize = prettyBytes(fileSize);
binaryData.data = this.mode; // clear binary data from memory
return binaryData;
}
async getAsStream(binaryDataId: string, chunkSize?: number) {
const [mode, fileId] = binaryDataId.split(':');
return await this.getManager(mode).getAsStream(fileId, chunkSize);
}
async getAsBuffer(binaryData: IBinaryData) {
if (binaryData.id) {
const [mode, fileId] = binaryData.id.split(':');
return await this.getManager(mode).getAsBuffer(fileId);
}
return Buffer.from(binaryData.data, BINARY_ENCODING);
}
getPath(binaryDataId: string) {
const [mode, fileId] = binaryDataId.split(':');
return this.getManager(mode).getPath(fileId);
}
async getMetadata(binaryDataId: string) {
const [mode, fileId] = binaryDataId.split(':');
return await this.getManager(mode).getMetadata(fileId);
}
async deleteMany(ids: BinaryData.IdsForDeletion) {
const manager = this.managers[this.mode];
if (!manager) return;
if (manager.deleteMany) await manager.deleteMany(ids);
}
async duplicateBinaryData(
workflowId: string,
executionId: string,
inputData: Array<INodeExecutionData[] | null>,
) {
if (inputData && this.managers[this.mode]) {
const returnInputData = (inputData as INodeExecutionData[][]).map(
async (executionDataArray) => {
if (executionDataArray) {
return await Promise.all(
executionDataArray.map(async (executionData) => {
if (executionData.binary) {
return await this.duplicateBinaryDataInExecData(
workflowId,
executionId,
executionData,
);
}
return executionData;
}),
);
}
return executionDataArray;
},
);
return await Promise.all(returnInputData);
}
return inputData as INodeExecutionData[][];
}
async rename(oldFileId: string, newFileId: string) {
const manager = this.getManager(this.mode);
if (!manager) return;
await manager.rename(oldFileId, newFileId);
}
// ----------------------------------
// private methods
// ----------------------------------
private createBinaryDataId(fileId: string) {
return `${this.mode}:${fileId}`;
}
private async duplicateBinaryDataInExecData(
workflowId: string,
executionId: string,
executionData: INodeExecutionData,
) {
const manager = this.managers[this.mode];
if (executionData.binary) {
const binaryDataKeys = Object.keys(executionData.binary);
const bdPromises = binaryDataKeys.map(async (key: string) => {
if (!executionData.binary) {
return { key, newId: undefined };
}
const binaryDataId = executionData.binary[key].id;
if (!binaryDataId) {
return { key, newId: undefined };
}
const [_mode, fileId] = binaryDataId.split(':');
return await manager?.copyByFileId(workflowId, executionId, fileId).then((newFileId) => ({
newId: this.createBinaryDataId(newFileId),
key,
}));
});
return await Promise.all(bdPromises).then((b) => {
return b.reduce((acc, curr) => {
if (acc.binary && curr) {
acc.binary[curr.key].id = curr.newId;
}
return acc;
}, executionData);
});
}
return executionData;
}
private getManager(mode: string) {
const manager = this.managers[mode];
if (manager) return manager;
throw new InvalidManagerError(mode);
}
}
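
For orientation, a hypothetical usage sketch of BinaryDataService (not part of this commit; the workflow and execution IDs, storage path, and call site are placeholders):

import { Container } from '@n8n/di';
import { BinaryDataService } from '@/binary-data';

async function example() {
	const service = Container.get(BinaryDataService);

	// 'filesystem' is upgraded internally to 'filesystem-v2' on init.
	await service.init({
		mode: 'filesystem',
		availableModes: ['filesystem'],
		localStoragePath: '/tmp/n8n-binary-data',
	});

	// store() writes the payload via the active manager and returns the mutated
	// IBinaryData: `id` becomes `${mode}:${fileId}` and `data` holds only the mode string.
	const binaryData = await service.store('wf-1', '42', Buffer.from('hello'), {
		data: '',
		mimeType: 'text/plain',
		fileName: 'hello.txt',
	});

	// Reads are routed back to the right manager via the mode prefix in binaryData.id.
	const buffer = await service.getAsBuffer(binaryData);
	console.log(buffer.toString()); // 'hello'
}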


@@ -0,0 +1,197 @@
import { jsonParse } from 'n8n-workflow';
import { createReadStream } from 'node:fs';
import fs from 'node:fs/promises';
import path from 'node:path';
import type { Readable } from 'stream';
import { v4 as uuid } from 'uuid';
import type { BinaryData } from './types';
import { assertDir, doesNotExist } from './utils';
import { DisallowedFilepathError } from '../errors/disallowed-filepath.error';
import { FileNotFoundError } from '../errors/file-not-found.error';
const EXECUTION_ID_EXTRACTOR =
/^(\w+)(?:[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})$/;
export class FileSystemManager implements BinaryData.Manager {
constructor(private storagePath: string) {}
async init() {
await assertDir(this.storagePath);
}
async store(
workflowId: string,
executionId: string,
bufferOrStream: Buffer | Readable,
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
) {
const fileId = this.toFileId(workflowId, executionId);
const filePath = this.resolvePath(fileId);
await assertDir(path.dirname(filePath));
await fs.writeFile(filePath, bufferOrStream);
const fileSize = await this.getSize(fileId);
await this.storeMetadata(fileId, { mimeType, fileName, fileSize });
return { fileId, fileSize };
}
getPath(fileId: string) {
return this.resolvePath(fileId);
}
async getAsStream(fileId: string, chunkSize?: number) {
const filePath = this.resolvePath(fileId);
if (await doesNotExist(filePath)) {
throw new FileNotFoundError(filePath);
}
return createReadStream(filePath, { highWaterMark: chunkSize });
}
async getAsBuffer(fileId: string) {
const filePath = this.resolvePath(fileId);
if (await doesNotExist(filePath)) {
throw new FileNotFoundError(filePath);
}
return await fs.readFile(filePath);
}
async getMetadata(fileId: string): Promise<BinaryData.Metadata> {
const filePath = this.resolvePath(`${fileId}.metadata`);
return await jsonParse(await fs.readFile(filePath, { encoding: 'utf-8' }));
}
async deleteMany(ids: BinaryData.IdsForDeletion) {
if (ids.length === 0) return;
// binary files stored in single dir - `filesystem`
const executionIds = ids.map((o) => o.executionId);
const set = new Set(executionIds);
const fileNames = await fs.readdir(this.storagePath);
for (const fileName of fileNames) {
const executionId = fileName.match(EXECUTION_ID_EXTRACTOR)?.[1];
if (executionId && set.has(executionId)) {
const filePath = this.resolvePath(fileName);
await Promise.all([fs.rm(filePath), fs.rm(`${filePath}.metadata`)]);
}
}
// binary files stored in nested dirs - `filesystem-v2`
const binaryDataDirs = ids.map(({ workflowId, executionId }) =>
this.resolvePath(`workflows/${workflowId}/executions/${executionId}`),
);
await Promise.all(
binaryDataDirs.map(async (dir) => {
await fs.rm(dir, { recursive: true, force: true });
}),
);
}
async copyByFilePath(
workflowId: string,
executionId: string,
sourcePath: string,
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
) {
const targetFileId = this.toFileId(workflowId, executionId);
const targetPath = this.resolvePath(targetFileId);
await assertDir(path.dirname(targetPath));
await fs.cp(sourcePath, targetPath);
const fileSize = await this.getSize(targetFileId);
await this.storeMetadata(targetFileId, { mimeType, fileName, fileSize });
return { fileId: targetFileId, fileSize };
}
async copyByFileId(workflowId: string, executionId: string, sourceFileId: string) {
const targetFileId = this.toFileId(workflowId, executionId);
const sourcePath = this.resolvePath(sourceFileId);
const targetPath = this.resolvePath(targetFileId);
await assertDir(path.dirname(targetPath));
await fs.copyFile(sourcePath, targetPath);
return targetFileId;
}
async rename(oldFileId: string, newFileId: string) {
const oldPath = this.resolvePath(oldFileId);
const newPath = this.resolvePath(newFileId);
await assertDir(path.dirname(newPath));
await Promise.all([
fs.rename(oldPath, newPath),
fs.rename(`${oldPath}.metadata`, `${newPath}.metadata`),
]);
const [tempDirParent] = oldPath.split('/temp/');
const tempDir = path.join(tempDirParent, 'temp');
await fs.rm(tempDir, { recursive: true });
}
// ----------------------------------
// private methods
// ----------------------------------
/**
* Generate an ID for a binary data file.
*
* The legacy ID format `{executionId}{uuid}` for `filesystem` mode is
* no longer used on write, only when reading old stored execution data.
*/
private toFileId(workflowId: string, executionId: string) {
if (!executionId) executionId = 'temp'; // missing only in edge case, see PR #7244
return `workflows/${workflowId}/executions/${executionId}/binary_data/${uuid()}`;
}
private resolvePath(...args: string[]) {
const returnPath = path.join(this.storagePath, ...args);
if (path.relative(this.storagePath, returnPath).startsWith('..')) {
throw new DisallowedFilepathError(returnPath);
}
return returnPath;
}
private async storeMetadata(fileId: string, metadata: BinaryData.Metadata) {
const filePath = this.resolvePath(`${fileId}.metadata`);
await fs.writeFile(filePath, JSON.stringify(metadata), { encoding: 'utf-8' });
}
private async getSize(fileId: string) {
const filePath = this.resolvePath(fileId);
try {
const stats = await fs.stat(filePath);
return stats.size;
} catch (error) {
throw new FileNotFoundError(filePath);
}
}
}
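
To make the on-disk layout concrete, a small illustrative sketch (hypothetical storage path and IDs, not part of this commit):

import path from 'node:path';

const storagePath = '/data/n8n/binary-data'; // whatever localStoragePath is configured as
const fileId = 'workflows/wf-1/executions/42/binary_data/example-uuid'; // shape produced by toFileId()

// resolvePath() joins the file ID onto the storage root, and the metadata JSON
// sits next to the payload with a `.metadata` suffix:
const filePath = path.join(storagePath, fileId);
// -> /data/n8n/binary-data/workflows/wf-1/executions/42/binary_data/example-uuid
const metadataPath = `${filePath}.metadata`;
console.log(filePath, metadataPath);

// Any ID that escapes the storage root fails the traversal check and
// triggers a DisallowedFilepathError:
const escaped = path.join(storagePath, '../outside');
console.log(path.relative(storagePath, escaped).startsWith('..')); // true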


@@ -0,0 +1,4 @@
export * from './binary-data.service';
export * from './types';
export { ObjectStoreService } from './object-store/object-store.service.ee';
export { isStoredMode as isValidNonDefaultMode } from './utils';


@@ -0,0 +1,103 @@
import { Service } from '@n8n/di';
import fs from 'node:fs/promises';
import type { Readable } from 'node:stream';
import { v4 as uuid } from 'uuid';
import { ObjectStoreService } from './object-store/object-store.service.ee';
import type { BinaryData } from './types';
import { binaryToBuffer } from './utils';
@Service()
export class ObjectStoreManager implements BinaryData.Manager {
constructor(private readonly objectStoreService: ObjectStoreService) {}
async init() {
await this.objectStoreService.checkConnection();
}
async store(
workflowId: string,
executionId: string,
bufferOrStream: Buffer | Readable,
metadata: BinaryData.PreWriteMetadata,
) {
const fileId = this.toFileId(workflowId, executionId);
const buffer = await binaryToBuffer(bufferOrStream);
await this.objectStoreService.put(fileId, buffer, metadata);
return { fileId, fileSize: buffer.length };
}
getPath(fileId: string) {
return fileId; // already full path, no transform needed
}
async getAsBuffer(fileId: string) {
return await this.objectStoreService.get(fileId, { mode: 'buffer' });
}
async getAsStream(fileId: string) {
return await this.objectStoreService.get(fileId, { mode: 'stream' });
}
async getMetadata(fileId: string): Promise<BinaryData.Metadata> {
const {
'content-length': contentLength,
'content-type': contentType,
'x-amz-meta-filename': fileName,
} = await this.objectStoreService.getMetadata(fileId);
const metadata: BinaryData.Metadata = { fileSize: Number(contentLength) };
if (contentType) metadata.mimeType = contentType;
if (fileName) metadata.fileName = fileName;
return metadata;
}
async copyByFileId(workflowId: string, executionId: string, sourceFileId: string) {
const targetFileId = this.toFileId(workflowId, executionId);
const sourceFile = await this.objectStoreService.get(sourceFileId, { mode: 'buffer' });
await this.objectStoreService.put(targetFileId, sourceFile);
return targetFileId;
}
/**
* Copy to object store the temp file written by nodes like Webhook, FTP, and SSH.
*/
async copyByFilePath(
workflowId: string,
executionId: string,
sourcePath: string,
metadata: BinaryData.PreWriteMetadata,
) {
const targetFileId = this.toFileId(workflowId, executionId);
const sourceFile = await fs.readFile(sourcePath);
await this.objectStoreService.put(targetFileId, sourceFile, metadata);
return { fileId: targetFileId, fileSize: sourceFile.length };
}
async rename(oldFileId: string, newFileId: string) {
const oldFile = await this.objectStoreService.get(oldFileId, { mode: 'buffer' });
const oldFileMetadata = await this.objectStoreService.getMetadata(oldFileId);
await this.objectStoreService.put(newFileId, oldFile, oldFileMetadata);
await this.objectStoreService.deleteOne(oldFileId);
}
// ----------------------------------
// private methods
// ----------------------------------
private toFileId(workflowId: string, executionId: string) {
if (!executionId) executionId = 'temp'; // missing only in edge case, see PR #7244
return `workflows/${workflowId}/executions/${executionId}/binary_data/${uuid()}`;
}
}
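
A hypothetical wiring sketch (not part of this commit) showing how ObjectStoreManager sits on top of ObjectStoreService; the IDs and payload are placeholders:

import { Container } from '@n8n/di';
import { ObjectStoreManager } from '@/binary-data/object-store.manager';
import { ObjectStoreService } from '@/binary-data/object-store/object-store.service.ee';

async function example() {
	// Assumes the service was init()-ed elsewhere with host, bucket, and credentials.
	const manager = new ObjectStoreManager(Container.get(ObjectStoreService));
	await manager.init(); // only verifies the S3 connection

	// store() buffers streams up front because put() needs Content-Length and Content-MD5.
	const { fileId, fileSize } = await manager.store('wf-1', '42', Buffer.from('hello'), {
		mimeType: 'text/plain',
	});
	console.log(fileId, fileSize); // workflows/wf-1/executions/42/binary_data/<uuid> 5
}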


@@ -0,0 +1,318 @@
import axios from 'axios';
import { mock } from 'jest-mock-extended';
import { Readable } from 'stream';
import { ObjectStoreService } from '@/binary-data/object-store/object-store.service.ee';
import { writeBlockedMessage } from '@/binary-data/object-store/utils';
jest.mock('axios');
const mockAxios = axios as jest.Mocked<typeof axios>;
const mockBucket = { region: 'us-east-1', name: 'test-bucket' };
const mockHost = `s3.${mockBucket.region}.amazonaws.com`;
const mockCredentials = { accessKey: 'mock-access-key', accessSecret: 'mock-secret-key' };
const mockUrl = `https://${mockHost}/${mockBucket.name}`;
const FAILED_REQUEST_ERROR_MESSAGE = 'Request to S3 failed';
const mockError = new Error('Something went wrong!');
const fileId =
'workflows/ObogjVbqpNOQpiyV/executions/999/binary_data/71f6209b-5d48-41a2-a224-80d529d8bb32';
const mockBuffer = Buffer.from('Test data');
const toDeletionXml = (filename: string) => `<Delete>
<Object><Key>${filename}</Key></Object>
</Delete>`;
let objectStoreService: ObjectStoreService;
beforeEach(async () => {
objectStoreService = new ObjectStoreService(mock());
mockAxios.request.mockResolvedValueOnce({ status: 200 }); // for checkConnection
await objectStoreService.init(mockHost, mockBucket, mockCredentials);
jest.restoreAllMocks();
});
describe('checkConnection()', () => {
it('should send a HEAD request to the correct host', async () => {
mockAxios.request.mockResolvedValue({ status: 200 });
objectStoreService.setReady(false);
await objectStoreService.checkConnection();
expect(mockAxios.request).toHaveBeenCalledWith(
expect.objectContaining({
method: 'HEAD',
url: `https://${mockHost}/${mockBucket.name}`,
headers: expect.objectContaining({
'X-Amz-Content-Sha256': expect.any(String),
'X-Amz-Date': expect.any(String),
Authorization: expect.any(String),
}),
}),
);
});
it('should throw an error on request failure', async () => {
objectStoreService.setReady(false);
mockAxios.request.mockRejectedValue(mockError);
const promise = objectStoreService.checkConnection();
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
});
});
describe('getMetadata()', () => {
it('should send a HEAD request to the correct host and path', async () => {
mockAxios.request.mockResolvedValue({ status: 200 });
await objectStoreService.getMetadata(fileId);
expect(mockAxios.request).toHaveBeenCalledWith(
expect.objectContaining({
method: 'HEAD',
url: `${mockUrl}/${fileId}`,
headers: expect.objectContaining({
Host: mockHost,
'X-Amz-Content-Sha256': expect.any(String),
'X-Amz-Date': expect.any(String),
Authorization: expect.any(String),
}),
}),
);
});
it('should throw an error on request failure', async () => {
mockAxios.request.mockRejectedValue(mockError);
const promise = objectStoreService.getMetadata(fileId);
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
});
});
describe('put()', () => {
it('should send a PUT request to upload an object', async () => {
const metadata = { fileName: 'file.txt', mimeType: 'text/plain' };
mockAxios.request.mockResolvedValue({ status: 200 });
await objectStoreService.put(fileId, mockBuffer, metadata);
expect(mockAxios.request).toHaveBeenCalledWith(
expect.objectContaining({
method: 'PUT',
url: `${mockUrl}/${fileId}`,
headers: expect.objectContaining({
'Content-Length': mockBuffer.length,
'Content-MD5': expect.any(String),
'x-amz-meta-filename': metadata.fileName,
'Content-Type': metadata.mimeType,
}),
data: mockBuffer,
}),
);
});
it('should block if read-only', async () => {
objectStoreService.setReadonly(true);
const metadata = { fileName: 'file.txt', mimeType: 'text/plain' };
const promise = objectStoreService.put(fileId, mockBuffer, metadata);
await expect(promise).resolves.not.toThrow();
const result = await promise;
expect(result.status).toBe(403);
expect(result.statusText).toBe('Forbidden');
expect(result.data).toBe(writeBlockedMessage(fileId));
});
it('should throw an error on request failure', async () => {
const metadata = { fileName: 'file.txt', mimeType: 'text/plain' };
mockAxios.request.mockRejectedValue(mockError);
const promise = objectStoreService.put(fileId, mockBuffer, metadata);
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
});
});
describe('get()', () => {
it('should send a GET request to download an object as a buffer', async () => {
const fileId = 'file.txt';
mockAxios.request.mockResolvedValue({ status: 200, data: Buffer.from('Test content') });
const result = await objectStoreService.get(fileId, { mode: 'buffer' });
expect(mockAxios.request).toHaveBeenCalledWith(
expect.objectContaining({
method: 'GET',
url: `${mockUrl}/${fileId}`,
responseType: 'arraybuffer',
}),
);
expect(Buffer.isBuffer(result)).toBe(true);
});
it('should send a GET request to download an object as a stream', async () => {
mockAxios.request.mockResolvedValue({ status: 200, data: new Readable() });
const result = await objectStoreService.get(fileId, { mode: 'stream' });
expect(mockAxios.request).toHaveBeenCalledWith(
expect.objectContaining({
method: 'GET',
url: `${mockUrl}/${fileId}`,
responseType: 'stream',
}),
);
expect(result instanceof Readable).toBe(true);
});
it('should throw an error on request failure', async () => {
mockAxios.request.mockRejectedValue(mockError);
const promise = objectStoreService.get(fileId, { mode: 'buffer' });
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
});
});
describe('deleteOne()', () => {
it('should send a DELETE request to delete a single object', async () => {
mockAxios.request.mockResolvedValue({ status: 204 });
await objectStoreService.deleteOne(fileId);
expect(mockAxios.request).toHaveBeenCalledWith(
expect.objectContaining({
method: 'DELETE',
url: `${mockUrl}/${fileId}`,
}),
);
});
it('should throw an error on request failure', async () => {
mockAxios.request.mockRejectedValue(mockError);
const promise = objectStoreService.deleteOne(fileId);
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
});
});
describe('deleteMany()', () => {
it('should send a POST request to delete multiple objects', async () => {
const prefix = 'test-dir/';
const fileName = 'file.txt';
const mockList = [
{
key: fileName,
lastModified: '2023-09-24T12:34:56Z',
eTag: 'abc123def456',
size: 456789,
storageClass: 'STANDARD',
},
];
objectStoreService.list = jest.fn().mockResolvedValue(mockList);
mockAxios.request.mockResolvedValue({ status: 204 });
await objectStoreService.deleteMany(prefix);
expect(objectStoreService.list).toHaveBeenCalledWith(prefix);
expect(mockAxios.request).toHaveBeenCalledWith(
expect.objectContaining({
method: 'POST',
url: `${mockUrl}/?delete`,
headers: expect.objectContaining({
'Content-Type': 'application/xml',
'Content-Length': expect.any(Number),
'Content-MD5': expect.any(String),
}),
data: toDeletionXml(fileName),
}),
);
});
it('should not send a deletion request if no prefix match', async () => {
objectStoreService.list = jest.fn().mockResolvedValue([]);
const result = await objectStoreService.deleteMany('non-matching-prefix');
expect(result).toBeUndefined();
});
it('should throw an error on request failure', async () => {
mockAxios.request.mockRejectedValue(mockError);
const promise = objectStoreService.deleteMany('test-dir/');
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
});
});
describe('list()', () => {
it('should list objects with a common prefix', async () => {
const prefix = 'test-dir/';
const mockListPage = {
contents: [{ key: `${prefix}file1.txt` }, { key: `${prefix}file2.txt` }],
isTruncated: false,
};
objectStoreService.getListPage = jest.fn().mockResolvedValue(mockListPage);
mockAxios.request.mockResolvedValue({ status: 200 });
const result = await objectStoreService.list(prefix);
expect(result).toEqual(mockListPage.contents);
});
it('should consolidate pages', async () => {
const prefix = 'test-dir/';
const mockFirstListPage = {
contents: [{ key: `${prefix}file1.txt` }],
isTruncated: true,
nextContinuationToken: 'token1',
};
const mockSecondListPage = {
contents: [{ key: `${prefix}file2.txt` }],
isTruncated: false,
};
objectStoreService.getListPage = jest
.fn()
.mockResolvedValueOnce(mockFirstListPage)
.mockResolvedValueOnce(mockSecondListPage);
mockAxios.request.mockResolvedValue({ status: 200 });
const result = await objectStoreService.list(prefix);
expect(result).toEqual([...mockFirstListPage.contents, ...mockSecondListPage.contents]);
});
it('should throw an error on request failure', async () => {
mockAxios.request.mockRejectedValue(mockError);
const promise = objectStoreService.list('test-dir/');
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
});
});


@@ -0,0 +1,288 @@
import { Service } from '@n8n/di';
import { sign } from 'aws4';
import type { Request as Aws4Options, Credentials as Aws4Credentials } from 'aws4';
import axios from 'axios';
import type { AxiosRequestConfig, AxiosResponse, InternalAxiosRequestConfig, Method } from 'axios';
import { ApplicationError } from 'n8n-workflow';
import { createHash } from 'node:crypto';
import type { Readable } from 'stream';
import { Logger } from '@/logging/logger';
import type {
Bucket,
ConfigSchemaCredentials,
ListPage,
MetadataResponseHeaders,
RawListPage,
RequestOptions,
} from './types';
import { isStream, parseXml, writeBlockedMessage } from './utils';
import type { BinaryData } from '../types';
@Service()
export class ObjectStoreService {
private host = '';
private bucket: Bucket = { region: '', name: '' };
private credentials: Aws4Credentials = { accessKeyId: '', secretAccessKey: '' };
private isReady = false;
private isReadOnly = false;
constructor(private readonly logger: Logger) {}
async init(host: string, bucket: Bucket, credentials: ConfigSchemaCredentials) {
this.host = host;
this.bucket.name = bucket.name;
this.bucket.region = bucket.region;
this.credentials = {
accessKeyId: credentials.accessKey,
secretAccessKey: credentials.accessSecret,
};
await this.checkConnection();
this.setReady(true);
}
setReadonly(newState: boolean) {
this.isReadOnly = newState;
}
setReady(newState: boolean) {
this.isReady = newState;
}
/**
* Confirm that the configured bucket exists and the caller has permission to access it.
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
*/
async checkConnection() {
if (this.isReady) return;
return await this.request('HEAD', this.host, this.bucket.name);
}
/**
* Upload an object to the configured bucket.
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
*/
async put(filename: string, buffer: Buffer, metadata: BinaryData.PreWriteMetadata = {}) {
if (this.isReadOnly) return await this.blockWrite(filename);
const headers: Record<string, string | number> = {
'Content-Length': buffer.length,
'Content-MD5': createHash('md5').update(buffer).digest('base64'),
};
if (metadata.fileName) headers['x-amz-meta-filename'] = metadata.fileName;
if (metadata.mimeType) headers['Content-Type'] = metadata.mimeType;
const path = `/${this.bucket.name}/${filename}`;
return await this.request('PUT', this.host, path, { headers, body: buffer });
}
/**
* Download an object as a stream or buffer from the configured bucket.
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
*/
async get(fileId: string, { mode }: { mode: 'buffer' }): Promise<Buffer>;
async get(fileId: string, { mode }: { mode: 'stream' }): Promise<Readable>;
async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }) {
const path = `${this.bucket.name}/${fileId}`;
const { data } = await this.request('GET', this.host, path, {
responseType: mode === 'buffer' ? 'arraybuffer' : 'stream',
});
if (mode === 'stream' && isStream(data)) return data;
if (mode === 'buffer' && Buffer.isBuffer(data)) return data;
throw new TypeError(`Expected ${mode} but received ${typeof data}.`);
}
/**
* Retrieve metadata for an object in the configured bucket.
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
*/
async getMetadata(fileId: string) {
const path = `${this.bucket.name}/${fileId}`;
const response = await this.request('HEAD', this.host, path);
return response.headers as MetadataResponseHeaders;
}
/**
* Delete a single object in the configured bucket.
*
	 * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
*/
async deleteOne(fileId: string) {
const path = `${this.bucket.name}/${fileId}`;
return await this.request('DELETE', this.host, path);
}
/**
* Delete objects with a common prefix in the configured bucket.
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
*/
async deleteMany(prefix: string) {
const objects = await this.list(prefix);
if (objects.length === 0) return;
const innerXml = objects.map(({ key }) => `<Object><Key>${key}</Key></Object>`).join('\n');
const body = ['<Delete>', innerXml, '</Delete>'].join('\n');
const headers = {
'Content-Type': 'application/xml',
'Content-Length': body.length,
'Content-MD5': createHash('md5').update(body).digest('base64'),
};
const path = `${this.bucket.name}/?delete`;
return await this.request('POST', this.host, path, { headers, body });
}
/**
* List objects with a common prefix in the configured bucket.
*/
async list(prefix: string) {
const items = [];
let isTruncated;
let nextPageToken;
do {
const listPage = await this.getListPage(prefix, nextPageToken);
if (listPage.contents?.length > 0) items.push(...listPage.contents);
isTruncated = listPage.isTruncated;
nextPageToken = listPage.nextContinuationToken;
} while (isTruncated && nextPageToken);
return items;
}
/**
* Fetch a page of objects with a common prefix in the configured bucket.
*
* Max 1000 objects per page - set by AWS.
*
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
*/
async getListPage(prefix: string, nextPageToken?: string) {
const qs: Record<string, string | number> = { 'list-type': 2, prefix };
if (nextPageToken) qs['continuation-token'] = nextPageToken;
const { data } = await this.request('GET', this.host, this.bucket.name, { qs });
if (typeof data !== 'string') {
throw new TypeError(`Expected XML string but received ${typeof data}`);
}
const { listBucketResult: page } = await parseXml<RawListPage>(data);
if (!page.contents) return { ...page, contents: [] };
// `explicitArray: false` removes array wrapper on single item array, so restore it
if (!Array.isArray(page.contents)) page.contents = [page.contents];
// remove null prototype - https://github.com/Leonidas-from-XIV/node-xml2js/issues/670
page.contents.forEach((item) => {
Object.setPrototypeOf(item, Object.prototype);
});
return page as ListPage;
}
private toPath(rawPath: string, qs?: Record<string, string | number>) {
const path = rawPath.startsWith('/') ? rawPath : `/${rawPath}`;
if (!qs) return path;
const qsParams = Object.entries(qs)
.map(([key, value]) => `${key}=${value}`)
.join('&');
return path.concat(`?${qsParams}`);
}
private async blockWrite(filename: string): Promise<AxiosResponse> {
const logMessage = writeBlockedMessage(filename);
this.logger.warn(logMessage);
return {
status: 403,
statusText: 'Forbidden',
data: logMessage,
headers: {},
config: {} as InternalAxiosRequestConfig,
};
}
private async request<T>(
method: Method,
host: string,
rawPath = '',
{ qs, headers, body, responseType }: RequestOptions = {},
) {
const path = this.toPath(rawPath, qs);
const optionsToSign: Aws4Options = {
method,
service: 's3',
region: this.bucket.region,
host,
path,
};
if (headers) optionsToSign.headers = headers;
if (body) optionsToSign.body = body;
const signedOptions = sign(optionsToSign, this.credentials);
const config: AxiosRequestConfig = {
method,
url: `https://${host}${path}`,
headers: signedOptions.headers,
};
if (body) config.data = body;
if (responseType) config.responseType = responseType;
try {
this.logger.debug('Sending request to S3', { config });
return await axios.request<T>(config);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
const message = `Request to S3 failed: ${error.message}`;
this.logger.error(message, { config });
throw new ApplicationError(message, { cause: error, extra: { config } });
}
}
}
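
A hypothetical usage sketch of ObjectStoreService (the host, bucket, and credentials below are placeholders; not part of this commit):

import { Container } from '@n8n/di';
import { ObjectStoreService } from '@/binary-data';

async function example() {
	const objectStore = Container.get(ObjectStoreService);

	// init() sends a signed HEAD request to the bucket before marking the service ready.
	await objectStore.init(
		's3.us-east-1.amazonaws.com',
		{ region: 'us-east-1', name: 'example-bucket' },
		{ accessKey: 'placeholder-access-key', accessSecret: 'placeholder-secret' },
	);

	const key = 'workflows/wf-1/executions/42/binary_data/example-uuid';

	// put() adds Content-MD5 and metadata headers, then signs the request with aws4.
	await objectStore.put(key, Buffer.from('hello'), { mimeType: 'text/plain', fileName: 'hello.txt' });

	// The overloads pick the return type: Buffer for 'buffer', Readable for 'stream'.
	const buffer = await objectStore.get(key, { mode: 'buffer' });
	console.log(buffer.toString()); // 'hello'
}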


@@ -0,0 +1,42 @@
import type { AxiosResponseHeaders, ResponseType } from 'axios';
import type { BinaryData } from '../types';
export type RawListPage = {
listBucketResult: {
name: string;
prefix: string;
keyCount: number;
maxKeys: number;
isTruncated: boolean;
nextContinuationToken?: string; // only if isTruncated is true
contents?: Item | Item[];
};
};
type Item = {
key: string;
lastModified: string;
eTag: string;
size: number; // bytes
storageClass: string;
};
export type ListPage = Omit<RawListPage['listBucketResult'], 'contents'> & { contents: Item[] };
export type Bucket = { region: string; name: string };
export type RequestOptions = {
qs?: Record<string, string | number>;
headers?: Record<string, string | number>;
body?: string | Buffer;
responseType?: ResponseType;
};
export type MetadataResponseHeaders = AxiosResponseHeaders & {
'content-length': string;
'content-type'?: string;
'x-amz-meta-filename'?: string;
} & BinaryData.PreWriteMetadata;
export type ConfigSchemaCredentials = { accessKey: string; accessSecret: string };


@@ -0,0 +1,20 @@
import { Stream } from 'node:stream';
import { parseStringPromise } from 'xml2js';
import { firstCharLowerCase, parseBooleans, parseNumbers } from 'xml2js/lib/processors';
export function isStream(maybeStream: unknown): maybeStream is Stream {
return maybeStream instanceof Stream;
}
export async function parseXml<T>(xml: string): Promise<T> {
return await (parseStringPromise(xml, {
explicitArray: false,
ignoreAttrs: true,
tagNameProcessors: [firstCharLowerCase],
valueProcessors: [parseNumbers, parseBooleans],
}) as Promise<T>);
}
export function writeBlockedMessage(filename: string) {
return `Request to write file "${filename}" to object storage was blocked because S3 storage is not available with your current license. Please upgrade to a license that supports this feature, or set N8N_DEFAULT_BINARY_DATA_MODE to an option other than "s3".`;
}
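
To illustrate what parseXml() yields for a ListObjectsV2 response, a small sketch with hand-written XML (not captured from a real bucket):

import { parseXml } from '@/binary-data/object-store/utils';
import type { RawListPage } from '@/binary-data/object-store/types';

const xml = `
<ListBucketResult>
  <Name>example-bucket</Name>
  <Prefix>workflows/</Prefix>
  <KeyCount>1</KeyCount>
  <MaxKeys>1000</MaxKeys>
  <IsTruncated>false</IsTruncated>
  <Contents>
    <Key>workflows/wf-1/executions/42/binary_data/example-uuid</Key>
    <Size>9</Size>
  </Contents>
</ListBucketResult>`;

async function example() {
	// firstCharLowerCase lowercases the leading character of each tag name, and the
	// value processors convert numbers and booleans, so keyCount is 1 (number) and
	// isTruncated is false (boolean) rather than strings.
	const { listBucketResult } = await parseXml<RawListPage>(xml);
	console.log(listBucketResult.keyCount, listBucketResult.isTruncated, listBucketResult.contents);
}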


@@ -0,0 +1,73 @@
import type { Readable } from 'stream';
export namespace BinaryData {
type LegacyMode = 'filesystem';
type UpgradedMode = 'filesystem-v2';
/**
* Binary data mode selectable by user via env var config.
*/
export type ConfigMode = 'default' | 'filesystem' | 's3';
/**
* Binary data mode used internally by binary data service. User-selected
* legacy modes are replaced with upgraded modes.
*/
export type ServiceMode = Exclude<ConfigMode, LegacyMode> | UpgradedMode;
/**
* Binary data mode in binary data ID in stored execution data. Both legacy
* and upgraded modes may be present, except default in-memory mode.
*/
export type StoredMode = Exclude<ConfigMode | UpgradedMode, 'default'>;
export type Config = {
mode: ConfigMode;
availableModes: ConfigMode[];
localStoragePath: string;
};
export type Metadata = {
fileName?: string;
mimeType?: string;
fileSize: number;
};
export type WriteResult = { fileId: string; fileSize: number };
export type PreWriteMetadata = Omit<Metadata, 'fileSize'>;
export type IdsForDeletion = Array<{ workflowId: string; executionId: string }>;
export interface Manager {
init(): Promise<void>;
store(
workflowId: string,
executionId: string,
bufferOrStream: Buffer | Readable,
metadata: PreWriteMetadata,
): Promise<WriteResult>;
getPath(fileId: string): string;
getAsBuffer(fileId: string): Promise<Buffer>;
getAsStream(fileId: string, chunkSize?: number): Promise<Readable>;
getMetadata(fileId: string): Promise<Metadata>;
/**
* Present for `FileSystem`, absent for `ObjectStore` (delegated to S3 lifecycle config)
*/
deleteMany?(ids: IdsForDeletion): Promise<void>;
copyByFileId(workflowId: string, executionId: string, sourceFileId: string): Promise<string>;
copyByFilePath(
workflowId: string,
executionId: string,
sourcePath: string,
metadata: PreWriteMetadata,
): Promise<WriteResult>;
rename(oldFileId: string, newFileId: string): Promise<void>;
}
}
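
To make the Manager contract concrete, a minimal in-memory sketch that satisfies the interface (illustration only, not part of this commit and not suitable for real use):

import { readFile } from 'node:fs/promises';
import { Readable } from 'node:stream';
import { v4 as uuid } from 'uuid';
import type { BinaryData } from '@/binary-data/types';
import { binaryToBuffer } from '@/binary-data/utils';

class InMemoryManager implements BinaryData.Manager {
	private files = new Map<string, { buffer: Buffer; metadata: BinaryData.Metadata }>();

	async init() {}

	async store(
		workflowId: string,
		executionId: string,
		bufferOrStream: Buffer | Readable,
		metadata: BinaryData.PreWriteMetadata,
	): Promise<BinaryData.WriteResult> {
		// Same ID shape as the real managers: nested by workflow and execution.
		const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${uuid()}`;
		const buffer = await binaryToBuffer(bufferOrStream);
		this.files.set(fileId, { buffer, metadata: { ...metadata, fileSize: buffer.length } });
		return { fileId, fileSize: buffer.length };
	}

	getPath(fileId: string) {
		return fileId;
	}

	async getAsBuffer(fileId: string) {
		return this.files.get(fileId)!.buffer;
	}

	async getAsStream(fileId: string) {
		return Readable.from(this.files.get(fileId)!.buffer);
	}

	async getMetadata(fileId: string) {
		return this.files.get(fileId)!.metadata;
	}

	async copyByFileId(workflowId: string, executionId: string, sourceFileId: string) {
		const source = this.files.get(sourceFileId)!;
		const { fileId } = await this.store(workflowId, executionId, source.buffer, source.metadata);
		return fileId;
	}

	async copyByFilePath(
		workflowId: string,
		executionId: string,
		sourcePath: string,
		metadata: BinaryData.PreWriteMetadata,
	) {
		return await this.store(workflowId, executionId, await readFile(sourcePath), metadata);
	}

	async rename(oldFileId: string, newFileId: string) {
		const file = this.files.get(oldFileId);
		if (!file) return;
		this.files.set(newFileId, file);
		this.files.delete(oldFileId);
	}
}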

View File

@@ -0,0 +1,48 @@
import concatStream from 'concat-stream';
import fs from 'node:fs/promises';
import type { Readable } from 'node:stream';
import type { BinaryData } from './types';
export const CONFIG_MODES = ['default', 'filesystem', 's3'] as const;
const STORED_MODES = ['filesystem', 'filesystem-v2', 's3'] as const;
export function areConfigModes(modes: string[]): modes is BinaryData.ConfigMode[] {
return modes.every((m) => CONFIG_MODES.includes(m as BinaryData.ConfigMode));
}
export function isStoredMode(mode: string): mode is BinaryData.StoredMode {
return STORED_MODES.includes(mode as BinaryData.StoredMode);
}
export async function assertDir(dir: string) {
try {
await fs.access(dir);
} catch {
await fs.mkdir(dir, { recursive: true });
}
}
export async function doesNotExist(dir: string) {
try {
await fs.access(dir);
return false;
} catch {
return true;
}
}
/** Converts a buffer or a readable stream to a buffer */
export async function binaryToBuffer(body: Buffer | Readable) {
if (Buffer.isBuffer(body)) return body;
return await new Promise<Buffer>((resolve, reject) => {
body
.once('error', (cause) => {
if ('code' in cause && cause.code === 'Z_DATA_ERROR')
reject(new Error('Failed to decompress response', { cause }));
else reject(cause);
})
.pipe(concatStream(resolve));
});
}
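
A brief usage sketch of binaryToBuffer() (hypothetical call sites, not part of this commit):

import { Readable } from 'node:stream';
import { binaryToBuffer } from '@/binary-data/utils';

async function example() {
	// Buffers pass straight through; streams are concatenated via concat-stream.
	const fromBuffer = await binaryToBuffer(Buffer.from('already a buffer'));
	const fromStream = await binaryToBuffer(Readable.from(Buffer.from('streamed data')));
	console.log(fromBuffer.toString(), fromStream.toString());
}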