fix(editor): Fix race condition for updating node and workflow execution status (#14353)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Alex Grozav
2025-04-09 12:57:12 +03:00
committed by GitHub
parent 84e85c9469
commit a495d81c13
8 changed files with 66 additions and 35 deletions

View File

@@ -279,7 +279,7 @@ describe('NodeExecuteButton', () => {
it('stops execution when clicking button while workflow is running', async () => {
uiStore.isActionActive.workflowRunning = true;
nodeTypesStore.isTriggerNode = () => true;
workflowsStore.activeExecutionId = 'test-execution-id';
workflowsStore.setActiveExecutionId('test-execution-id');
workflowsStore.isNodeExecuting.mockReturnValue(true);
workflowsStore.getNodeByName.mockReturnValue(
mockNode({ name: 'test-node', type: SET_NODE_TYPE }),

View File

@@ -59,7 +59,7 @@ export function useCanvasNode() {
const executionStatus = computed(() => data.value.execution.status);
const executionWaiting = computed(() => data.value.execution.waiting);
const executionRunning = computed(() => data.value.execution.running);
const executionRunningThrottled = refThrottled(executionRunning, 50);
const executionRunningThrottled = refThrottled(executionRunning, 300);
const runDataOutputMap = computed(() => data.value.runData.outputMap);
const runDataIterations = computed(() => data.value.runData.iterations);

View File

@@ -140,7 +140,7 @@ describe('usePushConnection()', () => {
const workflowId = 'abc';
beforeEach(() => {
workflowsStore.activeExecutionId = executionId;
workflowsStore.setActiveExecutionId(executionId);
uiStore.isActionActive.workflowRunning = true;
});
@@ -239,13 +239,16 @@ describe('usePushConnection()', () => {
it("enqueues messages if we don't have the active execution id yet", async () => {
uiStore.isActionActive.workflowRunning = true;
const event: PushMessage = {
type: 'executionStarted',
type: 'nodeExecuteAfter',
data: {
executionId: '1',
mode: 'manual',
startedAt: new Date(),
workflowId: '1',
flattedRunData: stringify({}),
nodeName: 'Node',
data: {
executionIndex: 0,
startTime: 0,
executionTime: 0,
source: [],
},
},
};
@@ -281,7 +284,7 @@ describe('usePushConnection()', () => {
workflowId: '1',
},
};
workflowsStore.activeExecutionId = event.data.executionId;
workflowsStore.setActiveExecutionId(event.data.executionId);
// ACT
const result = await pushConnection.pushMessageReceived(event);

View File

@@ -151,6 +151,12 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
return false;
}
if (receivedData.type === 'executionStarted') {
if (!workflowsStore.activeExecutionId) {
workflowsStore.setActiveExecutionId(receivedData.data.executionId);
}
}
if (
receivedData.type === 'nodeExecuteAfter' ||
receivedData.type === 'nodeExecuteBefore' ||
@@ -228,8 +234,7 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
}
const { executionId } = receivedData.data;
const { activeExecutionId } = workflowsStore;
if (executionId !== activeExecutionId) {
if (executionId !== workflowsStore.activeExecutionId) {
// The workflow which did finish execution did either not get started
// by this session or we do not have the execution id yet.
if (isRetry !== true) {
@@ -322,7 +327,7 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
runDataExecutedErrorMessage = i18n.baseText(
'executionsList.showMessage.stopExecution.message',
{
interpolate: { activeExecutionId },
interpolate: { activeExecutionId: workflowsStore.activeExecutionId },
},
);
}
@@ -523,6 +528,8 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
iRunExecutionData.resultData.runData[lastNodeExecuted][0].data!.main[0]!.length;
}
workflowsStore.setActiveExecutionId(null);
void useExternalHooks().run('pushConnection.executionFinished', {
itemsCount,
nodeName: iRunExecutionData.resultData.lastNodeExecuted,
@@ -578,7 +585,7 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
if (pushData.workflowId === workflowsStore.workflowId) {
workflowsStore.executionWaitingForWebhook = false;
workflowsStore.activeExecutionId = pushData.executionId;
workflowsStore.setActiveExecutionId(pushData.executionId);
}
void processWaitingPushMessages();

View File

@@ -2,15 +2,14 @@ import { setActivePinia } from 'pinia';
import { createTestingPinia } from '@pinia/testing';
import { useRouter } from 'vue-router';
import type router from 'vue-router';
import {
ExpressionError,
type IPinData,
type IRunData,
type Workflow,
type IExecuteData,
type ITaskData,
NodeConnectionTypes,
type INodeConnections,
import { ExpressionError, NodeConnectionTypes } from 'n8n-workflow';
import type {
IPinData,
IRunData,
Workflow,
IExecuteData,
ITaskData,
INodeConnections,
} from 'n8n-workflow';
import { useRunWorkflow } from '@/composables/useRunWorkflow';
@@ -26,20 +25,23 @@ import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { createTestNode, createTestWorkflow } from '@/__tests__/mocks';
import { waitFor } from '@testing-library/vue';
vi.mock('@/stores/workflows.store', () => ({
useWorkflowsStore: vi.fn().mockReturnValue({
vi.mock('@/stores/workflows.store', async () => {
const storeState: Partial<ReturnType<typeof useWorkflowsStore>> & {
activeExecutionId: string | null;
} = {
allNodes: [],
runWorkflow: vi.fn(),
subWorkflowExecutionError: null,
getWorkflowRunData: null,
workflowExecutionData: null,
setWorkflowExecutionData: vi.fn(),
activeExecutionId: null,
previousExecutionId: null,
nodesIssuesExist: false,
executionWaitingForWebhook: false,
getCurrentWorkflow: vi.fn().mockReturnValue({ id: '123' }),
getNodeByName: vi.fn(),
getExecution: vi.fn(),
nodeIssuesExit: vi.fn(),
checkIfNodeHasChatParent: vi.fn(),
getParametersLastUpdate: vi.fn(),
getPinnedDataLastUpdate: vi.fn(),
@@ -47,8 +49,15 @@ vi.mock('@/stores/workflows.store', () => ({
incomingConnectionsByNodeName: vi.fn(),
outgoingConnectionsByNodeName: vi.fn(),
markExecutionAsStopped: vi.fn(),
setActiveExecutionId: vi.fn((id: string | null) => {
storeState.activeExecutionId = id;
}),
}));
};
return {
useWorkflowsStore: vi.fn().mockReturnValue(storeState),
};
});
vi.mock('@/stores/pushConnection.store', () => ({
usePushConnectionStore: vi.fn().mockReturnValue({
@@ -151,6 +160,7 @@ describe('useRunWorkflow({ router })', () => {
const mockResponse = { executionId: '123', waitingForWebhook: false };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockResponse);
vi.mocked(workflowsStore).setActiveExecutionId('123');
const response = await runWorkflowApi({} as IStartRunData);
@@ -692,7 +702,7 @@ describe('useRunWorkflow({ router })', () => {
workflowsStore.workflowExecutionData = executionData;
workflowsStore.activeWorkflows = ['test-wf-id'];
workflowsStore.activeExecutionId = 'test-exec-id';
workflowsStore.setActiveExecutionId('test-exec-id');
// Exercise - don't wait for returned promise to resolve
void runWorkflowComposable.stopCurrentExecution();

View File

@@ -78,8 +78,11 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
throw error;
}
if (response.executionId !== undefined) {
workflowsStore.activeExecutionId = response.executionId;
if (
response.executionId !== undefined &&
workflowsStore.previousExecutionId !== response.executionId
) {
workflowsStore.setActiveExecutionId(response.executionId);
}
if (response.waitingForWebhook === true && useWorkflowsStore().nodesIssuesExist) {

View File

@@ -74,7 +74,7 @@ export const useAssistantStore = defineStore(STORES.ASSISTANT, () => {
const chatSessionCredType = ref<ICredentialType | undefined>();
const chatSessionError = ref<ChatRequest.ErrorContext | undefined>();
const currentSessionId = ref<string | undefined>();
const currentSessionActiveExecutionId = ref<string | undefined>();
const currentSessionActiveExecutionId = ref<string | null>(null);
const currentSessionWorkflowId = ref<string | undefined>();
const lastUnread = ref<ChatUI.AssistantMessage | undefined>();
const nodeExecutionStatus = ref<NodeExecutionStatus>('not_executed');
@@ -125,7 +125,7 @@ export const useAssistantStore = defineStore(STORES.ASSISTANT, () => {
currentSessionId.value = undefined;
chatSessionError.value = undefined;
lastUnread.value = undefined;
currentSessionActiveExecutionId.value = undefined;
currentSessionActiveExecutionId.value = null;
suggestions.value = {};
nodeExecutionStatus.value = 'not_executed';
chatSessionCredType.value = undefined;

View File

@@ -139,6 +139,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const workflowExecutionData = ref<IExecutionResponse | null>(null);
const workflowExecutionPairedItemMappings = ref<Record<string, Set<string>>>({});
const activeExecutionId = ref<string | null>(null);
const previousExecutionId = ref<string | null>(null);
const subWorkflowExecutionError = ref<Error | null>(null);
const executionWaitingForWebhook = ref(false);
const workflowsById = ref<Record<string, IWorkflowDb>>({});
@@ -289,6 +290,11 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
Workflow.getConnectionsByDestination(workflow.value.connections),
);
function setActiveExecutionId(id: string | null) {
previousExecutionId.value = activeExecutionId.value;
activeExecutionId.value = id;
}
function getWorkflowResultDataByNodeName(nodeName: string): ITaskData[] | null {
if (getWorkflowRunData.value === null) {
return null;
@@ -615,7 +621,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
setWorkflowSettings({ ...defaults.settings });
setWorkflowTagIds([]);
activeExecutionId.value = null;
setActiveExecutionId(null);
executingNode.value.length = 0;
executionWaitingForWebhook.value = false;
}
@@ -1687,7 +1693,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
function markExecutionAsStopped() {
activeExecutionId.value = null;
setActiveExecutionId(null);
clearNodeExecutionQueue();
executionWaitingForWebhook.value = false;
uiStore.removeActiveAction('workflowRunning');
@@ -1711,7 +1717,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
currentWorkflowExecutions,
workflowExecutionData,
workflowExecutionPairedItemMappings,
activeExecutionId,
activeExecutionId: computed(() => activeExecutionId.value),
previousExecutionId: computed(() => previousExecutionId.value),
setActiveExecutionId,
subWorkflowExecutionError,
executionWaitingForWebhook,
executingNode,