fix(editor): Update node execution itemCount to support multiple outputs (no-changelog) (#19646)

This commit is contained in:
Alex Grozav
2025-09-18 12:11:04 +01:00
committed by GitHub
parent dee22162f4
commit 83b2a5772e
13 changed files with 435 additions and 27 deletions

View File

@@ -2,6 +2,7 @@ import type {
ExecutionStatus,
ITaskData,
ITaskStartedData,
NodeConnectionType,
WorkflowExecuteMode,
} from 'n8n-workflow';
@@ -61,8 +62,15 @@ export type NodeExecuteAfter = {
data: {
executionId: string;
nodeName: string;
data: Omit<ITaskData, 'data'>;
itemCount: number;
/**
* The data field for task data in `NodeExecuteAfter` is always trimmed (undefined).
*/
data: ITaskData;
/**
* The number of items per output connection type. This is needed so that the frontend
* can know how many items to expect when receiving the `NodeExecuteAfterData` message.
*/
itemCountByConnectionType: Partial<Record<NodeConnectionType, number[]>>;
};
};
@@ -81,7 +89,7 @@ export type NodeExecuteAfterData = {
* Later we fetch the entire execution data and fill in any placeholders.
*/
data: ITaskData;
itemCount: number;
itemCountByConnectionType: NodeExecuteAfter['data']['itemCountByConnectionType'];
};
};

View File

