mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 09:36:44 +00:00
fix(core): Wrap Data Table insert, update, upsert in transactions (#19333)
This commit is contained in:
@@ -9,6 +9,7 @@ import {
|
||||
UpdateQueryBuilder,
|
||||
In,
|
||||
ObjectLiteral,
|
||||
EntityManager,
|
||||
DeleteQueryBuilder,
|
||||
} from '@n8n/typeorm';
|
||||
import {
|
||||
@@ -155,6 +156,7 @@ export class DataStoreRowsRepository {
|
||||
table: DataStoreUserTableName,
|
||||
rows: DataStoreRows,
|
||||
columns: DataTableColumn[],
|
||||
em: EntityManager,
|
||||
) {
|
||||
// DB systems have different maximum parameters per query
|
||||
// with old sqlite versions having the lowest in 999 parameters
|
||||
@@ -187,11 +189,7 @@ export class DataStoreRowsRepository {
|
||||
completeRows[j - start] = insertArray;
|
||||
}
|
||||
|
||||
const query = this.dataSource
|
||||
.createQueryBuilder()
|
||||
.insert()
|
||||
.into(table, columnNames)
|
||||
.values(completeRows);
|
||||
const query = em.createQueryBuilder().insert().into(table, columnNames).values(completeRows);
|
||||
await query.execute();
|
||||
insertedRows += completeRows.length;
|
||||
}
|
||||
@@ -203,13 +201,16 @@ export class DataStoreRowsRepository {
|
||||
rows: DataStoreRows,
|
||||
columns: DataTableColumn[],
|
||||
returnType: T,
|
||||
em?: EntityManager,
|
||||
): Promise<DataTableInsertRowsResult<T>>;
|
||||
async insertRows<T extends DataTableInsertRowsReturnType>(
|
||||
dataStoreId: string,
|
||||
rows: DataStoreRows,
|
||||
columns: DataTableColumn[],
|
||||
returnType: T,
|
||||
em?: EntityManager,
|
||||
): Promise<DataTableInsertRowsResult> {
|
||||
em = em ?? this.dataSource.manager;
|
||||
const inserted: Array<Pick<DataStoreRowReturn, 'id'>> = [];
|
||||
const dbType = this.dataSource.options.type;
|
||||
const useReturning = dbType === 'postgres' || dbType === 'mariadb';
|
||||
@@ -222,7 +223,7 @@ export class DataStoreRowsRepository {
|
||||
const selectColumns = [...escapedSystemColumns, ...escapedColumns];
|
||||
|
||||
if (returnType === 'count') {
|
||||
return await this.insertRowsBulk(table, rows, columns);
|
||||
return await this.insertRowsBulk(table, rows, columns, em);
|
||||
}
|
||||
|
||||
// We insert one by one as the default behavior of returning the last inserted ID
|
||||
@@ -239,7 +240,7 @@ export class DataStoreRowsRepository {
|
||||
completeRow[column.name] = normalizeValue(completeRow[column.name], column.type, dbType);
|
||||
}
|
||||
|
||||
const query = this.dataSource.createQueryBuilder().insert().into(table).values(completeRow);
|
||||
const query = em.createQueryBuilder().insert().into(table).values(completeRow);
|
||||
|
||||
if (useReturning) {
|
||||
query.returning(returnType === 'all' ? selectColumns.join(',') : 'id');
|
||||
@@ -267,7 +268,7 @@ export class DataStoreRowsRepository {
|
||||
continue;
|
||||
}
|
||||
|
||||
const insertedRows = await this.getManyByIds(dataStoreId, ids, columns);
|
||||
const insertedRows = await this.getManyByIds(dataStoreId, ids, columns, em);
|
||||
|
||||
inserted.push(...insertedRows);
|
||||
}
|
||||
@@ -275,13 +276,23 @@ export class DataStoreRowsRepository {
|
||||
return inserted;
|
||||
}
|
||||
|
||||
async updateRow<T extends boolean | undefined>(
|
||||
dataStoreId: string,
|
||||
data: Record<string, DataStoreColumnJsType | null>,
|
||||
filter: DataTableFilter,
|
||||
columns: DataTableColumn[],
|
||||
returnData?: T,
|
||||
em?: EntityManager,
|
||||
): Promise<T extends true ? DataStoreRowReturn[] : true>;
|
||||
async updateRow(
|
||||
dataStoreId: string,
|
||||
data: Record<string, DataStoreColumnJsType | null>,
|
||||
filter: DataTableFilter,
|
||||
columns: DataTableColumn[],
|
||||
returnData: boolean = false,
|
||||
em?: EntityManager,
|
||||
) {
|
||||
em = em ?? this.dataSource.manager;
|
||||
const dbType = this.dataSource.options.type;
|
||||
const useReturning = dbType === 'postgres';
|
||||
|
||||
@@ -303,17 +314,14 @@ export class DataStoreRowsRepository {
|
||||
if (!useReturning && returnData) {
|
||||
// Only Postgres supports RETURNING statement on updates (with our typeorm),
|
||||
// on other engines we must query the list of updates rows later by ID
|
||||
const selectQuery = this.dataSource
|
||||
.createQueryBuilder()
|
||||
.select('id')
|
||||
.from(table, 'dataTable');
|
||||
const selectQuery = em.createQueryBuilder().select('id').from(table, 'dataTable');
|
||||
this.applyFilters(selectQuery, filter, 'dataTable', columns);
|
||||
affectedRows = await selectQuery.getRawMany<{ id: number }>();
|
||||
}
|
||||
|
||||
setData.updatedAt = normalizeValue(new Date(), 'date', dbType);
|
||||
|
||||
const query = this.dataSource.createQueryBuilder().update(table);
|
||||
const query = em.createQueryBuilder().update(table);
|
||||
// Some DBs (like SQLite) don't allow using table aliases as column prefixes in UPDATE statements
|
||||
this.applyFilters(query, filter, undefined, columns);
|
||||
query.set(setData);
|
||||
@@ -333,7 +341,7 @@ export class DataStoreRowsRepository {
|
||||
}
|
||||
|
||||
const ids = affectedRows.map((row) => row.id);
|
||||
return await this.getManyByIds(dataStoreId, ids, columns);
|
||||
return await this.getManyByIds(dataStoreId, ids, columns, em);
|
||||
}
|
||||
|
||||
async deleteRows(
|
||||
@@ -436,8 +444,10 @@ export class DataStoreRowsRepository {
|
||||
dataStoreId: string,
|
||||
dto: ListDataStoreContentQueryDto,
|
||||
columns?: DataTableColumn[],
|
||||
em?: EntityManager,
|
||||
) {
|
||||
const [countQuery, query] = this.getManyQuery(dataStoreId, dto, columns);
|
||||
em = em ?? this.dataSource.manager;
|
||||
const [countQuery, query] = this.getManyQuery(dataStoreId, dto, em, columns);
|
||||
const data: DataStoreRowsReturn = await query.select('*').getRawMany();
|
||||
const countResult = await countQuery.select('COUNT(*) as count').getRawOne<{
|
||||
count: number | string | null;
|
||||
@@ -447,7 +457,12 @@ export class DataStoreRowsRepository {
|
||||
return { count: count ?? -1, data };
|
||||
}
|
||||
|
||||
async getManyByIds(dataStoreId: string, ids: number[], columns: DataTableColumn[]) {
|
||||
async getManyByIds(
|
||||
dataStoreId: string,
|
||||
ids: number[],
|
||||
columns: DataTableColumn[],
|
||||
em: EntityManager,
|
||||
) {
|
||||
const table = toTableName(dataStoreId);
|
||||
const escapedColumns = columns.map((c) => this.dataSource.driver.escape(c.name));
|
||||
const escapedSystemColumns = DATA_TABLE_SYSTEM_COLUMNS.map((x) =>
|
||||
@@ -459,7 +474,7 @@ export class DataStoreRowsRepository {
|
||||
return [];
|
||||
}
|
||||
|
||||
const updatedRows = await this.dataSource
|
||||
const updatedRows = await em
|
||||
.createQueryBuilder()
|
||||
.select(selectColumns)
|
||||
.from(table, 'dataTable')
|
||||
@@ -469,18 +484,13 @@ export class DataStoreRowsRepository {
|
||||
return normalizeRows(updatedRows, columns);
|
||||
}
|
||||
|
||||
async getRowIds(dataStoreId: string, dto: ListDataStoreContentQueryDto) {
|
||||
const [_, query] = this.getManyQuery(dataStoreId, dto);
|
||||
const result = await query.select('dataStore.id').getRawMany<number>();
|
||||
return result;
|
||||
}
|
||||
|
||||
private getManyQuery(
|
||||
dataStoreId: string,
|
||||
dto: ListDataStoreContentQueryDto,
|
||||
em: EntityManager,
|
||||
columns?: DataTableColumn[],
|
||||
): [QueryBuilder, QueryBuilder] {
|
||||
const query = this.dataSource.createQueryBuilder();
|
||||
const query = em.createQueryBuilder();
|
||||
|
||||
const tableReference = 'dataTable';
|
||||
query.from(toTableName(dataStoreId), tableReference);
|
||||
|
||||
@@ -122,15 +122,22 @@ export class DataStoreService {
|
||||
) {
|
||||
await this.validateDataStoreExists(dataStoreId, projectId);
|
||||
|
||||
const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId);
|
||||
if (dto.filter) {
|
||||
this.validateAndTransformFilters(dto.filter, columns);
|
||||
}
|
||||
const result = await this.dataStoreRowsRepository.getManyAndCount(dataStoreId, dto, columns);
|
||||
return {
|
||||
count: result.count,
|
||||
data: normalizeRows(result.data, columns),
|
||||
};
|
||||
return await this.dataStoreColumnRepository.manager.transaction(async (em) => {
|
||||
const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId, em);
|
||||
if (dto.filter) {
|
||||
this.validateAndTransformFilters(dto.filter, columns);
|
||||
}
|
||||
const result = await this.dataStoreRowsRepository.getManyAndCount(
|
||||
dataStoreId,
|
||||
dto,
|
||||
columns,
|
||||
em,
|
||||
);
|
||||
return {
|
||||
count: result.count,
|
||||
data: normalizeRows(result.data, columns),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
async getColumns(dataStoreId: string, projectId: string) {
|
||||
@@ -153,39 +160,82 @@ export class DataStoreService {
|
||||
) {
|
||||
await this.validateDataTableSize();
|
||||
await this.validateDataStoreExists(dataStoreId, projectId);
|
||||
await this.validateRows(dataStoreId, rows);
|
||||
return await this.dataStoreColumnRepository.manager.transaction(async (em) => {
|
||||
const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId, em);
|
||||
this.validateRowsWithColumns(rows, columns);
|
||||
|
||||
const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId);
|
||||
return await this.dataStoreRowsRepository.insertRows(dataStoreId, rows, columns, returnType);
|
||||
return await this.dataStoreRowsRepository.insertRows(
|
||||
dataStoreId,
|
||||
rows,
|
||||
columns,
|
||||
returnType,
|
||||
em,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
async upsertRow<T extends boolean | undefined>(
|
||||
dataStoreId: string,
|
||||
dataTableId: string,
|
||||
projectId: string,
|
||||
dto: Omit<UpsertDataStoreRowDto, 'returnData'>,
|
||||
returnData?: T,
|
||||
): Promise<T extends true ? DataStoreRowReturn[] : true>;
|
||||
async upsertRow(
|
||||
dataStoreId: string,
|
||||
dataTableId: string,
|
||||
projectId: string,
|
||||
dto: Omit<UpsertDataStoreRowDto, 'returnData'>,
|
||||
returnData: boolean = false,
|
||||
) {
|
||||
await this.validateDataTableSize();
|
||||
const updated = await this.updateRow(dataStoreId, projectId, dto, true);
|
||||
await this.validateDataStoreExists(dataTableId, projectId);
|
||||
|
||||
if (updated.length > 0) {
|
||||
return returnData ? updated : true;
|
||||
return await this.dataStoreColumnRepository.manager.transaction(async (em) => {
|
||||
const columns = await this.dataStoreColumnRepository.getColumns(dataTableId, em);
|
||||
this.validateUpdateParams(dto, columns);
|
||||
const updated = await this.dataStoreRowsRepository.updateRow(
|
||||
dataTableId,
|
||||
dto.data,
|
||||
dto.filter,
|
||||
columns,
|
||||
true,
|
||||
em,
|
||||
);
|
||||
|
||||
if (updated.length > 0) {
|
||||
return returnData ? updated : true;
|
||||
}
|
||||
|
||||
// No rows were updated, so insert a new one
|
||||
const inserted = await this.dataStoreRowsRepository.insertRows(
|
||||
dataTableId,
|
||||
[dto.data],
|
||||
columns,
|
||||
returnData ? 'all' : 'id',
|
||||
em,
|
||||
);
|
||||
return returnData ? inserted : true;
|
||||
});
|
||||
}
|
||||
|
||||
validateUpdateParams(
|
||||
{ filter, data }: Pick<UpdateDataTableRowDto, 'filter' | 'data'>,
|
||||
columns: DataTableColumn[],
|
||||
) {
|
||||
if (columns.length === 0) {
|
||||
throw new DataStoreValidationError(
|
||||
'No columns found for this data table or data table not found',
|
||||
);
|
||||
}
|
||||
|
||||
// No rows were updated, so insert a new one
|
||||
const inserted = await this.insertRows(
|
||||
dataStoreId,
|
||||
projectId,
|
||||
[dto.data],
|
||||
returnData ? 'all' : 'count',
|
||||
);
|
||||
return returnData ? inserted : true;
|
||||
if (!filter?.filters || filter.filters.length === 0) {
|
||||
throw new DataStoreValidationError('Filter must not be empty');
|
||||
}
|
||||
if (!data || Object.keys(data).length === 0) {
|
||||
throw new DataStoreValidationError('Data columns must not be empty');
|
||||
}
|
||||
|
||||
this.validateRowsWithColumns([data], columns, false);
|
||||
this.validateAndTransformFilters(filter, columns);
|
||||
}
|
||||
|
||||
async updateRow<T extends boolean | undefined>(
|
||||
@@ -203,31 +253,18 @@ export class DataStoreService {
|
||||
await this.validateDataTableSize();
|
||||
await this.validateDataStoreExists(dataTableId, projectId);
|
||||
|
||||
const columns = await this.dataStoreColumnRepository.getColumns(dataTableId);
|
||||
if (columns.length === 0) {
|
||||
throw new DataStoreValidationError(
|
||||
'No columns found for this data table or data table not found',
|
||||
return await this.dataStoreColumnRepository.manager.transaction(async (em) => {
|
||||
const columns = await this.dataStoreColumnRepository.getColumns(dataTableId, em);
|
||||
this.validateUpdateParams(dto, columns);
|
||||
return await this.dataStoreRowsRepository.updateRow(
|
||||
dataTableId,
|
||||
dto.data,
|
||||
dto.filter,
|
||||
columns,
|
||||
returnData,
|
||||
em,
|
||||
);
|
||||
}
|
||||
|
||||
const { data, filter } = dto;
|
||||
if (!filter?.filters || filter.filters.length === 0) {
|
||||
throw new DataStoreValidationError('Filter must not be empty');
|
||||
}
|
||||
if (!data || Object.keys(data).length === 0) {
|
||||
throw new DataStoreValidationError('Data columns must not be empty');
|
||||
}
|
||||
|
||||
this.validateRowsWithColumns([data], columns, false);
|
||||
this.validateAndTransformFilters(filter, columns);
|
||||
|
||||
return await this.dataStoreRowsRepository.updateRow(
|
||||
dataTableId,
|
||||
data,
|
||||
filter,
|
||||
columns,
|
||||
returnData,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
async deleteRows<T extends boolean | undefined>(
|
||||
@@ -291,15 +328,6 @@ export class DataStoreService {
|
||||
}
|
||||
}
|
||||
|
||||
private async validateRows(
|
||||
dataStoreId: string,
|
||||
rows: DataStoreRows,
|
||||
includeSystemColumns = false,
|
||||
): Promise<void> {
|
||||
const columns = await this.dataStoreColumnRepository.getColumns(dataStoreId);
|
||||
this.validateRowsWithColumns(rows, columns, includeSystemColumns);
|
||||
}
|
||||
|
||||
private validateCell(row: DataStoreRow, key: string, columnTypeMap: Map<string, string>) {
|
||||
const cell = row[key];
|
||||
if (cell === null) return;
|
||||
|
||||
Reference in New Issue
Block a user