mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-19 19:11:13 +00:00
refactor(core): Extract trigger context out of NodeExecutionFunctions (no-changelog) (#11453)
This commit is contained in:
committed by
GitHub
parent
e4aa1d01f3
commit
1be7de6180
@@ -1,6 +1,12 @@
|
|||||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||||
|
|
||||||
import { ActiveWorkflows, InstanceSettings, NodeExecuteFunctions, PollContext } from 'n8n-core';
|
import {
|
||||||
|
ActiveWorkflows,
|
||||||
|
InstanceSettings,
|
||||||
|
NodeExecuteFunctions,
|
||||||
|
PollContext,
|
||||||
|
TriggerContext,
|
||||||
|
} from 'n8n-core';
|
||||||
import type {
|
import type {
|
||||||
ExecutionError,
|
ExecutionError,
|
||||||
IDeferredPromise,
|
IDeferredPromise,
|
||||||
@@ -325,18 +331,11 @@ export class ActiveWorkflowManager {
|
|||||||
activation: WorkflowActivateMode,
|
activation: WorkflowActivateMode,
|
||||||
): IGetExecuteTriggerFunctions {
|
): IGetExecuteTriggerFunctions {
|
||||||
return (workflow: Workflow, node: INode) => {
|
return (workflow: Workflow, node: INode) => {
|
||||||
const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(
|
const emit = (
|
||||||
workflow,
|
|
||||||
node,
|
|
||||||
additionalData,
|
|
||||||
mode,
|
|
||||||
activation,
|
|
||||||
);
|
|
||||||
returnFunctions.emit = (
|
|
||||||
data: INodeExecutionData[][],
|
data: INodeExecutionData[][],
|
||||||
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
|
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
|
||||||
donePromise?: IDeferredPromise<IRun | undefined>,
|
donePromise?: IDeferredPromise<IRun | undefined>,
|
||||||
): void => {
|
) => {
|
||||||
this.logger.debug(`Received trigger for workflow "${workflow.name}"`);
|
this.logger.debug(`Received trigger for workflow "${workflow.name}"`);
|
||||||
void this.workflowStaticDataService.saveStaticData(workflow);
|
void this.workflowStaticDataService.saveStaticData(workflow);
|
||||||
|
|
||||||
@@ -360,7 +359,7 @@ export class ActiveWorkflowManager {
|
|||||||
executePromise.catch((error: Error) => this.logger.error(error.message, { error }));
|
executePromise.catch((error: Error) => this.logger.error(error.message, { error }));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
returnFunctions.emitError = (error: Error): void => {
|
const emitError = (error: Error): void => {
|
||||||
this.logger.info(
|
this.logger.info(
|
||||||
`The trigger node "${node.name}" of workflow "${workflowData.name}" failed with the error: "${error.message}". Will try to reactivate.`,
|
`The trigger node "${node.name}" of workflow "${workflowData.name}" failed with the error: "${error.message}". Will try to reactivate.`,
|
||||||
{
|
{
|
||||||
@@ -385,7 +384,7 @@ export class ActiveWorkflowManager {
|
|||||||
|
|
||||||
this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
|
this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
|
||||||
};
|
};
|
||||||
return returnFunctions;
|
return new TriggerContext(workflow, node, additionalData, mode, activation, emit, emitError);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -101,7 +101,6 @@ import type {
|
|||||||
INodeParameters,
|
INodeParameters,
|
||||||
EnsureTypeOptions,
|
EnsureTypeOptions,
|
||||||
SSHTunnelFunctions,
|
SSHTunnelFunctions,
|
||||||
SchedulingFunctions,
|
|
||||||
DeduplicationHelperFunctions,
|
DeduplicationHelperFunctions,
|
||||||
IDeduplicationOutput,
|
IDeduplicationOutput,
|
||||||
IDeduplicationOutputItems,
|
IDeduplicationOutputItems,
|
||||||
@@ -168,8 +167,7 @@ import { extractValue } from './ExtractValue';
|
|||||||
import { InstanceSettings } from './InstanceSettings';
|
import { InstanceSettings } from './InstanceSettings';
|
||||||
import type { ExtendedValidationResult, IResponseError } from './Interfaces';
|
import type { ExtendedValidationResult, IResponseError } from './Interfaces';
|
||||||
// eslint-disable-next-line import/no-cycle
|
// eslint-disable-next-line import/no-cycle
|
||||||
import { PollContext } from './node-execution-context';
|
import { PollContext, TriggerContext } from './node-execution-context';
|
||||||
import { ScheduledTaskManager } from './ScheduledTaskManager';
|
|
||||||
import { getSecretsProxy } from './Secrets';
|
import { getSecretsProxy } from './Secrets';
|
||||||
import { SSHClientsManager } from './SSHClientsManager';
|
import { SSHClientsManager } from './SSHClientsManager';
|
||||||
|
|
||||||
@@ -3346,14 +3344,6 @@ const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({
|
|||||||
await Container.get(SSHClientsManager).getClient(credentials),
|
await Container.get(SSHClientsManager).getClient(credentials),
|
||||||
});
|
});
|
||||||
|
|
||||||
const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => {
|
|
||||||
const scheduledTaskManager = Container.get(ScheduledTaskManager);
|
|
||||||
return {
|
|
||||||
registerCron: (cronExpression, onTick) =>
|
|
||||||
scheduledTaskManager.registerCron(workflow, cronExpression, onTick),
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
const getAllowedPaths = () => {
|
const getAllowedPaths = () => {
|
||||||
const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
|
const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
|
||||||
if (!restrictFileAccessTo) {
|
if (!restrictFileAccessTo) {
|
||||||
@@ -3571,58 +3561,7 @@ export function getExecuteTriggerFunctions(
|
|||||||
mode: WorkflowExecuteMode,
|
mode: WorkflowExecuteMode,
|
||||||
activation: WorkflowActivateMode,
|
activation: WorkflowActivateMode,
|
||||||
): ITriggerFunctions {
|
): ITriggerFunctions {
|
||||||
return ((workflow: Workflow, node: INode) => {
|
return new TriggerContext(workflow, node, additionalData, mode, activation);
|
||||||
return {
|
|
||||||
...getCommonWorkflowFunctions(workflow, node, additionalData),
|
|
||||||
emit: (): void => {
|
|
||||||
throw new ApplicationError(
|
|
||||||
'Overwrite NodeExecuteFunctions.getExecuteTriggerFunctions.emit function',
|
|
||||||
);
|
|
||||||
},
|
|
||||||
emitError: (): void => {
|
|
||||||
throw new ApplicationError(
|
|
||||||
'Overwrite NodeExecuteFunctions.getExecuteTriggerFunctions.emit function',
|
|
||||||
);
|
|
||||||
},
|
|
||||||
getMode: () => mode,
|
|
||||||
getActivationMode: () => activation,
|
|
||||||
getCredentials: async (type) =>
|
|
||||||
await getCredentials(workflow, node, type, additionalData, mode),
|
|
||||||
getNodeParameter: (
|
|
||||||
parameterName: string,
|
|
||||||
fallbackValue?: any,
|
|
||||||
options?: IGetNodeParameterOptions,
|
|
||||||
): NodeParameterValueType | object => {
|
|
||||||
const runExecutionData: IRunExecutionData | null = null;
|
|
||||||
const itemIndex = 0;
|
|
||||||
const runIndex = 0;
|
|
||||||
const connectionInputData: INodeExecutionData[] = [];
|
|
||||||
|
|
||||||
return getNodeParameter(
|
|
||||||
workflow,
|
|
||||||
runExecutionData,
|
|
||||||
runIndex,
|
|
||||||
connectionInputData,
|
|
||||||
node,
|
|
||||||
parameterName,
|
|
||||||
itemIndex,
|
|
||||||
mode,
|
|
||||||
getAdditionalKeys(additionalData, mode, runExecutionData),
|
|
||||||
undefined,
|
|
||||||
fallbackValue,
|
|
||||||
options,
|
|
||||||
);
|
|
||||||
},
|
|
||||||
helpers: {
|
|
||||||
createDeferredPromise,
|
|
||||||
...getSSHTunnelFunctions(),
|
|
||||||
...getRequestHelperFunctions(workflow, node, additionalData),
|
|
||||||
...getBinaryHelperFunctions(additionalData, workflow.id),
|
|
||||||
...getSchedulingFunctions(workflow),
|
|
||||||
returnJsonArray,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
})(workflow, node);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -0,0 +1,96 @@
|
|||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import type {
|
||||||
|
Expression,
|
||||||
|
ICredentialDataDecryptedObject,
|
||||||
|
ICredentialsHelper,
|
||||||
|
INode,
|
||||||
|
INodeType,
|
||||||
|
INodeTypes,
|
||||||
|
IWorkflowExecuteAdditionalData,
|
||||||
|
Workflow,
|
||||||
|
WorkflowActivateMode,
|
||||||
|
WorkflowExecuteMode,
|
||||||
|
} from 'n8n-workflow';
|
||||||
|
|
||||||
|
import { TriggerContext } from '../trigger-context';
|
||||||
|
|
||||||
|
describe('TriggerContext', () => {
|
||||||
|
const testCredentialType = 'testCredential';
|
||||||
|
const nodeType = mock<INodeType>({
|
||||||
|
description: {
|
||||||
|
credentials: [
|
||||||
|
{
|
||||||
|
name: testCredentialType,
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
properties: [
|
||||||
|
{
|
||||||
|
name: 'testParameter',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const nodeTypes = mock<INodeTypes>();
|
||||||
|
const expression = mock<Expression>();
|
||||||
|
const workflow = mock<Workflow>({ expression, nodeTypes });
|
||||||
|
const node = mock<INode>({
|
||||||
|
credentials: {
|
||||||
|
[testCredentialType]: {
|
||||||
|
id: 'testCredentialId',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
node.parameters = {
|
||||||
|
testParameter: 'testValue',
|
||||||
|
};
|
||||||
|
const credentialsHelper = mock<ICredentialsHelper>();
|
||||||
|
const additionalData = mock<IWorkflowExecuteAdditionalData>({ credentialsHelper });
|
||||||
|
const mode: WorkflowExecuteMode = 'manual';
|
||||||
|
const activation: WorkflowActivateMode = 'init';
|
||||||
|
|
||||||
|
const triggerContext = new TriggerContext(workflow, node, additionalData, mode, activation);
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
jest.clearAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getActivationMode', () => {
|
||||||
|
it('should return the activation property', () => {
|
||||||
|
const result = triggerContext.getActivationMode();
|
||||||
|
expect(result).toBe(activation);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getCredentials', () => {
|
||||||
|
it('should get decrypted credentials', async () => {
|
||||||
|
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
|
||||||
|
credentialsHelper.getDecrypted.mockResolvedValue({ secret: 'token' });
|
||||||
|
|
||||||
|
const credentials =
|
||||||
|
await triggerContext.getCredentials<ICredentialDataDecryptedObject>(testCredentialType);
|
||||||
|
|
||||||
|
expect(credentials).toEqual({ secret: 'token' });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getNodeParameter', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
|
||||||
|
expression.getParameterValue.mockImplementation((value) => value);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return parameter value when it exists', () => {
|
||||||
|
const parameter = triggerContext.getNodeParameter('testParameter');
|
||||||
|
|
||||||
|
expect(parameter).toBe('testValue');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return the fallback value when the parameter does not exist', () => {
|
||||||
|
const parameter = triggerContext.getNodeParameter('otherParameter', 'fallback');
|
||||||
|
|
||||||
|
expect(parameter).toBe('fallback');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import type { SSHCredentials } from 'n8n-workflow';
|
||||||
|
import type { Client } from 'ssh2';
|
||||||
|
import { Container } from 'typedi';
|
||||||
|
|
||||||
|
import { SSHClientsManager } from '@/SSHClientsManager';
|
||||||
|
|
||||||
|
import { SSHTunnelHelpers } from '../ssh-tunnel-helpers';
|
||||||
|
|
||||||
|
describe('SSHTunnelHelpers', () => {
|
||||||
|
const sshClientsManager = mock<SSHClientsManager>();
|
||||||
|
Container.set(SSHClientsManager, sshClientsManager);
|
||||||
|
const sshTunnelHelpers = new SSHTunnelHelpers();
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
jest.clearAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getSSHClient', () => {
|
||||||
|
const credentials = mock<SSHCredentials>();
|
||||||
|
|
||||||
|
it('should call SSHClientsManager.getClient with the given credentials', async () => {
|
||||||
|
const mockClient = mock<Client>();
|
||||||
|
sshClientsManager.getClient.mockResolvedValue(mockClient);
|
||||||
|
|
||||||
|
const client = await sshTunnelHelpers.getSSHClient(credentials);
|
||||||
|
|
||||||
|
expect(sshClientsManager.getClient).toHaveBeenCalledWith(credentials);
|
||||||
|
expect(client).toBe(mockClient);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
import type { SSHCredentials, SSHTunnelFunctions } from 'n8n-workflow';
|
||||||
|
import { Container } from 'typedi';
|
||||||
|
|
||||||
|
import { SSHClientsManager } from '@/SSHClientsManager';
|
||||||
|
|
||||||
|
export class SSHTunnelHelpers {
|
||||||
|
private readonly sshClientsManager = Container.get(SSHClientsManager);
|
||||||
|
|
||||||
|
get exported(): SSHTunnelFunctions {
|
||||||
|
return {
|
||||||
|
getSSHClient: this.getSSHClient.bind(this),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async getSSHClient(credentials: SSHCredentials) {
|
||||||
|
return await this.sshClientsManager.getClient(credentials);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,2 +1,3 @@
|
|||||||
// eslint-disable-next-line import/no-cycle
|
// eslint-disable-next-line import/no-cycle
|
||||||
export { PollContext } from './poll-context';
|
export { PollContext } from './poll-context';
|
||||||
|
export { TriggerContext } from './trigger-context';
|
||||||
|
|||||||
96
packages/core/src/node-execution-context/trigger-context.ts
Normal file
96
packages/core/src/node-execution-context/trigger-context.ts
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
import type {
|
||||||
|
ICredentialDataDecryptedObject,
|
||||||
|
IGetNodeParameterOptions,
|
||||||
|
INode,
|
||||||
|
INodeExecutionData,
|
||||||
|
IRunExecutionData,
|
||||||
|
ITriggerFunctions,
|
||||||
|
IWorkflowExecuteAdditionalData,
|
||||||
|
NodeParameterValueType,
|
||||||
|
Workflow,
|
||||||
|
WorkflowActivateMode,
|
||||||
|
WorkflowExecuteMode,
|
||||||
|
} from 'n8n-workflow';
|
||||||
|
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
|
||||||
|
|
||||||
|
// eslint-disable-next-line import/no-cycle
|
||||||
|
import {
|
||||||
|
getAdditionalKeys,
|
||||||
|
getCredentials,
|
||||||
|
getNodeParameter,
|
||||||
|
returnJsonArray,
|
||||||
|
} from '@/NodeExecuteFunctions';
|
||||||
|
|
||||||
|
import { BinaryHelpers } from './helpers/binary-helpers';
|
||||||
|
import { RequestHelpers } from './helpers/request-helpers';
|
||||||
|
import { SchedulingHelpers } from './helpers/scheduling-helpers';
|
||||||
|
import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers';
|
||||||
|
import { NodeExecutionContext } from './node-execution-context';
|
||||||
|
|
||||||
|
const throwOnEmit = () => {
|
||||||
|
throw new ApplicationError('Overwrite TriggerContext.emit function');
|
||||||
|
};
|
||||||
|
|
||||||
|
const throwOnEmitError = () => {
|
||||||
|
throw new ApplicationError('Overwrite TriggerContext.emitError function');
|
||||||
|
};
|
||||||
|
|
||||||
|
export class TriggerContext extends NodeExecutionContext implements ITriggerFunctions {
|
||||||
|
readonly helpers: ITriggerFunctions['helpers'];
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
workflow: Workflow,
|
||||||
|
node: INode,
|
||||||
|
additionalData: IWorkflowExecuteAdditionalData,
|
||||||
|
mode: WorkflowExecuteMode,
|
||||||
|
private readonly activation: WorkflowActivateMode,
|
||||||
|
readonly emit: ITriggerFunctions['emit'] = throwOnEmit,
|
||||||
|
readonly emitError: ITriggerFunctions['emitError'] = throwOnEmitError,
|
||||||
|
) {
|
||||||
|
super(workflow, node, additionalData, mode);
|
||||||
|
|
||||||
|
this.helpers = {
|
||||||
|
createDeferredPromise,
|
||||||
|
returnJsonArray,
|
||||||
|
...new BinaryHelpers(workflow, additionalData).exported,
|
||||||
|
...new RequestHelpers(this, workflow, node, additionalData).exported,
|
||||||
|
...new SchedulingHelpers(workflow).exported,
|
||||||
|
...new SSHTunnelHelpers().exported,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
getActivationMode() {
|
||||||
|
return this.activation;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getCredentials<T extends object = ICredentialDataDecryptedObject>(type: string) {
|
||||||
|
return await getCredentials<T>(this.workflow, this.node, type, this.additionalData, this.mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
getNodeParameter(
|
||||||
|
parameterName: string,
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
fallbackValue?: any,
|
||||||
|
options?: IGetNodeParameterOptions,
|
||||||
|
): NodeParameterValueType | object {
|
||||||
|
const runExecutionData: IRunExecutionData | null = null;
|
||||||
|
const itemIndex = 0;
|
||||||
|
const runIndex = 0;
|
||||||
|
const connectionInputData: INodeExecutionData[] = [];
|
||||||
|
|
||||||
|
return getNodeParameter(
|
||||||
|
this.workflow,
|
||||||
|
runExecutionData,
|
||||||
|
runIndex,
|
||||||
|
connectionInputData,
|
||||||
|
this.node,
|
||||||
|
parameterName,
|
||||||
|
itemIndex,
|
||||||
|
this.mode,
|
||||||
|
getAdditionalKeys(this.additionalData, this.mode, runExecutionData),
|
||||||
|
undefined,
|
||||||
|
fallbackValue,
|
||||||
|
options,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user