refactor: createVectorStoreNode refactoring and embeddings batching support (no-changelog) (#13674)

This commit is contained in:
oleg
2025-03-13 16:23:06 +01:00
committed by GitHub
parent 5b6b78709e
commit b4672b8deb
34 changed files with 2359 additions and 566 deletions

View File

@@ -1,7 +1,7 @@
import type { MemoryVectorStore } from 'langchain/vectorstores/memory';
import type { INodeProperties } from 'n8n-workflow';
import { createVectorStoreNode } from '../shared/createVectorStoreNode';
import { createVectorStoreNode } from '../shared/createVectorStoreNode/createVectorStoreNode';
import { MemoryVectorStoreManager } from '../shared/MemoryVectorStoreManager';
const insertFields: INodeProperties[] = [

View File

@@ -11,7 +11,7 @@ import type pg from 'pg';
import { metadataFilterField } from '@utils/sharedFields';
import { createVectorStoreNode } from '../shared/createVectorStoreNode';
import { createVectorStoreNode } from '../shared/createVectorStoreNode/createVectorStoreNode';
type CollectionOptions = {
useCollection?: boolean;

View File

@@ -5,9 +5,9 @@ import { NodeOperationError, type INodeProperties } from 'n8n-workflow';
import { metadataFilterField } from '@utils/sharedFields';
import { createVectorStoreNode } from '../shared/createVectorStoreNode';
import { createVectorStoreNode } from '../shared/createVectorStoreNode/createVectorStoreNode';
import { pineconeIndexSearch } from '../shared/createVectorStoreNode/methods/listSearch';
import { pineconeIndexRLC } from '../shared/descriptions';
import { pineconeIndexSearch } from '../shared/methods/listSearch';
const sharedFields: INodeProperties[] = [pineconeIndexRLC];

View File

@@ -12,8 +12,8 @@ import {
import type { N8nJsonLoader } from '@utils/N8nJsonLoader';
import { pineconeIndexSearch } from '../shared/createVectorStoreNode/methods/listSearch';
import { pineconeIndexRLC } from '../shared/descriptions';
import { pineconeIndexSearch } from '../shared/methods/listSearch';
import { processDocuments } from '../shared/processDocuments';
// This node is deprecated. Use VectorStorePinecone instead.

View File

@@ -14,8 +14,8 @@ import { getMetadataFiltersValues } from '@utils/helpers';
import { logWrapper } from '@utils/logWrapper';
import { metadataFilterField } from '@utils/sharedFields';
import { pineconeIndexSearch } from '../shared/createVectorStoreNode/methods/listSearch';
import { pineconeIndexRLC } from '../shared/descriptions';
import { pineconeIndexSearch } from '../shared/methods/listSearch';
// This node is deprecated. Use VectorStorePinecone instead.
export class VectorStorePineconeLoad implements INodeType {

View File

@@ -5,9 +5,9 @@ import { QdrantVectorStore } from '@langchain/qdrant';
import type { Schemas as QdrantSchemas } from '@qdrant/js-client-rest';
import type { IDataObject, INodeProperties } from 'n8n-workflow';
import { createVectorStoreNode } from '../shared/createVectorStoreNode';
import { createVectorStoreNode } from '../shared/createVectorStoreNode/createVectorStoreNode';
import { qdrantCollectionsSearch } from '../shared/createVectorStoreNode/methods/listSearch';
import { qdrantCollectionRLC } from '../shared/descriptions';
import { qdrantCollectionsSearch } from '../shared/methods/listSearch';
class ExtendedQdrantVectorStore extends QdrantVectorStore {
private static defaultFilter: IDataObject = {};

View File

@@ -4,9 +4,9 @@ import { NodeOperationError, type INodeProperties } from 'n8n-workflow';
import { metadataFilterField } from '@utils/sharedFields';
import { createVectorStoreNode } from '../shared/createVectorStoreNode';
import { createVectorStoreNode } from '../shared/createVectorStoreNode/createVectorStoreNode';
import { supabaseTableNameSearch } from '../shared/createVectorStoreNode/methods/listSearch';
import { supabaseTableNameRLC } from '../shared/descriptions';
import { supabaseTableNameSearch } from '../shared/methods/listSearch';
const queryNameField: INodeProperties = {
displayName: 'Query Name',

View File

@@ -12,8 +12,8 @@ import {
import type { N8nJsonLoader } from '@utils/N8nJsonLoader';
import { supabaseTableNameSearch } from '../shared/createVectorStoreNode/methods/listSearch';
import { supabaseTableNameRLC } from '../shared/descriptions';
import { supabaseTableNameSearch } from '../shared/methods/listSearch';
import { processDocuments } from '../shared/processDocuments';
// This node is deprecated. Use VectorStoreSupabase instead.

View File

@@ -14,8 +14,8 @@ import { getMetadataFiltersValues } from '@utils/helpers';
import { logWrapper } from '@utils/logWrapper';
import { metadataFilterField } from '@utils/sharedFields';
import { supabaseTableNameSearch } from '../shared/createVectorStoreNode/methods/listSearch';
import { supabaseTableNameRLC } from '../shared/descriptions';
import { supabaseTableNameSearch } from '../shared/methods/listSearch';
// This node is deprecated. Use VectorStoreSupabase instead.
export class VectorStoreSupabaseLoad implements INodeType {

View File

@@ -5,7 +5,7 @@ import { NodeOperationError } from 'n8n-workflow';
import { metadataFilterField } from '@utils/sharedFields';
import { createVectorStoreNode } from '../shared/createVectorStoreNode';
import { createVectorStoreNode } from '../shared/createVectorStoreNode/createVectorStoreNode';
const embeddingDimensions: INodeProperties = {
displayName: 'Embedding Dimensions',

View File

@@ -1,547 +0,0 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
/* eslint-disable n8n-nodes-base/node-dirname-against-convention */
import type { Document } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import { DynamicTool } from 'langchain/tools';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type {
IExecuteFunctions,
INodeCredentialDescription,
INodeProperties,
INodeExecutionData,
INodeTypeDescription,
SupplyData,
ISupplyDataFunctions,
INodeType,
ILoadOptionsFunctions,
INodeListSearchResult,
Icon,
INodePropertyOptions,
ThemeIconColor,
} from 'n8n-workflow';
import { getMetadataFiltersValues, logAiEvent } from '@utils/helpers';
import { logWrapper } from '@utils/logWrapper';
import type { N8nBinaryLoader } from '@utils/N8nBinaryLoader';
import { N8nJsonLoader } from '@utils/N8nJsonLoader';
import { getConnectionHintNoticeField } from '@utils/sharedFields';
import { processDocument } from './processDocuments';
type NodeOperationMode = 'insert' | 'load' | 'retrieve' | 'update' | 'retrieve-as-tool';
const DEFAULT_OPERATION_MODES: NodeOperationMode[] = [
'load',
'insert',
'retrieve',
'retrieve-as-tool',
];
interface NodeMeta {
displayName: string;
name: string;
description: string;
docsUrl: string;
icon: Icon;
iconColor?: ThemeIconColor;
credentials?: INodeCredentialDescription[];
operationModes?: NodeOperationMode[];
}
export interface VectorStoreNodeConstructorArgs<T extends VectorStore = VectorStore> {
meta: NodeMeta;
methods?: {
listSearch?: {
[key: string]: (
this: ILoadOptionsFunctions,
filter?: string,
paginationToken?: string,
) => Promise<INodeListSearchResult>;
};
};
sharedFields: INodeProperties[];
insertFields?: INodeProperties[];
loadFields?: INodeProperties[];
retrieveFields?: INodeProperties[];
updateFields?: INodeProperties[];
populateVectorStore: (
context: IExecuteFunctions | ISupplyDataFunctions,
embeddings: Embeddings,
documents: Array<Document<Record<string, unknown>>>,
itemIndex: number,
) => Promise<void>;
getVectorStoreClient: (
context: IExecuteFunctions | ISupplyDataFunctions,
filter: Record<string, never> | undefined,
embeddings: Embeddings,
itemIndex: number,
) => Promise<T>;
releaseVectorStoreClient?: (vectorStore: T) => void;
}
function transformDescriptionForOperationMode(
fields: INodeProperties[],
mode: NodeOperationMode | NodeOperationMode[],
) {
return fields.map((field) => ({
...field,
displayOptions: { show: { mode: Array.isArray(mode) ? mode : [mode] } },
}));
}
function isUpdateSupported<T extends VectorStore>(
args: VectorStoreNodeConstructorArgs<T>,
): boolean {
return args.meta.operationModes?.includes('update') ?? false;
}
function getOperationModeOptions<T extends VectorStore>(
args: VectorStoreNodeConstructorArgs<T>,
): INodePropertyOptions[] {
const enabledOperationModes = args.meta.operationModes ?? DEFAULT_OPERATION_MODES;
const allOptions = [
{
name: 'Get Many',
value: 'load',
description: 'Get many ranked documents from vector store for query',
action: 'Get ranked documents from vector store',
},
{
name: 'Insert Documents',
value: 'insert',
description: 'Insert documents into vector store',
action: 'Add documents to vector store',
},
{
name: 'Retrieve Documents (As Vector Store for Chain/Tool)',
value: 'retrieve',
description: 'Retrieve documents from vector store to be used as vector store with AI nodes',
action: 'Retrieve documents for Chain/Tool as Vector Store',
outputConnectionType: NodeConnectionType.AiVectorStore,
},
{
name: 'Retrieve Documents (As Tool for AI Agent)',
value: 'retrieve-as-tool',
description: 'Retrieve documents from vector store to be used as tool with AI nodes',
action: 'Retrieve documents for AI Agent as Tool',
outputConnectionType: NodeConnectionType.AiTool,
},
{
name: 'Update Documents',
value: 'update',
description: 'Update documents in vector store by ID',
action: 'Update vector store documents',
},
];
return allOptions.filter(({ value }) =>
enabledOperationModes.includes(value as NodeOperationMode),
);
}
export const createVectorStoreNode = <T extends VectorStore = VectorStore>(
args: VectorStoreNodeConstructorArgs<T>,
) =>
class VectorStoreNodeType implements INodeType {
description: INodeTypeDescription = {
displayName: args.meta.displayName,
name: args.meta.name,
description: args.meta.description,
icon: args.meta.icon,
iconColor: args.meta.iconColor,
group: ['transform'],
version: 1,
defaults: {
name: args.meta.displayName,
},
codex: {
categories: ['AI'],
subcategories: {
AI: ['Vector Stores', 'Tools', 'Root Nodes'],
Tools: ['Other Tools'],
},
resources: {
primaryDocumentation: [
{
url: args.meta.docsUrl,
},
],
},
},
credentials: args.meta.credentials,
// eslint-disable-next-line n8n-nodes-base/node-class-description-inputs-wrong-regular-node
inputs: `={{
((parameters) => {
const mode = parameters?.mode;
const inputs = [{ displayName: "Embedding", type: "${NodeConnectionType.AiEmbedding}", required: true, maxConnections: 1}]
if (mode === 'retrieve-as-tool') {
return inputs;
}
if (['insert', 'load', 'update'].includes(mode)) {
inputs.push({ displayName: "", type: "${NodeConnectionType.Main}"})
}
if (['insert'].includes(mode)) {
inputs.push({ displayName: "Document", type: "${NodeConnectionType.AiDocument}", required: true, maxConnections: 1})
}
return inputs
})($parameter)
}}`,
outputs: `={{
((parameters) => {
const mode = parameters?.mode ?? 'retrieve';
if (mode === 'retrieve-as-tool') {
return [{ displayName: "Tool", type: "${NodeConnectionType.AiTool}"}]
}
if (mode === 'retrieve') {
return [{ displayName: "Vector Store", type: "${NodeConnectionType.AiVectorStore}"}]
}
return [{ displayName: "", type: "${NodeConnectionType.Main}"}]
})($parameter)
}}`,
properties: [
{
displayName: 'Operation Mode',
name: 'mode',
type: 'options',
noDataExpression: true,
default: 'retrieve',
options: getOperationModeOptions(args),
},
{
...getConnectionHintNoticeField([NodeConnectionType.AiRetriever]),
displayOptions: {
show: {
mode: ['retrieve'],
},
},
},
{
displayName: 'Name',
name: 'toolName',
type: 'string',
default: '',
required: true,
description: 'Name of the vector store',
placeholder: 'e.g. company_knowledge_base',
validateType: 'string-alphanumeric',
displayOptions: {
show: {
mode: ['retrieve-as-tool'],
},
},
},
{
displayName: 'Description',
name: 'toolDescription',
type: 'string',
default: '',
required: true,
typeOptions: { rows: 2 },
description:
'Explain to the LLM what this tool does, a good, specific description would allow LLMs to produce expected results much more often',
placeholder: `e.g. ${args.meta.description}`,
displayOptions: {
show: {
mode: ['retrieve-as-tool'],
},
},
},
...args.sharedFields,
...transformDescriptionForOperationMode(args.insertFields ?? [], 'insert'),
// Prompt and topK are always used for the load operation
{
displayName: 'Prompt',
name: 'prompt',
type: 'string',
default: '',
required: true,
description:
'Search prompt to retrieve matching documents from the vector store using similarity-based ranking',
displayOptions: {
show: {
mode: ['load'],
},
},
},
{
displayName: 'Limit',
name: 'topK',
type: 'number',
default: 4,
description: 'Number of top results to fetch from vector store',
displayOptions: {
show: {
mode: ['load', 'retrieve-as-tool'],
},
},
},
{
displayName: 'Include Metadata',
name: 'includeDocumentMetadata',
type: 'boolean',
default: true,
description: 'Whether or not to include document metadata',
displayOptions: {
show: {
mode: ['load', 'retrieve-as-tool'],
},
},
},
// ID is always used for update operation
{
displayName: 'ID',
name: 'id',
type: 'string',
default: '',
required: true,
description: 'ID of an embedding entry',
displayOptions: {
show: {
mode: ['update'],
},
},
},
...transformDescriptionForOperationMode(args.loadFields ?? [], [
'load',
'retrieve-as-tool',
]),
...transformDescriptionForOperationMode(args.retrieveFields ?? [], 'retrieve'),
...transformDescriptionForOperationMode(args.updateFields ?? [], 'update'),
],
};
methods = args.methods;
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const mode = this.getNodeParameter('mode', 0) as NodeOperationMode;
const embeddings = (await this.getInputConnectionData(
NodeConnectionType.AiEmbedding,
0,
)) as Embeddings;
if (mode === 'load') {
const items = this.getInputData(0);
const resultData = [];
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
const filter = getMetadataFiltersValues(this, itemIndex);
const vectorStore = await args.getVectorStoreClient(
this,
// We'll pass filter to similaritySearchVectorWithScore instead of getVectorStoreClient
undefined,
embeddings,
itemIndex,
);
try {
const prompt = this.getNodeParameter('prompt', itemIndex) as string;
const topK = this.getNodeParameter('topK', itemIndex, 4) as number;
const embeddedPrompt = await embeddings.embedQuery(prompt);
const docs = await vectorStore.similaritySearchVectorWithScore(
embeddedPrompt,
topK,
filter,
);
const includeDocumentMetadata = this.getNodeParameter(
'includeDocumentMetadata',
itemIndex,
true,
) as boolean;
const serializedDocs = docs.map(([doc, score]) => {
const document = {
pageContent: doc.pageContent,
...(includeDocumentMetadata ? { metadata: doc.metadata } : {}),
};
return {
json: { document, score },
pairedItem: {
item: itemIndex,
},
};
});
resultData.push(...serializedDocs);
logAiEvent(this, 'ai-vector-store-searched', { query: prompt });
} finally {
args.releaseVectorStoreClient?.(vectorStore);
}
}
return [resultData];
}
if (mode === 'insert') {
const items = this.getInputData();
const documentInput = (await this.getInputConnectionData(
NodeConnectionType.AiDocument,
0,
)) as N8nJsonLoader | N8nBinaryLoader | Array<Document<Record<string, unknown>>>;
const resultData = [];
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
if (this.getExecutionCancelSignal()?.aborted) {
break;
}
const itemData = items[itemIndex];
const { processedDocuments, serializedDocuments } = await processDocument(
documentInput,
itemData,
itemIndex,
);
resultData.push(...serializedDocuments);
await args.populateVectorStore(this, embeddings, processedDocuments, itemIndex);
logAiEvent(this, 'ai-vector-store-populated');
}
return [resultData];
}
if (mode === 'update') {
if (!isUpdateSupported(args)) {
throw new NodeOperationError(
this.getNode(),
'Update operation is not implemented for this Vector Store',
);
}
const items = this.getInputData();
const loader = new N8nJsonLoader(this);
const resultData = [];
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
const itemData = items[itemIndex];
const documentId = this.getNodeParameter('id', itemIndex, '', {
extractValue: true,
}) as string;
const vectorStore = await args.getVectorStoreClient(
this,
undefined,
embeddings,
itemIndex,
);
try {
const { processedDocuments, serializedDocuments } = await processDocument(
loader,
itemData,
itemIndex,
);
if (processedDocuments?.length !== 1) {
throw new NodeOperationError(this.getNode(), 'Single document per item expected');
}
resultData.push(...serializedDocuments);
// Use ids option to upsert instead of insert
await vectorStore.addDocuments(processedDocuments, {
ids: [documentId],
});
logAiEvent(this, 'ai-vector-store-updated');
} finally {
args.releaseVectorStoreClient?.(vectorStore);
}
}
return [resultData];
}
throw new NodeOperationError(
this.getNode(),
'Only the "load", "update" and "insert" operation modes are supported with execute',
);
}
async supplyData(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData> {
const mode = this.getNodeParameter('mode', 0) as NodeOperationMode;
const filter = getMetadataFiltersValues(this, itemIndex);
const embeddings = (await this.getInputConnectionData(
NodeConnectionType.AiEmbedding,
0,
)) as Embeddings;
if (mode === 'retrieve') {
const vectorStore = await args.getVectorStoreClient(this, filter, embeddings, itemIndex);
return {
response: logWrapper(vectorStore, this),
closeFunction: async () => {
args.releaseVectorStoreClient?.(vectorStore);
},
};
}
if (mode === 'retrieve-as-tool') {
const toolDescription = this.getNodeParameter('toolDescription', itemIndex) as string;
const toolName = this.getNodeParameter('toolName', itemIndex) as string;
const topK = this.getNodeParameter('topK', itemIndex, 4) as number;
const includeDocumentMetadata = this.getNodeParameter(
'includeDocumentMetadata',
itemIndex,
true,
) as boolean;
const vectorStoreTool = new DynamicTool({
name: toolName,
description: toolDescription,
func: async (input) => {
const vectorStore = await args.getVectorStoreClient(
this,
filter,
embeddings,
itemIndex,
);
try {
const embeddedPrompt = await embeddings.embedQuery(input);
const documents = await vectorStore.similaritySearchVectorWithScore(
embeddedPrompt,
topK,
filter,
);
return documents
.map((document) => {
if (includeDocumentMetadata) {
return { type: 'text', text: JSON.stringify(document[0]) };
}
return {
type: 'text',
text: JSON.stringify({ pageContent: document[0].pageContent }),
};
})
.filter((document) => !!document);
} finally {
args.releaseVectorStoreClient?.(vectorStore);
}
},
});
return {
response: logWrapper(vectorStoreTool, this),
};
}
throw new NodeOperationError(
this.getNode(),
'Only the "retrieve" and "retrieve-as-tool" operation mode is supported to supply data',
);
}
};

View File

@@ -0,0 +1,208 @@
## Overview
`createVectorStoreNode` is a factory function that generates n8n nodes for vector store operations. It abstracts the common functionality needed for vector stores while allowing specific implementations to focus only on their unique aspects.
## Purpose
The function provides a standardized way to:
1. Create vector store nodes with consistent UIs
2. Handle different operation modes (load, insert, retrieve, update, retrieve-as-tool)
3. Process documents and embeddings
4. Maintain connection to LLM services
## Architecture
```
/createVectorStoreNode/ # Create Vector Store Node
/constants.ts # Constants like operation modes and descriptions
/types.ts # TypeScript interfaces and types
/utils.ts # Utility functions for node configuration
/createVectorStoreNode.ts # Main factory function
/processDocuments.ts # Document processing helpers
/operations/ # Operation-specific logic
/loadOperation.ts # Handles 'load' mode
/insertOperation.ts # Handles 'insert' mode
/updateOperation.ts # Handles 'update' mode
/retrieveOperation.ts # Handles 'retrieve' mode
/retrieveAsToolOperation.ts # Handles 'retrieve-as-tool' mode
```
## Usage
To create a new vector store node:
```typescript
import { createVectorStoreNode } from './createVectorStoreNode';
export class MyVectorStoreNode {
static description = createVectorStoreNode({
meta: {
displayName: 'My Vector Store',
name: 'myVectorStore',
description: 'Operations for My Vector Store',
docsUrl: 'https://docs.example.com/my-vector-store',
icon: 'file:myIcon.svg',
// Optional: specify which operations this vector store supports
operationModes: ['load', 'insert', 'update','retrieve', 'retrieve-as-tool'],
},
sharedFields: [
// Fields shown in all operation modes
],
loadFields: [
// Fields specific to 'load' operation
],
insertFields: [
// Fields specific to 'insert' operation
],
retrieveFields: [
// Fields specific to 'retrieve' operation
],
// Functions to implement
getVectorStoreClient: async (context, filter, embeddings, itemIndex) => {
// Create and return vector store instance
},
populateVectorStore: async (context, embeddings, documents, itemIndex) => {
// Insert documents into vector store
},
// Optional: cleanup function - called in finally blocks after operations
releaseVectorStoreClient: (vectorStore) => {
// Release resources such as database connections or external clients
// For example, in PGVector: vectorStore.client?.release();
},
});
}
```
## Operation Modes
### 1. `load` Mode
- Retrieves documents from the vector store based on a query
- Embeds the query and performs similarity search
- Returns ranked documents with their similarity scores
### 2. `insert` Mode
- Processes documents from input
- Embeds and stores documents in the vector store
- Returns serialized documents with metadata
- Supports batched processing with configurable embedding batch size
### 3. `retrieve` Mode
- Returns the vector store instance for use with AI nodes
- Allows LLMs to query the vector store directly
- Used with chains and retrievers
### 4. `retrieve-as-tool` Mode
- Creates a tool that wraps the vector store
- Allows AI agents to use the vector store as a tool
- Returns documents in a format digestible by agents
### 5. `update` Mode (optional)
- Updates existing documents in the vector store by ID
- Requires the vector store to support document updates
- Only enabled if included in `operationModes`
- Uses `addDocuments` method with an `ids` array to update specific documents
- Processes a single document per item and applies it to the specified ID
- Validates that only one document is being updated per operation
## Key Components
### 1. NodeConstructorArgs Interface
Defines the configuration and callbacks that specific vector store implementations must provide:
> **Note:** In node version 1.1+, the `populateVectorStore` function must handle receiving multiple documents at once for batch processing.
```typescript
interface VectorStoreNodeConstructorArgs<T extends VectorStore> {
meta: NodeMeta; // Node metadata (name, description, etc.)
methods?: { ... }; // Optional methods for list searches
sharedFields: INodeProperties[]; // Fields shown in all modes
insertFields?: INodeProperties[]; // Fields specific to insert mode
loadFields?: INodeProperties[]; // Fields specific to load mode
retrieveFields?: INodeProperties[]; // Fields specific to retrieve mode
updateFields?: INodeProperties[]; // Fields specific to update mode
// Core implementation functions
populateVectorStore: Function; // Store documents in vector store (accepts batches in v1.1+)
getVectorStoreClient: Function; // Get vector store instance
releaseVectorStoreClient?: Function; // Clean up resources
}
```
### 2. Operation Handlers
Each operation mode has its own handler module with a well-defined interface:
```typescript
// Example: loadOperation.ts
export async function handleLoadOperation<T extends VectorStore>(
context: IExecuteFunctions,
args: VectorStoreNodeConstructorArgs<T>,
embeddings: Embeddings,
itemIndex: number
): Promise<INodeExecutionData[]>
// Example: insertOperation.ts (v1.1+)
export async function handleInsertOperation<T extends VectorStore>(
context: IExecuteFunctions,
args: VectorStoreNodeConstructorArgs<T>,
embeddings: Embeddings
): Promise<INodeExecutionData[]>
```
### 3. Document Processing
The `processDocument` function standardizes how documents are handled:
```typescript
const { processedDocuments, serializedDocuments } = await processDocument(
documentInput,
itemData,
itemIndex
);
```
## Implementation Details
### Error Handling and Resource Management
Each operation handler includes error handling with proper resource cleanup. The `releaseVectorStoreClient` function is called in a `finally` block to ensure resources are released even if an error occurs:
```typescript
try {
// Operation logic
} finally {
// Release resources even if an error occurs
args.releaseVectorStoreClient?.(vectorStore);
}
```
#### When releaseVectorStoreClient is called:
- After completing a similarity search in `loadOperation`
- As part of the `closeFunction` in `retrieveOperation` to release resources when they're no longer needed
- After each tool use in `retrieveAsToolOperation`
- After updating documents in `updateOperation`
- After inserting documents in `insertOperation`
This design ensures proper resource management, which is especially important for database-backed vector stores (like PGVector) that need to return connections to a pool. Without proper cleanup, prolonged usage could lead to resource leaks or connection pool exhaustion.
### Dynamic Tool Creation
For the `retrieve-as-tool` mode, a DynamicTool is created that exposes vector store functionality:
```typescript
const vectorStoreTool = new DynamicTool({
name: toolName,
description: toolDescription,
func: async (input) => {
// Search vector store with input
// ...
},
});
```
## Performance Considerations
1. **Resource Management**: Each operation properly handles resource cleanup with `releaseVectorStoreClient`.
2. **Batched Processing**: The `insert` operation processes documents in configurable batches. In node version 1.1+, a single embedding operation is performed for all documents in a batch, significantly improving performance by reducing API calls.
3. **Metadata Filtering**: Filters can be applied during search operations to reduce result sets.
4. **Execution Cancellation**: The code checks for cancellation signals to stop processing when needed.

View File

@@ -157,6 +157,27 @@ exports[`createVectorStoreNode retrieve mode supplies vector store as data 1`] =
"rows": 2,
},
},
{
"default": 200,
"description": "Number of documents to embed in a single batch",
"displayName": "Embedding Batch Size",
"displayOptions": {
"show": {
"@version": [
{
"_cnd": {
"gte": 1.1,
},
},
],
"mode": [
"insert",
],
},
},
"name": "embeddingBatchSize",
"type": "number",
},
{
"default": "",
"description": "Search prompt to retrieve matching documents from the vector store using similarity-based ranking",
@@ -229,6 +250,9 @@ exports[`createVectorStoreNode retrieve mode supplies vector store as data 1`] =
"name": "loadField",
},
],
"version": 1,
"version": [
1,
1.1,
],
}
`;

View File

@@ -0,0 +1,185 @@
import type { VectorStore } from '@langchain/core/vectorstores';
import type { INodeProperties } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import { DEFAULT_OPERATION_MODES } from '../constants';
import type { VectorStoreNodeConstructorArgs, NodeOperationMode } from '../types';
import {
transformDescriptionForOperationMode,
isUpdateSupported,
getOperationModeOptions,
} from '../utils';
describe('Vector Store Utilities', () => {
describe('transformDescriptionForOperationMode', () => {
const testFields: INodeProperties[] = [
{
displayName: 'Test Field 1',
name: 'testField1',
type: 'string',
default: '',
},
{
displayName: 'Test Field 2',
name: 'testField2',
type: 'number',
default: 0,
},
];
it('should add displayOptions for a single mode', () => {
const result = transformDescriptionForOperationMode(testFields, 'load');
expect(result).toHaveLength(2);
expect(result[0].displayOptions).toEqual({ show: { mode: ['load'] } });
expect(result[1].displayOptions).toEqual({ show: { mode: ['load'] } });
});
it('should add displayOptions for multiple modes', () => {
const result = transformDescriptionForOperationMode(testFields, ['load', 'insert']);
expect(result).toHaveLength(2);
expect(result[0].displayOptions).toEqual({ show: { mode: ['load', 'insert'] } });
expect(result[1].displayOptions).toEqual({ show: { mode: ['load', 'insert'] } });
});
it('should preserve other properties of the fields', () => {
const result = transformDescriptionForOperationMode(testFields, 'load');
expect(result[0].displayName).toBe('Test Field 1');
expect(result[0].name).toBe('testField1');
expect(result[0].type).toBe('string');
expect(result[0].default).toBe('');
expect(result[1].displayName).toBe('Test Field 2');
expect(result[1].name).toBe('testField2');
expect(result[1].type).toBe('number');
expect(result[1].default).toBe(0);
});
});
describe('isUpdateSupported', () => {
it('should return true when update is in operationModes', () => {
const args = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Test description',
docsUrl: 'https://example.com',
icon: 'file:test.svg',
operationModes: ['load', 'insert', 'update'] as NodeOperationMode[],
},
sharedFields: [],
getVectorStoreClient: jest.fn(),
populateVectorStore: jest.fn(),
} as unknown as VectorStoreNodeConstructorArgs<VectorStore>;
expect(isUpdateSupported(args)).toBe(true);
});
it('should return false when update is not in operationModes', () => {
const args = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Test description',
docsUrl: 'https://example.com',
icon: 'file:test.svg',
operationModes: ['load', 'insert'] as NodeOperationMode[],
},
sharedFields: [],
getVectorStoreClient: jest.fn(),
populateVectorStore: jest.fn(),
} as unknown as VectorStoreNodeConstructorArgs<VectorStore>;
expect(isUpdateSupported(args)).toBe(false);
});
it('should return false when operationModes is undefined', () => {
const args = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Test description',
docsUrl: 'https://example.com',
icon: 'file:test.svg',
},
sharedFields: [],
getVectorStoreClient: jest.fn(),
populateVectorStore: jest.fn(),
} as unknown as VectorStoreNodeConstructorArgs<VectorStore>;
expect(isUpdateSupported(args)).toBe(false);
});
});
describe('getOperationModeOptions', () => {
it('should return options for specified operation modes', () => {
const args = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Test description',
docsUrl: 'https://example.com',
icon: 'file:test.svg',
operationModes: ['load', 'insert'] as NodeOperationMode[],
},
sharedFields: [],
getVectorStoreClient: jest.fn(),
populateVectorStore: jest.fn(),
} as unknown as VectorStoreNodeConstructorArgs<VectorStore>;
const result = getOperationModeOptions(args);
expect(result).toHaveLength(2);
expect(result[0].value).toBe('load');
expect(result[1].value).toBe('insert');
});
it('should return default operation modes when not specified', () => {
const args = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Test description',
docsUrl: 'https://example.com',
icon: 'file:test.svg',
},
sharedFields: [],
getVectorStoreClient: jest.fn(),
populateVectorStore: jest.fn(),
} as unknown as VectorStoreNodeConstructorArgs<VectorStore>;
const result = getOperationModeOptions(args);
expect(result).toHaveLength(DEFAULT_OPERATION_MODES.length);
DEFAULT_OPERATION_MODES.forEach((mode) => {
expect(result.some((option) => option.value === mode)).toBe(true);
});
});
it('should include output connection type properties from OPERATION_MODE_DESCRIPTIONS', () => {
const args = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Test description',
docsUrl: 'https://example.com',
icon: 'file:test.svg',
operationModes: ['retrieve', 'retrieve-as-tool'] as NodeOperationMode[],
},
sharedFields: [],
getVectorStoreClient: jest.fn(),
populateVectorStore: jest.fn(),
} as unknown as VectorStoreNodeConstructorArgs<VectorStore>;
const result = getOperationModeOptions(args);
const retrieveOption = result.find((option) => option.value === 'retrieve');
const retrieveAsToolOption = result.find((option) => option.value === 'retrieve-as-tool');
expect(retrieveOption?.outputConnectionType).toBe(NodeConnectionType.AiVectorStore);
expect(retrieveAsToolOption?.outputConnectionType).toBe(NodeConnectionType.AiTool);
});
});
});

View File

@@ -0,0 +1,46 @@
import { NodeConnectionType } from 'n8n-workflow';
import type { INodePropertyOptions } from 'n8n-workflow';
import type { NodeOperationMode } from './types';
export const DEFAULT_OPERATION_MODES: NodeOperationMode[] = [
'load',
'insert',
'retrieve',
'retrieve-as-tool',
];
export const OPERATION_MODE_DESCRIPTIONS: INodePropertyOptions[] = [
{
name: 'Get Many',
value: 'load',
description: 'Get many ranked documents from vector store for query',
action: 'Get ranked documents from vector store',
},
{
name: 'Insert Documents',
value: 'insert',
description: 'Insert documents into vector store',
action: 'Add documents to vector store',
},
{
name: 'Retrieve Documents (As Vector Store for Chain/Tool)',
value: 'retrieve',
description: 'Retrieve documents from vector store to be used as vector store with AI nodes',
action: 'Retrieve documents for Chain/Tool as Vector Store',
outputConnectionType: NodeConnectionType.AiVectorStore,
},
{
name: 'Retrieve Documents (As Tool for AI Agent)',
value: 'retrieve-as-tool',
description: 'Retrieve documents from vector store to be used as tool with AI nodes',
action: 'Retrieve documents for AI Agent as Tool',
outputConnectionType: NodeConnectionType.AiTool,
},
{
name: 'Update Documents',
value: 'update',
description: 'Update documents in vector store by ID',
action: 'Update vector store documents',
},
];

View File

@@ -1,3 +1,5 @@
/* eslint-disable @typescript-eslint/unbound-method */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import type { DocumentInterface } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
@@ -5,8 +7,8 @@ import { mock } from 'jest-mock-extended';
import type { DynamicTool } from 'langchain/tools';
import type { ISupplyDataFunctions, NodeParameterValueType } from 'n8n-workflow';
import type { VectorStoreNodeConstructorArgs } from './createVectorStoreNode';
import { createVectorStoreNode } from './createVectorStoreNode';
import type { VectorStoreNodeConstructorArgs } from './types';
jest.mock('@utils/logWrapper', () => ({
logWrapper: jest.fn().mockImplementation((val: DynamicTool) => ({ logWrapped: val })),

View File

@@ -0,0 +1,295 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
/* eslint-disable n8n-nodes-base/node-dirname-against-convention */
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type {
IExecuteFunctions,
INodeExecutionData,
INodeTypeDescription,
SupplyData,
ISupplyDataFunctions,
INodeType,
} from 'n8n-workflow';
import { getConnectionHintNoticeField } from '@utils/sharedFields';
// Import custom types
import {
handleLoadOperation,
handleInsertOperation,
handleUpdateOperation,
handleRetrieveOperation,
handleRetrieveAsToolOperation,
} from './operations';
import type { NodeOperationMode, VectorStoreNodeConstructorArgs } from './types';
// Import utility functions
import { transformDescriptionForOperationMode, getOperationModeOptions } from './utils';
// Import operation handlers
/**
* Creates a vector store node with the given configuration
* This factory function produces a complete node class that implements all vector store operations
*/
export const createVectorStoreNode = <T extends VectorStore = VectorStore>(
args: VectorStoreNodeConstructorArgs<T>,
) =>
class VectorStoreNodeType implements INodeType {
description: INodeTypeDescription = {
displayName: args.meta.displayName,
name: args.meta.name,
description: args.meta.description,
icon: args.meta.icon,
iconColor: args.meta.iconColor,
group: ['transform'],
version: [1, 1.1],
defaults: {
name: args.meta.displayName,
},
codex: {
categories: ['AI'],
subcategories: {
AI: ['Vector Stores', 'Tools', 'Root Nodes'],
Tools: ['Other Tools'],
},
resources: {
primaryDocumentation: [
{
url: args.meta.docsUrl,
},
],
},
},
credentials: args.meta.credentials,
// eslint-disable-next-line n8n-nodes-base/node-class-description-inputs-wrong-regular-node
inputs: `={{
((parameters) => {
const mode = parameters?.mode;
const inputs = [{ displayName: "Embedding", type: "${NodeConnectionType.AiEmbedding}", required: true, maxConnections: 1}]
if (mode === 'retrieve-as-tool') {
return inputs;
}
if (['insert', 'load', 'update'].includes(mode)) {
inputs.push({ displayName: "", type: "${NodeConnectionType.Main}"})
}
if (['insert'].includes(mode)) {
inputs.push({ displayName: "Document", type: "${NodeConnectionType.AiDocument}", required: true, maxConnections: 1})
}
return inputs
})($parameter)
}}`,
outputs: `={{
((parameters) => {
const mode = parameters?.mode ?? 'retrieve';
if (mode === 'retrieve-as-tool') {
return [{ displayName: "Tool", type: "${NodeConnectionType.AiTool}"}]
}
if (mode === 'retrieve') {
return [{ displayName: "Vector Store", type: "${NodeConnectionType.AiVectorStore}"}]
}
return [{ displayName: "", type: "${NodeConnectionType.Main}"}]
})($parameter)
}}`,
properties: [
{
displayName: 'Operation Mode',
name: 'mode',
type: 'options',
noDataExpression: true,
default: 'retrieve',
options: getOperationModeOptions(args),
},
{
...getConnectionHintNoticeField([NodeConnectionType.AiRetriever]),
displayOptions: {
show: {
mode: ['retrieve'],
},
},
},
{
displayName: 'Name',
name: 'toolName',
type: 'string',
default: '',
required: true,
description: 'Name of the vector store',
placeholder: 'e.g. company_knowledge_base',
validateType: 'string-alphanumeric',
displayOptions: {
show: {
mode: ['retrieve-as-tool'],
},
},
},
{
displayName: 'Description',
name: 'toolDescription',
type: 'string',
default: '',
required: true,
typeOptions: { rows: 2 },
description:
'Explain to the LLM what this tool does, a good, specific description would allow LLMs to produce expected results much more often',
placeholder: `e.g. ${args.meta.description}`,
displayOptions: {
show: {
mode: ['retrieve-as-tool'],
},
},
},
...args.sharedFields,
{
displayName: 'Embedding Batch Size',
name: 'embeddingBatchSize',
type: 'number',
default: 200,
description: 'Number of documents to embed in a single batch',
displayOptions: {
show: {
mode: ['insert'],
'@version': [{ _cnd: { gte: 1.1 } }],
},
},
},
...transformDescriptionForOperationMode(args.insertFields ?? [], 'insert'),
// Prompt and topK are always used for the load operation
{
displayName: 'Prompt',
name: 'prompt',
type: 'string',
default: '',
required: true,
description:
'Search prompt to retrieve matching documents from the vector store using similarity-based ranking',
displayOptions: {
show: {
mode: ['load'],
},
},
},
{
displayName: 'Limit',
name: 'topK',
type: 'number',
default: 4,
description: 'Number of top results to fetch from vector store',
displayOptions: {
show: {
mode: ['load', 'retrieve-as-tool'],
},
},
},
{
displayName: 'Include Metadata',
name: 'includeDocumentMetadata',
type: 'boolean',
default: true,
description: 'Whether or not to include document metadata',
displayOptions: {
show: {
mode: ['load', 'retrieve-as-tool'],
},
},
},
// ID is always used for update operation
{
displayName: 'ID',
name: 'id',
type: 'string',
default: '',
required: true,
description: 'ID of an embedding entry',
displayOptions: {
show: {
mode: ['update'],
},
},
},
...transformDescriptionForOperationMode(args.loadFields ?? [], [
'load',
'retrieve-as-tool',
]),
...transformDescriptionForOperationMode(args.retrieveFields ?? [], 'retrieve'),
...transformDescriptionForOperationMode(args.updateFields ?? [], 'update'),
],
};
methods = args.methods;
/**
* Method to execute the node in regular workflow mode
* Supports 'load', 'insert', and 'update' operation modes
*/
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const mode = this.getNodeParameter('mode', 0) as NodeOperationMode;
// Get the embeddings model connected to this node
const embeddings = (await this.getInputConnectionData(
NodeConnectionType.AiEmbedding,
0,
)) as Embeddings;
// Handle each operation mode with dedicated modules
if (mode === 'load') {
const items = this.getInputData(0);
const resultData = [];
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
const docs = await handleLoadOperation(this, args, embeddings, itemIndex);
resultData.push(...docs);
}
return [resultData];
}
if (mode === 'insert') {
const resultData = await handleInsertOperation(this, args, embeddings);
return [resultData];
}
if (mode === 'update') {
const resultData = await handleUpdateOperation(this, args, embeddings);
return [resultData];
}
throw new NodeOperationError(
this.getNode(),
'Only the "load", "update" and "insert" operation modes are supported with execute',
);
}
/**
* Method to supply data to AI nodes
* Supports 'retrieve' and 'retrieve-as-tool' operation modes
*/
async supplyData(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData> {
const mode = this.getNodeParameter('mode', 0) as NodeOperationMode;
// Get the embeddings model connected to this node
const embeddings = (await this.getInputConnectionData(
NodeConnectionType.AiEmbedding,
0,
)) as Embeddings;
// Handle each supply data operation mode with dedicated modules
if (mode === 'retrieve') {
return await handleRetrieveOperation(this, args, embeddings, itemIndex);
}
if (mode === 'retrieve-as-tool') {
return await handleRetrieveAsToolOperation(this, args, embeddings, itemIndex);
}
throw new NodeOperationError(
this.getNode(),
'Only the "retrieve" and "retrieve-as-tool" operation mode is supported to supply data',
);
}
};

View File

@@ -0,0 +1,299 @@
/* eslint-disable @typescript-eslint/unbound-method */
/* eslint-disable @typescript-eslint/no-unsafe-return */
import type { Document } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import { logAiEvent } from '@utils/helpers';
import type { N8nBinaryLoader } from '@utils/N8nBinaryLoader';
import type { N8nJsonLoader } from '@utils/N8nJsonLoader';
import type { VectorStoreNodeConstructorArgs } from '../../types';
import { handleInsertOperation } from '../insertOperation';
// Mock processDocument function
jest.mock('../../../processDocuments', () => ({
processDocument: jest.fn().mockImplementation((_documentInput, _itemData, itemIndex: number) => {
const mockProcessed = [
{
pageContent: `processed content ${itemIndex}`,
metadata: { source: 'test' },
} as Document,
];
const mockSerialized = [
{
json: {
pageContent: `processed content ${itemIndex}`,
metadata: { source: 'test' },
},
pairedItem: { item: itemIndex },
},
];
return {
processedDocuments: mockProcessed,
serializedDocuments: mockSerialized,
};
}),
}));
// Mock helper functions
jest.mock('@utils/helpers', () => ({
logAiEvent: jest.fn(),
}));
// Helper functions for testing
function createMockAbortSignal(aborted = false): AbortSignal {
return {
aborted,
addEventListener: jest.fn(),
removeEventListener: jest.fn(),
dispatchEvent: jest.fn(),
onabort: null,
reason: undefined,
throwIfAborted: jest.fn(),
} as unknown as AbortSignal;
}
// Create a mock implementation for getNodeParameter
function createNodeParameterMock(batchSize?: number) {
return (paramName: string, _: number, fallbackValue: any) => {
if (paramName === 'embeddingBatchSize' && batchSize !== undefined) {
return batchSize;
}
return fallbackValue;
};
}
describe('handleInsertOperation', () => {
let mockContext: MockProxy<IExecuteFunctions>;
let mockEmbeddings: MockProxy<Embeddings>;
let mockVectorStore: MockProxy<VectorStore>;
let mockArgs: VectorStoreNodeConstructorArgs<VectorStore>;
let mockInputItems: INodeExecutionData[];
let mockJsonLoader: MockProxy<N8nJsonLoader>;
beforeEach(() => {
// Mock input items
mockInputItems = [
{ json: { text: 'test document 1' } },
{ json: { text: 'test document 2' } },
{ json: { text: 'test document 3' } },
];
// Setup context mock
mockContext = mock<IExecuteFunctions>();
mockContext.getInputData.mockReturnValue(mockInputItems);
// Create a mock AbortSignal
const mockAbortSignal = createMockAbortSignal(false);
mockContext.getExecutionCancelSignal.mockReturnValue(mockAbortSignal);
mockContext.getInputConnectionData.mockResolvedValue(mockJsonLoader);
mockContext.getNode.mockReturnValue({
typeVersion: 1.1,
id: '',
name: '',
type: '',
position: [0, 0],
parameters: {},
});
// Setup embeddings mock
mockEmbeddings = mock<Embeddings>();
// Setup JSON loader mock
mockJsonLoader = mock<N8nJsonLoader>();
// Setup vector store mock
mockVectorStore = mock<VectorStore>();
// Setup args mock
mockArgs = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Vector store for testing',
docsUrl: 'https://example.com',
icon: 'file:testIcon.svg',
},
sharedFields: [],
getVectorStoreClient: jest.fn().mockResolvedValue(mockVectorStore),
populateVectorStore: jest.fn().mockResolvedValue(undefined),
releaseVectorStoreClient: jest.fn(),
};
});
afterEach(() => {
jest.clearAllMocks();
});
it('should process all input items and populate vector store', async () => {
const result = await handleInsertOperation(mockContext, mockArgs, mockEmbeddings);
// Should get document input from connection
expect(mockContext.getInputConnectionData).toHaveBeenCalledWith(
NodeConnectionType.AiDocument,
0,
);
// Should process each item
expect(result).toHaveLength(3);
// Should call populateVectorStore for each item
expect(mockArgs.populateVectorStore).toHaveBeenCalledTimes(1);
// Should log AI event for each item
expect(logAiEvent).toHaveBeenCalledTimes(3);
expect(logAiEvent).toHaveBeenCalledWith(mockContext, 'ai-vector-store-populated');
});
it('should stop processing if execution is cancelled', async () => {
// Create mock AbortSignals for each call
const notAbortedSignal = createMockAbortSignal(false);
const abortedSignal = createMockAbortSignal(true);
// Mock execution being cancelled after first item
mockContext.getExecutionCancelSignal
.mockReturnValueOnce(notAbortedSignal)
.mockReturnValueOnce(abortedSignal);
await handleInsertOperation(mockContext, mockArgs, mockEmbeddings);
// Should only process the first item
expect(mockArgs.populateVectorStore).toHaveBeenCalledTimes(1);
expect(logAiEvent).toHaveBeenCalledTimes(1);
});
it('should handle different document input types', async () => {
// Test with Binary Loader
const mockBinaryLoader = mock<N8nBinaryLoader>();
mockContext.getInputConnectionData.mockResolvedValueOnce(mockBinaryLoader);
await handleInsertOperation(mockContext, mockArgs, mockEmbeddings);
// Test with Document Array
const mockDocuments = [{ pageContent: 'test content', metadata: {} } as Document];
mockContext.getInputConnectionData.mockResolvedValueOnce(mockDocuments);
await handleInsertOperation(mockContext, mockArgs, mockEmbeddings);
// Both calls should process all items
expect(mockArgs.populateVectorStore).toHaveBeenCalledTimes(2);
});
it('should pass the correct documents to populateVectorStore', async () => {
await handleInsertOperation(mockContext, mockArgs, mockEmbeddings);
// Check that populateVectorStore is called once with all documents
expect(mockArgs.populateVectorStore).toHaveBeenCalledTimes(1);
expect(mockArgs.populateVectorStore).toHaveBeenCalledWith(
mockContext,
mockEmbeddings,
expect.arrayContaining([
expect.objectContaining({
pageContent: 'processed content 0',
metadata: { source: 'test' },
}),
expect.objectContaining({
pageContent: 'processed content 1',
metadata: { source: 'test' },
}),
expect.objectContaining({
pageContent: 'processed content 2',
metadata: { source: 'test' },
}),
]),
0,
);
});
it('should batch documents when node version is 1.1 and above', async () => {
// Create more documents to test batching
const manyItems = Array(10)
.fill(null)
.map((_, i) => ({
json: { text: `test document ${i}` },
}));
mockContext.getInputData.mockReturnValue(manyItems);
// Set smaller batch size
mockContext.getNodeParameter.mockImplementation(createNodeParameterMock(3));
await handleInsertOperation(mockContext, mockArgs, mockEmbeddings);
// Should call populateVectorStore multiple times based on batch size
expect(mockArgs.populateVectorStore).toHaveBeenCalledTimes(4); // 10 documents with batch size 3 = 4 batches
});
it('should run populateVectorStore for each item when node version is 1', async () => {
// Set node version to 1
mockContext.getNode.mockReturnValue({
typeVersion: 1,
id: '',
name: '',
type: '',
position: [0, 0],
parameters: {},
});
await handleInsertOperation(mockContext, mockArgs, mockEmbeddings);
// Should run populateVectorStore for each item
expect(mockArgs.populateVectorStore).toHaveBeenCalledTimes(3);
// Should call populateVectorStore for each item with index parameter
expect(mockArgs.populateVectorStore).toHaveBeenNthCalledWith(
1,
mockContext,
mockEmbeddings,
expect.arrayContaining([
expect.objectContaining({
pageContent: 'processed content 0',
metadata: { source: 'test' },
}),
]),
0,
);
expect(mockArgs.populateVectorStore).toHaveBeenNthCalledWith(
2,
mockContext,
mockEmbeddings,
expect.arrayContaining([
expect.objectContaining({
pageContent: 'processed content 1',
metadata: { source: 'test' },
}),
]),
1,
);
expect(mockArgs.populateVectorStore).toHaveBeenNthCalledWith(
3,
mockContext,
mockEmbeddings,
expect.arrayContaining([
expect.objectContaining({
pageContent: 'processed content 2',
metadata: { source: 'test' },
}),
]),
2,
);
});
it('should use default batch size of 200 when not specified', async () => {
// Test fallback behavior (undefined means use fallback value)
mockContext.getNodeParameter.mockImplementation(createNodeParameterMock());
await handleInsertOperation(mockContext, mockArgs, mockEmbeddings);
// With only 3 documents and default batch size of 200, should only call once
expect(mockArgs.populateVectorStore).toHaveBeenCalledTimes(1);
});
});

View File

@@ -0,0 +1,145 @@
/* eslint-disable @typescript-eslint/no-unsafe-return */
/* eslint-disable @typescript-eslint/unbound-method */
import type { Document } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended';
import type { IDataObject, IExecuteFunctions } from 'n8n-workflow';
import { logAiEvent } from '@utils/helpers';
import type { VectorStoreNodeConstructorArgs } from '../../types';
import { handleLoadOperation } from '../loadOperation';
// Mock helper functions from external modules
jest.mock('@utils/helpers', () => ({
getMetadataFiltersValues: jest.fn().mockReturnValue({ testFilter: 'value' }),
logAiEvent: jest.fn(),
}));
describe('handleLoadOperation', () => {
let mockContext: MockProxy<IExecuteFunctions>;
let mockEmbeddings: MockProxy<Embeddings>;
let mockVectorStore: MockProxy<VectorStore>;
let mockArgs: VectorStoreNodeConstructorArgs<VectorStore>;
let nodeParameters: Record<string, any>;
beforeEach(() => {
nodeParameters = {
prompt: 'test search query',
topK: 3,
includeDocumentMetadata: true,
};
mockContext = mock<IExecuteFunctions>();
mockContext.getNodeParameter.mockImplementation((parameterName, _itemIndex, fallbackValue) => {
if (typeof parameterName !== 'string') return fallbackValue;
return nodeParameters[parameterName] ?? fallbackValue;
});
mockEmbeddings = mock<Embeddings>();
mockEmbeddings.embedQuery.mockResolvedValue([0.1, 0.2, 0.3]);
mockVectorStore = mock<VectorStore>();
mockVectorStore.similaritySearchVectorWithScore.mockResolvedValue([
[{ pageContent: 'test content 1', metadata: { test: 'metadata 1' } } as Document, 0.95],
[{ pageContent: 'test content 2', metadata: { test: 'metadata 2' } } as Document, 0.85],
[{ pageContent: 'test content 3', metadata: { test: 'metadata 3' } } as Document, 0.75],
]);
mockArgs = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Vector store for testing',
docsUrl: 'https://example.com',
icon: 'file:testIcon.svg',
},
sharedFields: [],
getVectorStoreClient: jest.fn().mockResolvedValue(mockVectorStore),
populateVectorStore: jest.fn().mockResolvedValue(undefined),
releaseVectorStoreClient: jest.fn(),
};
});
afterEach(() => {
jest.clearAllMocks();
});
it('should retrieve documents from vector store with similarity search', async () => {
const result = await handleLoadOperation(mockContext, mockArgs, mockEmbeddings, 0);
expect(mockArgs.getVectorStoreClient).toHaveBeenCalledWith(
mockContext,
undefined,
mockEmbeddings,
0,
);
expect(mockEmbeddings.embedQuery).toHaveBeenCalledWith('test search query');
expect(mockVectorStore.similaritySearchVectorWithScore).toHaveBeenCalledWith(
[0.1, 0.2, 0.3],
3,
{ testFilter: 'value' },
);
expect(result).toHaveLength(3);
});
it('should include document metadata when includeDocumentMetadata is true', async () => {
const result = await handleLoadOperation(mockContext, mockArgs, mockEmbeddings, 0);
expect(result[0].json.document).toHaveProperty('metadata');
expect((result[0].json?.document as IDataObject)?.metadata).toEqual({ test: 'metadata 1' });
expect((result[0].json?.document as IDataObject)?.pageContent).toEqual('test content 1');
expect(result[0].json?.score).toEqual(0.95);
});
it('should exclude document metadata when includeDocumentMetadata is false', async () => {
nodeParameters.includeDocumentMetadata = false;
const result = await handleLoadOperation(mockContext, mockArgs, mockEmbeddings, 0);
expect(result[0].json?.document).not.toHaveProperty('metadata');
expect((result[0].json?.document as IDataObject)?.pageContent).toEqual('test content 1');
expect(result[0].json?.score).toEqual(0.95);
});
it('should use the topK parameter to limit results', async () => {
nodeParameters.topK = 2;
await handleLoadOperation(mockContext, mockArgs, mockEmbeddings, 0);
expect(mockVectorStore.similaritySearchVectorWithScore).toHaveBeenCalledWith(
expect.anything(),
2,
expect.anything(),
);
});
it('should properly set pairedItem property in results', async () => {
const result = await handleLoadOperation(mockContext, mockArgs, mockEmbeddings, 0);
result.forEach((item) => {
expect(item).toHaveProperty('pairedItem');
expect(item.pairedItem).toEqual({ item: 0 });
});
});
it('should log AI event with query after search is complete', async () => {
await handleLoadOperation(mockContext, mockArgs, mockEmbeddings, 0);
expect(logAiEvent).toHaveBeenCalledWith(mockContext, 'ai-vector-store-searched', {
query: 'test search query',
});
});
it('should release vector store client even if an error occurs', async () => {
mockVectorStore.similaritySearchVectorWithScore.mockRejectedValue(new Error('Test error'));
await expect(handleLoadOperation(mockContext, mockArgs, mockEmbeddings, 0)).rejects.toThrow(
'Test error',
);
expect(mockArgs.releaseVectorStoreClient).toHaveBeenCalledWith(mockVectorStore);
});
});

View File

@@ -0,0 +1,114 @@
import type { Document } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, ISupplyDataFunctions } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import type { VectorStoreNodeConstructorArgs } from '../../types';
import { handleLoadOperation } from '../loadOperation';
import { handleRetrieveAsToolOperation } from '../retrieveAsToolOperation';
import { handleRetrieveOperation } from '../retrieveOperation';
import { handleUpdateOperation } from '../updateOperation';
describe('Vector Store Operation Handlers', () => {
let mockContext: MockProxy<IExecuteFunctions & ISupplyDataFunctions>;
let mockEmbeddings: MockProxy<Embeddings>;
let mockVectorStore: MockProxy<VectorStore>;
let mockArgs: VectorStoreNodeConstructorArgs<VectorStore>;
let nodeParameters: Record<string, any>;
beforeEach(() => {
nodeParameters = {
mode: 'load',
prompt: 'test query',
topK: 3,
includeDocumentMetadata: true,
toolName: 'test_tool',
toolDescription: 'Test tool description',
};
mockContext = mock<IExecuteFunctions & ISupplyDataFunctions>();
mockContext.getNodeParameter.mockImplementation((parameterName, _itemIndex, fallbackValue) => {
if (typeof parameterName !== 'string') return fallbackValue;
return nodeParameters[parameterName] ?? fallbackValue;
});
mockContext.getInputData.mockReturnValue([{ json: { test: 'data' } }]);
mockEmbeddings = mock<Embeddings>();
mockEmbeddings.embedQuery.mockResolvedValue([0.1, 0.2, 0.3]);
mockVectorStore = mock<VectorStore>();
mockVectorStore.similaritySearchVectorWithScore.mockResolvedValue([
[{ pageContent: 'test content', metadata: { test: 'metadata' } } as Document, 0.95],
[{ pageContent: 'test content 2', metadata: { test: 'metadata 2' } } as Document, 0.85],
]);
mockArgs = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Vector store for testing',
docsUrl: 'https://example.com',
icon: 'file:testIcon.svg',
operationModes: ['load', 'insert', 'retrieve', 'retrieve-as-tool', 'update'],
},
sharedFields: [],
getVectorStoreClient: jest.fn().mockResolvedValue(mockVectorStore),
populateVectorStore: jest.fn().mockResolvedValue(undefined),
releaseVectorStoreClient: jest.fn(),
};
});
describe('handleLoadOperation', () => {
it('should properly process load operation', async () => {
const result = await handleLoadOperation(mockContext, mockArgs, mockEmbeddings, 0);
expect(mockArgs.getVectorStoreClient).toHaveBeenCalledTimes(1);
expect(mockEmbeddings.embedQuery).toHaveBeenCalledWith('test query');
expect(mockVectorStore.similaritySearchVectorWithScore).toHaveBeenCalled();
expect(result).toHaveLength(2);
expect(result[0].json).toHaveProperty('document');
expect(result[0].json).toHaveProperty('score');
expect(mockArgs.releaseVectorStoreClient).toHaveBeenCalledWith(mockVectorStore);
});
it('should exclude metadata when includeDocumentMetadata is false', async () => {
nodeParameters.includeDocumentMetadata = false;
const result = await handleLoadOperation(mockContext, mockArgs, mockEmbeddings, 0);
expect(result[0].json.document).not.toHaveProperty('metadata');
});
});
describe('handleUpdateOperation', () => {
it('should throw error when update is not supported', async () => {
mockArgs.meta.operationModes = ['load', 'insert'];
await expect(handleUpdateOperation(mockContext, mockArgs, mockEmbeddings)).rejects.toThrow(
NodeOperationError,
);
});
});
describe('handleRetrieveOperation', () => {
it('should return vector store with log wrapper and close function', async () => {
const result = await handleRetrieveOperation(mockContext, mockArgs, mockEmbeddings, 0);
expect(result).toHaveProperty('response');
expect(result).toHaveProperty('closeFunction');
});
});
describe('handleRetrieveAsToolOperation', () => {
it('should return a tool with the correct name and description', async () => {
const result = await handleRetrieveAsToolOperation(mockContext, mockArgs, mockEmbeddings, 0);
expect(result).toHaveProperty('response');
expect(result.response).toHaveProperty('name', 'test_tool');
expect(result.response).toHaveProperty('description', 'Test tool description');
});
});
});

View File

@@ -0,0 +1,181 @@
/* eslint-disable @typescript-eslint/unbound-method */
import type { Document } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended';
import { DynamicTool } from 'langchain/tools';
import type { ISupplyDataFunctions } from 'n8n-workflow';
import { logWrapper } from '@utils/logWrapper';
import type { VectorStoreNodeConstructorArgs } from '../../types';
import { handleRetrieveAsToolOperation } from '../retrieveAsToolOperation';
// Mock the helper functions
jest.mock('@utils/helpers', () => ({
getMetadataFiltersValues: jest.fn().mockReturnValue({ testFilter: 'value' }),
}));
jest.mock('@utils/logWrapper', () => ({
logWrapper: jest.fn().mockImplementation((obj) => obj),
}));
describe('handleRetrieveAsToolOperation', () => {
let mockContext: MockProxy<ISupplyDataFunctions>;
let mockEmbeddings: MockProxy<Embeddings>;
let mockVectorStore: MockProxy<VectorStore>;
let mockArgs: VectorStoreNodeConstructorArgs<VectorStore>;
let nodeParameters: Record<string, any>;
beforeEach(() => {
nodeParameters = {
toolName: 'test_knowledge_base',
toolDescription: 'Search the test knowledge base',
topK: 3,
includeDocumentMetadata: true,
};
mockContext = mock<ISupplyDataFunctions>();
mockContext.getNodeParameter.mockImplementation((parameterName, _itemIndex, fallbackValue) => {
if (typeof parameterName !== 'string') return fallbackValue;
return nodeParameters[parameterName] ?? fallbackValue;
});
mockEmbeddings = mock<Embeddings>();
mockEmbeddings.embedQuery.mockResolvedValue([0.1, 0.2, 0.3]);
mockVectorStore = mock<VectorStore>();
mockVectorStore.similaritySearchVectorWithScore.mockResolvedValue([
[{ pageContent: 'test content 1', metadata: { test: 'metadata 1' } } as Document, 0.95],
[{ pageContent: 'test content 2', metadata: { test: 'metadata 2' } } as Document, 0.85],
]);
mockArgs = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Vector store for testing',
docsUrl: 'https://example.com',
icon: 'file:testIcon.svg',
},
sharedFields: [],
getVectorStoreClient: jest.fn().mockResolvedValue(mockVectorStore),
populateVectorStore: jest.fn().mockResolvedValue(undefined),
releaseVectorStoreClient: jest.fn(),
};
});
afterEach(() => {
jest.clearAllMocks();
});
it('should create a dynamic tool with the correct name and description', async () => {
const result = (await handleRetrieveAsToolOperation(
mockContext,
mockArgs,
mockEmbeddings,
0,
)) as {
response: DynamicTool;
};
expect(result).toHaveProperty('response');
expect(result.response).toBeInstanceOf(DynamicTool);
expect(result.response.name).toBe('test_knowledge_base');
expect(result.response.description).toBe('Search the test knowledge base');
// Check logWrapper was called
expect(logWrapper).toHaveBeenCalledWith(expect.any(DynamicTool), mockContext);
});
it('should create a tool that can search the vector store', async () => {
const result = await handleRetrieveAsToolOperation(mockContext, mockArgs, mockEmbeddings, 0);
const tool = result.response as DynamicTool;
// Invoke the tool's function
const toolResult = await tool.func('test query');
// Check vector store client was initialized
expect(mockArgs.getVectorStoreClient).toHaveBeenCalledWith(
mockContext,
undefined,
mockEmbeddings,
0,
);
// Check query was embedded
expect(mockEmbeddings.embedQuery).toHaveBeenCalledWith('test query');
// Check vector store was searched
expect(mockVectorStore.similaritySearchVectorWithScore).toHaveBeenCalledWith(
[0.1, 0.2, 0.3],
3,
{ testFilter: 'value' },
);
// Check tool returns formatted results
expect(toolResult).toHaveLength(2);
expect(toolResult[0]).toHaveProperty('type', 'text');
expect(toolResult[0]).toHaveProperty('text');
// Check vector store client was released
expect(mockArgs.releaseVectorStoreClient).toHaveBeenCalledWith(mockVectorStore);
});
it('should include metadata in results when includeDocumentMetadata is true', async () => {
const result = await handleRetrieveAsToolOperation(mockContext, mockArgs, mockEmbeddings, 0);
const tool = result.response as DynamicTool;
const toolResult = await tool.func('test query');
// Parse the JSON text to verify it includes metadata
const parsedFirst = JSON.parse(toolResult[0].text);
expect(parsedFirst).toHaveProperty('pageContent', 'test content 1');
expect(parsedFirst).toHaveProperty('metadata', { test: 'metadata 1' });
});
it('should exclude metadata in results when includeDocumentMetadata is false', async () => {
nodeParameters.includeDocumentMetadata = false;
const result = await handleRetrieveAsToolOperation(mockContext, mockArgs, mockEmbeddings, 0);
const tool = result.response as DynamicTool;
const toolResult = await tool.func('test query');
// Parse the JSON text to verify it excludes metadata
const parsedFirst = JSON.parse(toolResult[0].text);
expect(parsedFirst).toHaveProperty('pageContent', 'test content 1');
expect(parsedFirst).not.toHaveProperty('metadata');
});
it('should limit results based on topK parameter', async () => {
nodeParameters.topK = 1;
const result = await handleRetrieveAsToolOperation(mockContext, mockArgs, mockEmbeddings, 0);
const tool = result.response as DynamicTool;
await tool.func('test query');
expect(mockVectorStore.similaritySearchVectorWithScore).toHaveBeenCalledWith(
expect.anything(),
1,
expect.anything(),
);
});
it('should release vector store client even if search fails', async () => {
const result = await handleRetrieveAsToolOperation(mockContext, mockArgs, mockEmbeddings, 0);
const tool = result.response as DynamicTool;
// Make the search fail
mockVectorStore.similaritySearchVectorWithScore.mockRejectedValueOnce(
new Error('Search failed'),
);
await expect(tool.func('test query')).rejects.toThrow('Search failed');
// Should still release the client
expect(mockArgs.releaseVectorStoreClient).toHaveBeenCalledWith(mockVectorStore);
});
});

View File

@@ -0,0 +1,91 @@
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended';
import type { ISupplyDataFunctions } from 'n8n-workflow';
import { logWrapper } from '@utils/logWrapper';
import type { VectorStoreNodeConstructorArgs } from '../../types';
import { handleRetrieveOperation } from '../retrieveOperation';
// Mock helper functions
jest.mock('@utils/helpers', () => ({
getMetadataFiltersValues: jest.fn().mockReturnValue({ testFilter: 'value' }),
}));
jest.mock('@utils/logWrapper', () => ({
logWrapper: jest.fn().mockImplementation((obj) => obj),
}));
describe('handleRetrieveOperation', () => {
let mockContext: MockProxy<ISupplyDataFunctions>;
let mockEmbeddings: MockProxy<Embeddings>;
let mockVectorStore: MockProxy<VectorStore>;
let mockArgs: VectorStoreNodeConstructorArgs<VectorStore>;
beforeEach(() => {
mockContext = mock<ISupplyDataFunctions>();
mockEmbeddings = mock<Embeddings>();
mockVectorStore = mock<VectorStore>();
mockArgs = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Vector store for testing',
docsUrl: 'https://example.com',
icon: 'file:testIcon.svg',
},
sharedFields: [],
getVectorStoreClient: jest.fn().mockResolvedValue(mockVectorStore),
populateVectorStore: jest.fn().mockResolvedValue(undefined),
releaseVectorStoreClient: jest.fn(),
};
});
afterEach(() => {
jest.clearAllMocks();
});
it('should retrieve vector store with metadata filters', async () => {
const result = await handleRetrieveOperation(mockContext, mockArgs, mockEmbeddings, 0);
// Should get vector store client with filters
expect(mockArgs.getVectorStoreClient).toHaveBeenCalledWith(
mockContext,
{ testFilter: 'value' },
mockEmbeddings,
0,
);
// Result should contain vector store and close function
expect(result).toHaveProperty('response', mockVectorStore);
expect(result).toHaveProperty('closeFunction');
// Should wrap vector store with logWrapper
expect(logWrapper).toHaveBeenCalledWith(mockVectorStore, mockContext);
});
it('should create a closeFunction that releases the vector store client', async () => {
const result = await handleRetrieveOperation(mockContext, mockArgs, mockEmbeddings, 0);
// Call the closeFunction
await result.closeFunction!();
// Should release the vector store client
expect(mockArgs.releaseVectorStoreClient).toHaveBeenCalledWith(mockVectorStore);
});
it('should handle vector store client when no releaseVectorStoreClient is provided', async () => {
// Remove releaseVectorStoreClient method
mockArgs.releaseVectorStoreClient = undefined;
const result = await handleRetrieveOperation(mockContext, mockArgs, mockEmbeddings, 0);
// Call the closeFunction - should not throw error even with no release method
await expect(result.closeFunction!()).resolves.not.toThrow();
});
});

View File

@@ -0,0 +1,175 @@
/* eslint-disable @typescript-eslint/unbound-method */
import type { Document } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import { logAiEvent } from '@utils/helpers';
import type { VectorStoreNodeConstructorArgs } from '../../types';
import { isUpdateSupported } from '../../utils';
import { handleUpdateOperation } from '../updateOperation';
// Mock dependencies
jest.mock('../../utils', () => ({
isUpdateSupported: jest.fn(),
}));
jest.mock('@utils/helpers', () => ({
logAiEvent: jest.fn(),
}));
jest.mock('../../../processDocuments', () => ({
processDocument: jest.fn().mockImplementation((_documentInput, _itemData, itemIndex) => {
const mockProcessed = [
{
pageContent: `updated content ${itemIndex}`,
metadata: { source: 'test-update' },
} as Document,
];
const mockSerialized = [
{
json: {
pageContent: `updated content ${itemIndex}`,
metadata: { source: 'test-update' },
},
pairedItem: { item: itemIndex },
},
];
return {
processedDocuments: mockProcessed,
serializedDocuments: mockSerialized,
};
}),
}));
describe('handleUpdateOperation', () => {
let mockContext: MockProxy<IExecuteFunctions>;
let mockEmbeddings: MockProxy<Embeddings>;
let mockVectorStore: MockProxy<VectorStore>;
let mockArgs: VectorStoreNodeConstructorArgs<VectorStore>;
let mockInputItems: INodeExecutionData[];
beforeEach(() => {
// Mock isUpdateSupported to return true by default
(isUpdateSupported as jest.Mock).mockReturnValue(true);
// Mock input items
mockInputItems = [{ json: { text: 'test document 1' } }, { json: { text: 'test document 2' } }];
// Setup context mock
mockContext = mock<IExecuteFunctions>();
mockContext.getInputData.mockReturnValue(mockInputItems);
mockContext.getNodeParameter.mockImplementation((paramName, itemIndex) => {
if (paramName === 'id') {
return `doc-id-${itemIndex}`;
}
return undefined;
});
// Setup embeddings mock
mockEmbeddings = mock<Embeddings>();
// Setup vector store mock
mockVectorStore = mock<VectorStore>();
mockVectorStore.addDocuments.mockResolvedValue(undefined);
// Setup args mock
mockArgs = {
meta: {
displayName: 'Test Vector Store',
name: 'testVectorStore',
description: 'Vector store for testing',
docsUrl: 'https://example.com',
icon: 'file:testIcon.svg',
operationModes: ['load', 'insert', 'retrieve', 'retrieve-as-tool', 'update'],
},
sharedFields: [],
getVectorStoreClient: jest.fn().mockResolvedValue(mockVectorStore),
populateVectorStore: jest.fn().mockResolvedValue(undefined),
releaseVectorStoreClient: jest.fn(),
};
});
afterEach(() => {
jest.clearAllMocks();
});
it('should throw error if update is not supported', async () => {
// Mock isUpdateSupported to return false
(isUpdateSupported as jest.Mock).mockReturnValue(false);
await expect(handleUpdateOperation(mockContext, mockArgs, mockEmbeddings)).rejects.toThrow(
NodeOperationError,
);
expect(mockArgs.getVectorStoreClient).not.toHaveBeenCalled();
});
it('should update documents with their IDs', async () => {
const result = await handleUpdateOperation(mockContext, mockArgs, mockEmbeddings);
// Should process all items
expect(result).toHaveLength(2);
// Should get vector store client for each item
expect(mockArgs.getVectorStoreClient).toHaveBeenCalledTimes(2);
// Should call addDocuments with documents and IDs
expect(mockVectorStore.addDocuments).toHaveBeenCalledTimes(2);
// First call should use doc-id-0
expect(mockVectorStore.addDocuments).toHaveBeenNthCalledWith(
1,
[expect.objectContaining({ pageContent: 'updated content 0' })],
{ ids: ['doc-id-0'] },
);
// Second call should use doc-id-1
expect(mockVectorStore.addDocuments).toHaveBeenNthCalledWith(
2,
[expect.objectContaining({ pageContent: 'updated content 1' })],
{ ids: ['doc-id-1'] },
);
// Should log AI event for each update
expect(logAiEvent).toHaveBeenCalledTimes(2);
expect(logAiEvent).toHaveBeenCalledWith(mockContext, 'ai-vector-store-updated');
});
it('should release vector store client even if update fails', async () => {
// Mock addDocuments to fail
mockVectorStore.addDocuments.mockRejectedValue(new Error('Update failed'));
await expect(handleUpdateOperation(mockContext, mockArgs, mockEmbeddings)).rejects.toThrow(
'Update failed',
);
// Should still release the client
expect(mockArgs.releaseVectorStoreClient).toHaveBeenCalledWith(mockVectorStore);
});
it('should use proper document ID from node parameters', async () => {
// Setup custom document IDs
mockContext.getNodeParameter
.mockReturnValueOnce('custom-id-123')
.mockReturnValueOnce('custom-id-456');
await handleUpdateOperation(mockContext, mockArgs, mockEmbeddings);
// First call should use custom-id-123
expect(mockVectorStore.addDocuments).toHaveBeenNthCalledWith(1, expect.anything(), {
ids: ['custom-id-123'],
});
// Second call should use custom-id-456
expect(mockVectorStore.addDocuments).toHaveBeenNthCalledWith(2, expect.anything(), {
ids: ['custom-id-456'],
});
});
});

View File

@@ -0,0 +1,5 @@
export * from './loadOperation';
export * from './insertOperation';
export * from './updateOperation';
export * from './retrieveOperation';
export * from './retrieveAsToolOperation';

View File

@@ -0,0 +1,78 @@
import type { Document } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import { logAiEvent } from '@utils/helpers';
import type { N8nBinaryLoader } from '@utils/N8nBinaryLoader';
import type { N8nJsonLoader } from '@utils/N8nJsonLoader';
import { processDocument } from '../../processDocuments';
import type { VectorStoreNodeConstructorArgs } from '../types';
/**
* Handles the 'insert' operation mode
* Inserts documents from the input into the vector store
*/
export async function handleInsertOperation<T extends VectorStore = VectorStore>(
context: IExecuteFunctions,
args: VectorStoreNodeConstructorArgs<T>,
embeddings: Embeddings,
): Promise<INodeExecutionData[]> {
const nodeVersion = context.getNode().typeVersion;
// Get the input items and document data
const items = context.getInputData();
const documentInput = (await context.getInputConnectionData(NodeConnectionType.AiDocument, 0)) as
| N8nJsonLoader
| N8nBinaryLoader
| Array<Document<Record<string, unknown>>>;
const resultData: INodeExecutionData[] = [];
const documentsForEmbedding: Array<Document<Record<string, unknown>>> = [];
// Process each input item
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
// Check if execution is being cancelled
if (context.getExecutionCancelSignal()?.aborted) {
break;
}
const itemData = items[itemIndex];
// Process the document from the input
const processedDocuments = await processDocument(documentInput, itemData, itemIndex);
// Add the serialized documents to the result
resultData.push(...processedDocuments.serializedDocuments);
// Add the processed documents to the documents to embedd
documentsForEmbedding.push(...processedDocuments.processedDocuments);
// For the version 1, we run the populateVectorStore(embedding and insert) function for each item
if (nodeVersion === 1) {
await args.populateVectorStore(
context,
embeddings,
processedDocuments.processedDocuments,
itemIndex,
);
}
// Log the AI event for analytics
logAiEvent(context, 'ai-vector-store-populated');
}
// For the version 1.1, we run the populateVectorStore in batches
if (nodeVersion >= 1.1) {
const embeddingBatchSize =
(context.getNodeParameter('embeddingBatchSize', 0, 200) as number) ?? 200;
// Populate the vector store with the processed documents in batches
for (let i = 0; i < documentsForEmbedding.length; i += embeddingBatchSize) {
const nextBatch = documentsForEmbedding.slice(i, i + embeddingBatchSize);
await args.populateVectorStore(context, embeddings, nextBatch, 0);
}
}
return resultData;
}

View File

@@ -0,0 +1,67 @@
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { getMetadataFiltersValues, logAiEvent } from '@utils/helpers';
import type { VectorStoreNodeConstructorArgs } from '../types';
/**
* Handles the 'load' operation mode
* Searches the vector store for documents similar to a query
*/
export async function handleLoadOperation<T extends VectorStore = VectorStore>(
context: IExecuteFunctions,
args: VectorStoreNodeConstructorArgs<T>,
embeddings: Embeddings,
itemIndex: number,
): Promise<INodeExecutionData[]> {
const filter = getMetadataFiltersValues(context, itemIndex);
const vectorStore = await args.getVectorStoreClient(
context,
// We'll pass filter to similaritySearchVectorWithScore instead of getVectorStoreClient
undefined,
embeddings,
itemIndex,
);
try {
// Get the search parameters from the node
const prompt = context.getNodeParameter('prompt', itemIndex) as string;
const topK = context.getNodeParameter('topK', itemIndex, 4) as number;
const includeDocumentMetadata = context.getNodeParameter(
'includeDocumentMetadata',
itemIndex,
true,
) as boolean;
// Embed the prompt to prepare for vector similarity search
const embeddedPrompt = await embeddings.embedQuery(prompt);
// Get the most similar documents to the embedded prompt
const docs = await vectorStore.similaritySearchVectorWithScore(embeddedPrompt, topK, filter);
// Format the documents for the output
const serializedDocs = docs.map(([doc, score]) => {
const document = {
pageContent: doc.pageContent,
...(includeDocumentMetadata ? { metadata: doc.metadata } : {}),
};
return {
json: { document, score },
pairedItem: {
item: itemIndex,
},
};
});
// Log the AI event for analytics
logAiEvent(context, 'ai-vector-store-searched', { query: prompt });
return serializedDocs;
} finally {
// Release the vector store client if a release method was provided
args.releaseVectorStoreClient?.(vectorStore);
}
}

View File

@@ -0,0 +1,83 @@
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import { DynamicTool } from 'langchain/tools';
import type { ISupplyDataFunctions, SupplyData } from 'n8n-workflow';
import { getMetadataFiltersValues } from '@utils/helpers';
import { logWrapper } from '@utils/logWrapper';
import type { VectorStoreNodeConstructorArgs } from '../types';
/**
* Handles the 'retrieve-as-tool' operation mode
* Returns a tool that can be used with AI Agent nodes
*/
export async function handleRetrieveAsToolOperation<T extends VectorStore = VectorStore>(
context: ISupplyDataFunctions,
args: VectorStoreNodeConstructorArgs<T>,
embeddings: Embeddings,
itemIndex: number,
): Promise<SupplyData> {
// Get the tool configuration parameters
const toolDescription = context.getNodeParameter('toolDescription', itemIndex) as string;
const toolName = context.getNodeParameter('toolName', itemIndex) as string;
const topK = context.getNodeParameter('topK', itemIndex, 4) as number;
const includeDocumentMetadata = context.getNodeParameter(
'includeDocumentMetadata',
itemIndex,
true,
) as boolean;
// Get metadata filters
const filter = getMetadataFiltersValues(context, itemIndex);
// Create a Dynamic Tool that wraps vector store search functionality
const vectorStoreTool = new DynamicTool({
name: toolName,
description: toolDescription,
func: async (input) => {
// For each tool use, get a fresh vector store client.
// We don't pass in a filter here only later in the similaritySearchVectorWithScore
// method to avoid an exception with some vector stores like Supabase or Pinecone(#AI-740)
const vectorStore = await args.getVectorStoreClient(
context,
undefined,
embeddings,
itemIndex,
);
try {
// Embed the input query
const embeddedPrompt = await embeddings.embedQuery(input);
// Search for similar documents
const documents = await vectorStore.similaritySearchVectorWithScore(
embeddedPrompt,
topK,
filter,
);
// Format the documents for the tool output
return documents
.map((document) => {
if (includeDocumentMetadata) {
return { type: 'text', text: JSON.stringify(document[0]) };
}
return {
type: 'text',
text: JSON.stringify({ pageContent: document[0].pageContent }),
};
})
.filter((document) => !!document);
} finally {
// Release the vector store client if a release method was provided
args.releaseVectorStoreClient?.(vectorStore);
}
},
});
// Return the vector store tool with logging wrapper
return {
response: logWrapper(vectorStoreTool, context),
};
}

View File

@@ -0,0 +1,34 @@
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { ISupplyDataFunctions, SupplyData } from 'n8n-workflow';
import { getMetadataFiltersValues } from '@utils/helpers';
import { logWrapper } from '@utils/logWrapper';
import type { VectorStoreNodeConstructorArgs } from '../types';
/**
* Handles the 'retrieve' operation mode
* Returns the vector store to be used with AI nodes
*/
export async function handleRetrieveOperation<T extends VectorStore = VectorStore>(
context: ISupplyDataFunctions,
args: VectorStoreNodeConstructorArgs<T>,
embeddings: Embeddings,
itemIndex: number,
): Promise<SupplyData> {
// Get metadata filters
const filter = getMetadataFiltersValues(context, itemIndex);
// Get the vector store client
const vectorStore = await args.getVectorStoreClient(context, filter, embeddings, itemIndex);
// Return the vector store with logging wrapper and cleanup function
return {
response: logWrapper(vectorStore, context),
closeFunction: async () => {
// Release the vector store client if a release method was provided
args.releaseVectorStoreClient?.(vectorStore);
},
};
}

View File

@@ -0,0 +1,79 @@
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import { logAiEvent } from '@utils/helpers';
import { N8nJsonLoader } from '@utils/N8nJsonLoader';
import { processDocument } from '../../processDocuments';
import type { VectorStoreNodeConstructorArgs } from '../types';
import { isUpdateSupported } from '../utils';
/**
* Handles the 'update' operation mode
* Updates existing documents in the vector store by ID
*/
export async function handleUpdateOperation<T extends VectorStore = VectorStore>(
context: IExecuteFunctions,
args: VectorStoreNodeConstructorArgs<T>,
embeddings: Embeddings,
): Promise<INodeExecutionData[]> {
// First check if update operation is supported by this vector store
if (!isUpdateSupported(args)) {
throw new NodeOperationError(
context.getNode(),
'Update operation is not implemented for this Vector Store',
);
}
// Get input items
const items = context.getInputData();
// Create a loader for processing document data
const loader = new N8nJsonLoader(context);
const resultData: INodeExecutionData[] = [];
// Process each input item
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
const itemData = items[itemIndex];
// Get the document ID to update
const documentId = context.getNodeParameter('id', itemIndex, '', {
extractValue: true,
}) as string;
// Get the vector store client
const vectorStore = await args.getVectorStoreClient(context, undefined, embeddings, itemIndex);
try {
// Process the document from the input
const { processedDocuments, serializedDocuments } = await processDocument(
loader,
itemData,
itemIndex,
);
// Validate that we have exactly one document to update
if (processedDocuments?.length !== 1) {
throw new NodeOperationError(context.getNode(), 'Single document per item expected');
}
// Add the serialized document to the result
resultData.push(...serializedDocuments);
// Use document ID to update the existing document
await vectorStore.addDocuments(processedDocuments, {
ids: [documentId],
});
// Log the AI event for analytics
logAiEvent(context, 'ai-vector-store-updated');
} finally {
// Release the vector store client if a release method was provided
args.releaseVectorStoreClient?.(vectorStore);
}
}
return resultData;
}

View File

@@ -0,0 +1,73 @@
import type { Document } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import type {
IExecuteFunctions,
INodeCredentialDescription,
INodeProperties,
ILoadOptionsFunctions,
INodeListSearchResult,
Icon,
ISupplyDataFunctions,
ThemeIconColor,
} from 'n8n-workflow';
export type NodeOperationMode = 'insert' | 'load' | 'retrieve' | 'update' | 'retrieve-as-tool';
export interface NodeMeta {
displayName: string;
name: string;
description: string;
docsUrl: string;
icon: Icon;
iconColor?: ThemeIconColor;
credentials?: INodeCredentialDescription[];
operationModes?: NodeOperationMode[];
}
export interface VectorStoreNodeConstructorArgs<T extends VectorStore = VectorStore> {
meta: NodeMeta;
methods?: {
listSearch?: {
[key: string]: (
this: ILoadOptionsFunctions,
filter?: string,
paginationToken?: string,
) => Promise<INodeListSearchResult>;
};
};
sharedFields: INodeProperties[];
insertFields?: INodeProperties[];
loadFields?: INodeProperties[];
retrieveFields?: INodeProperties[];
updateFields?: INodeProperties[];
/**
* Function to populate the vector store with documents
* Used during the 'insert' operation mode
*/
populateVectorStore: (
context: IExecuteFunctions | ISupplyDataFunctions,
embeddings: Embeddings,
documents: Array<Document<Record<string, unknown>>>,
itemIndex: number,
) => Promise<void>;
/**
* Function to get the vector store client
* This function is called for all operation modes
*/
getVectorStoreClient: (
context: IExecuteFunctions | ISupplyDataFunctions,
filter: Record<string, never> | undefined,
embeddings: Embeddings,
itemIndex: number,
) => Promise<T>;
/**
* Optional function to release resources associated with the vector store client
* Called after the vector store operations are complete
*/
releaseVectorStoreClient?: (vectorStore: T) => void;
}

View File

@@ -0,0 +1,43 @@
import type { VectorStore } from '@langchain/core/vectorstores';
import type { INodeProperties, INodePropertyOptions } from 'n8n-workflow';
import { DEFAULT_OPERATION_MODES, OPERATION_MODE_DESCRIPTIONS } from './constants';
import type { NodeOperationMode, VectorStoreNodeConstructorArgs } from './types';
/**
* Transforms field descriptions to show only for specific operation modes
* This function adds displayOptions to each field to make it appear only for specified modes
*/
export function transformDescriptionForOperationMode(
fields: INodeProperties[],
mode: NodeOperationMode | NodeOperationMode[],
): INodeProperties[] {
return fields.map((field) => ({
...field,
displayOptions: { show: { mode: Array.isArray(mode) ? mode : [mode] } },
}));
}
/**
* Checks if the update operation is supported for a specific vector store
* A vector store supports updates if it explicitly includes 'update' in its operationModes
*/
export function isUpdateSupported<T extends VectorStore>(
args: VectorStoreNodeConstructorArgs<T>,
): boolean {
return args.meta.operationModes?.includes('update') ?? false;
}
/**
* Returns the operation mode options enabled for a specific vector store
* Filters the full list of operation modes based on what's enabled for this vector store
*/
export function getOperationModeOptions<T extends VectorStore>(
args: VectorStoreNodeConstructorArgs<T>,
): INodePropertyOptions[] {
const enabledOperationModes = args.meta.operationModes ?? DEFAULT_OPERATION_MODES;
return OPERATION_MODE_DESCRIPTIONS.filter(({ value }) =>
enabledOperationModes.includes(value as NodeOperationMode),
);
}

View File

@@ -14,6 +14,18 @@ import {
defaultNodeDescriptions,
} from '@/__tests__/mocks';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import * as lodash from 'lodash-es';
vi.mock('lodash-es', async () => {
const actual = await vi.importActual('lodash-es');
return {
...actual,
debounce: vi.fn((fn) => {
// Return a function that immediately calls the provided function
return (...args: unknown[]) => fn(...args);
}),
};
});
const renderComponent = createComponentRenderer(WorkflowCanvas, {
props: {
@@ -143,4 +155,52 @@ describe('WorkflowCanvas', () => {
expect(container.querySelector(`[data-id="${nodes[0].id}"]`)).toBeInTheDocument();
expect(container.querySelector(`[data-id="${fallbackNodes[0].id}"]`)).not.toBeInTheDocument();
});
describe('debouncing behavior', () => {
beforeEach(() => {
vi.clearAllMocks();
});
it('should initialize debounced watchers on component mount', async () => {
renderComponent();
expect(lodash.debounce).toHaveBeenCalledTimes(3);
});
it('should configure debouncing with no delay when not executing', async () => {
renderComponent({
props: {
executing: false,
},
});
expect(lodash.debounce).toHaveBeenCalledTimes(3);
// Find calls related to our specific debouncing logic
const calls = vi.mocked(lodash.debounce).mock.calls;
const nonExecutingCalls = calls.filter((call) => call[1] === 0 && call[2]?.maxWait === 0);
expect(nonExecutingCalls.length).toBeGreaterThanOrEqual(2);
expect(nonExecutingCalls[0][1]).toBe(0);
expect(nonExecutingCalls[0][2]).toEqual({ maxWait: 0 });
});
it('should configure debouncing with delay when executing', async () => {
renderComponent({
props: {
executing: true,
},
});
expect(lodash.debounce).toHaveBeenCalledTimes(3);
// Find calls related to our specific debouncing logic
const calls = vi.mocked(lodash.debounce).mock.calls;
const executingCalls = calls.filter((call) => call[1] === 200 && call[2]?.maxWait === 50);
expect(executingCalls.length).toBeGreaterThanOrEqual(2);
expect(executingCalls[0][1]).toBe(200);
expect(executingCalls[0][2]).toEqual({ maxWait: 50 });
});
});
});

View File

@@ -1,13 +1,15 @@
<script setup lang="ts">
import Canvas from '@/components/canvas/Canvas.vue';
import { computed, ref, toRef, useCssModule } from 'vue';
import type { WatchStopHandle } from 'vue';
import { computed, ref, toRef, useCssModule, watch } from 'vue';
import type { Workflow } from 'n8n-workflow';
import type { IWorkflowDb } from '@/Interface';
import { useCanvasMapping } from '@/composables/useCanvasMapping';
import type { EventBus } from '@n8n/utils/event-bus';
import { createEventBus } from '@n8n/utils/event-bus';
import type { CanvasEventBusEvents } from '@/types';
import type { CanvasConnection, CanvasEventBusEvents, CanvasNode } from '@/types';
import { useVueFlow } from '@vue-flow/core';
import { debounce } from 'lodash-es';
defineOptions({
inheritAttrs: false,
@@ -59,6 +61,57 @@ onNodesInitialized(() => {
initialFitViewDone.value = true;
}
});
// Debounced versions of nodes and connections and watchers
const nodesDebounced = ref<CanvasNode[]>([]);
const connectionsDebounced = ref<CanvasConnection[]>([]);
const debounceNodesWatcher = ref<WatchStopHandle>();
const debounceConnectionsWatcher = ref<WatchStopHandle>();
// Update debounce watchers when execution state changes
watch(() => props.executing, setupDebouncedWatchers, { immediate: true });
/**
* Sets up debounced watchers for nodes and connections
* Uses different debounce times based on execution state:
* - During execution: Debounce updates to reduce performance impact for large number of nodes/items
* - Otherwise: Update immediately
*/
function setupDebouncedWatchers() {
// Clear existing watchers if they exist
debounceNodesWatcher.value?.();
debounceConnectionsWatcher.value?.();
// Configure debounce parameters based on execution state
const debounceTime = props.executing ? 200 : 0;
const maxWait = props.executing ? 50 : 0;
// Set up debounced watcher for nodes
debounceNodesWatcher.value = watch(
mappedNodes,
debounce(
(value) => {
nodesDebounced.value = value;
},
debounceTime,
{ maxWait },
),
{ immediate: true, deep: true },
);
// Set up debounced watcher for connections
debounceConnectionsWatcher.value = watch(
mappedConnections,
debounce(
(value) => {
connectionsDebounced.value = value;
},
debounceTime,
{ maxWait },
),
{ immediate: true, deep: true },
);
}
</script>
<template>
@@ -67,8 +120,8 @@ onNodesInitialized(() => {
<Canvas
v-if="workflow"
:id="id"
:nodes="mappedNodes"
:connections="mappedConnections"
:nodes="nodesDebounced"
:connections="connectionsDebounced"
:event-bus="eventBus"
:read-only="readOnly"
v-bind="$attrs"