fix(core): Fix $getWorkflowStaticData on task runners (#12153)

Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
This commit is contained in:
Iván Ovejero
2024-12-11 19:48:36 +01:00
committed by GitHub
parent dce0c58f86
commit b479f14ef5
5 changed files with 23 additions and 4 deletions

View File

@@ -36,6 +36,7 @@ describe('JsTaskRunner', () => {
grantToken: 'grantToken', grantToken: 'grantToken',
maxConcurrency: 1, maxConcurrency: 1,
taskBrokerUri: 'http://localhost', taskBrokerUri: 'http://localhost',
taskTimeout: 60,
...baseRunnerOpts, ...baseRunnerOpts,
}, },
jsRunnerConfig: { jsRunnerConfig: {
@@ -244,6 +245,7 @@ describe('JsTaskRunner', () => {
['$runIndex', 0], ['$runIndex', 0],
['{ wf: $workflow }', { wf: { active: true, id: '1', name: 'Test Workflow' } }], ['{ wf: $workflow }', { wf: { active: true, id: '1', name: 'Test Workflow' } }],
['$vars', { var: 'value' }], ['$vars', { var: 'value' }],
['$getWorkflowStaticData("global")', {}],
], ],
'Node.js internal functions': [ 'Node.js internal functions': [
['typeof Function', 'function'], ['typeof Function', 'function'],

View File

@@ -1,5 +1,5 @@
import { getAdditionalKeys } from 'n8n-core'; import { getAdditionalKeys } from 'n8n-core';
import { WorkflowDataProxy, Workflow } from 'n8n-workflow'; import { WorkflowDataProxy, Workflow, ObservableObject } from 'n8n-workflow';
import type { import type {
CodeExecutionMode, CodeExecutionMode,
IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
@@ -138,6 +138,8 @@ export class JsTaskRunner extends TaskRunner {
}, },
}; };
workflow.staticData = ObservableObject.create(workflow.staticData);
const result = const result =
settings.nodeMode === 'runOnceForAllItems' settings.nodeMode === 'runOnceForAllItems'
? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole, signal) ? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole, signal)
@@ -146,6 +148,7 @@ export class JsTaskRunner extends TaskRunner {
return { return {
result, result,
customData: data.runExecutionData.resultData.metadata, customData: data.runExecutionData.resultData.metadata,
staticData: workflow.staticData.__dataChanged ? workflow.staticData : undefined,
}; };
} }
@@ -200,7 +203,7 @@ export class JsTaskRunner extends TaskRunner {
module: {}, module: {},
console: customConsole, console: customConsole,
items: inputItems, items: inputItems,
$getWorkflowStaticData: (type: 'global' | 'node') => workflow.getStaticData(type, data.node),
...this.getNativeVariables(), ...this.getNativeVariables(),
...dataProxy, ...dataProxy,
...this.buildRpcCallObject(taskId), ...this.buildRpcCallObject(taskId),
@@ -273,7 +276,8 @@ export class JsTaskRunner extends TaskRunner {
module: {}, module: {},
console: customConsole, console: customConsole,
item, item,
$getWorkflowStaticData: (type: 'global' | 'node') =>
workflow.getStaticData(type, data.node),
...this.getNativeVariables(), ...this.getNativeVariables(),
...dataProxy, ...dataProxy,
...this.buildRpcCallObject(taskId), ...this.buildRpcCallObject(taskId),

View File

@@ -61,6 +61,7 @@ export interface DataRequestResponse {
export interface TaskResultData { export interface TaskResultData {
result: INodeExecutionData[]; result: INodeExecutionData[];
customData?: Record<string, string>; customData?: Record<string, string>;
staticData?: IDataObject;
} }
export interface TaskData { export interface TaskData {

View File

@@ -1,5 +1,6 @@
import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner'; import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner';
import { RPC_ALLOW_LIST } from '@n8n/task-runner'; import { RPC_ALLOW_LIST } from '@n8n/task-runner';
import { createResultOk, createResultError } from 'n8n-workflow';
import type { import type {
EnvProviderState, EnvProviderState,
IExecuteFunctions, IExecuteFunctions,
@@ -15,7 +16,6 @@ import type {
IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
Result, Result,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { createResultOk, createResultError } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { Service } from 'typedi'; import { Service } from 'typedi';
@@ -158,6 +158,11 @@ export abstract class TaskManager {
}); });
} }
const { staticData: incomingStaticData } = resultData;
// if the runner sent back static data, then it changed, so update it
if (incomingStaticData) workflow.overrideStaticData(incomingStaticData);
return createResultOk(resultData.result as TData); return createResultOk(resultData.result as TData);
} catch (e: unknown) { } catch (e: unknown) {
return createResultError(e as TError); return createResultError(e as TError);

View File

@@ -157,6 +157,13 @@ export class Workflow {
this.expression = new Expression(this); this.expression = new Expression(this);
} }
overrideStaticData(staticData?: IDataObject) {
this.staticData = ObservableObject.create(staticData || {}, undefined, {
ignoreEmptyOnFirstChild: true,
});
this.staticData.__dataChanged = true;
}
/** /**
* The default connections are by source node. This function rewrites them by destination nodes * The default connections are by source node. This function rewrites them by destination nodes
* to easily find parent nodes. * to easily find parent nodes.