Add Webhook response node (#2254)

*  Add Webhook-Response-Node

*  Replace callback function with promise

*  Add support for Bull and binary-data

*  Add string response option

*  Remove some comments

*  Make more generically possible & fix issue multi call in
queue mode

*  Fix startup and eslint issues

*  Improvements to webhook response node and functionality

*  Replace data with more generic type

*  Make statusMessage optional

*  Change parameter order

*  Move Response Code underneath options

*  Hide Response Code on Webhook node if mode responseNode got selected

*  Minor improvements

*  Add missing file and fix lint issue

*  Fix some node linting issues

*  Apply feedback

*  Minor improvements
This commit is contained in:
Jan
2021-11-05 10:45:51 -06:00
committed by GitHub
parent 70a9f0446e
commit 7b8d388d17
23 changed files with 664 additions and 65 deletions

View File

@@ -12,7 +12,7 @@ import * as PCancelable from 'p-cancelable';
import { Command, flags } from '@oclif/command'; import { Command, flags } from '@oclif/command';
import { UserSettings, WorkflowExecute } from 'n8n-core'; import { UserSettings, WorkflowExecute } from 'n8n-core';
import { INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow'; import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow';
import { FindOneOptions } from 'typeorm'; import { FindOneOptions } from 'typeorm';
@@ -25,11 +25,13 @@ import {
GenericHelpers, GenericHelpers,
IBullJobData, IBullJobData,
IBullJobResponse, IBullJobResponse,
IBullWebhookResponse,
IExecutionFlattedDb, IExecutionFlattedDb,
InternalHooksManager, InternalHooksManager,
LoadNodesAndCredentials, LoadNodesAndCredentials,
NodeTypes, NodeTypes,
ResponseHelper, ResponseHelper,
WebhookHelpers,
WorkflowExecuteAdditionalData, WorkflowExecuteAdditionalData,
} from '../src'; } from '../src';
@@ -172,6 +174,16 @@ export class Worker extends Command {
currentExecutionDb.workflowData, currentExecutionDb.workflowData,
{ retryOf: currentExecutionDb.retryOf as string }, { retryOf: currentExecutionDb.retryOf as string },
); );
additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
await job.progress({
executionId: job.data.executionId as string,
response: WebhookHelpers.encodeWebhookResponse(response),
} as IBullWebhookResponse);
},
];
additionalData.executionId = jobData.executionId; additionalData.executionId = jobData.executionId;
let workflowExecute: WorkflowExecute; let workflowExecute: WorkflowExecute;

View File