@@ -345,7 +345,14 @@ describe('Execution Lifecycle Hooks', () => {
1,
{
type: 'nodeExecuteAfter',
data: { executionId, nodeName, itemCount: 1, data: taskDataWithoutData },
data: {
executionId,
nodeName,
itemCountByConnectionType: {
main: [1],
},
data: taskDataWithoutData,
},
},
pushRef,
);
@@ -354,7 +361,14 @@ describe('Execution Lifecycle Hooks', () => {
2,
{
type: 'nodeExecuteAfterData',
data: { executionId, nodeName, itemCount: 1, data: mockTaskData },
data: {
executionId,
nodeName,
itemCountByConnectionType: {
main: [1],
},
data: mockTaskData,
},
},
pushRef,
true,

View File

@@ -27,6 +27,7 @@ import {
updateExistingExecution,
} from './shared/shared-hook-functions';
import { type ExecutionSaveSettings, toSaveSettings } from './to-save-settings';
import { getItemCountByConnectionType } from '@/utils/get-item-count-by-connection-type';
@Service()
class ModulesHooksRegistry {
@@ -185,11 +186,14 @@ function hookFunctionsPush(
workflowId: this.workflowData.id,
});
const itemCount = data.data?.main?.[0]?.length ?? 0;
const itemCountByConnectionType = getItemCountByConnectionType(data?.data);
const { data: _, ...taskData } = data;
pushInstance.send(
{ type: 'nodeExecuteAfter', data: { executionId, nodeName, itemCount, data: taskData } },
{
type: 'nodeExecuteAfter',
data: { executionId, nodeName, itemCountByConnectionType, data: taskData },
},
pushRef,
);
@@ -203,7 +207,7 @@ function hookFunctionsPush(
pushInstance.send(
{
type: 'nodeExecuteAfterData',
data: { executionId, nodeName, itemCount, data },
data: { executionId, nodeName, itemCountByConnectionType, data },
},
pushRef,
asBinary,

View File

@@ -0,0 +1,145 @@
import type { ITaskData } from 'n8n-workflow';
import { getItemCountByConnectionType } from '../get-item-count-by-connection-type';
describe('getItemCountByConnectionType', () => {
it('should return an empty object when data is undefined', () => {
const result = getItemCountByConnectionType(undefined);
expect(result).toEqual({});
});
it('should return an empty object when data is an empty object', () => {
const result = getItemCountByConnectionType({});
expect(result).toEqual({});
});
it('should count items for a single connection with single output', () => {
const data: ITaskData['data'] = {
main: [[{ json: { id: 1 } }, { json: { id: 2 } }]],
};
const result = getItemCountByConnectionType(data);
expect(result).toEqual({
main: [2],
});
});
it('should count items for a single connection with multiple outputs', () => {
const data: ITaskData['data'] = {
main: [
[{ json: { id: 1 } }, { json: { id: 2 } }],
[{ json: { id: 3 } }],
[{ json: { id: 4 } }, { json: { id: 5 } }, { json: { id: 6 } }],
],
};
const result = getItemCountByConnectionType(data);
expect(result).toEqual({
main: [2, 1, 3],
});
});
it('should handle multiple connection types', () => {
const data: ITaskData['data'] = {
main: [[{ json: { id: 1 } }, { json: { id: 2 } }]],
ai_agent: [[{ json: { error: 'test' } }]],
ai_memory: [
[
{ json: { data: 'custom' } },
{ json: { data: 'custom2' } },
{ json: { data: 'custom3' } },
],
],
};
const result = getItemCountByConnectionType(data);
expect(result).toEqual({
main: [2],
ai_agent: [1],
ai_memory: [3],
});
});
it('should handle empty arrays in connection data', () => {
const data: ITaskData['data'] = {
main: [[], [{ json: { id: 1 } }], []],
};
const result = getItemCountByConnectionType(data);
expect(result).toEqual({
main: [0, 1, 0],
});
});
it('should handle null values in connection data arrays', () => {
const data: ITaskData['data'] = {
main: [null, [{ json: { id: 1 } }], null],
};
const result = getItemCountByConnectionType(data);
expect(result).toEqual({
main: [0, 1, 0],
});
});
it('should handle connection data with mixed null and valid arrays', () => {
const data: ITaskData['data'] = {
main: [[{ json: { id: 1 } }], null, [{ json: { id: 2 } }, { json: { id: 3 } }], null, []],
};
const result = getItemCountByConnectionType(data);
expect(result).toEqual({
main: [1, 0, 2, 0, 0],
});
});
it('should discard unknown connection types', () => {
const data: ITaskData['data'] = {
main: [[{ json: { id: 1 } }, { json: { id: 2 } }]],
unknownType: [[{ json: { data: 'should be ignored' } }]],
anotherInvalid: [[{ json: { test: 'data' } }]],
};
const result = getItemCountByConnectionType(data);
// Should only include 'main' and discard unknown types
expect(result).toEqual({
main: [2],
});
expect(result).not.toHaveProperty('unknownType');
expect(result).not.toHaveProperty('anotherInvalid');
});
it('should handle mix of valid and invalid connection types', () => {
const data: ITaskData['data'] = {
invalidType1: [[{ json: { data: 'ignored' } }]],
main: [[{ json: { id: 1 } }]],
invalidType2: [[{ json: { data: 'also ignored' } }]],
ai_agent: [[{ json: { error: 'test error' } }]],
notAValidType: [[{ json: { foo: 'bar' } }]],
};
const result = getItemCountByConnectionType(data);
// Should only include valid NodeConnectionTypes
expect(result).toEqual({
main: [1],
ai_agent: [1],
});
expect(Object.keys(result)).toHaveLength(2);
});
it('should handle data with only invalid connection types', () => {
const data: ITaskData['data'] = {
fakeType1: [[{ json: { data: 'test' } }]],
fakeType2: [[{ json: { data: 'test2' } }]],
notReal: [[{ json: { id: 1 } }, { json: { id: 2 } }]],
};
const result = getItemCountByConnectionType(data);
// Should return empty object when no valid types found
expect(result).toEqual({});
expect(Object.keys(result)).toHaveLength(0);
});
});

View File

@@ -0,0 +1,22 @@
import type { NodeConnectionType, ITaskData } from 'n8n-workflow';
import { isNodeConnectionType } from 'n8n-workflow';
export function getItemCountByConnectionType(
data: ITaskData['data'],
): Partial<Record<NodeConnectionType, number[]>> {
const itemCountByConnectionType: Partial<Record<NodeConnectionType, number[]>> = {};
for (const [connectionType, connectionData] of Object.entries(data ?? {})) {
if (!isNodeConnectionType(connectionType)) {
continue;
}
if (Array.isArray(connectionData)) {
itemCountByConnectionType[connectionType] = connectionData.map((d) => (d ? d.length : 0));
} else {
itemCountByConnectionType[connectionType] = [0];
}
}
return itemCountByConnectionType;
}

View File

@@ -0,0 +1,190 @@
import { createTestingPinia } from '@pinia/testing';
import { setActivePinia } from 'pinia';
import { nodeExecuteAfter } from './nodeExecuteAfter';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useAssistantStore } from '@/stores/assistant.store';
import { mockedStore } from '@/__tests__/utils';
import type { NodeExecuteAfter } from '@n8n/api-types/push/execution';
import { TRIMMED_TASK_DATA_CONNECTIONS_KEY } from 'n8n-workflow';
describe('nodeExecuteAfter', () => {
beforeEach(() => {
const pinia = createTestingPinia({
stubActions: true,
});
setActivePinia(pinia);
});
it('should update node execution data with placeholder and remove executing node', async () => {
const workflowsStore = mockedStore(useWorkflowsStore);
const assistantStore = mockedStore(useAssistantStore);
const event: NodeExecuteAfter = {
type: 'nodeExecuteAfter',
data: {
executionId: 'exec-1',
nodeName: 'Test Node',
itemCountByConnectionType: { main: [2, 1] },
data: {
executionTime: 100,
startTime: 1234567890,
executionIndex: 0,
source: [],
},
},
};
await nodeExecuteAfter(event);
expect(workflowsStore.updateNodeExecutionData).toHaveBeenCalledTimes(1);
expect(workflowsStore.removeExecutingNode).toHaveBeenCalledTimes(1);
expect(workflowsStore.removeExecutingNode).toHaveBeenCalledWith('Test Node');
expect(assistantStore.onNodeExecution).toHaveBeenCalledTimes(1);
expect(assistantStore.onNodeExecution).toHaveBeenCalledWith(event.data);
// Verify the placeholder data structure
const updateCall = workflowsStore.updateNodeExecutionData.mock.calls[0][0];
expect(updateCall.data.data).toEqual({
main: [
Array.from({ length: 2 }).fill({ json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } }),
Array.from({ length: 1 }).fill({ json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } }),
],
});
});
it('should handle multiple connection types', async () => {
const workflowsStore = mockedStore(useWorkflowsStore);
const event: NodeExecuteAfter = {
type: 'nodeExecuteAfter',
data: {
executionId: 'exec-1',
nodeName: 'Test Node',
itemCountByConnectionType: {
main: [3],
ai_memory: [1, 2],
ai_tool: [1],
},
data: {
executionTime: 100,
startTime: 1234567890,
executionIndex: 0,
source: [],
},
},
};
await nodeExecuteAfter(event);
const updateCall = workflowsStore.updateNodeExecutionData.mock.calls[0][0];
expect(updateCall.data.data).toEqual({
main: [
Array.from({ length: 3 }).fill({ json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } }),
],
ai_memory: [
Array.from({ length: 1 }).fill({ json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } }),
Array.from({ length: 2 }).fill({ json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } }),
],
ai_tool: [
Array.from({ length: 1 }).fill({ json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } }),
],
});
});
it('should handle empty itemCountByConnectionType', async () => {
const workflowsStore = mockedStore(useWorkflowsStore);
const event: NodeExecuteAfter = {
type: 'nodeExecuteAfter',
data: {
executionId: 'exec-1',
nodeName: 'Test Node',
itemCountByConnectionType: {},
data: {
executionTime: 100,
startTime: 1234567890,
executionIndex: 0,
source: [],
},
},
};
await nodeExecuteAfter(event);
const updateCall = workflowsStore.updateNodeExecutionData.mock.calls[0][0];
expect(updateCall.data.data).toEqual({
main: [],
});
});
it('should preserve original data structure except for data property', async () => {
const workflowsStore = mockedStore(useWorkflowsStore);
const event: NodeExecuteAfter = {
type: 'nodeExecuteAfter',
data: {
executionId: 'exec-1',
nodeName: 'Test Node',
itemCountByConnectionType: { main: [1] },
data: {
executionTime: 100,
startTime: 1234567890,
executionIndex: 0,
source: [null],
},
},
};
await nodeExecuteAfter(event);
const updateCall = workflowsStore.updateNodeExecutionData.mock.calls[0][0];
expect(updateCall.executionId).toBe('exec-1');
expect(updateCall.nodeName).toBe('Test Node');
expect(updateCall.data.executionTime).toBe(100);
expect(updateCall.data.startTime).toBe(1234567890);
expect(updateCall.data.executionIndex).toBe(0);
expect(updateCall.data.source).toEqual([null]);
// Only the data property should be replaced with placeholder
expect(updateCall.data.data).toEqual({
main: [
Array.from({ length: 1 }).fill({ json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } }),
],
});
});
it('should filter out invalid connection types', async () => {
const workflowsStore = mockedStore(useWorkflowsStore);
const event: NodeExecuteAfter = {
type: 'nodeExecuteAfter',
data: {
executionId: 'exec-1',
nodeName: 'Test Node',
itemCountByConnectionType: {
main: [1],
// @ts-expect-error Testing invalid connection type
invalid_connection: [2], // This should be filtered out by isValidNodeConnectionType
},
data: {
executionTime: 100,
startTime: 1234567890,
executionIndex: 0,
source: [],
},
},
};
await nodeExecuteAfter(event);
const updateCall = workflowsStore.updateNodeExecutionData.mock.calls[0][0];
// Should only contain main connection, invalid_connection should be filtered out
expect(updateCall.data.data).toEqual({
main: [
Array.from({ length: 1 }).fill({ json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } }),
],
});
expect(updateCall.data.data?.invalid_connection).toBeUndefined();
});
});

