feat(HTTP Request Node): Add pagination support (#5993)

Is still WIP and does not implement the correct UI yet.

Github issue / Community forum post (link here to close automatically):
https://community.n8n.io/t/pagination-included-into-http-node/15080

https://community.n8n.io/t/how-to-paginate-through-data-in-http-requests/28103
This commit is contained in:
Jan Oberhauser
2023-11-01 14:24:43 +01:00
committed by GitHub
parent 9bdb85c4ce
commit cc2bd2e19c
7 changed files with 2408 additions and 238 deletions

View File

@@ -37,6 +37,7 @@ import { extension, lookup } from 'mime-types';
import type {
BinaryHelperFunctions,
ConnectionTypes,
ContextType,
ExecutionError,
FieldType,
FileSystemHelperFunctions,
@@ -88,6 +89,7 @@ import type {
NodeExecutionWithMetadata,
NodeHelperFunctions,
NodeParameterValueType,
PaginationOptions,
RequestHelperFunctions,
Workflow,
WorkflowActivateMode,
@@ -110,13 +112,14 @@ import {
isResourceMapperValue,
validateFieldType,
ExecutionBaseError,
jsonParse,
} from 'n8n-workflow';
import type { Token } from 'oauth-1.0a';
import clientOAuth1 from 'oauth-1.0a';
import path from 'path';
import { stringify } from 'qs';
import type { OptionsWithUri, OptionsWithUrl } from 'request';
import type { RequestPromiseOptions } from 'request-promise-native';
import type { OptionsWithUrl } from 'request';
import type { OptionsWithUri, RequestPromiseOptions } from 'request-promise-native';
import { Readable } from 'stream';
import url, { URL, URLSearchParams } from 'url';
@@ -126,6 +129,7 @@ import {
BLOCK_FILE_ACCESS_TO_N8N_FILES,
CONFIG_FILES,
CUSTOM_EXTENSION_ENV,
HTTP_REQUEST_NODE_TYPE,
PLACEHOLDER_EMPTY_EXECUTION_ID,
RESTRICT_FILE_ACCESS_TO,
UM_EMAIL_TEMPLATES_INVITE,
@@ -143,6 +147,7 @@ import {
import { getSecretsProxy } from './Secrets';
import Container from 'typedi';
import type { BinaryData } from './BinaryData/types';
import merge from 'lodash/merge';
import { InstanceSettings } from './InstanceSettings';
axios.defaults.timeout = 300000;
@@ -1866,7 +1871,7 @@ export async function getCredentials(
// Hardcode for now for security reasons that only a single node can access
// all credentials
const fullAccess = ['n8n-nodes-base.httpRequest'].includes(node.type);
const fullAccess = [HTTP_REQUEST_NODE_TYPE].includes(node.type);
let nodeCredentialDescription: INodeCredentialDescription | undefined;
if (!fullAccess) {
@@ -2239,6 +2244,7 @@ export function getNodeParameter(
}
let returnData;
try {
returnData = workflow.expression.getParameterValue(
value,
@@ -2506,70 +2512,303 @@ const getRequestHelperFunctions = (
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
): RequestHelperFunctions => ({
httpRequest,
): RequestHelperFunctions => {
const getResolvedValue = (
parameterValue: NodeParameterValueType,
itemIndex: number,
runIndex: number,
executeData: IExecuteData,
additionalKeys?: IWorkflowDataProxyAdditionalKeys,
returnObjectAsString = false,
): NodeParameterValueType => {
const runExecutionData: IRunExecutionData | null = null;
const connectionInputData: INodeExecutionData[] = [];
const mode: WorkflowExecuteMode = 'internal';
async httpRequestWithAuthentication(
this,
credentialsType,
requestOptions,
additionalCredentialOptions,
): Promise<any> {
return httpRequestWithAuthentication.call(
if (
typeof parameterValue === 'object' ||
(typeof parameterValue === 'string' && parameterValue.charAt(0) === '=')
) {
return workflow.expression.getParameterValue(
parameterValue,
runExecutionData,
runIndex,
itemIndex,
node.name,
connectionInputData,
mode,
additionalKeys ?? {},
executeData,
returnObjectAsString,
);
}
return parameterValue;
};
return {
httpRequest,
async requestWithAuthenticationPaginated(
this: IExecuteFunctions,
requestOptions: OptionsWithUri,
itemIndex: number,
paginationOptions: PaginationOptions,
credentialsType?: string,
additionalCredentialOptions?: IAdditionalCredentialOptions,
): Promise<any[]> {
const responseData = [];
if (!requestOptions.qs) {
requestOptions.qs = {};
}
requestOptions.resolveWithFullResponse = true;
requestOptions.simple = false;
let tempResponseData: IN8nHttpFullResponse;
let makeAdditionalRequest: boolean;
let paginateRequestData: IHttpRequestOptions;
const runIndex = 0;
const additionalKeys = {
$request: requestOptions,
$response: {} as IN8nHttpFullResponse,
$version: node.typeVersion,
$pageCount: 0,
};
const executeData: IExecuteData = {
data: {},
node,
source: null,
};
const hashData = {
identicalCount: 0,
previousLength: 0,
previousHash: '',
};
do {
paginateRequestData = getResolvedValue(
paginationOptions.request as unknown as NodeParameterValueType,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as object as IHttpRequestOptions;
const tempRequestOptions = merge(requestOptions, paginateRequestData);
if (credentialsType) {
tempResponseData = await this.helpers.requestWithAuthentication.call(
this,
credentialsType,
tempRequestOptions,
additionalCredentialOptions,
);
} else {
tempResponseData = await this.helpers.request(tempRequestOptions);
}
const newResponse: IN8nHttpFullResponse = Object.assign(
{
body: {},
headers: {},
statusCode: 0,
},
pick(tempResponseData, ['body', 'headers', 'statusCode']),
);
let contentBody: Exclude<IN8nHttpResponse, Buffer>;
if (
newResponse.body?.constructor.name === 'IncomingMessage' &&
paginationOptions.binaryResult !== true
) {
const data = await this.helpers
.binaryToBuffer(newResponse.body as Buffer | Readable)
.then((body) => body.toString());
// Keep the original string version that we can use it to hash if needed
contentBody = data;
const responseContentType = newResponse.headers['content-type']?.toString() ?? '';
if (responseContentType.includes('application/json')) {
newResponse.body = jsonParse(data, { fallbackValue: {} });
} else {
newResponse.body = data;
}
tempResponseData.__bodyResolved = true;
tempResponseData.body = newResponse.body;
} else {
contentBody = newResponse.body;
}
if (paginationOptions.binaryResult !== true || tempResponseData.headers.etag) {
// If the data is not binary (and so not a stream), or an etag is present,
// we check via etag or hash if identical data is received
let contentLength = 0;
if ('content-length' in tempResponseData.headers) {
contentLength = parseInt(tempResponseData.headers['content-length'] as string) || 0;
}
if (hashData.previousLength === contentLength) {
let hash: string;
if (tempResponseData.headers.etag) {
// If an etag is provided, we use it as "hash"
hash = tempResponseData.headers.etag as string;
} else {
// If there is no etag, we calculate a hash from the data in the body
if (typeof contentBody !== 'string') {
contentBody = JSON.stringify(contentBody);
}
hash = crypto.createHash('md5').update(contentBody).digest('base64');
}
if (hashData.previousHash === hash) {
hashData.identicalCount += 1;
if (hashData.identicalCount > 2) {
// Length was identical 5x and hash 3x
throw new NodeOperationError(
node,
'The returned response was identical 5x, so requests got stopped',
{
itemIndex,
description:
'Check if "Pagination Completed When" has been configured correctly.',
},
);
}
} else {
hashData.identicalCount = 0;
}
hashData.previousHash = hash;
} else {
hashData.identicalCount = 0;
}
hashData.previousLength = contentLength;
}
responseData.push(tempResponseData);
additionalKeys.$response = newResponse;
additionalKeys.$pageCount = additionalKeys.$pageCount + 1;
if (
paginationOptions.maxRequests &&
additionalKeys.$pageCount >= paginationOptions.maxRequests
) {
break;
}
makeAdditionalRequest = getResolvedValue(
paginationOptions.continue,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as boolean;
if (makeAdditionalRequest) {
if (tempResponseData.statusCode < 200 || tempResponseData.statusCode >= 300) {
// We have it configured to let all requests pass no matter the response code
// via "requestOptions.simple = false" to not by default fail if it is for example
// configured to stop on 404 response codes. For that reason we have to throw here
// now an error manually if the response code is not a success one.
let data = tempResponseData.body;
if (
data?.constructor.name === 'IncomingMessage' &&
paginationOptions.binaryResult !== true
) {
data = await this.helpers
.binaryToBuffer(tempResponseData.body as Buffer | Readable)
.then((body) => body.toString());
} else if (typeof data === 'object') {
data = JSON.stringify(data);
}
throw Object.assign(
new Error(`${tempResponseData.statusCode} - "${data?.toString()}"`),
{
statusCode: tempResponseData.statusCode,
error: data,
isAxiosError: true,
response: {
headers: tempResponseData.headers,
status: tempResponseData.statusCode,
statusText: tempResponseData.statusMessage,
},
},
);
}
}
} while (makeAdditionalRequest);
return responseData;
},
async httpRequestWithAuthentication(
this,
credentialsType,
requestOptions,
workflow,
node,
additionalData,
additionalCredentialOptions,
);
},
): Promise<any> {
return httpRequestWithAuthentication.call(
this,
credentialsType,
requestOptions,
workflow,
node,
additionalData,
additionalCredentialOptions,
);
},
request: async (uriOrObject, options) =>
proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options),
request: async (uriOrObject, options) =>
proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options),
async requestWithAuthentication(
this,
credentialsType,
requestOptions,
additionalCredentialOptions,
): Promise<any> {
return requestWithAuthentication.call(
async requestWithAuthentication(
this,
credentialsType,
requestOptions,
workflow,
node,
additionalData,
additionalCredentialOptions,
);
},
): Promise<any> {
return requestWithAuthentication.call(
this,
credentialsType,
requestOptions,
workflow,
node,
additionalData,
additionalCredentialOptions,
);
},
async requestOAuth1(
this: IAllExecuteFunctions,
credentialsType: string,
requestOptions: OptionsWithUrl | RequestPromiseOptions,
): Promise<any> {
return requestOAuth1.call(this, credentialsType, requestOptions);
},
async requestOAuth1(
this: IAllExecuteFunctions,
credentialsType: string,
requestOptions: OptionsWithUrl | RequestPromiseOptions,
): Promise<any> {
return requestOAuth1.call(this, credentialsType, requestOptions);
},
async requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string,
requestOptions: OptionsWithUri | RequestPromiseOptions,
oAuth2Options?: IOAuth2Options,
): Promise<any> {
return requestOAuth2.call(
this,
credentialsType,
requestOptions,
node,
additionalData,
oAuth2Options,
);
},
});
async requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string,
requestOptions: OptionsWithUri | RequestPromiseOptions,
oAuth2Options?: IOAuth2Options,
): Promise<any> {
return requestOAuth2.call(
this,
credentialsType,
requestOptions,
node,
additionalData,
oAuth2Options,
);
},
};
};
const getAllowedPaths = () => {
const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
@@ -2899,7 +3138,7 @@ export function getExecuteFunctions(
),
);
},
getContext(type: string): IContextObject {
getContext(type: ContextType): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
},
async getInputConnectionData(
@@ -3293,7 +3532,7 @@ export function getExecuteSingleFunctions(
executeData,
);
},
getContext(type: string): IContextObject {
getContext(type: ContextType): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
},
getCredentials: async (type) =>