feat(Postgres Node): Options keepAlive and keepAliveInitialDelayMillis (#9067)

This commit is contained in:
Michael Kret
2024-04-09 18:41:51 +03:00
committed by GitHub
parent c2f4d7d796
commit 58518b684b
16 changed files with 136 additions and 31 deletions

View File

@@ -87,6 +87,10 @@ export async function pgTriggerFunction(
export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) {
const credentials = await this.getCredentials('postgres');
const options = this.getNodeParameter('options', {}) as {
connectionTimeout?: number;
delayClosingIdleConnection?: number;
};
const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
noWarnings: true,
@@ -97,8 +101,17 @@ export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) {
database: credentials.database as string,
user: credentials.user as string,
password: credentials.password as string,
keepAlive: true,
};
if (options.connectionTimeout) {
config.connectionTimeoutMillis = options.connectionTimeout * 1000;
}
if (options.delayClosingIdleConnection) {
config.keepAliveInitialDelayMillis = options.delayClosingIdleConnection * 1000;
}
if (credentials.allowUnauthorizedCerts === true) {
config.ssl = {
rejectUnauthorized: false,

View File

@@ -209,6 +209,33 @@ export class PostgresTrigger implements INodeType {
},
],
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Connection Timeout',
name: 'connectionTimeout',
type: 'number',
default: 30,
description: 'Number of seconds reserved for connecting to the database',
},
{
displayName: 'Delay Closing Idle Connection',
name: 'delayClosingIdleConnection',
type: 'number',
default: 0,
description:
'Number of seconds to wait before idle connection would be eligible for closing',
typeOptions: {
minValue: 0,
},
},
],
},
],
};

View File

@@ -30,6 +30,16 @@ export const optionsCollection: INodeProperties = {
default: 30,
description: 'Number of seconds reserved for connecting to the database',
},
{
displayName: 'Delay Closing Idle Connection',
name: 'delayClosingIdleConnection',
type: 'number',
default: 0,
description: 'Number of seconds to wait before idle connection would be eligible for closing',
typeOptions: {
minValue: 0,
},
},
{
displayName: 'Query Batching',
name: 'queryBatching',

View File

@@ -8,6 +8,7 @@ import { NodeOperationError } from 'n8n-workflow';
import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
@@ -95,7 +96,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
_db?: PgpDatabase,
): Promise<INodeExecutionData[]> {
const queries: QueryWithValues[] = [];

View File

@@ -6,7 +6,12 @@ import type {
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import type { PgpDatabase, QueriesRunner, QueryWithValues } from '../../helpers/interfaces';
import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryWithValues,
} from '../../helpers/interfaces';
import { replaceEmptyStringsByNulls } from '../../helpers/utils';
@@ -46,7 +51,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
_db?: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);

View File

@@ -7,6 +7,7 @@ import type {
import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
@@ -157,7 +158,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
db: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);

View File

@@ -7,6 +7,7 @@ import type {
import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
@@ -75,7 +76,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
_db?: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);

View File

