fix: Add accurate concurrent executions count to executions list (#19249)

This commit is contained in:
Irénée
2025-09-15 13:23:05 +01:00
committed by GitHub
parent 7ded694ce7
commit dc75be3a6f
11 changed files with 225 additions and 62 deletions

View File

@@ -291,4 +291,18 @@ describe('ExecutionRepository', () => {
);
});
});
describe('getConcurrentExecutionsCount', () => {
test('should count running executions with mode webhook or trigger', async () => {
const mockCount = 5;
entityManager.count.mockResolvedValueOnce(mockCount);
const result = await executionRepository.getConcurrentExecutionsCount();
expect(entityManager.count).toHaveBeenCalledWith(ExecutionEntity, {
where: { status: 'running', mode: In(['webhook', 'trigger']) },
});
expect(result).toBe(mockCount);
});
});
});

View File

@@ -1137,4 +1137,16 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return executions.map(({ id }) => id);
}
/**
* The number of executions that are running and count towards the concurrent executions limit.
* Concurrency control only applies to executions started from a webhook or trigger node.
*/
async getConcurrentExecutionsCount() {
const concurrentExecutionsCount = await this.count({
where: { status: 'running', mode: In(['webhook', 'trigger']) },
});
return concurrentExecutionsCount;
}
}

View File

@@ -32,7 +32,12 @@ describe('ExecutionsController', () => {
});
describe('getMany', () => {
const NO_EXECUTIONS = { count: 0, estimated: false, results: [] };
const NO_EXECUTIONS = {
count: 0,
estimated: false,
results: [],
concurrentExecutionsCount: -1,
};
const QUERIES_WITH_EITHER_STATUS_OR_RANGE: ExecutionSummaries.RangeQuery[] = [
{
@@ -91,6 +96,7 @@ describe('ExecutionsController', () => {
expect(executionService.findLatestCurrentAndCompleted).not.toHaveBeenCalled();
expect(executionService.findRangeWithCount).toHaveBeenCalledWith(rangeQuery);
expect(executionService.getConcurrentExecutionsCount).toHaveBeenCalled();
},
);
});
@@ -108,6 +114,7 @@ describe('ExecutionsController', () => {
expect(executionService.findLatestCurrentAndCompleted).toHaveBeenCalled();
expect(executionService.findRangeWithCount).not.toHaveBeenCalled();
expect(executionService.getConcurrentExecutionsCount).toHaveBeenCalled();
},
);
});
@@ -130,6 +137,7 @@ describe('ExecutionsController', () => {
expect(executionService.findLatestCurrentAndCompleted).not.toHaveBeenCalled();
expect(executionService.findRangeWithCount).toHaveBeenCalledWith(rangeQuery);
expect(executionService.getConcurrentExecutionsCount).toHaveBeenCalled();
});
});
});

View File

