From a495d81c13e9a2721d4900db94d4a9765b0795c2 Mon Sep 17 00:00:00 2001 From: Alex Grozav Date: Wed, 9 Apr 2025 12:57:12 +0300 Subject: [PATCH] fix(editor): Fix race condition for updating node and workflow execution status (#14353) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ --- .../src/components/NodeExecuteButton.test.ts | 2 +- .../src/composables/useCanvasNode.ts | 2 +- .../src/composables/usePushConnection.test.ts | 17 ++++---- .../src/composables/usePushConnection.ts | 15 +++++-- .../src/composables/useRunWorkflow.test.ts | 40 ++++++++++++------- .../src/composables/useRunWorkflow.ts | 7 +++- .../editor-ui/src/stores/assistant.store.ts | 4 +- .../editor-ui/src/stores/workflows.store.ts | 14 +++++-- 8 files changed, 66 insertions(+), 35 deletions(-) diff --git a/packages/frontend/editor-ui/src/components/NodeExecuteButton.test.ts b/packages/frontend/editor-ui/src/components/NodeExecuteButton.test.ts index 5a434b23e3..4632dac484 100644 --- a/packages/frontend/editor-ui/src/components/NodeExecuteButton.test.ts +++ b/packages/frontend/editor-ui/src/components/NodeExecuteButton.test.ts @@ -279,7 +279,7 @@ describe('NodeExecuteButton', () => { it('stops execution when clicking button while workflow is running', async () => { uiStore.isActionActive.workflowRunning = true; nodeTypesStore.isTriggerNode = () => true; - workflowsStore.activeExecutionId = 'test-execution-id'; + workflowsStore.setActiveExecutionId('test-execution-id'); workflowsStore.isNodeExecuting.mockReturnValue(true); workflowsStore.getNodeByName.mockReturnValue( mockNode({ name: 'test-node', type: SET_NODE_TYPE }), diff --git a/packages/frontend/editor-ui/src/composables/useCanvasNode.ts b/packages/frontend/editor-ui/src/composables/useCanvasNode.ts index 878ecc4d0f..6b051992fb 100644 --- a/packages/frontend/editor-ui/src/composables/useCanvasNode.ts +++ b/packages/frontend/editor-ui/src/composables/useCanvasNode.ts @@ -59,7 +59,7 @@ export function useCanvasNode() { const executionStatus = computed(() => data.value.execution.status); const executionWaiting = computed(() => data.value.execution.waiting); const executionRunning = computed(() => data.value.execution.running); - const executionRunningThrottled = refThrottled(executionRunning, 50); + const executionRunningThrottled = refThrottled(executionRunning, 300); const runDataOutputMap = computed(() => data.value.runData.outputMap); const runDataIterations = computed(() => data.value.runData.iterations); diff --git a/packages/frontend/editor-ui/src/composables/usePushConnection.test.ts b/packages/frontend/editor-ui/src/composables/usePushConnection.test.ts index 76b9c0ee86..1964281259 100644 --- a/packages/frontend/editor-ui/src/composables/usePushConnection.test.ts +++ b/packages/frontend/editor-ui/src/composables/usePushConnection.test.ts @@ -140,7 +140,7 @@ describe('usePushConnection()', () => { const workflowId = 'abc'; beforeEach(() => { - workflowsStore.activeExecutionId = executionId; + workflowsStore.setActiveExecutionId(executionId); uiStore.isActionActive.workflowRunning = true; }); @@ -239,13 +239,16 @@ describe('usePushConnection()', () => { it("enqueues messages if we don't have the active execution id yet", async () => { uiStore.isActionActive.workflowRunning = true; const event: PushMessage = { - type: 'executionStarted', + type: 'nodeExecuteAfter', data: { executionId: '1', - mode: 'manual', - startedAt: new Date(), - workflowId: '1', - flattedRunData: stringify({}), + nodeName: 'Node', + data: { + executionIndex: 0, + startTime: 0, + executionTime: 0, + source: [], + }, }, }; @@ -281,7 +284,7 @@ describe('usePushConnection()', () => { workflowId: '1', }, }; - workflowsStore.activeExecutionId = event.data.executionId; + workflowsStore.setActiveExecutionId(event.data.executionId); // ACT const result = await pushConnection.pushMessageReceived(event); diff --git a/packages/frontend/editor-ui/src/composables/usePushConnection.ts b/packages/frontend/editor-ui/src/composables/usePushConnection.ts index 86ef569500..04c265fcaf 100644 --- a/packages/frontend/editor-ui/src/composables/usePushConnection.ts +++ b/packages/frontend/editor-ui/src/composables/usePushConnection.ts @@ -151,6 +151,12 @@ export function usePushConnection({ router }: { router: ReturnType ({ - useWorkflowsStore: vi.fn().mockReturnValue({ +vi.mock('@/stores/workflows.store', async () => { + const storeState: Partial> & { + activeExecutionId: string | null; + } = { allNodes: [], runWorkflow: vi.fn(), subWorkflowExecutionError: null, getWorkflowRunData: null, + workflowExecutionData: null, setWorkflowExecutionData: vi.fn(), activeExecutionId: null, + previousExecutionId: null, nodesIssuesExist: false, executionWaitingForWebhook: false, getCurrentWorkflow: vi.fn().mockReturnValue({ id: '123' }), getNodeByName: vi.fn(), getExecution: vi.fn(), - nodeIssuesExit: vi.fn(), checkIfNodeHasChatParent: vi.fn(), getParametersLastUpdate: vi.fn(), getPinnedDataLastUpdate: vi.fn(), @@ -47,8 +49,15 @@ vi.mock('@/stores/workflows.store', () => ({ incomingConnectionsByNodeName: vi.fn(), outgoingConnectionsByNodeName: vi.fn(), markExecutionAsStopped: vi.fn(), - }), -})); + setActiveExecutionId: vi.fn((id: string | null) => { + storeState.activeExecutionId = id; + }), + }; + + return { + useWorkflowsStore: vi.fn().mockReturnValue(storeState), + }; +}); vi.mock('@/stores/pushConnection.store', () => ({ usePushConnectionStore: vi.fn().mockReturnValue({ @@ -151,6 +160,7 @@ describe('useRunWorkflow({ router })', () => { const mockResponse = { executionId: '123', waitingForWebhook: false }; vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockResponse); + vi.mocked(workflowsStore).setActiveExecutionId('123'); const response = await runWorkflowApi({} as IStartRunData); @@ -692,7 +702,7 @@ describe('useRunWorkflow({ router })', () => { workflowsStore.workflowExecutionData = executionData; workflowsStore.activeWorkflows = ['test-wf-id']; - workflowsStore.activeExecutionId = 'test-exec-id'; + workflowsStore.setActiveExecutionId('test-exec-id'); // Exercise - don't wait for returned promise to resolve void runWorkflowComposable.stopCurrentExecution(); diff --git a/packages/frontend/editor-ui/src/composables/useRunWorkflow.ts b/packages/frontend/editor-ui/src/composables/useRunWorkflow.ts index e675949ee7..238954c3b9 100644 --- a/packages/frontend/editor-ui/src/composables/useRunWorkflow.ts +++ b/packages/frontend/editor-ui/src/composables/useRunWorkflow.ts @@ -78,8 +78,11 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType { const chatSessionCredType = ref(); const chatSessionError = ref(); const currentSessionId = ref(); - const currentSessionActiveExecutionId = ref(); + const currentSessionActiveExecutionId = ref(null); const currentSessionWorkflowId = ref(); const lastUnread = ref(); const nodeExecutionStatus = ref('not_executed'); @@ -125,7 +125,7 @@ export const useAssistantStore = defineStore(STORES.ASSISTANT, () => { currentSessionId.value = undefined; chatSessionError.value = undefined; lastUnread.value = undefined; - currentSessionActiveExecutionId.value = undefined; + currentSessionActiveExecutionId.value = null; suggestions.value = {}; nodeExecutionStatus.value = 'not_executed'; chatSessionCredType.value = undefined; diff --git a/packages/frontend/editor-ui/src/stores/workflows.store.ts b/packages/frontend/editor-ui/src/stores/workflows.store.ts index 939ab921c7..407cae2b95 100644 --- a/packages/frontend/editor-ui/src/stores/workflows.store.ts +++ b/packages/frontend/editor-ui/src/stores/workflows.store.ts @@ -139,6 +139,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { const workflowExecutionData = ref(null); const workflowExecutionPairedItemMappings = ref>>({}); const activeExecutionId = ref(null); + const previousExecutionId = ref(null); const subWorkflowExecutionError = ref(null); const executionWaitingForWebhook = ref(false); const workflowsById = ref>({}); @@ -289,6 +290,11 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { Workflow.getConnectionsByDestination(workflow.value.connections), ); + function setActiveExecutionId(id: string | null) { + previousExecutionId.value = activeExecutionId.value; + activeExecutionId.value = id; + } + function getWorkflowResultDataByNodeName(nodeName: string): ITaskData[] | null { if (getWorkflowRunData.value === null) { return null; @@ -615,7 +621,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { setWorkflowSettings({ ...defaults.settings }); setWorkflowTagIds([]); - activeExecutionId.value = null; + setActiveExecutionId(null); executingNode.value.length = 0; executionWaitingForWebhook.value = false; } @@ -1687,7 +1693,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { } function markExecutionAsStopped() { - activeExecutionId.value = null; + setActiveExecutionId(null); clearNodeExecutionQueue(); executionWaitingForWebhook.value = false; uiStore.removeActiveAction('workflowRunning'); @@ -1711,7 +1717,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { currentWorkflowExecutions, workflowExecutionData, workflowExecutionPairedItemMappings, - activeExecutionId, + activeExecutionId: computed(() => activeExecutionId.value), + previousExecutionId: computed(() => previousExecutionId.value), + setActiveExecutionId, subWorkflowExecutionError, executionWaitingForWebhook, executingNode,