feat(Data Table Node): Add bulk insert mode (no-changelog) (#19294)

This commit is contained in:
Charlie Kolb
2025-09-09 14:01:40 +02:00
committed by GitHub
parent a910604822
commit 897c69c70d
14 changed files with 383 additions and 109 deletions

View File

@@ -4,9 +4,10 @@ import { Z } from 'zod-class';
import { import {
dataStoreColumnNameSchema, dataStoreColumnNameSchema,
dataStoreColumnValueSchema, dataStoreColumnValueSchema,
insertRowReturnType,
} from '../../schemas/data-store.schema'; } from '../../schemas/data-store.schema';
export class AddDataStoreRowsDto extends Z.class({ export class AddDataStoreRowsDto extends Z.class({
returnData: z.boolean().optional().default(false),
data: z.array(z.record(dataStoreColumnNameSchema, dataStoreColumnValueSchema)), data: z.array(z.record(dataStoreColumnNameSchema, dataStoreColumnValueSchema)),
returnType: insertRowReturnType,
}) {} }) {}

View File

@@ -2,6 +2,8 @@ import { z } from 'zod';
import type { ListDataStoreQueryDto } from '../dto'; import type { ListDataStoreQueryDto } from '../dto';
export const insertRowReturnType = z.union([z.literal('all'), z.literal('count'), z.literal('id')]);
export const dataStoreNameSchema = z.string().trim().min(1).max(128); export const dataStoreNameSchema = z.string().trim().min(1).max(128);
export const dataStoreIdSchema = z.string().max(36); export const dataStoreIdSchema = z.string().max(36);

View File

@@ -207,9 +207,14 @@ describe('DataStoreProxyService', () => {
node, node,
'dataStore-id', 'dataStore-id',
); );
await dataStoreOperations.insertRows(rows); await dataStoreOperations.insertRows(rows, 'count');
expect(dataStoreServiceMock.insertRows).toBeCalledWith('dataStore-id', PROJECT_ID, rows, true); expect(dataStoreServiceMock.insertRows).toBeCalledWith(
'dataStore-id',
PROJECT_ID,
rows,
'count',
);
}); });
it('should call upsertRow with correct parameters', async () => { it('should call upsertRow with correct parameters', async () => {

View File

@@ -1904,6 +1904,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
second: 'another value', second: 'another value',
}, },
], ],
returnType: 'id',
}; };
await authOwnerAgent await authOwnerAgent
@@ -1921,6 +1922,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
second: 'another value', second: 'another value',
}, },
], ],
returnType: 'id',
}; };
await authOwnerAgent await authOwnerAgent
@@ -1950,6 +1952,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
second: 'another value', second: 'another value',
}, },
], ],
returnType: 'id',
}; };
await authMemberAgent await authMemberAgent
@@ -1981,6 +1984,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
second: 'another value', second: 'another value',
}, },
], ],
returnType: 'id',
}; };
await authMemberAgent await authMemberAgent
@@ -2013,6 +2017,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
second: 'another value', second: 'another value',
}, },
], ],
returnType: 'id',
}; };
const response = await authMemberAgent const response = await authMemberAgent
@@ -2053,6 +2058,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
second: 'another value', second: 'another value',
}, },
], ],
returnType: 'id',
}; };
const response = await authAdminAgent const response = await authAdminAgent
@@ -2090,6 +2096,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
second: 'another value', second: 'another value',
}, },
], ],
returnType: 'id',
}; };
const response = await authMemberAgent const response = await authMemberAgent
@@ -2121,7 +2128,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
}); });
const payload = { const payload = {
returnData: true, returnType: 'all',
data: [ data: [
{ {
first: 'first row', first: 'first row',
@@ -2184,6 +2191,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
nonexisting: 'this does not exist', nonexisting: 'this does not exist',
}, },
], ],
returnType: 'id',
}; };
const response = await authMemberAgent const response = await authMemberAgent
@@ -2217,6 +2225,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
b: '2025-08-15T12:34:56+02:00', b: '2025-08-15T12:34:56+02:00',
}, },
], ],
returnType: 'id',
}; };
const response = await authMemberAgent const response = await authMemberAgent
@@ -2265,6 +2274,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
c: '2025-08-15T09:48:14.259Z', c: '2025-08-15T09:48:14.259Z',
}, },
], ],
returnType: 'id',
}; };
const response = await authMemberAgent const response = await authMemberAgent
@@ -2305,6 +2315,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
b: false, b: false,
}, },
], ],
returnType: 'id',
}; };
const response = await authMemberAgent const response = await authMemberAgent
@@ -2360,6 +2371,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
e: 2340439341231259, e: 2340439341231259,
}, },
], ],
returnType: 'id',
}; };
const response = await authMemberAgent const response = await authMemberAgent
@@ -2410,6 +2422,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
d: null, d: null,
}, },
], ],
returnType: 'id',
}; };
const response = await authMemberAgent const response = await authMemberAgent
@@ -2458,6 +2471,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/insert', () => {
b: 3, b: 3,
}, },
], ],
returnType: 'id',
}; };
const first = await authMemberAgent const first = await authMemberAgent

