feat(core): Use filters for data table upsert (no-changelog) (#19292)

This commit is contained in:
Daria
2025-09-08 14:41:09 +03:00
committed by GitHub
parent 70254526bb
commit 9b06648edc
17 changed files with 218 additions and 609 deletions

View File

@@ -7,6 +7,6 @@ import {
} from '../../schemas/data-store.schema';
export class AddDataStoreRowsDto extends Z.class({
returnData: z.boolean().default(false),
returnData: z.boolean().optional().default(false),
data: z.array(z.record(dataStoreColumnNameSchema, dataStoreColumnValueSchema)),
}) {}

View File

@@ -18,7 +18,7 @@ const updateDataTableRowShape = {
.refine((obj) => Object.keys(obj).length > 0, {
message: 'data must not be empty',
}),
returnData: z.boolean().default(false),
returnData: z.boolean().optional().default(false),
};
export class UpdateDataTableRowDto extends Z.class(updateDataTableRowShape) {}

View File

@@ -0,0 +1,24 @@
import { z } from 'zod';
import { Z } from 'zod-class';
import {
dataStoreColumnNameSchema,
dataStoreColumnValueSchema,
} from '../../schemas/data-store.schema';
import { dataTableFilterSchema } from '../../schemas/data-table-filter.schema';
const upsertFilterSchema = dataTableFilterSchema.refine((filter) => filter.filters.length > 0, {
message: 'filter must not be empty',
});
const upsertDataStoreRowShape = {
filter: upsertFilterSchema,
data: z
.record(dataStoreColumnNameSchema, dataStoreColumnValueSchema)
.refine((obj) => Object.keys(obj).length > 0, {
message: 'data must not be empty',
}),
returnData: z.boolean().optional().default(false),
};
export class UpsertDataStoreRowDto extends Z.class(upsertDataStoreRowShape) {}

View File

@@ -1,15 +0,0 @@
import { z } from 'zod';
import { Z } from 'zod-class';
import {
dataStoreColumnNameSchema,
dataStoreColumnValueSchema,
} from '../../schemas/data-store.schema';
const upsertDataStoreRowsShape = {
rows: z.array(z.record(dataStoreColumnNameSchema, dataStoreColumnValueSchema)),
matchFields: z.array(dataStoreColumnNameSchema).min(1),
returnData: z.boolean().optional().default(false),
};
export class UpsertDataStoreRowsDto extends Z.class(upsertDataStoreRowsShape) {}

View File

@@ -86,7 +86,7 @@ export { OidcConfigDto } from './oidc/config.dto';
export { CreateDataStoreDto } from './data-store/create-data-store.dto';
export { UpdateDataStoreDto } from './data-store/update-data-store.dto';
export { UpdateDataTableRowDto } from './data-store/update-data-store-row.dto';
export { UpsertDataStoreRowsDto } from './data-store/upsert-data-store-rows.dto';
export { UpsertDataStoreRowDto } from './data-store/upsert-data-store-row.dto';
export { ListDataStoreQueryDto } from './data-store/list-data-store-query.dto';
export { ListDataStoreContentQueryDto } from './data-store/list-data-store-content-query.dto';
export { CreateDataStoreColumnDto } from './data-store/create-data-store-column.dto';

View File

@@ -7,7 +7,7 @@ import type {
INode,
ListDataStoreRowsOptions,
MoveDataStoreColumnOptions,
UpsertDataStoreRowsOptions,
UpsertDataStoreRowOptions,
Workflow,
} from 'n8n-workflow';
@@ -212,10 +212,13 @@ describe('DataStoreProxyService', () => {
expect(dataStoreServiceMock.insertRows).toBeCalledWith('dataStore-id', PROJECT_ID, rows, true);
});
it('should call upsertRows with correct parameters', async () => {
const options: UpsertDataStoreRowsOptions = {
matchFields: ['name'],
rows: [{ id: 1, name: 'row1' }],
it('should call upsertRow with correct parameters', async () => {
const options: UpsertDataStoreRowOptions = {
filter: {
filters: [{ columnName: 'name', condition: 'eq', value: 'test' }],
type: 'and',
},
data: { name: 'newName' },
};
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
@@ -223,9 +226,9 @@ describe('DataStoreProxyService', () => {
node,
'dataStore-id',
);
await dataStoreOperations.upsertRows(options);
await dataStoreOperations.upsertRow(options);
expect(dataStoreServiceMock.upsertRows).toBeCalledWith(
expect(dataStoreServiceMock.upsertRow).toBeCalledWith(
'dataStore-id',
PROJECT_ID,
options,

View File

@@ -2802,13 +2802,8 @@ describe('DELETE /projects/:projectId/data-tables/:dataStoreId/rows', () => {
describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
test('should not upsert rows when project does not exist', async () => {
const payload = {
rows: [
{
first: 'test value',
second: 'another value',
},
],
matchFields: ['first', 'second'],
filter: { type: 'and', filters: [{ columnName: 'name', condition: 'eq', value: 'Alice' }] },
data: { age: 30 },
};
await authOwnerAgent
@@ -2820,13 +2815,8 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
test('should not upsert rows when data store does not exist', async () => {
const project = await createTeamProject('test project', owner);
const payload = {
rows: [
{
first: 'test value',
second: 'another value',
},
],
matchFields: ['first', 'second'],
filter: { type: 'and', filters: [{ columnName: 'name', condition: 'eq', value: 'Alice' }] },
data: { age: 30 },
};
await authOwnerAgent
@@ -2839,24 +2829,19 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
const dataStore = await createDataStore(ownerProject, {
columns: [
{
name: 'first',
name: 'name',
type: 'string',
},
{
name: 'second',
type: 'string',
name: 'age',
type: 'number',
},
],
});
const payload = {
rows: [
{
first: 'test value',
second: 'another value',
},
],
matchFields: ['first', 'second'],
filter: { type: 'and', filters: [{ columnName: 'name', condition: 'eq', value: 'Alice' }] },
data: { age: 30 },
};
await authMemberAgent
@@ -2871,24 +2856,19 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
const dataStore = await createDataStore(project, {
columns: [
{
name: 'first',
name: 'name',
type: 'string',
},
{
name: 'second',
type: 'string',
name: 'age',
type: 'number',
},
],
});
const payload = {
rows: [
{
first: 'test value',
second: 'another value',
},
],
matchFields: ['first', 'second'],
filter: { type: 'and', filters: [{ columnName: 'name', condition: 'eq', value: 'Alice' }] },
data: { age: 30 },
};
await authMemberAgent
@@ -2904,24 +2884,19 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
const dataStore = await createDataStore(project, {
columns: [
{
name: 'first',
name: 'name',
type: 'string',
},
{
name: 'second',
type: 'string',
name: 'age',
type: 'number',
},
],
});
const payload = {
rows: [
{
first: 'test value',
second: 'another value',
},
],
matchFields: ['first', 'second'],
filter: { type: 'and', filters: [{ columnName: 'name', condition: 'eq', value: 'Alice' }] },
data: { name: 'Alice', age: 30 },
};
await authMemberAgent
@@ -2931,7 +2906,7 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
const rowsInDb = await dataStoreRowsRepository.getManyAndCount(dataStore.id, {});
expect(rowsInDb.count).toBe(1);
expect(rowsInDb.data[0]).toMatchObject(payload.rows[0]);
expect(rowsInDb.data[0]).toMatchObject(payload.data);
});
test('should upsert rows if user has project:admin role in team project', async () => {
@@ -2941,24 +2916,19 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
const dataStore = await createDataStore(project, {
columns: [
{
name: 'first',
name: 'name',
type: 'string',
},
{
name: 'second',
type: 'string',
name: 'age',
type: 'number',
},
],
});
const payload = {
rows: [
{
first: 'test value',
second: 'another value',
},
],
matchFields: ['first', 'second'],
filter: { type: 'and', filters: [{ columnName: 'name', condition: 'eq', value: 'Alice' }] },
data: { age: 30 },
};
await authAdminAgent
@@ -2968,31 +2938,26 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
const rowsInDb = await dataStoreRowsRepository.getManyAndCount(dataStore.id, {});
expect(rowsInDb.count).toBe(1);
expect(rowsInDb.data[0]).toMatchObject(payload.rows[0]);
expect(rowsInDb.data[0]).toMatchObject(payload.data);
});
test('should upsert rows in personal project', async () => {
const dataStore = await createDataStore(memberProject, {
columns: [
{
name: 'first',
name: 'name',
type: 'string',
},
{
name: 'second',
type: 'string',
name: 'age',
type: 'number',
},
],
});
const payload = {
rows: [
{
first: 'test value',
second: 'another value',
},
],
matchFields: ['first', 'second'],
filter: { type: 'and', filters: [{ columnName: 'name', condition: 'eq', value: 'Alice' }] },
data: { age: 30 },
};
await authMemberAgent
@@ -3002,31 +2967,26 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
const rowsInDb = await dataStoreRowsRepository.getManyAndCount(dataStore.id, {});
expect(rowsInDb.count).toBe(1);
expect(rowsInDb.data[0]).toMatchObject(payload.rows[0]);
expect(rowsInDb.data[0]).toMatchObject(payload.data);
});
test('should not upsert rows when column does not exist', async () => {
const dataStore = await createDataStore(memberProject, {
columns: [
{
name: 'first',
name: 'name',
type: 'string',
},
{
name: 'second',
type: 'string',
name: 'age',
type: 'number',
},
],
});
const payload = {
rows: [
{
first: 'test value',
nonexisting: 'this does not exist',
},
],
matchFields: ['first', 'second'],
filter: { type: 'and', filters: [{ columnName: 'name', condition: 'eq', value: 'Alice' }] },
data: { age: 30, nonexisting: 'this does not exist' },
};
const response = await authMemberAgent
@@ -3039,96 +2999,33 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
expect(rowsInDb.count).toBe(0);
});
test('should update existing matched fields and insert new ones', async () => {
test('should return updated row if returnData is set', async () => {
const dataStore = await createDataStore(memberProject, {
columns: [
{
name: 'first',
name: 'name',
type: 'string',
},
{
name: 'second',
type: 'string',
name: 'age',
type: 'number',
},
],
data: [
{
first: 'test row',
second: 'test value',
name: 'Alice',
age: 30,
},
{
first: 'test row',
second: 'another row with same first column',
name: 'John',
age: 25,
},
],
});
const payload = {
rows: [
{
first: 'test row',
second: 'updated value',
},
{
first: 'new row',
second: 'new value',
},
],
matchFields: ['first'],
};
const result = await authMemberAgent
.post(`/projects/${memberProject.id}/data-tables/${dataStore.id}/upsert`)
.send(payload)
.expect(200);
expect(result.body.data).toBe(true);
const rowsInDb = await dataStoreRowsRepository.getManyAndCount(dataStore.id, {
sortBy: ['id', 'ASC'],
});
expect(rowsInDb.count).toBe(3);
expect(rowsInDb.data[0]).toMatchObject(payload.rows[0]);
expect(rowsInDb.data[1]).toMatchObject(payload.rows[0]);
expect(rowsInDb.data[2]).toMatchObject(payload.rows[1]);
});
test('should return affected rows if returnData is set', async () => {
const dataStore = await createDataStore(memberProject, {
columns: [
{
name: 'first',
type: 'string',
},
{
name: 'second',
type: 'string',
},
],
data: [
{
first: 'test row',
second: 'test value',
},
{
first: 'test row',
second: 'another row with same first column',
},
],
});
const payload = {
rows: [
{
first: 'test row',
second: 'updated value',
},
{
first: 'new row',
second: 'new value',
},
],
matchFields: ['first'],
filter: { type: 'and', filters: [{ columnName: 'name', condition: 'eq', value: 'Alice' }] },
data: { age: 35 },
returnData: true,
};
@@ -3137,31 +3034,15 @@ describe('POST /projects/:projectId/data-tables/:dataStoreId/upsert', () => {
.send(payload)
.expect(200);
expect(result.body.data).toEqual(
expect.arrayContaining([
{
id: 1,
first: 'test row',
second: 'updated value',
createdAt: expect.any(String),
updatedAt: expect.any(String),
},
{
id: 2,
first: 'test row',
second: 'updated value',
createdAt: expect.any(String),
updatedAt: expect.any(String),
},
{
id: 3,
first: 'new row',
second: 'new value',
createdAt: expect.any(String),
updatedAt: expect.any(String),
},
]),
);
expect(result.body.data).toEqual([
{
id: expect.any(Number),
name: 'Alice',
age: 35,
createdAt: expect.any(String),
updatedAt: expect.any(String),
},
]);
});
});

View File

@@ -1263,7 +1263,7 @@ describe('dataStore', () => {
});
});
describe('upsertRows', () => {
describe('upsertRow', () => {
it('should update a row if filter matches', async () => {
// ARRANGE
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
@@ -1284,12 +1284,12 @@ describe('dataStore', () => {
expect(ids).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]);
// ACT
const result = await dataStoreService.upsertRows(dataStoreId, project1.id, {
rows: [
{ pid: '1995-111a', name: 'Alicia', age: 31 },
{ pid: '1994-222a', name: 'John', age: 32 },
],
matchFields: ['pid'],
const result = await dataStoreService.upsertRow(dataStoreId, project1.id, {
filter: {
type: 'and',
filters: [{ columnName: 'pid', value: '1995-111a', condition: 'eq' }],
},
data: { name: 'Alicia', age: 31 },
});
// ASSERT
@@ -1307,74 +1307,17 @@ describe('dataStore', () => {
expect.objectContaining({
pid: '1995-111a',
name: 'Alicia',
age: 31,
age: 31, // updated
}),
expect.objectContaining({
pid: '1994-222a',
name: 'John',
age: 32,
age: 31,
}),
expect.objectContaining({
pid: '1993-333a',
name: 'Paul',
age: 32,
}), // unchanged
]),
);
});
it('should work correctly with multiple filters', async () => {
// ARRANGE
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
name: 'dataStore',
columns: [
{ name: 'city', type: 'string' },
{ name: 'age', type: 'number' },
{ name: 'isEligible', type: 'boolean' },
],
});
await dataStoreService.insertRows(dataStoreId, project1.id, [
{ city: 'Berlin', age: 30, isEligible: false },
{ city: 'Amsterdam', age: 32, isEligible: false },
{ city: 'Oslo', age: 28, isEligible: false },
]);
// ACT
const result = await dataStoreService.upsertRows(dataStoreId, project1.id, {
rows: [
{ city: 'Berlin', age: 30, isEligible: true },
{ city: 'Amsterdam', age: 32, isEligible: true },
],
matchFields: ['age', 'city'],
});
// ASSERT
expect(result).toBe(true);
const { count, data } = await dataStoreService.getManyRowsAndCount(
dataStoreId,
project1.id,
{},
);
expect(count).toEqual(3);
expect(data).toEqual(
expect.arrayContaining([
expect.objectContaining({
city: 'Berlin',
age: 30,
isEligible: true,
}),
expect.objectContaining({
city: 'Amsterdam',
age: 32,
isEligible: true,
}),
expect.objectContaining({
city: 'Oslo',
age: 28,
isEligible: false,
}),
]),
);
@@ -1398,58 +1341,12 @@ describe('dataStore', () => {
expect(ids).toEqual([{ id: 1 }]);
// ACT
const result = await dataStoreService.upsertRows(dataStoreId, project1.id, {
rows: [{ pid: '1992-222b', name: 'Alice', age: 30 }],
matchFields: ['pid'],
});
// ASSERT
expect(result).toBe(true);
const { count, data } = await dataStoreService.getManyRowsAndCount(
dataStoreId,
project1.id,
{},
);
expect(count).toEqual(2);
expect(data).toEqual([
expect.objectContaining({
name: 'Alice',
age: 30,
pid: '1995-111a',
}),
expect.objectContaining({
name: 'Alice',
age: 30,
pid: '1992-222b',
}),
]);
});
it('should allow adding partial data', async () => {
// ARRANGE
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
name: 'dataStore',
columns: [
{ name: 'pid', type: 'string' },
{ name: 'name', type: 'string' },
{ name: 'age', type: 'number' },
],
});
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [
{ pid: '1995-111a', name: 'Alice', age: 30 },
]);
expect(ids).toEqual([{ id: 1 }]);
// ACT
const result = await dataStoreService.upsertRows(dataStoreId, project1.id, {
rows: [
{ pid: '1992-222b', name: 'Alice' }, // age is missing
{ pid: '1995-111a', age: 35 }, // name is missing
],
matchFields: ['pid'],
const result = await dataStoreService.upsertRow(dataStoreId, project1.id, {
filter: {
type: 'and',
filters: [{ columnName: 'pid', value: '1995-222b', condition: 'eq' }],
},
data: { pid: '1995-222b', name: 'Alice', age: 30 },
});
// ASSERT
@@ -1466,24 +1363,23 @@ describe('dataStore', () => {
expect.arrayContaining([
expect.objectContaining({
name: 'Alice',
age: 35, // updated age
age: 30,
pid: '1995-111a',
}),
expect.objectContaining({
name: 'Alice',
age: null, // missing age
pid: '1992-222b',
age: 30,
pid: '1995-222b',
}),
]),
);
});
it('should return full upserted rows if returnData is set', async () => {
it('should return full inserted row if returnData is set', async () => {
// ARRANGE
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
name: 'dataStore',
columns: [
{ name: 'pid', type: 'string' },
{ name: 'fullName', type: 'string' },
{ name: 'age', type: 'number' },
{ name: 'birthday', type: 'date' },
@@ -1492,47 +1388,79 @@ describe('dataStore', () => {
// Insert initial row
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [
{ pid: '1995-111a', fullName: 'Alice', age: 30, birthday: new Date('1995-01-01') },
{ fullName: 'Alice Cooper', age: 30, birthday: new Date('1995-01-01') },
]);
expect(ids).toEqual([{ id: 1 }]);
// ACT
const result = await dataStoreService.upsertRows(
const result = await dataStoreService.upsertRow(
dataStoreId,
project1.id,
{
rows: [
{ pid: '1995-111a', fullName: 'Alicia', age: 31, birthday: new Date('1995-01-01') },
{ pid: '1992-222b', fullName: 'Bob', age: 30, birthday: new Date('1992-01-01') },
],
matchFields: ['pid'],
filter: {
type: 'and',
filters: [{ columnName: 'fullName', value: 'Bob Vance', condition: 'eq' }],
},
data: { fullName: 'Bob Vance', age: 30, birthday: new Date('1992-01-02') },
},
true,
);
// ASSERT
expect(result).toEqual(
expect.arrayContaining([
{
id: 1,
fullName: 'Alicia',
age: 31,
pid: '1995-111a',
birthday: new Date('1995-01-01'),
createdAt: expect.any(Date),
updatedAt: expect.any(Date),
expect(result).toEqual([
{
id: expect.any(Number),
fullName: 'Bob Vance',
age: 30,
birthday: new Date('1992-01-02'),
createdAt: expect.any(Date),
updatedAt: expect.any(Date),
},
]);
});
it('should return full updated row if returnData is set', async () => {
// ARRANGE
const { id: dataStoreId } = await dataStoreService.createDataStore(project1.id, {
name: 'dataStore',
columns: [
{ name: 'fullName', type: 'string' },
{ name: 'age', type: 'number' },
{ name: 'birthday', type: 'date' },
],
});
// Insert initial row
const ids = await dataStoreService.insertRows(dataStoreId, project1.id, [
{ fullName: 'Alice Cooper', age: 30, birthday: new Date('1995-01-01') },
]);
expect(ids).toEqual([{ id: 1 }]);
// ACT
const result = await dataStoreService.upsertRow(
dataStoreId,
project1.id,
{
filter: {
type: 'and',
filters: [{ columnName: 'age', value: 30, condition: 'eq' }],
},
{
id: 2,
fullName: 'Bob',
age: 30,
pid: '1992-222b',
birthday: new Date('1992-01-01'),
createdAt: expect.any(Date),
updatedAt: expect.any(Date),
},
]),
data: { age: 35, birthday: new Date('1990-01-01') },
},
true,
);
// ASSERT
expect(result).toEqual([
{
id: expect.any(Number),
fullName: 'Alice Cooper',
age: 35,
birthday: new Date('1990-01-01'),
createdAt: expect.any(Date),
updatedAt: expect.any(Date),
},
]);
});
});
@@ -2117,7 +2045,7 @@ describe('dataStore', () => {
// ASSERT
await expect(result).rejects.toThrow(
new DataStoreValidationError('Filter must not be empty for updateRow'),
new DataStoreValidationError('Filter must not be empty'),
);
const { data } = await dataStoreService.getManyRowsAndCount(dataStoreId, project1.id, {});
@@ -2149,7 +2077,7 @@ describe('dataStore', () => {
// ASSERT
await expect(result).rejects.toThrow(
new DataStoreValidationError('Data columns must not be empty for updateRow'),
new DataStoreValidationError('Data columns must not be empty'),
);
const { data } = await dataStoreService.getManyRowsAndCount(dataStoreId, project1.id, {});

View File

@@ -1,6 +1,4 @@
import type { DataStoreRows } from 'n8n-workflow';
import { addColumnQuery, deleteColumnQuery, splitRowsByExistence } from '../utils/sql-utils';
import { addColumnQuery, deleteColumnQuery } from '../utils/sql-utils';
describe('sql-utils', () => {
describe('addColumnQuery', () => {
@@ -51,71 +49,4 @@ describe('sql-utils', () => {
expect(query).toBe('ALTER TABLE "data_table_user_abc" DROP COLUMN "email"');
});
});
describe('splitRowsByExistence', () => {
it('should correctly separate rows into insert and update based on matchFields', () => {
const existing = [
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
];
const matchFields = ['name'];
const rows: DataStoreRows = [
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 26 },
{ name: 'Charlie', age: 35 },
];
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
expect(rowsToUpdate).toEqual([
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 26 },
]);
expect(rowsToInsert).toEqual([{ name: 'Charlie', age: 35 }]);
});
it('should treat rows as new if matchFields combination does not exist', () => {
const existing = [{ name: 'Bob', city: 'Berlin' }];
const matchFields = ['name', 'city'];
const rows: DataStoreRows = [{ name: 'Alice', city: 'Berlin' }];
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
expect(rowsToUpdate).toEqual([]);
expect(rowsToInsert).toEqual([{ name: 'Alice', city: 'Berlin' }]);
});
it('should insert all rows if existing set is empty', () => {
const existing: Array<Record<string, unknown>> = [];
const matchFields = ['name'];
const rows: DataStoreRows = [{ name: 'Alice' }, { name: 'Bob' }];
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
expect(rowsToUpdate).toEqual([]);
expect(rowsToInsert).toEqual(rows);
});
it('should update all rows if all keys match existing', () => {
const existing = [{ name: 'Alice' }, { name: 'Bob' }];
const matchFields = ['name'];
const rows: DataStoreRows = [{ name: 'Alice' }, { name: 'Bob' }];
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
expect(rowsToInsert).toEqual([]);
expect(rowsToUpdate).toEqual(rows);
});
it('should handle empty input rows', () => {
const existing = [{ name: 'Alice' }];
const matchFields = ['name'];
const rows: DataStoreRows = [];
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
expect(rowsToInsert).toEqual([]);
expect(rowsToUpdate).toEqual([]);
});
});
});

View File

@@ -15,15 +15,15 @@ import {
ListDataStoreRowsOptions,
MoveDataStoreColumnOptions,
UpdateDataStoreOptions,
UpdateDataStoreRowsOptions,
UpsertDataStoreRowsOptions,
UpdateDataStoreRowOptions,
UpsertDataStoreRowOptions,
Workflow,
} from 'n8n-workflow';
import { DataStoreService } from './data-store.service';
import { OwnershipService } from '@/services/ownership.service';
import { DataStoreService } from './data-store.service';
@Service()
export class DataStoreProxyService implements DataStoreProxyProvider {
constructor(
@@ -135,12 +135,12 @@ export class DataStoreProxyService implements DataStoreProxyProvider {
return await dataStoreService.insertRows(dataStoreId, projectId, rows, true);
},
async updateRows(options: UpdateDataStoreRowsOptions) {
async updateRow(options: UpdateDataStoreRowOptions) {
return await dataStoreService.updateRow(dataStoreId, projectId, options, true);
},
async upsertRows(options: UpsertDataStoreRowsOptions) {
return await dataStoreService.upsertRows(dataStoreId, projectId, options, true);
async upsertRow(options: UpsertDataStoreRowOptions) {
return await dataStoreService.upsertRow(dataStoreId, projectId, options, true);
},
async deleteRows(ids: number[]) {

View File

@@ -31,7 +31,6 @@ import {
normalizeRows,
normalizeValue,
quoteIdentifier,
splitRowsByExistence,
toDslColumns,
toSqliteGlobFromPercent,
} from './utils/sql-utils';
@@ -231,7 +230,7 @@ export class DataStoreRowsRepository {
async updateRow(
dataStoreId: string,
setData: Record<string, DataStoreColumnJsType | null>,
data: Record<string, DataStoreColumnJsType | null>,
filter: DataTableFilter,
columns: DataTableColumn[],
returnData: boolean = false,
@@ -245,6 +244,7 @@ export class DataStoreRowsRepository {
this.dataSource.driver.escape(x),
);
const selectColumns = [...escapedSystemColumns, ...escapedColumns];
const setData = { ...data };
for (const column of columns) {
if (column.name in setData) {
@@ -289,66 +289,6 @@ export class DataStoreRowsRepository {
return await this.getManyByIds(dataStoreId, ids, columns);
}
// TypeORM cannot infer the columns for a dynamic table name, so we use a raw query
async upsertRows<T extends boolean | undefined>(
dataStoreId: string,
matchFields: string[],
rows: DataStoreRows,
columns: DataTableColumn[],
returnData?: T,
): Promise<T extends true ? DataStoreRowReturn[] : true>;
async upsertRows(
dataStoreId: string,
matchFields: string[],
rows: DataStoreRows,
columns: DataTableColumn[],
returnData?: boolean,
) {
returnData = returnData ?? false;
const { rowsToInsert, rowsToUpdate } = await this.fetchAndSplitRowsByExistence(
dataStoreId,
matchFields,
rows,
);
const output: DataStoreRowReturn[] = [];
if (rowsToInsert.length > 0) {
const result = await this.insertRows(dataStoreId, rowsToInsert, columns, returnData);
if (returnData) {
output.push.apply(output, result);
}
}
if (rowsToUpdate.length > 0) {
for (const row of rowsToUpdate) {
const updateKeys = Object.keys(row).filter((key) => !matchFields.includes(key));
if (updateKeys.length === 0) {
return true;
}
const setData = Object.fromEntries(updateKeys.map((key) => [key, row[key]]));
const whereData = Object.fromEntries(matchFields.map((key) => [key, row[key]]));
// Convert whereData object to DataTableFilter format
const filter: DataTableFilter = {
type: 'and',
filters: Object.entries(whereData).map(([columnName, value]) => ({
columnName,
condition: 'eq' as const,
value,
})),
};
const result = await this.updateRow(dataStoreId, setData, filter, columns, returnData);
if (returnData) {
output.push.apply(output, result);
}
}
}
return returnData ? output : true;
}
async deleteRows(dataStoreId: string, ids: number[]) {
if (ids.length === 0) {
return true;
@@ -505,28 +445,4 @@ export class DataStoreRowsRepository {
query.skip(dto.skip);
query.take(dto.take);
}
private async fetchAndSplitRowsByExistence(
dataStoreId: string,
matchFields: string[],
rows: DataStoreRows,
): Promise<{ rowsToInsert: DataStoreRows; rowsToUpdate: DataStoreRows }> {
const queryBuilder = this.dataSource
.createQueryBuilder()
.select(matchFields)
.from(this.toTableName(dataStoreId), 'datatable');
rows.forEach((row, index) => {
const matchData = Object.fromEntries(matchFields.map((field) => [field, row[field]]));
if (index === 0) {
queryBuilder.where(matchData);
} else {
queryBuilder.orWhere(matchData);
}
});
const existing: Array<Record<string, DataStoreColumnJsType>> = await queryBuilder.getRawMany();
return splitRowsByExistence(existing, matchFields, rows);
}
}

View File

@@ -8,7 +8,7 @@ import {
MoveDataStoreColumnDto,
UpdateDataStoreDto,
UpdateDataTableRowDto,
UpsertDataStoreRowsDto,
UpsertDataStoreRowDto,
} from '@n8n/api-types';
import { AuthenticatedRequest } from '@n8n/db';
import {
@@ -274,14 +274,14 @@ export class DataStoreController {
@Post('/:dataStoreId/upsert')
@ProjectScope('dataStore:writeRow')
async upsertDataStoreRows(
async upsertDataStoreRow(
req: AuthenticatedRequest<{ projectId: string }>,
_res: Response,
@Param('dataStoreId') dataStoreId: string,
@Body dto: UpsertDataStoreRowsDto,
@Body dto: UpsertDataStoreRowDto,
) {
try {
return await this.dataStoreService.upsertRows(
return await this.dataStoreService.upsertRow(
dataStoreId,
req.params.projectId,
dto,

View File

@@ -4,7 +4,7 @@ import type {
ListDataStoreContentQueryDto,
MoveDataStoreColumnDto,
DataStoreListOptions,
UpsertDataStoreRowsDto,
UpsertDataStoreRowDto,
UpdateDataStoreDto,
UpdateDataTableRowDto,
} from '@n8n/api-types';
@@ -153,35 +153,27 @@ export class DataStoreService {
return await this.dataStoreRowsRepository.insertRows(dataStoreId, rows, columns, returnData);
}
async upsertRows<T extends boolean | undefined>(
async upsertRow<T extends boolean | undefined>(
dataStoreId: string,
projectId: string,
dto: Omit<UpsertDataStoreRowsDto, 'returnData'>,
dto: Omit<UpsertDataStoreRowDto, 'returnData'>,
returnData?: T,
): Promise<T extends true ? DataStoreRowReturn[] : true>;
async upsertRows(
async upsertRow(
dataStoreId: string,
projectId: string,
dto: Omit<UpsertDataStoreRowsDto, 'returnData'>,
dto: Omit<UpsertDataStoreRowDto, 'returnData'>,
returnData: boolean = false,
) {
await this.validateDataStoreExists(dataStoreId, projectId);
await this.validateRows(dataStoreId, dto.rows, true);
const updated = await this.updateRow(dataStoreId, projectId, dto, true);
if (dto.rows.length === 0) {
throw new DataStoreValidationError('No rows provided for upsertRows');
if (updated.length > 0) {
return returnData ? updated : true;
}
const { matchFields, rows } = dto;
const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId);
return await this.dataStoreRowsRepository.upsertRows(
dataStoreId,
matchFields,
rows,
columns,
returnData,
);
// No rows were updated, so insert a new one
const inserted = await this.insertRows(dataStoreId, projectId, [dto.data], returnData);
return returnData ? inserted : true;
}
async updateRow<T extends boolean | undefined>(
@@ -207,10 +199,10 @@ export class DataStoreService {
const { data, filter } = dto;
if (!filter?.filters || filter.filters.length === 0) {
throw new DataStoreValidationError('Filter must not be empty for updateRow');
throw new DataStoreValidationError('Filter must not be empty');
}
if (!data || Object.keys(data).length === 0) {
throw new DataStoreValidationError('Data columns must not be empty for updateRow');
throw new DataStoreValidationError('Data columns must not be empty');
}
this.validateRowsWithColumns([data], columns, false);

View File

@@ -1,19 +1,14 @@
import { DATA_STORE_COLUMN_REGEX, type DataStoreCreateColumnSchema } from '@n8n/api-types';
import { DslColumn } from '@n8n/db';
import type { DataSourceOptions } from '@n8n/typeorm';
import type {
DataStoreColumnJsType,
DataStoreRows,
DataStoreRowReturn,
DataStoreRowsReturn,
} from 'n8n-workflow';
import type { DataStoreColumnJsType, DataStoreRowReturn, DataStoreRowsReturn } from 'n8n-workflow';
import { UnexpectedError } from 'n8n-workflow';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import type { DataStoreUserTableName } from '../data-store.types';
import type { DataTableColumn } from '../data-table-column.entity';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
export function toDslColumns(columns: DataStoreCreateColumnSchema[]): DslColumn[] {
return columns.map((col) => {
const name = new DslColumn(col.name.trim());
@@ -100,33 +95,6 @@ export function deleteColumnQuery(
return `ALTER TABLE ${quotedTableName} DROP COLUMN ${quoteIdentifier(column, dbType)}`;
}
export function splitRowsByExistence(
existing: Array<Record<string, unknown>>,
matchFields: string[],
rows: DataStoreRows,
): { rowsToInsert: DataStoreRows; rowsToUpdate: DataStoreRows } {
// Extracts only the fields relevant to matching and serializes them for comparison
const getMatchKey = (row: Record<string, unknown>): string =>
JSON.stringify(Object.fromEntries(matchFields.map((field) => [field, row[field]])));
const existingSet = new Set(existing.map((row) => getMatchKey(row)));
const rowsToUpdate: DataStoreRows = [];
const rowsToInsert: DataStoreRows = [];
for (const row of rows) {
const key = getMatchKey(row);
if (existingSet.has(key)) {
rowsToUpdate.push(row);
} else {
rowsToInsert.push(row);
}
}
return { rowsToInsert, rowsToUpdate };
}
export function quoteIdentifier(name: string, dbType: DataSourceOptions['type']): string {
switch (dbType) {
case 'mysql':

View File

@@ -37,7 +37,7 @@ export async function execute(
throw new NodeOperationError(this.getNode(), 'At least one condition is required');
}
const updatedRows = await dataStoreProxy.updateRows({
const updatedRows = await dataStoreProxy.updateRow({
data: row,
filter,
});

View File

@@ -7,7 +7,7 @@ import {
} from 'n8n-workflow';
import { makeAddRow, getAddRow } from '../../common/addRow';
import { executeSelectMany, getSelectFields } from '../../common/selectMany';
import { getSelectFields, getSelectFilter } from '../../common/selectMany';
import { getDataTableProxyExecute } from '../../common/utils';
export const FIELD: string = 'upsert';
@@ -32,35 +32,16 @@ export async function execute(
const row = getAddRow(this, index);
const matches = await executeSelectMany(this, index, dataStoreProxy, true);
const filter = getSelectFilter(this, index);
// insert
if (matches.length === 0) {
const result = await dataStoreProxy.insertRows([row]);
return result.map((json) => ({ json }));
if (filter.filters.length === 0) {
throw new NodeOperationError(this.getNode(), 'At least one condition is required');
}
// update
const result = [];
for (const match of matches) {
const updatedRows = await dataStoreProxy.updateRows({
data: row,
filter: {
type: 'and',
filters: [{ columnName: 'id', condition: 'eq', value: match.json.id }],
},
});
if (updatedRows.length !== 1) {
throw new NodeOperationError(this.getNode(), 'invariant broken');
}
// The input object gets updated in the api call, somehow
// And providing this column to the backend causes an unexpected column error
// So let's just re-delete the field until we have a more aligned API
delete row['updatedAt'];
result.push(updatedRows[0]);
}
const result = await dataStoreProxy.upsertRow({
data: row,
filter,
});
return result.map((json) => ({ json }));
}

View File

@@ -57,14 +57,14 @@ export type ListDataStoreRowsOptions = {
skip?: number;
};
export type UpdateDataStoreRowsOptions = {
export type UpdateDataStoreRowOptions = {
filter: DataTableFilter;
data: DataStoreRow;
};
export type UpsertDataStoreRowsOptions = {
rows: DataStoreRows;
matchFields: string[];
export type UpsertDataStoreRowOptions = {
filter: DataTableFilter;
data: DataStoreRow;
};
export type MoveDataStoreColumnOptions = {
@@ -118,9 +118,9 @@ export interface IDataStoreProjectService {
insertRows(rows: DataStoreRows): Promise<DataStoreRowReturn[]>;
updateRows(options: UpdateDataStoreRowsOptions): Promise<DataStoreRowReturn[]>;
updateRow(options: UpdateDataStoreRowOptions): Promise<DataStoreRowReturn[]>;
upsertRows(options: UpsertDataStoreRowsOptions): Promise<DataStoreRowReturn[]>;
upsertRow(options: UpsertDataStoreRowOptions): Promise<DataStoreRowReturn[]>;
deleteRows(ids: number[]): Promise<boolean>;
}