@@ -1,16 +1,16 @@
import { Logger } from '@n8n/backend-common';
import { GlobalConfig } from '@n8n/config';
import type {
User,
CreateExecutionPayload,
ExecutionSummaries,
IExecutionResponse,
IGetExecutionsQueryFilter,
User,
} from '@n8n/db';
import {
AnnotationTagMappingRepository,
ExecutionAnnotationRepository,
ExecutionRepository,
AnnotationTagMappingRepository,
WorkflowRepository,
} from '@n8n/db';
import { Service } from '@n8n/di';
@@ -21,8 +21,8 @@ import type {
INode,
IRunExecutionData,
IWorkflowBase,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
WorkflowExecuteMode,
} from 'n8n-workflow';
import {
ExecutionStatusList,
@@ -372,22 +372,11 @@ export class ExecutionService {
async findRangeWithCount(query: ExecutionSummaries.RangeQuery) {
const results = await this.executionRepository.findManyByRangeQuery(query);
if (this.globalConfig.database.type === 'postgresdb') {
const liveRows = await this.executionRepository.getLiveExecutionRowsOnPostgres();
if (liveRows === -1) return { count: -1, estimated: false, results };
if (liveRows > 100_000) {
// likely too high to fetch exact count fast
return { count: liveRows, estimated: true, results };
}
}
const { range: _, ...countQuery } = query;
const count = await this.executionRepository.fetchCount({ ...countQuery, kind: 'count' });
const executionCount = await this.getExecutionsCountForQuery({ ...countQuery, kind: 'count' });
return { results, count, estimated: false };
return { results, ...executionCount };
}
/**
@@ -405,26 +394,82 @@ export class ExecutionService {
const completedStatuses = ExecutionStatusList.filter((s) => !currentStatuses.includes(s));
const [current, completed] = await Promise.all([
this.findRangeWithCount({
...query,
status: currentStatuses,
order: { top: 'running' }, // ensure limit cannot exclude running
}),
this.findRangeWithCount({
...query,
status: completedStatuses,
order: { startedAt: 'DESC' },
}),
const completedQuery: ExecutionSummaries.RangeQuery = {
...query,
status: completedStatuses,
order: { startedAt: 'DESC' },
};
const { range: _, ...countQuery } = completedQuery;
const currentQuery: ExecutionSummaries.RangeQuery = {
...query,
status: currentStatuses,
order: { top: 'running' }, // ensure limit cannot exclude running
};
const [current, completed, completedCount] = await Promise.all([
this.executionRepository.findManyByRangeQuery(currentQuery),
this.executionRepository.findManyByRangeQuery(completedQuery),
this.getExecutionsCountForQuery({ ...countQuery, kind: 'count' }),
]);
return {
results: current.results.concat(completed.results),
count: completed.count, // exclude current from count for pagination
estimated: completed.estimated,
results: current.concat(completed),
count: completedCount.count, // exclude current from count for pagination
estimated: completedCount.estimated,
};
}
/**
* @returns
* - the number of concurrent executions
* - `-1` if the count is not applicable (e.g. in 'queue' mode or if concurrency control is disabled)
*
* In 'queue' mode, concurrency control is applied per worker, so returning a global count of concurrent executions
* would not be meaningful or helpful.
*/
async getConcurrentExecutionsCount() {
if (!this.isConcurrentExecutionsCountSupported()) {
return -1;
}
return await this.executionRepository.getConcurrentExecutionsCount();
}
private isConcurrentExecutionsCountSupported(): boolean {
const isConcurrencyEnabled = this.globalConfig.executions.concurrency.productionLimit !== -1;
const isInRegularMode = config.getEnv('executions.mode') === 'regular';
if (!isConcurrencyEnabled || !isInRegularMode) {
return false;
}
return true;
}
/**
* @param countQuery the query to count executions
* @returns
* - the count of executions that satisfy the query
* - whether the count is an estimate or not
*/
private async getExecutionsCountForQuery(countQuery: ExecutionSummaries.CountQuery) {
if (this.globalConfig.database.type === 'postgresdb') {
const liveRows = await this.executionRepository.getLiveExecutionRowsOnPostgres();
if (liveRows === -1) return { count: -1, estimated: false };
if (liveRows > 100_000) {
// likely too high to fetch exact count fast
return { count: liveRows, estimated: true };
}
}
const count = await this.executionRepository.fetchCount(countQuery);
return { count, estimated: false };
}
async findAllEnqueuedExecutions() {
return await this.executionRepository.findMultipleExecutions(
{

View File

@@ -59,20 +59,32 @@ export class ExecutionsController {
const noRange = !query.range.lastId || !query.range.firstId;
if (noStatus && noRange) {
const executions = await this.executionService.findLatestCurrentAndCompleted(query);
const [executions, concurrentExecutionsCount] = await Promise.all([
this.executionService.findLatestCurrentAndCompleted(query),
this.executionService.getConcurrentExecutionsCount(),
]);
await this.executionService.addScopes(
req.user,
executions.results as ExecutionSummaries.ExecutionSummaryWithScopes[],
);
return executions;
return {
...executions,
concurrentExecutionsCount,
};
}
const executions = await this.executionService.findRangeWithCount(query);
const [executions, concurrentExecutionsCount] = await Promise.all([
this.executionService.findRangeWithCount(query),
this.executionService.getConcurrentExecutionsCount(),
]);
await this.executionService.addScopes(
req.user,
executions.results as ExecutionSummaries.ExecutionSummaryWithScopes[],
);
return executions;
return {
...executions,
concurrentExecutionsCount,
};
}
@Get('/:id')

View File

@@ -1,9 +1,11 @@
import { createTeamProject, createWorkflow, testDb } from '@n8n/backend-test-utils';
import { GlobalConfig } from '@n8n/config';
import type { ExecutionSummaries } from '@n8n/db';
import { ExecutionMetadataRepository, ExecutionRepository, WorkflowRepository } from '@n8n/db';
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import config from '@/config';
import { ExecutionService } from '@/executions/execution.service';
import { annotateExecution, createAnnotationTags, createExecution } from './shared/db/executions';
@@ -11,6 +13,7 @@ import { annotateExecution, createAnnotationTags, createExecution } from './shar
describe('ExecutionService', () => {
let executionService: ExecutionService;
let executionRepository: ExecutionRepository;
const globalConfig = Container.get(GlobalConfig);
beforeAll(async () => {
await testDb.init();
@@ -18,7 +21,7 @@ describe('ExecutionService', () => {
executionRepository = Container.get(ExecutionRepository);
executionService = new ExecutionService(
mock(),
globalConfig,
mock(),
mock(),
mock(),
@@ -34,6 +37,11 @@ describe('ExecutionService', () => {
);
});
beforeEach(() => {
globalConfig.executions.concurrency.productionLimit = -1;
config.set('executions.mode', 'regular');
});
afterEach(async () => {
await testDb.truncate(['ExecutionEntity']);
});
@@ -504,6 +512,67 @@ describe('ExecutionService', () => {
});
});
describe('getConcurrentExecutionsCount', () => {
test('should return concurrentExecutionsCount when concurrency is enabled', async () => {
globalConfig.executions.concurrency.productionLimit = 4;
const workflow = await createWorkflow();
const concurrentExecutionsData = await Promise.all([
createExecution({ status: 'running', mode: 'webhook' }, workflow),
createExecution({ status: 'running', mode: 'trigger' }, workflow),
]);
await Promise.all([
createExecution({ status: 'success' }, workflow),
createExecution({ status: 'crashed' }, workflow),
createExecution({ status: 'new' }, workflow),
createExecution({ status: 'running', mode: 'manual' }, workflow),
]);
const output = await executionService.getConcurrentExecutionsCount();
expect(output).toEqual(concurrentExecutionsData.length);
});
test('should set concurrentExecutionsCount to -1 when concurrency is disabled', async () => {
globalConfig.executions.concurrency.productionLimit = -1;
const workflow = await createWorkflow();
await Promise.all([
createExecution({ status: 'running', mode: 'webhook' }, workflow),
createExecution({ status: 'running', mode: 'trigger' }, workflow),
createExecution({ status: 'success' }, workflow),
createExecution({ status: 'crashed' }, workflow),
createExecution({ status: 'new' }, workflow),
createExecution({ status: 'running', mode: 'manual' }, workflow),
]);
const output = await executionService.getConcurrentExecutionsCount();
expect(output).toEqual(-1);
});
test('should set concurrentExecutionsCount to -1 in queue mode', async () => {
config.set('executions.mode', 'queue');
globalConfig.executions.concurrency.productionLimit = 4;
const workflow = await createWorkflow();
await Promise.all([
createExecution({ status: 'running', mode: 'webhook' }, workflow),
createExecution({ status: 'running', mode: 'trigger' }, workflow),
createExecution({ status: 'success' }, workflow),
createExecution({ status: 'crashed' }, workflow),
createExecution({ status: 'new' }, workflow),
createExecution({ status: 'running', mode: 'manual' }, workflow),
]);
const output = await executionService.getConcurrentExecutionsCount();
expect(output).toEqual(-1);
});
});
describe('findLatestCurrentAndCompleted', () => {
test('should return latest current and completed executions', async () => {
const workflow = await createWorkflow();

View File

@@ -495,6 +495,7 @@ export interface IExecutionsListResponse {
count: number;
results: ExecutionSummaryWithScopes[];
estimated: boolean;
concurrentExecutionsCount: number;
}
export interface IExecutionsCurrentSummaryExtended {

View File

@@ -27,10 +27,12 @@ const props = withDefaults(
executions: ExecutionSummaryWithScopes[];
filters: ExecutionFilterType;
total?: number;
concurrentTotal?: number;
estimated?: boolean;
}>(),
{
total: 0,
concurrentTotal: 0,
estimated: false,
},
);
@@ -76,16 +78,11 @@ const isAnnotationEnabled = computed(
() => settingsStore.isEnterpriseFeatureEnabled[EnterpriseEditionFeature.AdvancedExecutionFilters],
);
/**
* Calculate the number of executions counted towards the production executions concurrency limit.
* Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI.
*/
const runningExecutionsCount = computed(() => {
return props.executions.filter(
(execution) =>
execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode),
).length;
});
// In 'queue' mode concurrency control is applied per worker and returning a global count
// of concurrent executions would not be meaningful/helpful.
const showConcurrencyHeader = computed(
() => settingsStore.isConcurrencyEnabled && !settingsStore.isQueueModeEnabled,
);
watch(
() => props.executions,
@@ -338,8 +335,8 @@ const goToUpgrade = () => {
<div style="margin-left: auto">
<ConcurrentExecutionsHeader
v-if="settingsStore.isConcurrencyEnabled"
:running-executions-count="runningExecutionsCount"
v-if="showConcurrencyHeader"
:running-executions-count="concurrentTotal"
:concurrency-cap="settingsStore.concurrency"
:is-cloud-deployment="settingsStore.isCloudDeployment"
@go-to-upgrade="goToUpgrade"

View File

@@ -54,16 +54,11 @@ const executionListRef = ref<HTMLElement | null>(null);
const workflowPermissions = computed(() => getResourcePermissions(props.workflow?.scopes).workflow);
/**
* Calculate the number of executions counted towards the production executions concurrency limit.
* Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI.
*/
const runningExecutionsCount = computed(() => {
return props.executions.filter(
(execution) =>
execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode),
).length;
});
// In 'queue' mode concurrency control is applied per worker and returning a global count
// of concurrent executions would not be meaningful/helpful.
const showConcurrencyHeader = computed(
() => settingsStore.isConcurrencyEnabled && !settingsStore.isQueueModeEnabled,
);
watch(
() => route,
@@ -196,8 +191,8 @@ const goToUpgrade = () => {
</n8n-heading>
<ConcurrentExecutionsHeader
v-if="settingsStore.isConcurrencyEnabled"
:running-executions-count="runningExecutionsCount"
v-if="showConcurrencyHeader"
:running-executions-count="executionsStore.concurrentExecutionsCount"
:concurrency-cap="settingsStore.concurrency"
:is-cloud-deployment="settingsStore.isCloudDeployment"
@go-to-upgrade="goToUpgrade"

View File

@@ -52,6 +52,7 @@ export const useExecutionsStore = defineStore('executions', () => {
const executionsById = ref<Record<string, ExecutionSummaryWithScopes>>({});
const executionsCount = ref(0);
const executionsCountEstimated = ref(false);
const concurrentExecutionsCount = ref(0);
const executions = computed(() => {
const data = Object.values(executionsById.value);
@@ -176,6 +177,7 @@ export const useExecutionsStore = defineStore('executions', () => {
executionsCount.value = data.count;
executionsCountEstimated.value = data.estimated;
concurrentExecutionsCount.value = data.concurrentExecutionsCount;
return data;
} finally {
loading.value = false;
@@ -285,6 +287,7 @@ export const useExecutionsStore = defineStore('executions', () => {
currentExecutionsById.value = {};
executionsCount.value = 0;
executionsCountEstimated.value = false;
concurrentExecutionsCount.value = 0;
}
function reset() {
@@ -302,6 +305,7 @@ export const useExecutionsStore = defineStore('executions', () => {
executions,
executionsCount,
executionsCountEstimated,
concurrentExecutionsCount,
executionsByWorkflowId,
currentExecutions,
currentExecutionsByWorkflowId,

View File

@@ -27,8 +27,13 @@ const documentTitle = useDocumentTitle();
const toast = useToast();
const overview = useProjectPages();
const { executionsCount, executionsCountEstimated, filters, allExecutions } =
storeToRefs(executionsStore);
const {
executionsCount,
executionsCountEstimated,
concurrentExecutionsCount,
filters,
allExecutions,
} = storeToRefs(executionsStore);
onBeforeMount(async () => {
await loadWorkflows();
@@ -91,6 +96,7 @@ async function onExecutionStop() {
:filters="filters"
:total="executionsCount"
:estimated-total="executionsCountEstimated"
:concurrent-total="concurrentExecutionsCount"
@execution:stop="onExecutionStop"
@update:filters="onUpdateFilters"
>