View File

@@ -417,11 +417,16 @@ describe('dataStore', () => {
], ],
}); });
const results = await dataStoreService.insertRows(dataStoreId, project1.id, [ const results = await dataStoreService.insertRows(
{ name: 'Alice', age: 30 }, dataStoreId,
{ name: 'Bob', age: 25 }, project1.id,
{ name: 'Charlie', age: 35 }, [
]); { name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
{ name: 'Charlie', age: 35 },
],
'id',
);
expect(results).toEqual([ expect(results).toEqual([
{ {
@@ -474,9 +479,12 @@ describe('dataStore', () => {
]); ]);
// Verify we can insert new rows with the new column // Verify we can insert new rows with the new column
const newRow = await dataStoreService.insertRows(dataStoreId, project1.id, [ const newRow = await dataStoreService.insertRows(
{ name: 'David', age: 28, email: 'david@example.com' }, dataStoreId,
]); project1.id,
[{ name: 'David', age: 28, email: 'david@example.com' }],
'id',
);
expect(newRow).toEqual([ expect(newRow).toEqual([
{ {
id: 4, id: 4,
@@ -832,7 +840,7 @@ describe('dataStore', () => {
c4: 'iso 8601 date strings are okay too', c4: 'iso 8601 date strings are okay too',
}, },
]; ];
const result = await dataStoreService.insertRows(dataStoreId, project1.id, rows); const result = await dataStoreService.insertRows(dataStoreId, project1.id, rows, 'id');
// ASSERT // ASSERT
expect(result).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }]); expect(result).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }]);
@@ -867,15 +875,21 @@ describe('dataStore', () => {
}); });
// Insert initial row // Insert initial row
const initial = await dataStoreService.insertRows(dataStoreId, project1.id, [ const initial = await dataStoreService.insertRows(
{ c1: 1, c2: 'foo' }, dataStoreId,
]); project1.id,
[{ c1: 1, c2: 'foo' }],
'id',
);
expect(initial).toEqual([{ id: 1 }]); expect(initial).toEqual([{ id: 1 }]);
// Attempt to insert a row with the same primary key // Attempt to insert a row with the same primary key
const result = await dataStoreService.insertRows(dataStoreId, project1.id, [ const result = await dataStoreService.insertRows(
{ c1: 1, c2: 'foo' }, dataStoreId,
]); project1.id,
[{ c1: 1, c2: 'foo' }],
'id',
);
// ASSERT // ASSERT
expect(result).toEqual([{ id: 2 }]); expect(result).toEqual([{ id: 2 }]);
@@ -908,19 +922,29 @@ describe('dataStore', () => {
}); });
// Insert initial row // Insert initial row
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [ const ids = await dataStoreService.insertRows(
{ c1: 1, c2: 'foo' }, dataStoreId,
{ c1: 2, c2: 'bar' }, project1.id,
]); [
{ c1: 1, c2: 'foo' },
{ c1: 2, c2: 'bar' },
],
'id',
);
expect(ids).toEqual([{ id: 1 }, { id: 2 }]); expect(ids).toEqual([{ id: 1 }, { id: 2 }]);
await dataStoreService.deleteRows(dataStoreId, project1.id, [ids[0].id]); await dataStoreService.deleteRows(dataStoreId, project1.id, [ids[0].id]);
// Insert a new row // Insert a new row
const result = await dataStoreService.insertRows(dataStoreId, project1.id, [ const result = await dataStoreService.insertRows(
{ c1: 1, c2: 'baz' }, dataStoreId,
{ c1: 2, c2: 'faz' }, project1.id,
]); [
{ c1: 1, c2: 'baz' },
{ c1: 2, c2: 'faz' },
],
'id',
);
// ASSERT // ASSERT
expect(result).toEqual([{ id: 3 }, { id: 4 }]); expect(result).toEqual([{ id: 3 }, { id: 4 }]);
@@ -970,7 +994,7 @@ describe('dataStore', () => {
{ c1: 2, c2: 'bar', c3: false, c4: now }, { c1: 2, c2: 'bar', c3: false, c4: now },
{ c1: null, c2: null, c3: null, c4: null }, { c1: null, c2: null, c3: null, c4: null },
], ],
true, 'all',
); );
expect(ids).toEqual([ expect(ids).toEqual([
{ {
@@ -1026,7 +1050,7 @@ describe('dataStore', () => {
{ c2: 'bar', c1: 2, c3: false, c4: now }, { c2: 'bar', c1: 2, c3: false, c4: now },
{ c1: null, c2: null, c3: null, c4: null }, { c1: null, c2: null, c3: null, c4: null },
], ],
true, 'all',
); );
expect(ids).toEqual([ expect(ids).toEqual([
{ {
@@ -1072,10 +1096,15 @@ describe('dataStore', () => {
}); });
// ACT // ACT
const result = dataStoreService.insertRows(dataStoreId, project1.id, [ const result = dataStoreService.insertRows(
{ c1: 3, c2: true, c3: new Date(), c4: 'hello?' }, dataStoreId,
{ cWrong: 3, c1: 4, c2: true, c3: new Date(), c4: 'hello?' }, project1.id,
]); [
{ c1: 3, c2: true, c3: new Date(), c4: 'hello?' },
{ cWrong: 3, c1: 4, c2: true, c3: new Date(), c4: 'hello?' },
],
'id',
);
// ASSERT // ASSERT
await expect(result).rejects.toThrow( await expect(result).rejects.toThrow(
@@ -1096,12 +1125,17 @@ describe('dataStore', () => {
}); });
// ACT // ACT
await dataStoreService.insertRows(dataStoreId, project1.id, [ await dataStoreService.insertRows(
{ name: 'Mary', age: 20, email: 'mary@example.com', active: true }, // full row dataStoreId,
{ name: 'Alice', age: 30 }, // missing email and active project1.id,
{ name: 'Bob' }, // missing age, email and active [
{}, // missing all columns { name: 'Mary', age: 20, email: 'mary@example.com', active: true }, // full row
]); { name: 'Alice', age: 30 }, // missing email and active
{ name: 'Bob' }, // missing age, email and active
{}, // missing all columns
],
'id',
);
const { count, data } = await dataStoreService.getManyRowsAndCount( const { count, data } = await dataStoreService.getManyRowsAndCount(
dataStoreId, dataStoreId,
@@ -1150,10 +1184,15 @@ describe('dataStore', () => {
}); });
// ACT // ACT
const result = dataStoreService.insertRows(dataStoreId, project1.id, [ const result = dataStoreService.insertRows(
{ c1: 3, c2: true, c3: new Date(), c4: 'hello?' }, dataStoreId,
{ cWrong: 3, c2: true, c3: new Date(), c4: 'hello?' }, project1.id,
]); [
{ c1: 3, c2: true, c3: new Date(), c4: 'hello?' },
{ cWrong: 3, c2: true, c3: new Date(), c4: 'hello?' },
],
'id',
);
// ASSERT // ASSERT
await expect(result).rejects.toThrow( await expect(result).rejects.toThrow(
@@ -1169,9 +1208,12 @@ describe('dataStore', () => {
}); });
// ACT // ACT
const result = dataStoreService.insertRows(dataStoreId, project1.id, [ const result = dataStoreService.insertRows(
{ c1: '2025-99-15T09:48:14.259Z' }, dataStoreId,
]); project1.id,
[{ c1: '2025-99-15T09:48:14.259Z' }],
'id',
);
// ASSERT // ASSERT
await expect(result).rejects.toThrow(DataStoreValidationError); await expect(result).rejects.toThrow(DataStoreValidationError);
@@ -1193,10 +1235,15 @@ describe('dataStore', () => {
}); });
// ACT // ACT
const result = dataStoreService.insertRows('this is not an id', project1.id, [ const result = dataStoreService.insertRows(
{ c1: 3, c2: true, c3: new Date(), c4: 'hello?' }, 'this is not an id',
{ cWrong: 3, c2: true, c3: new Date(), c4: 'hello?' }, project1.id,
]); [
{ c1: 3, c2: true, c3: new Date(), c4: 'hello?' },
{ cWrong: 3, c2: true, c3: new Date(), c4: 'hello?' },
],
'id',
);
// ASSERT // ASSERT
await expect(result).rejects.toThrow(DataStoreNotFoundError); await expect(result).rejects.toThrow(DataStoreNotFoundError);
@@ -1211,10 +1258,12 @@ describe('dataStore', () => {
// ACT // ACT
const wrongValue = new Date().toISOString(); const wrongValue = new Date().toISOString();
const result = dataStoreService.insertRows(dataStoreId, project1.id, [ const result = dataStoreService.insertRows(
{ c1: 3 }, dataStoreId,
{ c1: wrongValue }, project1.id,
]); [{ c1: 3 }, { c1: wrongValue }],
'id',
);
// ASSERT // ASSERT
await expect(result).rejects.toThrow(DataStoreValidationError); await expect(result).rejects.toThrow(DataStoreValidationError);
@@ -1232,7 +1281,7 @@ describe('dataStore', () => {
// ACT // ACT
const rows = [{}, {}, {}]; const rows = [{}, {}, {}];
const result = await dataStoreService.insertRows(dataStoreId, project1.id, rows); const result = await dataStoreService.insertRows(dataStoreId, project1.id, rows, 'id');
// ASSERT // ASSERT
expect(result).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]); expect(result).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]);
@@ -1261,6 +1310,69 @@ describe('dataStore', () => {
}, },
]); ]);
}); });
describe('bulk', () => {
it('handles single empty row correctly in bulk mode', async () => {
// ARRANGE
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
name: 'dataStore',
columns: [],
});
// ACT
const rows = [{}];
const result = await dataStoreService.insertRows(dataStoreId, project1.id, rows);
// ASSERT
expect(result).toEqual({ success: true, insertedRows: 1 });
const { count, data } = await dataStoreService.getManyRowsAndCount(
dataStoreId,
project1.id,
{},
);
expect(count).toEqual(1);
expect(data).toEqual([{ id: 1, createdAt: expect.any(Date), updatedAt: expect.any(Date) }]);
});
it('handles multi-batch bulk correctly in bulk mode', async () => {
// ARRANGE
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
name: 'dataStore',
columns: [
{ name: 'c1', type: 'number' },
{ name: 'c2', type: 'boolean' },
{ name: 'c3', type: 'string' },
],
});
// ACT
const rows = Array.from({ length: 3000 }, (_, index) => ({
c1: index,
c2: index % 2 === 0,
c3: `index ${index}`,
}));
const result = await dataStoreService.insertRows(dataStoreId, project1.id, rows);
// ASSERT
expect(result).toEqual({ success: true, insertedRows: rows.length });
const { count, data } = await dataStoreService.getManyRowsAndCount(
dataStoreId,
project1.id,
{},
);
expect(count).toEqual(rows.length);
const expected = rows.map(
(row, i) =>
expect.objectContaining<DataStoreRow>({
...row,
id: i + 1,
}) as jest.AsymmetricMatcher,
);
expect(data).toEqual(expected);
});
});
}); });
describe('upsertRow', () => { describe('upsertRow', () => {
@@ -1275,11 +1387,16 @@ describe('dataStore', () => {
], ],
}); });
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [ const ids = await dataStoreService.insertRows(
{ pid: '1995-111a', name: 'Alice', age: 30 }, dataStoreId,
{ pid: '1994-222a', name: 'John', age: 31 }, project1.id,
{ pid: '1993-333a', name: 'Paul', age: 32 }, [
]); { pid: '1995-111a', name: 'Alice', age: 30 },
{ pid: '1994-222a', name: 'John', age: 31 },
{ pid: '1993-333a', name: 'Paul', age: 32 },
],
'id',
);
expect(ids).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]); expect(ids).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]);
@@ -1335,9 +1452,12 @@ describe('dataStore', () => {
}); });
// Insert initial row // Insert initial row
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [ const ids = await dataStoreService.insertRows(
{ pid: '1995-111a', name: 'Alice', age: 30 }, dataStoreId,
]); project1.id,
[{ pid: '1995-111a', name: 'Alice', age: 30 }],
'id',
);
expect(ids).toEqual([{ id: 1 }]); expect(ids).toEqual([{ id: 1 }]);
// ACT // ACT
@@ -1387,9 +1507,12 @@ describe('dataStore', () => {
}); });
// Insert initial row // Insert initial row
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [ const ids = await dataStoreService.insertRows(
{ fullName: 'Alice Cooper', age: 30, birthday: new Date('1995-01-01') }, dataStoreId,
]); project1.id,
[{ fullName: 'Alice Cooper', age: 30, birthday: new Date('1995-01-01') }],
'id',
);
expect(ids).toEqual([{ id: 1 }]); expect(ids).toEqual([{ id: 1 }]);
// ACT // ACT
@@ -1431,9 +1554,12 @@ describe('dataStore', () => {
}); });
// Insert initial row // Insert initial row
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [ const ids = await dataStoreService.insertRows(
{ fullName: 'Alice Cooper', age: 30, birthday: new Date('1995-01-01') }, dataStoreId,
]); project1.id,
[{ fullName: 'Alice Cooper', age: 30, birthday: new Date('1995-01-01') }],
'id',
);
expect(ids).toEqual([{ id: 1 }]); expect(ids).toEqual([{ id: 1 }]);
// ACT // ACT
@@ -1477,11 +1603,16 @@ describe('dataStore', () => {
const { id: dataStoreId } = dataStore; const { id: dataStoreId } = dataStore;
// Insert test rows // Insert test rows
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [ const ids = await dataStoreService.insertRows(
{ name: 'Alice', age: 30 }, dataStoreId,
{ name: 'Bob', age: 25 }, project1.id,
{ name: 'Charlie', age: 35 }, [
]); { name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
{ name: 'Charlie', age: 35 },
],
'id',
);
expect(ids).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]); expect(ids).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]);
// ACT - Delete first and third rows // ACT - Delete first and third rows
@@ -1531,7 +1662,12 @@ describe('dataStore', () => {
const { id: dataStoreId } = dataStore; const { id: dataStoreId } = dataStore;
// Insert one row // Insert one row
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [{ name: 'Alice' }]); const ids = await dataStoreService.insertRows(
dataStoreId,
project1.id,
[{ name: 'Alice' }],
'id',
);
expect(ids).toEqual([{ id: 1 }]); expect(ids).toEqual([{ id: 1 }]);
// ACT - Try to delete existing and non-existing IDs // ACT - Try to delete existing and non-existing IDs
@@ -2299,7 +2435,7 @@ describe('dataStore', () => {
}, },
]; ];
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, rows); const ids = await dataStoreService.insertRows(dataStoreId, project1.id, rows, 'id');
expect(ids).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]); expect(ids).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]);
// ACT // ACT

