diff --git a/packages/@n8n/db/src/repositories/__tests__/execution.repository.test.ts b/packages/@n8n/db/src/repositories/__tests__/execution.repository.test.ts index 2278d8df17..dcad01b86b 100644 --- a/packages/@n8n/db/src/repositories/__tests__/execution.repository.test.ts +++ b/packages/@n8n/db/src/repositories/__tests__/execution.repository.test.ts @@ -291,4 +291,18 @@ describe('ExecutionRepository', () => { ); }); }); + + describe('getConcurrentExecutionsCount', () => { + test('should count running executions with mode webhook or trigger', async () => { + const mockCount = 5; + entityManager.count.mockResolvedValueOnce(mockCount); + + const result = await executionRepository.getConcurrentExecutionsCount(); + + expect(entityManager.count).toHaveBeenCalledWith(ExecutionEntity, { + where: { status: 'running', mode: In(['webhook', 'trigger']) }, + }); + expect(result).toBe(mockCount); + }); + }); }); diff --git a/packages/@n8n/db/src/repositories/execution.repository.ts b/packages/@n8n/db/src/repositories/execution.repository.ts index b986b68842..9295350a72 100644 --- a/packages/@n8n/db/src/repositories/execution.repository.ts +++ b/packages/@n8n/db/src/repositories/execution.repository.ts @@ -1137,4 +1137,16 @@ export class ExecutionRepository extends Repository { return executions.map(({ id }) => id); } + + /** + * The number of executions that are running and count towards the concurrent executions limit. + * Concurrency control only applies to executions started from a webhook or trigger node. + */ + async getConcurrentExecutionsCount() { + const concurrentExecutionsCount = await this.count({ + where: { status: 'running', mode: In(['webhook', 'trigger']) }, + }); + + return concurrentExecutionsCount; + } } diff --git a/packages/cli/src/executions/__tests__/executions.controller.test.ts b/packages/cli/src/executions/__tests__/executions.controller.test.ts index bd8fd5f420..fb3a93a1a0 100644 --- a/packages/cli/src/executions/__tests__/executions.controller.test.ts +++ b/packages/cli/src/executions/__tests__/executions.controller.test.ts @@ -32,7 +32,12 @@ describe('ExecutionsController', () => { }); describe('getMany', () => { - const NO_EXECUTIONS = { count: 0, estimated: false, results: [] }; + const NO_EXECUTIONS = { + count: 0, + estimated: false, + results: [], + concurrentExecutionsCount: -1, + }; const QUERIES_WITH_EITHER_STATUS_OR_RANGE: ExecutionSummaries.RangeQuery[] = [ { @@ -91,6 +96,7 @@ describe('ExecutionsController', () => { expect(executionService.findLatestCurrentAndCompleted).not.toHaveBeenCalled(); expect(executionService.findRangeWithCount).toHaveBeenCalledWith(rangeQuery); + expect(executionService.getConcurrentExecutionsCount).toHaveBeenCalled(); }, ); }); @@ -108,6 +114,7 @@ describe('ExecutionsController', () => { expect(executionService.findLatestCurrentAndCompleted).toHaveBeenCalled(); expect(executionService.findRangeWithCount).not.toHaveBeenCalled(); + expect(executionService.getConcurrentExecutionsCount).toHaveBeenCalled(); }, ); }); @@ -130,6 +137,7 @@ describe('ExecutionsController', () => { expect(executionService.findLatestCurrentAndCompleted).not.toHaveBeenCalled(); expect(executionService.findRangeWithCount).toHaveBeenCalledWith(rangeQuery); + expect(executionService.getConcurrentExecutionsCount).toHaveBeenCalled(); }); }); }); diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index fd72d0470f..ca4cb9ee9d 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -1,16 +1,16 @@ import { Logger } from '@n8n/backend-common'; import { GlobalConfig } from '@n8n/config'; import type { - User, CreateExecutionPayload, ExecutionSummaries, IExecutionResponse, IGetExecutionsQueryFilter, + User, } from '@n8n/db'; import { + AnnotationTagMappingRepository, ExecutionAnnotationRepository, ExecutionRepository, - AnnotationTagMappingRepository, WorkflowRepository, } from '@n8n/db'; import { Service } from '@n8n/di'; @@ -21,8 +21,8 @@ import type { INode, IRunExecutionData, IWorkflowBase, - WorkflowExecuteMode, IWorkflowExecutionDataProcess, + WorkflowExecuteMode, } from 'n8n-workflow'; import { ExecutionStatusList, @@ -372,22 +372,11 @@ export class ExecutionService { async findRangeWithCount(query: ExecutionSummaries.RangeQuery) { const results = await this.executionRepository.findManyByRangeQuery(query); - if (this.globalConfig.database.type === 'postgresdb') { - const liveRows = await this.executionRepository.getLiveExecutionRowsOnPostgres(); - - if (liveRows === -1) return { count: -1, estimated: false, results }; - - if (liveRows > 100_000) { - // likely too high to fetch exact count fast - return { count: liveRows, estimated: true, results }; - } - } - const { range: _, ...countQuery } = query; - const count = await this.executionRepository.fetchCount({ ...countQuery, kind: 'count' }); + const executionCount = await this.getExecutionsCountForQuery({ ...countQuery, kind: 'count' }); - return { results, count, estimated: false }; + return { results, ...executionCount }; } /** @@ -405,26 +394,82 @@ export class ExecutionService { const completedStatuses = ExecutionStatusList.filter((s) => !currentStatuses.includes(s)); - const [current, completed] = await Promise.all([ - this.findRangeWithCount({ - ...query, - status: currentStatuses, - order: { top: 'running' }, // ensure limit cannot exclude running - }), - this.findRangeWithCount({ - ...query, - status: completedStatuses, - order: { startedAt: 'DESC' }, - }), + const completedQuery: ExecutionSummaries.RangeQuery = { + ...query, + status: completedStatuses, + order: { startedAt: 'DESC' }, + }; + const { range: _, ...countQuery } = completedQuery; + + const currentQuery: ExecutionSummaries.RangeQuery = { + ...query, + status: currentStatuses, + order: { top: 'running' }, // ensure limit cannot exclude running + }; + + const [current, completed, completedCount] = await Promise.all([ + this.executionRepository.findManyByRangeQuery(currentQuery), + this.executionRepository.findManyByRangeQuery(completedQuery), + this.getExecutionsCountForQuery({ ...countQuery, kind: 'count' }), ]); return { - results: current.results.concat(completed.results), - count: completed.count, // exclude current from count for pagination - estimated: completed.estimated, + results: current.concat(completed), + count: completedCount.count, // exclude current from count for pagination + estimated: completedCount.estimated, }; } + /** + * @returns + * - the number of concurrent executions + * - `-1` if the count is not applicable (e.g. in 'queue' mode or if concurrency control is disabled) + * + * In 'queue' mode, concurrency control is applied per worker, so returning a global count of concurrent executions + * would not be meaningful or helpful. + */ + async getConcurrentExecutionsCount() { + if (!this.isConcurrentExecutionsCountSupported()) { + return -1; + } + + return await this.executionRepository.getConcurrentExecutionsCount(); + } + + private isConcurrentExecutionsCountSupported(): boolean { + const isConcurrencyEnabled = this.globalConfig.executions.concurrency.productionLimit !== -1; + const isInRegularMode = config.getEnv('executions.mode') === 'regular'; + + if (!isConcurrencyEnabled || !isInRegularMode) { + return false; + } + + return true; + } + + /** + * @param countQuery the query to count executions + * @returns + * - the count of executions that satisfy the query + * - whether the count is an estimate or not + */ + private async getExecutionsCountForQuery(countQuery: ExecutionSummaries.CountQuery) { + if (this.globalConfig.database.type === 'postgresdb') { + const liveRows = await this.executionRepository.getLiveExecutionRowsOnPostgres(); + + if (liveRows === -1) return { count: -1, estimated: false }; + + if (liveRows > 100_000) { + // likely too high to fetch exact count fast + return { count: liveRows, estimated: true }; + } + } + + const count = await this.executionRepository.fetchCount(countQuery); + + return { count, estimated: false }; + } + async findAllEnqueuedExecutions() { return await this.executionRepository.findMultipleExecutions( { diff --git a/packages/cli/src/executions/executions.controller.ts b/packages/cli/src/executions/executions.controller.ts index 382fcc1718..d4444b669e 100644 --- a/packages/cli/src/executions/executions.controller.ts +++ b/packages/cli/src/executions/executions.controller.ts @@ -59,20 +59,32 @@ export class ExecutionsController { const noRange = !query.range.lastId || !query.range.firstId; if (noStatus && noRange) { - const executions = await this.executionService.findLatestCurrentAndCompleted(query); + const [executions, concurrentExecutionsCount] = await Promise.all([ + this.executionService.findLatestCurrentAndCompleted(query), + this.executionService.getConcurrentExecutionsCount(), + ]); await this.executionService.addScopes( req.user, executions.results as ExecutionSummaries.ExecutionSummaryWithScopes[], ); - return executions; + return { + ...executions, + concurrentExecutionsCount, + }; } - const executions = await this.executionService.findRangeWithCount(query); + const [executions, concurrentExecutionsCount] = await Promise.all([ + this.executionService.findRangeWithCount(query), + this.executionService.getConcurrentExecutionsCount(), + ]); await this.executionService.addScopes( req.user, executions.results as ExecutionSummaries.ExecutionSummaryWithScopes[], ); - return executions; + return { + ...executions, + concurrentExecutionsCount, + }; } @Get('/:id') diff --git a/packages/cli/test/integration/execution.service.integration.test.ts b/packages/cli/test/integration/execution.service.integration.test.ts index 5ccbe9a4a3..4339713a3b 100644 --- a/packages/cli/test/integration/execution.service.integration.test.ts +++ b/packages/cli/test/integration/execution.service.integration.test.ts @@ -1,9 +1,11 @@ import { createTeamProject, createWorkflow, testDb } from '@n8n/backend-test-utils'; +import { GlobalConfig } from '@n8n/config'; import type { ExecutionSummaries } from '@n8n/db'; import { ExecutionMetadataRepository, ExecutionRepository, WorkflowRepository } from '@n8n/db'; import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; +import config from '@/config'; import { ExecutionService } from '@/executions/execution.service'; import { annotateExecution, createAnnotationTags, createExecution } from './shared/db/executions'; @@ -11,6 +13,7 @@ import { annotateExecution, createAnnotationTags, createExecution } from './shar describe('ExecutionService', () => { let executionService: ExecutionService; let executionRepository: ExecutionRepository; + const globalConfig = Container.get(GlobalConfig); beforeAll(async () => { await testDb.init(); @@ -18,7 +21,7 @@ describe('ExecutionService', () => { executionRepository = Container.get(ExecutionRepository); executionService = new ExecutionService( - mock(), + globalConfig, mock(), mock(), mock(), @@ -34,6 +37,11 @@ describe('ExecutionService', () => { ); }); + beforeEach(() => { + globalConfig.executions.concurrency.productionLimit = -1; + config.set('executions.mode', 'regular'); + }); + afterEach(async () => { await testDb.truncate(['ExecutionEntity']); }); @@ -504,6 +512,67 @@ describe('ExecutionService', () => { }); }); + describe('getConcurrentExecutionsCount', () => { + test('should return concurrentExecutionsCount when concurrency is enabled', async () => { + globalConfig.executions.concurrency.productionLimit = 4; + + const workflow = await createWorkflow(); + const concurrentExecutionsData = await Promise.all([ + createExecution({ status: 'running', mode: 'webhook' }, workflow), + createExecution({ status: 'running', mode: 'trigger' }, workflow), + ]); + + await Promise.all([ + createExecution({ status: 'success' }, workflow), + createExecution({ status: 'crashed' }, workflow), + createExecution({ status: 'new' }, workflow), + createExecution({ status: 'running', mode: 'manual' }, workflow), + ]); + + const output = await executionService.getConcurrentExecutionsCount(); + expect(output).toEqual(concurrentExecutionsData.length); + }); + + test('should set concurrentExecutionsCount to -1 when concurrency is disabled', async () => { + globalConfig.executions.concurrency.productionLimit = -1; + + const workflow = await createWorkflow(); + + await Promise.all([ + createExecution({ status: 'running', mode: 'webhook' }, workflow), + createExecution({ status: 'running', mode: 'trigger' }, workflow), + createExecution({ status: 'success' }, workflow), + createExecution({ status: 'crashed' }, workflow), + createExecution({ status: 'new' }, workflow), + createExecution({ status: 'running', mode: 'manual' }, workflow), + ]); + + const output = await executionService.getConcurrentExecutionsCount(); + + expect(output).toEqual(-1); + }); + + test('should set concurrentExecutionsCount to -1 in queue mode', async () => { + config.set('executions.mode', 'queue'); + globalConfig.executions.concurrency.productionLimit = 4; + + const workflow = await createWorkflow(); + + await Promise.all([ + createExecution({ status: 'running', mode: 'webhook' }, workflow), + createExecution({ status: 'running', mode: 'trigger' }, workflow), + createExecution({ status: 'success' }, workflow), + createExecution({ status: 'crashed' }, workflow), + createExecution({ status: 'new' }, workflow), + createExecution({ status: 'running', mode: 'manual' }, workflow), + ]); + + const output = await executionService.getConcurrentExecutionsCount(); + + expect(output).toEqual(-1); + }); + }); + describe('findLatestCurrentAndCompleted', () => { test('should return latest current and completed executions', async () => { const workflow = await createWorkflow(); diff --git a/packages/frontend/editor-ui/src/Interface.ts b/packages/frontend/editor-ui/src/Interface.ts index aff685ac23..b4d759b419 100644 --- a/packages/frontend/editor-ui/src/Interface.ts +++ b/packages/frontend/editor-ui/src/Interface.ts @@ -495,6 +495,7 @@ export interface IExecutionsListResponse { count: number; results: ExecutionSummaryWithScopes[]; estimated: boolean; + concurrentExecutionsCount: number; } export interface IExecutionsCurrentSummaryExtended { diff --git a/packages/frontend/editor-ui/src/components/executions/global/GlobalExecutionsList.vue b/packages/frontend/editor-ui/src/components/executions/global/GlobalExecutionsList.vue index 22b72d4223..850366ee20 100644 --- a/packages/frontend/editor-ui/src/components/executions/global/GlobalExecutionsList.vue +++ b/packages/frontend/editor-ui/src/components/executions/global/GlobalExecutionsList.vue @@ -27,10 +27,12 @@ const props = withDefaults( executions: ExecutionSummaryWithScopes[]; filters: ExecutionFilterType; total?: number; + concurrentTotal?: number; estimated?: boolean; }>(), { total: 0, + concurrentTotal: 0, estimated: false, }, ); @@ -76,16 +78,11 @@ const isAnnotationEnabled = computed( () => settingsStore.isEnterpriseFeatureEnabled[EnterpriseEditionFeature.AdvancedExecutionFilters], ); -/** - * Calculate the number of executions counted towards the production executions concurrency limit. - * Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI. - */ -const runningExecutionsCount = computed(() => { - return props.executions.filter( - (execution) => - execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode), - ).length; -}); +// In 'queue' mode concurrency control is applied per worker and returning a global count +// of concurrent executions would not be meaningful/helpful. +const showConcurrencyHeader = computed( + () => settingsStore.isConcurrencyEnabled && !settingsStore.isQueueModeEnabled, +); watch( () => props.executions, @@ -338,8 +335,8 @@ const goToUpgrade = () => {
(null); const workflowPermissions = computed(() => getResourcePermissions(props.workflow?.scopes).workflow); -/** - * Calculate the number of executions counted towards the production executions concurrency limit. - * Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI. - */ -const runningExecutionsCount = computed(() => { - return props.executions.filter( - (execution) => - execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode), - ).length; -}); +// In 'queue' mode concurrency control is applied per worker and returning a global count +// of concurrent executions would not be meaningful/helpful. +const showConcurrencyHeader = computed( + () => settingsStore.isConcurrencyEnabled && !settingsStore.isQueueModeEnabled, +); watch( () => route, @@ -196,8 +191,8 @@ const goToUpgrade = () => { { const executionsById = ref>({}); const executionsCount = ref(0); const executionsCountEstimated = ref(false); + const concurrentExecutionsCount = ref(0); const executions = computed(() => { const data = Object.values(executionsById.value); @@ -176,6 +177,7 @@ export const useExecutionsStore = defineStore('executions', () => { executionsCount.value = data.count; executionsCountEstimated.value = data.estimated; + concurrentExecutionsCount.value = data.concurrentExecutionsCount; return data; } finally { loading.value = false; @@ -285,6 +287,7 @@ export const useExecutionsStore = defineStore('executions', () => { currentExecutionsById.value = {}; executionsCount.value = 0; executionsCountEstimated.value = false; + concurrentExecutionsCount.value = 0; } function reset() { @@ -302,6 +305,7 @@ export const useExecutionsStore = defineStore('executions', () => { executions, executionsCount, executionsCountEstimated, + concurrentExecutionsCount, executionsByWorkflowId, currentExecutions, currentExecutionsByWorkflowId, diff --git a/packages/frontend/editor-ui/src/views/ExecutionsView.vue b/packages/frontend/editor-ui/src/views/ExecutionsView.vue index fee3b11a45..a5a8cf2b0b 100644 --- a/packages/frontend/editor-ui/src/views/ExecutionsView.vue +++ b/packages/frontend/editor-ui/src/views/ExecutionsView.vue @@ -27,8 +27,13 @@ const documentTitle = useDocumentTitle(); const toast = useToast(); const overview = useProjectPages(); -const { executionsCount, executionsCountEstimated, filters, allExecutions } = - storeToRefs(executionsStore); +const { + executionsCount, + executionsCountEstimated, + concurrentExecutionsCount, + filters, + allExecutions, +} = storeToRefs(executionsStore); onBeforeMount(async () => { await loadWorkflows(); @@ -91,6 +96,7 @@ async function onExecutionStop() { :filters="filters" :total="executionsCount" :estimated-total="executionsCountEstimated" + :concurrent-total="concurrentExecutionsCount" @execution:stop="onExecutionStop" @update:filters="onUpdateFilters" >