feat: Execution custom data saving and filtering (#5496)
* wip: workflow execution filtering
* fix: import type failing to build
* fix: remove console.logs
* feat: execution metadata migrations
* fix(editor): Move global executions filter to its own component
* fix(editor): Use the same filter component at the workflow level
* fix(editor): small housekeeping
* check workflowId when the filter is applied
* fix(editor): update filter after resolving merge conflicts
* fix(editor): unify empty filter status
* feat(editor): add datetime picker to filter
* feat(editor): add meta fields
* fix: fix button override in datepicker panel
* feat(editor): add filter metadata
* feat(core): add 'startedBefore' execution filter prop
* feat(core): add 'tags' execution query filter
* Revert "feat(core): add 'tags' execution query filter" (reverts commit a7b968081c91290b0c94df18c6a73d29950222d9)
* feat(editor): add translations and tooltip, and count selected filter props
* fix(editor): fix label layouts
* fix(editor): update custom data docs link
* fix(editor): update custom data tooltip position
* fix(editor): update tooltip text
* refactor: Ignore metadata if not enabled by license
* fix(editor): Add paywall states to advanced execution filter
* refactor: Save custom data also in worker mode
* fix: Remove duplicate migration name from list
* fix(editor): Reduce filter complexity and add debounce to text inputs
* fix(editor): Remove unused import, add comment
* fix(editor): simplify event listener
* fix: Prevent error when there are running executions
* test(editor): Add advanced execution filter basic unit test
* test(editor): Add advanced execution filter state change unit test
* fix: Small lint issue
* feat: Add indices to speed up queries
* feat: add customData limits
* refactor: put metadata save in a transaction
* chore: remove unneeded comment
* test: add tests for execution metadata
* fix(editor): Fixes after merge conflict
* fix(editor): Remove unused import
* wording and UI fixes
* fix(editor): type fixes
* feat: add code node autocompletions for customData
* fix: Prevent transaction issues and ambiguous ID in SQL clauses
* fix(editor): Suppress requesting current executions if metadata is used in filter (#5739)
* fix(editor): Suppress requesting current executions if metadata is used in filter
* fix(editor): Fix arrows for select in popover
* refactor: Improve performance by correcting database indices
* fix: Lint issue
* test: Fix broken test
* fix: Broken test
* test: add call data check for saveExecutionMetadata test

---------

Co-authored-by: Valya Bullions <valya@n8n.io>
Co-authored-by: Alex Grozav <alex@grozav.com>
Co-authored-by: Omar Ajoue <krynble@gmail.com>
Co-authored-by: Romain Minaud <romain.minaud@gmail.com>
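For context, this commit lets workflow authors attach custom key-value data to an execution and later filter the executions list by it. A rough sketch of the authoring side, based on the customData autocompletions this commit adds to the Code node (the exact API surface may differ by n8n version):

// Inside a Code node: tag the current execution so it can be found later
// via the new advanced execution filter.
$execution.customData.set('orderId', '1001'); // single key/value pair
// Values are stored as text rows in the new execution_metadata table.
return items;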
@@ -169,6 +169,8 @@ export async function init(
	collections.InstalledPackages = linkRepository(entities.InstalledPackages);
	collections.InstalledNodes = linkRepository(entities.InstalledNodes);
	collections.WorkflowStatistics = linkRepository(entities.WorkflowStatistics);
+	collections.ExecutionMetadata = linkRepository(entities.ExecutionMetadata);
	collections.EventDestinations = linkRepository(entities.EventDestinations);

	isInitialized = true;
@@ -48,6 +48,7 @@ import type { WebhookEntity } from '@db/entities/WebhookEntity';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';
import type { EventDestinations } from '@db/entities/MessageEventBusDestinationEntity';
+import type { ExecutionMetadata } from './databases/entities/ExecutionMetadata';

export interface IActivationError {
	time: number;

@@ -88,6 +89,7 @@ export interface IDatabaseCollections {
	InstalledNodes: Repository<InstalledNodes>;
	WorkflowStatistics: Repository<WorkflowStatistics>;
	EventDestinations: Repository<EventDestinations>;
+	ExecutionMetadata: Repository<ExecutionMetadata>;
}

// ----------------------------------
@@ -71,6 +71,7 @@ import { PermissionChecker } from './UserManagement/PermissionChecker';
import { WorkflowsService } from './workflows/workflows.services';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
+import type { ExecutionMetadata } from './databases/entities/ExecutionMetadata';

const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
@@ -264,6 +265,22 @@ async function pruneExecutionData(this: WorkflowHooks): Promise<void> {
	}
}

+export async function saveExecutionMetadata(
+	executionId: string,
+	executionMetadata: Record<string, string>,
+): Promise<ExecutionMetadata[]> {
+	const metadataRows = [];
+	for (const [key, value] of Object.entries(executionMetadata)) {
+		metadataRows.push({
+			execution: { id: executionId },
+			key,
+			value,
+		});
+	}
+
+	return Db.collections.ExecutionMetadata.save(metadataRows);
+}
+
/**
 * Returns hook functions to push data to Editor-UI
 *
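The saveExecutionMetadata helper added above collects all key-value pairs into an array first and persists them with a single repository save call, so one execution costs one batched insert rather than one insert per key. A minimal sketch of calling the helper directly (the ID and keys here are made up):

// Persist two custom-data entries for execution '42' in one batch.
// TypeORM's save() accepts an array and returns the saved entities.
const rows = await saveExecutionMetadata('42', { orderId: '1001', region: 'eu' });
console.log(rows.length); // 2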
@@ -657,6 +674,14 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
					executionData as IExecutionFlattedDb,
				);

+				try {
+					if (fullRunData.data.resultData.metadata) {
+						await saveExecutionMetadata(this.executionId, fullRunData.data.resultData.metadata);
+					}
+				} catch (e) {
+					Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e);
+				}
+
				if (fullRunData.finished === true && this.retryOf !== undefined) {
					// If the retry was successful save the reference it on the original execution
					// await Db.collections.Execution.save(executionData as IExecutionFlattedDb);
@@ -789,6 +814,14 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
					status: executionData.status,
				});

+				try {
+					if (fullRunData.data.resultData.metadata) {
+						await saveExecutionMetadata(this.executionId, fullRunData.data.resultData.metadata);
+					}
+				} catch (e) {
+					Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e);
+				}
+
				if (fullRunData.finished === true && this.retryOf !== undefined) {
					// If the retry was successful save the reference it on the original execution
					await Db.collections.Execution.update(this.retryOf, {
@@ -1,9 +1,10 @@
import { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
-import { Column, Entity, Generated, Index, PrimaryColumn } from 'typeorm';
+import { Column, Entity, Generated, Index, OneToMany, PrimaryColumn } from 'typeorm';
import { datetimeColumnType, jsonColumnType } from './AbstractEntity';
import { IWorkflowDb } from '@/Interfaces';
import type { IExecutionFlattedDb } from '@/Interfaces';
import { idStringifier } from '../utils/transformers';
+import type { ExecutionMetadata } from './ExecutionMetadata';

@Entity()
@Index(['workflowId', 'id'])
@@ -49,4 +50,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {

	@Column({ type: datetimeColumnType, nullable: true })
	waitTill: Date;
+
+	@OneToMany('ExecutionMetadata', 'execution')
+	metadata: ExecutionMetadata[];
}
packages/cli/src/databases/entities/ExecutionMetadata.ts (new file, 22 lines)
@@ -0,0 +1,22 @@
import { Column, Entity, ManyToOne, PrimaryGeneratedColumn, RelationId } from 'typeorm';
import { ExecutionEntity } from './ExecutionEntity';

@Entity()
export class ExecutionMetadata {
	@PrimaryGeneratedColumn()
	id: number;

	@ManyToOne('ExecutionEntity', 'metadata', {
		onDelete: 'CASCADE',
	})
	execution: ExecutionEntity;

	@RelationId((executionMetadata: ExecutionMetadata) => executionMetadata.execution)
	executionId: number;

	@Column('text')
	key: string;

	@Column('text')
	value: string;
}
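The ManyToOne side here pairs with the OneToMany metadata property added to ExecutionEntity above, so metadata rows can be read back through the relation. A sketch of loading it, assuming the standard TypeORM repository API (the ID is a placeholder):

// Load an execution together with its custom-data rows.
const execution = await Db.collections.Execution.findOne({
	where: { id: '42' },
	relations: ['metadata'],
});
// onDelete: 'CASCADE' on the relation means deleting the execution
// removes its metadata rows as well.
execution?.metadata.forEach(({ key, value }) => console.log(key, value));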
@@ -15,6 +15,7 @@ import { User } from './User';
import { WebhookEntity } from './WebhookEntity';
import { WorkflowEntity } from './WorkflowEntity';
import { WorkflowStatistics } from './WorkflowStatistics';
+import { ExecutionMetadata } from './ExecutionMetadata';

export const entities = {
	AuthIdentity,
@@ -33,4 +34,5 @@ export const entities = {
	WebhookEntity,
	WorkflowEntity,
	WorkflowStatistics,
+	ExecutionMetadata,
};
@@ -0,0 +1,69 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';

export class CreateExecutionMetadataTable1679416281779 implements MigrationInterface {
	name = 'CreateExecutionMetadataTable1679416281779';

	public async up(queryRunner: QueryRunner): Promise<void> {
		logMigrationStart(this.name);
		const tablePrefix = getTablePrefix();

		await queryRunner.query(
			`CREATE TABLE ${tablePrefix}execution_metadata (
				id int(11) auto_increment NOT NULL PRIMARY KEY,
				executionId int(11) NOT NULL,
				\`key\` TEXT NOT NULL,
				value TEXT NOT NULL,
				CONSTRAINT \`${tablePrefix}execution_metadata_FK\` FOREIGN KEY (\`executionId\`) REFERENCES \`${tablePrefix}execution_entity\` (\`id\`) ON DELETE CASCADE,
				INDEX \`IDX_${tablePrefix}6d44376da6c1058b5e81ed8a154e1fee106046eb\` (\`executionId\` ASC)
			)
			ENGINE=InnoDB`,
		);

		// Remove indices that are no longer needed since the addition of the status column
		await queryRunner.query(
			`DROP INDEX \`IDX_${tablePrefix}06da892aaf92a48e7d3e400003\` ON \`${tablePrefix}execution_entity\``,
		);
		await queryRunner.query(
			`DROP INDEX \`IDX_${tablePrefix}78d62b89dc1433192b86dce18a\` ON \`${tablePrefix}execution_entity\``,
		);
		await queryRunner.query(
			`DROP INDEX \`IDX_${tablePrefix}1688846335d274033e15c846a4\` ON \`${tablePrefix}execution_entity\``,
		);
		await queryRunner.query(
			`DROP INDEX \`IDX_${tablePrefix}cefb067df2402f6aed0638a6c1\` ON \`${tablePrefix}execution_entity\``,
		);

		// Add index to the new status column
		await queryRunner.query(
			`CREATE INDEX \`IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584\` ON \`${tablePrefix}execution_entity\` (\`status\`, \`workflowId\`)`,
		);

		logMigrationEnd(this.name);
	}

	public async down(queryRunner: QueryRunner): Promise<void> {
		const tablePrefix = getTablePrefix();

		await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
		await queryRunner.query(
			`CREATE INDEX \`IDX_${tablePrefix}06da892aaf92a48e7d3e400003\` ON \`${tablePrefix}execution_entity\` (\`workflowId\`, \`waitTill\`, \`id\`)`,
		);
		await queryRunner.query(
			`CREATE INDEX \`IDX_${tablePrefix}78d62b89dc1433192b86dce18a\` ON \`${tablePrefix}execution_entity\` (\`workflowId\`, \`finished\`, \`id\`)`,
		);
		await queryRunner.query(
			`CREATE INDEX \`IDX_${tablePrefix}1688846335d274033e15c846a4\` ON \`${tablePrefix}execution_entity\` (\`finished\`, \`id\`)`,
		);
		await queryRunner.query(
			'CREATE INDEX `IDX_' +
				tablePrefix +
				'cefb067df2402f6aed0638a6c1` ON `' +
				tablePrefix +
				'execution_entity` (`stoppedAt`)',
		);
		await queryRunner.query(
			`DROP INDEX \`IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584\` ON \`${tablePrefix}execution_entity\``,
		);
	}
}
@@ -34,6 +34,7 @@ import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-Pu
import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions';
import { MigrateExecutionStatus1676996103000 } from './1676996103000-MigrateExecutionStatus';
import { UpdateRunningExecutionStatus1677236788851 } from './1677236788851-UpdateRunningExecutionStatus';
+import { CreateExecutionMetadataTable1679416281779 } from './1679416281779-CreateExecutionMetadataTable';

export const mysqlMigrations = [
	InitialMigration1588157391238,
@@ -72,4 +73,5 @@ export const mysqlMigrations = [
	AddStatusToExecutions1674138566000,
	MigrateExecutionStatus1676996103000,
	UpdateRunningExecutionStatus1677236788851,
+	CreateExecutionMetadataTable1679416281779,
];
@@ -0,0 +1,62 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';

export class CreateExecutionMetadataTable1679416281778 implements MigrationInterface {
	name = 'CreateExecutionMetadataTable1679416281778';

	public async up(queryRunner: QueryRunner): Promise<void> {
		logMigrationStart(this.name);
		const tablePrefix = getTablePrefix();

		await queryRunner.query(
			`CREATE TABLE ${tablePrefix}execution_metadata (
				"id" serial4 NOT NULL PRIMARY KEY,
				"executionId" int4 NOT NULL,
				"key" text NOT NULL,
				"value" text NOT NULL,
				CONSTRAINT ${tablePrefix}execution_metadata_fk FOREIGN KEY ("executionId") REFERENCES ${tablePrefix}execution_entity(id) ON DELETE CASCADE
			)`,
		);

		await queryRunner.query(
			`CREATE INDEX "IDX_${tablePrefix}6d44376da6c1058b5e81ed8a154e1fee106046eb" ON "${tablePrefix}execution_metadata" ("executionId");`,
		);

		// Remove indices that are no longer needed since the addition of the status column
		await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}33228da131bb1112247cf52a42"`);
		await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}72ffaaab9f04c2c1f1ea86e662"`);
		await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}58154df94c686818c99fb754ce"`);
		await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}4f474ac92be81610439aaad61e"`);

		// Create new index for status
		await queryRunner.query(
			`CREATE INDEX "IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584" ON "${tablePrefix}execution_entity" ("status", "workflowId");`,
		);

		logMigrationEnd(this.name);
	}

	public async down(queryRunner: QueryRunner): Promise<void> {
		const tablePrefix = getTablePrefix();

		// Re-add removed indices
		await queryRunner.query(
			`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}33228da131bb1112247cf52a42" ON ${tablePrefix}execution_entity ("stoppedAt") `,
		);
		await queryRunner.query(
			`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}72ffaaab9f04c2c1f1ea86e662" ON ${tablePrefix}execution_entity ("finished", "id") `,
		);
		await queryRunner.query(
			`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}58154df94c686818c99fb754ce" ON ${tablePrefix}execution_entity ("workflowId", "waitTill", "id") `,
		);
		await queryRunner.query(
			`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}4f474ac92be81610439aaad61e" ON ${tablePrefix}execution_entity ("workflowId", "finished", "id") `,
		);

		await queryRunner.query(
			`DROP INDEX IF EXISTS "IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584"`,
		);

		await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
	}
}
@@ -32,6 +32,7 @@ import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-Pu
import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions';
import { MigrateExecutionStatus1676996103000 } from './1676996103000-MigrateExecutionStatus';
import { UpdateRunningExecutionStatus1677236854063 } from './1677236854063-UpdateRunningExecutionStatus';
+import { CreateExecutionMetadataTable1679416281778 } from './1679416281778-CreateExecutionMetadataTable';

export const postgresMigrations = [
	InitialMigration1587669153312,
@@ -68,4 +69,5 @@ export const postgresMigrations = [
	AddStatusToExecutions1674138566000,
	MigrateExecutionStatus1676996103000,
	UpdateRunningExecutionStatus1677236854063,
+	CreateExecutionMetadataTable1679416281778,
];
@@ -0,0 +1,64 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';

export class CreateExecutionMetadataTable1679416281777 implements MigrationInterface {
	name = 'CreateExecutionMetadataTable1679416281777';

	public async up(queryRunner: QueryRunner): Promise<void> {
		logMigrationStart(this.name);
		const tablePrefix = getTablePrefix();

		await queryRunner.query(
			`CREATE TABLE "${tablePrefix}execution_metadata" (
				id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
				executionId INTEGER NOT NULL,
				"key" TEXT NOT NULL,
				value TEXT NOT NULL,
				CONSTRAINT ${tablePrefix}execution_metadata_entity_FK FOREIGN KEY (executionId) REFERENCES ${tablePrefix}execution_entity(id) ON DELETE CASCADE
			)`,
		);

		await queryRunner.query(
			`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}6d44376da6c1058b5e81ed8a154e1fee106046eb" ON "${tablePrefix}execution_metadata" ("executionId");`,
		);

		// Re add some lost indices from migration DeleteExecutionsWithWorkflows.ts
		// that were part of AddExecutionEntityIndexes.ts
		// not all were needed since we added the `status` column to execution_entity
		await queryRunner.query(
			`CREATE INDEX IF NOT EXISTS 'IDX_${tablePrefix}b94b45ce2c73ce46c54f20b5f9' ON '${tablePrefix}execution_entity' ('waitTill', 'id') `,
		);
		await queryRunner.query(
			`CREATE INDEX IF NOT EXISTS 'IDX_${tablePrefix}81fc04c8a17de15835713505e4' ON '${tablePrefix}execution_entity' ('workflowId', 'id') `,
		);

		// Also add index to the new status column
		await queryRunner.query(
			`CREATE INDEX IF NOT EXISTS 'IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584' ON '${tablePrefix}execution_entity' ('status', 'workflowId') `,
		);

		// Remove no longer needed index to waitTill since it's already covered by the index b94b45ce2c73ce46c54f20b5f9 above
		await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}ca4a71b47f28ac6ea88293a8e2'`);
		// Remove index for stoppedAt since it's not used anymore
		await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}cefb067df2402f6aed0638a6c1'`);

		logMigrationEnd(this.name);
	}

	public async down(queryRunner: QueryRunner): Promise<void> {
		const tablePrefix = getTablePrefix();

		await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
		await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}b94b45ce2c73ce46c54f20b5f9'`);
		await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}81fc04c8a17de15835713505e4'`);
		await queryRunner.query(
			`DROP INDEX IF EXISTS 'IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584'`,
		);
		await queryRunner.query(
			`CREATE INDEX "IDX_${tablePrefix}ca4a71b47f28ac6ea88293a8e2" ON "${tablePrefix}execution_entity" ("waitTill")`,
		);
		await queryRunner.query(
			`CREATE INDEX "IDX_${tablePrefix}cefb067df2402f6aed0638a6c1" ON "${tablePrefix}execution_entity" ("stoppedAt")`,
		);
	}
}
@@ -31,6 +31,7 @@ import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-Pu
import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions';
import { MigrateExecutionStatus1676996103000 } from './1676996103000-MigrateExecutionStatus';
import { UpdateRunningExecutionStatus1677237073720 } from './1677237073720-UpdateRunningExecutionStatus';
+import { CreateExecutionMetadataTable1679416281777 } from './1679416281777-CreateExecutionMetadataTable';

const sqliteMigrations = [
	InitialMigration1588102412422,
@@ -66,6 +67,7 @@ const sqliteMigrations = [
	AddStatusToExecutions1674138566000,
	MigrateExecutionStatus1676996103000,
	UpdateRunningExecutionStatus1677237073720,
+	CreateExecutionMetadataTable1679416281777,
];

export { sqliteMigrations };
@@ -14,7 +14,7 @@ import type {
} from 'n8n-workflow';
import { deepCopy, LoggerProxy, jsonParse, Workflow } from 'n8n-workflow';
import type { FindOperator, FindOptionsWhere } from 'typeorm';
-import { In, IsNull, LessThanOrEqual, Not, Raw } from 'typeorm';
+import { In, IsNull, LessThanOrEqual, MoreThanOrEqual, Not, Raw } from 'typeorm';
import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
import type { User } from '@db/entities/User';
@@ -35,10 +35,15 @@ import * as Db from '@/Db';
import * as GenericHelpers from '@/GenericHelpers';
import { parse } from 'flatted';
import { Container } from 'typedi';
-import { getStatusUsingPreviousExecutionStatusMethod } from './executionHelpers';
+import {
+	getStatusUsingPreviousExecutionStatusMethod,
+	isAdvancedExecutionFiltersEnabled,
+} from './executionHelpers';
+import { ExecutionMetadata } from '@/databases/entities/ExecutionMetadata';
+import { DateUtils } from 'typeorm/util/DateUtils';

interface IGetExecutionsQueryFilter {
-	id?: FindOperator<string>;
+	id?: FindOperator<string> | string;
	finished?: boolean;
	mode?: string;
	retryOf?: string;
@@ -47,12 +52,16 @@ interface IGetExecutionsQueryFilter {
	workflowId?: string;
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	waitTill?: FindOperator<any> | boolean;
+	metadata?: Array<{ key: string; value: string }>;
+	startedAfter?: string;
+	startedBefore?: string;
}

const schemaGetExecutionsQueryFilter = {
	$id: '/IGetExecutionsQueryFilter',
	type: 'object',
	properties: {
		id: { type: 'string' },
		finished: { type: 'boolean' },
		mode: { type: 'string' },
		retryOf: { type: 'string' },
@@ -63,6 +72,21 @@ const schemaGetExecutionsQueryFilter = {
		},
		waitTill: { type: 'boolean' },
		workflowId: { anyOf: [{ type: 'integer' }, { type: 'string' }] },
+		metadata: { type: 'array', items: { $ref: '#/$defs/metadata' } },
+		startedAfter: { type: 'date-time' },
+		startedBefore: { type: 'date-time' },
	},
+	$defs: {
+		metadata: {
+			type: 'object',
+			required: ['key', 'value'],
+			properties: {
+				key: {
+					type: 'string',
+				},
+				value: { type: 'string' },
+			},
+		},
+	},
};
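For reference, a query filter that exercises the new properties and validates against the schema above (all values illustrative):

// Example GET /executions filter: one metadata pair plus a date window.
const exampleFilter: IGetExecutionsQueryFilter = {
	workflowId: '123',
	metadata: [{ key: 'orderId', value: '1001' }],
	startedAfter: '2023-03-20T00:00:00.000Z',
	startedBefore: '2023-03-22T00:00:00.000Z',
};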
@@ -84,17 +108,38 @@ export class ExecutionsService {
	static async getExecutionsCount(
		countFilter: IDataObject,
		user: User,
+		metadata?: Array<{ key: string; value: string }>,
	): Promise<{ count: number; estimated: boolean }> {
		const dbType = config.getEnv('database.type');
		const filteredFields = Object.keys(countFilter).filter((field) => field !== 'id');

		// For databases other than Postgres, do a regular count
		// when filtering based on `workflowId` or `finished` fields.
-		if (dbType !== 'postgresdb' || filteredFields.length > 0 || user.globalRole.name !== 'owner') {
+		if (
+			dbType !== 'postgresdb' ||
+			metadata?.length ||
+			filteredFields.length > 0 ||
+			user.globalRole.name !== 'owner'
+		) {
			const sharedWorkflowIds = await this.getWorkflowIdsForUser(user);

-			const countParams = { where: { workflowId: In(sharedWorkflowIds), ...countFilter } };
-			const count = await Db.collections.Execution.count(countParams);
+			let query = Db.collections.Execution.createQueryBuilder('execution')
+				.select()
+				.orderBy('execution.id', 'DESC')
+				.where({ workflowId: In(sharedWorkflowIds) });
+
+			if (metadata?.length) {
+				query = query.leftJoinAndSelect(ExecutionMetadata, 'md', 'md.executionId = execution.id');
+				for (const md of metadata) {
+					query = query.andWhere('md.key = :key AND md.value = :value', md);
+				}
+			}
+
+			if (filteredFields.length > 0) {
+				query = query.andWhere(countFilter);
+			}
+
+			const count = await query.getCount();
			return { count, estimated: false };
		}
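Note the routing logic: the Postgres fast path below this branch estimates counts from table statistics, and an estimate cannot reflect a join against execution_metadata, so any metadata filter forces the exact query-builder count above. A hedged sketch of the call (arguments illustrative):

// Counting executions that carry a specific custom-data entry always takes
// the exact branch, even on Postgres, so `estimated` comes back false.
const { count, estimated } = await ExecutionsService.getExecutionsCount(
	{ workflowId: '123' },
	req.user,
	[{ key: 'orderId', value: '1001' }],
);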
@@ -138,6 +183,18 @@ export class ExecutionsService {
			} else {
				delete filter.waitTill;
			}
+
+			if (Array.isArray(filter.metadata)) {
+				delete filter.metadata;
+			}
+
+			if ('startedAfter' in filter) {
+				delete filter.startedAfter;
+			}
+
+			if ('startedBefore' in filter) {
+				delete filter.startedBefore;
+			}
		}
	}
@@ -227,17 +284,17 @@ export class ExecutionsService {
		} = {};

		if (req.query.lastId) {
-			rangeQuery.push('id < :lastId');
+			rangeQuery.push('execution.id < :lastId');
			rangeQueryParams.lastId = req.query.lastId;
		}

		if (req.query.firstId) {
-			rangeQuery.push('id > :firstId');
+			rangeQuery.push('execution.id > :firstId');
			rangeQueryParams.firstId = req.query.firstId;
		}

		if (executingWorkflowIds.length > 0) {
-			rangeQuery.push('id NOT IN (:...executingWorkflowIds)');
+			rangeQuery.push('execution.id NOT IN (:...executingWorkflowIds)');
			rangeQueryParams.executingWorkflowIds = executingWorkflowIds;
		}
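Qualifying these conditions as execution.id is what the commit message calls "ambiguous ID in SQL clauses": both execution_entity and execution_metadata have an id column, so once the md join is present an unqualified id no longer resolves. A minimal illustration (the lastId value is a placeholder):

// With the metadata join in place, only the qualified form is unambiguous.
query.andWhere('execution.id < :lastId', { lastId }); // resolves to execution_entity.id
// query.andWhere('id < :lastId', { lastId });        // would be ambiguous once `md` is joined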
@@ -261,11 +318,36 @@ export class ExecutionsService {
				'execution.workflowData',
				'execution.status',
			])
-			.orderBy('id', 'DESC')
+			.orderBy('execution.id', 'DESC')
			.take(limit)
			.where(findWhere);

		const countFilter = deepCopy(filter ?? {});
+		const metadata = isAdvancedExecutionFiltersEnabled() ? filter?.metadata : undefined;
+
+		if (metadata?.length) {
+			query = query.leftJoin(ExecutionMetadata, 'md', 'md.executionId = execution.id');
+			for (const md of metadata) {
+				query = query.andWhere('md.key = :key AND md.value = :value', md);
+			}
+		}
+
+		if (filter?.startedAfter) {
+			query = query.andWhere({
+				startedAt: MoreThanOrEqual(
+					DateUtils.mixedDateToUtcDatetimeString(new Date(filter.startedAfter)),
+				),
+			});
+		}
+
+		if (filter?.startedBefore) {
+			query = query.andWhere({
+				startedAt: LessThanOrEqual(
+					DateUtils.mixedDateToUtcDatetimeString(new Date(filter.startedBefore)),
+				),
+			});
+		}
+
		// deepcopy breaks the In operator so we need to reapply it
		if (filter?.status) {
			Object.assign(filter, { status: In(filter.status) });
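DateUtils.mixedDateToUtcDatetimeString converts the incoming ISO string into the plain UTC datetime form TypeORM stores for startedAt, which keeps the date comparison consistent across SQLite, MySQL, and Postgres. A quick sketch of the conversion (the exact output precision may vary by TypeORM version):

import { DateUtils } from 'typeorm/util/DateUtils';

// Normalizes a JS Date to a UTC 'YYYY-MM-DD HH:mm:ss'-style string,
// e.g. roughly '2023-03-20 10:15:00.000'.
const normalized = DateUtils.mixedDateToUtcDatetimeString(new Date('2023-03-20T10:15:00Z'));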
@@ -285,6 +367,7 @@ export class ExecutionsService {
		const { count, estimated } = await this.getExecutionsCount(
			countFilter as IDataObject,
			req.user,
+			metadata,
		);

		const formattedExecutions: IExecutionsSummary[] = executions.map((execution) => {
packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts (new file, 41 lines)
@@ -0,0 +1,41 @@
import { saveExecutionMetadata } from '@/WorkflowExecuteAdditionalData';
import * as Db from '@/Db';
import { mocked } from 'jest-mock';

jest.mock('@/Db', () => {
	return {
		collections: {
			ExecutionMetadata: {
				save: jest.fn(async () => Promise.resolve([])),
			},
		},
	};
});

describe('WorkflowExecuteAdditionalData', () => {
	test('Execution metadata is saved in a batch', async () => {
		const toSave = {
			test1: 'value1',
			test2: 'value2',
		};
		const executionId = '1234';

		await saveExecutionMetadata(executionId, toSave);

		expect(mocked(Db.collections.ExecutionMetadata.save)).toHaveBeenCalledTimes(1);
		expect(mocked(Db.collections.ExecutionMetadata.save).mock.calls[0]).toEqual([
			[
				{
					execution: { id: executionId },
					key: 'test1',
					value: 'value1',
				},
				{
					execution: { id: executionId },
					key: 'test2',
					value: 'value2',
				},
			],
		]);
	});
});