mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 02:21:13 +00:00
feat(core): Add InstanceRole auth support for binary-data object- storage backend (#14800)
This commit is contained in:
committed by
GitHub
parent
9082adf89a
commit
271024ded0
@@ -10,7 +10,6 @@ import { EndpointsConfig } from './configs/endpoints.config';
|
||||
import { EventBusConfig } from './configs/event-bus.config';
|
||||
import { ExecutionsConfig } from './configs/executions.config';
|
||||
import { ExternalHooksConfig } from './configs/external-hooks.config';
|
||||
import { ExternalStorageConfig } from './configs/external-storage.config';
|
||||
import { GenericConfig } from './configs/generic.config';
|
||||
import { LicenseConfig } from './configs/license.config';
|
||||
import { LoggingConfig } from './configs/logging.config';
|
||||
@@ -34,7 +33,6 @@ export { Config, Env, Nested } from './decorators';
|
||||
export { TaskRunnersConfig } from './configs/runners.config';
|
||||
export { SecurityConfig } from './configs/security.config';
|
||||
export { ExecutionsConfig } from './configs/executions.config';
|
||||
export { S3Config } from './configs/external-storage.config';
|
||||
export { LOG_SCOPES } from './configs/logging.config';
|
||||
export type { LogScope } from './configs/logging.config';
|
||||
export { WorkflowsConfig } from './configs/workflows.config';
|
||||
@@ -76,9 +74,6 @@ export class GlobalConfig {
|
||||
@Nested
|
||||
nodes: NodesConfig;
|
||||
|
||||
@Nested
|
||||
externalStorage: ExternalStorageConfig;
|
||||
|
||||
@Nested
|
||||
workflows: WorkflowsConfig;
|
||||
|
||||
|
||||
@@ -139,20 +139,6 @@ describe('GlobalConfig', () => {
|
||||
endpoint: 'https://api.n8n.io/api/versions/',
|
||||
infoUrl: 'https://docs.n8n.io/hosting/installation/updating/',
|
||||
},
|
||||
externalStorage: {
|
||||
s3: {
|
||||
host: '',
|
||||
protocol: 'https',
|
||||
bucket: {
|
||||
name: '',
|
||||
region: '',
|
||||
},
|
||||
credentials: {
|
||||
accessKey: '',
|
||||
accessSecret: '',
|
||||
},
|
||||
},
|
||||
},
|
||||
workflows: {
|
||||
defaultName: 'My workflow',
|
||||
callerPolicyDefaultOption: 'workflowsFromSameOwner',
|
||||
|
||||
@@ -28,8 +28,6 @@
|
||||
],
|
||||
"devDependencies": {
|
||||
"@n8n/typescript-config": "workspace:*",
|
||||
"@types/aws4": "^1.5.1",
|
||||
"@types/concat-stream": "^2.0.0",
|
||||
"@types/express": "catalog:",
|
||||
"@types/jsonwebtoken": "catalog:",
|
||||
"@types/lodash": "catalog:",
|
||||
@@ -38,16 +36,15 @@
|
||||
"@types/xml2js": "catalog:"
|
||||
},
|
||||
"dependencies": {
|
||||
"@aws-sdk/client-s3": "3.666.0",
|
||||
"@langchain/core": "catalog:",
|
||||
"@n8n/client-oauth2": "workspace:*",
|
||||
"@n8n/config": "workspace:*",
|
||||
"@n8n/di": "workspace:*",
|
||||
"@sentry/node": "catalog:",
|
||||
"aws4": "1.11.0",
|
||||
"axios": "catalog:",
|
||||
"callsites": "catalog:",
|
||||
"chardet": "2.0.0",
|
||||
"concat-stream": "2.0.0",
|
||||
"cron": "3.1.7",
|
||||
"fast-glob": "catalog:",
|
||||
"file-type": "16.5.4",
|
||||
|
||||
@@ -2,9 +2,9 @@ import fs from 'node:fs';
|
||||
import fsp from 'node:fs/promises';
|
||||
import { tmpdir } from 'node:os';
|
||||
import path from 'node:path';
|
||||
import { Readable } from 'node:stream';
|
||||
|
||||
import { FileSystemManager } from '@/binary-data/file-system.manager';
|
||||
import { isStream } from '@/binary-data/object-store/utils';
|
||||
import { toFileId, toStream } from '@test/utils';
|
||||
|
||||
jest.mock('fs');
|
||||
@@ -70,7 +70,7 @@ describe('getAsStream()', () => {
|
||||
|
||||
const stream = await fsManager.getAsStream(fileId);
|
||||
|
||||
expect(isStream(stream)).toBe(true);
|
||||
expect(stream).toBeInstanceOf(Readable);
|
||||
expect(fs.createReadStream).toHaveBeenCalledWith(toFullFilePath(fileId), {
|
||||
highWaterMark: undefined,
|
||||
});
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import fs from 'node:fs/promises';
|
||||
import { Readable } from 'node:stream';
|
||||
|
||||
import { ObjectStoreService } from '@/binary-data/object-store/object-store.service.ee';
|
||||
import type { MetadataResponseHeaders } from '@/binary-data/object-store/types';
|
||||
import { isStream } from '@/binary-data/object-store/utils';
|
||||
import { ObjectStoreManager } from '@/binary-data/object-store.manager';
|
||||
import { mockInstance, toFileId, toStream } from '@test/utils';
|
||||
|
||||
@@ -67,7 +67,7 @@ describe('getAsStream()', () => {
|
||||
|
||||
const stream = await objectStoreManager.getAsStream(fileId);
|
||||
|
||||
expect(isStream(stream)).toBe(true);
|
||||
expect(stream).toBeInstanceOf(Readable);
|
||||
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'stream' });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { UnexpectedError } from 'n8n-workflow';
|
||||
import { Readable } from 'node:stream';
|
||||
import { createGunzip } from 'node:zlib';
|
||||
|
||||
@@ -27,7 +28,7 @@ describe('BinaryData/utils', () => {
|
||||
const gunzip = createGunzip();
|
||||
const body = Readable.from(Buffer.from('0001f8b080000000000000000', 'hex')).pipe(gunzip);
|
||||
await expect(binaryToBuffer(body)).rejects.toThrow(
|
||||
new Error('Failed to decompress response'),
|
||||
new UnexpectedError('Failed to decompress response'),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,227 +1,271 @@
|
||||
import type { S3Config } from '@n8n/config';
|
||||
import axios from 'axios';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import {
|
||||
DeleteObjectCommand,
|
||||
DeleteObjectsCommand,
|
||||
GetObjectCommand,
|
||||
HeadBucketCommand,
|
||||
HeadObjectCommand,
|
||||
ListObjectsV2Command,
|
||||
PutObjectCommand,
|
||||
type S3Client,
|
||||
} from '@aws-sdk/client-s3';
|
||||
import { captor, mock } from 'jest-mock-extended';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
import { ObjectStoreService } from '@/binary-data/object-store/object-store.service.ee';
|
||||
import type { ObjectStoreConfig } from '../object-store.config';
|
||||
import { ObjectStoreService } from '../object-store.service.ee';
|
||||
|
||||
jest.mock('axios');
|
||||
const mockS3Send = jest.fn();
|
||||
const s3Client = mock<S3Client>({ send: mockS3Send });
|
||||
jest.mock('@aws-sdk/client-s3', () => ({
|
||||
...jest.requireActual('@aws-sdk/client-s3'),
|
||||
S3Client: class {
|
||||
constructor() {
|
||||
return s3Client;
|
||||
}
|
||||
},
|
||||
}));
|
||||
|
||||
const mockAxios = axios as jest.Mocked<typeof axios>;
|
||||
|
||||
const mockBucket = { region: 'us-east-1', name: 'test-bucket' };
|
||||
const mockHost = `s3.${mockBucket.region}.amazonaws.com`;
|
||||
const mockCredentials = { accessKey: 'mock-access-key', accessSecret: 'mock-secret-key' };
|
||||
const mockUrl = `https://${mockHost}/${mockBucket.name}`;
|
||||
const FAILED_REQUEST_ERROR_MESSAGE = 'Request to S3 failed';
|
||||
const mockError = new Error('Something went wrong!');
|
||||
const fileId =
|
||||
'workflows/ObogjVbqpNOQpiyV/executions/999/binary_data/71f6209b-5d48-41a2-a224-80d529d8bb32';
|
||||
const mockBuffer = Buffer.from('Test data');
|
||||
const s3Config = mock<S3Config>({
|
||||
describe('ObjectStoreService', () => {
|
||||
const mockBucket = { region: 'us-east-1', name: 'test-bucket' };
|
||||
const mockHost = `s3.${mockBucket.region}.amazonaws.com`;
|
||||
const FAILED_REQUEST_ERROR_MESSAGE = 'Request to S3 failed';
|
||||
const mockError = new Error('Something went wrong!');
|
||||
const workflowId = 'workflow-id';
|
||||
const executionId = 999;
|
||||
const binaryDataId = '71f6209b-5d48-41a2-a224-80d529d8bb32';
|
||||
const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${binaryDataId}`;
|
||||
const mockBuffer = Buffer.from('Test data');
|
||||
const s3Config = mock<ObjectStoreConfig>({
|
||||
host: mockHost,
|
||||
bucket: mockBucket,
|
||||
credentials: mockCredentials,
|
||||
credentials: {
|
||||
accessKey: 'mock-access-key',
|
||||
accessSecret: 'mock-secret-key',
|
||||
authAutoDetect: false,
|
||||
},
|
||||
protocol: 'https',
|
||||
});
|
||||
});
|
||||
|
||||
const toDeletionXml = (filename: string) => `<Delete>
|
||||
<Object><Key>${filename}</Key></Object>
|
||||
</Delete>`;
|
||||
let objectStoreService: ObjectStoreService;
|
||||
|
||||
let objectStoreService: ObjectStoreService;
|
||||
const now = new Date('2024-02-01T01:23:45.678Z');
|
||||
jest.useFakeTimers({ now });
|
||||
|
||||
const now = new Date('2024-02-01T01:23:45.678Z');
|
||||
jest.useFakeTimers({ now });
|
||||
|
||||
beforeEach(async () => {
|
||||
beforeEach(async () => {
|
||||
objectStoreService = new ObjectStoreService(mock(), s3Config);
|
||||
mockAxios.request.mockResolvedValueOnce({ status: 200 }); // for checkConnection
|
||||
await objectStoreService.init();
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
});
|
||||
|
||||
describe('checkConnection()', () => {
|
||||
it('should send a HEAD request to the correct host', async () => {
|
||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||
describe('getClientConfig()', () => {
|
||||
const credentials = {
|
||||
accessKeyId: s3Config.credentials.accessKey,
|
||||
secretAccessKey: s3Config.credentials.accessSecret,
|
||||
};
|
||||
|
||||
it('should return client config with endpoint and forcePathStyle when custom host is provided', () => {
|
||||
s3Config.host = 'example.com';
|
||||
|
||||
const clientConfig = objectStoreService.getClientConfig();
|
||||
|
||||
expect(clientConfig).toEqual({
|
||||
endpoint: 'https://example.com',
|
||||
forcePathStyle: true,
|
||||
region: mockBucket.region,
|
||||
credentials,
|
||||
});
|
||||
});
|
||||
|
||||
it('should return client config without endpoint when host is not provided', () => {
|
||||
s3Config.host = '';
|
||||
|
||||
const clientConfig = objectStoreService.getClientConfig();
|
||||
|
||||
expect(clientConfig).toEqual({
|
||||
region: mockBucket.region,
|
||||
credentials,
|
||||
});
|
||||
});
|
||||
|
||||
it('should return client config without credentials when authAutoDetect is true', () => {
|
||||
s3Config.credentials.authAutoDetect = true;
|
||||
|
||||
const clientConfig = objectStoreService.getClientConfig();
|
||||
|
||||
expect(clientConfig).toEqual({
|
||||
region: mockBucket.region,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('checkConnection()', () => {
|
||||
it('should send a HEAD request to the correct bucket', async () => {
|
||||
mockS3Send.mockResolvedValueOnce({});
|
||||
|
||||
objectStoreService.setReady(false);
|
||||
|
||||
await objectStoreService.checkConnection();
|
||||
|
||||
expect(mockAxios.request).toHaveBeenCalledWith({
|
||||
method: 'HEAD',
|
||||
url: 'https://s3.us-east-1.amazonaws.com/test-bucket',
|
||||
headers: {
|
||||
Host: 's3.us-east-1.amazonaws.com',
|
||||
'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
|
||||
'X-Amz-Date': '20240201T012345Z',
|
||||
Authorization:
|
||||
'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=a5240c11a706e9e6c60e7033a848fc934911b12330e5a4609b0b943f97d9781b',
|
||||
},
|
||||
});
|
||||
const commandCaptor = captor<HeadObjectCommand>();
|
||||
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor);
|
||||
const command = commandCaptor.value;
|
||||
expect(command).toBeInstanceOf(HeadBucketCommand);
|
||||
expect(command.input).toEqual({ Bucket: 'test-bucket' });
|
||||
});
|
||||
|
||||
it('should throw an error on request failure', async () => {
|
||||
objectStoreService.setReady(false);
|
||||
|
||||
mockAxios.request.mockRejectedValue(mockError);
|
||||
mockS3Send.mockRejectedValueOnce(mockError);
|
||||
|
||||
const promise = objectStoreService.checkConnection();
|
||||
|
||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('getMetadata()', () => {
|
||||
it('should send a HEAD request to the correct host and path', async () => {
|
||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||
describe('getMetadata()', () => {
|
||||
it('should send a HEAD request to the correct bucket and key', async () => {
|
||||
mockS3Send.mockResolvedValueOnce({
|
||||
ContentType: 'text/plain',
|
||||
ContentLength: 1024,
|
||||
ETag: '"abc123"',
|
||||
LastModified: new Date(),
|
||||
Metadata: { filename: 'test.txt' },
|
||||
});
|
||||
|
||||
await objectStoreService.getMetadata(fileId);
|
||||
|
||||
expect(mockAxios.request).toHaveBeenCalledWith({
|
||||
method: 'HEAD',
|
||||
url: `${mockUrl}/${fileId}`,
|
||||
headers: {
|
||||
Host: mockHost,
|
||||
'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
|
||||
'X-Amz-Date': '20240201T012345Z',
|
||||
Authorization:
|
||||
'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=60e11c39580ad7dd3a3d549523e7115cdff018540f24c6412ed40053e52a21d0',
|
||||
},
|
||||
const commandCaptor = captor<HeadObjectCommand>();
|
||||
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor);
|
||||
const command = commandCaptor.value;
|
||||
expect(command).toBeInstanceOf(HeadObjectCommand);
|
||||
expect(command.input).toEqual({
|
||||
Bucket: 'test-bucket',
|
||||
Key: fileId,
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw an error on request failure', async () => {
|
||||
mockAxios.request.mockRejectedValue(mockError);
|
||||
mockS3Send.mockRejectedValueOnce(mockError);
|
||||
|
||||
const promise = objectStoreService.getMetadata(fileId);
|
||||
|
||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('put()', () => {
|
||||
describe('put()', () => {
|
||||
it('should send a PUT request to upload an object', async () => {
|
||||
const metadata = { fileName: 'file.txt', mimeType: 'text/plain' };
|
||||
|
||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||
mockS3Send.mockResolvedValueOnce({});
|
||||
|
||||
await objectStoreService.put(fileId, mockBuffer, metadata);
|
||||
|
||||
expect(mockAxios.request).toHaveBeenCalledWith({
|
||||
method: 'PUT',
|
||||
url: 'https://s3.us-east-1.amazonaws.com/test-bucket/workflows/ObogjVbqpNOQpiyV/executions/999/binary_data/71f6209b-5d48-41a2-a224-80d529d8bb32',
|
||||
headers: {
|
||||
'Content-Length': 9,
|
||||
'Content-MD5': 'yh6gLBC3w39CW5t92G1eEQ==',
|
||||
'x-amz-meta-filename': 'file.txt',
|
||||
'Content-Type': 'text/plain',
|
||||
Host: 's3.us-east-1.amazonaws.com',
|
||||
'X-Amz-Content-Sha256': 'e27c8214be8b7cf5bccc7c08247e3cb0c1514a48ee1f63197fe4ef3ef51d7e6f',
|
||||
'X-Amz-Date': '20240201T012345Z',
|
||||
Authorization:
|
||||
'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=content-length;content-md5;content-type;host;x-amz-content-sha256;x-amz-date;x-amz-meta-filename, Signature=6b0fbb51a35dbfa73ac79a964ffc7203b40517a062efc5b01f5f9b7ad553fa7a',
|
||||
},
|
||||
data: mockBuffer,
|
||||
const commandCaptor = captor<PutObjectCommand>();
|
||||
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor);
|
||||
const command = commandCaptor.value;
|
||||
expect(command).toBeInstanceOf(PutObjectCommand);
|
||||
expect(command.input).toEqual({
|
||||
Bucket: 'test-bucket',
|
||||
Key: fileId,
|
||||
Body: mockBuffer,
|
||||
ContentLength: mockBuffer.length,
|
||||
ContentMD5: 'yh6gLBC3w39CW5t92G1eEQ==',
|
||||
ContentType: 'text/plain',
|
||||
Metadata: { filename: 'file.txt' },
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw an error on request failure', async () => {
|
||||
const metadata = { fileName: 'file.txt', mimeType: 'text/plain' };
|
||||
|
||||
mockAxios.request.mockRejectedValue(mockError);
|
||||
mockS3Send.mockRejectedValueOnce(mockError);
|
||||
|
||||
const promise = objectStoreService.put(fileId, mockBuffer, metadata);
|
||||
|
||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('get()', () => {
|
||||
describe('get()', () => {
|
||||
it('should send a GET request to download an object as a buffer', async () => {
|
||||
const fileId = 'file.txt';
|
||||
const body = Readable.from(mockBuffer);
|
||||
|
||||
mockAxios.request.mockResolvedValue({ status: 200, data: Buffer.from('Test content') });
|
||||
mockS3Send.mockResolvedValueOnce({ Body: body });
|
||||
|
||||
const result = await objectStoreService.get(fileId, { mode: 'buffer' });
|
||||
|
||||
expect(mockAxios.request).toHaveBeenCalledWith({
|
||||
method: 'GET',
|
||||
url: `${mockUrl}/${fileId}`,
|
||||
responseType: 'arraybuffer',
|
||||
headers: {
|
||||
Authorization:
|
||||
'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=5f69680786e0ad9f0a0324eb5e4b8fe8c78562afc924489ea423632a2ad2187d',
|
||||
Host: 's3.us-east-1.amazonaws.com',
|
||||
'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
|
||||
'X-Amz-Date': '20240201T012345Z',
|
||||
},
|
||||
const commandCaptor = captor<GetObjectCommand>();
|
||||
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor);
|
||||
const command = commandCaptor.value;
|
||||
expect(command).toBeInstanceOf(GetObjectCommand);
|
||||
expect(command.input).toEqual({
|
||||
Bucket: 'test-bucket',
|
||||
Key: fileId,
|
||||
});
|
||||
|
||||
expect(Buffer.isBuffer(result)).toBe(true);
|
||||
});
|
||||
|
||||
it('should send a GET request to download an object as a stream', async () => {
|
||||
mockAxios.request.mockResolvedValue({ status: 200, data: new Readable() });
|
||||
const body = new Readable();
|
||||
|
||||
mockS3Send.mockResolvedValueOnce({ Body: body });
|
||||
|
||||
const result = await objectStoreService.get(fileId, { mode: 'stream' });
|
||||
|
||||
expect(mockAxios.request).toHaveBeenCalledWith({
|
||||
method: 'GET',
|
||||
url: `${mockUrl}/${fileId}`,
|
||||
responseType: 'stream',
|
||||
headers: {
|
||||
Authorization:
|
||||
'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=3ef579ebe2ae89303a89c0faf3ce8ef8e907295dc538d59e95bcf35481c0d03e',
|
||||
Host: 's3.us-east-1.amazonaws.com',
|
||||
'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
|
||||
'X-Amz-Date': '20240201T012345Z',
|
||||
},
|
||||
const commandCaptor = captor<GetObjectCommand>();
|
||||
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor);
|
||||
const command = commandCaptor.value;
|
||||
expect(command).toBeInstanceOf(GetObjectCommand);
|
||||
expect(command.input).toEqual({
|
||||
Bucket: 'test-bucket',
|
||||
Key: fileId,
|
||||
});
|
||||
|
||||
expect(result instanceof Readable).toBe(true);
|
||||
expect(result).toBe(body);
|
||||
});
|
||||
|
||||
it('should throw an error on request failure', async () => {
|
||||
mockAxios.request.mockRejectedValue(mockError);
|
||||
mockS3Send.mockRejectedValueOnce(mockError);
|
||||
|
||||
const promise = objectStoreService.get(fileId, { mode: 'buffer' });
|
||||
|
||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('deleteOne()', () => {
|
||||
describe('deleteOne()', () => {
|
||||
it('should send a DELETE request to delete a single object', async () => {
|
||||
mockAxios.request.mockResolvedValue({ status: 204 });
|
||||
mockS3Send.mockResolvedValueOnce({});
|
||||
|
||||
await objectStoreService.deleteOne(fileId);
|
||||
|
||||
expect(mockAxios.request).toHaveBeenCalledWith({
|
||||
method: 'DELETE',
|
||||
url: `${mockUrl}/${fileId}`,
|
||||
headers: {
|
||||
Authorization:
|
||||
'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=4ad61b1b4da335c6c49772d28e54a301f787d199c9403055b217f890f7aec7fc',
|
||||
Host: 's3.us-east-1.amazonaws.com',
|
||||
'X-Amz-Content-Sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
|
||||
'X-Amz-Date': '20240201T012345Z',
|
||||
},
|
||||
const commandCaptor = captor<DeleteObjectCommand>();
|
||||
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor);
|
||||
const command = commandCaptor.value;
|
||||
expect(command).toBeInstanceOf(DeleteObjectCommand);
|
||||
expect(command.input).toEqual({
|
||||
Bucket: 'test-bucket',
|
||||
Key: fileId,
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw an error on request failure', async () => {
|
||||
mockAxios.request.mockRejectedValue(mockError);
|
||||
mockS3Send.mockRejectedValueOnce(mockError);
|
||||
|
||||
const promise = objectStoreService.deleteOne(fileId);
|
||||
|
||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('deleteMany()', () => {
|
||||
it('should send a POST request to delete multiple objects', async () => {
|
||||
describe('deleteMany()', () => {
|
||||
it('should send a DELETE request to delete multiple objects', async () => {
|
||||
const prefix = 'test-dir/';
|
||||
const fileName = 'file.txt';
|
||||
|
||||
@@ -236,25 +280,19 @@ describe('deleteMany()', () => {
|
||||
];
|
||||
|
||||
objectStoreService.list = jest.fn().mockResolvedValue(mockList);
|
||||
|
||||
mockAxios.request.mockResolvedValue({ status: 204 });
|
||||
mockS3Send.mockResolvedValueOnce({});
|
||||
|
||||
await objectStoreService.deleteMany(prefix);
|
||||
|
||||
expect(mockAxios.request).toHaveBeenCalledWith({
|
||||
method: 'POST',
|
||||
url: `${mockUrl}?delete=`,
|
||||
headers: {
|
||||
'Content-Type': 'application/xml',
|
||||
'Content-Length': 55,
|
||||
'Content-MD5': 'ybYDrpQxwYvNIGBQs7PJNA==',
|
||||
Host: 's3.us-east-1.amazonaws.com',
|
||||
'X-Amz-Content-Sha256': '5708e5c935cb75eb528e41ef1548e08b26c5b3b7504b67dc911abc1ff1881f76',
|
||||
'X-Amz-Date': '20240201T012345Z',
|
||||
Authorization:
|
||||
'AWS4-HMAC-SHA256 Credential=mock-access-key/20240201/us-east-1/s3/aws4_request, SignedHeaders=content-length;content-md5;content-type;host;x-amz-content-sha256;x-amz-date, Signature=039168f10927b31624f3a5edae8eb4c89405f7c594eb2d6e00257c1462363f99',
|
||||
const commandCaptor = captor<DeleteObjectsCommand>();
|
||||
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor);
|
||||
const command = commandCaptor.value;
|
||||
expect(command).toBeInstanceOf(DeleteObjectsCommand);
|
||||
expect(command.input).toEqual({
|
||||
Bucket: 'test-bucket',
|
||||
Delete: {
|
||||
Objects: [{ Key: fileName }],
|
||||
},
|
||||
data: toDeletionXml(fileName),
|
||||
});
|
||||
});
|
||||
|
||||
@@ -264,18 +302,20 @@ describe('deleteMany()', () => {
|
||||
const result = await objectStoreService.deleteMany('non-matching-prefix');
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
expect(mockS3Send).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should throw an error on request failure', async () => {
|
||||
mockAxios.request.mockRejectedValue(mockError);
|
||||
objectStoreService.list = jest.fn().mockResolvedValue([{ key: 'file.txt' }]);
|
||||
mockS3Send.mockRejectedValueOnce(mockError);
|
||||
|
||||
const promise = objectStoreService.deleteMany('test-dir/');
|
||||
|
||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('list()', () => {
|
||||
describe('list()', () => {
|
||||
it('should list objects with a common prefix', async () => {
|
||||
const prefix = 'test-dir/';
|
||||
|
||||
@@ -286,8 +326,6 @@ describe('list()', () => {
|
||||
|
||||
objectStoreService.getListPage = jest.fn().mockResolvedValue(mockListPage);
|
||||
|
||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||
|
||||
const result = await objectStoreService.list(prefix);
|
||||
|
||||
expect(result).toEqual(mockListPage.contents);
|
||||
@@ -312,18 +350,80 @@ describe('list()', () => {
|
||||
.mockResolvedValueOnce(mockFirstListPage)
|
||||
.mockResolvedValueOnce(mockSecondListPage);
|
||||
|
||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||
|
||||
const result = await objectStoreService.list(prefix);
|
||||
|
||||
expect(result).toEqual([...mockFirstListPage.contents, ...mockSecondListPage.contents]);
|
||||
});
|
||||
|
||||
it('should throw an error on request failure', async () => {
|
||||
mockAxios.request.mockRejectedValue(mockError);
|
||||
objectStoreService.getListPage = jest.fn().mockRejectedValueOnce(mockError);
|
||||
|
||||
const promise = objectStoreService.list('test-dir/');
|
||||
|
||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getListPage()', () => {
|
||||
it('should fetch a page of objects with a common prefix', async () => {
|
||||
const prefix = 'test-dir/';
|
||||
const mockContents = [
|
||||
{
|
||||
Key: `${prefix}file1.txt`,
|
||||
LastModified: new Date(),
|
||||
ETag: '"abc123"',
|
||||
Size: 123,
|
||||
StorageClass: 'STANDARD',
|
||||
},
|
||||
];
|
||||
|
||||
mockS3Send.mockResolvedValueOnce({
|
||||
Contents: mockContents,
|
||||
IsTruncated: false,
|
||||
});
|
||||
|
||||
const result = await objectStoreService.getListPage(prefix);
|
||||
|
||||
const commandCaptor = captor<ListObjectsV2Command>();
|
||||
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor);
|
||||
const command = commandCaptor.value;
|
||||
expect(command).toBeInstanceOf(ListObjectsV2Command);
|
||||
expect(command.input).toEqual({
|
||||
Bucket: 'test-bucket',
|
||||
Prefix: prefix,
|
||||
});
|
||||
|
||||
expect(result.contents).toHaveLength(1);
|
||||
expect(result.isTruncated).toBe(false);
|
||||
});
|
||||
|
||||
it('should use continuation token when provided', async () => {
|
||||
const prefix = 'test-dir/';
|
||||
const token = 'next-page-token';
|
||||
|
||||
mockS3Send.mockResolvedValueOnce({
|
||||
Contents: [],
|
||||
IsTruncated: false,
|
||||
});
|
||||
|
||||
await objectStoreService.getListPage(prefix, token);
|
||||
|
||||
const commandCaptor = captor<ListObjectsV2Command>();
|
||||
expect(mockS3Send).toHaveBeenCalledWith(commandCaptor);
|
||||
const command = commandCaptor.value;
|
||||
expect(command.input).toEqual({
|
||||
Bucket: 'test-bucket',
|
||||
Prefix: prefix,
|
||||
ContinuationToken: token,
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw an error on request failure', async () => {
|
||||
mockS3Send.mockRejectedValueOnce(mockError);
|
||||
|
||||
const promise = objectStoreService.getListPage('test-dir/');
|
||||
|
||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
import { Config, Env, Nested } from '@n8n/config';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { Config, Env, Nested } from '../decorators';
|
||||
|
||||
const protocolSchema = z.enum(['http', 'https']);
|
||||
|
||||
export type Protocol = z.infer<typeof protocolSchema>;
|
||||
|
||||
@Config
|
||||
class S3BucketConfig {
|
||||
class ObjectStoreBucketConfig {
|
||||
/** Name of the n8n bucket in S3-compatible external storage */
|
||||
@Env('N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME')
|
||||
name: string = '';
|
||||
@@ -18,7 +17,7 @@ class S3BucketConfig {
|
||||
}
|
||||
|
||||
@Config
|
||||
class S3CredentialsConfig {
|
||||
class ObjectStoreCredentialsConfig {
|
||||
/** Access key in S3-compatible external storage */
|
||||
@Env('N8N_EXTERNAL_STORAGE_S3_ACCESS_KEY')
|
||||
accessKey: string = '';
|
||||
@@ -26,11 +25,22 @@ class S3CredentialsConfig {
|
||||
/** Access secret in S3-compatible external storage */
|
||||
@Env('N8N_EXTERNAL_STORAGE_S3_ACCESS_SECRET')
|
||||
accessSecret: string = '';
|
||||
|
||||
/**
|
||||
* Use automatic credential detection to authenticate S3 calls for external storage
|
||||
* This will ignore accessKey/accessSecret and use the default credential provider chain
|
||||
* https://docs.aws.amazon.com/sdk-for-javascript/v3/developer-guide/setting-credentials-node.html#credchain
|
||||
*/
|
||||
@Env('N8N_EXTERNAL_STORAGE_S3_AUTH_AUTO_DETECT')
|
||||
authAutoDetect: boolean = false;
|
||||
}
|
||||
|
||||
@Config
|
||||
export class S3Config {
|
||||
/** Host of the n8n bucket in S3-compatible external storage @example "s3.us-east-1.amazonaws.com" */
|
||||
export class ObjectStoreConfig {
|
||||
/**
|
||||
* Host of the object-store bucket in S3-compatible external storage
|
||||
* @example "s3.us-east-1.amazonaws.com"
|
||||
**/
|
||||
@Env('N8N_EXTERNAL_STORAGE_S3_HOST')
|
||||
host: string = '';
|
||||
|
||||
@@ -38,14 +48,8 @@ export class S3Config {
|
||||
protocol: Protocol = 'https';
|
||||
|
||||
@Nested
|
||||
bucket: S3BucketConfig;
|
||||
bucket: ObjectStoreBucketConfig = {} as ObjectStoreBucketConfig;
|
||||
|
||||
@Nested
|
||||
credentials: S3CredentialsConfig;
|
||||
}
|
||||
|
||||
@Config
|
||||
export class ExternalStorageConfig {
|
||||
@Nested
|
||||
s3: S3Config;
|
||||
credentials: ObjectStoreCredentialsConfig = {} as ObjectStoreCredentialsConfig;
|
||||
}
|
||||
@@ -1,44 +1,73 @@
|
||||
import { S3Config } from '@n8n/config';
|
||||
import type {
|
||||
PutObjectCommandInput,
|
||||
DeleteObjectsCommandInput,
|
||||
ListObjectsV2CommandInput,
|
||||
S3ClientConfig,
|
||||
} from '@aws-sdk/client-s3';
|
||||
import {
|
||||
S3Client,
|
||||
HeadBucketCommand,
|
||||
PutObjectCommand,
|
||||
GetObjectCommand,
|
||||
HeadObjectCommand,
|
||||
DeleteObjectCommand,
|
||||
DeleteObjectsCommand,
|
||||
ListObjectsV2Command,
|
||||
} from '@aws-sdk/client-s3';
|
||||
import { Service } from '@n8n/di';
|
||||
import { sign } from 'aws4';
|
||||
import type { Request as Aws4Options } from 'aws4';
|
||||
import axios from 'axios';
|
||||
import type { AxiosRequestConfig, Method } from 'axios';
|
||||
import { ApplicationError } from 'n8n-workflow';
|
||||
import { UnexpectedError } from 'n8n-workflow';
|
||||
import { createHash } from 'node:crypto';
|
||||
import type { Readable } from 'stream';
|
||||
import { Readable } from 'node:stream';
|
||||
|
||||
import { Logger } from '@/logging/logger';
|
||||
|
||||
import type { ListPage, MetadataResponseHeaders, RawListPage, RequestOptions } from './types';
|
||||
import { isStream, parseXml } from './utils';
|
||||
import { ObjectStoreConfig } from './object-store.config';
|
||||
import type { MetadataResponseHeaders } from './types';
|
||||
import type { BinaryData } from '../types';
|
||||
import { streamToBuffer } from '../utils';
|
||||
|
||||
@Service()
|
||||
export class ObjectStoreService {
|
||||
private baseUrl: URL;
|
||||
private s3Client: S3Client;
|
||||
|
||||
private isReady = false;
|
||||
|
||||
private bucket: string;
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly s3Config: S3Config,
|
||||
private readonly s3Config: ObjectStoreConfig,
|
||||
) {
|
||||
const { host, bucket, protocol } = s3Config;
|
||||
|
||||
if (host === '') {
|
||||
throw new ApplicationError(
|
||||
'External storage host not configured. Please set `N8N_EXTERNAL_STORAGE_S3_HOST`.',
|
||||
);
|
||||
}
|
||||
|
||||
const { bucket } = s3Config;
|
||||
if (bucket.name === '') {
|
||||
throw new ApplicationError(
|
||||
throw new UnexpectedError(
|
||||
'External storage bucket name not configured. Please set `N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME`.',
|
||||
);
|
||||
}
|
||||
|
||||
this.baseUrl = new URL(`${protocol}://${host}/${bucket.name}`);
|
||||
this.bucket = bucket.name;
|
||||
this.s3Client = new S3Client(this.getClientConfig());
|
||||
}
|
||||
|
||||
/** This generates the config for the S3Client to make it work in all various auth configurations */
|
||||
getClientConfig() {
|
||||
const { host, bucket, protocol, credentials } = this.s3Config;
|
||||
const clientConfig: S3ClientConfig = {};
|
||||
const endpoint = host ? `${protocol}://${host}` : undefined;
|
||||
if (endpoint) {
|
||||
clientConfig.endpoint = endpoint;
|
||||
clientConfig.forcePathStyle = true; // Needed for non-AWS S3 compatible services
|
||||
}
|
||||
if (bucket.region.length) {
|
||||
clientConfig.region = bucket.region;
|
||||
}
|
||||
if (!credentials.authAutoDetect) {
|
||||
clientConfig.credentials = {
|
||||
accessKeyId: credentials.accessKey,
|
||||
secretAccessKey: credentials.accessSecret,
|
||||
};
|
||||
}
|
||||
return clientConfig;
|
||||
}
|
||||
|
||||
async init() {
|
||||
@@ -52,92 +81,153 @@ export class ObjectStoreService {
|
||||
|
||||
/**
|
||||
* Confirm that the configured bucket exists and the caller has permission to access it.
|
||||
*
|
||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
|
||||
*/
|
||||
async checkConnection() {
|
||||
if (this.isReady) return;
|
||||
|
||||
return await this.request('HEAD', '');
|
||||
try {
|
||||
this.logger.debug('Checking connection to S3 bucket', { bucket: this.bucket });
|
||||
const command = new HeadBucketCommand({ Bucket: this.bucket });
|
||||
await this.s3Client.send(command);
|
||||
} catch (e) {
|
||||
throw new UnexpectedError('Request to S3 failed', { cause: e });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload an object to the configured bucket.
|
||||
*
|
||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
|
||||
*/
|
||||
async put(filename: string, buffer: Buffer, metadata: BinaryData.PreWriteMetadata = {}) {
|
||||
const headers: Record<string, string | number> = {
|
||||
'Content-Length': buffer.length,
|
||||
'Content-MD5': createHash('md5').update(buffer).digest('base64'),
|
||||
try {
|
||||
const params: PutObjectCommandInput = {
|
||||
Bucket: this.bucket,
|
||||
Key: filename,
|
||||
Body: buffer,
|
||||
ContentLength: buffer.length,
|
||||
ContentMD5: createHash('md5').update(buffer).digest('base64'),
|
||||
};
|
||||
|
||||
if (metadata.fileName) headers['x-amz-meta-filename'] = metadata.fileName;
|
||||
if (metadata.mimeType) headers['Content-Type'] = metadata.mimeType;
|
||||
if (metadata.fileName) {
|
||||
params.Metadata = { filename: metadata.fileName };
|
||||
}
|
||||
|
||||
return await this.request('PUT', filename, { headers, body: buffer });
|
||||
if (metadata.mimeType) {
|
||||
params.ContentType = metadata.mimeType;
|
||||
}
|
||||
|
||||
this.logger.debug('Sending PUT request to S3', { params });
|
||||
const command = new PutObjectCommand(params);
|
||||
return await this.s3Client.send(command);
|
||||
} catch (e) {
|
||||
throw new UnexpectedError('Request to S3 failed', { cause: e });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Download an object as a stream or buffer from the configured bucket.
|
||||
*
|
||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
|
||||
*/
|
||||
async get(fileId: string, { mode }: { mode: 'buffer' }): Promise<Buffer>;
|
||||
async get(fileId: string, { mode }: { mode: 'stream' }): Promise<Readable>;
|
||||
async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }) {
|
||||
const { data } = await this.request('GET', fileId, {
|
||||
responseType: mode === 'buffer' ? 'arraybuffer' : 'stream',
|
||||
async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }): Promise<Buffer | Readable> {
|
||||
this.logger.debug('Sending GET request to S3', { bucket: this.bucket, key: fileId });
|
||||
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: fileId,
|
||||
});
|
||||
|
||||
if (mode === 'stream' && isStream(data)) return data;
|
||||
try {
|
||||
const { Body: body } = await this.s3Client.send(command);
|
||||
if (!body) throw new UnexpectedError('Received empty response body');
|
||||
|
||||
if (mode === 'buffer' && Buffer.isBuffer(data)) return data;
|
||||
if (mode === 'stream') {
|
||||
if (body instanceof Readable) return body;
|
||||
throw new UnexpectedError(`Expected stream but received ${typeof body}.`);
|
||||
}
|
||||
|
||||
throw new TypeError(`Expected ${mode} but received ${typeof data}.`);
|
||||
return await streamToBuffer(body as Readable);
|
||||
} catch (e) {
|
||||
throw new UnexpectedError('Request to S3 failed', { cause: e });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve metadata for an object in the configured bucket.
|
||||
*
|
||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
|
||||
*/
|
||||
async getMetadata(fileId: string) {
|
||||
const response = await this.request('HEAD', fileId);
|
||||
async getMetadata(fileId: string): Promise<MetadataResponseHeaders> {
|
||||
try {
|
||||
const command = new HeadObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: fileId,
|
||||
});
|
||||
|
||||
return response.headers as MetadataResponseHeaders;
|
||||
this.logger.debug('Sending HEAD request to S3', { bucket: this.bucket, key: fileId });
|
||||
const response = await this.s3Client.send(command);
|
||||
|
||||
// Convert response to the expected format for backward compatibility
|
||||
const headers: MetadataResponseHeaders = {};
|
||||
|
||||
if (response.ContentType) headers['content-type'] = response.ContentType;
|
||||
if (response.ContentLength) headers['content-length'] = String(response.ContentLength);
|
||||
if (response.ETag) headers.etag = response.ETag;
|
||||
if (response.LastModified) headers['last-modified'] = response.LastModified.toUTCString();
|
||||
|
||||
// Add metadata with the expected prefix format
|
||||
if (response.Metadata) {
|
||||
Object.entries(response.Metadata).forEach(([key, value]) => {
|
||||
headers[`x-amz-meta-${key.toLowerCase()}`] = value;
|
||||
});
|
||||
}
|
||||
|
||||
return headers;
|
||||
} catch (e) {
|
||||
throw new UnexpectedError('Request to S3 failed', { cause: e });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a single object in the configured bucket.
|
||||
*
|
||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
|
||||
*/
|
||||
async deleteOne(fileId: string) {
|
||||
return await this.request('DELETE', fileId);
|
||||
try {
|
||||
const command = new DeleteObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: fileId,
|
||||
});
|
||||
|
||||
this.logger.debug('Sending DELETE request to S3', { bucket: this.bucket, key: fileId });
|
||||
return await this.s3Client.send(command);
|
||||
} catch (e) {
|
||||
throw new UnexpectedError('Request to S3 failed', { cause: e });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete objects with a common prefix in the configured bucket.
|
||||
*
|
||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
|
||||
*/
|
||||
async deleteMany(prefix: string) {
|
||||
try {
|
||||
const objects = await this.list(prefix);
|
||||
|
||||
if (objects.length === 0) return;
|
||||
|
||||
const innerXml = objects.map(({ key }) => `<Object><Key>${key}</Key></Object>`).join('\n');
|
||||
|
||||
const body = ['<Delete>', innerXml, '</Delete>'].join('\n');
|
||||
|
||||
const headers = {
|
||||
'Content-Type': 'application/xml',
|
||||
'Content-Length': body.length,
|
||||
'Content-MD5': createHash('md5').update(body).digest('base64'),
|
||||
const params: DeleteObjectsCommandInput = {
|
||||
Bucket: this.bucket,
|
||||
Delete: {
|
||||
Objects: objects.map(({ key }) => ({ Key: key })),
|
||||
},
|
||||
};
|
||||
|
||||
return await this.request('POST', '', { headers, body, qs: { delete: '' } });
|
||||
this.logger.debug('Sending DELETE MANY request to S3', {
|
||||
bucket: this.bucket,
|
||||
objectCount: objects.length,
|
||||
});
|
||||
|
||||
const command = new DeleteObjectsCommand(params);
|
||||
return await this.s3Client.send(command);
|
||||
} catch (e) {
|
||||
throw new UnexpectedError('Request to S3 failed', { cause: e });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -145,108 +235,62 @@ export class ObjectStoreService {
|
||||
*/
|
||||
async list(prefix: string) {
|
||||
const items = [];
|
||||
let isTruncated = true;
|
||||
let continuationToken;
|
||||
|
||||
let isTruncated;
|
||||
let nextPageToken;
|
||||
try {
|
||||
while (isTruncated) {
|
||||
const listPage = await this.getListPage(prefix, continuationToken);
|
||||
|
||||
do {
|
||||
const listPage = await this.getListPage(prefix, nextPageToken);
|
||||
|
||||
if (listPage.contents?.length > 0) items.push(...listPage.contents);
|
||||
if (listPage.contents?.length > 0) {
|
||||
items.push(...listPage.contents);
|
||||
}
|
||||
|
||||
isTruncated = listPage.isTruncated;
|
||||
nextPageToken = listPage.nextContinuationToken;
|
||||
} while (isTruncated && nextPageToken);
|
||||
continuationToken = listPage.nextContinuationToken;
|
||||
}
|
||||
|
||||
return items;
|
||||
} catch (e) {
|
||||
throw new UnexpectedError('Request to S3 failed', { cause: e });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch a page of objects with a common prefix in the configured bucket.
|
||||
*
|
||||
* Max 1000 objects per page - set by AWS.
|
||||
*
|
||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
|
||||
*/
|
||||
async getListPage(prefix: string, nextPageToken?: string) {
|
||||
const qs: Record<string, string | number> = { 'list-type': 2, prefix };
|
||||
|
||||
if (nextPageToken) qs['continuation-token'] = nextPageToken;
|
||||
|
||||
const { data } = await this.request('GET', '', { qs });
|
||||
|
||||
if (typeof data !== 'string') {
|
||||
throw new TypeError(`Expected XML string but received ${typeof data}`);
|
||||
}
|
||||
|
||||
const { listBucketResult: page } = await parseXml<RawListPage>(data);
|
||||
|
||||
if (!page.contents) return { ...page, contents: [] };
|
||||
|
||||
// `explicitArray: false` removes array wrapper on single item array, so restore it
|
||||
|
||||
if (!Array.isArray(page.contents)) page.contents = [page.contents];
|
||||
|
||||
// remove null prototype - https://github.com/Leonidas-from-XIV/node-xml2js/issues/670
|
||||
|
||||
page.contents.forEach((item) => {
|
||||
Object.setPrototypeOf(item, Object.prototype);
|
||||
});
|
||||
|
||||
return page as ListPage;
|
||||
}
|
||||
|
||||
private async request<T>(
|
||||
method: Method,
|
||||
rawPath = '',
|
||||
{ qs, headers, body, responseType }: RequestOptions = {},
|
||||
) {
|
||||
const url = new URL(this.baseUrl);
|
||||
if (rawPath && rawPath !== '/') {
|
||||
url.pathname = `${url.pathname}/${rawPath}`;
|
||||
}
|
||||
Object.entries(qs ?? {}).forEach(([key, value]) => {
|
||||
url.searchParams.set(key, String(value));
|
||||
});
|
||||
|
||||
const optionsToSign: Aws4Options = {
|
||||
method,
|
||||
service: 's3',
|
||||
region: this.s3Config.bucket.region,
|
||||
host: this.s3Config.host,
|
||||
path: `${url.pathname}${url.search}`,
|
||||
};
|
||||
|
||||
if (headers) optionsToSign.headers = headers;
|
||||
if (body) optionsToSign.body = body;
|
||||
|
||||
const { accessKey, accessSecret } = this.s3Config.credentials;
|
||||
const signedOptions = sign(optionsToSign, {
|
||||
accessKeyId: accessKey,
|
||||
secretAccessKey: accessSecret,
|
||||
});
|
||||
|
||||
const config: AxiosRequestConfig = {
|
||||
method,
|
||||
url: url.toString(),
|
||||
headers: signedOptions.headers,
|
||||
};
|
||||
|
||||
if (body) config.data = body;
|
||||
if (responseType) config.responseType = responseType;
|
||||
|
||||
async getListPage(prefix: string, continuationToken?: string) {
|
||||
try {
|
||||
this.logger.debug('Sending request to S3', { config });
|
||||
const params: ListObjectsV2CommandInput = {
|
||||
Bucket: this.bucket,
|
||||
Prefix: prefix,
|
||||
};
|
||||
|
||||
return await axios.request<T>(config);
|
||||
if (continuationToken) {
|
||||
params.ContinuationToken = continuationToken;
|
||||
}
|
||||
|
||||
this.logger.debug('Sending list request to S3', { bucket: this.bucket, prefix });
|
||||
const command = new ListObjectsV2Command(params);
|
||||
const response = await this.s3Client.send(command);
|
||||
|
||||
// Convert response to match expected format for compatibility
|
||||
const contents =
|
||||
response.Contents?.map((item) => ({
|
||||
key: item.Key ?? '',
|
||||
lastModified: item.LastModified?.toISOString() ?? '',
|
||||
eTag: item.ETag ?? '',
|
||||
size: item.Size ?? 0,
|
||||
storageClass: item.StorageClass ?? '',
|
||||
})) ?? [];
|
||||
|
||||
return {
|
||||
contents,
|
||||
isTruncated: response.IsTruncated ?? false,
|
||||
nextContinuationToken: response.NextContinuationToken,
|
||||
};
|
||||
} catch (e) {
|
||||
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||
|
||||
const message = `Request to S3 failed: ${error.message}`;
|
||||
|
||||
this.logger.error(message, { config });
|
||||
|
||||
throw new ApplicationError(message, { cause: error, extra: { config } });
|
||||
throw new UnexpectedError('Request to S3 failed', { cause: e });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,38 +1,9 @@
|
||||
import type { AxiosResponseHeaders, ResponseType } from 'axios';
|
||||
|
||||
import type { BinaryData } from '../types';
|
||||
|
||||
export type RawListPage = {
|
||||
listBucketResult: {
|
||||
name: string;
|
||||
prefix: string;
|
||||
keyCount: number;
|
||||
maxKeys: number;
|
||||
isTruncated: boolean;
|
||||
nextContinuationToken?: string; // only if isTruncated is true
|
||||
contents?: Item | Item[];
|
||||
};
|
||||
};
|
||||
|
||||
type Item = {
|
||||
key: string;
|
||||
lastModified: string;
|
||||
eTag: string;
|
||||
size: number; // bytes
|
||||
storageClass: string;
|
||||
};
|
||||
|
||||
export type ListPage = Omit<RawListPage['listBucketResult'], 'contents'> & { contents: Item[] };
|
||||
|
||||
export type RequestOptions = {
|
||||
qs?: Record<string, string | number>;
|
||||
headers?: Record<string, string | number>;
|
||||
body?: string | Buffer;
|
||||
responseType?: ResponseType;
|
||||
};
|
||||
|
||||
export type MetadataResponseHeaders = AxiosResponseHeaders & {
|
||||
'content-length': string;
|
||||
export type MetadataResponseHeaders = Record<string, string> & {
|
||||
'content-length'?: string;
|
||||
'content-type'?: string;
|
||||
'x-amz-meta-filename'?: string;
|
||||
etag?: string;
|
||||
'last-modified'?: string;
|
||||
} & BinaryData.PreWriteMetadata;
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
import { Stream } from 'node:stream';
|
||||
import { parseStringPromise } from 'xml2js';
|
||||
import { firstCharLowerCase, parseBooleans, parseNumbers } from 'xml2js/lib/processors';
|
||||
|
||||
export function isStream(maybeStream: unknown): maybeStream is Stream {
|
||||
return maybeStream instanceof Stream;
|
||||
}
|
||||
|
||||
export async function parseXml<T>(xml: string): Promise<T> {
|
||||
return await (parseStringPromise(xml, {
|
||||
explicitArray: false,
|
||||
ignoreAttrs: true,
|
||||
tagNameProcessors: [firstCharLowerCase],
|
||||
valueProcessors: [parseNumbers, parseBooleans],
|
||||
}) as Promise<T>);
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import concatStream from 'concat-stream';
|
||||
import { UnexpectedError } from 'n8n-workflow';
|
||||
import fs from 'node:fs/promises';
|
||||
import type { Readable } from 'node:stream';
|
||||
|
||||
@@ -33,16 +33,22 @@ export async function doesNotExist(dir: string) {
|
||||
}
|
||||
}
|
||||
|
||||
/** Converts a readable stream to a buffer */
|
||||
export async function streamToBuffer(stream: Readable) {
|
||||
return await new Promise<Buffer>((resolve, reject) => {
|
||||
const chunks: Buffer[] = [];
|
||||
stream.on('data', (chunk: Buffer) => chunks.push(chunk));
|
||||
stream.on('end', () => resolve(Buffer.concat(chunks)));
|
||||
stream.once('error', (cause) => {
|
||||
if ('code' in cause && cause.code === 'Z_DATA_ERROR')
|
||||
reject(new UnexpectedError('Failed to decompress response', { cause }));
|
||||
else reject(cause);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/** Converts a buffer or a readable stream to a buffer */
|
||||
export async function binaryToBuffer(body: Buffer | Readable) {
|
||||
if (Buffer.isBuffer(body)) return body;
|
||||
return await new Promise<Buffer>((resolve, reject) => {
|
||||
body
|
||||
.once('error', (cause) => {
|
||||
if ('code' in cause && cause.code === 'Z_DATA_ERROR')
|
||||
reject(new Error('Failed to decompress response', { cause }));
|
||||
else reject(cause);
|
||||
})
|
||||
.pipe(concatStream(resolve));
|
||||
});
|
||||
return await streamToBuffer(body);
|
||||
}
|
||||
|
||||
22
pnpm-lock.yaml
generated
22
pnpm-lock.yaml
generated
@@ -1306,6 +1306,9 @@ importers:
|
||||
|
||||
packages/core:
|
||||
dependencies:
|
||||
'@aws-sdk/client-s3':
|
||||
specifier: 3.666.0
|
||||
version: 3.666.0
|
||||
'@langchain/core':
|
||||
specifier: 'catalog:'
|
||||
version: 0.3.30(openai@4.78.1(encoding@0.1.13)(zod@3.24.1))
|
||||
@@ -1321,9 +1324,6 @@ importers:
|
||||
'@sentry/node':
|
||||
specifier: 'catalog:'
|
||||
version: 8.52.1
|
||||
aws4:
|
||||
specifier: 1.11.0
|
||||
version: 1.11.0
|
||||
axios:
|
||||
specifier: 'catalog:'
|
||||
version: 1.8.2(debug@4.3.6)
|
||||
@@ -1333,9 +1333,6 @@ importers:
|
||||
chardet:
|
||||
specifier: 2.0.0
|
||||
version: 2.0.0
|
||||
concat-stream:
|
||||
specifier: 2.0.0
|
||||
version: 2.0.0
|
||||
cron:
|
||||
specifier: 3.1.7
|
||||
version: 3.1.7
|
||||
@@ -1403,12 +1400,6 @@ importers:
|
||||
'@n8n/typescript-config':
|
||||
specifier: workspace:*
|
||||
version: link:../@n8n/typescript-config
|
||||
'@types/aws4':
|
||||
specifier: ^1.5.1
|
||||
version: 1.11.2
|
||||
'@types/concat-stream':
|
||||
specifier: ^2.0.0
|
||||
version: 2.0.0
|
||||
'@types/express':
|
||||
specifier: 'catalog:'
|
||||
version: 5.0.1
|
||||
@@ -6014,9 +6005,6 @@ packages:
|
||||
'@types/compression@1.7.5':
|
||||
resolution: {integrity: sha512-AAQvK5pxMpaT+nDvhHrsBhLSYG5yQdtkaJE1WYieSNY2mVFKAgmU4ks65rkZD5oqnGCFLyQpUr1CqI4DmUMyDg==}
|
||||
|
||||
'@types/concat-stream@2.0.0':
|
||||
resolution: {integrity: sha512-t3YCerNM7NTVjLuICZo5gYAXYoDvpuuTceCcFQWcDQz26kxUR5uIWolxbIR5jRNIXpMqhOpW/b8imCR1LEmuJw==}
|
||||
|
||||
'@types/connect@3.4.36':
|
||||
resolution: {integrity: sha512-P63Zd/JUGq+PdrM1lv0Wv5SBYeA2+CORvbrXbngriYY0jzLUWfQMQQxOhjONEz/wlHOAxOdY7CY65rgQdTjq2w==}
|
||||
|
||||
@@ -18855,10 +18843,6 @@ snapshots:
|
||||
dependencies:
|
||||
'@types/express': 5.0.1
|
||||
|
||||
'@types/concat-stream@2.0.0':
|
||||
dependencies:
|
||||
'@types/node': 18.16.16
|
||||
|
||||
'@types/connect@3.4.36':
|
||||
dependencies:
|
||||
'@types/node': 18.16.16
|
||||
|
||||
Reference in New Issue
Block a user