From 9509ef3e79faeda03b94c2f1cce79459f612a7ce Mon Sep 17 00:00:00 2001 From: Charlie Kolb Date: Fri, 12 Sep 2025 12:35:54 +0200 Subject: [PATCH] fix(core): Wrap Data Table insert, update, upsert in transactions (#19333) --- .../data-table/data-store-rows.repository.ts | 58 ++++--- .../modules/data-table/data-store.service.ts | 144 +++++++++++------- 2 files changed, 120 insertions(+), 82 deletions(-) diff --git a/packages/cli/src/modules/data-table/data-store-rows.repository.ts b/packages/cli/src/modules/data-table/data-store-rows.repository.ts index 10d51fae5b..2376893852 100644 --- a/packages/cli/src/modules/data-table/data-store-rows.repository.ts +++ b/packages/cli/src/modules/data-table/data-store-rows.repository.ts @@ -9,6 +9,7 @@ import { UpdateQueryBuilder, In, ObjectLiteral, + EntityManager, DeleteQueryBuilder, } from '@n8n/typeorm'; import { @@ -155,6 +156,7 @@ export class DataStoreRowsRepository { table: DataStoreUserTableName, rows: DataStoreRows, columns: DataTableColumn[], + em: EntityManager, ) { // DB systems have different maximum parameters per query // with old sqlite versions having the lowest in 999 parameters @@ -187,11 +189,7 @@ export class DataStoreRowsRepository { completeRows[j - start] = insertArray; } - const query = this.dataSource - .createQueryBuilder() - .insert() - .into(table, columnNames) - .values(completeRows); + const query = em.createQueryBuilder().insert().into(table, columnNames).values(completeRows); await query.execute(); insertedRows += completeRows.length; } @@ -203,13 +201,16 @@ export class DataStoreRowsRepository { rows: DataStoreRows, columns: DataTableColumn[], returnType: T, + em?: EntityManager, ): Promise>; async insertRows( dataStoreId: string, rows: DataStoreRows, columns: DataTableColumn[], returnType: T, + em?: EntityManager, ): Promise { + em = em ?? this.dataSource.manager; const inserted: Array> = []; const dbType = this.dataSource.options.type; const useReturning = dbType === 'postgres' || dbType === 'mariadb'; @@ -222,7 +223,7 @@ export class DataStoreRowsRepository { const selectColumns = [...escapedSystemColumns, ...escapedColumns]; if (returnType === 'count') { - return await this.insertRowsBulk(table, rows, columns); + return await this.insertRowsBulk(table, rows, columns, em); } // We insert one by one as the default behavior of returning the last inserted ID @@ -239,7 +240,7 @@ export class DataStoreRowsRepository { completeRow[column.name] = normalizeValue(completeRow[column.name], column.type, dbType); } - const query = this.dataSource.createQueryBuilder().insert().into(table).values(completeRow); + const query = em.createQueryBuilder().insert().into(table).values(completeRow); if (useReturning) { query.returning(returnType === 'all' ? selectColumns.join(',') : 'id'); @@ -267,7 +268,7 @@ export class DataStoreRowsRepository { continue; } - const insertedRows = await this.getManyByIds(dataStoreId, ids, columns); + const insertedRows = await this.getManyByIds(dataStoreId, ids, columns, em); inserted.push(...insertedRows); } @@ -275,13 +276,23 @@ export class DataStoreRowsRepository { return inserted; } + async updateRow( + dataStoreId: string, + data: Record, + filter: DataTableFilter, + columns: DataTableColumn[], + returnData?: T, + em?: EntityManager, + ): Promise; async updateRow( dataStoreId: string, data: Record, filter: DataTableFilter, columns: DataTableColumn[], returnData: boolean = false, + em?: EntityManager, ) { + em = em ?? this.dataSource.manager; const dbType = this.dataSource.options.type; const useReturning = dbType === 'postgres'; @@ -303,17 +314,14 @@ export class DataStoreRowsRepository { if (!useReturning && returnData) { // Only Postgres supports RETURNING statement on updates (with our typeorm), // on other engines we must query the list of updates rows later by ID - const selectQuery = this.dataSource - .createQueryBuilder() - .select('id') - .from(table, 'dataTable'); + const selectQuery = em.createQueryBuilder().select('id').from(table, 'dataTable'); this.applyFilters(selectQuery, filter, 'dataTable', columns); affectedRows = await selectQuery.getRawMany<{ id: number }>(); } setData.updatedAt = normalizeValue(new Date(), 'date', dbType); - const query = this.dataSource.createQueryBuilder().update(table); + const query = em.createQueryBuilder().update(table); // Some DBs (like SQLite) don't allow using table aliases as column prefixes in UPDATE statements this.applyFilters(query, filter, undefined, columns); query.set(setData); @@ -333,7 +341,7 @@ export class DataStoreRowsRepository { } const ids = affectedRows.map((row) => row.id); - return await this.getManyByIds(dataStoreId, ids, columns); + return await this.getManyByIds(dataStoreId, ids, columns, em); } async deleteRows( @@ -436,8 +444,10 @@ export class DataStoreRowsRepository { dataStoreId: string, dto: ListDataStoreContentQueryDto, columns?: DataTableColumn[], + em?: EntityManager, ) { - const [countQuery, query] = this.getManyQuery(dataStoreId, dto, columns); + em = em ?? this.dataSource.manager; + const [countQuery, query] = this.getManyQuery(dataStoreId, dto, em, columns); const data: DataStoreRowsReturn = await query.select('*').getRawMany(); const countResult = await countQuery.select('COUNT(*) as count').getRawOne<{ count: number | string | null; @@ -447,7 +457,12 @@ export class DataStoreRowsRepository { return { count: count ?? -1, data }; } - async getManyByIds(dataStoreId: string, ids: number[], columns: DataTableColumn[]) { + async getManyByIds( + dataStoreId: string, + ids: number[], + columns: DataTableColumn[], + em: EntityManager, + ) { const table = toTableName(dataStoreId); const escapedColumns = columns.map((c) => this.dataSource.driver.escape(c.name)); const escapedSystemColumns = DATA_TABLE_SYSTEM_COLUMNS.map((x) => @@ -459,7 +474,7 @@ export class DataStoreRowsRepository { return []; } - const updatedRows = await this.dataSource + const updatedRows = await em .createQueryBuilder() .select(selectColumns) .from(table, 'dataTable') @@ -469,18 +484,13 @@ export class DataStoreRowsRepository { return normalizeRows(updatedRows, columns); } - async getRowIds(dataStoreId: string, dto: ListDataStoreContentQueryDto) { - const [_, query] = this.getManyQuery(dataStoreId, dto); - const result = await query.select('dataStore.id').getRawMany(); - return result; - } - private getManyQuery( dataStoreId: string, dto: ListDataStoreContentQueryDto, + em: EntityManager, columns?: DataTableColumn[], ): [QueryBuilder, QueryBuilder] { - const query = this.dataSource.createQueryBuilder(); + const query = em.createQueryBuilder(); const tableReference = 'dataTable'; query.from(toTableName(dataStoreId), tableReference); diff --git a/packages/cli/src/modules/data-table/data-store.service.ts b/packages/cli/src/modules/data-table/data-store.service.ts index 61f207cfdc..1c075039ef 100644 --- a/packages/cli/src/modules/data-table/data-store.service.ts +++ b/packages/cli/src/modules/data-table/data-store.service.ts @@ -122,15 +122,22 @@ export class DataStoreService { ) { await this.validateDataStoreExists(dataStoreId, projectId); - const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId); - if (dto.filter) { - this.validateAndTransformFilters(dto.filter, columns); - } - const result = await this.dataStoreRowsRepository.getManyAndCount(dataStoreId, dto, columns); - return { - count: result.count, - data: normalizeRows(result.data, columns), - }; + return await this.dataStoreColumnRepository.manager.transaction(async (em) => { + const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId, em); + if (dto.filter) { + this.validateAndTransformFilters(dto.filter, columns); + } + const result = await this.dataStoreRowsRepository.getManyAndCount( + dataStoreId, + dto, + columns, + em, + ); + return { + count: result.count, + data: normalizeRows(result.data, columns), + }; + }); } async getColumns(dataStoreId: string, projectId: string) { @@ -153,39 +160,82 @@ export class DataStoreService { ) { await this.validateDataTableSize(); await this.validateDataStoreExists(dataStoreId, projectId); - await this.validateRows(dataStoreId, rows); + return await this.dataStoreColumnRepository.manager.transaction(async (em) => { + const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId, em); + this.validateRowsWithColumns(rows, columns); - const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId); - return await this.dataStoreRowsRepository.insertRows(dataStoreId, rows, columns, returnType); + return await this.dataStoreRowsRepository.insertRows( + dataStoreId, + rows, + columns, + returnType, + em, + ); + }); } async upsertRow( - dataStoreId: string, + dataTableId: string, projectId: string, dto: Omit, returnData?: T, ): Promise; async upsertRow( - dataStoreId: string, + dataTableId: string, projectId: string, dto: Omit, returnData: boolean = false, ) { await this.validateDataTableSize(); - const updated = await this.updateRow(dataStoreId, projectId, dto, true); + await this.validateDataStoreExists(dataTableId, projectId); - if (updated.length > 0) { - return returnData ? updated : true; + return await this.dataStoreColumnRepository.manager.transaction(async (em) => { + const columns = await this.dataStoreColumnRepository.getColumns(dataTableId, em); + this.validateUpdateParams(dto, columns); + const updated = await this.dataStoreRowsRepository.updateRow( + dataTableId, + dto.data, + dto.filter, + columns, + true, + em, + ); + + if (updated.length > 0) { + return returnData ? updated : true; + } + + // No rows were updated, so insert a new one + const inserted = await this.dataStoreRowsRepository.insertRows( + dataTableId, + [dto.data], + columns, + returnData ? 'all' : 'id', + em, + ); + return returnData ? inserted : true; + }); + } + + validateUpdateParams( + { filter, data }: Pick, + columns: DataTableColumn[], + ) { + if (columns.length === 0) { + throw new DataStoreValidationError( + 'No columns found for this data table or data table not found', + ); } - // No rows were updated, so insert a new one - const inserted = await this.insertRows( - dataStoreId, - projectId, - [dto.data], - returnData ? 'all' : 'count', - ); - return returnData ? inserted : true; + if (!filter?.filters || filter.filters.length === 0) { + throw new DataStoreValidationError('Filter must not be empty'); + } + if (!data || Object.keys(data).length === 0) { + throw new DataStoreValidationError('Data columns must not be empty'); + } + + this.validateRowsWithColumns([data], columns, false); + this.validateAndTransformFilters(filter, columns); } async updateRow( @@ -203,31 +253,18 @@ export class DataStoreService { await this.validateDataTableSize(); await this.validateDataStoreExists(dataTableId, projectId); - const columns = await this.dataStoreColumnRepository.getColumns(dataTableId); - if (columns.length === 0) { - throw new DataStoreValidationError( - 'No columns found for this data table or data table not found', + return await this.dataStoreColumnRepository.manager.transaction(async (em) => { + const columns = await this.dataStoreColumnRepository.getColumns(dataTableId, em); + this.validateUpdateParams(dto, columns); + return await this.dataStoreRowsRepository.updateRow( + dataTableId, + dto.data, + dto.filter, + columns, + returnData, + em, ); - } - - const { data, filter } = dto; - if (!filter?.filters || filter.filters.length === 0) { - throw new DataStoreValidationError('Filter must not be empty'); - } - if (!data || Object.keys(data).length === 0) { - throw new DataStoreValidationError('Data columns must not be empty'); - } - - this.validateRowsWithColumns([data], columns, false); - this.validateAndTransformFilters(filter, columns); - - return await this.dataStoreRowsRepository.updateRow( - dataTableId, - data, - filter, - columns, - returnData, - ); + }); } async deleteRows( @@ -291,15 +328,6 @@ export class DataStoreService { } } - private async validateRows( - dataStoreId: string, - rows: DataStoreRows, - includeSystemColumns = false, - ): Promise { - const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId); - this.validateRowsWithColumns(rows, columns, includeSystemColumns); - } - private validateCell(row: DataStoreRow, key: string, columnTypeMap: Map) { const cell = row[key]; if (cell === null) return;