From 271024ded0d55aa97daaf52cb8051abee36ad474 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Wed, 23 Apr 2025 16:56:13 +0200 Subject: [PATCH] feat(core): Add InstanceRole auth support for binary-data object- storage backend (#14800) --- packages/@n8n/config/src/index.ts | 5 - packages/@n8n/config/test/config.test.ts | 14 - packages/core/package.json | 5 +- .../__tests__/file-system.manager.test.ts | 4 +- .../__tests__/object-store.manager.test.ts | 4 +- .../src/binary-data/__tests__/utils.test.ts | 3 +- .../__tests__/object-store.service.test.ts | 700 ++++++++++-------- .../object-store/object-store.config.ts} | 32 +- .../object-store/object-store.service.ee.ts | 348 +++++---- .../src/binary-data/object-store/types.ts | 37 +- .../src/binary-data/object-store/utils.ts | 16 - packages/core/src/binary-data/utils.ts | 26 +- pnpm-lock.yaml | 22 +- 13 files changed, 644 insertions(+), 572 deletions(-) rename packages/{@n8n/config/src/configs/external-storage.config.ts => core/src/binary-data/object-store/object-store.config.ts} (52%) delete mode 100644 packages/core/src/binary-data/object-store/utils.ts diff --git a/packages/@n8n/config/src/index.ts b/packages/@n8n/config/src/index.ts index 04aa2ed276..3fbb858faa 100644 --- a/packages/@n8n/config/src/index.ts +++ b/packages/@n8n/config/src/index.ts @@ -10,7 +10,6 @@ import { EndpointsConfig } from './configs/endpoints.config'; import { EventBusConfig } from './configs/event-bus.config'; import { ExecutionsConfig } from './configs/executions.config'; import { ExternalHooksConfig } from './configs/external-hooks.config'; -import { ExternalStorageConfig } from './configs/external-storage.config'; import { GenericConfig } from './configs/generic.config'; import { LicenseConfig } from './configs/license.config'; import { LoggingConfig } from './configs/logging.config'; @@ -34,7 +33,6 @@ export { Config, Env, Nested } from './decorators'; export { TaskRunnersConfig } from './configs/runners.config'; export { SecurityConfig } from './configs/security.config'; export { ExecutionsConfig } from './configs/executions.config'; -export { S3Config } from './configs/external-storage.config'; export { LOG_SCOPES } from './configs/logging.config'; export type { LogScope } from './configs/logging.config'; export { WorkflowsConfig } from './configs/workflows.config'; @@ -76,9 +74,6 @@ export class GlobalConfig { @Nested nodes: NodesConfig; - @Nested - externalStorage: ExternalStorageConfig; - @Nested workflows: WorkflowsConfig; diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 940f17df43..384e970bc9 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -139,20 +139,6 @@ describe('GlobalConfig', () => { endpoint: 'https://api.n8n.io/api/versions/', infoUrl: 'https://docs.n8n.io/hosting/installation/updating/', }, - externalStorage: { - s3: { - host: '', - protocol: 'https', - bucket: { - name: '', - region: '', - }, - credentials: { - accessKey: '', - accessSecret: '', - }, - }, - }, workflows: { defaultName: 'My workflow', callerPolicyDefaultOption: 'workflowsFromSameOwner', diff --git a/packages/core/package.json b/packages/core/package.json index 9423eb728a..4f956f250f 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -28,8 +28,6 
@@ ], "devDependencies": { "@n8n/typescript-config": "workspace:*", - "@types/aws4": "^1.5.1", - "@types/concat-stream": "^2.0.0", "@types/express": "catalog:", "@types/jsonwebtoken": "catalog:", "@types/lodash": "catalog:", @@ -38,16 +36,15 @@ "@types/xml2js": "catalog:" }, "dependencies": { + "@aws-sdk/client-s3": "3.666.0", "@langchain/core": "catalog:", "@n8n/client-oauth2": "workspace:*", "@n8n/config": "workspace:*", "@n8n/di": "workspace:*", "@sentry/node": "catalog:", - "aws4": "1.11.0", "axios": "catalog:", "callsites": "catalog:", "chardet": "2.0.0", - "concat-stream": "2.0.0", "cron": "3.1.7", "fast-glob": "catalog:", "file-type": "16.5.4", diff --git a/packages/core/src/binary-data/__tests__/file-system.manager.test.ts b/packages/core/src/binary-data/__tests__/file-system.manager.test.ts index fae24801f5..19fa867b41 100644 --- a/packages/core/src/binary-data/__tests__/file-system.manager.test.ts +++ b/packages/core/src/binary-data/__tests__/file-system.manager.test.ts @@ -2,9 +2,9 @@ import fs from 'node:fs'; import fsp from 'node:fs/promises'; import { tmpdir } from 'node:os'; import path from 'node:path'; +import { Readable } from 'node:stream'; import { FileSystemManager } from '@/binary-data/file-system.manager'; -import { isStream } from '@/binary-data/object-store/utils'; import { toFileId, toStream } from '@test/utils'; jest.mock('fs'); @@ -70,7 +70,7 @@ describe('getAsStream()', () => { const stream = await fsManager.getAsStream(fileId); - expect(isStream(stream)).toBe(true); + expect(stream).toBeInstanceOf(Readable); expect(fs.createReadStream).toHaveBeenCalledWith(toFullFilePath(fileId), { highWaterMark: undefined, }); diff --git a/packages/core/src/binary-data/__tests__/object-store.manager.test.ts b/packages/core/src/binary-data/__tests__/object-store.manager.test.ts index 9ca99e8d7b..09696f7d37 100644 --- a/packages/core/src/binary-data/__tests__/object-store.manager.test.ts +++ b/packages/core/src/binary-data/__tests__/object-store.manager.test.ts @@ -1,9 +1,9 @@ import { mock } from 'jest-mock-extended'; import fs from 'node:fs/promises'; +import { Readable } from 'node:stream'; import { ObjectStoreService } from '@/binary-data/object-store/object-store.service.ee'; import type { MetadataResponseHeaders } from '@/binary-data/object-store/types'; -import { isStream } from '@/binary-data/object-store/utils'; import { ObjectStoreManager } from '@/binary-data/object-store.manager'; import { mockInstance, toFileId, toStream } from '@test/utils'; @@ -67,7 +67,7 @@ describe('getAsStream()', () => { const stream = await objectStoreManager.getAsStream(fileId); - expect(isStream(stream)).toBe(true); + expect(stream).toBeInstanceOf(Readable); expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'stream' }); }); }); diff --git a/packages/core/src/binary-data/__tests__/utils.test.ts b/packages/core/src/binary-data/__tests__/utils.test.ts index 329345262f..2c3cc7d57f 100644 --- a/packages/core/src/binary-data/__tests__/utils.test.ts +++ b/packages/core/src/binary-data/__tests__/utils.test.ts @@ -1,3 +1,4 @@ +import { UnexpectedError } from 'n8n-workflow'; import { Readable } from 'node:stream'; import { createGunzip } from 'node:zlib'; @@ -27,7 +28,7 @@ describe('BinaryData/utils', () => { const gunzip = createGunzip(); const body = Readable.from(Buffer.from('0001f8b080000000000000000', 'hex')).pipe(gunzip); await expect(binaryToBuffer(body)).rejects.toThrow( - new Error('Failed to decompress response'), + new UnexpectedError('Failed to decompress response'), ); 
}); }); diff --git a/packages/core/src/binary-data/object-store/__tests__/object-store.service.test.ts b/packages/core/src/binary-data/object-store/__tests__/object-store.service.test.ts index 03cfc5b979..f2259177dd 100644 --- a/packages/core/src/binary-data/object-store/__tests__/object-store.service.test.ts +++ b/packages/core/src/binary-data/object-store/__tests__/object-store.service.test.ts @@ -1,329 +1,429 @@ -import type { S3Config } from '@n8n/config'; -import axios from 'axios'; -import { mock } from 'jest-mock-extended'; +import { + DeleteObjectCommand, + DeleteObjectsCommand, + GetObjectCommand, + HeadBucketCommand, + HeadObjectCommand, + ListObjectsV2Command, + PutObjectCommand, + type S3Client, +} from '@aws-sdk/client-s3'; +import { captor, mock } from 'jest-mock-extended'; import { Readable } from 'stream'; -import { ObjectStoreService } from '@/binary-data/object-store/object-store.service.ee'; +import type { ObjectStoreConfig } from '../object-store.config'; +import { ObjectStoreService } from '../object-store.service.ee'; -jest.mock('axios'); +const mockS3Send = jest.fn(); +const s3Client = mock({ send: mockS3Send }); +jest.mock('@aws-sdk/client-s3', () => ({ + ...jest.requireActual('@aws-sdk/client-s3'), + S3Client: class { + constructor() { + return s3Client; + } + }, +})); -const mockAxios = axios as jest.Mocked; - -const mockBucket = { region: 'us-east-1', name: 'test-bucket' }; -const mockHost = `s3.${mockBucket.region}.amazonaws.com`; -const mockCredentials = { accessKey: 'mock-access-key', accessSecret: 'mock-secret-key' }; -const mockUrl = `https://${mockHost}/${mockBucket.name}`; -const FAILED_REQUEST_ERROR_MESSAGE = 'Request to S3 failed'; -const mockError = new Error('Something went wrong!'); -const fileId = - 'workflows/ObogjVbqpNOQpiyV/executions/999/binary_data/71f6209b-5d48-41a2-a224-80d529d8bb32'; -const mockBuffer = Buffer.from('Test data'); -const s3Config = mock({ - host: mockHost, - bucket: mockBucket, - credentials: mockCredentials, - protocol: 'https', -}); - -const toDeletionXml = (filename: string) => ` -${filename} -`; - -let objectStoreService: ObjectStoreService; - -const now = new Date('2024-02-01T01:23:45.678Z'); -jest.useFakeTimers({ now }); - -beforeEach(async () => { - objectStoreService = new ObjectStoreService(mock(), s3Config); - mockAxios.request.mockResolvedValueOnce({ status: 200 }); // for checkConnection - await objectStoreService.init(); - jest.restoreAllMocks(); -}); - -describe('checkConnection()', () => { - it('should send a HEAD request to the correct host', async () => { - mockAxios.request.mockResolvedValue({ status: 200 }); - - objectStoreService.setReady(false); - - await objectStoreService.checkConnection(); - - expect(mockAxios.request).toHaveBeenCalledWith({ - method: 'HEAD', - url: 'https://s3.us-east-1.amazonaws.com/test-bucket', - headers: { - Host: 's3.us-east-1.amazonaws.com', - 'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', - 'X-Amz-Date': '20240201T012345Z', - Authorization: - 'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=a5240c11a706e9e6c60e7033a848fc934911b12330e5a4609b0b943f97d9781b', - }, - }); +describe('ObjectStoreService', () => { + const mockBucket = { region: 'us-east-1', name: 'test-bucket' }; + const mockHost = `s3.${mockBucket.region}.amazonaws.com`; + const FAILED_REQUEST_ERROR_MESSAGE = 'Request to S3 failed'; + const mockError = new Error('Something went 
wrong!'); + const workflowId = 'workflow-id'; + const executionId = 999; + const binaryDataId = '71f6209b-5d48-41a2-a224-80d529d8bb32'; + const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${binaryDataId}`; + const mockBuffer = Buffer.from('Test data'); + const s3Config = mock({ + host: mockHost, + bucket: mockBucket, + credentials: { + accessKey: 'mock-access-key', + accessSecret: 'mock-secret-key', + authAutoDetect: false, + }, + protocol: 'https', }); - it('should throw an error on request failure', async () => { - objectStoreService.setReady(false); + let objectStoreService: ObjectStoreService; - mockAxios.request.mockRejectedValue(mockError); + const now = new Date('2024-02-01T01:23:45.678Z'); + jest.useFakeTimers({ now }); - const promise = objectStoreService.checkConnection(); - - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); -}); - -describe('getMetadata()', () => { - it('should send a HEAD request to the correct host and path', async () => { - mockAxios.request.mockResolvedValue({ status: 200 }); - - await objectStoreService.getMetadata(fileId); - - expect(mockAxios.request).toHaveBeenCalledWith({ - method: 'HEAD', - url: `${mockUrl}/${fileId}`, - headers: { - Host: mockHost, - 'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', - 'X-Amz-Date': '20240201T012345Z', - Authorization: - 'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=60e11c39580ad7dd3a3d549523e7115cdff018540f24c6412ed40053e52a21d0', - }, - }); + beforeEach(async () => { + objectStoreService = new ObjectStoreService(mock(), s3Config); + await objectStoreService.init(); + jest.restoreAllMocks(); }); - it('should throw an error on request failure', async () => { - mockAxios.request.mockRejectedValue(mockError); - - const promise = objectStoreService.getMetadata(fileId); - - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); -}); - -describe('put()', () => { - it('should send a PUT request to upload an object', async () => { - const metadata = { fileName: 'file.txt', mimeType: 'text/plain' }; - - mockAxios.request.mockResolvedValue({ status: 200 }); - - await objectStoreService.put(fileId, mockBuffer, metadata); - - expect(mockAxios.request).toHaveBeenCalledWith({ - method: 'PUT', - url: 'https://s3.us-east-1.amazonaws.com/test-bucket/workflows/ObogjVbqpNOQpiyV/executions/999/binary_data/71f6209b-5d48-41a2-a224-80d529d8bb32', - headers: { - 'Content-Length': 9, - 'Content-MD5': 'yh6gLBC3w39CW5t92G1eEQ==', - 'x-amz-meta-filename': 'file.txt', - 'Content-Type': 'text/plain', - Host: 's3.us-east-1.amazonaws.com', - 'X-Amz-Content-Sha256': 'e27c8214be8b7cf5bccc7c08247e3cb0c1514a48ee1f63197fe4ef3ef51d7e6f', - 'X-Amz-Date': '20240201T012345Z', - Authorization: - 'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=content-length;content-md5;content-type;host;x-amz-content-sha256;x-amz-date;x-amz-meta-filename, Signature=6b0fbb51a35dbfa73ac79a964ffc7203b40517a062efc5b01f5f9b7ad553fa7a', - }, - data: mockBuffer, - }); - }); - - it('should throw an error on request failure', async () => { - const metadata = { fileName: 'file.txt', mimeType: 'text/plain' }; - - mockAxios.request.mockRejectedValue(mockError); - - const promise = objectStoreService.put(fileId, mockBuffer, metadata); - - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); -}); - 
-describe('get()', () => { - it('should send a GET request to download an object as a buffer', async () => { - const fileId = 'file.txt'; - - mockAxios.request.mockResolvedValue({ status: 200, data: Buffer.from('Test content') }); - - const result = await objectStoreService.get(fileId, { mode: 'buffer' }); - - expect(mockAxios.request).toHaveBeenCalledWith({ - method: 'GET', - url: `${mockUrl}/${fileId}`, - responseType: 'arraybuffer', - headers: { - Authorization: - 'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=5f69680786e0ad9f0a0324eb5e4b8fe8c78562afc924489ea423632a2ad2187d', - Host: 's3.us-east-1.amazonaws.com', - 'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', - 'X-Amz-Date': '20240201T012345Z', - }, - }); - - expect(Buffer.isBuffer(result)).toBe(true); - }); - - it('should send a GET request to download an object as a stream', async () => { - mockAxios.request.mockResolvedValue({ status: 200, data: new Readable() }); - - const result = await objectStoreService.get(fileId, { mode: 'stream' }); - - expect(mockAxios.request).toHaveBeenCalledWith({ - method: 'GET', - url: `${mockUrl}/${fileId}`, - responseType: 'stream', - headers: { - Authorization: - 'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=3ef579ebe2ae89303a89c0faf3ce8ef8e907295dc538d59e95bcf35481c0d03e', - Host: 's3.us-east-1.amazonaws.com', - 'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', - 'X-Amz-Date': '20240201T012345Z', - }, - }); - - expect(result instanceof Readable).toBe(true); - }); - - it('should throw an error on request failure', async () => { - mockAxios.request.mockRejectedValue(mockError); - - const promise = objectStoreService.get(fileId, { mode: 'buffer' }); - - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); -}); - -describe('deleteOne()', () => { - it('should send a DELETE request to delete a single object', async () => { - mockAxios.request.mockResolvedValue({ status: 204 }); - - await objectStoreService.deleteOne(fileId); - - expect(mockAxios.request).toHaveBeenCalledWith({ - method: 'DELETE', - url: `${mockUrl}/${fileId}`, - headers: { - Authorization: - 'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=4ad61b1b4da335c6c49772d28e54a301f787d199c9403055b217f890f7aec7fc', - Host: 's3.us-east-1.amazonaws.com', - 'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', - 'X-Amz-Date': '20240201T012345Z', - }, - }); - }); - - it('should throw an error on request failure', async () => { - mockAxios.request.mockRejectedValue(mockError); - - const promise = objectStoreService.deleteOne(fileId); - - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); -}); - -describe('deleteMany()', () => { - it('should send a POST request to delete multiple objects', async () => { - const prefix = 'test-dir/'; - const fileName = 'file.txt'; - - const mockList = [ - { - key: fileName, - lastModified: '2023-09-24T12:34:56Z', - eTag: 'abc123def456', - size: 456789, - storageClass: 'STANDARD', - }, - ]; - - objectStoreService.list = jest.fn().mockResolvedValue(mockList); - - mockAxios.request.mockResolvedValue({ status: 204 }); - - await objectStoreService.deleteMany(prefix); 
- - expect(mockAxios.request).toHaveBeenCalledWith({ - method: 'POST', - url: `${mockUrl}?delete=`, - headers: { - 'Content-Type': 'application/xml', - 'Content-Length': 55, - 'Content-MD5': 'ybYDrpQxwYvNIGBQs7PJNA==', - Host: 's3.us-east-1.amazonaws.com', - 'X-Amz-Content-Sha256': '5708e5c935cb75eb528e41ef1548e08b26c5b3b7504b67dc911abc1ff1881f76', - 'X-Amz-Date': '20240201T012345Z', - Authorization: - 'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=content-length;content-md5;content-type;host;x-amz-content-sha256;x-amz-date, Signature=039168f10927b31624f3a5edae8eb4c89405f7c594eb2d6e00257c1462363f99', - }, - data: toDeletionXml(fileName), - }); - }); - - it('should not send a deletion request if no prefix match', async () => { - objectStoreService.list = jest.fn().mockResolvedValue([]); - - const result = await objectStoreService.deleteMany('non-matching-prefix'); - - expect(result).toBeUndefined(); - }); - - it('should throw an error on request failure', async () => { - mockAxios.request.mockRejectedValue(mockError); - - const promise = objectStoreService.deleteMany('test-dir/'); - - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); - }); -}); - -describe('list()', () => { - it('should list objects with a common prefix', async () => { - const prefix = 'test-dir/'; - - const mockListPage = { - contents: [{ key: `${prefix}file1.txt` }, { key: `${prefix}file2.txt` }], - isTruncated: false, + describe('getClientConfig()', () => { + const credentials = { + accessKeyId: s3Config.credentials.accessKey, + secretAccessKey: s3Config.credentials.accessSecret, }; - objectStoreService.getListPage = jest.fn().mockResolvedValue(mockListPage); + it('should return client config with endpoint and forcePathStyle when custom host is provided', () => { + s3Config.host = 'example.com'; - mockAxios.request.mockResolvedValue({ status: 200 }); + const clientConfig = objectStoreService.getClientConfig(); - const result = await objectStoreService.list(prefix); + expect(clientConfig).toEqual({ + endpoint: 'https://example.com', + forcePathStyle: true, + region: mockBucket.region, + credentials, + }); + }); - expect(result).toEqual(mockListPage.contents); + it('should return client config without endpoint when host is not provided', () => { + s3Config.host = ''; + + const clientConfig = objectStoreService.getClientConfig(); + + expect(clientConfig).toEqual({ + region: mockBucket.region, + credentials, + }); + }); + + it('should return client config without credentials when authAutoDetect is true', () => { + s3Config.credentials.authAutoDetect = true; + + const clientConfig = objectStoreService.getClientConfig(); + + expect(clientConfig).toEqual({ + region: mockBucket.region, + }); + }); }); - it('should consolidate pages', async () => { - const prefix = 'test-dir/'; + describe('checkConnection()', () => { + it('should send a HEAD request to the correct bucket', async () => { + mockS3Send.mockResolvedValueOnce({}); - const mockFirstListPage = { - contents: [{ key: `${prefix}file1.txt` }], - isTruncated: true, - nextContinuationToken: 'token1', - }; + objectStoreService.setReady(false); - const mockSecondListPage = { - contents: [{ key: `${prefix}file2.txt` }], - isTruncated: false, - }; + await objectStoreService.checkConnection(); - objectStoreService.getListPage = jest - .fn() - .mockResolvedValueOnce(mockFirstListPage) - .mockResolvedValueOnce(mockSecondListPage); + const commandCaptor = captor(); + 
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor); + const command = commandCaptor.value; + expect(command).toBeInstanceOf(HeadBucketCommand); + expect(command.input).toEqual({ Bucket: 'test-bucket' }); + }); - mockAxios.request.mockResolvedValue({ status: 200 }); + it('should throw an error on request failure', async () => { + objectStoreService.setReady(false); - const result = await objectStoreService.list(prefix); + mockS3Send.mockRejectedValueOnce(mockError); - expect(result).toEqual([...mockFirstListPage.contents, ...mockSecondListPage.contents]); + const promise = objectStoreService.checkConnection(); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); }); - it('should throw an error on request failure', async () => { - mockAxios.request.mockRejectedValue(mockError); + describe('getMetadata()', () => { + it('should send a HEAD request to the correct bucket and key', async () => { + mockS3Send.mockResolvedValueOnce({ + ContentType: 'text/plain', + ContentLength: 1024, + ETag: '"abc123"', + LastModified: new Date(), + Metadata: { filename: 'test.txt' }, + }); - const promise = objectStoreService.list('test-dir/'); + await objectStoreService.getMetadata(fileId); - await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + const commandCaptor = captor(); + expect(mockS3Send).toHaveBeenCalledWith(commandCaptor); + const command = commandCaptor.value; + expect(command).toBeInstanceOf(HeadObjectCommand); + expect(command.input).toEqual({ + Bucket: 'test-bucket', + Key: fileId, + }); + }); + + it('should throw an error on request failure', async () => { + mockS3Send.mockRejectedValueOnce(mockError); + + const promise = objectStoreService.getMetadata(fileId); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); + }); + + describe('put()', () => { + it('should send a PUT request to upload an object', async () => { + const metadata = { fileName: 'file.txt', mimeType: 'text/plain' }; + + mockS3Send.mockResolvedValueOnce({}); + + await objectStoreService.put(fileId, mockBuffer, metadata); + + const commandCaptor = captor(); + expect(mockS3Send).toHaveBeenCalledWith(commandCaptor); + const command = commandCaptor.value; + expect(command).toBeInstanceOf(PutObjectCommand); + expect(command.input).toEqual({ + Bucket: 'test-bucket', + Key: fileId, + Body: mockBuffer, + ContentLength: mockBuffer.length, + ContentMD5: 'yh6gLBC3w39CW5t92G1eEQ==', + ContentType: 'text/plain', + Metadata: { filename: 'file.txt' }, + }); + }); + + it('should throw an error on request failure', async () => { + const metadata = { fileName: 'file.txt', mimeType: 'text/plain' }; + + mockS3Send.mockRejectedValueOnce(mockError); + + const promise = objectStoreService.put(fileId, mockBuffer, metadata); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); + }); + + describe('get()', () => { + it('should send a GET request to download an object as a buffer', async () => { + const fileId = 'file.txt'; + const body = Readable.from(mockBuffer); + + mockS3Send.mockResolvedValueOnce({ Body: body }); + + const result = await objectStoreService.get(fileId, { mode: 'buffer' }); + + const commandCaptor = captor(); + expect(mockS3Send).toHaveBeenCalledWith(commandCaptor); + const command = commandCaptor.value; + expect(command).toBeInstanceOf(GetObjectCommand); + expect(command.input).toEqual({ + Bucket: 'test-bucket', + Key: fileId, + }); + + expect(Buffer.isBuffer(result)).toBe(true); + }); + + it('should send a GET request 
to download an object as a stream', async () => { + const body = new Readable(); + + mockS3Send.mockResolvedValueOnce({ Body: body }); + + const result = await objectStoreService.get(fileId, { mode: 'stream' }); + + const commandCaptor = captor(); + expect(mockS3Send).toHaveBeenCalledWith(commandCaptor); + const command = commandCaptor.value; + expect(command).toBeInstanceOf(GetObjectCommand); + expect(command.input).toEqual({ + Bucket: 'test-bucket', + Key: fileId, + }); + + expect(result instanceof Readable).toBe(true); + expect(result).toBe(body); + }); + + it('should throw an error on request failure', async () => { + mockS3Send.mockRejectedValueOnce(mockError); + + const promise = objectStoreService.get(fileId, { mode: 'buffer' }); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); + }); + + describe('deleteOne()', () => { + it('should send a DELETE request to delete a single object', async () => { + mockS3Send.mockResolvedValueOnce({}); + + await objectStoreService.deleteOne(fileId); + + const commandCaptor = captor(); + expect(mockS3Send).toHaveBeenCalledWith(commandCaptor); + const command = commandCaptor.value; + expect(command).toBeInstanceOf(DeleteObjectCommand); + expect(command.input).toEqual({ + Bucket: 'test-bucket', + Key: fileId, + }); + }); + + it('should throw an error on request failure', async () => { + mockS3Send.mockRejectedValueOnce(mockError); + + const promise = objectStoreService.deleteOne(fileId); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); + }); + + describe('deleteMany()', () => { + it('should send a DELETE request to delete multiple objects', async () => { + const prefix = 'test-dir/'; + const fileName = 'file.txt'; + + const mockList = [ + { + key: fileName, + lastModified: '2023-09-24T12:34:56Z', + eTag: 'abc123def456', + size: 456789, + storageClass: 'STANDARD', + }, + ]; + + objectStoreService.list = jest.fn().mockResolvedValue(mockList); + mockS3Send.mockResolvedValueOnce({}); + + await objectStoreService.deleteMany(prefix); + + const commandCaptor = captor(); + expect(mockS3Send).toHaveBeenCalledWith(commandCaptor); + const command = commandCaptor.value; + expect(command).toBeInstanceOf(DeleteObjectsCommand); + expect(command.input).toEqual({ + Bucket: 'test-bucket', + Delete: { + Objects: [{ Key: fileName }], + }, + }); + }); + + it('should not send a deletion request if no prefix match', async () => { + objectStoreService.list = jest.fn().mockResolvedValue([]); + + const result = await objectStoreService.deleteMany('non-matching-prefix'); + + expect(result).toBeUndefined(); + expect(mockS3Send).not.toHaveBeenCalled(); + }); + + it('should throw an error on request failure', async () => { + objectStoreService.list = jest.fn().mockResolvedValue([{ key: 'file.txt' }]); + mockS3Send.mockRejectedValueOnce(mockError); + + const promise = objectStoreService.deleteMany('test-dir/'); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); + }); + + describe('list()', () => { + it('should list objects with a common prefix', async () => { + const prefix = 'test-dir/'; + + const mockListPage = { + contents: [{ key: `${prefix}file1.txt` }, { key: `${prefix}file2.txt` }], + isTruncated: false, + }; + + objectStoreService.getListPage = jest.fn().mockResolvedValue(mockListPage); + + const result = await objectStoreService.list(prefix); + + expect(result).toEqual(mockListPage.contents); + }); + + it('should consolidate pages', async () => { + const prefix = 
'test-dir/'; + + const mockFirstListPage = { + contents: [{ key: `${prefix}file1.txt` }], + isTruncated: true, + nextContinuationToken: 'token1', + }; + + const mockSecondListPage = { + contents: [{ key: `${prefix}file2.txt` }], + isTruncated: false, + }; + + objectStoreService.getListPage = jest + .fn() + .mockResolvedValueOnce(mockFirstListPage) + .mockResolvedValueOnce(mockSecondListPage); + + const result = await objectStoreService.list(prefix); + + expect(result).toEqual([...mockFirstListPage.contents, ...mockSecondListPage.contents]); + }); + + it('should throw an error on request failure', async () => { + objectStoreService.getListPage = jest.fn().mockRejectedValueOnce(mockError); + + const promise = objectStoreService.list('test-dir/'); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); + }); + + describe('getListPage()', () => { + it('should fetch a page of objects with a common prefix', async () => { + const prefix = 'test-dir/'; + const mockContents = [ + { + Key: `${prefix}file1.txt`, + LastModified: new Date(), + ETag: '"abc123"', + Size: 123, + StorageClass: 'STANDARD', + }, + ]; + + mockS3Send.mockResolvedValueOnce({ + Contents: mockContents, + IsTruncated: false, + }); + + const result = await objectStoreService.getListPage(prefix); + + const commandCaptor = captor(); + expect(mockS3Send).toHaveBeenCalledWith(commandCaptor); + const command = commandCaptor.value; + expect(command).toBeInstanceOf(ListObjectsV2Command); + expect(command.input).toEqual({ + Bucket: 'test-bucket', + Prefix: prefix, + }); + + expect(result.contents).toHaveLength(1); + expect(result.isTruncated).toBe(false); + }); + + it('should use continuation token when provided', async () => { + const prefix = 'test-dir/'; + const token = 'next-page-token'; + + mockS3Send.mockResolvedValueOnce({ + Contents: [], + IsTruncated: false, + }); + + await objectStoreService.getListPage(prefix, token); + + const commandCaptor = captor(); + expect(mockS3Send).toHaveBeenCalledWith(commandCaptor); + const command = commandCaptor.value; + expect(command.input).toEqual({ + Bucket: 'test-bucket', + Prefix: prefix, + ContinuationToken: token, + }); + }); + + it('should throw an error on request failure', async () => { + mockS3Send.mockRejectedValueOnce(mockError); + + const promise = objectStoreService.getListPage('test-dir/'); + + await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); + }); }); }); diff --git a/packages/@n8n/config/src/configs/external-storage.config.ts b/packages/core/src/binary-data/object-store/object-store.config.ts similarity index 52% rename from packages/@n8n/config/src/configs/external-storage.config.ts rename to packages/core/src/binary-data/object-store/object-store.config.ts index 6a51c263e2..f02901aebc 100644 --- a/packages/@n8n/config/src/configs/external-storage.config.ts +++ b/packages/core/src/binary-data/object-store/object-store.config.ts @@ -1,13 +1,12 @@ +import { Config, Env, Nested } from '@n8n/config'; import { z } from 'zod'; -import { Config, Env, Nested } from '../decorators'; - const protocolSchema = z.enum(['http', 'https']); export type Protocol = z.infer; @Config -class S3BucketConfig { +class ObjectStoreBucketConfig { /** Name of the n8n bucket in S3-compatible external storage */ @Env('N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME') name: string = ''; @@ -18,7 +17,7 @@ class S3BucketConfig { } @Config -class S3CredentialsConfig { +class ObjectStoreCredentialsConfig { /** Access key in S3-compatible external storage */ 
@Env('N8N_EXTERNAL_STORAGE_S3_ACCESS_KEY') accessKey: string = ''; @@ -26,11 +25,22 @@ class S3CredentialsConfig { /** Access secret in S3-compatible external storage */ @Env('N8N_EXTERNAL_STORAGE_S3_ACCESS_SECRET') accessSecret: string = ''; + + /** + * Use automatic credential detection to authenticate S3 calls for external storage + * This will ignore accessKey/accessSecret and use the default credential provider chain + * https://docs.aws.amazon.com/sdk-for-javascript/v3/developer-guide/setting-credentials-node.html#credchain + */ + @Env('N8N_EXTERNAL_STORAGE_S3_AUTH_AUTO_DETECT') + authAutoDetect: boolean = false; } @Config -export class S3Config { - /** Host of the n8n bucket in S3-compatible external storage @example "s3.us-east-1.amazonaws.com" */ +export class ObjectStoreConfig { + /** + * Host of the object-store bucket in S3-compatible external storage + * @example "s3.us-east-1.amazonaws.com" + **/ @Env('N8N_EXTERNAL_STORAGE_S3_HOST') host: string = ''; @@ -38,14 +48,8 @@ export class S3Config { protocol: Protocol = 'https'; @Nested - bucket: S3BucketConfig; + bucket: ObjectStoreBucketConfig = {} as ObjectStoreBucketConfig; @Nested - credentials: S3CredentialsConfig; -} - -@Config -export class ExternalStorageConfig { - @Nested - s3: S3Config; + credentials: ObjectStoreCredentialsConfig = {} as ObjectStoreCredentialsConfig; } diff --git a/packages/core/src/binary-data/object-store/object-store.service.ee.ts b/packages/core/src/binary-data/object-store/object-store.service.ee.ts index b557146872..408d420e6f 100644 --- a/packages/core/src/binary-data/object-store/object-store.service.ee.ts +++ b/packages/core/src/binary-data/object-store/object-store.service.ee.ts @@ -1,44 +1,73 @@ -import { S3Config } from '@n8n/config'; +import type { + PutObjectCommandInput, + DeleteObjectsCommandInput, + ListObjectsV2CommandInput, + S3ClientConfig, +} from '@aws-sdk/client-s3'; +import { + S3Client, + HeadBucketCommand, + PutObjectCommand, + GetObjectCommand, + HeadObjectCommand, + DeleteObjectCommand, + DeleteObjectsCommand, + ListObjectsV2Command, +} from '@aws-sdk/client-s3'; import { Service } from '@n8n/di'; -import { sign } from 'aws4'; -import type { Request as Aws4Options } from 'aws4'; -import axios from 'axios'; -import type { AxiosRequestConfig, Method } from 'axios'; -import { ApplicationError } from 'n8n-workflow'; +import { UnexpectedError } from 'n8n-workflow'; import { createHash } from 'node:crypto'; -import type { Readable } from 'stream'; +import { Readable } from 'node:stream'; import { Logger } from '@/logging/logger'; -import type { ListPage, MetadataResponseHeaders, RawListPage, RequestOptions } from './types'; -import { isStream, parseXml } from './utils'; +import { ObjectStoreConfig } from './object-store.config'; +import type { MetadataResponseHeaders } from './types'; import type { BinaryData } from '../types'; +import { streamToBuffer } from '../utils'; @Service() export class ObjectStoreService { - private baseUrl: URL; + private s3Client: S3Client; private isReady = false; + private bucket: string; + constructor( private readonly logger: Logger, - private readonly s3Config: S3Config, + private readonly s3Config: ObjectStoreConfig, ) { - const { host, bucket, protocol } = s3Config; - - if (host === '') { - throw new ApplicationError( - 'External storage host not configured. 
Please set `N8N_EXTERNAL_STORAGE_S3_HOST`.', - ); - } - + const { bucket } = s3Config; if (bucket.name === '') { - throw new ApplicationError( + throw new UnexpectedError( 'External storage bucket name not configured. Please set `N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME`.', ); } - this.baseUrl = new URL(`${protocol}://${host}/${bucket.name}`); + this.bucket = bucket.name; + this.s3Client = new S3Client(this.getClientConfig()); + } + + /** This generates the config for the S3Client to make it work in all various auth configurations */ + getClientConfig() { + const { host, bucket, protocol, credentials } = this.s3Config; + const clientConfig: S3ClientConfig = {}; + const endpoint = host ? `${protocol}://${host}` : undefined; + if (endpoint) { + clientConfig.endpoint = endpoint; + clientConfig.forcePathStyle = true; // Needed for non-AWS S3 compatible services + } + if (bucket.region.length) { + clientConfig.region = bucket.region; + } + if (!credentials.authAutoDetect) { + clientConfig.credentials = { + accessKeyId: credentials.accessKey, + secretAccessKey: credentials.accessSecret, + }; + } + return clientConfig; } async init() { @@ -52,92 +81,153 @@ export class ObjectStoreService { /** * Confirm that the configured bucket exists and the caller has permission to access it. - * - * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html */ async checkConnection() { if (this.isReady) return; - return await this.request('HEAD', ''); + try { + this.logger.debug('Checking connection to S3 bucket', { bucket: this.bucket }); + const command = new HeadBucketCommand({ Bucket: this.bucket }); + await this.s3Client.send(command); + } catch (e) { + throw new UnexpectedError('Request to S3 failed', { cause: e }); + } } /** * Upload an object to the configured bucket. - * - * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html */ async put(filename: string, buffer: Buffer, metadata: BinaryData.PreWriteMetadata = {}) { - const headers: Record = { - 'Content-Length': buffer.length, - 'Content-MD5': createHash('md5').update(buffer).digest('base64'), - }; + try { + const params: PutObjectCommandInput = { + Bucket: this.bucket, + Key: filename, + Body: buffer, + ContentLength: buffer.length, + ContentMD5: createHash('md5').update(buffer).digest('base64'), + }; - if (metadata.fileName) headers['x-amz-meta-filename'] = metadata.fileName; - if (metadata.mimeType) headers['Content-Type'] = metadata.mimeType; + if (metadata.fileName) { + params.Metadata = { filename: metadata.fileName }; + } - return await this.request('PUT', filename, { headers, body: buffer }); + if (metadata.mimeType) { + params.ContentType = metadata.mimeType; + } + + this.logger.debug('Sending PUT request to S3', { params }); + const command = new PutObjectCommand(params); + return await this.s3Client.send(command); + } catch (e) { + throw new UnexpectedError('Request to S3 failed', { cause: e }); + } } /** * Download an object as a stream or buffer from the configured bucket. - * - * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html */ async get(fileId: string, { mode }: { mode: 'buffer' }): Promise; async get(fileId: string, { mode }: { mode: 'stream' }): Promise; - async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }) { - const { data } = await this.request('GET', fileId, { - responseType: mode === 'buffer' ? 
'arraybuffer' : 'stream', + async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }): Promise { + this.logger.debug('Sending GET request to S3', { bucket: this.bucket, key: fileId }); + + const command = new GetObjectCommand({ + Bucket: this.bucket, + Key: fileId, }); - if (mode === 'stream' && isStream(data)) return data; + try { + const { Body: body } = await this.s3Client.send(command); + if (!body) throw new UnexpectedError('Received empty response body'); - if (mode === 'buffer' && Buffer.isBuffer(data)) return data; + if (mode === 'stream') { + if (body instanceof Readable) return body; + throw new UnexpectedError(`Expected stream but received ${typeof body}.`); + } - throw new TypeError(`Expected ${mode} but received ${typeof data}.`); + return await streamToBuffer(body as Readable); + } catch (e) { + throw new UnexpectedError('Request to S3 failed', { cause: e }); + } } /** * Retrieve metadata for an object in the configured bucket. - * - * @doc https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html */ - async getMetadata(fileId: string) { - const response = await this.request('HEAD', fileId); + async getMetadata(fileId: string): Promise { + try { + const command = new HeadObjectCommand({ + Bucket: this.bucket, + Key: fileId, + }); - return response.headers as MetadataResponseHeaders; + this.logger.debug('Sending HEAD request to S3', { bucket: this.bucket, key: fileId }); + const response = await this.s3Client.send(command); + + // Convert response to the expected format for backward compatibility + const headers: MetadataResponseHeaders = {}; + + if (response.ContentType) headers['content-type'] = response.ContentType; + if (response.ContentLength) headers['content-length'] = String(response.ContentLength); + if (response.ETag) headers.etag = response.ETag; + if (response.LastModified) headers['last-modified'] = response.LastModified.toUTCString(); + + // Add metadata with the expected prefix format + if (response.Metadata) { + Object.entries(response.Metadata).forEach(([key, value]) => { + headers[`x-amz-meta-${key.toLowerCase()}`] = value; + }); + } + + return headers; + } catch (e) { + throw new UnexpectedError('Request to S3 failed', { cause: e }); + } } /** * Delete a single object in the configured bucket. - * - * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html */ async deleteOne(fileId: string) { - return await this.request('DELETE', fileId); + try { + const command = new DeleteObjectCommand({ + Bucket: this.bucket, + Key: fileId, + }); + + this.logger.debug('Sending DELETE request to S3', { bucket: this.bucket, key: fileId }); + return await this.s3Client.send(command); + } catch (e) { + throw new UnexpectedError('Request to S3 failed', { cause: e }); + } } /** * Delete objects with a common prefix in the configured bucket. 
- * - * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html */ async deleteMany(prefix: string) { - const objects = await this.list(prefix); + try { + const objects = await this.list(prefix); - if (objects.length === 0) return; + if (objects.length === 0) return; - const innerXml = objects.map(({ key }) => `${key}`).join('\n'); + const params: DeleteObjectsCommandInput = { + Bucket: this.bucket, + Delete: { + Objects: objects.map(({ key }) => ({ Key: key })), + }, + }; - const body = ['', innerXml, ''].join('\n'); + this.logger.debug('Sending DELETE MANY request to S3', { + bucket: this.bucket, + objectCount: objects.length, + }); - const headers = { - 'Content-Type': 'application/xml', - 'Content-Length': body.length, - 'Content-MD5': createHash('md5').update(body).digest('base64'), - }; - - return await this.request('POST', '', { headers, body, qs: { delete: '' } }); + const command = new DeleteObjectsCommand(params); + return await this.s3Client.send(command); + } catch (e) { + throw new UnexpectedError('Request to S3 failed', { cause: e }); + } } /** @@ -145,108 +235,62 @@ export class ObjectStoreService { */ async list(prefix: string) { const items = []; + let isTruncated = true; + let continuationToken; - let isTruncated; - let nextPageToken; + try { + while (isTruncated) { + const listPage = await this.getListPage(prefix, continuationToken); - do { - const listPage = await this.getListPage(prefix, nextPageToken); + if (listPage.contents?.length > 0) { + items.push(...listPage.contents); + } - if (listPage.contents?.length > 0) items.push(...listPage.contents); + isTruncated = listPage.isTruncated; + continuationToken = listPage.nextContinuationToken; + } - isTruncated = listPage.isTruncated; - nextPageToken = listPage.nextContinuationToken; - } while (isTruncated && nextPageToken); - - return items; + return items; + } catch (e) { + throw new UnexpectedError('Request to S3 failed', { cause: e }); + } } /** * Fetch a page of objects with a common prefix in the configured bucket. - * - * Max 1000 objects per page - set by AWS. - * - * @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html */ - async getListPage(prefix: string, nextPageToken?: string) { - const qs: Record = { 'list-type': 2, prefix }; - - if (nextPageToken) qs['continuation-token'] = nextPageToken; - - const { data } = await this.request('GET', '', { qs }); - - if (typeof data !== 'string') { - throw new TypeError(`Expected XML string but received ${typeof data}`); - } - - const { listBucketResult: page } = await parseXml(data); - - if (!page.contents) return { ...page, contents: [] }; - - // `explicitArray: false` removes array wrapper on single item array, so restore it - - if (!Array.isArray(page.contents)) page.contents = [page.contents]; - - // remove null prototype - https://github.com/Leonidas-from-XIV/node-xml2js/issues/670 - - page.contents.forEach((item) => { - Object.setPrototypeOf(item, Object.prototype); - }); - - return page as ListPage; - } - - private async request( - method: Method, - rawPath = '', - { qs, headers, body, responseType }: RequestOptions = {}, - ) { - const url = new URL(this.baseUrl); - if (rawPath && rawPath !== '/') { - url.pathname = `${url.pathname}/${rawPath}`; - } - Object.entries(qs ?? 
{}).forEach(([key, value]) => { - url.searchParams.set(key, String(value)); - }); - - const optionsToSign: Aws4Options = { - method, - service: 's3', - region: this.s3Config.bucket.region, - host: this.s3Config.host, - path: `${url.pathname}${url.search}`, - }; - - if (headers) optionsToSign.headers = headers; - if (body) optionsToSign.body = body; - - const { accessKey, accessSecret } = this.s3Config.credentials; - const signedOptions = sign(optionsToSign, { - accessKeyId: accessKey, - secretAccessKey: accessSecret, - }); - - const config: AxiosRequestConfig = { - method, - url: url.toString(), - headers: signedOptions.headers, - }; - - if (body) config.data = body; - if (responseType) config.responseType = responseType; - + async getListPage(prefix: string, continuationToken?: string) { try { - this.logger.debug('Sending request to S3', { config }); + const params: ListObjectsV2CommandInput = { + Bucket: this.bucket, + Prefix: prefix, + }; - return await axios.request(config); + if (continuationToken) { + params.ContinuationToken = continuationToken; + } + + this.logger.debug('Sending list request to S3', { bucket: this.bucket, prefix }); + const command = new ListObjectsV2Command(params); + const response = await this.s3Client.send(command); + + // Convert response to match expected format for compatibility + const contents = + response.Contents?.map((item) => ({ + key: item.Key ?? '', + lastModified: item.LastModified?.toISOString() ?? '', + eTag: item.ETag ?? '', + size: item.Size ?? 0, + storageClass: item.StorageClass ?? '', + })) ?? []; + + return { + contents, + isTruncated: response.IsTruncated ?? false, + nextContinuationToken: response.NextContinuationToken, + }; } catch (e) { - const error = e instanceof Error ? e : new Error(`${e}`); - - const message = `Request to S3 failed: ${error.message}`; - - this.logger.error(message, { config }); - - throw new ApplicationError(message, { cause: error, extra: { config } }); + throw new UnexpectedError('Request to S3 failed', { cause: e }); } } } diff --git a/packages/core/src/binary-data/object-store/types.ts b/packages/core/src/binary-data/object-store/types.ts index 20390cf243..54253f9067 100644 --- a/packages/core/src/binary-data/object-store/types.ts +++ b/packages/core/src/binary-data/object-store/types.ts @@ -1,38 +1,9 @@ -import type { AxiosResponseHeaders, ResponseType } from 'axios'; - import type { BinaryData } from '../types'; -export type RawListPage = { - listBucketResult: { - name: string; - prefix: string; - keyCount: number; - maxKeys: number; - isTruncated: boolean; - nextContinuationToken?: string; // only if isTruncated is true - contents?: Item | Item[]; - }; -}; - -type Item = { - key: string; - lastModified: string; - eTag: string; - size: number; // bytes - storageClass: string; -}; - -export type ListPage = Omit & { contents: Item[] }; - -export type RequestOptions = { - qs?: Record; - headers?: Record; - body?: string | Buffer; - responseType?: ResponseType; -}; - -export type MetadataResponseHeaders = AxiosResponseHeaders & { - 'content-length': string; +export type MetadataResponseHeaders = Record & { + 'content-length'?: string; 'content-type'?: string; 'x-amz-meta-filename'?: string; + etag?: string; + 'last-modified'?: string; } & BinaryData.PreWriteMetadata; diff --git a/packages/core/src/binary-data/object-store/utils.ts b/packages/core/src/binary-data/object-store/utils.ts deleted file mode 100644 index 16ed264eb3..0000000000 --- a/packages/core/src/binary-data/object-store/utils.ts +++ /dev/null @@ 
-1,16 +0,0 @@ -import { Stream } from 'node:stream'; -import { parseStringPromise } from 'xml2js'; -import { firstCharLowerCase, parseBooleans, parseNumbers } from 'xml2js/lib/processors'; - -export function isStream(maybeStream: unknown): maybeStream is Stream { - return maybeStream instanceof Stream; -} - -export async function parseXml(xml: string): Promise { - return await (parseStringPromise(xml, { - explicitArray: false, - ignoreAttrs: true, - tagNameProcessors: [firstCharLowerCase], - valueProcessors: [parseNumbers, parseBooleans], - }) as Promise); -} diff --git a/packages/core/src/binary-data/utils.ts b/packages/core/src/binary-data/utils.ts index bedda5be12..13f46fef61 100644 --- a/packages/core/src/binary-data/utils.ts +++ b/packages/core/src/binary-data/utils.ts @@ -1,4 +1,4 @@ -import concatStream from 'concat-stream'; +import { UnexpectedError } from 'n8n-workflow'; import fs from 'node:fs/promises'; import type { Readable } from 'node:stream'; @@ -33,16 +33,22 @@ export async function doesNotExist(dir: string) { } } +/** Converts a readable stream to a buffer */ +export async function streamToBuffer(stream: Readable) { + return await new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + stream.on('data', (chunk: Buffer) => chunks.push(chunk)); + stream.on('end', () => resolve(Buffer.concat(chunks))); + stream.once('error', (cause) => { + if ('code' in cause && cause.code === 'Z_DATA_ERROR') + reject(new UnexpectedError('Failed to decompress response', { cause })); + else reject(cause); + }); + }); +} + /** Converts a buffer or a readable stream to a buffer */ export async function binaryToBuffer(body: Buffer | Readable) { if (Buffer.isBuffer(body)) return body; - return await new Promise((resolve, reject) => { - body - .once('error', (cause) => { - if ('code' in cause && cause.code === 'Z_DATA_ERROR') - reject(new Error('Failed to decompress response', { cause })); - else reject(cause); - }) - .pipe(concatStream(resolve)); - }); + return await streamToBuffer(body); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 773fdbe539..1a0c48719c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1306,6 +1306,9 @@ importers: packages/core: dependencies: + '@aws-sdk/client-s3': + specifier: 3.666.0 + version: 3.666.0 '@langchain/core': specifier: 'catalog:' version: 0.3.30(openai@4.78.1(encoding@0.1.13)(zod@3.24.1)) @@ -1321,9 +1324,6 @@ importers: '@sentry/node': specifier: 'catalog:' version: 8.52.1 - aws4: - specifier: 1.11.0 - version: 1.11.0 axios: specifier: 'catalog:' version: 1.8.2(debug@4.3.6) @@ -1333,9 +1333,6 @@ importers: chardet: specifier: 2.0.0 version: 2.0.0 - concat-stream: - specifier: 2.0.0 - version: 2.0.0 cron: specifier: 3.1.7 version: 3.1.7 @@ -1403,12 +1400,6 @@ importers: '@n8n/typescript-config': specifier: workspace:* version: link:../@n8n/typescript-config - '@types/aws4': - specifier: ^1.5.1 - version: 1.11.2 - '@types/concat-stream': - specifier: ^2.0.0 - version: 2.0.0 '@types/express': specifier: 'catalog:' version: 5.0.1 @@ -6014,9 +6005,6 @@ packages: '@types/compression@1.7.5': resolution: {integrity: sha512-AAQvK5pxMpaT+nDvhHrsBhLSYG5yQdtkaJE1WYieSNY2mVFKAgmU4ks65rkZD5oqnGCFLyQpUr1CqI4DmUMyDg==} - '@types/concat-stream@2.0.0': - resolution: {integrity: sha512-t3YCerNM7NTVjLuICZo5gYAXYoDvpuuTceCcFQWcDQz26kxUR5uIWolxbIR5jRNIXpMqhOpW/b8imCR1LEmuJw==} - '@types/connect@3.4.36': resolution: {integrity: sha512-P63Zd/JUGq+PdrM1lv0Wv5SBYeA2+CORvbrXbngriYY0jzLUWfQMQQxOhjONEz/wlHOAxOdY7CY65rgQdTjq2w==} @@ -18855,10 +18843,6 @@ snapshots: 
dependencies: '@types/express': 5.0.1 - '@types/concat-stream@2.0.0': - dependencies: - '@types/node': 18.16.16 - '@types/connect@3.4.36': dependencies: '@types/node': 18.16.16
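
For reviewers: below is a minimal standalone sketch of the credential-selection behaviour this patch introduces in getClientConfig(), useful for trying the instance-role path outside n8n. It assumes only the public @aws-sdk/client-s3 API and the names visible in the diff above (host, protocol, bucket, credentials.authAutoDetect, N8N_EXTERNAL_STORAGE_S3_AUTH_AUTO_DETECT); buildClientConfig and smokeTest are hypothetical helpers for illustration, not code from the PR.

import { S3Client, HeadBucketCommand, type S3ClientConfig } from '@aws-sdk/client-s3';

// Shape mirroring ObjectStoreConfig from the diff above (simplified for illustration).
interface StorageConfig {
  host: string; // N8N_EXTERNAL_STORAGE_S3_HOST, e.g. "s3.us-east-1.amazonaws.com" or a MinIO host
  protocol: 'http' | 'https';
  bucket: { name: string; region: string };
  credentials: { accessKey: string; accessSecret: string; authAutoDetect: boolean };
}

// Builds the client config the same way getClientConfig() in the patch does:
// - a custom host becomes an explicit endpoint with forcePathStyle, needed for
//   non-AWS S3-compatible services;
// - when authAutoDetect is true, static credentials are omitted so the SDK falls
//   back to its default credential provider chain (env vars, shared config,
//   instance role / IMDS, etc.).
function buildClientConfig(cfg: StorageConfig): S3ClientConfig {
  const clientConfig: S3ClientConfig = {};
  if (cfg.host) {
    clientConfig.endpoint = `${cfg.protocol}://${cfg.host}`;
    clientConfig.forcePathStyle = true;
  }
  if (cfg.bucket.region) clientConfig.region = cfg.bucket.region;
  if (!cfg.credentials.authAutoDetect) {
    clientConfig.credentials = {
      accessKeyId: cfg.credentials.accessKey,
      secretAccessKey: cfg.credentials.accessSecret,
    };
  }
  return clientConfig;
}

// Usage sketch: with N8N_EXTERNAL_STORAGE_S3_AUTH_AUTO_DETECT=true, an instance
// running on EC2/EKS can reach the bucket through its instance role, with no
// access key or secret configured. The HEAD request mirrors checkConnection().
async function smokeTest(cfg: StorageConfig): Promise<void> {
  const client = new S3Client(buildClientConfig(cfg));
  await client.send(new HeadBucketCommand({ Bucket: cfg.bucket.name }));
}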