feat(Data Table Node): Add Update, Upsert operations (no-changelog) (#18820)

This commit is contained in:
Charlie Kolb
2025-08-28 13:20:29 +02:00
committed by GitHub
parent 0347c32cd5
commit 1b743ae251
12 changed files with 241 additions and 59 deletions

View File

@@ -1066,7 +1066,9 @@ describe('dataStore', () => {
]); ]);
// ASSERT // ASSERT
await expect(result).rejects.toThrow(new DataStoreValidationError('unknown column name')); await expect(result).rejects.toThrow(
new DataStoreValidationError("unknown column name 'cWrong'"),
);
}); });
it('rejects a invalid date string to date column', async () => { it('rejects a invalid date string to date column', async () => {

View File

@@ -15,14 +15,15 @@ import {
ListDataStoreRowsOptions, ListDataStoreRowsOptions,
MoveDataStoreColumnOptions, MoveDataStoreColumnOptions,
UpdateDataStoreOptions, UpdateDataStoreOptions,
UpdateDataStoreRowsOptions,
UpsertDataStoreRowsOptions, UpsertDataStoreRowsOptions,
Workflow, Workflow,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { OwnershipService } from '@/services/ownership.service';
import { DataStoreService } from './data-store.service'; import { DataStoreService } from './data-store.service';
import { OwnershipService } from '@/services/ownership.service';
@Service() @Service()
export class DataStoreProxyService implements DataStoreProxyProvider { export class DataStoreProxyService implements DataStoreProxyProvider {
constructor( constructor(
@@ -134,6 +135,10 @@ export class DataStoreProxyService implements DataStoreProxyProvider {
return await dataStoreService.insertRows(dataStoreId, projectId, rows, true); return await dataStoreService.insertRows(dataStoreId, projectId, rows, true);
}, },
async updateRows(options: UpdateDataStoreRowsOptions) {
return await dataStoreService.updateRow(dataStoreId, projectId, options, true);
},
async upsertRows(options: UpsertDataStoreRowsOptions) { async upsertRows(options: UpsertDataStoreRowsOptions) {
return await dataStoreService.upsertRows(dataStoreId, projectId, options, true); return await dataStoreService.upsertRows(dataStoreId, projectId, options, true);
}, },

View File

@@ -176,6 +176,12 @@ export class DataStoreService {
); );
} }
async updateRow<T extends boolean | undefined>(
dataStoreId: string,
projectId: string,
dto: Omit<UpdateDataStoreRowDto, 'returnData'>,
returnData?: T,
): Promise<T extends true ? DataStoreRowReturn[] : true>;
async updateRow( async updateRow(
dataStoreId: string, dataStoreId: string,
projectId: string, projectId: string,
@@ -225,7 +231,12 @@ export class DataStoreService {
): void { ): void {
// Include system columns like 'id' if requested // Include system columns like 'id' if requested
const allColumns = includeSystemColumns const allColumns = includeSystemColumns
? [{ name: 'id', type: 'number' }, ...columns] ? [
{ name: 'id', type: 'number' },
{ name: 'createdAt', type: 'date' },
{ name: 'updatedAt', type: 'date' },
...columns,
]
: columns; : columns;
const columnNames = new Set(allColumns.map((x) => x.name)); const columnNames = new Set(allColumns.map((x) => x.name));
const columnTypeMap = new Map(allColumns.map((x) => [x.name, x.type])); const columnTypeMap = new Map(allColumns.map((x) => [x.name, x.type]));
@@ -236,7 +247,7 @@ export class DataStoreService {
} }
for (const key of keys) { for (const key of keys) {
if (!columnNames.has(key)) { if (!columnNames.has(key)) {
throw new DataStoreValidationError('unknown column name'); throw new DataStoreValidationError(`unknown column name '${key}'`);
} }
this.validateCell(row, key, columnTypeMap); this.validateCell(row, key, columnTypeMap);
} }

View File

@@ -3,7 +3,9 @@ import { NodeOperationError } from 'n8n-workflow';
import * as row from './row/Row.resource'; import * as row from './row/Row.resource';
type DataTableNodeType = AllEntities<{ row: 'insert' | 'get' | 'deleteRows' }>; type DataTableNodeType = AllEntities<{
row: 'insert' | 'get' | 'deleteRows' | 'update' | 'upsert';
}>;
export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> { export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const operationResult: INodeExecutionData[] = []; const operationResult: INodeExecutionData[] = [];

View File

@@ -3,9 +3,11 @@ import type { INodeProperties } from 'n8n-workflow';
import * as deleteRows from './delete.operation'; import * as deleteRows from './delete.operation';
import * as get from './get.operation'; import * as get from './get.operation';
import * as insert from './insert.operation'; import * as insert from './insert.operation';
import * as update from './update.operation';
import * as upsert from './upsert.operation';
import { DATA_TABLE_ID_FIELD } from '../../common/fields'; import { DATA_TABLE_ID_FIELD } from '../../common/fields';
export { insert, get, deleteRows }; export { insert, get, deleteRows, update, upsert };
export const description: INodeProperties[] = [ export const description: INodeProperties[] = [
{ {
@@ -49,6 +51,18 @@ export const description: INodeProperties[] = [
description: 'Insert a new row', description: 'Insert a new row',
action: 'Insert row', action: 'Insert row',
}, },
{
name: 'Update',
value: update.FIELD,
description: 'Update row(s) matching certain fields',
action: 'Update row(s)',
},
{
name: 'Upsert',
value: upsert.FIELD,
description: 'Update row(s), or insert if there is no match',
action: 'Upsert row(s)',
},
], ],
default: 'insert', default: 'insert',
}, },
@@ -79,4 +93,6 @@ export const description: INodeProperties[] = [
...deleteRows.description, ...deleteRows.description,
...insert.description, ...insert.description,
...get.description, ...get.description,
...update.description,
...upsert.description,
]; ];

View File

@@ -1,13 +1,12 @@
import type { import type {
IDisplayOptions, IDisplayOptions,
IDataObject,
IExecuteFunctions, IExecuteFunctions,
INodeExecutionData, INodeExecutionData,
INodeProperties, INodeProperties,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { COLUMNS } from '../../common/fields'; import { getAddRow, makeAddRow } from '../../common/addRow';
import { dataObjectToApiInput, getDataTableProxyExecute } from '../../common/utils'; import { getDataTableProxyExecute } from '../../common/utils';
export const FIELD: string = 'insert'; export const FIELD: string = 'insert';
@@ -18,34 +17,16 @@ const displayOptions: IDisplayOptions = {
}, },
}; };
export const description: INodeProperties[] = [ export const description: INodeProperties[] = [makeAddRow(FIELD, displayOptions)];
{
...COLUMNS,
displayOptions,
},
];
export async function execute( export async function execute(
this: IExecuteFunctions, this: IExecuteFunctions,
index: number, index: number,
): Promise<INodeExecutionData[]> { ): Promise<INodeExecutionData[]> {
const items = this.getInputData();
const dataStoreProxy = await getDataTableProxyExecute(this, index); const dataStoreProxy = await getDataTableProxyExecute(this, index);
const dataMode = this.getNodeParameter('columns.mappingMode', index) as string;
let data: IDataObject; const row = getAddRow(this, index);
if (dataMode === 'autoMapInputData') { const insertedRows = await dataStoreProxy.insertRows([row]);
data = items[index].json; return insertedRows.map((json) => ({ json }));
} else {
const fields = this.getNodeParameter('columns.value', index, {}) as IDataObject;
data = fields;
}
const rows = dataObjectToApiInput(data, this.getNode(), index);
const insertedRowIds = await dataStoreProxy.insertRows([rows]);
return insertedRowIds.map((x) => ({ json: { id: x } }));
} }

View File

@@ -0,0 +1,56 @@
import {
NodeOperationError,
type IDisplayOptions,
type IExecuteFunctions,
type INodeExecutionData,
type INodeProperties,
} from 'n8n-workflow';
import { makeAddRow, getAddRow } from '../../common/addRow';
import { executeSelectMany, getSelectFields } from '../../common/selectMany';
import { getDataTableProxyExecute } from '../../common/utils';
export const FIELD: string = 'update';
const displayOptions: IDisplayOptions = {
show: {
resource: ['row'],
operation: [FIELD],
},
};
export const description: INodeProperties[] = [
...getSelectFields(displayOptions),
makeAddRow(FIELD, displayOptions),
];
export async function execute(
this: IExecuteFunctions,
index: number,
): Promise<INodeExecutionData[]> {
const dataStoreProxy = await getDataTableProxyExecute(this, index);
const row = getAddRow(this, index);
const matches = await executeSelectMany(this, index, dataStoreProxy, true);
const result = [];
for (const x of matches) {
const updatedRows = await dataStoreProxy.updateRows({
data: row,
filter: { id: x.json.id },
});
if (updatedRows.length !== 1) {
throw new NodeOperationError(this.getNode(), 'invariant broken');
}
// The input object gets updated in the api call, somehow
// And providing this column to the backend causes an unexpected column error
// So let's just re-delete the field until we have a more aligned API
delete row['updatedAt'];
result.push(updatedRows[0]);
}
return result.map((json) => ({ json }));
}

View File

@@ -0,0 +1,63 @@
import {
NodeOperationError,
type IDisplayOptions,
type IExecuteFunctions,
type INodeExecutionData,
type INodeProperties,
} from 'n8n-workflow';
import { makeAddRow, getAddRow } from '../../common/addRow';
import { executeSelectMany, getSelectFields } from '../../common/selectMany';
import { getDataTableProxyExecute } from '../../common/utils';
export const FIELD: string = 'upsert';
const displayOptions: IDisplayOptions = {
show: {
resource: ['row'],
operation: [FIELD],
},
};
export const description: INodeProperties[] = [
...getSelectFields(displayOptions),
makeAddRow(FIELD, displayOptions),
];
export async function execute(
this: IExecuteFunctions,
index: number,
): Promise<INodeExecutionData[]> {
const dataStoreProxy = await getDataTableProxyExecute(this, index);
const row = getAddRow(this, index);
const matches = await executeSelectMany(this, index, dataStoreProxy, true);
// insert
if (matches.length === 0) {
const result = await dataStoreProxy.insertRows([row]);
return result.map((json) => ({ json }));
}
// update
const result = [];
for (const match of matches) {
const updatedRows = await dataStoreProxy.updateRows({
data: row,
filter: { id: match.json.id },
});
if (updatedRows.length !== 1) {
throw new NodeOperationError(this.getNode(), 'invariant broken');
}
// The input object gets updated in the api call, somehow
// And providing this column to the backend causes an unexpected column error
// So let's just re-delete the field until we have a more aligned API
delete row['updatedAt'];
result.push(updatedRows[0]);
}
return result.map((json) => ({ json }));
}

View File

@@ -0,0 +1,55 @@
import type {
IDataObject,
IDisplayOptions,
IExecuteFunctions,
INodeProperties,
} from 'n8n-workflow';
import { DATA_TABLE_ID_FIELD } from './fields';
import { dataObjectToApiInput } from './utils';
export function makeAddRow(operation: string, displayOptions: IDisplayOptions) {
return {
displayName: 'Columns',
name: 'columns',
type: 'resourceMapper',
default: {
mappingMode: 'defineBelow',
value: null,
},
noDataExpression: true,
required: true,
typeOptions: {
loadOptionsDependsOn: [`${DATA_TABLE_ID_FIELD}.value`],
resourceMapper: {
valuesLabel: `Columns to ${operation}`,
resourceMapperMethod: 'getDataTables',
mode: 'add',
fieldWords: {
singular: 'column',
plural: 'columns',
},
addAllFields: true,
multiKeyMatch: true,
},
},
displayOptions,
} satisfies INodeProperties;
}
export function getAddRow(ctx: IExecuteFunctions, index: number) {
const items = ctx.getInputData();
const dataMode = ctx.getNodeParameter('columns.mappingMode', index) as string;
let data: IDataObject;
if (dataMode === 'autoMapInputData') {
data = items[index].json;
} else {
const fields = ctx.getNodeParameter('columns.value', index, {}) as IDataObject;
data = fields;
}
return dataObjectToApiInput(data, ctx.getNode(), index);
}

View File

@@ -2,36 +2,11 @@ import type { INodeProperties } from 'n8n-workflow';
export const DATA_TABLE_ID_FIELD = 'dataTableId'; export const DATA_TABLE_ID_FIELD = 'dataTableId';
export const COLUMNS = {
displayName: 'Columns',
name: 'columns',
type: 'resourceMapper',
default: {
mappingMode: 'defineBelow',
value: null,
},
noDataExpression: true,
required: true,
typeOptions: {
loadOptionsDependsOn: [`${DATA_TABLE_ID_FIELD}.value`],
resourceMapper: {
resourceMapperMethod: 'getDataTables',
mode: 'add',
fieldWords: {
singular: 'column',
plural: 'columns',
},
addAllFields: true,
multiKeyMatch: true,
},
},
} satisfies INodeProperties;
export const DRY_RUN = { export const DRY_RUN = {
displayName: 'Dry Run', displayName: 'Dry Run',
name: 'dryRun', name: 'dryRun',
type: 'boolean', type: 'boolean',
default: false, default: false,
description: description:
'Whether the delete operation should only be simulated, returning the rows that would have been deleted', 'Whether the operation should only be simulated, returning the rows that would have been affected',
} satisfies INodeProperties; } satisfies INodeProperties;

View File

@@ -11,7 +11,10 @@ import type { FilterType } from './constants';
import { DATA_TABLE_ID_FIELD } from './fields'; import { DATA_TABLE_ID_FIELD } from './fields';
import { buildGetManyFilter, isFieldArray, isMatchType } from './utils'; import { buildGetManyFilter, isFieldArray, isMatchType } from './utils';
export function getSelectFields(displayOptions: IDisplayOptions): INodeProperties[] { export function getSelectFields(
displayOptions: IDisplayOptions,
requireCondition = false,
): INodeProperties[] {
return [ return [
{ {
displayName: 'Must Match', displayName: 'Must Match',
@@ -36,6 +39,7 @@ export function getSelectFields(displayOptions: IDisplayOptions): INodePropertie
type: 'fixedCollection', type: 'fixedCollection',
typeOptions: { typeOptions: {
multipleValues: true, multipleValues: true,
minRequiredFields: requireCondition ? 1 : 0,
}, },
displayOptions, displayOptions,
default: {}, default: {},
@@ -105,9 +109,14 @@ export async function executeSelectMany(
ctx: IExecuteFunctions, ctx: IExecuteFunctions,
index: number, index: number,
dataStoreProxy: IDataStoreProjectService, dataStoreProxy: IDataStoreProjectService,
rejectEmpty = false,
): Promise<Array<{ json: DataStoreRowReturn }>> { ): Promise<Array<{ json: DataStoreRowReturn }>> {
const filter = getSelectFilter(ctx, index); const filter = getSelectFilter(ctx, index);
if (rejectEmpty && filter.filters.length === 0) {
throw new NodeOperationError(ctx.getNode(), 'At least one condition is required');
}
let take = 1000; let take = 1000;
const result: Array<{ json: DataStoreRowReturn }> = []; const result: Array<{ json: DataStoreRowReturn }> = [];
let totalCount = undefined; let totalCount = undefined;

View File

@@ -57,6 +57,11 @@ export type ListDataStoreRowsOptions = {
skip?: number; skip?: number;
}; };
export type UpdateDataStoreRowsOptions = {
filter: Record<string, DataStoreColumnJsType>;
data: DataStoreRow;
};
export type UpsertDataStoreRowsOptions = { export type UpsertDataStoreRowsOptions = {
rows: DataStoreRows; rows: DataStoreRows;
matchFields: string[]; matchFields: string[];
@@ -113,6 +118,8 @@ export interface IDataStoreProjectService {
insertRows(rows: DataStoreRows): Promise<DataStoreRowReturn[]>; insertRows(rows: DataStoreRows): Promise<DataStoreRowReturn[]>;
updateRows(options: UpdateDataStoreRowsOptions): Promise<DataStoreRowReturn[]>;
upsertRows(options: UpsertDataStoreRowsOptions): Promise<DataStoreRowReturn[]>; upsertRows(options: UpsertDataStoreRowsOptions): Promise<DataStoreRowReturn[]>;
deleteRows(ids: number[]): Promise<boolean>; deleteRows(ids: number[]): Promise<boolean>;