View File

@@ -10,6 +10,7 @@ import {
DataStoreRows, DataStoreRows,
IDataStoreProjectAggregateService, IDataStoreProjectAggregateService,
IDataStoreProjectService, IDataStoreProjectService,
DataTableInsertRowsReturnType,
INode, INode,
ListDataStoreOptions, ListDataStoreOptions,
ListDataStoreRowsOptions, ListDataStoreRowsOptions,
@@ -131,8 +132,11 @@ export class DataStoreProxyService implements DataStoreProxyProvider {
return await dataStoreService.getManyRowsAndCount(dataStoreId, projectId, options); return await dataStoreService.getManyRowsAndCount(dataStoreId, projectId, options);
}, },
async insertRows(rows: DataStoreRows) { async insertRows<T extends DataTableInsertRowsReturnType>(
return await dataStoreService.insertRows(dataStoreId, projectId, rows, true); rows: DataStoreRows,
returnType: T,
) {
return await dataStoreService.insertRows(dataStoreId, projectId, rows, returnType);
}, },
async updateRow(options: UpdateDataStoreRowOptions) { async updateRow(options: UpdateDataStoreRowOptions) {

View File

@@ -18,6 +18,8 @@ import {
UnexpectedError, UnexpectedError,
DataStoreRowsReturn, DataStoreRowsReturn,
DATA_TABLE_SYSTEM_COLUMNS, DATA_TABLE_SYSTEM_COLUMNS,
DataTableInsertRowsReturnType,
DataTableInsertRowsResult,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { DataStoreUserTableName } from './data-store.types'; import { DataStoreUserTableName } from './data-store.types';
@@ -156,18 +158,65 @@ export class DataStoreRowsRepository {
return `${tablePrefix}data_table_user_${dataStoreId}`; return `${tablePrefix}data_table_user_${dataStoreId}`;
} }
async insertRows<T extends boolean | undefined>( async insertRowsBulk(
table: DataStoreUserTableName,
rows: DataStoreRows,
columns: DataTableColumn[],
) {
// DB systems have different maximum parameters per query
// with old sqlite versions having the lowest in 999 parameters
// In practice 20000 works here, but performance didn't meaningfully change
// so this should be a safe limit
const batchSize = 800;
const batches = Math.max(1, Math.ceil((columns.length * rows.length) / batchSize));
const rowsPerBatch = Math.ceil(rows.length / batches);
const columnNames = columns.map((x) => x.name);
const dbType = this.dataSource.options.type;
let insertedRows = 0;
for (let i = 0; i < batches; ++i) {
const start = i * rowsPerBatch;
const endExclusive = Math.min(rows.length, (i + 1) * rowsPerBatch);
if (endExclusive <= start) break;
const completeRows = new Array<DataStoreColumnJsType[]>(endExclusive - start);
for (let j = start; j < endExclusive; ++j) {
const insertArray: DataStoreColumnJsType[] = [];
for (let h = 0; h < columnNames.length; ++h) {
const column = columns[h];
// Fill missing columns with null values to support partial data insertion
const value = rows[j][column.name] ?? null;
insertArray[h] = normalizeValue(value, column.type, dbType);
}
completeRows[j - start] = insertArray;
}
const query = this.dataSource
.createQueryBuilder()
.insert()
.into(table, columnNames)
.values(completeRows);
await query.execute();
insertedRows += completeRows.length;
}
return { success: true, insertedRows } as const;
}
async insertRows<T extends DataTableInsertRowsReturnType>(
dataStoreId: string, dataStoreId: string,
rows: DataStoreRows, rows: DataStoreRows,
columns: DataTableColumn[], columns: DataTableColumn[],
returnData?: T, returnType: T,
): Promise<Array<T extends true ? DataStoreRowReturn : Pick<DataStoreRowReturn, 'id'>>>; ): Promise<DataTableInsertRowsResult<T>>;
async insertRows( async insertRows<T extends DataTableInsertRowsReturnType>(
dataStoreId: string, dataStoreId: string,
rows: DataStoreRows, rows: DataStoreRows,
columns: DataTableColumn[], columns: DataTableColumn[],
returnData?: boolean, returnType: T,
): Promise<Array<DataStoreRowReturn | Pick<DataStoreRowReturn, 'id'>>> { ): Promise<DataTableInsertRowsResult> {
const inserted: Array<Pick<DataStoreRowReturn, 'id'>> = []; const inserted: Array<Pick<DataStoreRowReturn, 'id'>> = [];
const dbType = this.dataSource.options.type; const dbType = this.dataSource.options.type;
const useReturning = dbType === 'postgres' || dbType === 'mariadb'; const useReturning = dbType === 'postgres' || dbType === 'mariadb';
@@ -179,6 +228,10 @@ export class DataStoreRowsRepository {
); );
const selectColumns = [...escapedSystemColumns, ...escapedColumns]; const selectColumns = [...escapedSystemColumns, ...escapedColumns];
if (returnType === 'count') {
return await this.insertRowsBulk(table, rows, columns);
}
// We insert one by one as the default behavior of returning the last inserted ID // We insert one by one as the default behavior of returning the last inserted ID
// is consistent, whereas getting all inserted IDs when inserting multiple values is // is consistent, whereas getting all inserted IDs when inserting multiple values is
// surprisingly awkward without Entities, e.g. `RETURNING id` explicitly does not aggregate // surprisingly awkward without Entities, e.g. `RETURNING id` explicitly does not aggregate
@@ -196,15 +249,16 @@ export class DataStoreRowsRepository {
const query = this.dataSource.createQueryBuilder().insert().into(table).values(completeRow); const query = this.dataSource.createQueryBuilder().insert().into(table).values(completeRow);
if (useReturning) { if (useReturning) {
query.returning(returnData ? selectColumns.join(',') : 'id'); query.returning(returnType === 'all' ? selectColumns.join(',') : 'id');
} }
const result = await query.execute(); const result = await query.execute();
if (useReturning) { if (useReturning) {
const returned = returnData const returned =
? normalizeRows(extractReturningData(result.raw), columns) returnType === 'all'
: extractInsertedIds(result.raw, dbType).map((id) => ({ id })); ? normalizeRows(extractReturningData(result.raw), columns)
: extractInsertedIds(result.raw, dbType).map((id) => ({ id }));
inserted.push.apply(inserted, returned); inserted.push.apply(inserted, returned);
continue; continue;
} }
@@ -215,7 +269,7 @@ export class DataStoreRowsRepository {
throw new UnexpectedError("Couldn't find the inserted row ID"); throw new UnexpectedError("Couldn't find the inserted row ID");
} }
if (!returnData) { if (returnType === 'id') {
inserted.push(...ids.map((id) => ({ id }))); inserted.push(...ids.map((id) => ({ id })));
continue; continue;
} }

View File

@@ -238,11 +238,11 @@ export class DataStoreController {
/** /**
* @returns the IDs of the inserted rows * @returns the IDs of the inserted rows
*/ */
async appendDataStoreRows<T extends boolean | undefined>( async appendDataStoreRows<T extends DataStoreRowReturn | undefined>(
req: AuthenticatedRequest<{ projectId: string }>, req: AuthenticatedRequest<{ projectId: string }>,
_res: Response, _res: Response,
dataStoreId: string, dataStoreId: string,
dto: AddDataStoreRowsDto & { returnData?: T }, dto: AddDataStoreRowsDto & { returnType?: T },
): Promise<Array<T extends true ? DataStoreRowReturn : Pick<DataStoreRowReturn, 'id'>>>; ): Promise<Array<T extends true ? DataStoreRowReturn : Pick<DataStoreRowReturn, 'id'>>>;
@Post('/:dataStoreId/insert') @Post('/:dataStoreId/insert')
@ProjectScope('dataStore:writeRow') @ProjectScope('dataStore:writeRow')
@@ -257,7 +257,7 @@ export class DataStoreController {
dataStoreId, dataStoreId,
req.params.projectId, req.params.projectId,
dto.data, dto.data,
dto.returnData, dto.returnType,
); );
} catch (e: unknown) { } catch (e: unknown) {
if (e instanceof DataStoreNotFoundError) { if (e instanceof DataStoreNotFoundError) {

View File

@@ -17,6 +17,8 @@ import type {
DataStoreRow, DataStoreRow,
DataStoreRowReturn, DataStoreRowReturn,
DataStoreRows, DataStoreRows,
DataTableInsertRowsReturnType,
DataTableInsertRowsResult,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { validateFieldType } from 'n8n-workflow'; import { validateFieldType } from 'n8n-workflow';
@@ -134,23 +136,23 @@ export class DataStoreService {
return await this.dataStoreColumnRepository.getColumns(dataStoreId); return await this.dataStoreColumnRepository.getColumns(dataStoreId);
} }
async insertRows<T extends boolean | undefined>( async insertRows<T extends DataTableInsertRowsReturnType = 'count'>(
dataStoreId: string, dataStoreId: string,
projectId: string, projectId: string,
rows: DataStoreRows, rows: DataStoreRows,
returnData?: T, returnType?: T,
): Promise<Array<T extends true ? DataStoreRowReturn : Pick<DataStoreRowReturn, 'id'>>>; ): Promise<DataTableInsertRowsResult<T>>;
async insertRows( async insertRows(
dataStoreId: string, dataStoreId: string,
projectId: string, projectId: string,
rows: DataStoreRows, rows: DataStoreRows,
returnData?: boolean, returnType: DataTableInsertRowsReturnType = 'count',
) { ) {
await this.validateDataStoreExists(dataStoreId, projectId); await this.validateDataStoreExists(dataStoreId, projectId);
await this.validateRows(dataStoreId, rows); await this.validateRows(dataStoreId, rows);
const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId); const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId);
return await this.dataStoreRowsRepository.insertRows(dataStoreId, rows, columns, returnData); return await this.dataStoreRowsRepository.insertRows(dataStoreId, rows, columns, returnType);
} }
async upsertRow<T extends boolean | undefined>( async upsertRow<T extends boolean | undefined>(
@@ -172,7 +174,12 @@ export class DataStoreService {
} }
// No rows were updated, so insert a new one // No rows were updated, so insert a new one
const inserted = await this.insertRows(dataStoreId, projectId, [dto.data], returnData); const inserted = await this.insertRows(
dataStoreId,
projectId,
[dto.data],
returnData ? 'all' : 'count',
);
return returnData ? inserted : true; return returnData ? inserted : true;
} }

View File

@@ -36,7 +36,7 @@ export const createDataStore = async (
const columns = await dataStoreColumnRepository.getColumns(dataStore.id); const columns = await dataStoreColumnRepository.getColumns(dataStore.id);
const dataStoreRowsRepository = Container.get(DataStoreRowsRepository); const dataStoreRowsRepository = Container.get(DataStoreRowsRepository);
await dataStoreRowsRepository.insertRows(dataStore.id, options.data, columns); await dataStoreRowsRepository.insertRows(dataStore.id, options.data, columns, 'count');
} }
return dataStore; return dataStore;

View File

@@ -157,7 +157,7 @@ export const insertDataStoreRowApi = async (
'POST', 'POST',
`/projects/${projectId}/data-tables/${dataStoreId}/insert`, `/projects/${projectId}/data-tables/${dataStoreId}/insert`,
{ {
returnData: true, returnType: 'all',
data: [row], data: [row],
}, },
); );

