refactor(editor): Refactor usePushConnection and introduce new queueing system (#14529)

This commit is contained in:
Alex Grozav
2025-04-30 15:36:43 +03:00
committed by GitHub
parent 442cd094ee
commit 833d8e3c18
49 changed files with 1525 additions and 1262 deletions

View File

@@ -1,5 +1,11 @@
import { stringify } from 'flatted';
import type { IDataObject, ITaskData, ITaskDataConnections } from 'n8n-workflow';
import type {
IDataObject,
IRunData,
IRunExecutionData,
ITaskData,
ITaskDataConnections,
} from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { clickExecuteWorkflowButton } from '../composables/workflow';
@@ -53,6 +59,28 @@ export function runMockWorkflowExecution({
const workflowId = nanoid();
const executionId = Math.floor(Math.random() * 1_000_000).toString();
const resolvedRunData = runData.reduce<IRunData>((acc, nodeExecution) => {
const nodeName = Object.keys(nodeExecution)[0];
acc[nodeName] = [nodeExecution[nodeName]];
return acc;
}, {});
const executionData: IRunExecutionData = {
startData: {},
resultData: {
runData: resolvedRunData,
pinData: {},
lastNodeExecuted,
},
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
};
cy.intercept('POST', '/rest/workflows/**/run?**', {
statusCode: 201,
body: {
@@ -70,7 +98,15 @@ export function runMockWorkflowExecution({
cy.wait('@runWorkflow');
const resolvedRunData: Record<string, ITaskData> = {};
cy.push('executionStarted', {
workflowId,
executionId,
mode: 'manual',
startedAt: new Date(),
workflowName: '',
flattedRunData: '',
});
runData.forEach((nodeExecution) => {
const nodeName = Object.keys(nodeExecution)[0];
const nodeRunData = nodeExecution[nodeName];
@@ -85,28 +121,12 @@ export function runMockWorkflowExecution({
nodeName,
data: nodeRunData,
});
resolvedRunData[nodeName] = nodeExecution[nodeName];
});
cy.push('executionFinished', {
executionId,
workflowId,
status: 'success',
rawData: stringify({
startData: {},
resultData: {
runData,
pinData: {},
lastNodeExecuted,
},
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
}),
rawData: stringify(executionData),
});
}

View File

@@ -6,7 +6,7 @@ export type Collaborator = {
lastSeen: Iso8601DateTimeString;
};
type CollaboratorsChanged = {
export type CollaboratorsChanged = {
type: 'collaboratorsChanged';
data: {
workflowId: string;

View File

@@ -1,4 +1,4 @@
type SendConsoleMessage = {
export type SendConsoleMessage = {
type: 'sendConsoleMessage';
data: {
source: string;

View File

@@ -5,7 +5,7 @@ import type {
WorkflowExecuteMode,
} from 'n8n-workflow';
type ExecutionStarted = {
export type ExecutionStarted = {
type: 'executionStarted';
data: {
executionId: string;
@@ -18,14 +18,14 @@ type ExecutionStarted = {
};
};
type ExecutionWaiting = {
export type ExecutionWaiting = {
type: 'executionWaiting';
data: {
executionId: string;
};
};
type ExecutionFinished = {
export type ExecutionFinished = {
type: 'executionFinished';
data: {
executionId: string;
@@ -36,14 +36,14 @@ type ExecutionFinished = {
};
};
type ExecutionRecovered = {
export type ExecutionRecovered = {
type: 'executionRecovered';
data: {
executionId: string;
};
};
type NodeExecuteBefore = {
export type NodeExecuteBefore = {
type: 'nodeExecuteBefore';
data: {
executionId: string;
@@ -52,7 +52,7 @@ type NodeExecuteBefore = {
};
};
type NodeExecuteAfter = {
export type NodeExecuteAfter = {
type: 'nodeExecuteAfter';
data: {
executionId: string;

View File

@@ -1,19 +1,19 @@
type NodeTypeData = {
export type NodeTypeData = {
name: string;
version: number;
};
type ReloadNodeType = {
export type ReloadNodeType = {
type: 'reloadNodeType';
data: NodeTypeData;
};
type RemoveNodeType = {
export type RemoveNodeType = {
type: 'removeNodeType';
data: NodeTypeData;
};
type NodeDescriptionUpdated = {
export type NodeDescriptionUpdated = {
type: 'nodeDescriptionUpdated';
data: {};
};

View File

@@ -1,4 +1,4 @@
type TestWebhookDeleted = {
export type TestWebhookDeleted = {
type: 'testWebhookDeleted';
data: {
executionId?: string;
@@ -6,7 +6,7 @@ type TestWebhookDeleted = {
};
};
type TestWebhookReceived = {
export type TestWebhookReceived = {
type: 'testWebhookReceived';
data: {
executionId: string;

View File

@@ -1,11 +1,11 @@
type WorkflowActivated = {
export type WorkflowActivated = {
type: 'workflowActivated';
data: {
workflowId: string;
};
};
type WorkflowFailedToActivate = {
export type WorkflowFailedToActivate = {
type: 'workflowFailedToActivate';
data: {
workflowId: string;
@@ -13,7 +13,7 @@ type WorkflowFailedToActivate = {
};
};
type WorkflowDeactivated = {
export type WorkflowDeactivated = {
type: 'workflowDeactivated';
data: {
workflowId: string;

View File

@@ -0,0 +1,100 @@
import { createEventQueue } from './event-queue';
describe('createEventQueue', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it('should process events in order', async () => {
const processedEvents: string[] = [];
// Create an async handler that pushes events into the processedEvents array.
const processEvent = vi.fn(async (event: string) => {
processedEvents.push(event);
// Simulate asynchronous delay of 10ms.
await new Promise((resolve) => setTimeout(resolve, 10));
});
// Create the event queue.
const { enqueue } = createEventQueue<string>(processEvent);
// Enqueue events in a specific order.
enqueue('Event 1');
enqueue('Event 2');
enqueue('Event 3');
// Advance the timers enough to process all events.
// runAllTimersAsync() will run all pending timers and wait for any pending promise resolution.
await vi.runAllTimersAsync();
expect(processEvent).toHaveBeenCalledTimes(3);
expect(processedEvents).toEqual(['Event 1', 'Event 2', 'Event 3']);
});
it('should handle errors and continue processing', async () => {
const processedEvents: string[] = [];
const processEvent = vi.fn(async (event: string) => {
if (event === 'fail') {
throw new Error('Processing error'); // eslint-disable-line n8n-local-rules/no-plain-errors
}
processedEvents.push(event);
await new Promise((resolve) => setTimeout(resolve, 10));
});
const { enqueue } = createEventQueue<string>(processEvent);
const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
enqueue('Event A');
enqueue('fail');
enqueue('Event B');
await vi.runAllTimersAsync();
expect(processEvent).toHaveBeenCalledTimes(3);
// 'fail' should cause an error but processing continues.
expect(processedEvents).toEqual(['Event A', 'Event B']);
expect(consoleSpy).toHaveBeenCalledWith('Error processing event:', expect.any(Error));
consoleSpy.mockRestore();
});
it('should not process any events if none are enqueued', async () => {
const processEvent = vi.fn(async (_event: string) => {
await new Promise((resolve) => setTimeout(resolve, 10));
});
createEventQueue<string>(processEvent);
await vi.runAllTimersAsync();
// Did not enqueue any event.
expect(processEvent).not.toHaveBeenCalled();
});
it('should ensure no concurrent processing of events', async () => {
let processingCounter = 0;
let maxConcurrent = 0;
const processEvent = vi.fn(async (_event: string) => {
processingCounter++;
maxConcurrent = Math.max(maxConcurrent, processingCounter);
// Simulate asynchronous delay.
await new Promise((resolve) => setTimeout(resolve, 20));
processingCounter--;
});
const { enqueue } = createEventQueue<string>(processEvent);
enqueue('A');
enqueue('B');
enqueue('C');
await vi.runAllTimersAsync();
// Throughout processing, maxConcurrent should remain 1.
expect(maxConcurrent).toEqual(1);
});
});

View File

@@ -0,0 +1,50 @@
/**
* Create an event queue that processes events sequentially.
*
* @param processEvent - Async function that processes a single event.
* @returns A function that enqueues events for processing.
*/
export function createEventQueue<T>(processEvent: (event: T) => Promise<void>) {
// The internal queue holding events.
const queue: T[] = [];
// Flag to indicate whether an event is currently being processed.
let processing = false;
/**
* Process the next event in the queue (if not already processing).
*/
async function processNext(): Promise<void> {
if (processing || queue.length === 0) {
return;
}
processing = true;
const currentEvent = queue.shift();
if (currentEvent !== undefined) {
try {
await processEvent(currentEvent);
} catch (error) {
console.error('Error processing event:', error);
}
}
processing = false;
// Recursively process the next event.
await processNext();
}
/**
* Enqueue an event and trigger processing.
*
* @param event - The event to enqueue.
*/
function enqueue(event: T): void {
queue.push(event);
void processNext();
}
return { enqueue };
}

View File

@@ -0,0 +1,122 @@
import { retry } from './retry';
describe('retry', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
vi.clearAllTimers();
});
it('should resolve true when the function eventually returns true', async () => {
let callCount = 0;
const fn = vi.fn(async () => {
callCount++;
// Return true on the second attempt.
return callCount === 2;
});
const promise = retry(fn, 1000, 2, null);
// The first call happens immediately.
expect(fn).toHaveBeenCalledTimes(1);
// Advance timers by 1000ms asynchronously to allow the waiting period to complete.
await vi.advanceTimersByTimeAsync(1000);
// After advancing, the second attempt should have occurred.
expect(fn).toHaveBeenCalledTimes(2);
// The promise should now resolve with true.
const result = await promise;
expect(result).toBe(true);
});
it('should resolve false if maximum retries are reached with no success', async () => {
let callCount = 0;
const fn = vi.fn(async () => {
callCount++;
return false;
});
const promise = retry(fn, 1000, 3, null);
// The first attempt fires immediately.
expect(fn).toHaveBeenCalledTimes(1);
// Advance timers for the delay after the first attempt.
await vi.advanceTimersByTimeAsync(1000);
expect(fn).toHaveBeenCalledTimes(2);
// Advance timers for the delay after the second attempt.
await vi.advanceTimersByTimeAsync(1000);
expect(fn).toHaveBeenCalledTimes(3);
// With maxRetries reached (3 calls), promise should resolve to false.
const result = await promise;
expect(result).toBe(false);
});
it('should reject if the function throws an error', async () => {
const fn = vi.fn(async () => {
throw new Error('Test error'); // eslint-disable-line n8n-local-rules/no-plain-errors
});
// Since the error is thrown on the first call, no timer advancement is needed.
await expect(retry(fn, 1000, 3, null)).rejects.toThrow('Test error');
expect(fn).toHaveBeenCalledTimes(1);
});
it('should use linear backoff strategy', async () => {
let callCount = 0;
const fn = vi.fn(async () => {
callCount++;
return callCount === 4; // Return true on the fourth attempt.
});
const promise = retry(fn, 1000, 4, 'linear');
expect(fn).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(1000); // First backoff
expect(fn).toHaveBeenCalledTimes(2);
await vi.advanceTimersByTimeAsync(2000); // Second backoff
expect(fn).toHaveBeenCalledTimes(3);
await vi.advanceTimersByTimeAsync(3000); // Third backoff
expect(fn).toHaveBeenCalledTimes(4);
const result = await promise;
expect(result).toBe(true);
});
it('should use exponential backoff strategy', async () => {
let callCount = 0;
const fn = vi.fn(async () => {
callCount++;
return callCount === 5; // Return true on the fifth attempt.
});
const promise = retry(fn, 1000, 5, 'exponential');
expect(fn).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(1000); // First backoff
expect(fn).toHaveBeenCalledTimes(2);
await vi.advanceTimersByTimeAsync(2000); // Second backoff
expect(fn).toHaveBeenCalledTimes(3);
await vi.advanceTimersByTimeAsync(4000); // Third backoff
expect(fn).toHaveBeenCalledTimes(4);
await vi.advanceTimersByTimeAsync(8000); // Fourth backoff
expect(fn).toHaveBeenCalledTimes(5);
const result = await promise;
expect(result).toBe(true);
});
});

View File

@@ -0,0 +1,51 @@
type RetryFn = () => boolean | Promise<boolean>;
/**
* A utility that retries a function every `interval` milliseconds
* until the function returns true or the maximum number of retries is reached.
*
* @param fn - A function that returns a boolean or a Promise resolving to a boolean.
* @param interval - The time interval (in milliseconds) between each retry. Defaults to 1000.
* @param maxRetries - The maximum number of retry attempts. Defaults to 3.
* @param backoff - The backoff strategy to use: 'linear', 'exponential', or null.
* @returns {Promise<boolean>} - A promise that resolves to:
* - true: If the function returns true before reaching maxRetries.
* - false: If the function never returns true or if an error occurs.
*/
export async function retry(
fn: RetryFn,
interval: number = 1000,
maxRetries: number = 3,
backoff: 'exponential' | 'linear' | null = 'linear',
): Promise<boolean> {
let attempt = 0;
while (attempt < maxRetries) {
attempt++;
try {
const result = await fn();
if (result) {
return true;
}
} catch (error) {
console.error('Error during retry:', error);
throw error;
}
// Wait for the specified interval before the next attempt, if any attempts remain.
if (attempt < maxRetries) {
let computedInterval = interval;
if (backoff === 'linear') {
computedInterval = interval * attempt;
} else if (backoff === 'exponential') {
computedInterval = Math.pow(2, attempt - 1) * interval;
computedInterval = Math.min(computedInterval, 30000); // Cap the maximum interval to 30 seconds
}
await new Promise<void>((resolve) => setTimeout(resolve, computedInterval));
}
}
return false;
}

View File

@@ -221,7 +221,7 @@ describe('LogsPanel', () => {
expect(rendered.getByText('Running')).toBeInTheDocument();
expect(rendered.queryByText('AI Agent')).not.toBeInTheDocument();
workflowsStore.setNodeExecuting({
workflowsStore.addNodeExecutionData({
nodeName: 'AI Agent',
executionId: '567',
data: { executionIndex: 0, startTime: Date.parse('2025-04-20T12:34:51.000Z'), source: [] },
@@ -243,7 +243,6 @@ describe('LogsPanel', () => {
executionStatus: 'success',
},
});
expect(await treeItem.findByText('AI Agent')).toBeInTheDocument();
expect(treeItem.getByText('Success in 33ms')).toBeInTheDocument();

View File

@@ -8,8 +8,8 @@ import {
START_NODE_TYPE,
} from '@/constants';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useNDVStore } from '@/stores/ndv.store';
import { waitingNodeTooltip } from '@/utils/executionUtils';
import { uniqBy } from 'lodash-es';
import { N8nIcon, N8nRadioButtons, N8nText, N8nTooltip } from '@n8n/design-system';
@@ -23,7 +23,6 @@ import {
} from 'n8n-workflow';
import { storeToRefs } from 'pinia';
import { computed, ref, watch } from 'vue';
import { useNDVStore } from '../stores/ndv.store';
import InputNodeSelect from './InputNodeSelect.vue';
import NodeExecuteButton from './NodeExecuteButton.vue';
import RunData from './RunData.vue';
@@ -90,7 +89,6 @@ const inputModes = [
const nodeTypesStore = useNodeTypesStore();
const ndvStore = useNDVStore();
const workflowsStore = useWorkflowsStore();
const uiStore = useUIStore();
const {
activeNode,
@@ -166,7 +164,7 @@ const isMappingEnabled = computed(() => {
return true;
});
const isExecutingPrevious = computed(() => {
if (!workflowRunning.value) {
if (!workflowsStore.isWorkflowRunning) {
return false;
}
const triggeredNode = workflowsStore.executedNode;
@@ -187,7 +185,6 @@ const isExecutingPrevious = computed(() => {
}
return false;
});
const workflowRunning = computed(() => uiStore.isActionActive.workflowRunning);
const rootNodesParents = computed(() => {
if (!rootNode.value) return [];

View File

@@ -31,7 +31,6 @@ import { dataPinningEventBus, ndvEventBus } from '@/event-bus';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useNDVStore } from '@/stores/ndv.store';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import { useUIStore } from '@/stores/ui.store';
import { useSettingsStore } from '@/stores/settings.store';
import { useDeviceSupport } from '@n8n/composables/useDeviceSupport';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
@@ -72,7 +71,6 @@ const { activeNode } = storeToRefs(ndvStore);
const pinnedData = usePinnedData(activeNode);
const workflowActivate = useWorkflowActivate();
const nodeTypesStore = useNodeTypesStore();
const uiStore = useUIStore();
const workflowsStore = useWorkflowsStore();
const settingsStore = useSettingsStore();
const deviceSupport = useDeviceSupport();
@@ -108,14 +106,12 @@ const activeNodeType = computed(() => {
return null;
});
const workflowRunning = computed(() => uiStore.isActionActive.workflowRunning);
const showTriggerWaitingWarning = computed(
() =>
triggerWaitingWarningEnabled.value &&
!!activeNodeType.value &&
!activeNodeType.value.group.includes('trigger') &&
workflowRunning.value &&
workflowsStore.isWorkflowRunning &&
workflowsStore.executionWaitingForWebhook,
);
@@ -327,11 +323,11 @@ const featureRequestUrl = computed(() => {
const outputPanelEditMode = computed(() => ndvStore.outputPanelEditMode);
const isWorkflowRunning = computed(() => uiStore.isActionActive.workflowRunning);
const isExecutionWaitingForWebhook = computed(() => workflowsStore.executionWaitingForWebhook);
const blockUi = computed(() => isWorkflowRunning.value || isExecutionWaitingForWebhook.value);
const blockUi = computed(
() => workflowsStore.isWorkflowRunning || isExecutionWaitingForWebhook.value,
);
const foreignCredentials = computed(() => {
const credentials = activeNode.value?.credentials;
@@ -470,7 +466,7 @@ const onUnlinkRun = (pane: string) => {
const onNodeExecute = () => {
setTimeout(() => {
if (!activeNode.value || !workflowRunning.value) {
if (!activeNode.value || !workflowsStore.isWorkflowRunning) {
return;
}
triggerWaitingWarningEnabled.value = true;

View File

@@ -15,7 +15,6 @@ import {
} from '@/constants';
import NodeExecuteButton from '@/components/NodeExecuteButton.vue';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useUIStore } from '@/stores/ui.store';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import { useNDVStore } from '@/stores/ndv.store';
import { useRunWorkflow } from '@/composables/useRunWorkflow';
@@ -84,7 +83,6 @@ vi.mock('@/composables/useMessage', () => {
let renderComponent: ReturnType<typeof createComponentRenderer>;
let workflowsStore: MockedStore<typeof useWorkflowsStore>;
let uiStore: MockedStore<typeof useUIStore>;
let nodeTypesStore: MockedStore<typeof useNodeTypesStore>;
let ndvStore: MockedStore<typeof useNDVStore>;
@@ -109,7 +107,6 @@ describe('NodeExecuteButton', () => {
});
workflowsStore = mockedStore(useWorkflowsStore);
uiStore = mockedStore(useUIStore);
nodeTypesStore = mockedStore(useNodeTypesStore);
ndvStore = mockedStore(useNDVStore);
@@ -193,7 +190,7 @@ describe('NodeExecuteButton', () => {
workflowsStore.getNodeByName.mockReturnValue(node);
workflowsStore.isNodeExecuting = vi.fn(() => true);
nodeTypesStore.isTriggerNode = () => true;
uiStore.isActionActive.workflowRunning = true;
workflowsStore.isWorkflowRunning = true;
const { getByRole } = renderComponent();
expect(getByRole('button').textContent).toBe('Stop Listening');
@@ -203,7 +200,7 @@ describe('NodeExecuteButton', () => {
const node = mockNode({ name: 'test-node', type: SET_NODE_TYPE });
workflowsStore.getNodeByName.mockReturnValue(node);
workflowsStore.isNodeExecuting = vi.fn(() => true);
uiStore.isActionActive.workflowRunning = true;
workflowsStore.isWorkflowRunning = true;
const { getByRole } = renderComponent();
expect(getByRole('button').querySelector('.n8n-spinner')).toBeVisible();
@@ -227,7 +224,7 @@ describe('NodeExecuteButton', () => {
});
it('should be disabled when workflow is running but node is not executing', async () => {
uiStore.isActionActive.workflowRunning = true;
workflowsStore.isWorkflowRunning = true;
workflowsStore.isNodeExecuting.mockReturnValue(false);
workflowsStore.getNodeByName.mockReturnValue(
mockNode({ name: 'test-node', type: SET_NODE_TYPE }),
@@ -277,7 +274,7 @@ describe('NodeExecuteButton', () => {
});
it('stops execution when clicking button while workflow is running', async () => {
uiStore.isActionActive.workflowRunning = true;
workflowsStore.isWorkflowRunning = true;
nodeTypesStore.isTriggerNode = () => true;
workflowsStore.setActiveExecutionId('test-execution-id');
workflowsStore.isNodeExecuting.mockReturnValue(true);

View File

@@ -22,7 +22,6 @@ import { useExternalHooks } from '@/composables/useExternalHooks';
import { nodeViewEventBus } from '@/event-bus';
import { usePinnedData } from '@/composables/usePinnedData';
import { useRunWorkflow } from '@/composables/useRunWorkflow';
import { useUIStore } from '@/stores/ui.store';
import { useRouter } from 'vue-router';
import { useI18n } from '@/composables/useI18n';
import { useTelemetry } from '@/composables/useTelemetry';
@@ -72,7 +71,6 @@ const externalHooks = useExternalHooks();
const toast = useToast();
const ndvStore = useNDVStore();
const nodeTypesStore = useNodeTypesStore();
const uiStore = useUIStore();
const i18n = useI18n();
const message = useMessage();
const telemetry = useTelemetry();
@@ -85,7 +83,7 @@ const nodeType = computed((): INodeTypeDescription | null => {
});
const isNodeRunning = computed(() => {
if (!uiStore.isActionActive.workflowRunning || codeGenerationInProgress.value) return false;
if (!workflowsStore.isWorkflowRunning || codeGenerationInProgress.value) return false;
const triggeredNode = workflowsStore.executedNode;
return (
workflowsStore.isNodeExecuting(node.value?.name ?? '') || triggeredNode === node.value?.name
@@ -96,8 +94,6 @@ const isTriggerNode = computed(() => {
return node.value ? nodeTypesStore.isTriggerNode(node.value.type) : false;
});
const isWorkflowRunning = computed(() => uiStore.isActionActive.workflowRunning);
const isManualTriggerNode = computed(() =>
nodeType.value ? nodeType.value.name === MANUAL_TRIGGER_NODE_TYPE : false,
);
@@ -168,7 +164,7 @@ const disabledHint = computed(() => {
return i18n.baseText('ndv.execute.requiredFieldsMissing');
}
if (isWorkflowRunning.value && !isNodeRunning.value) {
if (workflowsStore.isWorkflowRunning && !isNodeRunning.value) {
return i18n.baseText('ndv.execute.workflowAlreadyRunning');
}

View File

@@ -9,7 +9,6 @@ import {
import RunData from './RunData.vue';
import RunInfo from './RunInfo.vue';
import { storeToRefs } from 'pinia';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useNDVStore } from '@/stores/ndv.store';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
@@ -77,7 +76,6 @@ const emit = defineEmits<{
const ndvStore = useNDVStore();
const nodeTypesStore = useNodeTypesStore();
const workflowsStore = useWorkflowsStore();
const uiStore = useUIStore();
const telemetry = useTelemetry();
const i18n = useI18n();
const { activeNode } = storeToRefs(ndvStore);
@@ -144,7 +142,7 @@ const isNodeRunning = computed(() => {
return workflowRunning.value && !!node.value && workflowsStore.isNodeExecuting(node.value.name);
});
const workflowRunning = computed(() => uiStore.isActionActive.workflowRunning);
const workflowRunning = computed(() => workflowsStore.isWorkflowRunning);
const workflowExecution = computed(() => {
return workflowsStore.getWorkflowExecution;

View File

@@ -162,9 +162,7 @@ const isListeningForEvents = computed(() => {
);
});
const workflowRunning = computed(() => {
return uiStore.isActionActive.workflowRunning;
});
const workflowRunning = computed(() => workflowsStore.isWorkflowRunning);
const isActivelyPolling = computed(() => {
const triggeredNode = workflowsStore.executedNode;

View File

@@ -16,7 +16,7 @@ const {
hasIssues,
executionStatus,
executionWaiting,
executionRunningThrottled,
executionRunning,
hasRunData,
runDataIterations,
isDisabled,
@@ -55,6 +55,16 @@ const dirtiness = computed(() =>
<FontAwesomeIcon icon="sync-alt" spin />
</div>
</div>
<div v-else-if="executionStatus === 'unknown'">
<!-- Do nothing, unknown means the node never executed -->
</div>
<div
v-else-if="executionRunning || executionStatus === 'running'"
data-test-id="canvas-node-status-running"
:class="[$style.status, $style.running]"
>
<FontAwesomeIcon icon="sync-alt" spin />
</div>
<div
v-else-if="hasPinnedData && !nodeHelpers.isProductionExecutionPreview.value && !isDisabled"
data-test-id="canvas-node-status-pinned"
@@ -62,16 +72,6 @@ const dirtiness = computed(() =>
>
<FontAwesomeIcon icon="thumbtack" />
</div>
<div v-else-if="executionStatus === 'unknown'">
<!-- Do nothing, unknown means the node never executed -->
</div>
<div
v-else-if="executionRunningThrottled || executionStatus === 'running'"
data-test-id="canvas-node-status-running"
:class="[$style.status, $style.running]"
>
<FontAwesomeIcon icon="sync-alt" spin />
</div>
<div v-else-if="dirtiness !== undefined">
<N8nTooltip :show-after="500" placement="bottom">
<template #content>

View File

@@ -4,7 +4,6 @@ import { useCanvasOperations } from '@/composables/useCanvasOperations';
import { useI18n } from '@/composables/useI18n';
import { useRunWorkflow } from '@/composables/useRunWorkflow';
import { CHAT_TRIGGER_NODE_TYPE } from '@/constants';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { computed, useCssModule } from 'vue';
import { useRouter } from 'vue-router';
@@ -36,12 +35,11 @@ const containerClass = computed(() => ({
const router = useRouter();
const i18n = useI18n();
const workflowsStore = useWorkflowsStore();
const uiStore = useUIStore();
const { runEntireWorkflow } = useRunWorkflow({ router });
const { toggleChatOpen } = useCanvasOperations({ router });
const isChatOpen = computed(() => workflowsStore.logsPanelState !== LOGS_PANEL_STATE.CLOSED);
const isExecuting = computed(() => uiStore.isActionActive.workflowRunning);
const isExecuting = computed(() => workflowsStore.isWorkflowRunning);
const testId = computed(() => `execute-workflow-button-${name}`);
</script>

View File

@@ -1,13 +1,7 @@
/**
* Canvas V2 Only
* @TODO Remove this notice when Canvas V2 is the only one in use
*/
import { CanvasNodeKey } from '@/constants';
import { computed, inject } from 'vue';
import type { CanvasNodeData } from '@/types';
import { CanvasNodeRenderType, CanvasConnectionMode } from '@/types';
import { refThrottled } from '@vueuse/core';
export function useCanvasNode() {
const node = inject(CanvasNodeKey);
@@ -59,7 +53,6 @@ export function useCanvasNode() {
const executionStatus = computed(() => data.value.execution.status);
const executionWaiting = computed(() => data.value.execution.waiting);
const executionRunning = computed(() => data.value.execution.running);
const executionRunningThrottled = refThrottled(executionRunning, 300);
const runDataOutputMap = computed(() => data.value.runData.outputMap);
const runDataIterations = computed(() => data.value.runData.iterations);
@@ -91,7 +84,6 @@ export function useCanvasNode() {
executionStatus,
executionWaiting,
executionRunning,
executionRunningThrottled,
render,
eventBus,
};

View File

@@ -1454,10 +1454,10 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
workflowsStore.resetWorkflow();
workflowsStore.resetState();
workflowsStore.currentWorkflowExecutions = [];
workflowsStore.setActiveExecutionId(undefined);
// Reset actions
uiStore.resetLastInteractedWith();
uiStore.removeActiveAction('workflowRunning');
uiStore.stateIsDirty = false;
// Reset executions

View File

@@ -1,109 +1,56 @@
import { useExecutingNode } from '@/composables/useExecutingNode';
import { useExecutingNode } from './useExecutingNode';
describe('useExecutingNode', () => {
it('should always have at least one executing node during execution', () => {
const { executingNode, executingNodeCompletionQueue, addExecutingNode, removeExecutingNode } =
useExecutingNode();
describe('useExecutingNode composable', () => {
it('should initialize with an empty executingNode queue', () => {
const { executingNode } = useExecutingNode();
expect(executingNode.value).toEqual([]);
});
it('should add a node to the executing queue', () => {
const { executingNode, addExecutingNode } = useExecutingNode();
addExecutingNode('node1');
expect(executingNode.value).toEqual(['node1']);
});
it('should remove an executing node from the queue (removes one occurrence at a time)', () => {
const { executingNode, addExecutingNode, removeExecutingNode } = useExecutingNode();
// Add nodes, including duplicates.
addExecutingNode('node1');
addExecutingNode('node2');
addExecutingNode('node1');
expect(executingNode.value).toEqual(['node1']);
expect(executingNodeCompletionQueue.value).toEqual([]);
addExecutingNode('node2');
expect(executingNode.value).toEqual(['node1', 'node2']);
expect(executingNodeCompletionQueue.value).toEqual([]);
addExecutingNode('node3');
expect(executingNode.value).toEqual(['node1', 'node2', 'node3']);
expect(executingNodeCompletionQueue.value).toEqual([]);
// After removal, only the first occurrence of "node1" should be removed.
removeExecutingNode('node1');
expect(executingNode.value).toEqual(['node2', 'node3']);
expect(executingNodeCompletionQueue.value).toEqual([]);
removeExecutingNode('node2');
expect(executingNode.value).toEqual(['node3']);
expect(executingNodeCompletionQueue.value).toEqual([]);
removeExecutingNode('node3');
expect(executingNode.value).toEqual(['node3']);
expect(executingNodeCompletionQueue.value).toEqual(['node3']);
addExecutingNode('node4');
expect(executingNode.value).toEqual(['node4']);
expect(executingNodeCompletionQueue.value).toEqual([]);
expect(executingNode.value).toEqual(['node2', 'node1']);
});
describe('resolveNodeExecutionQueue', () => {
it('should clear all nodes from the execution queue', () => {
const { executingNode, executingNodeCompletionQueue, resolveNodeExecutionQueue } =
useExecutingNode();
it('should not remove a node that does not exist', () => {
const { executingNode, removeExecutingNode } = useExecutingNode();
executingNode.value = ['node1', 'node2'];
executingNodeCompletionQueue.value = ['node1', 'node2'];
// Manually set the state for testing.
executingNode.value = ['node1'];
removeExecutingNode('node2'); // Trying to remove a non-existent node.
expect(executingNode.value).toEqual(['node1']);
});
resolveNodeExecutionQueue();
it('should return true if a node is executing', () => {
const { addExecutingNode, isNodeExecuting } = useExecutingNode();
addExecutingNode('node1');
expect(isNodeExecuting('node1')).toBe(true);
});
expect(executingNode.value).toEqual([]);
expect(executingNodeCompletionQueue.value).toEqual([]);
});
it('should return false if a node is not executing', () => {
const { isNodeExecuting } = useExecutingNode();
expect(isNodeExecuting('node1')).toBe(false);
});
it('should keep the last executing node if keepLastInQueue is true and only one node is executing', () => {
const { executingNode, executingNodeCompletionQueue, resolveNodeExecutionQueue } =
useExecutingNode();
executingNode.value = ['node1'];
executingNodeCompletionQueue.value = ['node1'];
resolveNodeExecutionQueue(true);
expect(executingNode.value).toEqual(['node1']);
expect(executingNodeCompletionQueue.value).toEqual(['node1']);
});
it('should remove all nodes except the last one if keepLastInQueue is true and more than one node is executing', () => {
const { executingNode, executingNodeCompletionQueue, resolveNodeExecutionQueue } =
useExecutingNode();
executingNode.value = ['node1', 'node2'];
executingNodeCompletionQueue.value = ['node1', 'node2'];
resolveNodeExecutionQueue(true);
expect(executingNode.value).toEqual(['node2']);
expect(executingNodeCompletionQueue.value).toEqual(['node2']);
});
it('should clear all nodes if keepLastInQueue is false', () => {
const { executingNode, executingNodeCompletionQueue, resolveNodeExecutionQueue } =
useExecutingNode();
executingNode.value = ['node1', 'node2'];
executingNodeCompletionQueue.value = ['node1', 'node2'];
resolveNodeExecutionQueue(false);
expect(executingNode.value).toEqual([]);
expect(executingNodeCompletionQueue.value).toEqual([]);
});
it('should handle empty execution queue gracefully', () => {
const { executingNode, executingNodeCompletionQueue, resolveNodeExecutionQueue } =
useExecutingNode();
executingNode.value = [];
executingNodeCompletionQueue.value = [];
resolveNodeExecutionQueue();
expect(executingNode.value).toEqual([]);
expect(executingNodeCompletionQueue.value).toEqual([]);
});
it('should clear the node execution queue', () => {
const { executingNode, addExecutingNode, clearNodeExecutionQueue } = useExecutingNode();
addExecutingNode('node1');
addExecutingNode('node2');
expect(executingNode.value).toEqual(['node1', 'node2']);
clearNodeExecutionQueue();
expect(executingNode.value).toEqual([]);
});
});

View File

@@ -2,51 +2,45 @@ import { ref } from 'vue';
/**
* Composable to keep track of the currently executing node.
* The queue is used to keep track of the order in which nodes are completed and
* to ensure that there's always at least one node in the executing queue.
* The queue is used to keep track of the order in which nodes are executed and to ensure that
* the UI reflects the correct execution status.
*
* The completion queue serves as a workaround for the fact that the execution status of a node
* is not updated in real-time when dealing with large amounts of data, meaning we can end up in a
* state where no node is actively executing, even though the workflow execution is not completed.
* Once a node is added to the queue, it will be removed after a short delay
* to allow the running spinner to show for a small amount of time.
*
* The number of additions and removals from the queue should always be equal.
* A node can exist multiple times in the queue, in order to prevent the loading spinner from
* disappearing when a node is executed multiple times in quick succession.
*/
export function useExecutingNode() {
const executingNode = ref<string[]>([]);
const executingNodeCompletionQueue = ref<string[]>([]);
function addExecutingNode(nodeName: string) {
resolveNodeExecutionQueue();
executingNode.value.push(nodeName);
}
function removeExecutingNode(nodeName: string) {
executingNodeCompletionQueue.value.push(nodeName);
resolveNodeExecutionQueue(
executingNode.value.length <= executingNodeCompletionQueue.value.length,
);
}
const executionIndex = executingNode.value.indexOf(nodeName);
if (executionIndex === -1) {
return;
}
function resolveNodeExecutionQueue(keepLastInQueue = false) {
const lastExecutingNode = executingNodeCompletionQueue.value.at(-1);
const nodesToRemove = keepLastInQueue
? executingNodeCompletionQueue.value.slice(0, -1)
: executingNodeCompletionQueue.value;
executingNode.value = executingNode.value.filter((name) => !nodesToRemove.includes(name));
executingNodeCompletionQueue.value =
keepLastInQueue && lastExecutingNode ? [lastExecutingNode] : [];
executingNode.value.splice(executionIndex, 1);
}
function clearNodeExecutionQueue() {
executingNode.value = [];
executingNodeCompletionQueue.value = [];
}
function isNodeExecuting(nodeName: string): boolean {
return executingNode.value.includes(nodeName);
}
return {
executingNode,
executingNodeCompletionQueue,
addExecutingNode,
removeExecutingNode,
resolveNodeExecutionQueue,
isNodeExecuting,
clearNodeExecutionQueue,
};
}

View File

@@ -1,298 +0,0 @@
import { stringify } from 'flatted';
import { useRouter } from 'vue-router';
import { createPinia, setActivePinia } from 'pinia';
import type { PushMessage, PushPayload } from '@n8n/api-types';
import type { ITaskData, WorkflowOperationError, IRunData } from 'n8n-workflow';
import { usePushConnection } from '@/composables/usePushConnection';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useToast } from '@/composables/useToast';
import type { IExecutionResponse } from '@/Interface';
vi.mock('vue-router', () => {
return {
RouterLink: vi.fn(),
useRouter: () => ({
push: vi.fn(),
}),
useRoute: () => ({}),
};
});
vi.mock('@/composables/useToast', () => {
const showMessage = vi.fn();
const showError = vi.fn();
return {
useToast: () => {
return {
showMessage,
showError,
};
},
};
});
describe('usePushConnection()', () => {
let router: ReturnType<typeof useRouter>;
let pushStore: ReturnType<typeof usePushConnectionStore>;
let orchestrationStore: ReturnType<typeof useOrchestrationStore>;
let pushConnection: ReturnType<typeof usePushConnection>;
let uiStore: ReturnType<typeof useUIStore>;
let workflowsStore: ReturnType<typeof useWorkflowsStore>;
let toast: ReturnType<typeof useToast>;
beforeEach(() => {
setActivePinia(createPinia());
router = vi.mocked(useRouter)();
pushStore = usePushConnectionStore();
orchestrationStore = useOrchestrationStore();
uiStore = useUIStore();
workflowsStore = useWorkflowsStore();
pushConnection = usePushConnection({ router });
toast = useToast();
});
afterEach(() => {
vi.restoreAllMocks();
pushConnection.pushMessageQueue.value = [];
});
describe('initialize()', () => {
it('should add event listener to the pushStore', () => {
const spy = vi.spyOn(pushStore, 'addEventListener').mockImplementation(() => () => {});
pushConnection.initialize();
expect(spy).toHaveBeenCalled();
});
});
describe('terminate()', () => {
it('should remove event listener from the pushStore', () => {
const returnFn = vi.fn();
vi.spyOn(pushStore, 'addEventListener').mockImplementation(() => returnFn);
pushConnection.initialize();
pushConnection.terminate();
expect(returnFn).toHaveBeenCalled();
});
});
describe('queuePushMessage()', () => {
it('should add message to the queue and sets timeout if not already set', () => {
const event: PushMessage = {
type: 'sendWorkerStatusMessage',
data: {
workerId: '1',
status: {} as PushPayload<'sendWorkerStatusMessage'>['status'],
},
};
pushConnection.queuePushMessage(event, 5);
expect(pushConnection.pushMessageQueue.value).toHaveLength(1);
expect(pushConnection.pushMessageQueue.value[0]).toEqual({ message: event, retriesLeft: 5 });
expect(pushConnection.retryTimeout.value).not.toBeNull();
});
});
describe('processWaitingPushMessages()', () => {
it('should clear the queue and reset the timeout', async () => {
const event: PushMessage = { type: 'executionRecovered', data: { executionId: '1' } };
pushConnection.queuePushMessage(event, 0);
expect(pushConnection.pushMessageQueue.value).toHaveLength(1);
expect(pushConnection.retryTimeout.value).toBeDefined();
await pushConnection.processWaitingPushMessages();
expect(pushConnection.pushMessageQueue.value).toHaveLength(0);
expect(pushConnection.retryTimeout.value).toBeNull();
});
});
describe('pushMessageReceived()', () => {
describe('sendWorkerStatusMessage', () => {
it('should handle event type correctly', async () => {
const spy = vi.spyOn(orchestrationStore, 'updateWorkerStatus').mockImplementation(() => {});
const event: PushMessage = {
type: 'sendWorkerStatusMessage',
data: {
workerId: '1',
status: {} as PushPayload<'sendWorkerStatusMessage'>['status'],
},
};
const result = await pushConnection.pushMessageReceived(event);
expect(spy).toHaveBeenCalledWith(event.data.status);
expect(result).toBeTruthy();
});
});
describe('executionFinished', () => {
const executionId = '1';
const workflowId = 'abc';
beforeEach(() => {
workflowsStore.setActiveExecutionId(executionId);
uiStore.isActionActive.workflowRunning = true;
});
it('should handle executionFinished event correctly', async () => {
const result = await pushConnection.pushMessageReceived({
type: 'executionFinished',
data: {
executionId,
workflowId,
status: 'success',
rawData: stringify({
resultData: {
runData: {},
},
}),
},
});
expect(result).toBeTruthy();
expect(workflowsStore.workflowExecutionData).toBeDefined();
expect(uiStore.isActionActive.workflowRunning).toBeTruthy();
expect(toast.showMessage).toHaveBeenCalledWith({
title: 'Workflow executed successfully',
type: 'success',
});
});
it('should handle isManualExecutionCancelled correctly', async () => {
const result = await pushConnection.pushMessageReceived({
type: 'executionFinished',
data: {
executionId,
workflowId,
status: 'error',
rawData: stringify({
startData: {},
resultData: {
runData: {
'Last Node': [],
},
lastNodeExecuted: 'Last Node',
error: {
message:
'Your trial has ended. <a href="https://app.n8n.cloud/account/change-plan">Upgrade now</a> to keep automating',
name: 'NodeApiError',
node: 'Last Node',
} as unknown as WorkflowOperationError,
},
}),
},
});
expect(useToast().showMessage).toHaveBeenCalledWith({
message:
'Your trial has ended. <a href="https://app.n8n.cloud/account/change-plan">Upgrade now</a> to keep automating',
title: 'Problem in node Last Node',
type: 'error',
duration: 0,
});
expect(result).toBeTruthy();
expect(workflowsStore.workflowExecutionData).toBeDefined();
expect(uiStore.isActionActive.workflowRunning).toBeTruthy();
});
});
describe('nodeExecuteAfter', async () => {
it("enqueues messages if we don't have the active execution id yet", async () => {
uiStore.isActionActive.workflowRunning = true;
const event: PushMessage = {
type: 'nodeExecuteAfter',
data: {
executionId: '1',
nodeName: 'foo',
data: {} as ITaskData,
},
};
expect(pushConnection.retryTimeout.value).toBeNull();
expect(pushConnection.pushMessageQueue.value.length).toBe(0);
const result = await pushConnection.pushMessageReceived(event);
expect(result).toBe(false);
expect(pushConnection.pushMessageQueue.value).toHaveLength(1);
expect(pushConnection.pushMessageQueue.value).toContainEqual({
message: event,
retriesLeft: 5,
});
expect(pushConnection.retryTimeout).not.toBeNull();
});
});
describe('executionStarted', async () => {
it("enqueues messages if we don't have the active execution id yet", async () => {
uiStore.isActionActive.workflowRunning = true;
const event: PushMessage = {
type: 'nodeExecuteAfter',
data: {
executionId: '1',
nodeName: 'Node',
data: {
executionIndex: 0,
startTime: 0,
executionTime: 0,
source: [],
},
},
};
expect(pushConnection.retryTimeout.value).toBeNull();
expect(pushConnection.pushMessageQueue.value.length).toBe(0);
const result = await pushConnection.pushMessageReceived(event);
expect(result).toBe(false);
expect(pushConnection.pushMessageQueue.value).toHaveLength(1);
expect(pushConnection.pushMessageQueue.value).toContainEqual({
message: event,
retriesLeft: 5,
});
expect(pushConnection.retryTimeout).not.toBeNull();
});
it('overwrites the run data in the workflow store', async () => {
// ARRANGE
uiStore.isActionActive.workflowRunning = true;
const oldRunData: IRunData = { foo: [] };
workflowsStore.workflowExecutionData = {
data: { resultData: { runData: oldRunData } },
} as IExecutionResponse;
const newRunData: IRunData = { bar: [] };
const event: PushMessage = {
type: 'executionStarted',
data: {
executionId: '1',
flattedRunData: stringify(newRunData),
mode: 'manual',
startedAt: new Date(),
workflowId: '1',
},
};
workflowsStore.setActiveExecutionId(event.data.executionId);
// ACT
const result = await pushConnection.pushMessageReceived(event);
// ASSERT
expect(result).toBe(true);
expect(workflowsStore.workflowExecutionData.data?.resultData.runData).toEqual(newRunData);
});
});
});
});

View File

@@ -1,680 +0,0 @@
import { parse } from 'flatted';
import { h, ref } from 'vue';
import type { useRouter } from 'vue-router';
import { TelemetryHelpers } from 'n8n-workflow';
import type {
ExpressionError,
IDataObject,
INodeTypeNameVersion,
IRunExecutionData,
IWorkflowBase,
SubworkflowOperationError,
IExecuteContextData,
NodeOperationError,
INodeTypeDescription,
NodeError,
} from 'n8n-workflow';
import type { PushMessage } from '@n8n/api-types';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
import { useToast } from '@/composables/useToast';
import { WORKFLOW_SETTINGS_MODAL_KEY } from '@/constants';
import { getTriggerNodeServiceName } from '@/utils/nodeTypesUtils';
import { codeNodeEditorEventBus, globalLinkActionsEventBus } from '@/event-bus';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import { useCredentialsStore } from '@/stores/credentials.store';
import { useSettingsStore } from '@/stores/settings.store';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useExternalHooks } from '@/composables/useExternalHooks';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import { useI18n } from '@/composables/useI18n';
import { useTelemetry } from '@/composables/useTelemetry';
import type { PushMessageQueueItem } from '@/types';
import { useAssistantStore } from '@/stores/assistant.store';
import NodeExecutionErrorMessage from '@/components/NodeExecutionErrorMessage.vue';
import type { IExecutionResponse } from '@/Interface';
import { clearPopupWindowState, hasTrimmedData, hasTrimmedItem } from '../utils/executionUtils';
import { getEasyAiWorkflowJson } from '@/utils/easyAiWorkflowUtils';
import { useSchemaPreviewStore } from '@/stores/schemaPreview.store';
export function usePushConnection({ router }: { router: ReturnType<typeof useRouter> }) {
const workflowHelpers = useWorkflowHelpers({ router });
const nodeHelpers = useNodeHelpers();
const toast = useToast();
const i18n = useI18n();
const telemetry = useTelemetry();
const credentialsStore = useCredentialsStore();
const nodeTypesStore = useNodeTypesStore();
const orchestrationManagerStore = useOrchestrationStore();
const pushStore = usePushConnectionStore();
const settingsStore = useSettingsStore();
const uiStore = useUIStore();
const workflowsStore = useWorkflowsStore();
const assistantStore = useAssistantStore();
const retryTimeout = ref<NodeJS.Timeout | null>(null);
const pushMessageQueue = ref<PushMessageQueueItem[]>([]);
const removeEventListener = ref<(() => void) | null>(null);
function initialize() {
removeEventListener.value = pushStore.addEventListener((message) => {
void pushMessageReceived(message);
});
}
function terminate() {
if (typeof removeEventListener.value === 'function') {
removeEventListener.value();
}
}
/**
* Sometimes the push message is faster as the result from
* the REST API so we do not know yet what execution ID
* is currently active. So internally resend the message
* a few more times
*/
function queuePushMessage(event: PushMessage, retryAttempts: number) {
pushMessageQueue.value.push({ message: event, retriesLeft: retryAttempts });
if (retryTimeout.value === null) {
retryTimeout.value = setTimeout(processWaitingPushMessages, 20);
}
}
/**
* Process the push messages which are waiting in the queue
*/
async function processWaitingPushMessages() {
if (retryTimeout.value !== null) {
clearTimeout(retryTimeout.value);
retryTimeout.value = null;
}
const queueLength = pushMessageQueue.value.length;
for (let i = 0; i < queueLength; i++) {
const messageData = pushMessageQueue.value.shift() as PushMessageQueueItem;
const result = await pushMessageReceived(messageData.message, true);
if (!result) {
// Was not successful
messageData.retriesLeft -= 1;
if (messageData.retriesLeft > 0) {
// If still retries are left add it back and stop execution
pushMessageQueue.value.unshift(messageData);
}
break;
}
}
if (pushMessageQueue.value.length !== 0 && retryTimeout.value === null) {
retryTimeout.value = setTimeout(processWaitingPushMessages, 25);
}
}
/**
* Process a newly received message
*/
async function pushMessageReceived(
receivedData: PushMessage,
isRetry?: boolean,
): Promise<boolean> {
const retryAttempts = 5;
if (receivedData.type === 'sendWorkerStatusMessage') {
const pushData = receivedData.data;
orchestrationManagerStore.updateWorkerStatus(pushData.status);
return true;
}
if (receivedData.type === 'sendConsoleMessage') {
const pushData = receivedData.data;
console.log(pushData.source, ...pushData.messages);
return true;
}
if (
!['testWebhookReceived'].includes(receivedData.type) &&
isRetry !== true &&
pushMessageQueue.value.length
) {
// If there are already messages in the queue add the new one that all of them
// get executed in order
queuePushMessage(receivedData, retryAttempts);
return false;
}
if (receivedData.type === 'executionStarted') {
if (!workflowsStore.activeExecutionId) {
workflowsStore.setActiveExecutionId(receivedData.data.executionId);
}
}
if (
receivedData.type === 'nodeExecuteAfter' ||
receivedData.type === 'nodeExecuteBefore' ||
receivedData.type === 'executionStarted'
) {
if (!uiStore.isActionActive.workflowRunning) {
// No workflow is running so ignore the messages
return false;
}
const pushData = receivedData.data;
if (workflowsStore.activeExecutionId !== pushData.executionId) {
// The data is not for the currently active execution or
// we do not have the execution id yet.
if (isRetry !== true) {
queuePushMessage(receivedData, retryAttempts);
}
return false;
}
}
if (
receivedData.type === 'workflowFailedToActivate' &&
workflowsStore.workflowId === receivedData.data.workflowId
) {
workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
workflowsStore.setActive(false);
toast.showError(
new Error(receivedData.data.errorMessage),
i18n.baseText('workflowActivator.showError.title', {
interpolate: { newStateName: 'activated' },
}) + ':',
);
return true;
}
if (receivedData.type === 'workflowActivated') {
workflowsStore.setWorkflowActive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'workflowDeactivated') {
workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') {
if (!uiStore.isActionActive.workflowRunning) {
// No workflow is running so ignore the messages
return false;
}
if (receivedData.type === 'executionFinished') {
clearPopupWindowState();
const workflow = workflowsStore.getWorkflowById(receivedData.data.workflowId);
if (workflow?.meta?.templateId) {
const easyAiWorkflowJson = getEasyAiWorkflowJson();
const isEasyAIWorkflow = workflow.meta.templateId === easyAiWorkflowJson.meta.templateId;
if (isEasyAIWorkflow) {
telemetry.track(
'User executed test AI workflow',
{
status: receivedData.data.status,
},
{ withPostHog: true },
);
}
}
}
const { executionId } = receivedData.data;
if (executionId !== workflowsStore.activeExecutionId) {
// The workflow which did finish execution did either not get started
// by this session or we do not have the execution id yet.
if (isRetry !== true) {
queuePushMessage(receivedData, retryAttempts);
}
return false;
}
let showedSuccessToast = false;
let executionData: Pick<
IExecutionResponse,
'workflowId' | 'data' | 'status' | 'startedAt' | 'stoppedAt' | 'workflowData'
>;
if (receivedData.type === 'executionFinished' && receivedData.data.rawData) {
const { workflowId, status, rawData } = receivedData.data;
executionData = {
workflowId,
workflowData: workflowsStore.workflow,
data: parse(rawData),
status,
startedAt: workflowsStore.workflowExecutionData?.startedAt ?? new Date(),
stoppedAt: new Date(),
};
} else {
uiStore.setProcessingExecutionResults(true);
/**
* On successful completion without data, we show a success toast
* immediately, even though we still need to fetch and deserialize the
* full execution data, to minimize perceived latency.
*/
if (receivedData.type === 'executionFinished' && receivedData.data.status === 'success') {
workflowHelpers.setDocumentTitle(
workflowsStore.getWorkflowById(receivedData.data.workflowId)?.name,
'IDLE',
);
uiStore.removeActiveAction('workflowRunning');
toast.showMessage({
title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
type: 'success',
});
showedSuccessToast = true;
}
let execution: IExecutionResponse | null;
try {
execution = await workflowsStore.fetchExecutionDataById(executionId);
if (!execution?.data) {
uiStore.setProcessingExecutionResults(false);
return false;
}
executionData = {
workflowId: execution.workflowId,
workflowData: workflowsStore.workflow,
data: parse(execution.data as unknown as string),
status: execution.status,
startedAt: workflowsStore.workflowExecutionData?.startedAt as Date,
stoppedAt: receivedData.type === 'executionFinished' ? new Date() : undefined,
};
} catch {
uiStore.setProcessingExecutionResults(false);
return false;
}
}
const iRunExecutionData: IRunExecutionData = {
startData: executionData.data?.startData,
resultData: executionData.data?.resultData ?? { runData: {} },
executionData: executionData.data?.executionData,
};
if (workflowsStore.workflowExecutionData?.workflowId === executionData.workflowId) {
const activeRunData = workflowsStore.workflowExecutionData?.data?.resultData?.runData;
if (activeRunData) {
for (const key of Object.keys(activeRunData)) {
if (hasTrimmedItem(activeRunData[key])) continue;
iRunExecutionData.resultData.runData[key] = activeRunData[key];
}
}
}
uiStore.setProcessingExecutionResults(false);
let runDataExecutedErrorMessage = getExecutionError(iRunExecutionData);
if (executionData.status === 'crashed') {
runDataExecutedErrorMessage = i18n.baseText('pushConnection.executionFailed.message');
} else if (executionData.status === 'canceled') {
runDataExecutedErrorMessage = i18n.baseText(
'executionsList.showMessage.stopExecution.message',
{
interpolate: { activeExecutionId: workflowsStore.activeExecutionId },
},
);
}
const lineNumber = iRunExecutionData.resultData?.error?.lineNumber;
codeNodeEditorEventBus.emit('highlightLine', lineNumber ?? 'last');
const workflow = workflowHelpers.getCurrentWorkflow();
if (executionData.data?.waitTill !== undefined) {
const workflowSettings = workflowsStore.workflowSettings;
const saveManualExecutions = settingsStore.saveManualExecutions;
const isSavingExecutions =
workflowSettings.saveManualExecutions === undefined
? saveManualExecutions
: workflowSettings.saveManualExecutions;
if (!isSavingExecutions) {
globalLinkActionsEventBus.emit('registerGlobalLinkAction', {
key: 'open-settings',
action: async () => {
if (workflowsStore.isNewWorkflow) await workflowHelpers.saveAsNewWorkflow();
uiStore.openModal(WORKFLOW_SETTINGS_MODAL_KEY);
},
});
}
// Workflow did start but had been put to wait
workflowHelpers.setDocumentTitle(workflow.name as string, 'IDLE');
} else if (executionData.status === 'error' || executionData.status === 'canceled') {
workflowHelpers.setDocumentTitle(workflow.name as string, 'ERROR');
if (
iRunExecutionData.resultData.error?.name === 'ExpressionError' &&
(iRunExecutionData.resultData.error as ExpressionError).functionality === 'pairedItem'
) {
const error = iRunExecutionData.resultData.error as ExpressionError;
void workflowHelpers.getWorkflowDataToSave().then((workflowData) => {
const eventData: IDataObject = {
caused_by_credential: false,
error_message: error.description,
error_title: error.message,
error_type: error.context.type,
node_graph_string: JSON.stringify(
TelemetryHelpers.generateNodesGraph(
workflowData as IWorkflowBase,
workflowHelpers.getNodeTypes(),
).nodeGraph,
),
workflow_id: workflowsStore.workflowId,
};
if (
error.context.nodeCause &&
['paired_item_no_info', 'paired_item_invalid_info'].includes(
error.context.type as string,
)
) {
const node = workflow.getNode(error.context.nodeCause as string);
if (node) {
eventData.is_pinned = !!workflow.getPinDataOfNode(node.name);
eventData.mode = node.parameters.mode;
eventData.node_type = node.type;
eventData.operation = node.parameters.operation;
eventData.resource = node.parameters.resource;
}
}
telemetry.track('Instance FE emitted paired item error', eventData, {
withPostHog: true,
});
});
}
if (iRunExecutionData.resultData.error?.name === 'SubworkflowOperationError') {
const error = iRunExecutionData.resultData.error as SubworkflowOperationError;
workflowsStore.subWorkflowExecutionError = error;
toast.showMessage({
title: error.message,
message: error.description,
type: 'error',
duration: 0,
});
} else if (
(iRunExecutionData.resultData.error?.name === 'NodeOperationError' ||
iRunExecutionData.resultData.error?.name === 'NodeApiError') &&
(iRunExecutionData.resultData.error as NodeError).functionality === 'configuration-node'
) {
// If the error is a configuration error of the node itself doesn't get executed so we can't use lastNodeExecuted for the title
let title: string;
const nodeError = iRunExecutionData.resultData.error as NodeOperationError;
if (nodeError.node.name) {
title = `Error in sub-node ${nodeError.node.name}`;
} else {
title = 'Problem executing workflow';
}
toast.showMessage({
title,
message: h(NodeExecutionErrorMessage, {
errorMessage: nodeError?.description ?? runDataExecutedErrorMessage,
nodeName: nodeError.node.name,
}),
type: 'error',
duration: 0,
});
} else {
// Do not show the error message if the workflow got canceled
if (executionData.status === 'canceled') {
toast.showMessage({
title: i18n.baseText('nodeView.showMessage.stopExecutionTry.title'),
type: 'success',
});
} else {
let title: string;
if (iRunExecutionData.resultData.lastNodeExecuted) {
title = `Problem in node ${iRunExecutionData.resultData.lastNodeExecuted}`;
} else {
title = 'Problem executing workflow';
}
toast.showMessage({
title,
message: runDataExecutedErrorMessage,
type: 'error',
duration: 0,
});
}
}
} else {
workflowHelpers.setDocumentTitle(workflow.name as string, 'IDLE');
const execution = workflowsStore.getWorkflowExecution;
if (execution?.executedNode) {
const node = workflowsStore.getNodeByName(execution.executedNode);
const nodeType = node && nodeTypesStore.getNodeType(node.type, node.typeVersion);
const nodeOutput =
execution &&
execution.executedNode &&
execution.data?.resultData?.runData?.[execution.executedNode];
if (nodeType?.polling && !nodeOutput) {
toast.showMessage({
title: i18n.baseText('pushConnection.pollingNode.dataNotFound', {
interpolate: {
service: getTriggerNodeServiceName(nodeType),
},
}),
message: i18n.baseText('pushConnection.pollingNode.dataNotFound.message', {
interpolate: {
service: getTriggerNodeServiceName(nodeType),
},
}),
type: 'success',
});
} else {
toast.showMessage({
title: i18n.baseText('pushConnection.nodeExecutedSuccessfully'),
type: 'success',
});
}
} else if (!showedSuccessToast) {
toast.showMessage({
title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
type: 'success',
});
}
}
// It does not push the runData as it got already pushed with each
// node that did finish. For that reason copy in here the data
// which we already have. But if the run data in the store is trimmed,
// we skip copying so we use the full data from the final message.
if (workflowsStore.getWorkflowRunData && !hasTrimmedData(workflowsStore.getWorkflowRunData)) {
iRunExecutionData.resultData.runData = workflowsStore.getWorkflowRunData;
}
workflowsStore.executingNode.length = 0;
if (receivedData.type === 'executionFinished') {
// As a temporary workaround for https://linear.app/n8n/issue/PAY-2762,
// remove runs that is still 'running' status when execution is finished
executionData = removeRunningTaskData(executionData as IExecutionResponse);
}
workflowsStore.setWorkflowExecutionData(executionData as IExecutionResponse);
uiStore.removeActiveAction('workflowRunning');
// Set the node execution issues on all the nodes which produced an error so that
// it can be displayed in the node-view
nodeHelpers.updateNodesExecutionIssues();
const lastNodeExecuted: string | undefined = iRunExecutionData.resultData.lastNodeExecuted;
let itemsCount = 0;
if (
lastNodeExecuted &&
iRunExecutionData.resultData.runData[lastNodeExecuted] &&
!runDataExecutedErrorMessage
) {
itemsCount =
iRunExecutionData.resultData.runData[lastNodeExecuted][0].data!.main[0]!.length;
}
workflowsStore.setActiveExecutionId(null);
void useExternalHooks().run('pushConnection.executionFinished', {
itemsCount,
nodeName: iRunExecutionData.resultData.lastNodeExecuted,
errorMessage: runDataExecutedErrorMessage,
runDataExecutedStartData: iRunExecutionData.startData,
resultDataError: iRunExecutionData.resultData.error,
});
} else if (receivedData.type === 'executionWaiting') {
// Nothing to do
} else if (receivedData.type === 'executionStarted') {
if (workflowsStore.workflowExecutionData?.data && receivedData.data.flattedRunData) {
workflowsStore.workflowExecutionData.data.resultData.runData = parse(
receivedData.data.flattedRunData,
);
}
} else if (receivedData.type === 'nodeExecuteAfter') {
// A node finished to execute. Add its data
const pushData = receivedData.data;
/**
* When we receive a placeholder in `nodeExecuteAfter`, we fake the items
* to be the same count as the data the placeholder is standing in for.
* This prevents the items count from jumping up when the execution
* finishes and the full data replaces the placeholder.
*/
if (
pushData.itemCount &&
pushData.data?.data?.main &&
Array.isArray(pushData.data.data.main[0]) &&
pushData.data.data.main[0].length < pushData.itemCount
) {
pushData.data.data.main[0]?.push(...new Array(pushData.itemCount - 1).fill({ json: {} }));
}
workflowsStore.updateNodeExecutionData(pushData);
void assistantStore.onNodeExecution(pushData);
void useSchemaPreviewStore().trackSchemaPreviewExecution(pushData);
} else if (receivedData.type === 'nodeExecuteBefore') {
// A node started to be executed. Set it as executing.
workflowsStore.setNodeExecuting(receivedData.data);
} else if (receivedData.type === 'testWebhookDeleted') {
// A test-webhook was deleted
const pushData = receivedData.data;
if (pushData.workflowId === workflowsStore.workflowId) {
workflowsStore.executionWaitingForWebhook = false;
uiStore.removeActiveAction('workflowRunning');
}
} else if (receivedData.type === 'testWebhookReceived') {
// A test-webhook did get called
const pushData = receivedData.data;
if (pushData.workflowId === workflowsStore.workflowId) {
workflowsStore.executionWaitingForWebhook = false;
workflowsStore.setActiveExecutionId(pushData.executionId);
}
void processWaitingPushMessages();
} else if (receivedData.type === 'reloadNodeType') {
await nodeTypesStore.getNodeTypes();
await nodeTypesStore.getFullNodesProperties([receivedData.data]);
} else if (receivedData.type === 'removeNodeType') {
const pushData = receivedData.data;
const nodesToBeRemoved: INodeTypeNameVersion[] = [pushData];
// Force reload of all credential types
await credentialsStore.fetchCredentialTypes(false).then(() => {
nodeTypesStore.removeNodeTypes(nodesToBeRemoved as INodeTypeDescription[]);
});
} else if (receivedData.type === 'nodeDescriptionUpdated') {
await nodeTypesStore.getNodeTypes();
await credentialsStore.fetchCredentialTypes(true);
}
return true;
}
function getExecutionError(data: IRunExecutionData | IExecuteContextData) {
const error = data.resultData.error;
let errorMessage: string;
if (data.resultData.lastNodeExecuted && error) {
errorMessage = error.message || error.description;
} else {
errorMessage = i18n.baseText('pushConnection.executionError', {
interpolate: { error: '!' },
});
if (error?.message) {
let nodeName: string | undefined;
if ('node' in error) {
nodeName = typeof error.node === 'string' ? error.node : error.node!.name;
}
const receivedError = nodeName ? `${nodeName}: ${error.message}` : error.message;
errorMessage = i18n.baseText('pushConnection.executionError', {
interpolate: {
error: `.${i18n.baseText('pushConnection.executionError.details', {
interpolate: {
details: receivedError,
},
})}`,
},
});
}
}
return errorMessage;
}
return {
initialize,
terminate,
pushMessageReceived,
queuePushMessage,
processWaitingPushMessages,
pushMessageQueue,
retryTimeout,
};
}
function removeRunningTaskData(execution: IExecutionResponse): IExecutionResponse {
if (!execution.data) {
return execution;
}
return {
...execution,
data: {
...execution.data,
resultData: {
...execution.data.resultData,
runData: Object.fromEntries(
Object.entries(execution.data.resultData.runData)
.map(([nodeName, runs]) => [
nodeName,
runs.filter((run) => run.executionStatus !== 'running'),
])
.filter(([, runs]) => runs.length > 0),
),
},
},
};
}

View File

@@ -0,0 +1,526 @@
import type { ExecutionFinished } from '@n8n/api-types/push/execution';
import { useUIStore } from '@/stores/ui.store';
import type { IExecutionResponse } from '@/Interface';
import { WORKFLOW_SETTINGS_MODAL_KEY } from '@/constants';
import { getEasyAiWorkflowJson } from '@/utils/easyAiWorkflowUtils';
import { clearPopupWindowState, hasTrimmedData, hasTrimmedItem } from '@/utils/executionUtils';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useSettingsStore } from '@/stores/settings.store';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import { useTelemetry } from '@/composables/useTelemetry';
import { parse } from 'flatted';
import { useToast } from '@/composables/useToast';
import type { useRouter } from 'vue-router';
import { useI18n } from '@/composables/useI18n';
import { TelemetryHelpers } from 'n8n-workflow';
import type {
IWorkflowBase,
NodeError,
NodeOperationError,
SubworkflowOperationError,
ExpressionError,
IDataObject,
IRunExecutionData,
} from 'n8n-workflow';
import { codeNodeEditorEventBus, globalLinkActionsEventBus } from '@/event-bus';
import { h } from 'vue';
import NodeExecutionErrorMessage from '@/components/NodeExecutionErrorMessage.vue';
import { getTriggerNodeServiceName } from '@/utils/nodeTypesUtils';
import { useExternalHooks } from '@/composables/useExternalHooks';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
export type SimplifiedExecution = Pick<
IExecutionResponse,
'workflowId' | 'workflowData' | 'data' | 'status' | 'startedAt' | 'stoppedAt'
>;
/**
* Handles the 'executionFinished' event, which happens when a workflow execution is finished.
*/
export async function executionFinished(
{ data }: ExecutionFinished,
options: { router: ReturnType<typeof useRouter> },
) {
const workflowsStore = useWorkflowsStore();
const uiStore = useUIStore();
// No workflow is actively running, therefore we ignore this event
if (typeof workflowsStore.activeExecutionId === 'undefined') {
return;
}
const telemetry = useTelemetry();
clearPopupWindowState();
const workflow = workflowsStore.getWorkflowById(data.workflowId);
if (workflow?.meta?.templateId) {
const easyAiWorkflowJson = getEasyAiWorkflowJson();
const isEasyAIWorkflow = workflow.meta.templateId === easyAiWorkflowJson.meta.templateId;
if (isEasyAIWorkflow) {
telemetry.track(
'User executed test AI workflow',
{
status: data.status,
},
{ withPostHog: true },
);
}
}
uiStore.setProcessingExecutionResults(true);
let successToastAlreadyShown = false;
let execution: SimplifiedExecution | undefined;
if (data.rawData) {
const { workflowId, status, rawData } = data;
execution = {
workflowId,
workflowData: workflowsStore.workflow,
data: parse(rawData),
status,
startedAt: workflowsStore.workflowExecutionData?.startedAt ?? new Date(),
stoppedAt: new Date(),
};
} else {
if (data.status === 'success') {
handleExecutionFinishedSuccessfully(data.workflowId, options);
successToastAlreadyShown = true;
}
execution = await fetchExecutionData(data.executionId);
if (!execution) {
uiStore.setProcessingExecutionResults(false);
return;
}
}
const runExecutionData = getRunExecutionData(execution);
uiStore.setProcessingExecutionResults(false);
if (execution.data?.waitTill !== undefined) {
handleExecutionFinishedWithWaitTill(options);
} else if (execution.status === 'error' || execution.status === 'canceled') {
handleExecutionFinishedWithErrorOrCanceled(execution, runExecutionData, options);
} else {
handleExecutionFinishedWithOther(successToastAlreadyShown, options);
}
setRunExecutionData(execution, runExecutionData);
}
/**
* Fetches the execution data from the server and returns a simplified execution object
*/
export async function fetchExecutionData(
executionId: string,
): Promise<SimplifiedExecution | undefined> {
const workflowsStore = useWorkflowsStore();
try {
const executionResponse = await workflowsStore.fetchExecutionDataById(executionId);
if (!executionResponse?.data) {
return;
}
return {
workflowId: executionResponse.workflowId,
workflowData: workflowsStore.workflow,
data: parse(executionResponse.data as unknown as string),
status: executionResponse.status,
startedAt: workflowsStore.workflowExecutionData?.startedAt as Date,
stoppedAt: new Date(),
};
} catch {
return;
}
}
/**
* Returns the run execution data from the execution object in a normalized format
*/
export function getRunExecutionData(execution: SimplifiedExecution): IRunExecutionData {
const workflowsStore = useWorkflowsStore();
const runExecutionData: IRunExecutionData = {
startData: execution.data?.startData,
resultData: execution.data?.resultData ?? { runData: {} },
executionData: execution.data?.executionData,
};
if (workflowsStore.workflowExecutionData?.workflowId === execution.workflowId) {
const activeRunData = workflowsStore.workflowExecutionData?.data?.resultData?.runData;
if (activeRunData) {
for (const key of Object.keys(activeRunData)) {
if (hasTrimmedItem(activeRunData[key])) continue;
runExecutionData.resultData.runData[key] = activeRunData[key];
}
}
}
return runExecutionData;
}
/**
* Returns the error message from the execution object if it exists,
* or a fallback error message otherwise
*/
export function getExecutionError(execution: SimplifiedExecution): string {
const error = execution.data?.resultData.error;
const i18n = useI18n();
let errorMessage: string;
if (execution.data?.resultData.lastNodeExecuted && error) {
errorMessage = error.message ?? error.description ?? '';
} else {
errorMessage = i18n.baseText('pushConnection.executionError', {
interpolate: { error: '!' },
});
if (error?.message) {
let nodeName: string | undefined;
if ('node' in error) {
nodeName = typeof error.node === 'string' ? error.node : error.node!.name;
}
const receivedError = nodeName ? `${nodeName}: ${error.message}` : error.message;
errorMessage = i18n.baseText('pushConnection.executionError', {
interpolate: {
error: `.${i18n.baseText('pushConnection.executionError.details', {
interpolate: {
details: receivedError,
},
})}`,
},
});
}
}
return errorMessage;
}
/**
* Returns the error message for the execution run data if the execution status is crashed or canceled,
* or a fallback error message otherwise
*/
export function getRunDataExecutedErrorMessage(execution: SimplifiedExecution) {
const i18n = useI18n();
if (execution.status === 'crashed') {
return i18n.baseText('pushConnection.executionFailed.message');
} else if (execution.status === 'canceled') {
const workflowsStore = useWorkflowsStore();
return i18n.baseText('executionsList.showMessage.stopExecution.message', {
interpolate: { activeExecutionId: workflowsStore.activeExecutionId ?? '' },
});
}
return getExecutionError(execution);
}
/**
* Handle the case when the workflow execution finished with `waitTill`,
* meaning that it's in a waiting state.
*/
export function handleExecutionFinishedWithWaitTill(options: {
router: ReturnType<typeof useRouter>;
}) {
const workflowsStore = useWorkflowsStore();
const settingsStore = useSettingsStore();
const workflowHelpers = useWorkflowHelpers(options);
const workflowObject = workflowsStore.getCurrentWorkflow();
const workflowSettings = workflowsStore.workflowSettings;
const saveManualExecutions =
workflowSettings.saveManualExecutions ?? settingsStore.saveManualExecutions;
if (!saveManualExecutions) {
const uiStore = useUIStore();
globalLinkActionsEventBus.emit('registerGlobalLinkAction', {
key: 'open-settings',
action: async () => {
if (workflowsStore.isNewWorkflow) await workflowHelpers.saveAsNewWorkflow();
uiStore.openModal(WORKFLOW_SETTINGS_MODAL_KEY);
},
});
}
// Workflow did start but had been put to wait
workflowHelpers.setDocumentTitle(workflowObject.name as string, 'IDLE');
}
/**
* Handle the case when the workflow execution finished with an `error` or `canceled` status.
*/
export function handleExecutionFinishedWithErrorOrCanceled(
execution: SimplifiedExecution,
runExecutionData: IRunExecutionData,
options: { router: ReturnType<typeof useRouter> },
) {
const toast = useToast();
const i18n = useI18n();
const telemetry = useTelemetry();
const workflowsStore = useWorkflowsStore();
const workflowHelpers = useWorkflowHelpers(options);
const workflowObject = workflowsStore.getCurrentWorkflow();
const runDataExecutedErrorMessage = getRunDataExecutedErrorMessage(execution);
workflowHelpers.setDocumentTitle(workflowObject.name as string, 'ERROR');
if (
runExecutionData.resultData.error?.name === 'ExpressionError' &&
(runExecutionData.resultData.error as ExpressionError).functionality === 'pairedItem'
) {
const error = runExecutionData.resultData.error as ExpressionError;
void workflowHelpers.getWorkflowDataToSave().then((workflowData) => {
const eventData: IDataObject = {
caused_by_credential: false,
error_message: error.description,
error_title: error.message,
error_type: error.context.type,
node_graph_string: JSON.stringify(
TelemetryHelpers.generateNodesGraph(
workflowData as IWorkflowBase,
workflowHelpers.getNodeTypes(),
).nodeGraph,
),
workflow_id: workflowsStore.workflowId,
};
if (
error.context.nodeCause &&
['paired_item_no_info', 'paired_item_invalid_info'].includes(error.context.type as string)
) {
const node = workflowObject.getNode(error.context.nodeCause as string);
if (node) {
eventData.is_pinned = !!workflowObject.getPinDataOfNode(node.name);
eventData.mode = node.parameters.mode;
eventData.node_type = node.type;
eventData.operation = node.parameters.operation;
eventData.resource = node.parameters.resource;
}
}
telemetry.track('Instance FE emitted paired item error', eventData, {
withPostHog: true,
});
});
}
if (runExecutionData.resultData.error?.name === 'SubworkflowOperationError') {
const error = runExecutionData.resultData.error as SubworkflowOperationError;
workflowsStore.subWorkflowExecutionError = error;
toast.showMessage({
title: error.message,
message: error.description,
type: 'error',
duration: 0,
});
} else if (
(runExecutionData.resultData.error?.name === 'NodeOperationError' ||
runExecutionData.resultData.error?.name === 'NodeApiError') &&
(runExecutionData.resultData.error as NodeError).functionality === 'configuration-node'
) {
// If the error is a configuration error of the node itself doesn't get executed so we can't use lastNodeExecuted for the title
let title: string;
const nodeError = runExecutionData.resultData.error as NodeOperationError;
if (nodeError.node.name) {
title = `Error in sub-node ${nodeError.node.name}`;
} else {
title = 'Problem executing workflow';
}
toast.showMessage({
title,
message: h(NodeExecutionErrorMessage, {
errorMessage: nodeError?.description ?? runDataExecutedErrorMessage,
nodeName: nodeError.node.name,
}),
type: 'error',
duration: 0,
});
} else {
// Do not show the error message if the workflow got canceled
if (execution.status === 'canceled') {
toast.showMessage({
title: i18n.baseText('nodeView.showMessage.stopExecutionTry.title'),
type: 'success',
});
} else {
let title: string;
if (runExecutionData.resultData.lastNodeExecuted) {
title = `Problem in node ${runExecutionData.resultData.lastNodeExecuted}`;
} else {
title = 'Problem executing workflow';
}
toast.showMessage({
title,
message: runDataExecutedErrorMessage,
type: 'error',
duration: 0,
});
}
}
}
/**
* Handle the case when the workflow execution finished successfully.
*
* On successful completion without data, we show a success toast
* immediately, even though we still need to fetch and deserialize the
* full execution data, to minimize perceived latency.
*/
export function handleExecutionFinishedSuccessfully(
workflowId: string,
options: { router: ReturnType<typeof useRouter> },
) {
const workflowsStore = useWorkflowsStore();
const workflowHelpers = useWorkflowHelpers(options);
const toast = useToast();
const i18n = useI18n();
workflowHelpers.setDocumentTitle(workflowsStore.getWorkflowById(workflowId)?.name, 'IDLE');
workflowsStore.setActiveExecutionId(undefined);
toast.showMessage({
title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
type: 'success',
});
}
/**
* Handle the case when the workflow execution finished successfully.
*/
export function handleExecutionFinishedWithOther(
successToastAlreadyShown: boolean,
options: { router: ReturnType<typeof useRouter> },
) {
const workflowsStore = useWorkflowsStore();
const toast = useToast();
const i18n = useI18n();
const workflowHelpers = useWorkflowHelpers(options);
const nodeTypesStore = useNodeTypesStore();
const workflowObject = workflowsStore.getCurrentWorkflow();
workflowHelpers.setDocumentTitle(workflowObject.name as string, 'IDLE');
const workflowExecution = workflowsStore.getWorkflowExecution;
if (workflowExecution?.executedNode) {
const node = workflowsStore.getNodeByName(workflowExecution.executedNode);
const nodeType = node && nodeTypesStore.getNodeType(node.type, node.typeVersion);
const nodeOutput =
workflowExecution?.executedNode &&
workflowExecution.data?.resultData?.runData?.[workflowExecution.executedNode];
if (nodeType?.polling && !nodeOutput) {
toast.showMessage({
title: i18n.baseText('pushConnection.pollingNode.dataNotFound', {
interpolate: {
service: getTriggerNodeServiceName(nodeType),
},
}),
message: i18n.baseText('pushConnection.pollingNode.dataNotFound.message', {
interpolate: {
service: getTriggerNodeServiceName(nodeType),
},
}),
type: 'success',
});
} else {
toast.showMessage({
title: i18n.baseText('pushConnection.nodeExecutedSuccessfully'),
type: 'success',
});
}
} else if (!successToastAlreadyShown) {
toast.showMessage({
title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
type: 'success',
});
}
}
export function setRunExecutionData(
execution: SimplifiedExecution,
runExecutionData: IRunExecutionData,
normalize = true,
) {
const workflowsStore = useWorkflowsStore();
const nodeHelpers = useNodeHelpers();
const runDataExecutedErrorMessage = getRunDataExecutedErrorMessage(execution);
const workflowExecution = workflowsStore.getWorkflowExecution;
// It does not push the runData as it got already pushed with each
// node that did finish. For that reason copy in here the data
// which we already have. But if the run data in the store is trimmed,
// we skip copying so we use the full data from the final message.
if (workflowsStore.getWorkflowRunData && !hasTrimmedData(workflowsStore.getWorkflowRunData)) {
runExecutionData.resultData.runData = workflowsStore.getWorkflowRunData;
}
workflowsStore.executingNode.length = 0;
if (normalize) {
// As a temporary workaround for https://linear.app/n8n/issue/PAY-2762,
// remove runs that is still 'running' status when execution is finished
removeRunningTaskData(execution as IExecutionResponse);
}
workflowsStore.setWorkflowExecutionData(workflowExecution as IExecutionResponse);
workflowsStore.setWorkflowExecutionRunData(runExecutionData);
workflowsStore.setActiveExecutionId(undefined);
// Set the node execution issues on all the nodes which produced an error so that
// it can be displayed in the node-view
nodeHelpers.updateNodesExecutionIssues();
const lastNodeExecuted: string | undefined = runExecutionData.resultData.lastNodeExecuted;
let itemsCount = 0;
if (
lastNodeExecuted &&
runExecutionData.resultData.runData[lastNodeExecuted] &&
!runDataExecutedErrorMessage
) {
itemsCount =
runExecutionData.resultData.runData[lastNodeExecuted][0].data?.main[0]?.length ?? 0;
}
workflowsStore.setActiveExecutionId(undefined);
void useExternalHooks().run('pushConnection.executionFinished', {
itemsCount,
nodeName: runExecutionData.resultData.lastNodeExecuted,
errorMessage: runDataExecutedErrorMessage,
runDataExecutedStartData: runExecutionData.startData,
resultDataError: runExecutionData.resultData.error,
});
const lineNumber = runExecutionData.resultData?.error?.lineNumber;
codeNodeEditorEventBus.emit('highlightLine', lineNumber ?? 'last');
}
function removeRunningTaskData(execution: IExecutionResponse): void {
if (execution.data) {
execution.data = {
...execution.data,
resultData: {
...execution.data.resultData,
runData: Object.fromEntries(
Object.entries(execution.data.resultData.runData)
.map(([nodeName, runs]) => [
nodeName,
runs.filter((run) => run.executionStatus !== 'running'),
])
.filter(([, runs]) => runs.length > 0),
),
},
};
}
}

View File

@@ -0,0 +1,46 @@
import type { ExecutionRecovered } from '@n8n/api-types/push/execution';
import { useUIStore } from '@/stores/ui.store';
import {
fetchExecutionData,
getRunExecutionData,
handleExecutionFinishedWithOther,
handleExecutionFinishedWithErrorOrCanceled,
handleExecutionFinishedWithWaitTill,
setRunExecutionData,
} from './executionFinished';
import { useWorkflowsStore } from '@/stores/workflows.store';
import type { useRouter } from 'vue-router';
export async function executionRecovered(
{ data }: ExecutionRecovered,
options: { router: ReturnType<typeof useRouter> },
) {
const workflowsStore = useWorkflowsStore();
const uiStore = useUIStore();
// No workflow is actively running, therefore we ignore this event
if (typeof workflowsStore.activeExecutionId === 'undefined') {
return;
}
uiStore.setProcessingExecutionResults(true);
const execution = await fetchExecutionData(data.executionId);
if (!execution) {
uiStore.setProcessingExecutionResults(false);
return;
}
const runExecutionData = getRunExecutionData(execution);
uiStore.setProcessingExecutionResults(false);
if (execution.data?.waitTill !== undefined) {
handleExecutionFinishedWithWaitTill(options);
} else if (execution.status === 'error' || execution.status === 'canceled') {
handleExecutionFinishedWithErrorOrCanceled(execution, runExecutionData, options);
} else {
handleExecutionFinishedWithOther(false, options);
}
setRunExecutionData(execution, runExecutionData, false);
}

View File

@@ -0,0 +1,21 @@
import type { ExecutionStarted } from '@n8n/api-types/push/execution';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { parse } from 'flatted';
/**
* Handles the 'executionStarted' event, which happens when a workflow is executed.
*/
export async function executionStarted({ data }: ExecutionStarted) {
const workflowsStore = useWorkflowsStore();
// No workflow execution is ongoing, so we can ignore this event
if (typeof workflowsStore.activeExecutionId === 'undefined') {
return;
} else if (workflowsStore.activeExecutionId === null) {
workflowsStore.setActiveExecutionId(data.executionId);
}
if (workflowsStore.workflowExecutionData?.data && data.flattedRunData) {
workflowsStore.workflowExecutionData.data.resultData.runData = parse(data.flattedRunData);
}
}

View File

@@ -0,0 +1,15 @@
export * from './executionFinished';
export * from './executionRecovered';
export * from './executionStarted';
export * from './nodeDescriptionUpdated';
export * from './nodeExecuteAfter';
export * from './nodeExecuteBefore';
export * from './reloadNodeType';
export * from './removeNodeType';
export * from './sendConsoleMessage';
export * from './sendWorkerStatusMessage';
export * from './testWebhookDeleted';
export * from './testWebhookReceived';
export * from './workflowActivated';
export * from './workflowDeactivated';
export * from './workflowFailedToActivate';

View File

@@ -0,0 +1,16 @@
import type { NodeDescriptionUpdated } from '@n8n/api-types/push/hot-reload';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import { useCredentialsStore } from '@/stores/credentials.store';
/**
* Handles the 'nodeDescriptionUpdated' event from the push connection, which indicates
* that a node description has been updated.
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export async function nodeDescriptionUpdated(_event: NodeDescriptionUpdated) {
const nodeTypesStore = useNodeTypesStore();
const credentialsStore = useCredentialsStore();
await nodeTypesStore.getNodeTypes();
await credentialsStore.fetchCredentialTypes(true);
}

View File

@@ -0,0 +1,39 @@
import type { NodeExecuteAfter } from '@n8n/api-types/push/execution';
import { useSchemaPreviewStore } from '@/stores/schemaPreview.store';
import { useAssistantStore } from '@/stores/assistant.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
/**
* Handles the 'nodeExecuteAfter' event, which happens after a node is executed.
*/
export async function nodeExecuteAfter({ data: pushData }: NodeExecuteAfter) {
const workflowsStore = useWorkflowsStore();
const assistantStore = useAssistantStore();
const schemaPreviewStore = useSchemaPreviewStore();
/**
* When we receive a placeholder in `nodeExecuteAfter`, we fake the items
* to be the same count as the data the placeholder is standing in for.
* This prevents the items count from jumping up when the execution
* finishes and the full data replaces the placeholder.
*/
if (
pushData.itemCount &&
pushData.data?.data?.main &&
Array.isArray(pushData.data.data.main[0]) &&
pushData.data.data.main[0].length < pushData.itemCount
) {
pushData.data.data.main[0]?.push(...new Array(pushData.itemCount - 1).fill({ json: {} }));
}
workflowsStore.updateNodeExecutionData(pushData);
// Remove the node from the executing queue after a short delay
// To allow the running spinner to show for at least 50ms
setTimeout(() => {
workflowsStore.removeExecutingNode(pushData.nodeName);
}, 50);
void assistantStore.onNodeExecution(pushData);
void schemaPreviewStore.trackSchemaPreviewExecution(pushData);
}

View File

@@ -0,0 +1,12 @@
import type { NodeExecuteBefore } from '@n8n/api-types/push/execution';
import { useWorkflowsStore } from '@/stores/workflows.store';
/**
* Handles the 'nodeExecuteBefore' event, which happens before a node is executed.
*/
export async function nodeExecuteBefore({ data }: NodeExecuteBefore) {
const workflowsStore = useWorkflowsStore();
workflowsStore.addExecutingNode(data.nodeName);
workflowsStore.addNodeExecutionData(data);
}

View File

@@ -0,0 +1,13 @@
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import type { ReloadNodeType } from '@n8n/api-types/push/hot-reload';
/**
* Handles the 'reloadNodeType' event from the push connection, which indicates
* that a node type needs to be reloaded.
*/
export async function reloadNodeType({ data }: ReloadNodeType) {
const nodeTypesStore = useNodeTypesStore();
await nodeTypesStore.getNodeTypes();
await nodeTypesStore.getFullNodesProperties([data]);
}

View File

@@ -0,0 +1,20 @@
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import type { RemoveNodeType } from '@n8n/api-types/push/hot-reload';
import type { INodeTypeDescription, INodeTypeNameVersion } from 'n8n-workflow';
import { useCredentialsStore } from '@/stores/credentials.store';
/**
* Handles the 'removeNodeType' event from the push connection, which indicates
* that a node type needs to be removed
*/
export async function removeNodeType({ data }: RemoveNodeType) {
const nodeTypesStore = useNodeTypesStore();
const credentialsStore = useCredentialsStore();
const nodesToBeRemoved: INodeTypeNameVersion[] = [data];
// Force reload of all credential types
await credentialsStore.fetchCredentialTypes(false).then(() => {
nodeTypesStore.removeNodeTypes(nodesToBeRemoved as INodeTypeDescription[]);
});
}

View File

@@ -0,0 +1,9 @@
import type { SendConsoleMessage } from '@n8n/api-types/push/debug';
/**
* Handles the 'sendConsoleMessage' event from the push connection, which indicates
* that a console message should be sent.
*/
export async function sendConsoleMessage({ data }: SendConsoleMessage) {
console.log(data.source, ...data.messages);
}

View File

@@ -0,0 +1,11 @@
import type { SendWorkerStatusMessage } from '@n8n/api-types';
import { useOrchestrationStore } from '@/stores/orchestration.store';
/**
* Handles the 'sendWorkerStatusMessage' event from the push connection, which indicates
* that a worker status message should be sent.
*/
export async function sendWorkerStatusMessage({ data }: SendWorkerStatusMessage) {
const orchestrationStore = useOrchestrationStore();
orchestrationStore.updateWorkerStatus(data.status);
}

View File

@@ -0,0 +1,14 @@
import type { TestWebhookDeleted } from '@n8n/api-types/push/webhook';
import { useWorkflowsStore } from '@/stores/workflows.store';
/**
* Handles the 'testWebhookDeleted' push message, which is sent when a test webhook is deleted.
*/
export async function testWebhookDeleted({ data }: TestWebhookDeleted) {
const workflowsStore = useWorkflowsStore();
if (data.workflowId === workflowsStore.workflowId) {
workflowsStore.executionWaitingForWebhook = false;
workflowsStore.setActiveExecutionId(undefined);
}
}

View File

@@ -0,0 +1,14 @@
import type { TestWebhookReceived } from '@n8n/api-types/push/webhook';
import { useWorkflowsStore } from '@/stores/workflows.store';
/**
* Handles the 'testWebhookReceived' push message, which is sent when a test webhook is received.
*/
export async function testWebhookReceived({ data }: TestWebhookReceived) {
const workflowsStore = useWorkflowsStore();
if (data.workflowId === workflowsStore.workflowId) {
workflowsStore.executionWaitingForWebhook = false;
workflowsStore.setActiveExecutionId(data.executionId ?? null);
}
}

View File

@@ -0,0 +1,8 @@
import type { WorkflowActivated } from '@n8n/api-types/push/workflow';
import { useWorkflowsStore } from '@/stores/workflows.store';
export async function workflowActivated({ data }: WorkflowActivated) {
const workflowsStore = useWorkflowsStore();
workflowsStore.setWorkflowActive(data.workflowId);
}

View File

@@ -0,0 +1,8 @@
import type { WorkflowDeactivated } from '@n8n/api-types/push/workflow';
import { useWorkflowsStore } from '@/stores/workflows.store';
export async function workflowDeactivated({ data }: WorkflowDeactivated) {
const workflowsStore = useWorkflowsStore();
workflowsStore.setWorkflowInactive(data.workflowId);
}

View File

@@ -0,0 +1,24 @@
import type { WorkflowFailedToActivate } from '@n8n/api-types/push/workflow';
import { useToast } from '@/composables/useToast';
import { useI18n } from '@/composables/useI18n';
import { useWorkflowsStore } from '@/stores/workflows.store';
export async function workflowFailedToActivate({ data }: WorkflowFailedToActivate) {
const workflowsStore = useWorkflowsStore();
if (workflowsStore.workflowId !== data.workflowId) {
return;
}
workflowsStore.setWorkflowInactive(data.workflowId);
workflowsStore.setActive(false);
const toast = useToast();
const i18n = useI18n();
toast.showError(
new Error(data.errorMessage),
i18n.baseText('workflowActivator.showError.title', {
interpolate: { newStateName: 'activated' },
}) + ':',
);
}

View File

@@ -0,0 +1 @@
export * from './usePushConnection';

View File

@@ -0,0 +1,93 @@
import { usePushConnection } from '@/composables/usePushConnection';
import { testWebhookReceived } from '@/composables/usePushConnection/handlers';
import type { TestWebhookReceived } from '@n8n/api-types/push/webhook';
import { useRouter } from 'vue-router';
import type { OnPushMessageHandler } from '@/stores/pushConnection.store';
const removeEventListener = vi.fn();
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const addEventListener = vi.fn((_handler: OnPushMessageHandler) => removeEventListener);
vi.mock('@/stores/pushConnection.store', () => ({
usePushConnectionStore: () => ({
addEventListener,
}),
}));
vi.mock('@/composables/usePushConnection/handlers', () => ({
testWebhookDeleted: vi.fn(),
testWebhookReceived: vi.fn(),
reloadNodeType: vi.fn(),
removeNodeType: vi.fn(),
nodeDescriptionUpdated: vi.fn(),
nodeExecuteBefore: vi.fn(),
nodeExecuteAfter: vi.fn(),
executionStarted: vi.fn(),
executionWaiting: vi.fn(),
sendWorkerStatusMessage: vi.fn(),
sendConsoleMessage: vi.fn(),
workflowFailedToActivate: vi.fn(),
executionFinished: vi.fn(),
executionRecovered: vi.fn(),
workflowActivated: vi.fn(),
workflowDeactivated: vi.fn(),
collaboratorsChanged: vi.fn(),
}));
vi.mock('vue-router', async () => {
return {
useRouter: vi.fn().mockReturnValue({
push: vi.fn(),
}),
};
});
describe('usePushConnection composable', () => {
let pushConnection: ReturnType<typeof usePushConnection>;
beforeEach(() => {
vi.clearAllMocks();
const router = useRouter();
pushConnection = usePushConnection({ router });
});
it('should register an event listener on initialize', () => {
pushConnection.initialize();
expect(addEventListener).toHaveBeenCalledTimes(1);
});
it('should call the correct handler when an event is received', async () => {
pushConnection.initialize();
// Get the event callback which was registered via addEventListener.
const handler = addEventListener.mock.calls[0][0];
// Create a test event for one of the handled types.
// In this test, we simulate the event type 'testWebhookReceived'.
const testEvent: TestWebhookReceived = {
type: 'testWebhookReceived',
data: {
executionId: '123',
workflowId: '456',
},
};
// Call the event callback with our test event.
handler(testEvent);
// Allow any microtasks to complete.
await Promise.resolve();
// Verify that the correct handler was called.
expect(testWebhookReceived).toHaveBeenCalledTimes(1);
expect(testWebhookReceived).toHaveBeenCalledWith(testEvent);
});
it('should call removeEventListener when terminate is called', () => {
pushConnection.initialize();
pushConnection.terminate();
expect(removeEventListener).toHaveBeenCalledTimes(1);
});
});

View File

@@ -0,0 +1,86 @@
import { ref } from 'vue';
import type { PushMessage } from '@n8n/api-types';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import {
testWebhookDeleted,
testWebhookReceived,
reloadNodeType,
removeNodeType,
nodeDescriptionUpdated,
nodeExecuteBefore,
nodeExecuteAfter,
executionStarted,
sendWorkerStatusMessage,
sendConsoleMessage,
workflowFailedToActivate,
executionFinished,
executionRecovered,
workflowActivated,
workflowDeactivated,
} from '@/composables/usePushConnection/handlers';
import { createEventQueue } from '@n8n/utils/event-queue';
import type { useRouter } from 'vue-router';
export function usePushConnection(options: { router: ReturnType<typeof useRouter> }) {
const pushStore = usePushConnectionStore();
const { enqueue } = createEventQueue<PushMessage>(processEvent);
const removeEventListener = ref<(() => void) | null>(null);
function initialize() {
removeEventListener.value = pushStore.addEventListener((message) => {
enqueue(message);
});
}
function terminate() {
if (typeof removeEventListener.value === 'function') {
removeEventListener.value();
}
}
/**
* Process received push message event by calling the correct handler
*/
async function processEvent(event: PushMessage) {
switch (event.type) {
case 'testWebhookDeleted':
return await testWebhookDeleted(event);
case 'testWebhookReceived':
return await testWebhookReceived(event);
case 'reloadNodeType':
return await reloadNodeType(event);
case 'removeNodeType':
return await removeNodeType(event);
case 'nodeDescriptionUpdated':
return await nodeDescriptionUpdated(event);
case 'nodeExecuteBefore':
return await nodeExecuteBefore(event);
case 'nodeExecuteAfter':
return await nodeExecuteAfter(event);
case 'executionStarted':
return await executionStarted(event);
case 'sendWorkerStatusMessage':
return await sendWorkerStatusMessage(event);
case 'sendConsoleMessage':
return await sendConsoleMessage(event);
case 'workflowFailedToActivate':
return await workflowFailedToActivate(event);
case 'executionFinished':
return await executionFinished(event, options);
case 'executionRecovered':
return await executionRecovered(event, options);
case 'workflowActivated':
return await workflowActivated(event);
case 'workflowDeactivated':
return await workflowDeactivated(event);
}
}
return {
initialize,
terminate,
};
}

View File

@@ -25,9 +25,9 @@ import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { createTestNode, createTestWorkflow } from '@/__tests__/mocks';
import { waitFor } from '@testing-library/vue';
vi.mock('@/stores/workflows.store', async () => {
vi.mock('@/stores/workflows.store', () => {
const storeState: Partial<ReturnType<typeof useWorkflowsStore>> & {
activeExecutionId: string | null;
activeExecutionId: string | null | undefined;
} = {
allNodes: [],
runWorkflow: vi.fn(),
@@ -35,8 +35,8 @@ vi.mock('@/stores/workflows.store', async () => {
getWorkflowRunData: null,
workflowExecutionData: null,
setWorkflowExecutionData: vi.fn(),
activeExecutionId: null,
previousExecutionId: null,
activeExecutionId: undefined,
previousExecutionId: undefined,
nodesIssuesExist: false,
executionWaitingForWebhook: false,
getCurrentWorkflow: vi.fn().mockReturnValue({ id: '123' }),
@@ -49,7 +49,7 @@ vi.mock('@/stores/workflows.store', async () => {
incomingConnectionsByNodeName: vi.fn(),
outgoingConnectionsByNodeName: vi.fn(),
markExecutionAsStopped: vi.fn(),
setActiveExecutionId: vi.fn((id: string | null) => {
setActiveExecutionId: vi.fn((id: string | null | undefined) => {
storeState.activeExecutionId = id;
}),
};
@@ -123,7 +123,7 @@ describe('useRunWorkflow({ router })', () => {
let workflowHelpers: ReturnType<typeof useWorkflowHelpers>;
let settingsStore: ReturnType<typeof useSettingsStore>;
beforeAll(() => {
beforeEach(() => {
const pinia = createTestingPinia({ stubActions: false });
setActivePinia(pinia);
@@ -137,8 +137,8 @@ describe('useRunWorkflow({ router })', () => {
workflowHelpers = useWorkflowHelpers({ router });
});
beforeEach(() => {
uiStore.activeActions = [];
afterEach(() => {
vi.mocked(workflowsStore).setActiveExecutionId(undefined);
vi.clearAllMocks();
});
@@ -160,14 +160,13 @@ describe('useRunWorkflow({ router })', () => {
const mockResponse = { executionId: '123', waitingForWebhook: false };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockResponse);
vi.mocked(workflowsStore).setActiveExecutionId('123');
const response = await runWorkflowApi({} as IStartRunData);
expect(response).toEqual(mockResponse);
expect(workflowsStore.activeExecutionId).toBe('123');
expect(workflowsStore.setActiveExecutionId).toHaveBeenNthCalledWith(1, null);
expect(workflowsStore.setActiveExecutionId).toHaveBeenNthCalledWith(2, '123');
expect(workflowsStore.executionWaitingForWebhook).toBe(false);
expect(uiStore.addActiveAction).toHaveBeenCalledWith('workflowRunning');
});
it('should prevent running a webhook-based workflow that has issues', async () => {
@@ -192,7 +191,7 @@ describe('useRunWorkflow({ router })', () => {
vi.mocked(workflowsStore).runWorkflow.mockRejectedValue(new Error('Failed to run workflow'));
await expect(runWorkflowApi({} as IStartRunData)).rejects.toThrow('Failed to run workflow');
expect(uiStore.removeActiveAction).toHaveBeenCalledWith('workflowRunning');
expect(workflowsStore.setActiveExecutionId).toHaveBeenCalledWith(undefined);
});
it('should set waitingForWebhook if response indicates waiting', async () => {
@@ -207,6 +206,7 @@ describe('useRunWorkflow({ router })', () => {
expect(response).toEqual(mockResponse);
expect(workflowsStore.executionWaitingForWebhook).toBe(true);
});
it('should prevent execution and show error message when workflow is active with single webhook trigger', async () => {
const pinia = createTestingPinia({ stubActions: false });
setActivePinia(pinia);
@@ -292,7 +292,7 @@ describe('useRunWorkflow({ router })', () => {
describe('runWorkflow()', () => {
it('should return undefined if UI action "workflowRunning" is active', async () => {
const { runWorkflow } = useRunWorkflow({ router });
uiStore.addActiveAction('workflowRunning');
vi.mocked(workflowsStore).setActiveExecutionId('123');
const result = await runWorkflow({});
expect(result).toBeUndefined();
});

View File

@@ -16,8 +16,8 @@ import type {
IDataObject,
IWorkflowBase,
} from 'n8n-workflow';
import { NodeConnectionTypes, TelemetryHelpers } from 'n8n-workflow';
import { retry } from '@n8n/utils/retry';
import { useToast } from '@/composables/useToast';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
@@ -29,7 +29,6 @@ import {
} from '@/constants';
import { useRootStore } from '@/stores/root.store';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { displayForm } from '@/utils/executionUtils';
import { useExternalHooks } from '@/composables/useExternalHooks';
@@ -55,7 +54,6 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
const rootStore = useRootStore();
const pushConnectionStore = usePushConnectionStore();
const uiStore = useUIStore();
const workflowsStore = useWorkflowsStore();
const executionsStore = useExecutionsStore();
const { dirtinessByName } = useNodeDirtiness();
@@ -70,26 +68,25 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
workflowsStore.subWorkflowExecutionError = null;
uiStore.addActiveAction('workflowRunning');
// Set the execution as started, but still waiting for the execution to be retrieved
workflowsStore.setActiveExecutionId(null);
let response: IExecutionPushResponse;
try {
response = await workflowsStore.runWorkflow(runData);
} catch (error) {
uiStore.removeActiveAction('workflowRunning');
workflowsStore.setActiveExecutionId(undefined);
throw error;
}
if (
response.executionId !== undefined &&
workflowsStore.previousExecutionId !== response.executionId
) {
const workflowExecutionIdIsNew = workflowsStore.previousExecutionId !== response.executionId;
const workflowExecutionIdIsPending = workflowsStore.activeExecutionId === null;
if (response.executionId && workflowExecutionIdIsNew && workflowExecutionIdIsPending) {
workflowsStore.setActiveExecutionId(response.executionId);
}
if (response.waitingForWebhook === true && useWorkflowsStore().nodesIssuesExist) {
uiStore.removeActiveAction('workflowRunning');
if (response.waitingForWebhook === true && workflowsStore.nodesIssuesExist) {
workflowsStore.setActiveExecutionId(undefined);
throw new Error(i18n.baseText('workflowRun.showError.resolveOutstandingIssues'));
}
@@ -106,12 +103,12 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
nodeData?: ITaskData;
source?: string;
}): Promise<IExecutionPushResponse | undefined> {
const workflow = workflowHelpers.getCurrentWorkflow();
if (uiStore.isActionActive.workflowRunning) {
if (workflowsStore.activeExecutionId) {
return;
}
const workflow = workflowHelpers.getCurrentWorkflow();
toast.clearAllStickyNotifications();
try {
@@ -417,7 +414,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
async function stopCurrentExecution() {
const executionId = workflowsStore.activeExecutionId;
if (executionId === null) {
if (!executionId) {
return;
}
@@ -455,15 +452,18 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
}
} finally {
// Wait for websocket event to update the execution status to 'canceled'
for (let i = 0; i < 100; i++) {
if (workflowsStore.workflowExecutionData?.status !== 'running') {
break;
}
await retry(
async () => {
if (workflowsStore.workflowExecutionData?.status !== 'running') {
workflowsStore.markExecutionAsStopped();
return true;
}
await new Promise(requestAnimationFrame);
}
workflowsStore.markExecutionAsStopped();
return false;
},
250,
10,
);
}
}

View File

@@ -74,7 +74,7 @@ export const useAssistantStore = defineStore(STORES.ASSISTANT, () => {
const chatSessionCredType = ref<ICredentialType | undefined>();
const chatSessionError = ref<ChatRequest.ErrorContext | undefined>();
const currentSessionId = ref<string | undefined>();
const currentSessionActiveExecutionId = ref<string | null>(null);
const currentSessionActiveExecutionId = ref<string | undefined>(undefined);
const currentSessionWorkflowId = ref<string | undefined>();
const lastUnread = ref<ChatUI.AssistantMessage | undefined>();
const nodeExecutionStatus = ref<NodeExecutionStatus>('not_executed');
@@ -125,7 +125,7 @@ export const useAssistantStore = defineStore(STORES.ASSISTANT, () => {
currentSessionId.value = undefined;
chatSessionError.value = undefined;
lastUnread.value = undefined;
currentSessionActiveExecutionId.value = null;
currentSessionActiveExecutionId.value = undefined;
suggestions.value = {};
nodeExecutionStatus.value = 'not_executed';
chatSessionCredType.value = undefined;

View File

@@ -94,6 +94,7 @@ import { updateCurrentUserSettings } from '@/api/users';
import { useExecutingNode } from '@/composables/useExecutingNode';
import { LOGS_PANEL_STATE } from '@/components/CanvasChat/types/logs';
import { useLocalStorage } from '@vueuse/core';
import type { NodeExecuteBefore } from '@n8n/api-types/push/execution';
const defaults: Omit<IWorkflowDb, 'id'> & { settings: NonNullable<IWorkflowDb['settings']> } = {
name: '',
@@ -140,8 +141,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const currentWorkflowExecutions = ref<ExecutionSummary[]>([]);
const workflowExecutionData = ref<IExecutionResponse | null>(null);
const workflowExecutionPairedItemMappings = ref<Record<string, Set<string>>>({});
const activeExecutionId = ref<string | null>(null);
const previousExecutionId = ref<string | null>(null);
const subWorkflowExecutionError = ref<Error | null>(null);
const executionWaitingForWebhook = ref(false);
const workflowsById = ref<Record<string, IWorkflowDb>>({});
@@ -159,8 +158,13 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
: LOGS_PANEL_STATE.CLOSED,
);
const { executingNode, addExecutingNode, removeExecutingNode, clearNodeExecutionQueue } =
useExecutingNode();
const {
executingNode,
addExecutingNode,
removeExecutingNode,
isNodeExecuting,
clearNodeExecutionQueue,
} = useExecutingNode();
const workflowName = computed(() => workflow.value.name);
@@ -234,11 +238,13 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
});
const isWorkflowRunning = computed(() => {
if (uiStore.isActionActive.workflowRunning) return true;
if (activeExecutionId.value) {
const execution = getWorkflowExecution;
if (execution.value && execution.value.status === 'waiting' && !execution.value.finished) {
if (activeExecutionId.value === null) {
return true;
} else if (activeExecutionId.value && workflowExecutionData.value) {
if (
['waiting', 'running'].includes(workflowExecutionData.value.status) &&
!workflowExecutionData.value.finished
) {
return true;
}
}
@@ -288,8 +294,20 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
Workflow.getConnectionsByDestination(workflow.value.connections),
);
function setActiveExecutionId(id: string | null) {
previousExecutionId.value = activeExecutionId.value;
/**
* Sets the active execution id
*
* @param {string} id used to indicate the id of the active execution
* @param {null} id used to indicate that an execution has started but its id has not been retrieved yet
* @param {undefined} id used to indicate there is no active execution
*/
const activeExecutionId = ref<string | null | undefined>();
const previousExecutionId = ref<string | null | undefined>();
const readonlyActiveExecutionId = computed(() => activeExecutionId.value);
const readonlyPreviousExecutionId = computed(() => previousExecutionId.value);
function setActiveExecutionId(id: string | null | undefined) {
if (id) previousExecutionId.value = activeExecutionId.value;
activeExecutionId.value = id;
}
@@ -379,10 +397,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
return nodeMetadata.value[nodeName] === undefined || nodeMetadata.value[nodeName].pristine;
}
function isNodeExecuting(nodeName: string): boolean {
return executingNode.value.includes(nodeName);
}
function getExecutionDataById(id: string): ExecutionSummary | undefined {
return currentWorkflowExecutions.value.find((execution) => execution.id === id);
}
@@ -635,7 +649,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
setWorkflowSettings({ ...defaults.settings });
setWorkflowTagIds([]);
setActiveExecutionId(null);
setActiveExecutionId(undefined);
executingNode.value.length = 0;
executionWaitingForWebhook.value = false;
}
@@ -1408,24 +1422,21 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
return testUrl;
}
function setNodeExecuting(pushData: PushPayload<'nodeExecuteBefore'>): void {
addExecutingNode(pushData.nodeName);
function addNodeExecutionData(data: NodeExecuteBefore['data']): void {
if (settingsStore.isNewLogsEnabled) {
const node = getNodeByName(pushData.nodeName);
const node = getNodeByName(data.nodeName);
if (!node || !workflowExecutionData.value?.data) {
return;
}
if (workflowExecutionData.value.data.resultData.runData[pushData.nodeName] === undefined) {
workflowExecutionData.value.data.resultData.runData[pushData.nodeName] = [];
if (workflowExecutionData.value.data.resultData.runData[data.nodeName] === undefined) {
workflowExecutionData.value.data.resultData.runData[data.nodeName] = [];
}
workflowExecutionData.value.data.resultData.runData[pushData.nodeName].push({
workflowExecutionData.value.data.resultData.runData[data.nodeName].push({
executionStatus: 'running',
executionTime: 0,
...pushData.data,
...data.data,
});
}
}
@@ -1475,7 +1486,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
tasksData.push(data);
}
removeExecutingNode(nodeName);
void trackNodeExecution(pushData);
}
}
@@ -1738,10 +1748,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
function markExecutionAsStopped() {
setActiveExecutionId(null);
setActiveExecutionId(undefined);
clearNodeExecutionQueue();
executionWaitingForWebhook.value = false;
uiStore.removeActiveAction('workflowRunning');
workflowHelpers.setDocumentTitle(workflowName.value, 'IDLE');
clearPopupWindowState();
@@ -1762,8 +1771,8 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
currentWorkflowExecutions,
workflowExecutionData,
workflowExecutionPairedItemMappings,
activeExecutionId: computed(() => activeExecutionId.value),
previousExecutionId: computed(() => previousExecutionId.value),
activeExecutionId: readonlyActiveExecutionId,
previousExecutionId: readonlyPreviousExecutionId,
setActiveExecutionId,
subWorkflowExecutionError,
executionWaitingForWebhook,
@@ -1830,7 +1839,8 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
makeNewWorkflowShareable,
resetWorkflow,
resetState,
setNodeExecuting,
addNodeExecutionData,
addExecutingNode,
removeExecutingNode,
setWorkflowId,
setUsedCredentials,