feat: Send node execution finished and node execution data in separate events (no-changelog) (#18875)

Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
This commit is contained in:
Alex Grozav
2025-09-12 13:56:13 +01:00
committed by GitHub
parent 26f27efd75
commit 10fa3a9b01
28 changed files with 325 additions and 159 deletions

View File

@@ -1917,8 +1917,8 @@
"runData.aiContentBlock.tokens": "{count} Tokens",
"runData.aiContentBlock.tokens.prompt": "Prompt:",
"runData.aiContentBlock.tokens.completion": "Completion:",
"runData.trimmedData.title": "Data not viewable yet",
"runData.trimmedData.message": "It will be available here once the execution has finished.",
"runData.trimmedData.title": "No data yet",
"runData.trimmedData.message": "Data will be available here once the execution has finished.",
"runData.trimmedData.loading": "Loading data",
"runData.panel.actions.collapse": "Collapse panel",
"runData.panel.actions.open": "Open panel",
@@ -2932,7 +2932,6 @@
"settings.ldap.confirmMessage.beforeSaveForm.confirmButtonText": "Yes, disable it",
"settings.ldap.confirmMessage.beforeSaveForm.headline": "Are you sure you want to disable LDAP login?",
"settings.ldap.confirmMessage.beforeSaveForm.message": "If you do so, all LDAP users will be converted to email users.",
"settings.ldap.disabled.title": "Available on the Enterprise plan",
"settings.ldap.disabled.description": "LDAP is available as a paid feature. Learn more about it.",
"settings.ldap.disabled.buttonText": "See plans",

View File

@@ -16,12 +16,7 @@ import type {
Workflow,
NodeConnectionType,
} from 'n8n-workflow';
import {
parseErrorMetadata,
NodeConnectionTypes,
NodeHelpers,
TRIMMED_TASK_DATA_CONNECTIONS_KEY,
} from 'n8n-workflow';
import { parseErrorMetadata, NodeConnectionTypes, NodeHelpers } from 'n8n-workflow';
import { computed, defineAsyncComponent, onBeforeUnmount, onMounted, ref, toRef, watch } from 'vue';
import type { INodeUi, IRunDataDisplayMode, ITab, NodePanelType } from '@/Interface';
@@ -93,6 +88,7 @@ import { parseAiContent } from '@/utils/aiUtils';
import { usePostHog } from '@/stores/posthog.store';
import { I18nT } from 'vue-i18n';
import RunDataBinary from '@/components/RunDataBinary.vue';
import { hasTrimmedRunData } from '@/utils/executionUtils';
const LazyRunDataTable = defineAsyncComponent(
async () => await import('@/components/RunDataTable.vue'),
@@ -299,10 +295,6 @@ const isArtificialRecoveredEventItem = computed(
() => rawInputData.value?.[0]?.json?.isArtificialRecoveredEventItem,
);
const isTrimmedManualExecutionDataItem = computed(
() => rawInputData.value?.[0]?.json?.[TRIMMED_TASK_DATA_CONNECTIONS_KEY],
);
const subworkflowExecutionError = computed(() => {
if (!node.value) return null;
return {
@@ -359,6 +351,10 @@ const dataCount = computed(() =>
getDataCount(props.runIndex, currentOutputIndex.value, connectionType.value),
);
const isTrimmedManualExecutionDataItem = computed(() =>
workflowRunData.value ? hasTrimmedRunData(workflowRunData.value) : false,
);
const unfilteredDataCount = computed(() =>
pinnedData.data.value ? pinnedData.data.value.length : rawInputData.value.length,
);
@@ -673,9 +669,10 @@ watch(node, (newNode, prevNode) => {
init();
});
watch(hasNodeRun, () => {
if (props.paneType === 'output') setDisplayMode();
else {
watch([hasNodeRun, isTrimmedManualExecutionDataItem], () => {
if (props.paneType === 'output') {
setDisplayMode();
} else {
// InputPanel relies on the outputIndex to check if we have data
outputIndex.value = determineInitialOutputIndex();
}

View File

@@ -23,8 +23,6 @@ import {
clearPopupWindowState,
getExecutionErrorMessage,
getExecutionErrorToastConfiguration,
hasTrimmedData,
hasTrimmedItem,
} from '@/utils/executionUtils';
import { getTriggerNodeServiceName } from '@/utils/nodeTypesUtils';
import type { ExecutionFinished } from '@n8n/api-types/push/execution';
@@ -206,25 +204,12 @@ export async function fetchExecutionData(
* Returns the run execution data from the execution object in a normalized format
*/
export function getRunExecutionData(execution: SimplifiedExecution): IRunExecutionData {
const workflowsStore = useWorkflowsStore();
const runExecutionData: IRunExecutionData = {
return {
...execution.data,
startData: execution.data?.startData,
resultData: execution.data?.resultData ?? { runData: {} },
executionData: execution.data?.executionData,
};
if (workflowsStore.workflowExecutionData?.workflowId === execution.workflowId) {
const activeRunData = workflowsStore.workflowExecutionData?.data?.resultData?.runData;
if (activeRunData) {
for (const key of Object.keys(activeRunData)) {
if (hasTrimmedItem(activeRunData[key])) continue;
runExecutionData.resultData.runData[key] = activeRunData[key];
}
}
}
return runExecutionData;
}
/**
@@ -433,14 +418,6 @@ export function setRunExecutionData(
const runDataExecutedErrorMessage = getRunDataExecutedErrorMessage(execution);
const workflowExecution = workflowsStore.getWorkflowExecution;
// It does not push the runData as it got already pushed with each
// node that did finish. For that reason copy in here the data
// which we already have. But if the run data in the store is trimmed,
// we skip copying so we use the full data from the final message.
if (workflowsStore.getWorkflowRunData && !hasTrimmedData(workflowsStore.getWorkflowRunData)) {
runExecutionData.resultData.runData = workflowsStore.getWorkflowRunData;
}
workflowsStore.executingNode.length = 0;
workflowsStore.setWorkflowExecutionData({

View File

@@ -3,6 +3,7 @@ export * from './executionRecovered';
export * from './executionStarted';
export * from './nodeDescriptionUpdated';
export * from './nodeExecuteAfter';
export * from './nodeExecuteAfterData';
export * from './nodeExecuteBefore';
export * from './reloadNodeType';
export * from './removeNodeType';

View File

@@ -1,7 +1,9 @@
import type { NodeExecuteAfter } from '@n8n/api-types/push/execution';
import { useSchemaPreviewStore } from '@/stores/schemaPreview.store';
import { useAssistantStore } from '@/stores/assistant.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import type { ITaskData } from 'n8n-workflow';
import { TRIMMED_TASK_DATA_CONNECTIONS_KEY } from 'n8n-workflow';
import type { PushPayload } from '@n8n/api-types';
/**
* Handles the 'nodeExecuteAfter' event, which happens after a node is executed.
@@ -9,26 +11,34 @@ import { useWorkflowsStore } from '@/stores/workflows.store';
export async function nodeExecuteAfter({ data: pushData }: NodeExecuteAfter) {
const workflowsStore = useWorkflowsStore();
const assistantStore = useAssistantStore();
const schemaPreviewStore = useSchemaPreviewStore();
/**
* When we receive a placeholder in `nodeExecuteAfter`, we fake the items
* to be the same count as the data the placeholder is standing in for.
* This prevents the items count from jumping up when the execution
* finishes and the full data replaces the placeholder.
* We trim the actual data returned from the node execution to avoid performance issues
* when dealing with large datasets. Instead of storing the actual data, we initially store
* a placeholder object indicating that the data has been trimmed until the
* `nodeExecuteAfterData` event comes in.
*/
if (
pushData.itemCount &&
pushData.data?.data?.main &&
Array.isArray(pushData.data.data.main[0]) &&
pushData.data.data.main[0].length < pushData.itemCount
) {
pushData.data.data.main[0]?.push(...new Array(pushData.itemCount - 1).fill({ json: {} }));
const placeholderOutputData: ITaskData['data'] = {
main: [],
};
if (typeof pushData.itemCount === 'number') {
const fillObject = { json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } };
const fillArray = new Array(pushData.itemCount).fill(fillObject);
placeholderOutputData.main = [fillArray];
}
workflowsStore.updateNodeExecutionData(pushData);
const pushDataWithPlaceholderOutputData: PushPayload<'nodeExecuteAfterData'> = {
...pushData,
data: {
...pushData.data,
data: placeholderOutputData,
},
};
workflowsStore.updateNodeExecutionData(pushDataWithPlaceholderOutputData);
workflowsStore.removeExecutingNode(pushData.nodeName);
void assistantStore.onNodeExecution(pushData);
void schemaPreviewStore.trackSchemaPreviewExecution(pushData);
}

View File

@@ -0,0 +1,42 @@
import { createTestingPinia } from '@pinia/testing';
import { setActivePinia } from 'pinia';
import { nodeExecuteAfterData } from './nodeExecuteAfterData';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { mockedStore } from '@/__tests__/utils';
import type { NodeExecuteAfterData } from '@n8n/api-types/push/execution';
describe('nodeExecuteAfterData', () => {
	beforeEach(() => {
		// Fresh testing pinia per test; stubbed actions let us assert on store calls.
		setActivePinia(createTestingPinia({ stubActions: true }));
	});
	it('should update node execution data with incoming payload', async () => {
		const workflowsStore = mockedStore(useWorkflowsStore);
		// Minimal push payload carrying one item of run data for a single node.
		const payload: NodeExecuteAfterData['data'] = {
			executionId: 'exec-1',
			nodeName: 'Test Node',
			itemCount: 1,
			data: {
				executionTime: 0,
				startTime: 0,
				executionIndex: 0,
				source: [],
				data: {
					main: [[{ json: { foo: 'bar' } }]],
				},
			},
		};
		const event: NodeExecuteAfterData = { type: 'nodeExecuteAfterData', data: payload };
		await nodeExecuteAfterData(event);
		// The handler must forward the payload to the workflows store exactly once.
		expect(workflowsStore.updateNodeExecutionData).toHaveBeenCalledTimes(1);
		expect(workflowsStore.updateNodeExecutionData).toHaveBeenCalledWith(payload);
	});
});

View File

@@ -0,0 +1,15 @@
import type { NodeExecuteAfterData } from '@n8n/api-types/push/execution';
import { useSchemaPreviewStore } from '@/stores/schemaPreview.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
/**
 * Handles the 'nodeExecuteAfterData' push event, which arrives after a node has
 * executed and carries the node's resulting run data.
 */
export async function nodeExecuteAfterData(event: NodeExecuteAfterData) {
	const payload = event.data;
	// Persist the node's run data in the workflows store.
	useWorkflowsStore().updateNodeExecutionData(payload);
	// Fire-and-forget: schema preview tracking must not block event handling.
	void useSchemaPreviewStore().trackSchemaPreviewExecution(payload);
}

View File

@@ -22,6 +22,7 @@ vi.mock('@/composables/usePushConnection/handlers', () => ({
nodeDescriptionUpdated: vi.fn(),
nodeExecuteBefore: vi.fn(),
nodeExecuteAfter: vi.fn(),
nodeExecuteAfterData: vi.fn(),
executionStarted: vi.fn(),
executionWaiting: vi.fn(),
sendWorkerStatusMessage: vi.fn(),

View File

@@ -10,6 +10,7 @@ import {
nodeDescriptionUpdated,
nodeExecuteBefore,
nodeExecuteAfter,
nodeExecuteAfterData,
executionStarted,
sendWorkerStatusMessage,
sendConsoleMessage,
@@ -60,6 +61,8 @@ export function usePushConnection(options: { router: ReturnType<typeof useRouter
return await nodeExecuteBefore(event);
case 'nodeExecuteAfter':
return await nodeExecuteAfter(event);
case 'nodeExecuteAfterData':
return await nodeExecuteAfterData(event);
case 'executionStarted':
return await executionStarted(event);
case 'sendWorkerStatusMessage':

View File

@@ -9,7 +9,7 @@ import { useWorkflowsStore } from '@/stores/workflows.store';
import { computed, h, nextTick, ref } from 'vue';
import {
aiAgentNode,
aiChatExecutionResponse,
aiChatExecutionResponse as aiChatExecutionResponseTemplate,
aiChatWorkflow,
aiManualExecutionResponse,
aiManualWorkflow,
@@ -62,6 +62,8 @@ describe('LogsPanel', () => {
let ndvStore: ReturnType<typeof mockedStore<typeof useNDVStore>>;
let uiStore: ReturnType<typeof mockedStore<typeof useUIStore>>;
let aiChatExecutionResponse: typeof aiChatExecutionResponseTemplate;
function render() {
const wrapper = renderComponent(LogsPanel, {
global: {
@@ -116,6 +118,8 @@ describe('LogsPanel', () => {
} as DOMRect);
localStorage.clear();
aiChatExecutionResponse = deepCopy(aiChatExecutionResponseTemplate);
});
afterEach(() => {
@@ -326,6 +330,7 @@ describe('LogsPanel', () => {
workflowsStore.updateNodeExecutionData({
nodeName: 'AI Agent',
executionId: '567',
itemCount: 1,
data: {
executionIndex: 0,
startTime: Date.parse('2025-04-20T12:34:51.000Z'),
@@ -459,7 +464,7 @@ describe('LogsPanel', () => {
// Create deep copy so that renaming doesn't affect other test cases
workflowsStore.setWorkflow(deepCopy(aiChatWorkflow));
workflowsStore.setWorkflowExecutionData(deepCopy(aiChatExecutionResponse));
workflowsStore.setWorkflowExecutionData(aiChatExecutionResponse);
const rendered = render();
@@ -541,7 +546,7 @@ describe('LogsPanel', () => {
const canvasOperations = useCanvasOperations();
workflowsStore.setWorkflow(deepCopy(aiChatWorkflow));
workflowsStore.setWorkflowExecutionData(deepCopy(aiChatExecutionResponse));
workflowsStore.setWorkflowExecutionData(aiChatExecutionResponse);
logsStore.toggleLogSelectionSync(true);
@@ -600,7 +605,10 @@ describe('LogsPanel', () => {
// Verify message and response
expect(await findByText('Hello AI!')).toBeInTheDocument();
workflowsStore.setWorkflowExecutionData({ ...aiChatExecutionResponse, status: 'success' });
workflowsStore.setWorkflowExecutionData({
...aiChatExecutionResponse,
status: 'success',
});
await waitFor(() => expect(getByText('AI response message')).toBeInTheDocument());
// Verify workflow execution

View File

@@ -79,6 +79,7 @@ export const useWebSocketClient = <T>(options: UseWebSocketClientOptions<T>) =>
socket.value.addEventListener('message', onMessage);
socket.value.addEventListener('error', onError);
socket.value.addEventListener('close', onConnectionLost);
socket.value.binaryType = 'arraybuffer';
};
const reconnectTimer = useReconnectTimer({

View File

@@ -60,14 +60,20 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
* Process a newly received message
*/
async function onMessage(data: unknown) {
let receivedData: PushMessage;
// The `nodeExecuteAfterData` message is sent as binary data
// to be handled by a web worker in the future.
if (data instanceof ArrayBuffer) {
data = new TextDecoder('utf-8').decode(new Uint8Array(data));
}
let parsedData: PushMessage;
try {
receivedData = JSON.parse(data as string);
parsedData = JSON.parse(data as string);
} catch (error) {
return;
}
onMessageReceivedHandlers.value.forEach((handler) => handler(receivedData));
onMessageReceivedHandlers.value.forEach((handler) => handler(parsedData));
}
const url = getConnectionUrl();

View File

@@ -119,7 +119,7 @@ describe('schemaPreview.store', () => {
}),
);
await store.trackSchemaPreviewExecution(
mock<PushPayload<'nodeExecuteAfter'>>({
mock<PushPayload<'nodeExecuteAfterData'>>({
nodeName: 'Test',
data: {
executionStatus: 'success',
@@ -145,7 +145,7 @@ describe('schemaPreview.store', () => {
const store = useSchemaPreviewStore();
vi.mocked(useWorkflowsStore().getNodeByName).mockReturnValueOnce(mock<INode>());
await store.trackSchemaPreviewExecution(
mock<PushPayload<'nodeExecuteAfter'>>({
mock<PushPayload<'nodeExecuteAfterData'>>({
nodeName: 'Test',
data: {
executionStatus: 'success',
@@ -160,7 +160,7 @@ describe('schemaPreview.store', () => {
it('should not track failed executions', async () => {
const store = useSchemaPreviewStore();
await store.trackSchemaPreviewExecution(
mock<PushPayload<'nodeExecuteAfter'>>({
mock<PushPayload<'nodeExecuteAfterData'>>({
data: {
executionStatus: 'error',
},

View File

@@ -48,7 +48,7 @@ export const useSchemaPreviewStore = defineStore('schemaPreview', () => {
}
}
async function trackSchemaPreviewExecution(pushEvent: PushPayload<'nodeExecuteAfter'>) {
async function trackSchemaPreviewExecution(pushEvent: PushPayload<'nodeExecuteAfterData'>) {
if (schemaPreviews.size === 0 || pushEvent.data.executionStatus !== 'success') {
return;
}

View File

@@ -679,7 +679,17 @@ describe('useWorkflowsStore', () => {
});
describe('updateNodeExecutionData', () => {
const { successEvent, errorEvent, executionResponse } = generateMockExecutionEvents();
let successEvent: ReturnType<typeof generateMockExecutionEvents>['successEvent'];
let errorEvent: ReturnType<typeof generateMockExecutionEvents>['errorEvent'];
let executionResponse: ReturnType<typeof generateMockExecutionEvents>['executionResponse'];
beforeEach(() => {
const events = generateMockExecutionEvents();
successEvent = events.successEvent;
errorEvent = events.errorEvent;
executionResponse = events.executionResponse;
});
it('should throw error if not initialized', () => {
expect(() => workflowsStore.updateNodeExecutionData(successEvent)).toThrowError();
});
@@ -1420,6 +1430,7 @@ function generateMockExecutionEvents() {
const successEvent: PushPayload<'nodeExecuteAfter'> = {
executionId: '59',
nodeName: 'When clicking Execute workflow',
itemCount: 1,
data: {
hints: [],
startTime: 1727867966633,
@@ -1427,18 +1438,6 @@ function generateMockExecutionEvents() {
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [
[
{
json: {},
pairedItem: {
item: 0,
},
},
],
],
},
},
};

View File

@@ -567,8 +567,8 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
isArchived?: boolean;
parentFolderId?: string;
} = {},
includeFolders: boolean = false,
onlySharedWithMe: boolean = false,
includeFolders = false,
onlySharedWithMe = false,
): Promise<WorkflowListResource[]> {
const filter = { ...filters, projectId };
const options = {
@@ -1154,7 +1154,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
connections[index].index === destinationData.index
) {
// Found the connection to remove
connections.splice(parseInt(index, 10), 1);
connections.splice(Number.parseInt(index, 10), 1);
}
}
@@ -1196,16 +1196,16 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
for (sourceIndex of Object.keys(workflow.value.connections[sourceNode][type])) {
indexesToRemove.length = 0;
const connectionsToRemove =
workflow.value.connections[sourceNode][type][parseInt(sourceIndex, 10)];
workflow.value.connections[sourceNode][type][Number.parseInt(sourceIndex, 10)];
if (connectionsToRemove) {
for (connectionIndex of Object.keys(connectionsToRemove)) {
connectionData = connectionsToRemove[parseInt(connectionIndex, 10)];
connectionData = connectionsToRemove[Number.parseInt(connectionIndex, 10)];
if (connectionData.node === node.name) {
indexesToRemove.push(connectionIndex);
}
}
indexesToRemove.forEach((index) => {
connectionsToRemove.splice(parseInt(index, 10), 1);
connectionsToRemove.splice(Number.parseInt(index, 10), 1);
});
}
}
@@ -1560,7 +1560,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
];
}
function updateNodeExecutionData(pushData: PushPayload<'nodeExecuteAfter'>): void {
function updateNodeExecutionData(pushData: PushPayload<'nodeExecuteAfterData'>): void {
if (!workflowExecutionData.value?.data) {
throw new Error('The "workflowExecutionData" is not initialized!');
}
@@ -1571,19 +1571,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
if (!node) return;
if (workflowExecutionData.value.data.resultData.runData[nodeName] === undefined) {
workflowExecutionData.value = {
...workflowExecutionData.value,
data: {
...workflowExecutionData.value.data,
resultData: {
...workflowExecutionData.value.data.resultData,
runData: {
...workflowExecutionData.value.data.resultData.runData,
[nodeName]: [],
},
},
},
};
workflowExecutionData.value.data.resultData.runData[nodeName] = [];
}
const tasksData = workflowExecutionData.value.data!.resultData.runData[nodeName];
@@ -1611,7 +1599,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
existingRunIndex > -1 && !hasWaitingItems ? existingRunIndex : tasksData.length - 1;
const status = tasksData[index]?.executionStatus ?? 'unknown';
if ('waiting' === status || 'running' === status) {
if ('waiting' === status || 'running' === status || tasksData[existingRunIndex]) {
tasksData.splice(index, 1, data);
} else {
tasksData.push(data);
@@ -1630,16 +1618,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const { [nodeName]: removedRunData, ...remainingRunData } =
workflowExecutionData.value.data.resultData.runData;
workflowExecutionData.value = {
...workflowExecutionData.value,
data: {
...workflowExecutionData.value.data,
resultData: {
...workflowExecutionData.value.data.resultData,
runData: remainingRunData,
},
},
};
workflowExecutionData.value.data.resultData.runData = remainingRunData;
}
function pinDataByNodeName(nodeName: string): INodeExecutionData[] | undefined {

View File

@@ -12,6 +12,7 @@ import type {
IRunData,
ExecutionError,
INodeTypeBaseDescription,
INodeExecutionData,
} from 'n8n-workflow';
import type {
ExecutionFilterType,
@@ -107,7 +108,7 @@ export const executionFilterToQueryFilter = (
return queryFilter;
};
let formPopupWindow: boolean = false;
let formPopupWindow = false;
export const openFormPopupWindow = (url: string) => {
if (!formPopupWindow) {
@@ -226,26 +227,42 @@ export const waitingNodeTooltip = (node: INodeUi | null | undefined) => {
return '';
};
/**
* Check whether node execution data contains a trimmed item.
*/
export function isTrimmedNodeExecutionData(data: INodeExecutionData[] | null) {
return data?.some((entry) => entry.json?.[TRIMMED_TASK_DATA_CONNECTIONS_KEY]);
}
/**
* Check whether task data contains a trimmed item.
*
* In manual executions in scaling mode, the payload in push messages may be
* arbitrarily large. To protect Redis as it relays run data from workers to
* main process, we set a limit on payload size. If the payload is oversize,
* the main process, we set a limit on payload size. If the payload is oversize,
* we replace it with a placeholder, which is later overridden on execution
* finish, when the client receives the full data.
*/
export function hasTrimmedItem(taskData: ITaskData[]) {
return taskData[0]?.data?.main?.[0]?.[0]?.json?.[TRIMMED_TASK_DATA_CONNECTIONS_KEY] ?? false;
export function isTrimmedTaskData(taskData: ITaskData) {
return taskData.data?.main?.some((main) => isTrimmedNodeExecutionData(main));
}
/**
* Check whether task data contains a trimmed item.
*
* See {@link isTrimmedTaskData} for more details.
*/
export function hasTrimmedTaskData(taskData: ITaskData[]) {
return taskData.some(isTrimmedTaskData);
}
/**
* Check whether run data contains any trimmed items.
*
* See {@link hasTrimmedItem} for more details.
* See {@link hasTrimmedTaskData} for more details.
*/
export function hasTrimmedData(runData: IRunData) {
return Object.keys(runData).some((nodeName) => hasTrimmedItem(runData[nodeName]));
export function hasTrimmedRunData(runData: IRunData) {
return Object.keys(runData).some((nodeName) => hasTrimmedTaskData(runData[nodeName]));
}
export function executionRetryMessage(executionStatus: ExecutionStatus):