mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 02:21:13 +00:00
feat(Simple Vector Store Node): Implement store cleaning based on age/used memory (#13986)
This commit is contained in:
@@ -0,0 +1,89 @@
|
||||
import type { Document } from '@langchain/core/documents';
|
||||
import type { MemoryVectorStore } from 'langchain/vectorstores/memory';
|
||||
|
||||
import type { IMemoryCalculator } from './types';
|
||||
|
||||
// Memory estimation constants
|
||||
const FLOAT_SIZE_BYTES = 8; // Size of a float64 in bytes
|
||||
const CHAR_SIZE_BYTES = 2; // Size of a JavaScript character in bytes(2 bytes per character in UTF-16)
|
||||
const VECTOR_OVERHEAD_BYTES = 200; // Estimated overhead per vector
|
||||
const EMBEDDING_DIMENSIONS = 1536; // Fixed embedding dimensions
|
||||
const EMBEDDING_SIZE_BYTES = EMBEDDING_DIMENSIONS * FLOAT_SIZE_BYTES;
|
||||
const AVG_METADATA_SIZE_BYTES = 100; // Average size for simple metadata
|
||||
|
||||
/**
|
||||
* Calculates memory usage for vector stores and documents
|
||||
*/
|
||||
export class MemoryCalculator implements IMemoryCalculator {
|
||||
/**
|
||||
* Fast batch size estimation for multiple documents
|
||||
*/
|
||||
estimateBatchSize(documents: Document[]): number {
|
||||
if (documents.length === 0) return 0;
|
||||
|
||||
let totalContentSize = 0;
|
||||
let totalMetadataSize = 0;
|
||||
|
||||
// Single pass through documents for content and metadata estimation
|
||||
for (const doc of documents) {
|
||||
if (doc.pageContent) {
|
||||
totalContentSize += doc.pageContent.length * CHAR_SIZE_BYTES;
|
||||
}
|
||||
|
||||
// Metadata size estimation
|
||||
if (doc.metadata) {
|
||||
// For simple objects, estimate based on key count
|
||||
const metadataKeys = Object.keys(doc.metadata).length;
|
||||
if (metadataKeys > 0) {
|
||||
// For each key, estimate the key name plus a typical value
|
||||
// plus some overhead for object structure
|
||||
totalMetadataSize += metadataKeys * AVG_METADATA_SIZE_BYTES;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fixed size components (embedding vectors and overhead)
|
||||
// Each embedding is a fixed-size array of floating point numbers
|
||||
const embeddingSize = documents.length * EMBEDDING_SIZE_BYTES;
|
||||
|
||||
// Object overhead, each vector is stored with additional JS object structure
|
||||
const overhead = documents.length * VECTOR_OVERHEAD_BYTES;
|
||||
|
||||
// Calculate total batch size with a safety factor to avoid underestimation
|
||||
const calculatedSize = totalContentSize + totalMetadataSize + embeddingSize + overhead;
|
||||
|
||||
return Math.ceil(calculatedSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the size of a vector store by examining its contents
|
||||
*/
|
||||
calculateVectorStoreSize(vectorStore: MemoryVectorStore): number {
|
||||
if (!vectorStore.memoryVectors || vectorStore.memoryVectors.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let storeSize = 0;
|
||||
|
||||
// Calculate size of each vector
|
||||
for (const vector of vectorStore.memoryVectors) {
|
||||
// Size of embedding (float64 array)
|
||||
storeSize += vector.embedding.length * FLOAT_SIZE_BYTES;
|
||||
|
||||
// Size of content string (2 bytes per character in JS)
|
||||
storeSize += vector.content ? vector.content.length * CHAR_SIZE_BYTES : 0;
|
||||
|
||||
// Estimate metadata size
|
||||
if (vector.metadata) {
|
||||
// Use a more accurate calculation for metadata
|
||||
const metadataStr = JSON.stringify(vector.metadata);
|
||||
storeSize += metadataStr.length * CHAR_SIZE_BYTES;
|
||||
}
|
||||
|
||||
// Add overhead for object structure
|
||||
storeSize += VECTOR_OVERHEAD_BYTES;
|
||||
}
|
||||
|
||||
return Math.ceil(storeSize);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,311 @@
|
||||
import type { Document } from '@langchain/core/documents';
|
||||
import type { Embeddings } from '@langchain/core/embeddings';
|
||||
import { MemoryVectorStore } from 'langchain/vectorstores/memory';
|
||||
import type { Logger } from 'n8n-workflow';
|
||||
|
||||
import { getConfig, mbToBytes, hoursToMs } from './config';
|
||||
import { MemoryCalculator } from './MemoryCalculator';
|
||||
import { StoreCleanupService } from './StoreCleanupService';
|
||||
import type { VectorStoreMetadata, VectorStoreStats } from './types';
|
||||
|
||||
/**
|
||||
* Manages in-memory vector stores with memory limits and auto-cleanup
|
||||
*/
|
||||
export class MemoryVectorStoreManager {
|
||||
private static instance: MemoryVectorStoreManager | null = null;
|
||||
|
||||
// Storage
|
||||
protected vectorStoreBuffer: Map<string, MemoryVectorStore>;
|
||||
|
||||
protected storeMetadata: Map<string, VectorStoreMetadata>;
|
||||
|
||||
protected memoryUsageBytes: number = 0;
|
||||
|
||||
// Dependencies
|
||||
protected memoryCalculator: MemoryCalculator;
|
||||
|
||||
protected cleanupService: StoreCleanupService;
|
||||
|
||||
protected static logger: Logger;
|
||||
|
||||
// Config values
|
||||
protected maxMemorySizeBytes: number;
|
||||
|
||||
protected inactiveTtlMs: number;
|
||||
|
||||
// Inactive TTL cleanup timer
|
||||
protected ttlCleanupIntervalId: NodeJS.Timeout | null = null;
|
||||
|
||||
protected constructor(
|
||||
protected embeddings: Embeddings,
|
||||
protected logger: Logger,
|
||||
) {
|
||||
// Initialize storage
|
||||
this.vectorStoreBuffer = new Map();
|
||||
this.storeMetadata = new Map();
|
||||
this.logger = logger;
|
||||
|
||||
const config = getConfig();
|
||||
this.maxMemorySizeBytes = mbToBytes(config.maxMemoryMB);
|
||||
this.inactiveTtlMs = hoursToMs(config.ttlHours);
|
||||
|
||||
// Initialize services
|
||||
this.memoryCalculator = new MemoryCalculator();
|
||||
this.cleanupService = new StoreCleanupService(
|
||||
this.maxMemorySizeBytes,
|
||||
this.inactiveTtlMs,
|
||||
this.vectorStoreBuffer,
|
||||
this.storeMetadata,
|
||||
this.handleCleanup.bind(this),
|
||||
);
|
||||
|
||||
this.setupTtlCleanup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get singleton instance
|
||||
*/
|
||||
static getInstance(embeddings: Embeddings, logger: Logger): MemoryVectorStoreManager {
|
||||
if (!MemoryVectorStoreManager.instance) {
|
||||
MemoryVectorStoreManager.instance = new MemoryVectorStoreManager(embeddings, logger);
|
||||
} else {
|
||||
// We need to update the embeddings in the existing instance.
|
||||
// This is important as embeddings instance is wrapped in a logWrapper,
|
||||
// which relies on supplyDataFunctions context which changes on each workflow run
|
||||
MemoryVectorStoreManager.instance.embeddings = embeddings;
|
||||
MemoryVectorStoreManager.instance.vectorStoreBuffer.forEach((vectorStoreInstance) => {
|
||||
vectorStoreInstance.embeddings = embeddings;
|
||||
});
|
||||
}
|
||||
|
||||
return MemoryVectorStoreManager.instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up timer for TTL-based cleanup
|
||||
*/
|
||||
private setupTtlCleanup(): void {
|
||||
// Skip setup if TTL is disabled
|
||||
if (this.inactiveTtlMs <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Cleanup check interval (run every hour)
|
||||
const CLEANUP_INTERVAL_MS = 60 * 60 * 1000;
|
||||
|
||||
// Clear any existing interval
|
||||
if (this.ttlCleanupIntervalId) {
|
||||
clearInterval(this.ttlCleanupIntervalId);
|
||||
}
|
||||
|
||||
// Setup new interval for TTL cleanup
|
||||
this.ttlCleanupIntervalId = setInterval(() => {
|
||||
this.cleanupService.cleanupInactiveStores();
|
||||
}, CLEANUP_INTERVAL_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle cleanup events from the cleanup service
|
||||
*/
|
||||
private handleCleanup(removedKeys: string[], freedBytes: number, reason: 'ttl' | 'memory'): void {
|
||||
// Update total memory usage
|
||||
this.memoryUsageBytes -= freedBytes;
|
||||
|
||||
// Log cleanup event
|
||||
if (reason === 'ttl') {
|
||||
const ttlHours = Math.round(this.inactiveTtlMs / (60 * 60 * 1000));
|
||||
this.logger.info(
|
||||
`TTL cleanup: removed ${removedKeys.length} inactive vector stores (${ttlHours}h TTL) to free ${Math.round(freedBytes / (1024 * 1024))}MB of memory`,
|
||||
);
|
||||
} else {
|
||||
this.logger.info(
|
||||
`Memory cleanup: removed ${removedKeys.length} oldest vector stores to free ${Math.round(freedBytes / (1024 * 1024))}MB of memory`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a vector store by key
|
||||
*/
|
||||
async getVectorStore(memoryKey: string): Promise<MemoryVectorStore> {
|
||||
let vectorStoreInstance = this.vectorStoreBuffer.get(memoryKey);
|
||||
|
||||
if (!vectorStoreInstance) {
|
||||
vectorStoreInstance = await MemoryVectorStore.fromExistingIndex(this.embeddings);
|
||||
this.vectorStoreBuffer.set(memoryKey, vectorStoreInstance);
|
||||
|
||||
this.storeMetadata.set(memoryKey, {
|
||||
size: 0,
|
||||
createdAt: new Date(),
|
||||
lastAccessed: new Date(),
|
||||
});
|
||||
} else {
|
||||
const metadata = this.storeMetadata.get(memoryKey);
|
||||
if (metadata) {
|
||||
metadata.lastAccessed = new Date();
|
||||
}
|
||||
}
|
||||
|
||||
return vectorStoreInstance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset a store's metadata when it's cleared
|
||||
*/
|
||||
protected clearStoreMetadata(memoryKey: string): void {
|
||||
const metadata = this.storeMetadata.get(memoryKey);
|
||||
if (metadata) {
|
||||
this.memoryUsageBytes -= metadata.size;
|
||||
metadata.size = 0;
|
||||
metadata.lastAccessed = new Date();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get memory usage in bytes
|
||||
*/
|
||||
getMemoryUsage(): number {
|
||||
return this.memoryUsageBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get memory usage as a formatted string (MB)
|
||||
*/
|
||||
getMemoryUsageFormatted(): string {
|
||||
return `${Math.round(this.memoryUsageBytes / (1024 * 1024))}MB`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recalculate memory usage from actual vector store contents
|
||||
* This ensures tracking accuracy for large stores
|
||||
*/
|
||||
recalculateMemoryUsage(): void {
|
||||
this.memoryUsageBytes = 0;
|
||||
|
||||
// Recalculate for each store
|
||||
for (const [key, vectorStore] of this.vectorStoreBuffer.entries()) {
|
||||
const storeSize = this.memoryCalculator.calculateVectorStoreSize(vectorStore);
|
||||
|
||||
// Update metadata
|
||||
const metadata = this.storeMetadata.get(key);
|
||||
if (metadata) {
|
||||
metadata.size = storeSize;
|
||||
this.memoryUsageBytes += storeSize;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.debug(`Recalculated vector store memory: ${this.getMemoryUsageFormatted()}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add documents to a vector store
|
||||
*/
|
||||
async addDocuments(
|
||||
memoryKey: string,
|
||||
documents: Document[],
|
||||
clearStore?: boolean,
|
||||
): Promise<void> {
|
||||
if (clearStore) {
|
||||
this.clearStoreMetadata(memoryKey);
|
||||
this.vectorStoreBuffer.delete(memoryKey);
|
||||
}
|
||||
|
||||
// Fast batch estimation instead of per-document calculation
|
||||
const estimatedAddedSize = this.memoryCalculator.estimateBatchSize(documents);
|
||||
|
||||
// Clean up old stores if necessary
|
||||
this.cleanupService.cleanupOldestStores(estimatedAddedSize);
|
||||
|
||||
const vectorStoreInstance = await this.getVectorStore(memoryKey);
|
||||
|
||||
// Get vector count before adding documents
|
||||
const vectorCountBefore = vectorStoreInstance.memoryVectors?.length || 0;
|
||||
|
||||
await vectorStoreInstance.addDocuments(documents);
|
||||
|
||||
// Update store metadata and memory tracking
|
||||
const metadata = this.storeMetadata.get(memoryKey);
|
||||
if (metadata) {
|
||||
metadata.size += estimatedAddedSize;
|
||||
metadata.lastAccessed = new Date();
|
||||
this.memoryUsageBytes += estimatedAddedSize;
|
||||
}
|
||||
|
||||
// Get updated vector count
|
||||
const vectorCount = vectorStoreInstance.memoryVectors?.length || 0;
|
||||
|
||||
// Periodically recalculate actual memory usage to avoid drift
|
||||
if (
|
||||
(vectorCount > 0 && vectorCount % 100 === 0) ||
|
||||
documents.length > 20 ||
|
||||
(vectorCountBefore === 0 && vectorCount > 0)
|
||||
) {
|
||||
this.recalculateMemoryUsage();
|
||||
}
|
||||
|
||||
// Logging memory usage
|
||||
const maxMemoryMB =
|
||||
this.maxMemorySizeBytes > 0
|
||||
? (this.maxMemorySizeBytes / (1024 * 1024)).toFixed(0)
|
||||
: 'unlimited';
|
||||
|
||||
this.logger.debug(
|
||||
`Vector store memory: ${this.getMemoryUsageFormatted()}/${maxMemoryMB}MB (${vectorCount} vectors in ${this.vectorStoreBuffer.size} stores)`,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get statistics about the vector store memory usage
|
||||
*/
|
||||
getStats(): VectorStoreStats {
|
||||
const now = Date.now();
|
||||
let inactiveStoreCount = 0;
|
||||
|
||||
// Always recalculate when getting stats to ensure accuracy
|
||||
this.recalculateMemoryUsage();
|
||||
|
||||
const stats: VectorStoreStats = {
|
||||
totalSizeBytes: this.memoryUsageBytes,
|
||||
totalSizeMB: Math.round((this.memoryUsageBytes / (1024 * 1024)) * 100) / 100,
|
||||
percentOfLimit:
|
||||
this.maxMemorySizeBytes > 0
|
||||
? Math.round((this.memoryUsageBytes / this.maxMemorySizeBytes) * 100)
|
||||
: 0,
|
||||
maxMemoryMB: this.maxMemorySizeBytes > 0 ? this.maxMemorySizeBytes / (1024 * 1024) : -1, // -1 indicates unlimited
|
||||
storeCount: this.vectorStoreBuffer.size,
|
||||
inactiveStoreCount: 0,
|
||||
ttlHours: this.inactiveTtlMs > 0 ? this.inactiveTtlMs / (60 * 60 * 1000) : -1, // -1 indicates disabled
|
||||
stores: {},
|
||||
};
|
||||
|
||||
// Add stats for each store
|
||||
for (const [key, metadata] of this.storeMetadata.entries()) {
|
||||
const store = this.vectorStoreBuffer.get(key);
|
||||
|
||||
if (store) {
|
||||
const lastAccessedTime = metadata.lastAccessed.getTime();
|
||||
const inactiveTimeMs = now - lastAccessedTime;
|
||||
const isInactive = this.cleanupService.isStoreInactive(metadata);
|
||||
|
||||
if (isInactive) {
|
||||
inactiveStoreCount++;
|
||||
}
|
||||
|
||||
stats.stores[key] = {
|
||||
sizeBytes: metadata.size,
|
||||
sizeMB: Math.round((metadata.size / (1024 * 1024)) * 100) / 100,
|
||||
percentOfTotal: Math.round((metadata.size / this.memoryUsageBytes) * 100) || 0,
|
||||
vectors: store.memoryVectors?.length || 0,
|
||||
createdAt: metadata.createdAt.toISOString(),
|
||||
lastAccessed: metadata.lastAccessed.toISOString(),
|
||||
inactive: isInactive,
|
||||
inactiveForHours: Math.round(inactiveTimeMs / (60 * 60 * 1000)),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
stats.inactiveStoreCount = inactiveStoreCount;
|
||||
|
||||
return stats;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,157 @@
|
||||
import type { MemoryVectorStore } from 'langchain/vectorstores/memory';
|
||||
|
||||
import type { VectorStoreMetadata, IStoreCleanupService } from './types';
|
||||
|
||||
/**
|
||||
* Service for cleaning up vector stores based on inactivity or memory pressure
|
||||
*/
|
||||
export class StoreCleanupService implements IStoreCleanupService {
|
||||
// Cache for oldest stores sorted by creation time
|
||||
private oldestStoreKeys: string[] = [];
|
||||
|
||||
private lastSortTime = 0;
|
||||
|
||||
private readonly CACHE_TTL_MS = 5000; // 5 seconds
|
||||
|
||||
constructor(
|
||||
private readonly maxMemorySizeBytes: number,
|
||||
private readonly inactiveTtlMs: number,
|
||||
private readonly vectorStores: Map<string, MemoryVectorStore>,
|
||||
private readonly storeMetadata: Map<string, VectorStoreMetadata>,
|
||||
private readonly onCleanup: (
|
||||
removedKeys: string[],
|
||||
freedBytes: number,
|
||||
reason: 'ttl' | 'memory',
|
||||
) => void,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Check if a store has been inactive for longer than the TTL
|
||||
*/
|
||||
isStoreInactive(metadata: VectorStoreMetadata): boolean {
|
||||
// If TTL is disabled, nothing is considered inactive
|
||||
if (this.inactiveTtlMs <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const lastAccessedTime = metadata.lastAccessed.getTime();
|
||||
return now - lastAccessedTime > this.inactiveTtlMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove vector stores that haven't been accessed for longer than TTL
|
||||
*/
|
||||
cleanupInactiveStores(): void {
|
||||
// Skip if TTL is disabled
|
||||
if (this.inactiveTtlMs <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
let freedBytes = 0;
|
||||
const removedStores: string[] = [];
|
||||
|
||||
// Find and remove inactive stores
|
||||
for (const [key, metadata] of this.storeMetadata.entries()) {
|
||||
if (this.isStoreInactive(metadata)) {
|
||||
// Remove this inactive store
|
||||
this.vectorStores.delete(key);
|
||||
freedBytes += metadata.size;
|
||||
removedStores.push(key);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from metadata after iteration to avoid concurrent modification
|
||||
for (const key of removedStores) {
|
||||
this.storeMetadata.delete(key);
|
||||
}
|
||||
|
||||
// Invalidate cache if we removed any stores
|
||||
if (removedStores.length > 0) {
|
||||
this.oldestStoreKeys = [];
|
||||
this.onCleanup(removedStores, freedBytes, 'ttl');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the oldest vector stores to free up memory
|
||||
*/
|
||||
cleanupOldestStores(requiredBytes: number): void {
|
||||
// Skip if memory limit is disabled
|
||||
if (this.maxMemorySizeBytes <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Calculate current total memory usage
|
||||
let currentMemoryUsage = 0;
|
||||
for (const metadata of this.storeMetadata.values()) {
|
||||
currentMemoryUsage += metadata.size;
|
||||
}
|
||||
|
||||
// First, try to clean up inactive stores
|
||||
this.cleanupInactiveStores();
|
||||
|
||||
// Recalculate memory usage after inactive cleanup
|
||||
currentMemoryUsage = 0;
|
||||
for (const metadata of this.storeMetadata.values()) {
|
||||
currentMemoryUsage += metadata.size;
|
||||
}
|
||||
|
||||
// If no more cleanup needed, return early
|
||||
if (currentMemoryUsage + requiredBytes <= this.maxMemorySizeBytes) {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
|
||||
// Reuse cached ordering if available and not stale
|
||||
if (this.oldestStoreKeys.length === 0 || now - this.lastSortTime > this.CACHE_TTL_MS) {
|
||||
// Collect and sort store keys by age
|
||||
const stores: Array<[string, number]> = [];
|
||||
|
||||
for (const [key, metadata] of this.storeMetadata.entries()) {
|
||||
stores.push([key, metadata.createdAt.getTime()]);
|
||||
}
|
||||
|
||||
// Sort by creation time (oldest first)
|
||||
stores.sort((a, b) => a[1] - b[1]);
|
||||
|
||||
// Extract just the keys
|
||||
this.oldestStoreKeys = stores.map(([key]) => key);
|
||||
this.lastSortTime = now;
|
||||
}
|
||||
|
||||
let freedBytes = 0;
|
||||
const removedStores: string[] = [];
|
||||
|
||||
// Remove stores in order until we have enough space
|
||||
for (const key of this.oldestStoreKeys) {
|
||||
// Skip if store no longer exists
|
||||
if (!this.storeMetadata.has(key)) continue;
|
||||
|
||||
// Stop if we've freed enough space
|
||||
if (currentMemoryUsage - freedBytes + requiredBytes <= this.maxMemorySizeBytes) {
|
||||
break;
|
||||
}
|
||||
|
||||
const metadata = this.storeMetadata.get(key);
|
||||
if (metadata) {
|
||||
this.vectorStores.delete(key);
|
||||
freedBytes += metadata.size;
|
||||
removedStores.push(key);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from metadata after iteration to avoid concurrent modification
|
||||
for (const key of removedStores) {
|
||||
this.storeMetadata.delete(key);
|
||||
}
|
||||
|
||||
// Update our cache if we removed stores
|
||||
if (removedStores.length > 0) {
|
||||
// Filter out removed stores from cached keys
|
||||
this.oldestStoreKeys = this.oldestStoreKeys.filter((key) => !removedStores.includes(key));
|
||||
this.onCleanup(removedStores, freedBytes, 'memory');
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
import type { MemoryVectorStoreConfig } from './types';
|
||||
|
||||
// Defaults
|
||||
const DEFAULT_MAX_MEMORY_MB = -1;
|
||||
const DEFAULT_INACTIVE_TTL_HOURS = -1;
|
||||
|
||||
/**
|
||||
* Helper function to get the configuration from environment variables
|
||||
*/
|
||||
export function getConfig(): MemoryVectorStoreConfig {
|
||||
// Get memory limit from env var or use default
|
||||
let maxMemoryMB = DEFAULT_MAX_MEMORY_MB;
|
||||
if (process.env.N8N_VECTOR_STORE_MAX_MEMORY) {
|
||||
const parsed = parseInt(process.env.N8N_VECTOR_STORE_MAX_MEMORY, 10);
|
||||
if (!isNaN(parsed)) {
|
||||
maxMemoryMB = parsed;
|
||||
}
|
||||
}
|
||||
|
||||
// Get TTL from env var or use default
|
||||
let ttlHours = DEFAULT_INACTIVE_TTL_HOURS;
|
||||
if (process.env.N8N_VECTOR_STORE_TTL_HOURS) {
|
||||
const parsed = parseInt(process.env.N8N_VECTOR_STORE_TTL_HOURS, 10);
|
||||
if (!isNaN(parsed)) {
|
||||
ttlHours = parsed;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
maxMemoryMB,
|
||||
ttlHours,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert memory size from MB to bytes
|
||||
*/
|
||||
export function mbToBytes(mb: number): number {
|
||||
// -1 - "unlimited"
|
||||
if (mb <= 0) return -1;
|
||||
return mb * 1024 * 1024;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert TTL from hours to milliseconds
|
||||
*/
|
||||
export function hoursToMs(hours: number): number {
|
||||
// -1 - "disabled"
|
||||
if (hours <= 0) return -1;
|
||||
return hours * 60 * 60 * 1000;
|
||||
}
|
||||
@@ -0,0 +1,202 @@
|
||||
import { Document } from '@langchain/core/documents';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { MemoryVectorStore } from 'langchain/vectorstores/memory';
|
||||
|
||||
import { MemoryCalculator } from '../MemoryCalculator';
|
||||
|
||||
function createTestEmbedding(dimensions = 1536, initialValue = 0.1, multiplier = 1): number[] {
|
||||
return new Array(dimensions).fill(initialValue).map((value) => value * multiplier);
|
||||
}
|
||||
|
||||
describe('MemoryCalculator', () => {
|
||||
let calculator: MemoryCalculator;
|
||||
|
||||
beforeEach(() => {
|
||||
calculator = new MemoryCalculator();
|
||||
});
|
||||
|
||||
describe('estimateBatchSize', () => {
|
||||
it('should return 0 for empty document arrays', () => {
|
||||
const size = calculator.estimateBatchSize([]);
|
||||
expect(size).toBe(0);
|
||||
});
|
||||
|
||||
it('should calculate size for simple documents', () => {
|
||||
const documents = [
|
||||
new Document({ pageContent: 'Hello, world!', metadata: { simple: 'value' } }),
|
||||
];
|
||||
|
||||
const size = calculator.estimateBatchSize(documents);
|
||||
|
||||
expect(size).toBeGreaterThan(0);
|
||||
|
||||
// The size should account for the content, metadata, embedding size, and overhead
|
||||
const simpleCase = calculator.estimateBatchSize([
|
||||
new Document({ pageContent: '', metadata: {} }),
|
||||
]);
|
||||
const withContent = calculator.estimateBatchSize([
|
||||
new Document({ pageContent: 'test content', metadata: {} }),
|
||||
]);
|
||||
const withMetadata = calculator.estimateBatchSize([
|
||||
new Document({ pageContent: '', metadata: { key: 'value' } }),
|
||||
]);
|
||||
|
||||
// Content should increase size
|
||||
expect(withContent).toBeGreaterThan(simpleCase);
|
||||
|
||||
// Metadata should increase size
|
||||
expect(withMetadata).toBeGreaterThan(simpleCase);
|
||||
});
|
||||
|
||||
it('should account for content length in size calculation', () => {
|
||||
const shortDoc = new Document({
|
||||
pageContent: 'Short content',
|
||||
metadata: {},
|
||||
});
|
||||
|
||||
const longDoc = new Document({
|
||||
pageContent: 'A'.repeat(1000),
|
||||
metadata: {},
|
||||
});
|
||||
|
||||
const shortSize = calculator.estimateBatchSize([shortDoc]);
|
||||
const longSize = calculator.estimateBatchSize([longDoc]);
|
||||
|
||||
// Long content should result in a larger size estimate
|
||||
expect(longSize).toBeGreaterThan(shortSize);
|
||||
expect(longSize - shortSize).toBeGreaterThan(1000);
|
||||
});
|
||||
|
||||
it('should account for metadata complexity in size calculation', () => {
|
||||
const simpleMetadata = new Document({
|
||||
pageContent: '',
|
||||
metadata: { simple: 'value' },
|
||||
});
|
||||
|
||||
const complexMetadata = new Document({
|
||||
pageContent: '',
|
||||
metadata: {
|
||||
nested: {
|
||||
objects: {
|
||||
with: {
|
||||
many: {
|
||||
levels: [1, 2, 3, 4, 5],
|
||||
andArray: ['a', 'b', 'c', 'd', 'e'],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
moreKeys: 'moreValues',
|
||||
evenMore: 'data',
|
||||
},
|
||||
});
|
||||
|
||||
const simpleSize = calculator.estimateBatchSize([simpleMetadata]);
|
||||
const complexSize = calculator.estimateBatchSize([complexMetadata]);
|
||||
|
||||
// Complex metadata should result in a larger size estimate
|
||||
expect(complexSize).toBeGreaterThan(simpleSize);
|
||||
});
|
||||
|
||||
it('should scale with the number of documents', () => {
|
||||
const doc = new Document({ pageContent: 'Sample content', metadata: { key: 'value' } });
|
||||
|
||||
const singleSize = calculator.estimateBatchSize([doc]);
|
||||
const doubleSize = calculator.estimateBatchSize([doc, doc]);
|
||||
const tripleSize = calculator.estimateBatchSize([doc, doc, doc]);
|
||||
|
||||
// Size should scale roughly linearly with document count
|
||||
expect(doubleSize).toBeGreaterThan(singleSize * 1.5); // Allow for some overhead
|
||||
expect(tripleSize).toBeGreaterThan(doubleSize * 1.3); // Allow for some overhead
|
||||
});
|
||||
});
|
||||
|
||||
describe('calculateVectorStoreSize', () => {
|
||||
it('should return 0 for empty vector stores', () => {
|
||||
const mockVectorStore = mock<MemoryVectorStore>();
|
||||
|
||||
const size = calculator.calculateVectorStoreSize(mockVectorStore);
|
||||
expect(size).toBe(0);
|
||||
});
|
||||
|
||||
it('should calculate size for vector stores with content', () => {
|
||||
const mockVectorStore = mock<MemoryVectorStore>();
|
||||
mockVectorStore.memoryVectors = [
|
||||
{
|
||||
embedding: createTestEmbedding(), // Using the helper function
|
||||
content: 'Document content',
|
||||
metadata: { simple: 'value' },
|
||||
},
|
||||
];
|
||||
|
||||
const size = calculator.calculateVectorStoreSize(mockVectorStore);
|
||||
|
||||
// Size should account for the embedding, content, metadata, and overhead
|
||||
expect(size).toBeGreaterThan(1536 * 8); // At least the size of the embedding in bytes
|
||||
});
|
||||
|
||||
it('should account for vector count in size calculation', () => {
|
||||
const singleVector = mock<MemoryVectorStore>();
|
||||
singleVector.memoryVectors = [
|
||||
{
|
||||
embedding: createTestEmbedding(),
|
||||
content: 'Content',
|
||||
metadata: {},
|
||||
},
|
||||
];
|
||||
|
||||
const multiVector = mock<MemoryVectorStore>();
|
||||
multiVector.memoryVectors = [
|
||||
{
|
||||
embedding: createTestEmbedding(),
|
||||
content: 'Content',
|
||||
metadata: {},
|
||||
},
|
||||
{
|
||||
embedding: createTestEmbedding(),
|
||||
content: 'Content',
|
||||
metadata: {},
|
||||
},
|
||||
{
|
||||
embedding: createTestEmbedding(),
|
||||
content: 'Content',
|
||||
metadata: {},
|
||||
},
|
||||
];
|
||||
|
||||
const singleSize = calculator.calculateVectorStoreSize(singleVector);
|
||||
const multiSize = calculator.calculateVectorStoreSize(multiVector);
|
||||
|
||||
// Multi-vector store should be about 3x the size
|
||||
expect(multiSize).toBeGreaterThan(singleSize * 2.5);
|
||||
expect(multiSize).toBeLessThan(singleSize * 3.5);
|
||||
});
|
||||
|
||||
it('should handle vectors with no content or metadata', () => {
|
||||
const vectorStore = mock<MemoryVectorStore>();
|
||||
vectorStore.memoryVectors = [
|
||||
{
|
||||
embedding: createTestEmbedding(),
|
||||
content: '',
|
||||
metadata: {},
|
||||
},
|
||||
];
|
||||
|
||||
const size = calculator.calculateVectorStoreSize(vectorStore);
|
||||
|
||||
// Size should still be positive (at least the embedding size)
|
||||
expect(size).toBeGreaterThan(1536 * 8);
|
||||
});
|
||||
|
||||
it('should handle null or undefined vector arrays', () => {
|
||||
const nullVectorStore = mock<MemoryVectorStore>();
|
||||
nullVectorStore.memoryVectors = [];
|
||||
|
||||
const undefinedVectorStore = mock<MemoryVectorStore>();
|
||||
undefinedVectorStore.memoryVectors = [];
|
||||
|
||||
expect(calculator.calculateVectorStoreSize(nullVectorStore)).toBe(0);
|
||||
expect(calculator.calculateVectorStoreSize(undefinedVectorStore)).toBe(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,249 @@
|
||||
/* eslint-disable @typescript-eslint/dot-notation */
|
||||
import { Document } from '@langchain/core/documents';
|
||||
import type { OpenAIEmbeddings } from '@langchain/openai';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { MemoryVectorStore } from 'langchain/vectorstores/memory';
|
||||
import type { Logger } from 'n8n-workflow';
|
||||
|
||||
import * as configModule from '../config';
|
||||
import { MemoryVectorStoreManager } from '../MemoryVectorStoreManager';
|
||||
|
||||
function createTestEmbedding(dimensions = 1536, initialValue = 0.1, multiplier = 1): number[] {
|
||||
return new Array(dimensions).fill(initialValue).map((value) => value * multiplier);
|
||||
}
|
||||
|
||||
jest.mock('langchain/vectorstores/memory', () => {
|
||||
return {
|
||||
MemoryVectorStore: {
|
||||
fromExistingIndex: jest.fn().mockImplementation(() => {
|
||||
return {
|
||||
embeddings: null,
|
||||
addDocuments: jest.fn(),
|
||||
memoryVectors: [],
|
||||
};
|
||||
}),
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
describe('MemoryVectorStoreManager', () => {
|
||||
let logger: Logger;
|
||||
// Reset the singleton instance before each test
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
logger = mock<Logger>();
|
||||
MemoryVectorStoreManager['instance'] = null;
|
||||
|
||||
jest.useFakeTimers();
|
||||
|
||||
// Mock the config
|
||||
jest.spyOn(configModule, 'getConfig').mockReturnValue({
|
||||
maxMemoryMB: 100,
|
||||
ttlHours: 168,
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.runOnlyPendingTimers();
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('should create an instance of MemoryVectorStoreManager', () => {
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
|
||||
const instance = MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
expect(instance).toBeInstanceOf(MemoryVectorStoreManager);
|
||||
});
|
||||
|
||||
it('should return existing instance', () => {
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
|
||||
const instance1 = MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
const instance2 = MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
expect(instance1).toBe(instance2);
|
||||
});
|
||||
|
||||
it('should update embeddings in existing instance', () => {
|
||||
const embeddings1 = mock<OpenAIEmbeddings>();
|
||||
const embeddings2 = mock<OpenAIEmbeddings>();
|
||||
|
||||
const instance = MemoryVectorStoreManager.getInstance(embeddings1, logger);
|
||||
MemoryVectorStoreManager.getInstance(embeddings2, logger);
|
||||
|
||||
expect(instance['embeddings']).toBe(embeddings2);
|
||||
});
|
||||
|
||||
it('should update embeddings in existing vector store instances', async () => {
|
||||
const embeddings1 = mock<OpenAIEmbeddings>();
|
||||
const embeddings2 = mock<OpenAIEmbeddings>();
|
||||
|
||||
const instance1 = MemoryVectorStoreManager.getInstance(embeddings1, logger);
|
||||
await instance1.getVectorStore('test');
|
||||
|
||||
const instance2 = MemoryVectorStoreManager.getInstance(embeddings2, logger);
|
||||
const vectorStoreInstance2 = await instance2.getVectorStore('test');
|
||||
|
||||
expect(vectorStoreInstance2.embeddings).toBe(embeddings2);
|
||||
});
|
||||
|
||||
it('should set up the TTL cleanup interval', () => {
|
||||
jest.spyOn(global, 'setInterval');
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
|
||||
MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
|
||||
expect(setInterval).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not set up the TTL cleanup interval when TTL is disabled', () => {
|
||||
jest.spyOn(configModule, 'getConfig').mockReturnValue({
|
||||
maxMemoryMB: 100,
|
||||
ttlHours: -1, // TTL disabled
|
||||
});
|
||||
|
||||
jest.spyOn(global, 'setInterval');
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
|
||||
MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
|
||||
expect(setInterval).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should track memory usage when adding documents', async () => {
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
const instance = MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
|
||||
const calculatorSpy = jest
|
||||
.spyOn(instance['memoryCalculator'], 'estimateBatchSize')
|
||||
.mockReturnValue(1024 * 1024); // Mock 1MB size
|
||||
|
||||
const documents = [new Document({ pageContent: 'test document', metadata: { test: 'value' } })];
|
||||
|
||||
await instance.addDocuments('test-key', documents);
|
||||
|
||||
expect(calculatorSpy).toHaveBeenCalledWith(documents);
|
||||
expect(instance.getMemoryUsage()).toBe(1024 * 1024); // Should be 1MB
|
||||
});
|
||||
|
||||
it('should clear store metadata when clearing store', async () => {
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
const instance = MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
|
||||
// Directly set memory usage to 0 to start with a clean state
|
||||
instance['memoryUsageBytes'] = 0;
|
||||
|
||||
// Add documents to create a store
|
||||
const docs = [new Document({ pageContent: 'test', metadata: {} })];
|
||||
jest.spyOn(instance['memoryCalculator'], 'estimateBatchSize').mockReturnValue(1000);
|
||||
|
||||
await instance.addDocuments('test-key', docs);
|
||||
expect(instance.getMemoryUsage()).toBe(1000);
|
||||
|
||||
// Directly access the metadata to verify clearing works
|
||||
const metadataSizeBefore = instance['storeMetadata'].get('test-key')?.size;
|
||||
expect(metadataSizeBefore).toBe(1000);
|
||||
|
||||
// Now clear the store by calling the private method directly
|
||||
instance['clearStoreMetadata']('test-key');
|
||||
|
||||
// Verify metadata was reset
|
||||
const metadataSizeAfter = instance['storeMetadata'].get('test-key')?.size;
|
||||
expect(metadataSizeAfter).toBe(0);
|
||||
|
||||
// The memory usage should be reduced
|
||||
expect(instance.getMemoryUsage()).toBe(0);
|
||||
});
|
||||
|
||||
it('should request cleanup when adding documents that would exceed memory limit', async () => {
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
const instance = MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
|
||||
// Spy on the cleanup service
|
||||
const cleanupSpy = jest.spyOn(instance['cleanupService'], 'cleanupOldestStores');
|
||||
|
||||
// Set up a large document batch
|
||||
const documents = [new Document({ pageContent: 'test', metadata: {} })];
|
||||
jest.spyOn(instance['memoryCalculator'], 'estimateBatchSize').mockReturnValue(50 * 1024 * 1024); // 50MB
|
||||
|
||||
await instance.addDocuments('test-key', documents);
|
||||
|
||||
expect(cleanupSpy).toHaveBeenCalledWith(50 * 1024 * 1024);
|
||||
});
|
||||
|
||||
it('should recalculate memory usage periodically', async () => {
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
const instance = MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
|
||||
// Mock methods and spies
|
||||
const recalcSpy = jest.spyOn(instance, 'recalculateMemoryUsage');
|
||||
const mockVectorStore = mock<MemoryVectorStore>();
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
mockVectorStore.memoryVectors = new Array(100).fill({
|
||||
embedding: createTestEmbedding(),
|
||||
content: 'test',
|
||||
metadata: {},
|
||||
});
|
||||
|
||||
// Mock the getVectorStore to return our mock
|
||||
jest.spyOn(instance, 'getVectorStore').mockResolvedValue(mockVectorStore);
|
||||
jest.spyOn(instance['memoryCalculator'], 'estimateBatchSize').mockReturnValue(1000);
|
||||
|
||||
// Add a large batch of documents
|
||||
const documents = new Array(21).fill(new Document({ pageContent: 'test', metadata: {} }));
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
||||
await instance.addDocuments('test-key', documents);
|
||||
|
||||
expect(recalcSpy).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should provide accurate stats about vector stores', async () => {
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
const instance = MemoryVectorStoreManager.getInstance(embeddings, logger);
|
||||
|
||||
// Create mock vector stores
|
||||
const mockVectorStore1 = mock<MemoryVectorStore>();
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
mockVectorStore1.memoryVectors = new Array(50).fill({
|
||||
embedding: createTestEmbedding(),
|
||||
content: 'test1',
|
||||
metadata: {},
|
||||
});
|
||||
|
||||
const mockVectorStore2 = mock<MemoryVectorStore>();
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
mockVectorStore2.memoryVectors = new Array(30).fill({
|
||||
embedding: createTestEmbedding(),
|
||||
content: 'test2',
|
||||
metadata: {},
|
||||
});
|
||||
|
||||
// Mock internal state
|
||||
instance['vectorStoreBuffer'].set('store1', mockVectorStore1);
|
||||
instance['vectorStoreBuffer'].set('store2', mockVectorStore2);
|
||||
|
||||
// Set metadata for the stores
|
||||
instance['storeMetadata'].set('store1', {
|
||||
size: 1024 * 1024, // 1MB
|
||||
createdAt: new Date(Date.now() - 3600000), // 1 hour ago
|
||||
lastAccessed: new Date(Date.now() - 1800000), // 30 minutes ago
|
||||
});
|
||||
|
||||
instance['storeMetadata'].set('store2', {
|
||||
size: 512 * 1024, // 0.5MB
|
||||
createdAt: new Date(Date.now() - 7200000), // 2 hours ago
|
||||
lastAccessed: new Date(Date.now() - 3600000), // 1 hour ago
|
||||
});
|
||||
|
||||
// Set memory usage
|
||||
instance['memoryUsageBytes'] = 1024 * 1024 + 512 * 1024;
|
||||
|
||||
const stats = instance.getStats();
|
||||
|
||||
expect(stats.storeCount).toBe(2);
|
||||
expect(stats.totalSizeBytes).toBeGreaterThan(0);
|
||||
expect(Object.keys(stats.stores)).toContain('store1');
|
||||
expect(Object.keys(stats.stores)).toContain('store2');
|
||||
expect(stats.stores.store1.vectors).toBe(50);
|
||||
expect(stats.stores.store2.vectors).toBe(30);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,289 @@
|
||||
/* eslint-disable @typescript-eslint/dot-notation */
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { MemoryVectorStore } from 'langchain/vectorstores/memory';
|
||||
|
||||
import { StoreCleanupService } from '../StoreCleanupService';
|
||||
import type { VectorStoreMetadata } from '../types';
|
||||
|
||||
describe('StoreCleanupService', () => {
|
||||
	// Setup test data: the maps the service operates on and a spy for its
	// cleanup callback. Re-created per test in beforeEach below.
	let vectorStores: Map<string, MemoryVectorStore>;
	let storeMetadata: Map<string, VectorStoreMetadata>;
	let onCleanupMock: jest.Mock;

	// Utility to add a test store with given size (bytes) and age: the
	// creation and last-access timestamps are backdated by the given hours.
	const addTestStore = (
		key: string,
		sizeBytes: number,
		createdHoursAgo: number,
		accessedHoursAgo: number,
	) => {
		const mockStore = mock<MemoryVectorStore>();
		vectorStores.set(key, mockStore);

		const now = Date.now();
		storeMetadata.set(key, {
			size: sizeBytes,
			createdAt: new Date(now - createdHoursAgo * 3600000),
			lastAccessed: new Date(now - accessedHoursAgo * 3600000),
		});
	};

	beforeEach(() => {
		// Fresh maps and spy for every test so state cannot leak between cases.
		vectorStores = new Map();
		storeMetadata = new Map();
		onCleanupMock = jest.fn();
	});
|
||||
|
||||
	describe('TTL-based cleanup', () => {
		it('should identify inactive stores correctly', () => {
			const service = new StoreCleanupService(
				100 * 1024 * 1024, // 100MB max
				24 * 3600 * 1000, // 24 hours TTL
				vectorStores,
				storeMetadata,
				onCleanupMock,
			);

			// Create test metadata: inactivity is judged by lastAccessed, not
			// createdAt — both stores below are equally old.
			const recentMetadata: VectorStoreMetadata = {
				size: 1024,
				createdAt: new Date(Date.now() - 48 * 3600 * 1000), // 48 hours ago
				lastAccessed: new Date(Date.now() - 12 * 3600 * 1000), // 12 hours ago (< TTL)
			};

			const inactiveMetadata: VectorStoreMetadata = {
				size: 1024,
				createdAt: new Date(Date.now() - 48 * 3600 * 1000), // 48 hours ago
				lastAccessed: new Date(Date.now() - 36 * 3600 * 1000), // 36 hours ago (> TTL)
			};

			// Test the inactive check
			expect(service.isStoreInactive(recentMetadata)).toBe(false);
			expect(service.isStoreInactive(inactiveMetadata)).toBe(true);
		});

		it('should never identify stores as inactive when TTL is disabled', () => {
			const service = new StoreCleanupService(
				100 * 1024 * 1024, // 100MB max
				-1, // TTL disabled
				vectorStores,
				storeMetadata,
				onCleanupMock,
			);

			// Create very old metadata
			const veryOldMetadata: VectorStoreMetadata = {
				size: 1024,
				createdAt: new Date(Date.now() - 365 * 24 * 3600 * 1000), // 1 year ago
				lastAccessed: new Date(Date.now() - 365 * 24 * 3600 * 1000), // 1 year ago
			};

			// Should never be inactive when TTL is disabled
			expect(service.isStoreInactive(veryOldMetadata)).toBe(false);
		});

		it('should clean up inactive stores', () => {
			const service = new StoreCleanupService(
				100 * 1024 * 1024, // 100MB max
				24 * 3600 * 1000, // 24 hours TTL
				vectorStores,
				storeMetadata,
				onCleanupMock,
			);

			// Add active and inactive stores (inactivity = last access > 24h ago)
			addTestStore('active1', 1024 * 1024, 48, 12); // 48 hours old, accessed 12 hours ago
			addTestStore('active2', 2048 * 1024, 72, 20); // 72 hours old, accessed 20 hours ago
			addTestStore('inactive1', 3072 * 1024, 100, 30); // 100 hours old, accessed 30 hours ago
			addTestStore('inactive2', 4096 * 1024, 120, 48); // 120 hours old, accessed 48 hours ago

			// Run cleanup
			service.cleanupInactiveStores();

			// Check which stores were cleaned up
			expect(vectorStores.has('active1')).toBe(true);
			expect(vectorStores.has('active2')).toBe(true);
			expect(vectorStores.has('inactive1')).toBe(false);
			expect(vectorStores.has('inactive2')).toBe(false);

			// Metadata should also be cleaned up
			expect(storeMetadata.has('active1')).toBe(true);
			expect(storeMetadata.has('active2')).toBe(true);
			expect(storeMetadata.has('inactive1')).toBe(false);
			expect(storeMetadata.has('inactive2')).toBe(false);

			// Check callback was called correctly: removed keys, freed bytes, reason
			expect(onCleanupMock).toHaveBeenCalledWith(
				expect.arrayContaining(['inactive1', 'inactive2']),
				7168 * 1024, // sum of inactive store sizes (3072KB + 4096KB)
				'ttl',
			);
		});

		it('should not run TTL cleanup when disabled', () => {
			const service = new StoreCleanupService(
				100 * 1024 * 1024, // 100MB max
				-1, // TTL disabled
				vectorStores,
				storeMetadata,
				onCleanupMock,
			);

			// Add all "inactive" stores (would be expired under a 24h TTL)
			addTestStore('store1', 1024 * 1024, 48, 30);
			addTestStore('store2', 2048 * 1024, 72, 48);

			// Run cleanup
			service.cleanupInactiveStores();

			// Nothing should be cleaned up
			expect(vectorStores.size).toBe(2);
			expect(storeMetadata.size).toBe(2);
			expect(onCleanupMock).not.toHaveBeenCalled();
		});
	});
|
||||
|
||||
	describe('Memory-based cleanup', () => {
		it('should clean up oldest stores to make room for new data', () => {
			const maxMemoryBytes = 10 * 1024 * 1024; // 10MB
			const service = new StoreCleanupService(
				maxMemoryBytes,
				24 * 3600 * 1000, // 24 hours TTL
				vectorStores,
				storeMetadata,
				onCleanupMock,
			);

			// Add stores with different creation times (eviction order is oldest-first)
			addTestStore('newest', 2 * 1024 * 1024, 1, 1); // 2MB, 1 hour old
			addTestStore('newer', 3 * 1024 * 1024, 2, 1); // 3MB, 2 hours old
			addTestStore('older', 3 * 1024 * 1024, 3, 1); // 3MB, 3 hours old
			addTestStore('oldest', 2 * 1024 * 1024, 4, 1); // 2MB, 4 hours old

			// Current total: 10MB — exactly at the limit

			// Try to add 5MB more
			service.cleanupOldestStores(5 * 1024 * 1024);

			// Should have removed oldest and older (5MB total), newest ones survive
			expect(vectorStores.has('newest')).toBe(true);
			expect(vectorStores.has('newer')).toBe(true);
			expect(vectorStores.has('older')).toBe(false);
			expect(vectorStores.has('oldest')).toBe(false);

			// Check callback: removed keys, freed bytes, 'memory' reason
			expect(onCleanupMock).toHaveBeenCalledWith(
				expect.arrayContaining(['older', 'oldest']),
				5 * 1024 * 1024,
				'memory',
			);
		});

		it('should run TTL cleanup before memory cleanup', () => {
			const maxMemoryBytes = 10 * 1024 * 1024; // 10MB
			const service = new StoreCleanupService(
				maxMemoryBytes,
				24 * 3600 * 1000, // 24 hours TTL
				vectorStores,
				storeMetadata,
				onCleanupMock,
			);

			// Add a mix of active and inactive stores
			addTestStore('active-newest', 2 * 1024 * 1024, 1, 1); // 2MB, active
			addTestStore('active-older', 3 * 1024 * 1024, 3, 12); // 3MB, active
			addTestStore('inactive', 3 * 1024 * 1024, 3, 30); // 3MB, inactive (30h > 24h TTL)
			addTestStore('active-oldest', 2 * 1024 * 1024, 4, 20); // 2MB, active

			// Total: 10MB, with 3MB inactive

			// Try to add 5MB more
			service.cleanupOldestStores(5 * 1024 * 1024);

			// Should have removed inactive first (TTL pass), then active-oldest
			// (memory pass) for 5MB total
			expect(vectorStores.has('active-newest')).toBe(true);
			expect(vectorStores.has('active-older')).toBe(true);
			expect(vectorStores.has('inactive')).toBe(false);
			expect(vectorStores.has('active-oldest')).toBe(false);

			// Check callbacks: exactly one per cleanup phase, in order
			expect(onCleanupMock).toHaveBeenCalledTimes(2);
			// First call for TTL cleanup
			expect(onCleanupMock).toHaveBeenNthCalledWith(1, ['inactive'], 3 * 1024 * 1024, 'ttl');
			// Second call for memory cleanup
			expect(onCleanupMock).toHaveBeenNthCalledWith(
				2,
				['active-oldest'],
				2 * 1024 * 1024,
				'memory',
			);
		});

		it('should not perform memory cleanup when limit is disabled', () => {
			const service = new StoreCleanupService(
				-1, // Memory limit disabled
				24 * 3600 * 1000, // 24 hours TTL
				vectorStores,
				storeMetadata,
				onCleanupMock,
			);

			// Add some stores (all recently accessed, so TTL pass removes nothing)
			addTestStore('store1', 5 * 1024 * 1024, 1, 1);
			addTestStore('store2', 10 * 1024 * 1024, 2, 1);

			// Try to add a lot more data
			service.cleanupOldestStores(100 * 1024 * 1024);

			// Nothing should be cleaned up
			expect(vectorStores.size).toBe(2);
			expect(storeMetadata.size).toBe(2);
			expect(onCleanupMock).not.toHaveBeenCalled();
		});

		it('should handle empty stores during cleanup', () => {
			const service = new StoreCleanupService(
				10 * 1024 * 1024, // 10MB
				24 * 3600 * 1000, // 24 hours TTL
				vectorStores,
				storeMetadata,
				onCleanupMock,
			);

			// Both cleanup paths must be no-ops on empty maps (no throw, no callback)
			service.cleanupOldestStores(5 * 1024 * 1024);
			service.cleanupInactiveStores();

			expect(onCleanupMock).not.toHaveBeenCalled();
		});

		it('should update the cache when stores are removed', () => {
			const service = new StoreCleanupService(
				10 * 1024 * 1024, // 10MB
				24 * 3600 * 1000, // 24 hours TTL
				vectorStores,
				storeMetadata,
				onCleanupMock,
			);

			// Add test stores
			addTestStore('newest', 2 * 1024 * 1024, 1, 1);
			addTestStore('middle', 3 * 1024 * 1024, 3, 1);
			addTestStore('oldest', 4 * 1024 * 1024, 5, 1);

			// Trigger a cleanup that will remove only the oldest store
			service.cleanupOldestStores(4 * 1024 * 1024); // 4MB

			// Verify removal
			expect(vectorStores.has('oldest')).toBe(false);
			expect(vectorStores.has('middle')).toBe(true);
			expect(vectorStores.has('newest')).toBe(true);

			// Check that the internal age-ordered key cache was updated correctly
			const cacheKeys = service['oldestStoreKeys'];
			expect(cacheKeys.includes('oldest')).toBe(false);
			expect(cacheKeys.includes('middle')).toBe(true);
			expect(cacheKeys.includes('newest')).toBe(true);
		});
	});
|
||||
});
|
||||
@@ -0,0 +1,74 @@
|
||||
import { getConfig, mbToBytes, hoursToMs } from '../config';
|
||||
|
||||
describe('Vector Store Config', () => {
|
||||
// Store original environment
|
||||
const originalEnv = { ...process.env };
|
||||
|
||||
// Restore original environment after each test
|
||||
afterEach(() => {
|
||||
process.env = { ...originalEnv };
|
||||
});
|
||||
|
||||
describe('getConfig', () => {
|
||||
it('should return default values when no environment variables set', () => {
|
||||
// Clear relevant environment variables
|
||||
delete process.env.N8N_VECTOR_STORE_MAX_MEMORY;
|
||||
delete process.env.N8N_VECTOR_STORE_TTL_HOURS;
|
||||
|
||||
const config = getConfig();
|
||||
|
||||
expect(config.maxMemoryMB).toBe(-1);
|
||||
expect(config.ttlHours).toBe(-1);
|
||||
});
|
||||
|
||||
it('should use values from environment variables when set', () => {
|
||||
process.env.N8N_VECTOR_STORE_MAX_MEMORY = '200';
|
||||
process.env.N8N_VECTOR_STORE_TTL_HOURS = '24';
|
||||
|
||||
const config = getConfig();
|
||||
|
||||
expect(config.maxMemoryMB).toBe(200);
|
||||
expect(config.ttlHours).toBe(24);
|
||||
});
|
||||
|
||||
it('should handle invalid environment variable values', () => {
|
||||
// Set invalid values (non-numeric)
|
||||
process.env.N8N_VECTOR_STORE_MAX_MEMORY = 'invalid';
|
||||
process.env.N8N_VECTOR_STORE_TTL_HOURS = 'notanumber';
|
||||
|
||||
const config = getConfig();
|
||||
|
||||
// Should use default values for invalid inputs
|
||||
expect(config.maxMemoryMB).toBe(-1);
|
||||
expect(config.ttlHours).toBe(-1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('mbToBytes', () => {
|
||||
it('should convert MB to bytes', () => {
|
||||
expect(mbToBytes(1)).toBe(1024 * 1024);
|
||||
expect(mbToBytes(5)).toBe(5 * 1024 * 1024);
|
||||
expect(mbToBytes(100)).toBe(100 * 1024 * 1024);
|
||||
});
|
||||
|
||||
it('should handle zero and negative values', () => {
|
||||
expect(mbToBytes(0)).toBe(-1);
|
||||
expect(mbToBytes(-1)).toBe(-1);
|
||||
expect(mbToBytes(-10)).toBe(-1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('hoursToMs', () => {
|
||||
it('should convert hours to milliseconds', () => {
|
||||
expect(hoursToMs(1)).toBe(60 * 60 * 1000);
|
||||
expect(hoursToMs(24)).toBe(24 * 60 * 60 * 1000);
|
||||
expect(hoursToMs(168)).toBe(168 * 60 * 60 * 1000);
|
||||
});
|
||||
|
||||
it('should handle zero and negative values', () => {
|
||||
expect(hoursToMs(0)).toBe(-1);
|
||||
expect(hoursToMs(-1)).toBe(-1);
|
||||
expect(hoursToMs(-24)).toBe(-1);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,70 @@
|
||||
import type { Document } from '@langchain/core/documents';
|
||||
import type { MemoryVectorStore } from 'langchain/vectorstores/memory';
|
||||
|
||||
/**
 * Configuration options for the memory vector store
 */
export interface MemoryVectorStoreConfig {
	/**
	 * Maximum memory size in MB, -1 to disable
	 */
	maxMemoryMB: number;

	/**
	 * TTL for inactive stores in hours, -1 to disable
	 */
	ttlHours: number;
}

/**
 * Vector store metadata for tracking usage
 */
export interface VectorStoreMetadata {
	/** Estimated size of the store in bytes. */
	size: number;
	/** When the store was created; used to order memory-based eviction. */
	createdAt: Date;
	/** When the store was last touched; drives TTL-based inactivity checks. */
	lastAccessed: Date;
}

/**
 * Per-store statistics for reporting
 */
export interface StoreStats {
	/** Estimated size in bytes. */
	sizeBytes: number;
	/** Same size expressed in MB for readability. */
	sizeMB: number;
	/** Share of the total tracked memory, as a percentage. */
	percentOfTotal: number;
	/** Number of vectors held in the store. */
	vectors: number;
	/** Creation time serialized as a string (format set by the stats producer). */
	createdAt: string;
	/** Last-access time serialized as a string (format set by the stats producer). */
	lastAccessed: string;
	/** Present when TTL is enabled; true if the store exceeded the TTL. */
	inactive?: boolean;
	/** Hours since last access — presumably only set alongside `inactive`; verify. */
	inactiveForHours?: number;
}

/**
 * Overall vector store statistics
 */
export interface VectorStoreStats {
	/** Total tracked memory across all stores, in bytes. */
	totalSizeBytes: number;
	/** Total tracked memory in MB. */
	totalSizeMB: number;
	/** Usage as a percentage of the configured limit. */
	percentOfLimit: number;
	/** Configured memory limit in MB (-1 when disabled). */
	maxMemoryMB: number;
	/** Number of stores currently buffered. */
	storeCount: number;
	/** Number of stores considered inactive under the TTL. */
	inactiveStoreCount: number;
	/** Configured TTL in hours (-1 when disabled). */
	ttlHours: number;
	/** Per-store breakdown keyed by memory key. */
	stores: Record<string, StoreStats>;
}

/**
 * Service for calculating memory usage
 */
export interface IMemoryCalculator {
	/** Estimates the in-memory footprint of a document batch, in bytes. */
	estimateBatchSize(documents: Document[]): number;
	/** Estimates the full footprint of an existing vector store, in bytes. */
	calculateVectorStoreSize(vectorStore: MemoryVectorStore): number;
}

/**
 * Service for cleaning up vector stores
 */
export interface IStoreCleanupService {
	/** Removes stores whose last access exceeds the configured TTL. */
	cleanupInactiveStores(): void;
	/** Evicts oldest stores until `requiredBytes` can fit under the memory limit. */
	cleanupOldestStores(requiredBytes: number): void;
}
|
||||
@@ -1,44 +0,0 @@
|
||||
import type { OpenAIEmbeddings } from '@langchain/openai';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
|
||||
import { MemoryVectorStoreManager } from './MemoryVectorStoreManager';
|
||||
|
||||
describe('MemoryVectorStoreManager', () => {
|
||||
it('should create an instance of MemoryVectorStoreManager', () => {
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
|
||||
const instance = MemoryVectorStoreManager.getInstance(embeddings);
|
||||
expect(instance).toBeInstanceOf(MemoryVectorStoreManager);
|
||||
});
|
||||
|
||||
it('should return existing instance', () => {
|
||||
const embeddings = mock<OpenAIEmbeddings>();
|
||||
|
||||
const instance1 = MemoryVectorStoreManager.getInstance(embeddings);
|
||||
const instance2 = MemoryVectorStoreManager.getInstance(embeddings);
|
||||
expect(instance1).toBe(instance2);
|
||||
});
|
||||
|
||||
it('should update embeddings in existing instance', () => {
|
||||
const embeddings1 = mock<OpenAIEmbeddings>();
|
||||
const embeddings2 = mock<OpenAIEmbeddings>();
|
||||
|
||||
const instance = MemoryVectorStoreManager.getInstance(embeddings1);
|
||||
MemoryVectorStoreManager.getInstance(embeddings2);
|
||||
|
||||
expect((instance as any).embeddings).toBe(embeddings2);
|
||||
});
|
||||
|
||||
it('should update embeddings in existing vector store instances', async () => {
|
||||
const embeddings1 = mock<OpenAIEmbeddings>();
|
||||
const embeddings2 = mock<OpenAIEmbeddings>();
|
||||
|
||||
const instance1 = MemoryVectorStoreManager.getInstance(embeddings1);
|
||||
await instance1.getVectorStore('test');
|
||||
|
||||
const instance2 = MemoryVectorStoreManager.getInstance(embeddings2);
|
||||
const vectorStoreInstance2 = await instance2.getVectorStore('test');
|
||||
|
||||
expect((vectorStoreInstance2 as any).embeddings).toBe(embeddings2);
|
||||
});
|
||||
});
|
||||
@@ -1,52 +0,0 @@
|
||||
import type { Document } from '@langchain/core/documents';
|
||||
import type { Embeddings } from '@langchain/core/embeddings';
|
||||
import { MemoryVectorStore } from 'langchain/vectorstores/memory';
|
||||
|
||||
export class MemoryVectorStoreManager {
|
||||
private static instance: MemoryVectorStoreManager | null = null;
|
||||
|
||||
private vectorStoreBuffer: Map<string, MemoryVectorStore>;
|
||||
|
||||
private constructor(private embeddings: Embeddings) {
|
||||
this.vectorStoreBuffer = new Map();
|
||||
}
|
||||
|
||||
static getInstance(embeddings: Embeddings): MemoryVectorStoreManager {
|
||||
if (!MemoryVectorStoreManager.instance) {
|
||||
MemoryVectorStoreManager.instance = new MemoryVectorStoreManager(embeddings);
|
||||
} else {
|
||||
// We need to update the embeddings in the existing instance.
|
||||
// This is important as embeddings instance is wrapped in a logWrapper,
|
||||
// which relies on supplyDataFunctions context which changes on each workflow run
|
||||
MemoryVectorStoreManager.instance.embeddings = embeddings;
|
||||
MemoryVectorStoreManager.instance.vectorStoreBuffer.forEach((vectorStoreInstance) => {
|
||||
vectorStoreInstance.embeddings = embeddings;
|
||||
});
|
||||
}
|
||||
|
||||
return MemoryVectorStoreManager.instance;
|
||||
}
|
||||
|
||||
async getVectorStore(memoryKey: string): Promise<MemoryVectorStore> {
|
||||
let vectorStoreInstance = this.vectorStoreBuffer.get(memoryKey);
|
||||
|
||||
if (!vectorStoreInstance) {
|
||||
vectorStoreInstance = await MemoryVectorStore.fromExistingIndex(this.embeddings);
|
||||
this.vectorStoreBuffer.set(memoryKey, vectorStoreInstance);
|
||||
}
|
||||
|
||||
return vectorStoreInstance;
|
||||
}
|
||||
|
||||
async addDocuments(
|
||||
memoryKey: string,
|
||||
documents: Document[],
|
||||
clearStore?: boolean,
|
||||
): Promise<void> {
|
||||
if (clearStore) {
|
||||
this.vectorStoreBuffer.delete(memoryKey);
|
||||
}
|
||||
const vectorStoreInstance = await this.getVectorStore(memoryKey);
|
||||
await vectorStoreInstance.addDocuments(documents);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user