Add polling support to Trigger-Nodes

This commit is contained in:
Jan Oberhauser
2019-12-31 14:19:37 -06:00
parent d072321ad4
commit 584033ab4a
12 changed files with 600 additions and 289 deletions

View File

@@ -1,19 +1,23 @@
import { CronJob } from 'cron';
import {
IGetExecutePollFunctions,
IGetExecuteTriggerFunctions,
INode,
IPollResponse,
ITriggerResponse,
IWorkflowExecuteAdditionalData,
Workflow,
} from 'n8n-workflow';
export interface WorkflowData {
workflow: Workflow;
triggerResponse?: ITriggerResponse;
}
import {
ITriggerTime,
IWorkflowData,
} from './';
export class ActiveWorkflows {
private workflowData: {
[key: string]: WorkflowData;
[key: string]: IWorkflowData;
} = {};
@@ -48,7 +52,7 @@ export class ActiveWorkflows {
* @returns {(WorkflowData | undefined)}
* @memberof ActiveWorkflows
*/
get(id: string): WorkflowData | undefined {
get(id: string): IWorkflowData | undefined {
return this.workflowData[id];
}
@@ -62,7 +66,7 @@ export class ActiveWorkflows {
* @returns {Promise<void>}
* @memberof ActiveWorkflows
*/
async add(id: string, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, getTriggerFunctions: IGetExecuteTriggerFunctions): Promise<void> {
async add(id: string, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, getTriggerFunctions: IGetExecuteTriggerFunctions, getPollFunctions: IGetExecutePollFunctions): Promise<void> {
console.log('ADD ID (active): ' + id);
this.workflowData[id] = {
@@ -78,9 +82,110 @@ export class ActiveWorkflows {
this.workflowData[id].triggerResponse = triggerResponse;
}
}
const pollNodes = workflow.getPollNodes();
for (const pollNode of pollNodes) {
this.workflowData[id].pollResponse = await this.activatePolling(pollNode, workflow, additionalData, getPollFunctions);
}
}
/**
* Activates polling for the given node
*
* @param {INode} node
* @param {Workflow} workflow
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {IGetExecutePollFunctions} getPollFunctions
* @returns {Promise<IPollResponse>}
* @memberof ActiveWorkflows
*/
async activatePolling(node: INode, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, getPollFunctions: IGetExecutePollFunctions): Promise<IPollResponse> {
const mode = 'trigger';
const pollFunctions = getPollFunctions(workflow, node, additionalData, mode);
const pollTimes = pollFunctions.getNodeParameter('pollTimes') as unknown as {
item: ITriggerTime[];
};
// Define the order the cron-time-parameter appear
const parameterOrder = [
'second', // 0 - 59
'minute', // 0 - 59
'hour', // 0 - 23
'dayOfMonth', // 1 - 31
'month', // 0 - 11(Jan - Dec)
'weekday', // 0 - 6(Sun - Sat)
];
// Get all the trigger times
const cronTimes: string[] = [];
let cronTime: string[];
let parameterName: string;
if (pollTimes.item !== undefined) {
for (const item of pollTimes.item) {
cronTime = [];
if (item.mode === 'custom') {
cronTimes.push(item.cronExpression as string);
continue;
}
if (item.mode === 'everyMinute') {
cronTimes.push(`${Math.floor(Math.random() * 60).toString()} * * * * *`);
continue;
}
for (parameterName of parameterOrder) {
if (item[parameterName] !== undefined) {
// Value is set so use it
cronTime.push(item[parameterName] as string);
} else if (parameterName === 'second') {
// For seconds we use by default a random one to make sure to
// balance the load a little bit over time
cronTime.push(Math.floor(Math.random() * 60).toString());
} else {
// For all others set "any"
cronTime.push('*');
}
}
cronTimes.push(cronTime.join(' '));
}
}
// The trigger function to execute when the cron-time got reached
const executeTrigger = async () => {
const pollResponse = await workflow.runPoll(node, pollFunctions);
if (pollResponse !== null) {
// TODO: Run workflow
pollFunctions.__emit(pollResponse);
}
};
// Execute the trigger directly to be able to know if it works
await executeTrigger();
const timezone = pollFunctions.getTimezone();
// Start the cron-jobs
const cronJobs: CronJob[] = [];
for (const cronTime of cronTimes) {
cronJobs.push(new CronJob(cronTime, executeTrigger, undefined, true, timezone));
}
// Stop the cron-jobs
async function closeFunction() {
for (const cronJob of cronJobs) {
cronJob.stop();
}
}
return {
closeFunction,
};
}
/**
* Makes a workflow inactive
@@ -103,6 +208,10 @@ export class ActiveWorkflows {
await workflowData.triggerResponse.closeFunction();
}
if (workflowData.pollResponse && workflowData.pollResponse.closeFunction) {
await workflowData.pollResponse.closeFunction();
}
delete this.workflowData[id];
}

View File

@@ -8,13 +8,16 @@ import {
ILoadOptionsFunctions as ILoadOptionsFunctionsBase,
INodeExecutionData,
INodeType,
IPollFunctions as IPollFunctionsBase,
IPollResponse,
ITriggerFunctions as ITriggerFunctionsBase,
ITriggerResponse,
IWebhookFunctions as IWebhookFunctionsBase,
IWorkflowSettings as IWorkflowSettingsWorkflow,
Workflow,
} from 'n8n-workflow';
import * as request from 'request';
import * as requestPromise from 'request-promise-native';
interface Constructable<T> {
@@ -31,7 +34,7 @@ export interface IProcessMessage {
export interface IExecuteFunctions extends IExecuteFunctionsBase {
helpers: {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
request: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
request: requestPromise.RequestPromiseAPI,
returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[];
};
}
@@ -40,7 +43,16 @@ export interface IExecuteFunctions extends IExecuteFunctionsBase {
export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase {
helpers: {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
request: request.RequestAPI < requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl >,
request: requestPromise.RequestPromiseAPI,
};
}
export interface IPollFunctions extends IPollFunctionsBase {
helpers: {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
request: requestPromise.RequestPromiseAPI,
returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[];
};
}
@@ -48,12 +60,22 @@ export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase {
export interface ITriggerFunctions extends ITriggerFunctionsBase {
helpers: {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
request: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
request: requestPromise.RequestPromiseAPI,
returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[];
};
}
export interface ITriggerTime {
mode: string;
hour: number;
minute: number;
dayOfMonth: number;
weekeday: number;
[key: string]: string | number;
}
export interface IUserSettings {
encryptionKey?: string;
tunnelSubdomain?: string;
@@ -61,14 +83,14 @@ export interface IUserSettings {
export interface ILoadOptionsFunctions extends ILoadOptionsFunctionsBase {
helpers: {
request?: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
request?: requestPromise.RequestPromiseAPI,
};
}
export interface IHookFunctions extends IHookFunctionsBase {
helpers: {
request: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
request: requestPromise.RequestPromiseAPI,
};
}
@@ -76,7 +98,7 @@ export interface IHookFunctions extends IHookFunctionsBase {
export interface IWebhookFunctions extends IWebhookFunctionsBase {
helpers: {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
request: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
request: requestPromise.RequestPromiseAPI,
returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[];
};
}
@@ -98,3 +120,10 @@ export interface INodeDefinitionFile {
export interface INodeInputDataConnections {
[key: string]: INodeExecutionData[][];
}
export interface IWorkflowData {
pollResponse?: IPollResponse;
triggerResponse?: ITriggerResponse;
workflow: Workflow;
}

View File

@@ -17,6 +17,7 @@ import {
INodeExecutionData,
INodeParameters,
INodeType,
IPollFunctions,
IRunExecutionData,
ITaskDataConnections,
ITriggerFunctions,
@@ -310,6 +311,57 @@ export function getWebhookDescription(name: string, workflow: Workflow, node: IN
/**
* Returns the execute functions the poll nodes have access to.
*
* @export
* @param {Workflow} workflow
* @param {INode} node
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {WorkflowExecuteMode} mode
* @returns {ITriggerFunctions}
*/
// TODO: Check if I can get rid of: additionalData, and so then maybe also at ActiveWorkflowRunner.add
export function getExecutePollFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IPollFunctions {
return ((workflow: Workflow, node: INode) => {
return {
__emit: (data: INodeExecutionData[][]): void => {
throw new Error('Overwrite NodeExecuteFunctions.getExecutePullFunctions.__emit function!');
},
getCredentials(type: string): ICredentialDataDecryptedObject | undefined {
return getCredentials(workflow, node, type, additionalData);
},
getMode: (): WorkflowExecuteMode => {
return mode;
},
getNodeParameter: (parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any
const runExecutionData: IRunExecutionData | null = null;
const itemIndex = 0;
const runIndex = 0;
const connectionInputData: INodeExecutionData[] = [];
return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue);
},
getRestApiUrl: (): string => {
return additionalData.restApiUrl;
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
getWorkflowStaticData(type: string): IDataObject {
return workflow.getStaticData(type, node);
},
helpers: {
prepareBinaryData,
request: requestPromise,
returnJsonArray,
},
};
})(workflow, node);
}
/**
* Returns the execute functions the trigger nodes have access to.
*