View File

@@ -15,7 +15,7 @@ type DataTableNodeType = AllEntities<{
const BULK_OPERATIONS = ['insert'] as const; const BULK_OPERATIONS = ['insert'] as const;
function canBulk(operation: string): operation is (typeof BULK_OPERATIONS)[number] { function hasBulkExecute(operation: string): operation is (typeof BULK_OPERATIONS)[number] {
return (BULK_OPERATIONS as readonly string[]).includes(operation); return (BULK_OPERATIONS as readonly string[]).includes(operation);
} }
@@ -41,7 +41,7 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
} as DataTableNodeType; } as DataTableNodeType;
// If the operation supports // If the operation supports
if (canBulk(dataTableNodeData.operation) && !hasComplexId(this)) { if (hasBulkExecute(dataTableNodeData.operation) && !hasComplexId(this)) {
try { try {
const proxy = await getDataTableProxyExecute(this); const proxy = await getDataTableProxyExecute(this);
@@ -66,7 +66,8 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
itemData: { item: i }, itemData: { item: i },
}); });
operationResult.push.apply(operationResult, executionData); // pushing here risks stack overflows for very high numbers (~100k) of results on filter-based queries (update, get, etc.)
operationResult = operationResult.concat(executionData);
} catch (error) { } catch (error) {
if (this.continueOnFail()) { if (this.continueOnFail()) {
operationResult.push({ operationResult.push({

View File

@@ -18,26 +18,63 @@ const displayOptions: IDisplayOptions = {
}, },
}; };
export const description: INodeProperties[] = [makeAddRow(FIELD, displayOptions)]; export const description: INodeProperties[] = [
makeAddRow(FIELD, displayOptions),
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Optimize Bulk',
name: 'optimizeBulk',
type: 'boolean',
default: false,
noDataExpression: true, // bulk inserts don't support expressions so this is a bit paradoxical
description: 'Whether to improve bulk insert performance 5x by not returning inserted data',
},
],
displayOptions,
},
];
export async function execute( export async function execute(
this: IExecuteFunctions, this: IExecuteFunctions,
index: number, index: number,
): Promise<INodeExecutionData[]> { ): Promise<INodeExecutionData[]> {
const optimizeBulkEnabled = this.getNodeParameter('options.optimizeBulk', index, false);
const dataStoreProxy = await getDataTableProxyExecute(this, index); const dataStoreProxy = await getDataTableProxyExecute(this, index);
const row = getAddRow(this, index); const row = getAddRow(this, index);
const insertedRows = await dataStoreProxy.insertRows([row]); if (optimizeBulkEnabled) {
return insertedRows.map((json) => ({ json })); // This function is always called by index, so we inherently cannot operate in bulk
this.addExecutionHints({
message: 'Unable to optimize bulk insert due to expression in Data Table ID ',
location: 'outputPane',
});
const json = await dataStoreProxy.insertRows([row], 'count');
return [{ json }];
} else {
const insertedRows = await dataStoreProxy.insertRows([row], 'all');
return insertedRows.map((json, item) => ({ json, pairedItem: { item } }));
}
} }
export async function executeBulk( export async function executeBulk(
this: IExecuteFunctions, this: IExecuteFunctions,
proxy: IDataStoreProjectService, proxy: IDataStoreProjectService,
): Promise<INodeExecutionData[]> { ): Promise<INodeExecutionData[]> {
const optimizeBulkEnabled = this.getNodeParameter('options.optimizeBulk', 0, false);
const rows = this.getInputData().flatMap((_, i) => [getAddRow(this, i)]); const rows = this.getInputData().flatMap((_, i) => [getAddRow(this, i)]);
const insertedRows = await proxy.insertRows(rows); if (optimizeBulkEnabled) {
return insertedRows.map((json, item) => ({ json, pairedItem: { item } })); const json = await proxy.insertRows(rows, 'count');
return [{ json }];
} else {
const insertedRows = await proxy.insertRows(rows, 'all');
return insertedRows.map((json, item) => ({ json, pairedItem: { item } }));
}
} }

View File

@@ -88,6 +88,16 @@ export type DataStoreRows = DataStoreRow[];
export type DataStoreRowReturn = DataStoreRow & DataStoreRowReturnBase; export type DataStoreRowReturn = DataStoreRow & DataStoreRowReturnBase;
export type DataStoreRowsReturn = DataStoreRowReturn[]; export type DataStoreRowsReturn = DataStoreRowReturn[];
export type DataTableInsertRowsReturnType = 'all' | 'id' | 'count';
export type DataTableInsertRowsBulkResult = { success: true; insertedRows: number };
export type DataTableInsertRowsResult<
T extends DataTableInsertRowsReturnType = DataTableInsertRowsReturnType,
> = T extends 'all'
? DataStoreRowReturn[]
: T extends 'id'
? Array<Pick<DataStoreRowReturn, 'id'>>
: DataTableInsertRowsBulkResult;
// APIs for a data store service operating on a specific projectId // APIs for a data store service operating on a specific projectId
export interface IDataStoreProjectAggregateService { export interface IDataStoreProjectAggregateService {
getProjectId(): string; getProjectId(): string;
@@ -116,7 +126,10 @@ export interface IDataStoreProjectService {
dto: Partial<ListDataStoreRowsOptions>, dto: Partial<ListDataStoreRowsOptions>,
): Promise<{ count: number; data: DataStoreRowsReturn }>; ): Promise<{ count: number; data: DataStoreRowsReturn }>;
insertRows(rows: DataStoreRows): Promise<DataStoreRowReturn[]>; insertRows<T extends DataTableInsertRowsReturnType>(
rows: DataStoreRows,
returnType: T,
): Promise<DataTableInsertRowsResult<T>>;
updateRow(options: UpdateDataStoreRowOptions): Promise<DataStoreRowReturn[]>; updateRow(options: UpdateDataStoreRowOptions): Promise<DataStoreRowReturn[]>;