From 9cbbb6335df0d36f66f22c18041d12f14dc59b32 Mon Sep 17 00:00:00 2001 From: oleg Date: Wed, 17 Jul 2024 08:25:37 +0200 Subject: [PATCH] feat(Postgres Chat Memory Node): Implement Postgres Chat Memory node (#10071) --- .../MemoryPostgresChat.node.ts | 103 ++++++++++++++++++ .../memory/MemoryPostgresChat/postgres.svg | 1 + packages/@n8n/nodes-langchain/package.json | 2 + .../@n8n/nodes-langchain/utils/helpers.ts | 22 +++- .../@n8n/nodes-langchain/utils/logWrapper.ts | 10 +- pnpm-lock.yaml | 68 ++++++++++++ 6 files changed, 200 insertions(+), 6 deletions(-) create mode 100644 packages/@n8n/nodes-langchain/nodes/memory/MemoryPostgresChat/MemoryPostgresChat.node.ts create mode 100644 packages/@n8n/nodes-langchain/nodes/memory/MemoryPostgresChat/postgres.svg diff --git a/packages/@n8n/nodes-langchain/nodes/memory/MemoryPostgresChat/MemoryPostgresChat.node.ts b/packages/@n8n/nodes-langchain/nodes/memory/MemoryPostgresChat/MemoryPostgresChat.node.ts new file mode 100644 index 0000000000..ea3ed3c33e --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/memory/MemoryPostgresChat/MemoryPostgresChat.node.ts @@ -0,0 +1,103 @@ +/* eslint-disable n8n-nodes-base/node-dirname-against-convention */ +import type { IExecuteFunctions, INodeType, INodeTypeDescription, SupplyData } from 'n8n-workflow'; +import { NodeConnectionType } from 'n8n-workflow'; +import { BufferMemory } from 'langchain/memory'; +import { PostgresChatMessageHistory } from '@langchain/community/stores/message/postgres'; +import type pg from 'pg'; +import { configurePostgres } from 'n8n-nodes-base/dist/nodes/Postgres/v2/transport'; +import type { PostgresNodeCredentials } from 'n8n-nodes-base/dist/nodes/Postgres/v2/helpers/interfaces'; +import { postgresConnectionTest } from 'n8n-nodes-base/dist/nodes/Postgres/v2/methods/credentialTest'; +import { logWrapper } from '../../../utils/logWrapper'; +import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { sessionIdOption, sessionKeyProperty } from '../descriptions'; +import { getSessionId } from '../../../utils/helpers'; + +export class MemoryPostgresChat implements INodeType { + description: INodeTypeDescription = { + displayName: 'Postgres Chat Memory', + name: 'memoryPostgresChat', + icon: 'file:postgres.svg', + group: ['transform'], + version: [1], + description: 'Stores the chat history in Postgres table.', + defaults: { + name: 'Postgres Chat Memory', + }, + credentials: [ + { + name: 'postgres', + required: true, + testedBy: 'postgresConnectionTest', + }, + ], + codex: { + categories: ['AI'], + subcategories: { + AI: ['Memory'], + }, + resources: { + primaryDocumentation: [ + { + url: 'https://docs.n8n.io/integrations/builtin/cluster-nodes/sub-nodes/n8n-nodes-langchain.memorypostgreschat/', + }, + ], + }, + }, + // eslint-disable-next-line n8n-nodes-base/node-class-description-inputs-wrong-regular-node + inputs: [], + // eslint-disable-next-line n8n-nodes-base/node-class-description-outputs-wrong + outputs: [NodeConnectionType.AiMemory], + outputNames: ['Memory'], + properties: [ + getConnectionHintNoticeField([NodeConnectionType.AiAgent]), + sessionIdOption, + sessionKeyProperty, + { + displayName: 'Table Name', + name: 'tableName', + type: 'string', + default: 'n8n_chat_histories', + description: + 'The table name to store the chat history in. If table does not exist, it will be created.', + }, + ], + }; + + methods = { + credentialTest: { + postgresConnectionTest, + }, + }; + + async supplyData(this: IExecuteFunctions, itemIndex: number): Promise { + const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials; + const tableName = this.getNodeParameter('tableName', itemIndex, 'n8n_chat_histories') as string; + const sessionId = getSessionId(this, itemIndex); + + const pgConf = await configurePostgres.call(this, credentials); + const pool = pgConf.db.$pool as unknown as pg.Pool; + + const pgChatHistory = new PostgresChatMessageHistory({ + pool, + sessionId, + tableName, + }); + + const memory = new BufferMemory({ + memoryKey: 'chat_history', + chatHistory: pgChatHistory, + returnMessages: true, + inputKey: 'input', + outputKey: 'output', + }); + + async function closeFunction() { + void pool.end(); + } + + return { + closeFunction, + response: logWrapper(memory, this), + }; + } +} diff --git a/packages/@n8n/nodes-langchain/nodes/memory/MemoryPostgresChat/postgres.svg b/packages/@n8n/nodes-langchain/nodes/memory/MemoryPostgresChat/postgres.svg new file mode 100644 index 0000000000..da7be289e2 --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/memory/MemoryPostgresChat/postgres.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/packages/@n8n/nodes-langchain/package.json b/packages/@n8n/nodes-langchain/package.json index 74f3e92016..ba7610d8a2 100644 --- a/packages/@n8n/nodes-langchain/package.json +++ b/packages/@n8n/nodes-langchain/package.json @@ -77,6 +77,7 @@ "dist/nodes/llms/LMOpenHuggingFaceInference/LmOpenHuggingFaceInference.node.js", "dist/nodes/memory/MemoryBufferWindow/MemoryBufferWindow.node.js", "dist/nodes/memory/MemoryMotorhead/MemoryMotorhead.node.js", + "dist/nodes/memory/MemoryPostgresChat/MemoryPostgresChat.node.js", "dist/nodes/memory/MemoryRedisChat/MemoryRedisChat.node.js", "dist/nodes/memory/MemoryManager/MemoryManager.node.js", "dist/nodes/memory/MemoryChatRetriever/MemoryChatRetriever.node.js", @@ -153,6 +154,7 @@ "@pinecone-database/pinecone": "2.2.1", "@qdrant/js-client-rest": "1.9.0", "@supabase/supabase-js": "2.43.4", + "@types/pg": "^8.11.3", "@xata.io/client": "0.28.4", "basic-auth": "2.0.1", "cheerio": "1.0.0-rc.12", diff --git a/packages/@n8n/nodes-langchain/utils/helpers.ts b/packages/@n8n/nodes-langchain/utils/helpers.ts index 255a4f47c8..57c54e648c 100644 --- a/packages/@n8n/nodes-langchain/utils/helpers.ts +++ b/packages/@n8n/nodes-langchain/utils/helpers.ts @@ -10,6 +10,18 @@ import type { BaseOutputParser } from '@langchain/core/output_parsers'; import type { BaseMessage } from '@langchain/core/messages'; import { DynamicTool, type Tool } from '@langchain/core/tools'; import type { BaseLLM } from '@langchain/core/language_models/llms'; +import type { BaseChatMemory } from 'langchain/memory'; +import type { BaseChatMessageHistory } from '@langchain/core/chat_history'; + +function hasMethods(obj: unknown, ...methodNames: Array): obj is T { + return methodNames.every( + (methodName) => + typeof obj === 'object' && + obj !== null && + methodName in obj && + typeof (obj as Record)[methodName] === 'function', + ); +} export function getMetadataFiltersValues( ctx: IExecuteFunctions, @@ -38,8 +50,16 @@ export function getMetadataFiltersValues( return undefined; } +export function isBaseChatMemory(obj: unknown) { + return hasMethods(obj, 'loadMemoryVariables', 'saveContext'); +} + +export function isBaseChatMessageHistory(obj: unknown) { + return hasMethods(obj, 'getMessages', 'addMessage'); +} + export function isChatInstance(model: unknown): model is BaseChatModel { - const namespace = (model as BaseLLM | BaseChatModel)?.lc_namespace ?? []; + const namespace = (model as BaseLLM)?.lc_namespace ?? []; return namespace.includes('chat_models'); } diff --git a/packages/@n8n/nodes-langchain/utils/logWrapper.ts b/packages/@n8n/nodes-langchain/utils/logWrapper.ts index e76d954385..1985616657 100644 --- a/packages/@n8n/nodes-langchain/utils/logWrapper.ts +++ b/packages/@n8n/nodes-langchain/utils/logWrapper.ts @@ -4,21 +4,21 @@ import type { ConnectionTypes, IExecuteFunctions, INodeExecutionData } from 'n8n import type { Tool } from '@langchain/core/tools'; import type { BaseMessage } from '@langchain/core/messages'; import type { InputValues, MemoryVariables, OutputValues } from '@langchain/core/memory'; -import { BaseChatMessageHistory } from '@langchain/core/chat_history'; +import type { BaseChatMessageHistory } from '@langchain/core/chat_history'; import type { BaseCallbackConfig, Callbacks } from '@langchain/core/callbacks/manager'; import { Embeddings } from '@langchain/core/embeddings'; import { VectorStore } from '@langchain/core/vectorstores'; import type { Document } from '@langchain/core/documents'; import { TextSplitter } from '@langchain/textsplitters'; -import { BaseChatMemory } from '@langchain/community/memory/chat_memory'; +import type { BaseChatMemory } from '@langchain/community/memory/chat_memory'; import { BaseRetriever } from '@langchain/core/retrievers'; import { BaseOutputParser, OutputParserException } from '@langchain/core/output_parsers'; import { isObject } from 'lodash'; import type { BaseDocumentLoader } from 'langchain/dist/document_loaders/base'; import { N8nJsonLoader } from './N8nJsonLoader'; import { N8nBinaryLoader } from './N8nBinaryLoader'; -import { logAiEvent, isToolsInstance } from './helpers'; +import { logAiEvent, isToolsInstance, isBaseChatMemory, isBaseChatMessageHistory } from './helpers'; const errorsMap: { [key: string]: { message: string; description: string } } = { 'You exceeded your current quota, please check your plan and billing details.': { @@ -125,7 +125,7 @@ export function logWrapper( get: (target, prop) => { let connectionType: ConnectionTypes | undefined; // ========== BaseChatMemory ========== - if (originalInstance instanceof BaseChatMemory) { + if (isBaseChatMemory(originalInstance)) { if (prop === 'loadMemoryVariables' && 'loadMemoryVariables' in target) { return async (values: InputValues): Promise => { connectionType = NodeConnectionType.AiMemory; @@ -177,7 +177,7 @@ export function logWrapper( } // ========== BaseChatMessageHistory ========== - if (originalInstance instanceof BaseChatMessageHistory) { + if (isBaseChatMessageHistory(originalInstance)) { if (prop === 'getMessages' && 'getMessages' in target) { return async (): Promise => { connectionType = NodeConnectionType.AiMemory; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 096f6fcfdb..15800ee6d3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -337,6 +337,9 @@ importers: '@supabase/supabase-js': specifier: 2.43.4 version: 2.43.4 + '@types/pg': + specifier: ^8.11.3 + version: 8.11.6 '@xata.io/client': specifier: 0.28.4 version: 0.28.4(typescript@5.5.2) @@ -5354,6 +5357,9 @@ packages: '@types/normalize-package-data@2.4.1': resolution: {integrity: sha512-Gj7cI7z+98M282Tqmp2K5EIsoouUEzbBJhQQzDE3jSIRk6r9gsz0oUokqIUR4u1R3dMHo0pDHM7sNOHyhulypw==} + '@types/pg@8.11.6': + resolution: {integrity: sha512-/2WmmBXHLsfRqzfHW7BNZ8SbYzE8OSk7i3WjFYvfgRHj7S1xj+16Je5fUKv3lVdVzk/zn9TXOqf+avFCFIE0yQ==} + '@types/phoenix@1.6.4': resolution: {integrity: sha512-B34A7uot1Cv0XtaHRYDATltAdKx0BvVKNgYNqE4WjtPUa4VQJM7kxeXcVKaH+KS+kCmZ+6w+QaUdcljiheiBJA==} @@ -10433,6 +10439,9 @@ packages: resolution: {integrity: sha512-aU6xnDFYT3x17e/f0IiiwlGPTy2jzMySGfUB4fq6z7CV8l85CWHDk5ErhyhpfDHhrOMwGFhSQkhMGHaIotA6Ng==} engines: {node: '>= 0.4'} + obuf@1.1.2: + resolution: {integrity: sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg==} + on-finished@2.4.1: resolution: {integrity: sha512-oVlzkg3ENAhCk2zdv7IJwd/QUD4z2RxRwpkcGY8psCVcCYZNq4wYnVWALHM+brtuJjePWiYF/ClmuDr8Ch5+kg==} engines: {node: '>= 0.8'} @@ -10720,6 +10729,10 @@ packages: resolution: {integrity: sha512-1KdmFGGTP6jplJoI8MfvRlfvMiyBivMRP7/ffh4a11RUFJ7kC2J0ZHlipoKiH/1hz+DVgceon9U2qbaHpPeyPg==} engines: {node: '>=8.0'} + pg-numeric@1.0.2: + resolution: {integrity: sha512-BM/Thnrw5jm2kKLE5uJkXqqExRUY/toLHda65XgFTBTFYZyopbKjBe29Ii3RbkvlsMoFwD+tHeGaCjjv0gHlyw==} + engines: {node: '>=4'} + pg-pool@3.6.1: resolution: {integrity: sha512-jizsIzhkIitxCGfPRzJn1ZdcosIt3pz9Sh3V01fm1vZnbnCMgmGl5wvGGdNN2EL9Rmb0EcFoCkixH4Pu+sP9Og==} peerDependencies: @@ -10736,6 +10749,10 @@ packages: resolution: {integrity: sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==} engines: {node: '>=4'} + pg-types@4.0.2: + resolution: {integrity: sha512-cRL3JpS3lKMGsKaWndugWQoLOCoP+Cic8oseVcbr0qhPzYD5DWXK+RZ9LY9wxRf7RQia4SCwQlXk0q6FCPrVng==} + engines: {node: '>=10'} + pg@8.11.3: resolution: {integrity: sha512-+9iuvG8QfaaUrrph+kpF24cXkH1YOOUeArRNYIxq1viYHZagBxrTno7cecY1Fa44tJeZvaoG+Djpkc3JwehN5g==} engines: {node: '>= 8.0.0'} @@ -10870,18 +10887,37 @@ packages: resolution: {integrity: sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==} engines: {node: '>=4'} + postgres-array@3.0.2: + resolution: {integrity: sha512-6faShkdFugNQCLwucjPcY5ARoW1SlbnrZjmGl0IrrqewpvxvhSLHimCVzqeuULCbG0fQv7Dtk1yDbG3xv7Veog==} + engines: {node: '>=12'} + postgres-bytea@1.0.0: resolution: {integrity: sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==} engines: {node: '>=0.10.0'} + postgres-bytea@3.0.0: + resolution: {integrity: sha512-CNd4jim9RFPkObHSjVHlVrxoVQXz7quwNFpz7RY1okNNme49+sVyiTvTRobiLV548Hx/hb1BG+iE7h9493WzFw==} + engines: {node: '>= 6'} + postgres-date@1.0.7: resolution: {integrity: sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==} engines: {node: '>=0.10.0'} + postgres-date@2.1.0: + resolution: {integrity: sha512-K7Juri8gtgXVcDfZttFKVmhglp7epKb1K4pgrkLxehjqkrgPhfG6OO8LHLkfaqkbpjNRnra018XwAr1yQFWGcA==} + engines: {node: '>=12'} + postgres-interval@1.2.0: resolution: {integrity: sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==} engines: {node: '>=0.10.0'} + postgres-interval@3.0.0: + resolution: {integrity: sha512-BSNDnbyZCXSxgA+1f5UU2GmwhoI0aU5yMxRGO8CdFEcY2BQF9xm/7MqKnYoM1nJDk8nONNWDk9WeSmePFhQdlw==} + engines: {node: '>=12'} + + postgres-range@1.1.4: + resolution: {integrity: sha512-i/hbxIE9803Alj/6ytL7UHQxRvZkI9O4Sy+J3HGc4F4oo/2eQAjTSNJ0bfxyse3bH0nuVesCk+3IRLaMtG3H6w==} + posthog-node@3.2.1: resolution: {integrity: sha512-ofNX3TPfZPlWErVc2EDk66cIrfp9EXeKBsXFxf8ISXK57b10ANwRnKAlf5rQjxjRKqcUWmV0d3ZfOeVeYracMw==} engines: {node: '>=15.0.0'} @@ -18629,6 +18665,12 @@ snapshots: '@types/normalize-package-data@2.4.1': {} + '@types/pg@8.11.6': + dependencies: + '@types/node': 18.16.16 + pg-protocol: 1.6.0 + pg-types: 4.0.2 + '@types/phoenix@1.6.4': {} '@types/pretty-hrtime@1.0.1': {} @@ -24623,6 +24665,8 @@ snapshots: define-properties: 1.2.1 es-abstract: 1.22.5 + obuf@1.1.2: {} + on-finished@2.4.1: dependencies: ee-first: 1.1.1 @@ -24925,6 +24969,8 @@ snapshots: pg-minify@1.6.2: {} + pg-numeric@1.0.2: {} + pg-pool@3.6.1(pg@8.11.3): dependencies: pg: 8.11.3 @@ -24952,6 +24998,16 @@ snapshots: postgres-date: 1.0.7 postgres-interval: 1.2.0 + pg-types@4.0.2: + dependencies: + pg-int8: 1.0.1 + pg-numeric: 1.0.2 + postgres-array: 3.0.2 + postgres-bytea: 3.0.0 + postgres-date: 2.1.0 + postgres-interval: 3.0.0 + postgres-range: 1.1.4 + pg@8.11.3: dependencies: buffer-writer: 2.0.0 @@ -25075,14 +25131,26 @@ snapshots: postgres-array@2.0.0: {} + postgres-array@3.0.2: {} + postgres-bytea@1.0.0: {} + postgres-bytea@3.0.0: + dependencies: + obuf: 1.1.2 + postgres-date@1.0.7: {} + postgres-date@2.1.0: {} + postgres-interval@1.2.0: dependencies: xtend: 4.0.2 + postgres-interval@3.0.0: {} + + postgres-range@1.1.4: {} + posthog-node@3.2.1: dependencies: axios: 1.6.7(debug@3.2.7)