mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 17:46:45 +00:00
feat: Enforce data-stores limits (no-changelog) (#19116)
Co-authored-by: Charlie Kolb <charlie@n8n.io>
This commit is contained in:
@@ -0,0 +1,230 @@
|
||||
import { mockInstance } from '@n8n/backend-test-utils';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
|
||||
import { DataStoreSizeValidator } from '../data-store-size-validator.service';
|
||||
|
||||
describe('DataStoreSizeValidator', () => {
|
||||
let validator: DataStoreSizeValidator;
|
||||
let fetchSizeFn: jest.Mock;
|
||||
const globalConfig = mockInstance(GlobalConfig, {
|
||||
dataTable: {
|
||||
sizeCheckCacheDuration: 1000,
|
||||
warningThreshold: 90 * 1024 * 1024,
|
||||
maxSize: 100 * 1024 * 1024,
|
||||
},
|
||||
});
|
||||
beforeEach(() => {
|
||||
validator = new DataStoreSizeValidator(globalConfig);
|
||||
fetchSizeFn = jest.fn();
|
||||
});
|
||||
|
||||
describe('basic functionality', () => {
|
||||
it('should fetch size on first call', async () => {
|
||||
fetchSizeFn.mockResolvedValue(50 * 1024 * 1024); // 50MB
|
||||
|
||||
await validator.validateSize(fetchSizeFn, new Date('2024-01-01T00:00:00Z'));
|
||||
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should pass validation when size is under limit', async () => {
|
||||
fetchSizeFn.mockResolvedValue(50 * 1024 * 1024);
|
||||
|
||||
await expect(
|
||||
validator.validateSize(fetchSizeFn, new Date('2024-01-01T00:00:00Z')),
|
||||
).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it('should throw error when size exceeds limit', async () => {
|
||||
fetchSizeFn.mockResolvedValue(150 * 1024 * 1024);
|
||||
|
||||
await expect(
|
||||
validator.validateSize(fetchSizeFn, new Date('2024-01-01T00:00:00Z')),
|
||||
).rejects.toThrow('Data store size limit exceeded: 150MB used, limit is 100MB');
|
||||
});
|
||||
|
||||
it('should throw error when size equals limit', async () => {
|
||||
fetchSizeFn.mockResolvedValue(100 * 1024 * 1024);
|
||||
|
||||
await expect(
|
||||
validator.validateSize(fetchSizeFn, new Date('2024-01-01T00:00:00Z')),
|
||||
).rejects.toThrow('Data store size limit exceeded: 100MB used, limit is 100MB');
|
||||
});
|
||||
});
|
||||
|
||||
describe('caching behavior', () => {
|
||||
it('should use cached value within cache duration', async () => {
|
||||
fetchSizeFn.mockResolvedValue(50);
|
||||
const time1 = new Date('2024-01-01T00:00:00Z');
|
||||
const time2 = new Date('2024-01-01T00:00:00.500Z'); // 500ms later
|
||||
|
||||
await validator.validateSize(fetchSizeFn, time1);
|
||||
await validator.validateSize(fetchSizeFn, time2);
|
||||
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should refresh cache after cache duration expires', async () => {
|
||||
fetchSizeFn.mockResolvedValue(50 * 1024 * 1024);
|
||||
const time1 = new Date('2024-01-01T00:00:00Z');
|
||||
const time2 = new Date('2024-01-01T00:00:01.001Z'); // 1001ms later
|
||||
|
||||
await validator.validateSize(fetchSizeFn, time1);
|
||||
await validator.validateSize(fetchSizeFn, time2);
|
||||
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('should always validate against cached value even without refresh', async () => {
|
||||
// First call: DB at 50MB
|
||||
fetchSizeFn.mockResolvedValue(50 * 1024 * 1024);
|
||||
const time1 = new Date('2024-01-01T00:00:00Z');
|
||||
await validator.validateSize(fetchSizeFn, time1);
|
||||
|
||||
// Second call within cache duration: should still validate against 50MB
|
||||
const time2 = new Date('2024-01-01T00:00:00.500Z');
|
||||
await expect(validator.validateSize(fetchSizeFn, time2)).resolves.toBeUndefined();
|
||||
|
||||
// Even though fetchSizeFn wasn't called again
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should fail validation once cached value shows full DB', async () => {
|
||||
// First call: DB becomes full (100MB)
|
||||
fetchSizeFn.mockResolvedValue(100 * 1024 * 1024);
|
||||
const time1 = new Date('2024-01-01T00:00:00Z');
|
||||
|
||||
await expect(validator.validateSize(fetchSizeFn, time1)).rejects.toThrow(
|
||||
'Data store size limit exceeded: 100MB used, limit is 100MB',
|
||||
);
|
||||
|
||||
// Subsequent calls within cache duration should also fail
|
||||
const time2 = new Date('2024-01-01T00:00:00.500Z');
|
||||
await expect(validator.validateSize(fetchSizeFn, time2)).rejects.toThrow(
|
||||
'Data store size limit exceeded: 100MB used, limit is 100MB',
|
||||
);
|
||||
|
||||
// Size was only fetched once
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('concurrent calls', () => {
|
||||
it('should handle concurrent calls correctly', async () => {
|
||||
let resolveCheck: (value: number) => void;
|
||||
const checkPromise = new Promise<number>((resolve) => {
|
||||
resolveCheck = resolve;
|
||||
});
|
||||
|
||||
fetchSizeFn.mockImplementation(async () => await checkPromise);
|
||||
|
||||
const time = new Date('2024-01-01T00:00:00Z');
|
||||
|
||||
// Start multiple concurrent calls
|
||||
const promise1 = validator.validateSize(fetchSizeFn, time);
|
||||
const promise2 = validator.validateSize(fetchSizeFn, time);
|
||||
const promise3 = validator.validateSize(fetchSizeFn, time);
|
||||
|
||||
// Let promises start
|
||||
await new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
// Resolve the check with a value under the limit
|
||||
resolveCheck!(50 * 1024 * 1024);
|
||||
|
||||
await Promise.all([promise1, promise2, promise3]);
|
||||
|
||||
// Should only fetch once
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should share failure state among concurrent calls', async () => {
|
||||
let resolveCheck: (value: number) => void;
|
||||
const checkPromise = new Promise<number>((resolve) => {
|
||||
resolveCheck = resolve;
|
||||
});
|
||||
|
||||
fetchSizeFn.mockImplementation(async () => await checkPromise);
|
||||
|
||||
const time = new Date('2024-01-01T00:00:00Z');
|
||||
|
||||
// Start multiple concurrent calls
|
||||
const promise1 = validator.validateSize(fetchSizeFn, time);
|
||||
const promise2 = validator.validateSize(fetchSizeFn, time);
|
||||
const promise3 = validator.validateSize(fetchSizeFn, time);
|
||||
|
||||
// Resolve with size over limit
|
||||
resolveCheck!(150 * 1024 * 1024);
|
||||
|
||||
// All should fail with the same error
|
||||
await expect(promise1).rejects.toThrow(
|
||||
'Data store size limit exceeded: 150MB used, limit is 100MB',
|
||||
);
|
||||
await expect(promise2).rejects.toThrow(
|
||||
'Data store size limit exceeded: 150MB used, limit is 100MB',
|
||||
);
|
||||
await expect(promise3).rejects.toThrow(
|
||||
'Data store size limit exceeded: 150MB used, limit is 100MB',
|
||||
);
|
||||
|
||||
// Should only fetch once
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('reset functionality', () => {
|
||||
it('should clear cache when reset is called', async () => {
|
||||
fetchSizeFn.mockResolvedValue(50 * 1024 * 1024);
|
||||
const time1 = new Date('2024-01-01T00:00:00Z');
|
||||
|
||||
// First call
|
||||
await validator.validateSize(fetchSizeFn, time1);
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Reset the cache
|
||||
validator.reset();
|
||||
|
||||
// Next call should fetch again even within cache duration
|
||||
const time2 = new Date('2024-01-01T00:00:00.500Z');
|
||||
await validator.validateSize(fetchSizeFn, time2);
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
describe('edge case: DB becomes full after initial check', () => {
|
||||
it('should correctly handle DB becoming full between cached checks', async () => {
|
||||
// This test verifies that the validator maintains consistency within cache windows.
|
||||
// Timeline:
|
||||
// t=0: DB at 99MB - fetch and cache this value (99 < 100 limit, so passes)
|
||||
// t=500ms: In reality, DB grows to 100MB, but we don't know yet (still using cached 99MB)
|
||||
// New request validates against cached 99MB - correctly PASSES
|
||||
// This is expected behavior: we maintain consistency within the cache window
|
||||
// t=1001ms: Cache expires, fetch new value (100MB), validation now correctly FAILS
|
||||
// t=1500ms: Still within new cache window, uses cached 100MB, continues to FAIL
|
||||
|
||||
// First check: DB at 99MB (under limit)
|
||||
fetchSizeFn.mockResolvedValueOnce(99 * 1024 * 1024);
|
||||
const time1 = new Date('2024-01-01T00:00:00Z');
|
||||
await expect(validator.validateSize(fetchSizeFn, time1)).resolves.toBeUndefined();
|
||||
|
||||
// Within cache duration: still validates against cached 99MB
|
||||
// This PASSES, which is correct - we're being consistent within our cache window
|
||||
const time2 = new Date('2024-01-01T00:00:00.500Z');
|
||||
await expect(validator.validateSize(fetchSizeFn, time2)).resolves.toBeUndefined();
|
||||
|
||||
// After cache expires: new check fetches current state showing DB is now full
|
||||
fetchSizeFn.mockResolvedValueOnce(100 * 1024 * 1024);
|
||||
const time3 = new Date('2024-01-01T00:00:01.001Z');
|
||||
await expect(validator.validateSize(fetchSizeFn, time3)).rejects.toThrow(
|
||||
'Data store size limit exceeded: 100MB used, limit is 100MB',
|
||||
);
|
||||
|
||||
// Subsequent calls use the cached "full" state and continue to fail correctly
|
||||
const time4 = new Date('2024-01-01T00:00:01.500Z');
|
||||
await expect(validator.validateSize(fetchSizeFn, time4)).rejects.toThrow(
|
||||
'Data store size limit exceeded: 100MB used, limit is 100MB',
|
||||
);
|
||||
|
||||
expect(fetchSizeFn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,11 +1,13 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
import type { AddDataStoreColumnDto, CreateDataStoreColumnDto } from '@n8n/api-types';
|
||||
import { createTeamProject, testDb, testModules } from '@n8n/backend-test-utils';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import type { Project } from '@n8n/db';
|
||||
import { Container } from '@n8n/di';
|
||||
import type { DataStoreRow } from 'n8n-workflow';
|
||||
|
||||
import { DataStoreRowsRepository } from '../data-store-rows.repository';
|
||||
import { DataStoreSizeValidator } from '../data-store-size-validator.service';
|
||||
import { DataStoreRepository } from '../data-store.repository';
|
||||
import { DataStoreService } from '../data-store.service';
|
||||
import { DataStoreColumnNameConflictError } from '../errors/data-store-column-name-conflict.error';
|
||||
@@ -13,6 +15,7 @@ import { DataStoreColumnNotFoundError } from '../errors/data-store-column-not-fo
|
||||
import { DataStoreNameConflictError } from '../errors/data-store-name-conflict.error';
|
||||
import { DataStoreNotFoundError } from '../errors/data-store-not-found.error';
|
||||
import { DataStoreValidationError } from '../errors/data-store-validation.error';
|
||||
import { toTableName } from '../utils/sql-utils';
|
||||
|
||||
beforeAll(async () => {
|
||||
await testModules.loadModules(['data-table']);
|
||||
@@ -21,6 +24,8 @@ beforeAll(async () => {
|
||||
|
||||
beforeEach(async () => {
|
||||
await testDb.truncate(['DataTable', 'DataTableColumn']);
|
||||
const dataStoreSizeValidator = Container.get(DataStoreSizeValidator);
|
||||
dataStoreSizeValidator.reset();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
@@ -71,7 +76,7 @@ describe('dataStore', () => {
|
||||
]);
|
||||
|
||||
// Select the column from user table to check for its existence
|
||||
const userTableName = dataStoreRowsRepository.toTableName(dataTableId);
|
||||
const userTableName = toTableName(dataTableId);
|
||||
const rows = await dataStoreRepository.manager
|
||||
.createQueryBuilder()
|
||||
.select('foo')
|
||||
@@ -94,7 +99,7 @@ describe('dataStore', () => {
|
||||
|
||||
await expect(dataStoreService.getColumns(dataStoreId, project1.id)).resolves.toEqual([]);
|
||||
|
||||
const userTableName = dataStoreRowsRepository.toTableName(dataStoreId);
|
||||
const userTableName = toTableName(dataStoreId);
|
||||
const queryRunner = dataStoreRepository.manager.connection.createQueryRunner();
|
||||
try {
|
||||
const table = await queryRunner.getTable(userTableName);
|
||||
@@ -223,7 +228,7 @@ describe('dataStore', () => {
|
||||
|
||||
// ACT
|
||||
const result = await dataStoreService.deleteDataStore(dataStoreId, project1.id);
|
||||
const userTableName = dataStoreRowsRepository.toTableName(dataStoreId);
|
||||
const userTableName = toTableName(dataStoreId);
|
||||
|
||||
// ASSERT
|
||||
expect(result).toEqual(true);
|
||||
@@ -303,7 +308,7 @@ describe('dataStore', () => {
|
||||
}),
|
||||
]);
|
||||
|
||||
const userTableName = dataStoreRowsRepository.toTableName(dataTableId);
|
||||
const userTableName = toTableName(dataTableId);
|
||||
const queryRunner = dataStoreRepository.manager.connection.createQueryRunner();
|
||||
try {
|
||||
const table = await queryRunner.getTable(userTableName);
|
||||
@@ -360,7 +365,7 @@ describe('dataStore', () => {
|
||||
}),
|
||||
]);
|
||||
|
||||
const userTableName = dataStoreRowsRepository.toTableName(dataTableId);
|
||||
const userTableName = toTableName(dataTableId);
|
||||
const queryRunner = dataStoreRepository.manager.connection.createQueryRunner();
|
||||
try {
|
||||
const table = await queryRunner.getTable(userTableName);
|
||||
@@ -2497,4 +2502,92 @@ describe('dataStore', () => {
|
||||
await expect(result).rejects.toThrow(DataStoreValidationError);
|
||||
});
|
||||
});
|
||||
|
||||
describe('size validation', () => {
|
||||
it('should prevent insertRows when size limit exceeded', async () => {
|
||||
// ARRANGE
|
||||
const dataStoreSizeValidator = Container.get(DataStoreSizeValidator);
|
||||
dataStoreSizeValidator.reset();
|
||||
|
||||
const maxSize = Container.get(GlobalConfig).dataTable.maxSize;
|
||||
|
||||
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
|
||||
name: 'dataStore',
|
||||
columns: [{ name: 'data', type: 'string' }],
|
||||
});
|
||||
|
||||
const mockFindDataTablesSize = jest
|
||||
.spyOn(dataStoreRepository, 'findDataTablesSize')
|
||||
.mockResolvedValue(maxSize + 1);
|
||||
|
||||
// ACT & ASSERT
|
||||
await expect(
|
||||
dataStoreService.insertRows(dataStoreId, project1.id, [{ data: 'test' }]),
|
||||
).rejects.toThrow(DataStoreValidationError);
|
||||
|
||||
expect(mockFindDataTablesSize).toHaveBeenCalled();
|
||||
mockFindDataTablesSize.mockRestore();
|
||||
});
|
||||
|
||||
it('should prevent updateRow when size limit exceeded', async () => {
|
||||
// ARRANGE
|
||||
const dataStoreSizeValidator = Container.get(DataStoreSizeValidator);
|
||||
dataStoreSizeValidator.reset();
|
||||
|
||||
const maxSize = Container.get(GlobalConfig).dataTable.maxSize;
|
||||
|
||||
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
|
||||
name: 'dataStore',
|
||||
columns: [{ name: 'data', type: 'string' }],
|
||||
});
|
||||
|
||||
// Now mock the size check to be over limit
|
||||
const mockFindDataTablesSize = jest
|
||||
.spyOn(dataStoreRepository, 'findDataTablesSize')
|
||||
.mockResolvedValue(maxSize + 1);
|
||||
|
||||
// ACT & ASSERT
|
||||
await expect(
|
||||
dataStoreService.updateRow(dataStoreId, project1.id, {
|
||||
filter: {
|
||||
type: 'and',
|
||||
filters: [{ columnName: 'id', condition: 'eq', value: 1 }],
|
||||
},
|
||||
data: { data: 'updated' },
|
||||
}),
|
||||
).rejects.toThrow(DataStoreValidationError);
|
||||
|
||||
expect(mockFindDataTablesSize).toHaveBeenCalled();
|
||||
mockFindDataTablesSize.mockRestore();
|
||||
});
|
||||
|
||||
it('should prevent upsertRow when size limit exceeded (insert case)', async () => {
|
||||
// ARRANGE
|
||||
|
||||
const maxSize = Container.get(GlobalConfig).dataTable.maxSize;
|
||||
|
||||
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
|
||||
name: 'dataStore',
|
||||
columns: [{ name: 'data', type: 'string' }],
|
||||
});
|
||||
|
||||
const mockFindDataTablesSize = jest
|
||||
.spyOn(dataStoreRepository, 'findDataTablesSize')
|
||||
.mockResolvedValue(maxSize + 1);
|
||||
|
||||
// ACT & ASSERT
|
||||
await expect(
|
||||
dataStoreService.upsertRow(dataStoreId, project1.id, {
|
||||
filter: {
|
||||
type: 'and',
|
||||
filters: [{ columnName: 'data', condition: 'eq', value: 'nonexistent' }],
|
||||
},
|
||||
data: { data: 'new' },
|
||||
}),
|
||||
).rejects.toThrow(DataStoreValidationError);
|
||||
|
||||
expect(mockFindDataTablesSize).toHaveBeenCalled();
|
||||
mockFindDataTablesSize.mockRestore();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,10 +3,14 @@ import { AuthenticatedRequest } from '@n8n/db';
|
||||
import { Get, GlobalScope, Query, RestController } from '@n8n/decorators';
|
||||
|
||||
import { DataStoreAggregateService } from './data-store-aggregate.service';
|
||||
import { DataStoreService } from './data-store.service';
|
||||
|
||||
@RestController('/data-tables-global')
|
||||
export class DataStoreAggregateController {
|
||||
constructor(private readonly dataStoreAggregateService: DataStoreAggregateService) {}
|
||||
constructor(
|
||||
private readonly dataStoreAggregateService: DataStoreAggregateService,
|
||||
private readonly dataStoreService: DataStoreService,
|
||||
) {}
|
||||
|
||||
@Get('/')
|
||||
@GlobalScope('dataStore:list')
|
||||
@@ -17,4 +21,10 @@ export class DataStoreAggregateController {
|
||||
) {
|
||||
return await this.dataStoreAggregateService.getManyAndCount(req.user, payload);
|
||||
}
|
||||
|
||||
@Get('/limits')
|
||||
@GlobalScope('dataStore:list')
|
||||
async getDataTablesSize() {
|
||||
return await this.dataStoreService.getDataTablesSize();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,8 +4,8 @@ import { DataSource, EntityManager, Repository } from '@n8n/typeorm';
|
||||
import { UnexpectedError } from 'n8n-workflow';
|
||||
|
||||
import { DataStoreRowsRepository } from './data-store-rows.repository';
|
||||
import { DataTable } from './data-table.entity';
|
||||
import { DataTableColumn } from './data-table-column.entity';
|
||||
import { DataTable } from './data-table.entity';
|
||||
import { DataStoreColumnNameConflictError } from './errors/data-store-column-name-conflict.error';
|
||||
import { DataStoreValidationError } from './errors/data-store-validation.error';
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import type { ListDataStoreContentQueryDto, DataTableFilter } from '@n8n/api-types';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { CreateTable, DslColumn } from '@n8n/db';
|
||||
import { Service } from '@n8n/di';
|
||||
import {
|
||||
@@ -35,6 +34,7 @@ import {
|
||||
quoteIdentifier,
|
||||
toDslColumns,
|
||||
toSqliteGlobFromPercent,
|
||||
toTableName,
|
||||
} from './utils/sql-utils';
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
@@ -148,15 +148,7 @@ function getConditionAndParams(
|
||||
|
||||
@Service()
|
||||
export class DataStoreRowsRepository {
|
||||
constructor(
|
||||
private dataSource: DataSource,
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
) {}
|
||||
|
||||
toTableName(dataStoreId: string): DataStoreUserTableName {
|
||||
const { tablePrefix } = this.globalConfig.database;
|
||||
return `${tablePrefix}data_table_user_${dataStoreId}`;
|
||||
}
|
||||
constructor(private dataSource: DataSource) {}
|
||||
|
||||
async insertRowsBulk(
|
||||
table: DataStoreUserTableName,
|
||||
@@ -221,7 +213,7 @@ export class DataStoreRowsRepository {
|
||||
const dbType = this.dataSource.options.type;
|
||||
const useReturning = dbType === 'postgres' || dbType === 'mariadb';
|
||||
|
||||
const table = this.toTableName(dataStoreId);
|
||||
const table = toTableName(dataStoreId);
|
||||
const escapedColumns = columns.map((c) => this.dataSource.driver.escape(c.name));
|
||||
const escapedSystemColumns = DATA_TABLE_SYSTEM_COLUMNS.map((x) =>
|
||||
this.dataSource.driver.escape(x),
|
||||
@@ -292,7 +284,7 @@ export class DataStoreRowsRepository {
|
||||
const dbType = this.dataSource.options.type;
|
||||
const useReturning = dbType === 'postgres';
|
||||
|
||||
const table = this.toTableName(dataStoreId);
|
||||
const table = toTableName(dataStoreId);
|
||||
const escapedColumns = columns.map((c) => this.dataSource.driver.escape(c.name));
|
||||
const escapedSystemColumns = DATA_TABLE_SYSTEM_COLUMNS.map((x) =>
|
||||
this.dataSource.driver.escape(x),
|
||||
@@ -348,7 +340,7 @@ export class DataStoreRowsRepository {
|
||||
return true;
|
||||
}
|
||||
|
||||
const table = this.toTableName(dataStoreId);
|
||||
const table = toTableName(dataStoreId);
|
||||
|
||||
await this.dataSource
|
||||
.createQueryBuilder()
|
||||
@@ -366,7 +358,7 @@ export class DataStoreRowsRepository {
|
||||
queryRunner: QueryRunner,
|
||||
) {
|
||||
const dslColumns = [new DslColumn('id').int.autoGenerate2.primary, ...toDslColumns(columns)];
|
||||
const createTable = new CreateTable(this.toTableName(dataStoreId), '', queryRunner).withColumns(
|
||||
const createTable = new CreateTable(toTableName(dataStoreId), '', queryRunner).withColumns(
|
||||
...dslColumns,
|
||||
).withTimestamps;
|
||||
|
||||
@@ -374,7 +366,7 @@ export class DataStoreRowsRepository {
|
||||
}
|
||||
|
||||
async dropTable(dataStoreId: string, queryRunner: QueryRunner) {
|
||||
await queryRunner.dropTable(this.toTableName(dataStoreId), true);
|
||||
await queryRunner.dropTable(toTableName(dataStoreId), true);
|
||||
}
|
||||
|
||||
async addColumn(
|
||||
@@ -383,7 +375,7 @@ export class DataStoreRowsRepository {
|
||||
queryRunner: QueryRunner,
|
||||
dbType: DataSourceOptions['type'],
|
||||
) {
|
||||
await queryRunner.query(addColumnQuery(this.toTableName(dataStoreId), column, dbType));
|
||||
await queryRunner.query(addColumnQuery(toTableName(dataStoreId), column, dbType));
|
||||
}
|
||||
|
||||
async dropColumnFromTable(
|
||||
@@ -392,7 +384,7 @@ export class DataStoreRowsRepository {
|
||||
queryRunner: QueryRunner,
|
||||
dbType: DataSourceOptions['type'],
|
||||
) {
|
||||
await queryRunner.query(deleteColumnQuery(this.toTableName(dataStoreId), columnName, dbType));
|
||||
await queryRunner.query(deleteColumnQuery(toTableName(dataStoreId), columnName, dbType));
|
||||
}
|
||||
|
||||
async getManyAndCount(
|
||||
@@ -411,7 +403,7 @@ export class DataStoreRowsRepository {
|
||||
}
|
||||
|
||||
async getManyByIds(dataStoreId: string, ids: number[], columns: DataTableColumn[]) {
|
||||
const table = this.toTableName(dataStoreId);
|
||||
const table = toTableName(dataStoreId);
|
||||
const escapedColumns = columns.map((c) => this.dataSource.driver.escape(c.name));
|
||||
const escapedSystemColumns = DATA_TABLE_SYSTEM_COLUMNS.map((x) =>
|
||||
this.dataSource.driver.escape(x),
|
||||
@@ -446,7 +438,7 @@ export class DataStoreRowsRepository {
|
||||
const query = this.dataSource.createQueryBuilder();
|
||||
|
||||
const tableReference = 'dataTable';
|
||||
query.from(this.toTableName(dataStoreId), tableReference);
|
||||
query.from(toTableName(dataStoreId), tableReference);
|
||||
if (dto.filter) {
|
||||
this.applyFilters(query, dto.filter, tableReference, columns);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { Service } from '@n8n/di';
|
||||
|
||||
import { DataStoreValidationError } from './errors/data-store-validation.error';
|
||||
import { DataTableSizeStatus } from 'n8n-workflow';
|
||||
|
||||
@Service()
|
||||
export class DataStoreSizeValidator {
|
||||
private lastCheck: Date | undefined;
|
||||
private cachedSizeInBytes: number | undefined;
|
||||
private pendingCheck: Promise<number> | null = null;
|
||||
|
||||
constructor(private readonly globalConfig: GlobalConfig) {}
|
||||
|
||||
private shouldRefresh(sizeInBytes: number | undefined, now: Date): sizeInBytes is undefined {
|
||||
if (
|
||||
!this.lastCheck ||
|
||||
now.getTime() - this.lastCheck.getTime() >= this.globalConfig.dataTable.sizeCheckCacheDuration
|
||||
) {
|
||||
sizeInBytes = undefined;
|
||||
}
|
||||
|
||||
return sizeInBytes === undefined;
|
||||
}
|
||||
|
||||
private async getCachedSize(
|
||||
fetchSizeFn: () => Promise<number>,
|
||||
now = new Date(),
|
||||
): Promise<number> {
|
||||
// If there's a pending check, wait for it to complete
|
||||
|
||||
if (this.pendingCheck) {
|
||||
this.cachedSizeInBytes = await this.pendingCheck;
|
||||
} else {
|
||||
// Check if we need to refresh the db size
|
||||
|
||||
if (this.shouldRefresh(this.cachedSizeInBytes, now)) {
|
||||
this.pendingCheck = fetchSizeFn();
|
||||
try {
|
||||
this.cachedSizeInBytes = await this.pendingCheck;
|
||||
this.lastCheck = now;
|
||||
} finally {
|
||||
this.pendingCheck = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return this.cachedSizeInBytes;
|
||||
}
|
||||
|
||||
async validateSize(fetchSizeFn: () => Promise<number>, now = new Date()): Promise<void> {
|
||||
const size = await this.getCachedSize(fetchSizeFn, now);
|
||||
if (size >= this.globalConfig.dataTable.maxSize) {
|
||||
throw new DataStoreValidationError(
|
||||
`Data store size limit exceeded: ${this.toMb(size)}MB used, limit is ${this.toMb(this.globalConfig.dataTable.maxSize)}MB`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
sizeToState(sizeBytes: number): DataTableSizeStatus {
|
||||
if (sizeBytes >= this.globalConfig.dataTable.maxSize) {
|
||||
return 'error';
|
||||
} else if (sizeBytes >= this.globalConfig.dataTable.warningThreshold) {
|
||||
return 'warn';
|
||||
}
|
||||
return 'ok';
|
||||
}
|
||||
|
||||
async getSizeStatus(fetchSizeFn: () => Promise<number>, now = new Date()) {
|
||||
const size = await this.getCachedSize(fetchSizeFn, now);
|
||||
return this.sizeToState(size);
|
||||
}
|
||||
|
||||
private toMb(sizeInBytes: number): number {
|
||||
return Math.round(sizeInBytes / (1024 * 1024));
|
||||
}
|
||||
|
||||
reset() {
|
||||
this.lastCheck = undefined;
|
||||
this.cachedSizeInBytes = undefined;
|
||||
this.pendingCheck = null;
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ import {
|
||||
Query,
|
||||
RestController,
|
||||
} from '@n8n/decorators';
|
||||
import { DataStoreRowReturn } from 'n8n-workflow';
|
||||
|
||||
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
||||
import { ConflictError } from '@/errors/response-errors/conflict.error';
|
||||
@@ -34,7 +35,6 @@ import { DataStoreColumnNotFoundError } from './errors/data-store-column-not-fou
|
||||
import { DataStoreNameConflictError } from './errors/data-store-name-conflict.error';
|
||||
import { DataStoreNotFoundError } from './errors/data-store-not-found.error';
|
||||
import { DataStoreValidationError } from './errors/data-store-validation.error';
|
||||
import { DataStoreRowReturn } from 'n8n-workflow';
|
||||
|
||||
@RestController('/projects/:projectId/data-tables')
|
||||
export class DataStoreController {
|
||||
|
||||
@@ -3,6 +3,7 @@ import {
|
||||
type DataStoreCreateColumnSchema,
|
||||
type ListDataStoreQueryDto,
|
||||
} from '@n8n/api-types';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { Service } from '@n8n/di';
|
||||
import { DataSource, EntityManager, Repository, SelectQueryBuilder } from '@n8n/typeorm';
|
||||
import { UnexpectedError } from 'n8n-workflow';
|
||||
@@ -10,12 +11,14 @@ import { UnexpectedError } from 'n8n-workflow';
|
||||
import { DataStoreRowsRepository } from './data-store-rows.repository';
|
||||
import { DataTableColumn } from './data-table-column.entity';
|
||||
import { DataTable } from './data-table.entity';
|
||||
import { toTableName } from './utils/sql-utils';
|
||||
|
||||
@Service()
|
||||
export class DataStoreRepository extends Repository<DataTable> {
|
||||
constructor(
|
||||
dataSource: DataSource,
|
||||
private dataStoreRowsRepository: DataStoreRowsRepository,
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
) {
|
||||
super(DataTable, dataSource.manager);
|
||||
}
|
||||
@@ -238,4 +241,50 @@ export class DataStoreRepository extends Repository<DataTable> {
|
||||
private getProjectFields(alias: string): string[] {
|
||||
return [`${alias}.id`, `${alias}.name`, `${alias}.type`, `${alias}.icon`];
|
||||
}
|
||||
|
||||
async findDataTablesSize(): Promise<number> {
|
||||
const dbType = this.globalConfig.database.type;
|
||||
const schemaName = this.globalConfig.database.postgresdb.schema;
|
||||
|
||||
let sql = '';
|
||||
|
||||
switch (dbType) {
|
||||
case 'sqlite':
|
||||
sql = `
|
||||
SELECT SUM(pgsize) AS total_bytes
|
||||
FROM dbstat
|
||||
WHERE name LIKE '${toTableName('%')}'
|
||||
`;
|
||||
break;
|
||||
|
||||
case 'postgresdb':
|
||||
sql = `
|
||||
SELECT SUM(pg_relation_size(c.oid)) AS total_bytes
|
||||
FROM pg_class c
|
||||
JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE n.nspname = '${schemaName}'
|
||||
AND c.relname LIKE '${toTableName('%')}'
|
||||
AND c.relkind IN ('r', 'm', 'p')
|
||||
`;
|
||||
break;
|
||||
|
||||
case 'mysqldb':
|
||||
case 'mariadb': {
|
||||
const databaseName = this.globalConfig.database.mysqldb.database;
|
||||
sql = `
|
||||
SELECT SUM((DATA_LENGTH + INDEX_LENGTH)) AS total_bytes
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema = '${databaseName}'
|
||||
AND table_name LIKE '${toTableName('%')}'
|
||||
`;
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
|
||||
const result = (await this.query(sql)) as Array<{ total_bytes: number | null }>;
|
||||
return result[0]?.total_bytes ?? 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import { validateFieldType } from 'n8n-workflow';
|
||||
|
||||
import { DataStoreColumnRepository } from './data-store-column.repository';
|
||||
import { DataStoreRowsRepository } from './data-store-rows.repository';
|
||||
import { DataStoreSizeValidator } from './data-store-size-validator.service';
|
||||
import { DataStoreRepository } from './data-store.repository';
|
||||
import { columnTypeToFieldType } from './data-store.types';
|
||||
import { DataTableColumn } from './data-table-column.entity';
|
||||
@@ -40,6 +41,7 @@ export class DataStoreService {
|
||||
private readonly dataStoreColumnRepository: DataStoreColumnRepository,
|
||||
private readonly dataStoreRowsRepository: DataStoreRowsRepository,
|
||||
private readonly logger: Logger,
|
||||
private readonly dataStoreSizeValidator: DataStoreSizeValidator,
|
||||
) {
|
||||
this.logger = this.logger.scoped('data-table');
|
||||
}
|
||||
@@ -148,6 +150,7 @@ export class DataStoreService {
|
||||
rows: DataStoreRows,
|
||||
returnType: DataTableInsertRowsReturnType = 'count',
|
||||
) {
|
||||
await this.validateDataTableSize();
|
||||
await this.validateDataStoreExists(dataStoreId, projectId);
|
||||
await this.validateRows(dataStoreId, rows);
|
||||
|
||||
@@ -167,6 +170,7 @@ export class DataStoreService {
|
||||
dto: Omit<UpsertDataStoreRowDto, 'returnData'>,
|
||||
returnData: boolean = false,
|
||||
) {
|
||||
await this.validateDataTableSize();
|
||||
const updated = await this.updateRow(dataStoreId, projectId, dto, true);
|
||||
|
||||
if (updated.length > 0) {
|
||||
@@ -195,6 +199,7 @@ export class DataStoreService {
|
||||
dto: Omit<UpdateDataTableRowDto, 'returnData'>,
|
||||
returnData = false,
|
||||
) {
|
||||
await this.validateDataTableSize();
|
||||
await this.validateDataStoreExists(dataTableId, projectId);
|
||||
|
||||
const columns = await this.dataStoreColumnRepository.getColumns(dataTableId);
|
||||
@@ -383,4 +388,18 @@ export class DataStoreService {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async validateDataTableSize() {
|
||||
await this.dataStoreSizeValidator.validateSize(
|
||||
async () => await this.dataStoreRepository.findDataTablesSize(),
|
||||
);
|
||||
}
|
||||
|
||||
async getDataTablesSize() {
|
||||
const sizeBytes = await this.dataStoreRepository.findDataTablesSize();
|
||||
return {
|
||||
sizeBytes,
|
||||
sizeState: this.dataStoreSizeValidator.sizeToState(sizeBytes),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import { DATA_STORE_COLUMN_REGEX, type DataStoreCreateColumnSchema } from '@n8n/api-types';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { DslColumn } from '@n8n/db';
|
||||
import { Container } from '@n8n/di';
|
||||
import type { DataSourceOptions } from '@n8n/typeorm';
|
||||
import type { DataStoreColumnJsType, DataStoreRowReturn, DataStoreRowsReturn } from 'n8n-workflow';
|
||||
import { UnexpectedError } from 'n8n-workflow';
|
||||
@@ -286,3 +288,8 @@ export function escapeLikeSpecials(input: string): string {
|
||||
.replace(/\\/g, '\\\\') // escape the escape char itself
|
||||
.replace(/_/g, '\\_'); // make '_' literal ('%' stays a wildcard)
|
||||
}
|
||||
|
||||
export function toTableName(dataStoreId: string): DataStoreUserTableName {
|
||||
const { tablePrefix } = Container.get(GlobalConfig).database;
|
||||
return `${tablePrefix}data_table_user_${dataStoreId}`;
|
||||
}
|
||||
|
||||
@@ -182,6 +182,10 @@ export class FrontendService {
|
||||
callbackUrl: `${instanceBaseUrl}/${restEndpoint}/sso/oidc/callback`,
|
||||
},
|
||||
},
|
||||
dataTables: {
|
||||
maxSize: this.globalConfig.dataTable.maxSize,
|
||||
warningThreshold: this.globalConfig.dataTable.warningThreshold,
|
||||
},
|
||||
publicApi: {
|
||||
enabled: isApiEnabled(),
|
||||
latestVersion: 1,
|
||||
|
||||
Reference in New Issue
Block a user