feat(API): Add running status query on the executions public api endpoint (#19205)

Co-authored-by: Konstantin Tieber <46342664+konstantintieber@users.noreply.github.com>
This commit is contained in:
Irénée
2025-09-11 13:03:23 +01:00
committed by GitHub
parent 03b865d4db
commit 3af4541391
8 changed files with 241 additions and 132 deletions

View File

@@ -1,31 +1,14 @@
import { Container, type Constructable } from '@n8n/di';
import { DataSource, EntityManager, In, LessThan, type EntityMetadata } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import type { Class } from 'n8n-core';
import type { DeepPartial } from 'ts-essentials';
import { Container } from '@n8n/di';
import { In, LessThan, And, Not } from '@n8n/typeorm';
import { ExecutionEntity } from '../../entities';
import { mockEntityManager } from '../../utils/test-utils/mock-entity-manager';
import { ExecutionRepository } from '../execution.repository';
const mockInstance = <T>(
serviceClass: Constructable<T>,
data: DeepPartial<T> | undefined = undefined,
) => {
const instance = mock<T>(data);
Container.set(serviceClass, instance);
return instance;
};
const mockEntityManager = (entityClass: Class) => {
const entityManager = mockInstance(EntityManager);
const dataSource = mockInstance(DataSource, {
manager: entityManager,
getMetadata: () => mock<EntityMetadata>({ target: entityClass }),
});
Object.assign(entityManager, { connection: dataSource });
return entityManager;
};
/**
* TODO: add tests for all the other methods
* TODO: getExecutionsForPublicApi -> add test cases for the `includeData` toggle
*/
describe('ExecutionRepository', () => {
const entityManager = mockEntityManager(ExecutionEntity);
const executionRepository = Container.get(ExecutionRepository);
@@ -35,10 +18,29 @@ describe('ExecutionRepository', () => {
});
describe('getExecutionsForPublicApi', () => {
test('should get executions matching the filter parameters', async () => {
const limit = 10;
const defaultLimit = 10;
const defaultQuery = {
select: [
'id',
'mode',
'retryOf',
'retrySuccessId',
'startedAt',
'stoppedAt',
'workflowId',
'waitTill',
'finished',
'status',
],
where: {},
order: { id: 'DESC' },
take: defaultLimit,
relations: ['executionData'],
};
test('should get executions matching all filter parameters', async () => {
const params = {
limit: 10,
limit: defaultLimit,
lastId: '3',
workflowIds: ['3', '4'],
};
@@ -48,64 +50,89 @@ describe('ExecutionRepository', () => {
const result = await executionRepository.getExecutionsForPublicApi(params);
expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, {
select: [
'id',
'mode',
'retryOf',
'retrySuccessId',
'startedAt',
'stoppedAt',
'workflowId',
'waitTill',
'finished',
'status',
],
...defaultQuery,
where: {
id: LessThan(params.lastId),
workflowId: In(params.workflowIds),
},
order: { id: 'DESC' },
take: limit,
relations: ['executionData'],
});
expect(result.length).toBe(mockEntities.length);
expect(result[0].id).toEqual(mockEntities[0].id);
});
test('should get executions matching the workflowIds filter', async () => {
const params = {
limit: 10,
workflowIds: ['3', '4'],
};
const mockEntities = [{ id: '1' }, { id: '2' }];
entityManager.find.mockResolvedValueOnce(mockEntities);
const result = await executionRepository.getExecutionsForPublicApi(params);
expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, {
...defaultQuery,
where: {
workflowId: In(params.workflowIds),
},
});
expect(result.length).toBe(mockEntities.length);
expect(result[0].id).toEqual(mockEntities[0].id);
});
describe('with id filters', () => {
test.each`
lastId | excludedExecutionsIds | expectedIdCondition
${'5'} | ${['2', '3']} | ${And(LessThan('5'), Not(In(['2', '3'])))}
${'5'} | ${[]} | ${LessThan('5')}
${'5'} | ${undefined} | ${LessThan('5')}
${undefined} | ${['2', '3']} | ${Not(In(['2', '3']))}
${undefined} | ${[]} | ${undefined}
${undefined} | ${undefined} | ${undefined}
`(
'should find with id less than "$lastId" and not in "$excludedExecutionsIds"',
async ({ lastId, excludedExecutionsIds, expectedIdCondition }) => {
const params = {
limit: defaultLimit,
...(lastId ? { lastId } : {}),
...(excludedExecutionsIds ? { excludedExecutionsIds } : {}),
};
const mockEntities = [{ id: '1' }, { id: '2' }];
entityManager.find.mockResolvedValueOnce(mockEntities);
const result = await executionRepository.getExecutionsForPublicApi(params);
expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, {
...defaultQuery,
where: {
...(expectedIdCondition ? { id: expectedIdCondition } : {}),
},
});
expect(result.length).toBe(mockEntities.length);
expect(result[0].id).toEqual(mockEntities[0].id);
},
);
});
describe('with status filter', () => {
test.each`
filterStatus | entityStatus
${'canceled'} | ${'canceled'}
${'error'} | ${In(['error', 'crashed'])}
${'running'} | ${'running'}
${'success'} | ${'success'}
${'waiting'} | ${'waiting'}
`('should find all "$filterStatus" executions', async ({ filterStatus, entityStatus }) => {
const limit = 10;
const mockEntities = [{ id: '1' }, { id: '2' }];
entityManager.find.mockResolvedValueOnce(mockEntities);
const result = await executionRepository.getExecutionsForPublicApi({
limit,
limit: defaultLimit,
status: filterStatus,
});
expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, {
select: [
'id',
'mode',
'retryOf',
'retrySuccessId',
'startedAt',
'stoppedAt',
'workflowId',
'waitTill',
'finished',
'status',
],
...defaultQuery,
where: { status: entityStatus },
order: { id: 'DESC' },
take: limit,
relations: ['executionData'],
});
expect(result.length).toBe(mockEntities.length);
expect(result[0].id).toEqual(mockEntities[0].id);
@@ -115,37 +142,21 @@ describe('ExecutionRepository', () => {
filterStatus
${'crashed'}
${'new'}
${'running'}
${'unknown'}
`(
'should find all executions and ignore status filter "$filterStatus"',
async ({ filterStatus }) => {
const limit = 10;
const mockEntities = [{ id: '1' }, { id: '2' }];
entityManager.find.mockResolvedValueOnce(mockEntities);
const result = await executionRepository.getExecutionsForPublicApi({
limit,
limit: defaultLimit,
status: filterStatus,
});
expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, {
select: [
'id',
'mode',
'retryOf',
'retrySuccessId',
'startedAt',
'stoppedAt',
'workflowId',
'waitTill',
'finished',
'status',
],
...defaultQuery,
where: {},
order: { id: 'DESC' },
take: limit,
relations: ['executionData'],
});
expect(result.length).toBe(mockEntities.length);
expect(result[0].id).toEqual(mockEntities[0].id);
@@ -155,8 +166,7 @@ describe('ExecutionRepository', () => {
});
describe('getExecutionsCountForPublicApi', () => {
test('should get executions matching the filter parameters', async () => {
const limit = 10;
test('should get executions matching all filter parameters', async () => {
const mockCount = 20;
const params = {
limit: 10,
@@ -172,16 +182,68 @@ describe('ExecutionRepository', () => {
id: LessThan(params.lastId),
workflowId: In(params.workflowIds),
},
take: limit,
take: params.limit,
});
expect(result).toBe(mockCount);
});
test('should get executions matching the workflowIds filter', async () => {
const mockCount = 12;
const params = {
limit: 10,
workflowIds: ['7', '8'],
};
entityManager.count.mockResolvedValueOnce(mockCount);
const result = await executionRepository.getExecutionsCountForPublicApi(params);
expect(entityManager.count).toHaveBeenCalledWith(ExecutionEntity, {
where: {
workflowId: In(params.workflowIds),
},
take: params.limit,
});
expect(result).toBe(mockCount);
});
describe('with id filters', () => {
test.each`
lastId | excludedExecutionsIds | expectedIdCondition
${'5'} | ${['2', '3']} | ${And(LessThan('5'), Not(In(['2', '3'])))}
${'5'} | ${[]} | ${LessThan('5')}
${'5'} | ${undefined} | ${LessThan('5')}
${undefined} | ${['2', '3']} | ${Not(In(['2', '3']))}
${undefined} | ${[]} | ${undefined}
${undefined} | ${undefined} | ${undefined}
`(
'should find with id less than "$lastId" and not in "$excludedExecutionsIds"',
async ({ lastId, excludedExecutionsIds, expectedIdCondition }) => {
const mockCount = 15;
const params = {
limit: 10,
...(lastId ? { lastId } : {}),
...(excludedExecutionsIds ? { excludedExecutionsIds } : {}),
};
entityManager.count.mockResolvedValueOnce(mockCount);
const result = await executionRepository.getExecutionsCountForPublicApi(params);
expect(entityManager.count).toHaveBeenCalledWith(ExecutionEntity, {
where: {
...(expectedIdCondition ? { id: expectedIdCondition } : {}),
},
take: params.limit,
});
expect(result).toBe(mockCount);
},
);
});
describe('with status filter', () => {
test.each`
filterStatus | entityStatus
${'canceled'} | ${'canceled'}
${'error'} | ${In(['error', 'crashed'])}
${'running'} | ${'running'}
${'success'} | ${'success'}
${'waiting'} | ${'waiting'}
`('should retrieve all $filterStatus executions', async ({ filterStatus, entityStatus }) => {
@@ -206,7 +268,6 @@ describe('ExecutionRepository', () => {
filterStatus
${'crashed'}
${'new'}
${'running'}
${'unknown'}
`(
'should find all executions and ignore status filter "$filterStatus"',

View File

@@ -17,38 +17,38 @@ import {
LessThanOrEqual,
MoreThanOrEqual,
Not,
Raw,
Repository,
And,
} from '@n8n/typeorm';
import { DateUtils } from '@n8n/typeorm/util/DateUtils';
import { parse, stringify } from 'flatted';
import pick from 'lodash/pick';
import { BinaryDataService, ErrorReporter } from 'n8n-core';
import { ExecutionCancelledError, UnexpectedError } from 'n8n-workflow';
import type {
AnnotationVote,
ExecutionStatus,
ExecutionSummary,
IRunExecutionData,
} from 'n8n-workflow';
import { ExecutionCancelledError, UnexpectedError } from 'n8n-workflow';
import { ExecutionDataRepository } from './execution-data.repository';
import {
AnnotationTagEntity,
AnnotationTagMapping,
ExecutionAnnotation,
ExecutionData,
ExecutionEntity,
ExecutionMetadata,
ExecutionData,
ExecutionAnnotation,
AnnotationTagMapping,
WorkflowEntity,
SharedWorkflow,
AnnotationTagEntity,
WorkflowEntity,
} from '../entities';
import type {
CreateExecutionPayload,
IExecutionFlattedDb,
IExecutionBase,
IExecutionResponse,
ExecutionSummaries,
IExecutionBase,
IExecutionFlattedDb,
IExecutionResponse,
} from '../entities/types-db';
import { separate } from '../utils/separate';
@@ -631,27 +631,22 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
});
}
async getExecutionsCountForPublicApi(data: {
async getExecutionsCountForPublicApi(params: {
limit: number;
lastId?: string;
workflowIds?: string[];
status?: ExecutionStatus;
excludedWorkflowIds?: string[];
excludedExecutionsIds?: string[];
}): Promise<number> {
const executionsCount = await this.count({
where: {
...(data.lastId && { id: LessThan(data.lastId) }),
...(data.status && { ...this.getStatusCondition(data.status) }),
...(data.workflowIds && { workflowId: In(data.workflowIds) }),
...(data.excludedWorkflowIds && { workflowId: Not(In(data.excludedWorkflowIds)) }),
},
take: data.limit,
where: this.getFindExecutionsForPublicApiCondition(params),
take: params.limit,
});
return executionsCount;
}
private getStatusCondition(status: ExecutionStatus) {
private getStatusCondition(status?: ExecutionStatus) {
const condition: Pick<FindOptionsWhere<IExecutionFlattedDb>, 'status'> = {};
if (status === 'success') {
@@ -662,11 +657,45 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
condition.status = In(['error', 'crashed']);
} else if (status === 'canceled') {
condition.status = 'canceled';
} else if (status === 'running') {
condition.status = 'running';
}
return condition;
}
private getIdCondition(params: { lastId?: string; excludedExecutionsIds?: string[] }) {
const condition: Pick<FindOptionsWhere<IExecutionFlattedDb>, 'id'> = {};
if (params.lastId && params.excludedExecutionsIds?.length) {
condition.id = And(LessThan(params.lastId), Not(In(params.excludedExecutionsIds)));
} else if (params.lastId) {
condition.id = LessThan(params.lastId);
} else if (params.excludedExecutionsIds?.length) {
condition.id = Not(In(params.excludedExecutionsIds));
}
return condition;
}
private getFindExecutionsForPublicApiCondition(params: {
lastId?: string;
workflowIds?: string[];
status?: ExecutionStatus;
excludedExecutionsIds?: string[];
}) {
const where: FindOptionsWhere<IExecutionFlattedDb> = {
...this.getIdCondition({
lastId: params.lastId,
excludedExecutionsIds: params.excludedExecutionsIds,
}),
...this.getStatusCondition(params.status),
...(params.workflowIds && { workflowId: In(params.workflowIds) }),
};
return where;
}
async getExecutionsForPublicApi(params: {
limit: number;
includeData?: boolean;
@@ -675,26 +704,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
status?: ExecutionStatus;
excludedExecutionsIds?: string[];
}): Promise<IExecutionBase[]> {
let where: FindOptionsWhere<IExecutionFlattedDb> = {};
if (params.lastId && params.excludedExecutionsIds?.length) {
where.id = Raw((id) => `${id} < :lastId AND ${id} NOT IN (:...excludedExecutionsIds)`, {
lastId: params.lastId,
excludedExecutionsIds: params.excludedExecutionsIds,
});
} else if (params.lastId) {
where.id = LessThan(params.lastId);
} else if (params.excludedExecutionsIds?.length) {
where.id = Not(In(params.excludedExecutionsIds));
}
if (params.status) {
where = { ...where, ...this.getStatusCondition(params.status) };
}
if (params.workflowIds) {
where = { ...where, workflowId: In(params.workflowIds) };
}
const where = this.getFindExecutionsForPublicApiCondition(params);
return await this.findMultipleExecutions(
{

View File

@@ -0,0 +1,15 @@
import { DataSource, EntityManager, type EntityMetadata } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import type { Class } from 'n8n-core';
import { mockInstance } from './mock-instance';
export const mockEntityManager = (entityClass: Class) => {
const entityManager = mockInstance(EntityManager);
const dataSource = mockInstance(DataSource, {
manager: entityManager,
getMetadata: () => mock<EntityMetadata>({ target: entityClass }),
});
Object.assign(entityManager, { connection: dataSource });
return entityManager;
};

View File

@@ -0,0 +1,12 @@
import { Container, type Constructable } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import type { DeepPartial } from 'ts-essentials';
export const mockInstance = <T>(
serviceClass: Constructable<T>,
data: DeepPartial<T> | undefined = undefined,
) => {
const instance = mock<T>(data);
Container.set(serviceClass, instance);
return instance;
};

View File

@@ -114,19 +114,23 @@ export = {
return res.status(200).json({ data: [], nextCursor: null });
}
// get running workflows so we exclude them from the result
// get running executions so we exclude them from the result
const runningExecutionsIds = Container.get(ActiveExecutions)
.getActiveExecutions()
.map(({ id }) => id);
const filters = {
status,
limit,
lastId,
includeData,
workflowIds: workflowId ? [workflowId] : sharedWorkflowsIds,
excludedExecutionsIds: runningExecutionsIds,
};
const filters: Parameters<typeof ExecutionRepository.prototype.getExecutionsForPublicApi>[0] =
{
status,
limit,
lastId,
includeData,
workflowIds: workflowId ? [workflowId] : sharedWorkflowsIds,
// for backward compatibility `running` executions are always excluded
// unless the user explicitly filters by `running` status
excludedExecutionsIds: status !== 'running' ? runningExecutionsIds : undefined,
};
const executions =
await Container.get(ExecutionRepository).getExecutionsForPublicApi(filters);

View File

@@ -13,7 +13,7 @@ get:
required: false
schema:
type: string
enum: ['canceled', 'error', 'success', 'waiting']
enum: ['canceled', 'error', 'running', 'success', 'waiting']
- name: workflowId
in: query
description: Workflow to filter the executions by.

View File

@@ -24,6 +24,8 @@ properties:
stoppedAt:
type: string
format: date-time
nullable: true
description: The time at which the execution stopped. Will only be null for executions that still have the status 'running'.
workflowId:
type: number
example: '1000'

View File

@@ -402,12 +402,13 @@ describe('GET /executions', () => {
});
describe('with query status', () => {
type AllowedQueryStatus = 'success' | 'error' | 'canceled' | 'waiting';
type AllowedQueryStatus = 'canceled' | 'error' | 'running' | 'success' | 'waiting';
test.each`
queryStatus | entityStatus
${'canceled'} | ${'canceled'}
${'error'} | ${'error'}
${'error'} | ${'crashed'}
${'running'} | ${'running'}
${'success'} | ${'success'}
${'waiting'} | ${'waiting'}
`(
@@ -419,6 +420,10 @@ describe('GET /executions', () => {
const workflow = await createWorkflow({}, owner);
await createdExecutionWithStatus(workflow, queryStatus === 'success' ? 'error' : 'success');
if (queryStatus !== 'running') {
// ensure there is a running execution that gets excluded unless filtering by `running`
await createdExecutionWithStatus(workflow, 'running');
}
const expectedExecution = await createdExecutionWithStatus(workflow, entityStatus);