@@ -5,9 +5,12 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-non-null-assertion */ /* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { IRun } from 'n8n-workflow'; import {
createDeferredPromise,
import { createDeferredPromise } from 'n8n-core'; IDeferredPromise,
IExecuteResponsePromiseData,
IRun,
} from 'n8n-workflow';
import { ChildProcess } from 'child_process'; import { ChildProcess } from 'child_process';
// eslint-disable-next-line import/no-extraneous-dependencies // eslint-disable-next-line import/no-extraneous-dependencies
@@ -116,6 +119,28 @@ export class ActiveExecutions {
this.activeExecutions[executionId].workflowExecution = workflowExecution; this.activeExecutions[executionId].workflowExecution = workflowExecution;
} }
attachResponsePromise(
executionId: string,
responsePromise: IDeferredPromise<IExecuteResponsePromiseData>,
): void {
if (this.activeExecutions[executionId] === undefined) {
throw new Error(
`No active execution with id "${executionId}" got found to attach to workflowExecution to!`,
);
}
this.activeExecutions[executionId].responsePromise = responsePromise;
}
resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
if (this.activeExecutions[executionId] === undefined) {
return;
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
this.activeExecutions[executionId].responsePromise?.resolve(response);
}
/** /**
* Remove an active execution * Remove an active execution
* *
@@ -193,6 +218,7 @@ export class ActiveExecutions {
this.activeExecutions[executionId].postExecutePromises.push(waitPromise); this.activeExecutions[executionId].postExecutePromises.push(waitPromise);
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-member-access
return waitPromise.promise(); return waitPromise.promise();
} }

View File

@@ -12,7 +12,9 @@
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import { import {
IDeferredPromise,
IExecuteData, IExecuteData,
IExecuteResponsePromiseData,
IGetExecutePollFunctions, IGetExecutePollFunctions,
IGetExecuteTriggerFunctions, IGetExecuteTriggerFunctions,
INode, INode,
@@ -40,8 +42,6 @@ import {
NodeTypes, NodeTypes,
ResponseHelper, ResponseHelper,
WebhookHelpers, WebhookHelpers,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
WorkflowCredentials,
WorkflowExecuteAdditionalData, WorkflowExecuteAdditionalData,
WorkflowHelpers, WorkflowHelpers,
WorkflowRunner, WorkflowRunner,
@@ -550,6 +550,7 @@ export class ActiveWorkflowRunner {
data: INodeExecutionData[][], data: INodeExecutionData[][],
additionalData: IWorkflowExecuteAdditionalDataWorkflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
) { ) {
const nodeExecutionStack: IExecuteData[] = [ const nodeExecutionStack: IExecuteData[] = [
{ {
@@ -580,7 +581,7 @@ export class ActiveWorkflowRunner {
}; };
const workflowRunner = new WorkflowRunner(); const workflowRunner = new WorkflowRunner();
return workflowRunner.run(runData, true); return workflowRunner.run(runData, true, undefined, undefined, responsePromise);
} }
/** /**
@@ -641,13 +642,16 @@ export class ActiveWorkflowRunner {
mode, mode,
activation, activation,
); );
returnFunctions.emit = (data: INodeExecutionData[][]): void => { returnFunctions.emit = (
data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): void => {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
Logger.debug(`Received trigger for workflow "${workflow.name}"`); Logger.debug(`Received trigger for workflow "${workflow.name}"`);
WorkflowHelpers.saveStaticData(workflow); WorkflowHelpers.saveStaticData(workflow);
// eslint-disable-next-line id-denylist // eslint-disable-next-line id-denylist
this.runWorkflow(workflowData, node, data, additionalData, mode).catch((err) => this.runWorkflow(workflowData, node, data, additionalData, mode, responsePromise).catch(
console.error(err), (error) => console.error(error),
); );
}; };
return returnFunctions; return returnFunctions;

View File

@@ -7,19 +7,19 @@ import {
ICredentialsEncrypted, ICredentialsEncrypted,
ICredentialType, ICredentialType,
IDataObject, IDataObject,
IDeferredPromise,
IExecuteResponsePromiseData,
IRun, IRun,
IRunData, IRunData,
IRunExecutionData, IRunExecutionData,
ITaskData, ITaskData,
ITelemetrySettings, ITelemetrySettings,
IWorkflowBase as IWorkflowBaseWorkflow, IWorkflowBase as IWorkflowBaseWorkflow,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
IWorkflowCredentials,
Workflow, Workflow,
WorkflowExecuteMode, WorkflowExecuteMode,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { IDeferredPromise, WorkflowExecute } from 'n8n-core'; import { WorkflowExecute } from 'n8n-core';
// eslint-disable-next-line import/no-extraneous-dependencies // eslint-disable-next-line import/no-extraneous-dependencies
import * as PCancelable from 'p-cancelable'; import * as PCancelable from 'p-cancelable';
@@ -47,6 +47,11 @@ export interface IBullJobResponse {
success: boolean; success: boolean;
} }
export interface IBullWebhookResponse {
executionId: string;
response: IExecuteResponsePromiseData;
}
export interface ICustomRequest extends Request { export interface ICustomRequest extends Request {
parsedUrl: Url | undefined; parsedUrl: Url | undefined;
} }
@@ -237,6 +242,7 @@ export interface IExecutingWorkflowData {
process?: ChildProcess; process?: ChildProcess;
startedAt: Date; startedAt: Date;
postExecutePromises: Array<IDeferredPromise<IRun | undefined>>; postExecutePromises: Array<IDeferredPromise<IRun | undefined>>;
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
workflowExecution?: PCancelable<IRun>; workflowExecution?: PCancelable<IRun>;
} }
@@ -490,6 +496,7 @@ export interface IPushDataConsoleMessage {
export interface IResponseCallbackData { export interface IResponseCallbackData {
data?: IDataObject | IDataObject[]; data?: IDataObject | IDataObject[];
headers?: object;
noWebhookResponse?: boolean; noWebhookResponse?: boolean;
responseCode?: number; responseCode?: number;
} }

View File

@@ -1,12 +1,21 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import * as Bull from 'bull'; import * as Bull from 'bull';
import * as config from '../config'; import * as config from '../config';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
import { IBullJobData } from './Interfaces'; import { IBullJobData, IBullWebhookResponse } from './Interfaces';
// eslint-disable-next-line import/no-cycle
import * as ActiveExecutions from './ActiveExecutions';
// eslint-disable-next-line import/no-cycle
import * as WebhookHelpers from './WebhookHelpers';
export class Queue { export class Queue {
private activeExecutions: ActiveExecutions.ActiveExecutions;
private jobQueue: Bull.Queue; private jobQueue: Bull.Queue;
constructor() { constructor() {
this.activeExecutions = ActiveExecutions.getInstance();
const prefix = config.get('queue.bull.prefix') as string; const prefix = config.get('queue.bull.prefix') as string;
const redisOptions = config.get('queue.bull.redis') as object; const redisOptions = config.get('queue.bull.redis') as object;
// Disabling ready check is necessary as it allows worker to // Disabling ready check is necessary as it allows worker to
@@ -16,6 +25,14 @@ export class Queue {
// More here: https://github.com/OptimalBits/bull/issues/890 // More here: https://github.com/OptimalBits/bull/issues/890
// @ts-ignore // @ts-ignore
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false }); this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false });
this.jobQueue.on('global:progress', (jobId, progress: IBullWebhookResponse) => {
this.activeExecutions.resolveResponsePromise(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
progress.executionId,
WebhookHelpers.decodeWebhookResponse(progress.response),
);
});
} }
async add(jobData: IBullJobData, jobOptions: object): Promise<Bull.Job> { async add(jobData: IBullJobData, jobOptions: object): Promise<Bull.Job> {

View File

@@ -72,11 +72,16 @@ export function sendSuccessResponse(
data: any, data: any,
raw?: boolean, raw?: boolean,
responseCode?: number, responseCode?: number,
responseHeader?: object,
) { ) {
if (responseCode !== undefined) { if (responseCode !== undefined) {
res.status(responseCode); res.status(responseCode);
} }
if (responseHeader) {
res.header(responseHeader);
}
if (raw === true) { if (raw === true) {
if (typeof data === 'string') { if (typeof data === 'string') {
res.send(data); res.send(data);

View File

@@ -2669,7 +2669,13 @@ class App {
return; return;
} }
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}, },
); );
@@ -2720,7 +2726,13 @@ class App {
return; return;
} }
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}, },
); );
@@ -2746,7 +2758,13 @@ class App {
return; return;
} }
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}, },
); );

View File

@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable no-param-reassign */ /* eslint-disable no-param-reassign */
/* eslint-disable @typescript-eslint/prefer-optional-chain */ /* eslint-disable @typescript-eslint/prefer-optional-chain */
/* eslint-disable @typescript-eslint/no-shadow */ /* eslint-disable @typescript-eslint/no-shadow */
@@ -18,9 +19,13 @@ import { get } from 'lodash';
import { BINARY_ENCODING, NodeExecuteFunctions } from 'n8n-core'; import { BINARY_ENCODING, NodeExecuteFunctions } from 'n8n-core';
import { import {
createDeferredPromise,
IBinaryKeyData, IBinaryKeyData,
IDataObject, IDataObject,
IDeferredPromise,
IExecuteData, IExecuteData,
IExecuteResponsePromiseData,
IN8nHttpFullResponse,
INode, INode,
IRunExecutionData, IRunExecutionData,
IWebhookData, IWebhookData,
@@ -34,20 +39,20 @@ import {
} from 'n8n-workflow'; } from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
import { import {
ActiveExecutions,
GenericHelpers, GenericHelpers,
IExecutionDb, IExecutionDb,
IResponseCallbackData, IResponseCallbackData,
IWorkflowDb, IWorkflowDb,
IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcess,
ResponseHelper, ResponseHelper,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
WorkflowCredentials,
WorkflowExecuteAdditionalData, WorkflowExecuteAdditionalData,
WorkflowHelpers, WorkflowHelpers,
WorkflowRunner, WorkflowRunner,
} from '.'; } from '.';
// eslint-disable-next-line import/no-cycle
import * as ActiveExecutions from './ActiveExecutions';
const activeExecutions = ActiveExecutions.getInstance(); const activeExecutions = ActiveExecutions.getInstance();
/** /**
@@ -91,6 +96,35 @@ export function getWorkflowWebhooks(
return returnData; return returnData;
} }
export function decodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&
typeof response.body === 'object' &&
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
) {
response.body = Buffer.from(
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
BINARY_ENCODING,
);
}
return response;
}
export function encodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (typeof response === 'object' && Buffer.isBuffer(response.body)) {
response.body = {
'__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING),
};
}
return response;
}
/** /**
* Returns all the webhooks which should be created for the give workflow * Returns all the webhooks which should be created for the give workflow
* *
@@ -169,7 +203,7 @@ export async function executeWebhook(
200, 200,
) as number; ) as number;
if (!['onReceived', 'lastNode'].includes(responseMode as string)) { if (!['onReceived', 'lastNode', 'responseNode'].includes(responseMode as string)) {
// If the mode is not known we error. Is probably best like that instead of using // If the mode is not known we error. Is probably best like that instead of using
// the default that people know as early as possible (probably already testing phase) // the default that people know as early as possible (probably already testing phase)
// that something does not resolve properly. // that something does not resolve properly.
@@ -356,9 +390,52 @@ export async function executeWebhook(
workflowData, workflowData,
}; };
let responsePromise: IDeferredPromise<IN8nHttpFullResponse> | undefined;
if (responseMode === 'responseNode') {
responsePromise = await createDeferredPromise<IN8nHttpFullResponse>();
responsePromise
.promise()
.then((response: IN8nHttpFullResponse) => {
if (didSendResponse) {
return;
}
if (Buffer.isBuffer(response.body)) {
res.header(response.headers);
res.end(response.body);
responseCallback(null, {
noWebhookResponse: true,
});
} else {
// TODO: This probably needs some more changes depending on the options on the
// Webhook Response node
responseCallback(null, {
data: response.body as IDataObject,
headers: response.headers,
responseCode: response.statusCode,
});
}
didSendResponse = true;
})
.catch(async (error) => {
Logger.error(
`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
{ executionId, workflowId: workflow.id },
);
});
}
// Start now to run the workflow // Start now to run the workflow
const workflowRunner = new WorkflowRunner(); const workflowRunner = new WorkflowRunner();
executionId = await workflowRunner.run(runData, true, !didSendResponse, executionId); executionId = await workflowRunner.run(
runData,
true,
!didSendResponse,
executionId,
responsePromise,
);
Logger.verbose( Logger.verbose(
`Started execution of workflow "${workflow.name}" from webhook with execution ID ${executionId}`, `Started execution of workflow "${workflow.name}" from webhook with execution ID ${executionId}`,
@@ -398,6 +475,20 @@ export async function executeWebhook(
return data; return data;
} }
if (responseMode === 'responseNode') {
if (!didSendResponse) {
// Return an error if no Webhook-Response node did send any data
responseCallback(null, {
data: {
message: 'Workflow executed sucessfully.',
},
responseCode,
});
didSendResponse = true;
}
return undefined;
}
if (returnData === undefined) { if (returnData === undefined) {
if (!didSendResponse) { if (!didSendResponse) {
responseCallback(null, { responseCallback(null, {

View File

@@ -64,7 +64,13 @@ export function registerProductionWebhooks() {
return; return;
} }
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}, },
); );
@@ -115,7 +121,13 @@ export function registerProductionWebhooks() {
return; return;
} }
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}, },
); );
@@ -141,7 +153,13 @@ export function registerProductionWebhooks() {
return; return;
} }
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}, },
); );
@@ -173,7 +191,13 @@ export function registerProductionWebhooks() {
return; return;
} }
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}, },
); );
@@ -199,7 +223,13 @@ export function registerProductionWebhooks() {
return; return;
} }
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}, },
); );
@@ -225,7 +255,13 @@ export function registerProductionWebhooks() {
return; return;
} }
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}, },
); );
} }

View File

@@ -15,6 +15,8 @@ import { IProcessMessage, WorkflowExecute } from 'n8n-core';
import { import {
ExecutionError, ExecutionError,
IDeferredPromise,
IExecuteResponsePromiseData,
IRun, IRun,
LoggerProxy as Logger, LoggerProxy as Logger,
Workflow, Workflow,
@@ -41,9 +43,7 @@ import {
IBullJobResponse, IBullJobResponse,
ICredentialsOverwrite, ICredentialsOverwrite,
ICredentialsTypeData, ICredentialsTypeData,
IExecutionDb,
IExecutionFlattedDb, IExecutionFlattedDb,
IExecutionResponse,
IProcessMessageDataHook, IProcessMessageDataHook,
ITransferNodeTypes, ITransferNodeTypes,
IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcess,
@@ -51,6 +51,7 @@ import {
NodeTypes, NodeTypes,
Push, Push,
ResponseHelper, ResponseHelper,
WebhookHelpers,
WorkflowExecuteAdditionalData, WorkflowExecuteAdditionalData,
WorkflowHelpers, WorkflowHelpers,
} from '.'; } from '.';
@@ -146,6 +147,7 @@ export class WorkflowRunner {
loadStaticData?: boolean, loadStaticData?: boolean,
realtime?: boolean, realtime?: boolean,
executionId?: string, executionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> { ): Promise<string> {
const executionsProcess = config.get('executions.process') as string; const executionsProcess = config.get('executions.process') as string;
const executionsMode = config.get('executions.mode') as string; const executionsMode = config.get('executions.mode') as string;
@@ -153,11 +155,17 @@ export class WorkflowRunner {
if (executionsMode === 'queue' && data.executionMode !== 'manual') { if (executionsMode === 'queue' && data.executionMode !== 'manual') {
// Do not run "manual" executions in bull because sending events to the // Do not run "manual" executions in bull because sending events to the
// frontend would not be possible // frontend would not be possible
executionId = await this.runBull(data, loadStaticData, realtime, executionId); executionId = await this.runBull(
data,
loadStaticData,
realtime,
executionId,
responsePromise,
);
} else if (executionsProcess === 'main') { } else if (executionsProcess === 'main') {
executionId = await this.runMainProcess(data, loadStaticData, executionId); executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise);
} else { } else {
executionId = await this.runSubprocess(data, loadStaticData, executionId); executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise);
} }
const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId); const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
@@ -200,6 +208,7 @@ export class WorkflowRunner {
data: IWorkflowExecutionDataProcess, data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean, loadStaticData?: boolean,
restartExecutionId?: string, restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> { ): Promise<string> {
if (loadStaticData === true && data.workflowData.id) { if (loadStaticData === true && data.workflowData.id) {
data.workflowData.staticData = await WorkflowHelpers.getStaticDataById( data.workflowData.staticData = await WorkflowHelpers.getStaticDataById(
@@ -256,6 +265,15 @@ export class WorkflowRunner {
executionId, executionId,
true, true,
); );
additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
if (responsePromise) {
responsePromise.resolve(response);
}
},
];
additionalData.sendMessageToUI = WorkflowExecuteAdditionalData.sendMessageToUI.bind({ additionalData.sendMessageToUI = WorkflowExecuteAdditionalData.sendMessageToUI.bind({
sessionId: data.sessionId, sessionId: data.sessionId,
}); });
@@ -341,11 +359,15 @@ export class WorkflowRunner {
loadStaticData?: boolean, loadStaticData?: boolean,
realtime?: boolean, realtime?: boolean,
restartExecutionId?: string, restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> { ): Promise<string> {
// TODO: If "loadStaticData" is set to true it has to load data new on worker // TODO: If "loadStaticData" is set to true it has to load data new on worker
// Register the active execution // Register the active execution
const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId); const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId);
if (responsePromise) {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}
const jobData: IBullJobData = { const jobData: IBullJobData = {
executionId, executionId,
@@ -545,6 +567,7 @@ export class WorkflowRunner {
data: IWorkflowExecutionDataProcess, data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean, loadStaticData?: boolean,
restartExecutionId?: string, restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> { ): Promise<string> {
let startedAt = new Date(); let startedAt = new Date();
const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js')); const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js'));
@@ -653,6 +676,10 @@ export class WorkflowRunner {
} else if (message.type === 'end') { } else if (message.type === 'end') {
clearTimeout(executionTimeout); clearTimeout(executionTimeout);
this.activeExecutions.remove(executionId, message.data.runData); this.activeExecutions.remove(executionId, message.data.runData);
} else if (message.type === 'sendResponse') {
if (responsePromise) {
responsePromise.resolve(WebhookHelpers.decodeWebhookResponse(message.data.response));
}
} else if (message.type === 'sendMessageToUI') { } else if (message.type === 'sendMessageToUI') {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call // eslint-disable-next-line @typescript-eslint/no-unsafe-call
WorkflowExecuteAdditionalData.sendMessageToUI.bind({ sessionId: data.sessionId })( WorkflowExecuteAdditionalData.sendMessageToUI.bind({ sessionId: data.sessionId })(

View File

@@ -10,6 +10,7 @@ import { IProcessMessage, UserSettings, WorkflowExecute } from 'n8n-core';
import { import {
ExecutionError, ExecutionError,
IDataObject, IDataObject,
IExecuteResponsePromiseData,
IExecuteWorkflowInfo, IExecuteWorkflowInfo,
ILogger, ILogger,
INodeExecutionData, INodeExecutionData,
@@ -33,6 +34,7 @@ import {
IWorkflowExecuteProcess, IWorkflowExecuteProcess,
IWorkflowExecutionDataProcessWithExecution, IWorkflowExecutionDataProcessWithExecution,
NodeTypes, NodeTypes,
WebhookHelpers,
WorkflowExecuteAdditionalData, WorkflowExecuteAdditionalData,
WorkflowHelpers, WorkflowHelpers,
} from '.'; } from '.';
@@ -200,6 +202,15 @@ export class WorkflowRunnerProcess {
workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000, workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000,
); );
additionalData.hooks = this.getProcessForwardHooks(); additionalData.hooks = this.getProcessForwardHooks();
additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
await sendToParentProcess('sendResponse', {
response: WebhookHelpers.encodeWebhookResponse(response),
});
},
];
additionalData.executionId = inputData.executionId; additionalData.executionId = inputData.executionId;
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any

View File

@@ -22,6 +22,7 @@ import {
ICredentialsExpressionResolveValues, ICredentialsExpressionResolveValues,
IDataObject, IDataObject,
IExecuteFunctions, IExecuteFunctions,
IExecuteResponsePromiseData,
IExecuteSingleFunctions, IExecuteSingleFunctions,
IExecuteWorkflowInfo, IExecuteWorkflowInfo,
IHttpRequestOptions, IHttpRequestOptions,
@@ -1635,6 +1636,9 @@ export function getExecuteFunctions(
Logger.warn(`There was a problem sending messsage to UI: ${error.message}`); Logger.warn(`There was a problem sending messsage to UI: ${error.message}`);
} }
}, },
async sendResponse(response: IExecuteResponsePromiseData): Promise<void> {
await additionalData.hooks?.executeHookFunctions('sendResponse', [response]);
},
helpers: { helpers: {
httpRequest, httpRequest,
prepareBinaryData, prepareBinaryData,

View File

@@ -12,7 +12,6 @@ export * from './ActiveWorkflows';
export * from './ActiveWebhooks'; export * from './ActiveWebhooks';
export * from './Constants'; export * from './Constants';
export * from './Credentials'; export * from './Credentials';
export * from './DeferredPromise';
export * from './Interfaces'; export * from './Interfaces';
export * from './LoadNodeParameterOptions'; export * from './LoadNodeParameterOptions';
export * from './NodeExecuteFunctions'; export * from './NodeExecuteFunctions';

View File

@@ -4,6 +4,7 @@ import {
ICredentialDataDecryptedObject, ICredentialDataDecryptedObject,
ICredentialsHelper, ICredentialsHelper,
IDataObject, IDataObject,
IDeferredPromise,
IExecuteWorkflowInfo, IExecuteWorkflowInfo,
INodeCredentialsDetails, INodeCredentialsDetails,
INodeExecutionData, INodeExecutionData,
@@ -20,7 +21,7 @@ import {
WorkflowHooks, WorkflowHooks,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { Credentials, IDeferredPromise, IExecuteFunctions } from '../src'; import { Credentials, IExecuteFunctions } from '../src';
export class CredentialsHelper extends ICredentialsHelper { export class CredentialsHelper extends ICredentialsHelper {
getDecrypted( getDecrypted(

View File

@@ -1,6 +1,14 @@
import { IConnections, ILogger, INode, IRun, LoggerProxy, Workflow } from 'n8n-workflow'; import {
createDeferredPromise,
IConnections,
ILogger,
INode,
IRun,
LoggerProxy,
Workflow,
} from 'n8n-workflow';
import { createDeferredPromise, WorkflowExecute } from '../src'; import { WorkflowExecute } from '../src';
import * as Helpers from './Helpers'; import * as Helpers from './Helpers';

View File

@@ -0,0 +1,278 @@
import {
BINARY_ENCODING,
} from 'n8n-core';
import {
IDataObject,
IExecuteFunctions,
IN8nHttpFullResponse,
IN8nHttpResponse,
INodeExecutionData,
INodeType,
INodeTypeDescription,
NodeOperationError,
} from 'n8n-workflow';
export class RespondToWebhook implements INodeType {
description: INodeTypeDescription = {
displayName: 'Respond to Webhook',
icon: 'file:webhook.svg',
name: 'respondToWebhook',
group: ['transform'],
version: 1,
description: 'Returns data for Webhook',
defaults: {
name: 'Respond to Webhook',
color: '#885577',
},
inputs: ['main'],
outputs: ['main'],
credentials: [
],
properties: [
{
displayName: 'Respond With',
name: 'respondWith',
type: 'options',
options: [
{
name: 'First Incoming Item',
value: 'firstIncomingItem',
},
{
name: 'Text',
value: 'text',
},
{
name: 'JSON',
value: 'json',
},
{
name: 'Binary',
value: 'binary',
},
{
name: 'No Data',
value: 'noData',
},
],
default: 'firstIncomingItem',
description: 'The data that should be returned',
},
{
displayName: 'When using expressions, note that this node will only run for the first item in the input data.',
name: 'webhookNotice',
type: 'notice',
displayOptions: {
show: {
respondWith: [
'json',
'text',
],
},
},
default: '',
},
{
displayName: 'Response Body',
name: 'responseBody',
type: 'json',
displayOptions: {
show: {
respondWith: [
'json',
],
},
},
default: '',
placeholder: '{ "key": "value" }',
description: 'The HTTP Response JSON data',
},
{
displayName: 'Response Body',
name: 'responseBody',
type: 'string',
displayOptions: {
show: {
respondWith: [
'text',
],
},
},
default: '',
placeholder: 'e.g. Workflow started',
description: 'The HTTP Response text data',
},
{
displayName: 'Response Data Source',
name: 'responseDataSource',
type: 'options',
displayOptions: {
show: {
respondWith: [
'binary',
],
},
},
options: [
{
name: 'Choose Automatically From Input',
value: 'automatically',
description: 'Use if input data will contain a single piece of binary data',
},
{
name: 'Specify Myself',
value: 'set',
description: 'Enter the name of the input field the binary data will be in',
},
],
default: 'automatically',
},
{
displayName: 'Input Field Name',
name: 'inputFieldName',
type: 'string',
required: true,
default: 'data',
displayOptions: {
show: {
respondWith: [
'binary',
],
responseDataSource: [
'set',
],
},
},
description: 'The name of the node input field with the binary data',
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Response Code',
name: 'responseCode',
type: 'number',
typeOptions: {
minValue: 100,
maxValue: 599,
},
default: 200,
description: 'The HTTP Response code to return. Defaults to 200.',
},
{
displayName: 'Response Headers',
name: 'responseHeaders',
placeholder: 'Add Response Header',
description: 'Add headers to the webhook response',
type: 'fixedCollection',
typeOptions: {
multipleValues: true,
},
default: {},
options: [
{
name: 'entries',
displayName: 'Entries',
values: [
{
displayName: 'Name',
name: 'name',
type: 'string',
default: '',
description: 'Name of the header',
},
{
displayName: 'Value',
name: 'value',
type: 'string',
default: '',
description: 'Value of the header',
},
],
},
],
},
],
},
],
};
execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const respondWith = this.getNodeParameter('respondWith', 0) as string;
const options = this.getNodeParameter('options', 0, {}) as IDataObject;
const headers = {} as IDataObject;
if (options.responseHeaders) {
for (const header of (options.responseHeaders as IDataObject).entries as IDataObject[]) {
if (typeof header.name !== 'string') {
header.name = header.name?.toString();
}
headers[header.name?.toLowerCase() as string] = header.value?.toString();
}
}
let responseBody: IN8nHttpResponse;
if (respondWith === 'json') {
const responseBodyParameter = this.getNodeParameter('responseBody', 0) as string;
if (responseBodyParameter) {
responseBody = JSON.parse(responseBodyParameter);
}
} else if (respondWith === 'firstIncomingItem') {
responseBody = items[0].json;
} else if (respondWith === 'text') {
responseBody = this.getNodeParameter('responseBody', 0) as string;
} else if (respondWith === 'binary') {
const item = this.getInputData()[0];
if (item.binary === undefined) {
throw new NodeOperationError(this.getNode(), 'No binary data exists on the first item!');
}
let responseBinaryPropertyName: string;
const responseDataSource = this.getNodeParameter('responseDataSource', 0) as string;
if (responseDataSource === 'set') {
responseBinaryPropertyName = this.getNodeParameter('inputFieldName', 0) as string;
} else {
const binaryKeys = Object.keys(item.binary);
if (binaryKeys.length === 0) {
throw new NodeOperationError(this.getNode(), 'No binary data exists on the first item!');
}
responseBinaryPropertyName = binaryKeys[0];
}
const binaryData = item.binary[responseBinaryPropertyName];
if (binaryData === undefined) {
throw new NodeOperationError(this.getNode(), `No binary data property "${responseBinaryPropertyName}" does not exists on item!`);
}
if (headers['content-type']) {
headers['content-type'] = binaryData.mimeType;
}
responseBody = Buffer.from(binaryData.data, BINARY_ENCODING);
} else if (respondWith !== 'noData') {
throw new NodeOperationError(this.getNode(), `The Response Data option "${respondWith}" is not supported!`);
}
const response: IN8nHttpFullResponse = {
body: responseBody,
headers,
statusCode: options.responseCode as number || 200,
};
this.sendResponse(response);
return this.prepareOutputData(items);
}
}

View File

@@ -304,6 +304,11 @@ export class Wait implements INodeType {
value: 'lastNode', value: 'lastNode',
description: 'Returns data of the last executed node', description: 'Returns data of the last executed node',
}, },
{
name: 'Response Node finishes',
value: 'responseNode',
description: 'Returns data the response node did set',
},
], ],
default: 'onReceived', default: 'onReceived',
description: 'When and how to respond to the webhook', description: 'When and how to respond to the webhook',

View File

@@ -9,7 +9,6 @@ import {
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
IWebhookResponseData, IWebhookResponseData,
NodeApiError,
NodeOperationError, NodeOperationError,
} from 'n8n-workflow'; } from 'n8n-workflow';
@@ -143,10 +142,54 @@ export class Webhook implements INodeType {
required: true, required: true,
description: 'The path to listen to.', description: 'The path to listen to.',
}, },
{
displayName: 'Respond',
name: 'responseMode',
type: 'options',
options: [
{
name: 'Immediately',
value: 'onReceived',
description: 'As soon as this node executes',
},
{
name: 'When last node finishes',
value: 'lastNode',
description: 'Returns data of the last-executed node',
},
{
name: 'Using \'Respond to Webhook\' node',
value: 'responseNode',
description: 'Response defined in that node',
},
],
default: 'onReceived',
description: 'When and how to respond to the webhook.',
},
{
displayName: 'Insert a \'Respond to Webhook\' node to control when and how you respond. <a href="https://docs.n8n.io/nodes/n8n-nodes-base.respondToWebhook" target="_blank">More details</a>',
name: 'webhookNotice',
type: 'notice',
displayOptions: {
show: {
responseMode: [
'responseNode',
],
},
},
default: '',
},
{ {
displayName: 'Response Code', displayName: 'Response Code',
name: 'responseCode', name: 'responseCode',
type: 'number', type: 'number',
displayOptions: {
hide: {
responseMode: [
'responseNode',
],
},
},
typeOptions: { typeOptions: {
minValue: 100, minValue: 100,
maxValue: 599, maxValue: 599,
@@ -154,25 +197,6 @@ export class Webhook implements INodeType {
default: 200, default: 200,
description: 'The HTTP Response code to return', description: 'The HTTP Response code to return',
}, },
{
displayName: 'Respond When',
name: 'responseMode',
type: 'options',
options: [
{
name: 'Webhook received',
value: 'onReceived',
description: 'Returns directly with defined Response Code',
},
{
name: 'Last node finishes',
value: 'lastNode',
description: 'Returns data of the last executed node',
},
],
default: 'onReceived',
description: 'When and how to respond to the webhook.',
},
{ {
displayName: 'Response Data', displayName: 'Response Data',
name: 'responseData', name: 'responseData',

View File

@@ -552,6 +552,7 @@
"dist/nodes/Reddit/Reddit.node.js", "dist/nodes/Reddit/Reddit.node.js",
"dist/nodes/Redis/Redis.node.js", "dist/nodes/Redis/Redis.node.js",
"dist/nodes/RenameKeys.node.js", "dist/nodes/RenameKeys.node.js",
"dist/nodes/RespondToWebhook.node.js",
"dist/nodes/Rocketchat/Rocketchat.node.js", "dist/nodes/Rocketchat/Rocketchat.node.js",
"dist/nodes/RssFeedRead.node.js", "dist/nodes/RssFeedRead.node.js",
"dist/nodes/Rundeck/Rundeck.node.js", "dist/nodes/Rundeck/Rundeck.node.js",

View File

@@ -6,6 +6,7 @@
import * as express from 'express'; import * as express from 'express';
import * as FormData from 'form-data'; import * as FormData from 'form-data';
import { URLSearchParams } from 'url'; import { URLSearchParams } from 'url';
import { IDeferredPromise } from './DeferredPromise';
import { Workflow } from './Workflow'; import { Workflow } from './Workflow';
import { WorkflowHooks } from './WorkflowHooks'; import { WorkflowHooks } from './WorkflowHooks';
import { WorkflowOperationError } from './WorkflowErrors'; import { WorkflowOperationError } from './WorkflowErrors';
@@ -208,6 +209,9 @@ export interface IDataObject {
[key: string]: GenericValue | IDataObject | GenericValue[] | IDataObject[]; [key: string]: GenericValue | IDataObject | GenericValue[] | IDataObject[];
} }
// export type IExecuteResponsePromiseData = IDataObject;
export type IExecuteResponsePromiseData = IDataObject | IN8nHttpFullResponse;
export interface INodeTypeNameVersion { export interface INodeTypeNameVersion {
name: string; name: string;
version: number; version: number;
@@ -324,13 +328,13 @@ export interface IHttpRequestOptions {
json?: boolean; json?: boolean;
} }
export type IN8nHttpResponse = IDataObject | Buffer | GenericValue | GenericValue[]; export type IN8nHttpResponse = IDataObject | Buffer | GenericValue | GenericValue[] | null;
export interface IN8nHttpFullResponse { export interface IN8nHttpFullResponse {
body: IN8nHttpResponse; body: IN8nHttpResponse;
headers: IDataObject; headers: IDataObject;
statusCode: number; statusCode: number;
statusMessage: string; statusMessage?: string;
} }
export interface IExecuteFunctions { export interface IExecuteFunctions {
@@ -371,7 +375,8 @@ export interface IExecuteFunctions {
outputIndex?: number, outputIndex?: number,
): Promise<INodeExecutionData[][]>; ): Promise<INodeExecutionData[][]>;
putExecutionToWait(waitTill: Date): Promise<void>; putExecutionToWait(waitTill: Date): Promise<void>;
sendMessageToUI(message: any): void; sendMessageToUI(message: any): void; // tslint:disable-line:no-any
sendResponse(response: IExecuteResponsePromiseData): void; // tslint:disable-line:no-any
helpers: { helpers: {
httpRequest( httpRequest(
requestOptions: IHttpRequestOptions, requestOptions: IHttpRequestOptions,
@@ -492,7 +497,10 @@ export interface IPollFunctions {
} }
export interface ITriggerFunctions { export interface ITriggerFunctions {
emit(data: INodeExecutionData[][]): void; emit(
data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): void;
getCredentials(type: string): Promise<ICredentialDataDecryptedObject | undefined>; getCredentials(type: string): Promise<ICredentialDataDecryptedObject | undefined>;
getMode(): WorkflowExecuteMode; getMode(): WorkflowExecuteMode;
getActivationMode(): WorkflowActivateMode; getActivationMode(): WorkflowActivateMode;
@@ -975,6 +983,7 @@ export interface IWorkflowExecuteHooks {
nodeExecuteBefore?: Array<(nodeName: string) => Promise<void>>; nodeExecuteBefore?: Array<(nodeName: string) => Promise<void>>;
workflowExecuteAfter?: Array<(data: IRun, newStaticData: IDataObject) => Promise<void>>; workflowExecuteAfter?: Array<(data: IRun, newStaticData: IDataObject) => Promise<void>>;
workflowExecuteBefore?: Array<(workflow: Workflow, data: IRunExecutionData) => Promise<void>>; workflowExecuteBefore?: Array<(workflow: Workflow, data: IRunExecutionData) => Promise<void>>;
sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise<void>>;
} }
export interface IWorkflowExecuteAdditionalData { export interface IWorkflowExecuteAdditionalData {

View File

@@ -16,6 +16,8 @@
import { import {
Expression, Expression,
IConnections, IConnections,
IDeferredPromise,
IExecuteResponsePromiseData,
IGetExecuteTriggerFunctions, IGetExecuteTriggerFunctions,
INode, INode,
INodeExecuteFunctions, INodeExecuteFunctions,
@@ -946,10 +948,23 @@ export class Workflow {
// Add the manual trigger response which resolves when the first time data got emitted // Add the manual trigger response which resolves when the first time data got emitted
triggerResponse!.manualTriggerResponse = new Promise((resolve) => { triggerResponse!.manualTriggerResponse = new Promise((resolve) => {
// eslint-disable-next-line @typescript-eslint/no-shadow triggerFunctions.emit = (
triggerFunctions.emit = ((resolve) => (data: INodeExecutionData[][]) => { (resolveEmit) =>
resolve(data); (
})(resolve); data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
) => {
additionalData.hooks!.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
if (responsePromise) {
responsePromise.resolve(response);
}
},
];
resolveEmit(data);
}
)(resolve);
}); });
return triggerResponse; return triggerResponse;

View File

@@ -3,6 +3,7 @@ import * as LoggerProxy from './LoggerProxy';
import * as NodeHelpers from './NodeHelpers'; import * as NodeHelpers from './NodeHelpers';
import * as ObservableObject from './ObservableObject'; import * as ObservableObject from './ObservableObject';
export * from './DeferredPromise';
export * from './Interfaces'; export * from './Interfaces';
export * from './Expression'; export * from './Expression';
export * from './NodeErrors'; export * from './NodeErrors';