View File

@@ -1,9 +1,10 @@
import type { NodeExecuteAfter } from '@n8n/api-types/push/execution';
import { useAssistantStore } from '@/stores/assistant.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import type { ITaskData } from 'n8n-workflow';
import type { INodeExecutionData, ITaskData } from 'n8n-workflow';
import { TRIMMED_TASK_DATA_CONNECTIONS_KEY } from 'n8n-workflow';
import type { PushPayload } from '@n8n/api-types';
import { isValidNodeConnectionType } from '@/utils/typeGuards';
/**
* Handles the 'nodeExecuteAfter' event, which happens after a node is executed.
@@ -18,15 +19,23 @@ export async function nodeExecuteAfter({ data: pushData }: NodeExecuteAfter) {
* a placeholder object indicating that the data has been trimmed until the
* `nodeExecuteAfterData` event comes in.
*/
const placeholderOutputData: ITaskData['data'] = {
main: [],
};
if (typeof pushData.itemCount === 'number') {
const fillObject = { json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } };
const fillArray = new Array(pushData.itemCount).fill(fillObject);
placeholderOutputData.main = [fillArray];
if (
pushData.itemCountByConnectionType &&
typeof pushData.itemCountByConnectionType === 'object'
) {
const fillObject: INodeExecutionData = { json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true } };
for (const [connectionType, outputs] of Object.entries(pushData.itemCountByConnectionType)) {
if (isValidNodeConnectionType(connectionType)) {
placeholderOutputData[connectionType] = outputs.map((count) =>
Array.from({ length: count }, () => fillObject),
);
}
}
}
const pushDataWithPlaceholderOutputData: PushPayload<'nodeExecuteAfterData'> = {

View File

@@ -21,7 +21,7 @@ describe('nodeExecuteAfterData', () => {
data: {
executionId: 'exec-1',
nodeName: 'Test Node',
itemCount: 1,
itemCountByConnectionType: { main: [1] },
data: {
executionTime: 0,
startTime: 0,

View File

@@ -330,7 +330,7 @@ describe('LogsPanel', () => {
workflowsStore.updateNodeExecutionData({
nodeName: 'AI Agent',
executionId: '567',
itemCount: 1,
itemCountByConnectionType: { ai_agent: [1] },
data: {
executionIndex: 0,
startTime: Date.parse('2025-04-20T12:34:51.000Z'),

View File

@@ -1409,7 +1409,7 @@ function generateMockExecutionEvents() {
const successEvent: PushPayload<'nodeExecuteAfter'> = {
executionId: '59',
nodeName: 'When clicking Execute workflow',
itemCount: 1,
itemCountByConnectionType: { main: [1] },
data: {
hints: [],
startTime: 1727867966633,

View File

@@ -56,6 +56,7 @@ export {
isResourceMapperValue,
isResourceLocatorValue,
isFilterValue,
isNodeConnectionType,
} from './type-guards';
export {

View File

@@ -1,10 +1,12 @@
import type {
INodeProperties,
INodePropertyOptions,
INodePropertyCollection,
INodeParameterResourceLocator,
ResourceMapperValue,
FilterValue,
import {
type INodeProperties,
type INodePropertyOptions,
type INodePropertyCollection,
type INodeParameterResourceLocator,
type ResourceMapperValue,
type FilterValue,
type NodeConnectionType,
nodeConnectionTypes,
} from './interfaces';
export function isResourceLocatorValue(value: unknown): value is INodeParameterResourceLocator {
@@ -67,3 +69,7 @@ export const isFilterValue = (value: unknown): value is FilterValue => {
typeof value === 'object' && value !== null && 'conditions' in value && 'combinator' in value
);
};
export const isNodeConnectionType = (value: unknown): value is NodeConnectionType => {
return nodeConnectionTypes.includes(value as NodeConnectionType);
};