@@ -8,6 +8,7 @@ import { NodeOperationError } from 'n8n-workflow';
import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
@@ -194,7 +195,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
db: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
@@ -279,7 +280,7 @@ export async function execute(
const rowExists = await doesRowExist(db, schema, table, matchValues);
if (!rowExists) {
const descriptionValues: string[] = [];
matchValues.forEach((val, index) => {
matchValues.forEach((_, index) => {
if (index % 2 === 0) {
descriptionValues.push(`${matchValues[index]}=${matchValues[index + 1]}`);
}

View File

@@ -8,6 +8,7 @@ import { NodeOperationError } from 'n8n-workflow';
import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
@@ -193,7 +194,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
db: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);

View File

@@ -6,6 +6,7 @@ import { configureQueryRunner } from '../helpers/utils';
import type { PostgresType } from './node.type';
import * as database from './database/Database.resource';
import type { PostgresNodeCredentials, PostgresNodeOptions } from '../helpers/interfaces';
export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let returnData: INodeExecutionData[] = [];
@@ -14,8 +15,8 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
const resource = this.getNodeParameter<PostgresType>('resource', 0);
const operation = this.getNodeParameter('operation', 0);
const credentials = await this.getCredentials('postgres');
const options = this.getNodeParameter('options', 0, {});
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = this.getNodeParameter('options', 0, {}) as PostgresNodeOptions;
options.nodeVersion = this.getNode().typeVersion;
options.operation = operation;

View File

@@ -35,3 +35,36 @@ export type QueriesRunner = (
items: INodeExecutionData[],
options: IDataObject,
) => Promise<INodeExecutionData[]>;
export type PostgresNodeOptions = {
nodeVersion?: number;
operation?: string;
cascade?: boolean;
connectionTimeout?: number;
delayClosingIdleConnection?: number;
queryBatching?: QueryMode;
queryReplacement?: string;
outputColumns?: string[];
largeNumbersOutput?: 'numbers' | 'text';
skipOnConflict?: boolean;
replaceEmptyStrings?: boolean;
};
export type PostgresNodeCredentials = {
sshAuthenticateWith: 'password' | 'privateKey';
host: string;
port: number;
database: string;
user: string;
password: string;
allowUnauthorizedCerts?: boolean;
ssl?: 'disable' | 'allow' | 'require' | 'verify' | 'verify-full';
sshTunnel?: boolean;
sshHost?: string;
sshPort?: number;
sshPostgresPort?: number;
sshUser?: string;
sshPassword?: string;
privateKey?: string;
passphrase?: string;
};

View File

@@ -1,20 +1,19 @@
import type {
ICredentialsDecrypted,
ICredentialTestFunctions,
IDataObject,
INodeCredentialTestResult,
} from 'n8n-workflow';
import { Client } from 'ssh2';
import { configurePostgres } from '../transport';
import type { PgpClient } from '../helpers/interfaces';
import type { PgpClient, PostgresNodeCredentials } from '../helpers/interfaces';
export async function postgresConnectionTest(
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as IDataObject;
const credentials = credential.data as PostgresNodeCredentials;
let sshClientCreated: Client | undefined = new Client();
let pgpClientCreated: PgpClient | undefined;

View File

@@ -1,9 +1,10 @@
import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow';
import { configurePostgres } from '../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces';
export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials('postgres');
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = { nodeVersion: this.getNode().typeVersion };
const { db, sshClient } = await configurePostgres(credentials, options);
@@ -27,7 +28,7 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
}
}
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials('postgres');
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = { nodeVersion: this.getNode().typeVersion };
const { db, sshClient } = await configurePostgres(credentials, options);

View File

@@ -2,9 +2,10 @@ import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow';
import { getTableSchema } from '../helpers/utils';
import { configurePostgres } from '../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces';
export async function getColumns(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
const credentials = await this.getCredentials('postgres');
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = { nodeVersion: this.getNode().typeVersion };
const { db, sshClient } = await configurePostgres(credentials, options);

View File

@@ -1,6 +1,7 @@
import type { ILoadOptionsFunctions, ResourceMapperFields, FieldType } from 'n8n-workflow';
import { getEnumValues, getEnums, getTableSchema, uniqueColumns } from '../helpers/utils';
import { configurePostgres } from '../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces';
const fieldTypeMapping: Partial<Record<FieldType, string[]>> = {
string: ['text', 'varchar', 'character varying', 'character', 'char'],
@@ -45,7 +46,7 @@ function mapPostgresType(postgresType: string): FieldType {
export async function getMappingColumns(
this: ILoadOptionsFunctions,
): Promise<ResourceMapperFields> {
const credentials = await this.getCredentials('postgres');
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const { db, sshClient } = await configurePostgres(credentials);

View File

@@ -6,10 +6,14 @@ import type { ConnectConfig } from 'ssh2';
import type { IDataObject } from 'n8n-workflow';
import pgPromise from 'pg-promise';
import type { PgpDatabase } from '../helpers/interfaces';
import type {
PgpDatabase,
PostgresNodeCredentials,
PostgresNodeOptions,
} from '../helpers/interfaces';
import { formatPrivateKey } from '@utils/utilities';
async function createSshConnectConfig(credentials: IDataObject) {
async function createSshConnectConfig(credentials: PostgresNodeCredentials) {
if (credentials.sshAuthenticateWith === 'password') {
return {
host: credentials.sshHost as string,
@@ -26,7 +30,7 @@ async function createSshConnectConfig(credentials: IDataObject) {
};
if (credentials.passphrase) {
options.passphrase = credentials.passphrase as string;
options.passphrase = credentials.passphrase;
}
return options;
@@ -34,8 +38,8 @@ async function createSshConnectConfig(credentials: IDataObject) {
}
export async function configurePostgres(
credentials: IDataObject,
options: IDataObject = {},
credentials: PostgresNodeCredentials,
options: PostgresNodeOptions = {},
createdSshClient?: Client,
) {
const pgp = pgPromise({
@@ -63,15 +67,20 @@ export async function configurePostgres(
}
const dbConfig: IDataObject = {
host: credentials.host as string,
port: credentials.port as number,
database: credentials.database as string,
user: credentials.user as string,
password: credentials.password as string,
host: credentials.host,
port: credentials.port,
database: credentials.database,
user: credentials.user,
password: credentials.password,
keepAlive: true,
};
if (options.connectionTimeout) {
dbConfig.connectionTimeoutMillis = (options.connectionTimeout as number) * 1000;
dbConfig.connectionTimeoutMillis = options.connectionTimeout * 1000;
}
if (options.delayClosingIdleConnection) {
dbConfig.keepAliveInitialDelayMillis = options.delayClosingIdleConnection * 1000;
}
if (credentials.allowUnauthorizedCerts === true) {
@@ -80,7 +89,7 @@ export async function configurePostgres(
};
} else {
dbConfig.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
dbConfig.sslmode = (credentials.ssl as string) || 'disable';
dbConfig.sslmode = credentials.ssl || 'disable';
}
if (!credentials.sshTunnel) {
@@ -105,8 +114,8 @@ export async function configurePostgres(
sshClient.forwardOut(
socket.remoteAddress as string,
socket.remotePort as number,
credentials.host as string,
credentials.port as number,
credentials.host,
credentials.port,
(err, stream) => {
if (err) reject(err);