mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 10:02:05 +00:00
chore(core): Calculate workflow timeout based on startedAt date of execution (#17137)
This commit is contained in:
@@ -31,6 +31,7 @@ describe('WaitTracker', () => {
|
|||||||
pushRef: 'push_ref',
|
pushRef: 'push_ref',
|
||||||
parentExecution: undefined,
|
parentExecution: undefined,
|
||||||
}),
|
}),
|
||||||
|
startedAt: undefined,
|
||||||
});
|
});
|
||||||
execution.workflowData = mock<IWorkflowBase>({ id: 'abcd' });
|
execution.workflowData = mock<IWorkflowBase>({ id: 'abcd' });
|
||||||
|
|
||||||
@@ -196,6 +197,7 @@ describe('WaitTracker', () => {
|
|||||||
workflowData: parentExecution.workflowData,
|
workflowData: parentExecution.workflowData,
|
||||||
projectId: project.id,
|
projectId: project.id,
|
||||||
pushRef: parentExecution.data.pushRef,
|
pushRef: parentExecution.data.pushRef,
|
||||||
|
startedAt: parentExecution.startedAt,
|
||||||
},
|
},
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
|
|||||||
@@ -301,6 +301,177 @@ describe('enqueueExecution', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('workflow timeout with startedAt', () => {
|
||||||
|
let mockSetTimeout: jest.SpyInstance;
|
||||||
|
let recordedTimeout: number | undefined = undefined;
|
||||||
|
|
||||||
|
beforeAll(() => {
|
||||||
|
// Mock setTimeout globally to capture the timeout value
|
||||||
|
mockSetTimeout = jest.spyOn(global, 'setTimeout').mockImplementation((_fn, timeout) => {
|
||||||
|
// There can be multiple calls to setTimeout with 60000ms, these happen
|
||||||
|
// when accessing the database, we only capture the first one not equal to 60000ms
|
||||||
|
if (timeout !== 60000) {
|
||||||
|
recordedTimeout = timeout; // Capture the timeout value for assertions
|
||||||
|
}
|
||||||
|
return {} as NodeJS.Timeout;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(() => {
|
||||||
|
// Restore the original setTimeout after tests
|
||||||
|
mockSetTimeout.mockRestore();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should calculate timeout based on startedAt date when provided', async () => {
|
||||||
|
// ARRANGE
|
||||||
|
const activeExecutions = Container.get(ActiveExecutions);
|
||||||
|
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
|
||||||
|
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();
|
||||||
|
const permissionChecker = Container.get(CredentialsPermissionChecker);
|
||||||
|
jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();
|
||||||
|
|
||||||
|
const mockStopExecution = jest.spyOn(activeExecutions, 'stopExecution');
|
||||||
|
|
||||||
|
// Mock config to return a workflow timeout of 10 seconds
|
||||||
|
jest.spyOn(config, 'getEnv').mockReturnValue(10);
|
||||||
|
|
||||||
|
const startedAt = new Date(Date.now() - 5000); // 5 seconds ago
|
||||||
|
const data = mock<IWorkflowExecutionDataProcess>({
|
||||||
|
workflowData: {
|
||||||
|
nodes: [],
|
||||||
|
settings: { executionTimeout: 10 }, // 10 seconds timeout
|
||||||
|
},
|
||||||
|
executionData: undefined,
|
||||||
|
executionMode: 'webhook',
|
||||||
|
startedAt,
|
||||||
|
});
|
||||||
|
|
||||||
|
const mockHooks = mock<core.ExecutionLifecycleHooks>();
|
||||||
|
jest
|
||||||
|
.spyOn(ExecutionLifecycleHooks, 'getLifecycleHooksForRegularMain')
|
||||||
|
.mockReturnValue(mockHooks);
|
||||||
|
|
||||||
|
const mockAdditionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||||
|
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(mockAdditionalData);
|
||||||
|
|
||||||
|
const manualExecutionService = Container.get(ManualExecutionService);
|
||||||
|
jest.spyOn(manualExecutionService, 'runManually').mockReturnValue(
|
||||||
|
new PCancelable(() => {
|
||||||
|
return mock<IRun>();
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
// ACT
|
||||||
|
await runner.run(data);
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
// The timeout should be adjusted: 10 seconds - 5 seconds elapsed = ~5 seconds remaining
|
||||||
|
expect(mockSetTimeout).toHaveBeenCalledWith(expect.any(Function), expect.any(Number));
|
||||||
|
|
||||||
|
// Should be approximately 5000ms (5 seconds remaining), allowing for timing differences
|
||||||
|
expect(recordedTimeout).toBeLessThan(6000);
|
||||||
|
expect(recordedTimeout).toBeGreaterThan(4000);
|
||||||
|
|
||||||
|
recordedTimeout = undefined; // Reset for next test
|
||||||
|
|
||||||
|
// Execution should not be stopped immediately
|
||||||
|
expect(mockStopExecution).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should stop execution immediately when timeout has already elapsed', async () => {
|
||||||
|
// ARRANGE
|
||||||
|
const activeExecutions = Container.get(ActiveExecutions);
|
||||||
|
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
|
||||||
|
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();
|
||||||
|
const permissionChecker = Container.get(CredentialsPermissionChecker);
|
||||||
|
jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();
|
||||||
|
|
||||||
|
const mockStopExecution = jest.spyOn(activeExecutions, 'stopExecution');
|
||||||
|
|
||||||
|
// Mock config to return a workflow timeout of 10 seconds
|
||||||
|
jest.spyOn(config, 'getEnv').mockReturnValue(10);
|
||||||
|
|
||||||
|
const startedAt = new Date(Date.now() - 15000); // 15 seconds ago (timeout already elapsed)
|
||||||
|
const data = mock<IWorkflowExecutionDataProcess>({
|
||||||
|
workflowData: {
|
||||||
|
nodes: [],
|
||||||
|
settings: { executionTimeout: 10 }, // 10 seconds timeout
|
||||||
|
},
|
||||||
|
executionData: undefined,
|
||||||
|
executionMode: 'webhook',
|
||||||
|
startedAt,
|
||||||
|
});
|
||||||
|
|
||||||
|
const mockHooks = mock<core.ExecutionLifecycleHooks>();
|
||||||
|
jest
|
||||||
|
.spyOn(ExecutionLifecycleHooks, 'getLifecycleHooksForRegularMain')
|
||||||
|
.mockReturnValue(mockHooks);
|
||||||
|
|
||||||
|
const mockAdditionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||||
|
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(mockAdditionalData);
|
||||||
|
|
||||||
|
const manualExecutionService = Container.get(ManualExecutionService);
|
||||||
|
jest.spyOn(manualExecutionService, 'runManually').mockReturnValue(
|
||||||
|
new PCancelable(() => {
|
||||||
|
return mock<IRun>();
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
// ACT
|
||||||
|
await runner.run(data);
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
// The execution should be stopped immediately because the timeout has already elapsed
|
||||||
|
expect(mockStopExecution).toHaveBeenCalledWith('1');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should use original timeout logic when startedAt is not provided', async () => {
|
||||||
|
// ARRANGE
|
||||||
|
const activeExecutions = Container.get(ActiveExecutions);
|
||||||
|
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
|
||||||
|
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();
|
||||||
|
const permissionChecker = Container.get(CredentialsPermissionChecker);
|
||||||
|
jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();
|
||||||
|
|
||||||
|
const mockStopExecution = jest.spyOn(activeExecutions, 'stopExecution');
|
||||||
|
|
||||||
|
// Mock config to return a workflow timeout of 10 seconds
|
||||||
|
jest.spyOn(config, 'getEnv').mockReturnValue(10);
|
||||||
|
|
||||||
|
const data = mock<IWorkflowExecutionDataProcess>({
|
||||||
|
workflowData: {
|
||||||
|
nodes: [],
|
||||||
|
settings: { executionTimeout: 10 }, // 10 seconds timeout
|
||||||
|
},
|
||||||
|
executionData: undefined,
|
||||||
|
executionMode: 'webhook',
|
||||||
|
// No startedAt provided
|
||||||
|
});
|
||||||
|
|
||||||
|
const mockHooks = mock<core.ExecutionLifecycleHooks>();
|
||||||
|
jest
|
||||||
|
.spyOn(ExecutionLifecycleHooks, 'getLifecycleHooksForRegularMain')
|
||||||
|
.mockReturnValue(mockHooks);
|
||||||
|
|
||||||
|
const mockAdditionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||||
|
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(mockAdditionalData);
|
||||||
|
|
||||||
|
const manualExecutionService = Container.get(ManualExecutionService);
|
||||||
|
jest.spyOn(manualExecutionService, 'runManually').mockReturnValue(
|
||||||
|
new PCancelable(() => {
|
||||||
|
return mock<IRun>();
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
// ACT
|
||||||
|
await runner.run(data);
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
// The execution should not be stopped immediately (original timeout logic)
|
||||||
|
expect(mockStopExecution).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('streaming functionality', () => {
|
describe('streaming functionality', () => {
|
||||||
it('should setup sendChunk handler when streaming is enabled and execution mode is not manual', async () => {
|
it('should setup sendChunk handler when streaming is enabled and execution mode is not manual', async () => {
|
||||||
// ARRANGE
|
// ARRANGE
|
||||||
|
|||||||
@@ -119,6 +119,7 @@ export class WaitTracker {
|
|||||||
workflowData: fullExecutionData.workflowData,
|
workflowData: fullExecutionData.workflowData,
|
||||||
projectId: project.id,
|
projectId: project.id,
|
||||||
pushRef: fullExecutionData.data.pushRef,
|
pushRef: fullExecutionData.data.pushRef,
|
||||||
|
startedAt: fullExecutionData.startedAt,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start the execution again
|
// Start the execution again
|
||||||
|
|||||||
@@ -307,11 +307,22 @@ export class WorkflowRunner {
|
|||||||
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
|
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
|
||||||
|
|
||||||
if (workflowTimeout > 0) {
|
if (workflowTimeout > 0) {
|
||||||
const timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds
|
let timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as milliseconds
|
||||||
|
if (data.startedAt && data.startedAt instanceof Date) {
|
||||||
|
// If startedAt is set, we calculate the timeout based on the startedAt time
|
||||||
|
// This is useful for executions that were waiting in a waiting state
|
||||||
|
// and we want to ensure the timeout is relative to when the execution started.
|
||||||
|
const now = Date.now();
|
||||||
|
timeout = Math.max(timeout - (now - data.startedAt.getTime()), 0);
|
||||||
|
}
|
||||||
|
if (timeout === 0) {
|
||||||
|
this.activeExecutions.stopExecution(executionId);
|
||||||
|
} else {
|
||||||
executionTimeout = setTimeout(() => {
|
executionTimeout = setTimeout(() => {
|
||||||
void this.activeExecutions.stopExecution(executionId);
|
void this.activeExecutions.stopExecution(executionId);
|
||||||
}, timeout);
|
}, timeout);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
workflowExecution
|
workflowExecution
|
||||||
.then((fullRunData) => {
|
.then((fullRunData) => {
|
||||||
|
|||||||
@@ -2342,6 +2342,7 @@ export interface IWorkflowExecutionDataProcess {
|
|||||||
agentRequest?: AiAgentRequest;
|
agentRequest?: AiAgentRequest;
|
||||||
httpResponse?: express.Response; // Used for streaming responses
|
httpResponse?: express.Response; // Used for streaming responses
|
||||||
streamingEnabled?: boolean;
|
streamingEnabled?: boolean;
|
||||||
|
startedAt?: Date;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ExecuteWorkflowOptions {
|
export interface ExecuteWorkflowOptions {
|
||||||
|
|||||||
Reference in New Issue
Block a user