diff --git a/packages/cli/src/__tests__/wait-tracker.test.ts b/packages/cli/src/__tests__/wait-tracker.test.ts index dc7d06c57b..2ac0bad081 100644 --- a/packages/cli/src/__tests__/wait-tracker.test.ts +++ b/packages/cli/src/__tests__/wait-tracker.test.ts @@ -31,6 +31,7 @@ describe('WaitTracker', () => { pushRef: 'push_ref', parentExecution: undefined, }), + startedAt: undefined, }); execution.workflowData = mock({ id: 'abcd' }); @@ -196,6 +197,7 @@ describe('WaitTracker', () => { workflowData: parentExecution.workflowData, projectId: project.id, pushRef: parentExecution.data.pushRef, + startedAt: parentExecution.startedAt, }, false, false, diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts index d01995455c..ac25a57a6c 100644 --- a/packages/cli/src/__tests__/workflow-runner.test.ts +++ b/packages/cli/src/__tests__/workflow-runner.test.ts @@ -301,6 +301,177 @@ describe('enqueueExecution', () => { }); }); +describe('workflow timeout with startedAt', () => { + let mockSetTimeout: jest.SpyInstance; + let recordedTimeout: number | undefined = undefined; + + beforeAll(() => { + // Mock setTimeout globally to capture the timeout value + mockSetTimeout = jest.spyOn(global, 'setTimeout').mockImplementation((_fn, timeout) => { + // There can be multiple calls to setTimeout with 60000ms, these happen + // when accessing the database, we only capture the first one not equal to 60000ms + if (timeout !== 60000) { + recordedTimeout = timeout; // Capture the timeout value for assertions + } + return {} as NodeJS.Timeout; + }); + }); + + afterAll(() => { + // Restore the original setTimeout after tests + mockSetTimeout.mockRestore(); + }); + + it('should calculate timeout based on startedAt date when provided', async () => { + // ARRANGE + const activeExecutions = Container.get(ActiveExecutions); + jest.spyOn(activeExecutions, 'add').mockResolvedValue('1'); + jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce(); + const permissionChecker = Container.get(CredentialsPermissionChecker); + jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce(); + + const mockStopExecution = jest.spyOn(activeExecutions, 'stopExecution'); + + // Mock config to return a workflow timeout of 10 seconds + jest.spyOn(config, 'getEnv').mockReturnValue(10); + + const startedAt = new Date(Date.now() - 5000); // 5 seconds ago + const data = mock({ + workflowData: { + nodes: [], + settings: { executionTimeout: 10 }, // 10 seconds timeout + }, + executionData: undefined, + executionMode: 'webhook', + startedAt, + }); + + const mockHooks = mock(); + jest + .spyOn(ExecutionLifecycleHooks, 'getLifecycleHooksForRegularMain') + .mockReturnValue(mockHooks); + + const mockAdditionalData = mock(); + jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(mockAdditionalData); + + const manualExecutionService = Container.get(ManualExecutionService); + jest.spyOn(manualExecutionService, 'runManually').mockReturnValue( + new PCancelable(() => { + return mock(); + }), + ); + + // ACT + await runner.run(data); + + // ASSERT + // The timeout should be adjusted: 10 seconds - 5 seconds elapsed = ~5 seconds remaining + expect(mockSetTimeout).toHaveBeenCalledWith(expect.any(Function), expect.any(Number)); + + // Should be approximately 5000ms (5 seconds remaining), allowing for timing differences + expect(recordedTimeout).toBeLessThan(6000); + expect(recordedTimeout).toBeGreaterThan(4000); + + recordedTimeout = undefined; // Reset for next test + + // Execution should not be stopped immediately + expect(mockStopExecution).not.toHaveBeenCalled(); + }); + + it('should stop execution immediately when timeout has already elapsed', async () => { + // ARRANGE + const activeExecutions = Container.get(ActiveExecutions); + jest.spyOn(activeExecutions, 'add').mockResolvedValue('1'); + jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce(); + const permissionChecker = Container.get(CredentialsPermissionChecker); + jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce(); + + const mockStopExecution = jest.spyOn(activeExecutions, 'stopExecution'); + + // Mock config to return a workflow timeout of 10 seconds + jest.spyOn(config, 'getEnv').mockReturnValue(10); + + const startedAt = new Date(Date.now() - 15000); // 15 seconds ago (timeout already elapsed) + const data = mock({ + workflowData: { + nodes: [], + settings: { executionTimeout: 10 }, // 10 seconds timeout + }, + executionData: undefined, + executionMode: 'webhook', + startedAt, + }); + + const mockHooks = mock(); + jest + .spyOn(ExecutionLifecycleHooks, 'getLifecycleHooksForRegularMain') + .mockReturnValue(mockHooks); + + const mockAdditionalData = mock(); + jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(mockAdditionalData); + + const manualExecutionService = Container.get(ManualExecutionService); + jest.spyOn(manualExecutionService, 'runManually').mockReturnValue( + new PCancelable(() => { + return mock(); + }), + ); + + // ACT + await runner.run(data); + + // ASSERT + // The execution should be stopped immediately because the timeout has already elapsed + expect(mockStopExecution).toHaveBeenCalledWith('1'); + }); + + it('should use original timeout logic when startedAt is not provided', async () => { + // ARRANGE + const activeExecutions = Container.get(ActiveExecutions); + jest.spyOn(activeExecutions, 'add').mockResolvedValue('1'); + jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce(); + const permissionChecker = Container.get(CredentialsPermissionChecker); + jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce(); + + const mockStopExecution = jest.spyOn(activeExecutions, 'stopExecution'); + + // Mock config to return a workflow timeout of 10 seconds + jest.spyOn(config, 'getEnv').mockReturnValue(10); + + const data = mock({ + workflowData: { + nodes: [], + settings: { executionTimeout: 10 }, // 10 seconds timeout + }, + executionData: undefined, + executionMode: 'webhook', + // No startedAt provided + }); + + const mockHooks = mock(); + jest + .spyOn(ExecutionLifecycleHooks, 'getLifecycleHooksForRegularMain') + .mockReturnValue(mockHooks); + + const mockAdditionalData = mock(); + jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(mockAdditionalData); + + const manualExecutionService = Container.get(ManualExecutionService); + jest.spyOn(manualExecutionService, 'runManually').mockReturnValue( + new PCancelable(() => { + return mock(); + }), + ); + + // ACT + await runner.run(data); + + // ASSERT + // The execution should not be stopped immediately (original timeout logic) + expect(mockStopExecution).not.toHaveBeenCalled(); + }); +}); + describe('streaming functionality', () => { it('should setup sendChunk handler when streaming is enabled and execution mode is not manual', async () => { // ARRANGE diff --git a/packages/cli/src/wait-tracker.ts b/packages/cli/src/wait-tracker.ts index 4f5e772a83..a93e958a0d 100644 --- a/packages/cli/src/wait-tracker.ts +++ b/packages/cli/src/wait-tracker.ts @@ -119,6 +119,7 @@ export class WaitTracker { workflowData: fullExecutionData.workflowData, projectId: project.id, pushRef: fullExecutionData.data.pushRef, + startedAt: fullExecutionData.startedAt, }; // Start the execution again diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index fefdaa3785..fa85c8149f 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -307,10 +307,21 @@ export class WorkflowRunner { this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); if (workflowTimeout > 0) { - const timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds - executionTimeout = setTimeout(() => { - void this.activeExecutions.stopExecution(executionId); - }, timeout); + let timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as milliseconds + if (data.startedAt && data.startedAt instanceof Date) { + // If startedAt is set, we calculate the timeout based on the startedAt time + // This is useful for executions that were waiting in a waiting state + // and we want to ensure the timeout is relative to when the execution started. + const now = Date.now(); + timeout = Math.max(timeout - (now - data.startedAt.getTime()), 0); + } + if (timeout === 0) { + this.activeExecutions.stopExecution(executionId); + } else { + executionTimeout = setTimeout(() => { + void this.activeExecutions.stopExecution(executionId); + }, timeout); + } } workflowExecution diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index a79e59b93d..46d9f38c87 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -2342,6 +2342,7 @@ export interface IWorkflowExecutionDataProcess { agentRequest?: AiAgentRequest; httpResponse?: express.Response; // Used for streaming responses streamingEnabled?: boolean; + startedAt?: Date; } export interface ExecuteWorkflowOptions {