From 3cad60e9184de7ad73893062964366a09d4487df Mon Sep 17 00:00:00 2001 From: Michael Auerswald Date: Wed, 2 Aug 2023 12:51:25 +0200 Subject: [PATCH] feat(core): Make Redis available for backend communication (#6719) * support redis cluster * cleanup, fix config schema * set default prefix to bull * initial commit * improve logging * improve types and refactor * list support and refactor * fix redis service and tests * add comment * add redis and cache prefix * use injection * lint fix * clean schema comments * improve naming, tests, cluster client * merge master * cache returns unknown instead of T * update cache service, tests and doc * remove console.log * do not cache null or undefined values * fix merge * lint fix --- packages/cli/package.json | 5 +- packages/cli/src/AbstractServer.ts | 138 ++++---- packages/cli/src/GenericHelpers.ts | 24 -- packages/cli/src/Queue.ts | 49 +-- packages/cli/src/config/schema.ts | 19 +- packages/cli/src/services/cache.service.ts | 331 ++++++++++++------ .../cli/src/services/ownership.service.ts | 2 +- packages/cli/src/services/redis.service.ts | 52 +++ .../services/redis/RedisServiceBaseClasses.ts | 74 ++++ .../services/redis/RedisServiceCommands.ts | 22 ++ .../src/services/redis/RedisServiceHelper.ts | 146 ++++++++ .../redis/RedisServiceListReceiver.ts | 57 +++ .../services/redis/RedisServiceListSender.ts | 30 ++ .../redis/RedisServicePubSubPublisher.ts | 39 +++ .../redis/RedisServicePubSubSubscriber.ts | 46 +++ .../redis/RedisServiceStreamConsumer.ts | 92 +++++ .../redis/RedisServiceStreamProducer.ts | 42 +++ .../test/unit/services/cache.service.test.ts | 322 +++++++++++++---- .../test/unit/services/redis.service.test.ts | 138 ++++++++ pnpm-lock.yaml | 71 +++- 20 files changed, 1377 insertions(+), 322 deletions(-) create mode 100644 packages/cli/src/services/redis.service.ts create mode 100644 packages/cli/src/services/redis/RedisServiceBaseClasses.ts create mode 100644 packages/cli/src/services/redis/RedisServiceCommands.ts create mode 100644 packages/cli/src/services/redis/RedisServiceHelper.ts create mode 100644 packages/cli/src/services/redis/RedisServiceListReceiver.ts create mode 100644 packages/cli/src/services/redis/RedisServiceListSender.ts create mode 100644 packages/cli/src/services/redis/RedisServicePubSubPublisher.ts create mode 100644 packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts create mode 100644 packages/cli/src/services/redis/RedisServiceStreamConsumer.ts create mode 100644 packages/cli/src/services/redis/RedisServiceStreamProducer.ts create mode 100644 packages/cli/test/unit/services/redis.service.test.ts diff --git a/packages/cli/package.json b/packages/cli/package.json index bc61227c14..3859746178 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -70,9 +70,9 @@ "@types/bcryptjs": "^2.4.2", "@types/compression": "1.0.1", "@types/connect-history-api-fallback": "^1.3.1", - "@types/convict": "^6.1.1", "@types/content-disposition": "^0.5.5", "@types/content-type": "^1.1.5", + "@types/convict": "^6.1.1", "@types/cookie-parser": "^1.4.2", "@types/express": "^4.17.6", "@types/formidable": "^3.4.0", @@ -96,6 +96,7 @@ "@types/yamljs": "^0.2.31", "chokidar": "^3.5.2", "concurrently": "^8.2.0", + "ioredis-mock": "^8.8.1", "ts-essentials": "^7.0.3" }, "dependencies": { @@ -119,9 +120,9 @@ "class-validator": "^0.14.0", "compression": "^1.7.4", "connect-history-api-fallback": "^1.6.0", - "convict": "^6.2.4", "content-disposition": "^0.5.4", "content-type": "^1.0.4", + "convict": "^6.2.4", "cookie-parser": "^1.4.6", "crypto-js": "~4.1.1", "csrf": "^3.1.0", diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index 64bf53dcbc..9c19ac721b 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -3,9 +3,6 @@ import { readFile } from 'fs/promises'; import type { Server } from 'http'; import express from 'express'; import compression from 'compression'; -import type { RedisOptions } from 'ioredis'; - -import { LoggerProxy } from 'n8n-workflow'; import config from '@/config'; import { N8N_VERSION, inDevelopment, inTest } from '@/constants'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; @@ -16,8 +13,17 @@ import { send, sendErrorResponse, ServiceUnavailableError } from '@/ResponseHelp import { rawBody, jsonParser, corsMiddleware } from '@/middlewares'; import { TestWebhooks } from '@/TestWebhooks'; import { WaitingWebhooks } from '@/WaitingWebhooks'; -import { getRedisClusterNodes } from './GenericHelpers'; import { webhookRequestHandler } from '@/WebhookHelpers'; +import { RedisService } from '@/services/redis.service'; +import { jsonParse } from 'n8n-workflow'; +import { eventBus } from './eventbus'; +import type { AbstractEventMessageOptions } from './eventbus/EventMessageClasses/AbstractEventMessageOptions'; +import { getEventMessageObjectByType } from './eventbus/EventMessageClasses/Helpers'; +import type { RedisServiceWorkerResponseObject } from './services/redis/RedisServiceCommands'; +import { + EVENT_BUS_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from './services/redis/RedisServiceHelper'; export abstract class AbstractServer { protected server: Server; @@ -110,80 +116,76 @@ export abstract class AbstractServer { }); if (config.getEnv('executions.mode') === 'queue') { - await this.setupRedisChecks(); + await this.setupRedis(); } } // This connection is going to be our heartbeat // IORedis automatically pings redis and tries to reconnect // We will be using a retryStrategy to control how and when to exit. - private async setupRedisChecks() { - // eslint-disable-next-line @typescript-eslint/naming-convention - const { default: Redis } = await import('ioredis'); + // We are also subscribing to the event log channel to receive events from workers + private async setupRedis() { + const redisService = Container.get(RedisService); + const redisSubscriber = await redisService.getPubSubSubscriber(); - let lastTimer = 0; - let cumulativeTimeout = 0; - const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis'); - const clusterNodes = getRedisClusterNodes(); - const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); - const usesRedisCluster = clusterNodes.length > 0; - LoggerProxy.debug( - usesRedisCluster - ? `Initialising Redis cluster connection with nodes: ${clusterNodes - .map((e) => `${e.host}:${e.port}`) - .join(',')}` - : `Initialising Redis client connection with host: ${host ?? 'localhost'} and port: ${ - port ?? '6379' - }`, - ); - const sharedRedisOptions: RedisOptions = { - username, - password, - db, - enableReadyCheck: false, - maxRetriesPerRequest: null, - }; - const redis = usesRedisCluster - ? new Redis.Cluster( - clusterNodes.map((node) => ({ host: node.host, port: node.port })), - { - redisOptions: sharedRedisOptions, - }, - ) - : new Redis({ - host, - port, - ...sharedRedisOptions, - retryStrategy: (): number | null => { - const now = Date.now(); - if (now - lastTimer > 30000) { - // Means we had no timeout at all or last timeout was temporary and we recovered - lastTimer = now; - cumulativeTimeout = 0; - } else { - cumulativeTimeout += now - lastTimer; - lastTimer = now; - if (cumulativeTimeout > redisConnectionTimeoutLimit) { - LoggerProxy.error( - `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, - ); - process.exit(1); - } + // TODO: these are all proof of concept implementations for the moment + // until worker communication is implemented + // #region proof of concept + await redisSubscriber.subscribeToEventLog(); + await redisSubscriber.subscribeToWorkerResponseChannel(); + redisSubscriber.addMessageHandler( + 'AbstractServerReceiver', + async (channel: string, message: string) => { + // TODO: this is a proof of concept implementation to forward events to the main instance's event bus + // Events are arriving through a pub/sub channel and are forwarded to the eventBus + // In the future, a stream should probably replace this implementation entirely + if (channel === EVENT_BUS_REDIS_CHANNEL) { + const eventData = jsonParse(message); + if (eventData) { + const eventMessage = getEventMessageObjectByType(eventData); + if (eventMessage) { + await eventBus.send(eventMessage); } - return 500; - }, - }); + } + } else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + // The back channel from the workers as a pub/sub channel + const workerResponse = jsonParse(message); + if (workerResponse) { + // TODO: Handle worker response + console.log('Received worker response', workerResponse); + } + } + }, + ); + // TODO: Leave comments for now as implementation example + // const redisStreamListener = await redisService.getStreamConsumer(); + // void redisStreamListener.listenToStream('teststream'); + // redisStreamListener.addMessageHandler( + // 'MessageLogger', + // async (stream: string, id: string, message: string[]) => { + // // TODO: this is a proof of concept implementation of a stream consumer + // switch (stream) { + // case EVENT_BUS_REDIS_STREAM: + // case COMMAND_REDIS_STREAM: + // case WORKER_RESPONSE_REDIS_STREAM: + // default: + // LoggerProxy.debug( + // `Received message from stream ${stream} with id ${id} and message ${message.join( + // ',', + // )}`, + // ); + // break; + // } + // }, + // ); - redis.on('close', () => { - LoggerProxy.warn('Redis unavailable - trying to reconnect...'); - }); + // const redisListReceiver = await redisService.getListReceiver(); + // await redisListReceiver.init(); - redis.on('error', (error) => { - if (!String(error).includes('ECONNREFUSED')) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - LoggerProxy.warn('Error with Redis: ', error); - } - }); + // setInterval(async () => { + // await redisListReceiver.popLatestWorkerResponse(); + // }, 1000); + // #endregion } async init(): Promise { diff --git a/packages/cli/src/GenericHelpers.ts b/packages/cli/src/GenericHelpers.ts index 1860f5052a..0034ebea90 100644 --- a/packages/cli/src/GenericHelpers.ts +++ b/packages/cli/src/GenericHelpers.ts @@ -192,28 +192,4 @@ export async function createErrorExecution( await Container.get(ExecutionRepository).createNewExecution(fullExecutionData); } -export function getRedisClusterNodes(): Array<{ host: string; port: number }> { - const clusterNodePairs = config - .getEnv('queue.bull.redis.clusterNodes') - .split(',') - .filter((e) => e); - return clusterNodePairs.map((pair) => { - const [host, port] = pair.split(':'); - return { host, port: parseInt(port) }; - }); -} - -export function getRedisPrefix(): string { - let prefix = config.getEnv('queue.bull.prefix'); - if (prefix && getRedisClusterNodes().length > 0) { - if (!prefix.startsWith('{')) { - prefix = '{' + prefix; - } - if (!prefix.endsWith('}')) { - prefix += '}'; - } - } - return prefix; -} - export const DEFAULT_EXECUTIONS_GET_ALL_LIMIT = 20; diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index ecf862a1d0..27c59b411d 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -1,11 +1,16 @@ import type Bull from 'bull'; -import { type RedisOptions } from 'ioredis'; import { Service } from 'typedi'; -import { LoggerProxy, type IExecuteResponsePromiseData } from 'n8n-workflow'; -import config from '@/config'; +import { type IExecuteResponsePromiseData } from 'n8n-workflow'; import { ActiveExecutions } from '@/ActiveExecutions'; import * as WebhookHelpers from '@/WebhookHelpers'; -import { getRedisClusterNodes, getRedisPrefix } from './GenericHelpers'; +import { + getRedisClusterClient, + getRedisClusterNodes, + getRedisPrefix, + getRedisStandardClient, +} from './services/redis/RedisServiceHelper'; +import type { RedisClientType } from './services/redis/RedisServiceBaseClasses'; +import config from '@/config'; export type JobId = Bull.JobId; export type Job = Bull.Job; @@ -32,10 +37,10 @@ export class Queue { constructor(private activeExecutions: ActiveExecutions) {} async init() { - const prefix = getRedisPrefix(); + const bullPrefix = config.getEnv('queue.bull.prefix'); + const prefix = getRedisPrefix(bullPrefix); const clusterNodes = getRedisClusterNodes(); const usesRedisCluster = clusterNodes.length > 0; - const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis'); // eslint-disable-next-line @typescript-eslint/naming-convention const { default: Bull } = await import('bull'); // eslint-disable-next-line @typescript-eslint/naming-convention @@ -45,40 +50,12 @@ export class Queue { // for some time. With it enabled, worker might take minutes to realize // redis is back up and resume working. // More here: https://github.com/OptimalBits/bull/issues/890 - - LoggerProxy.debug( - usesRedisCluster - ? `Initialising Redis cluster connection with nodes: ${clusterNodes - .map((e) => `${e.host}:${e.port}`) - .join(',')}` - : `Initialising Redis client connection with host: ${host ?? 'localhost'} and port: ${ - port ?? '6379' - }`, - ); - const sharedRedisOptions: RedisOptions = { - username, - password, - db, - enableReadyCheck: false, - maxRetriesPerRequest: null, - }; this.jobQueue = new Bull('jobs', { prefix, createClient: (type, clientConfig) => usesRedisCluster - ? new Redis.Cluster( - clusterNodes.map((node) => ({ host: node.host, port: node.port })), - { - ...clientConfig, - redisOptions: sharedRedisOptions, - }, - ) - : new Redis({ - ...clientConfig, - host, - port, - ...sharedRedisOptions, - }), + ? getRedisClusterClient(Redis, clientConfig, (type + '(bull)') as RedisClientType) + : getRedisStandardClient(Redis, clientConfig, (type + '(bull)') as RedisClientType), }); this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => { diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 7f93974a86..f9fc1dcc3e 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -351,7 +351,7 @@ export const schema = { }, bull: { prefix: { - doc: 'Prefix for all queue keys (wrap in {} for cluster mode)', + doc: 'Prefix for all bull queue keys', format: String, default: 'bull', env: 'QUEUE_BULL_PREFIX', @@ -1112,6 +1112,15 @@ export const schema = { }, }, + redis: { + prefix: { + doc: 'Prefix for all n8n related keys', + format: String, + default: 'n8n', + env: 'N8N_REDIS_KEY_PREFIX', + }, + }, + cache: { enabled: { doc: 'Whether caching is enabled', @@ -1140,10 +1149,16 @@ export const schema = { }, }, redis: { + prefix: { + doc: 'Prefix for all cache keys', + format: String, + default: 'cache', + env: 'N8N_CACHE_REDIS_KEY_PREFIX', + }, ttl: { doc: 'Time to live for cached items in redis (in ms), 0 for no TTL', format: Number, - default: 0, + default: 3600 * 1000, // 1 hour env: 'N8N_CACHE_REDIS_TTL', }, }, diff --git a/packages/cli/src/services/cache.service.ts b/packages/cli/src/services/cache.service.ts index b13a1c2dde..849688b340 100644 --- a/packages/cli/src/services/cache.service.ts +++ b/packages/cli/src/services/cache.service.ts @@ -3,9 +3,8 @@ import config from '@/config'; import { caching } from 'cache-manager'; import type { MemoryCache } from 'cache-manager'; import type { RedisCache } from 'cache-manager-ioredis-yet'; -import type { RedisOptions } from 'ioredis'; -import { getRedisClusterNodes } from '../GenericHelpers'; -import { LoggerProxy, jsonStringify } from 'n8n-workflow'; +import { jsonStringify } from 'n8n-workflow'; +import { getDefaultRedisClient, getRedisPrefix } from './redis/RedisServiceHelper'; @Service() export class CacheService { @@ -15,80 +14,33 @@ export class CacheService { */ private cache: RedisCache | MemoryCache | undefined; - async init() { + isRedisCache(): boolean { + return (this.cache as RedisCache)?.store?.isCacheable !== undefined; + } + + /** + * Initialize the cache service. + * + * If the cache is enabled, it will initialize the cache from the provided config options. By default, it will use + * the `memory` backend and create a simple in-memory cache. If running in `queue` mode, or if `redis` backend is selected, + * it use Redis as the cache backend (either a local Redis instance or a Redis cluster, depending on the config) + * + * If the cache is disabled, this does nothing. + */ + async init(): Promise { if (!config.getEnv('cache.enabled')) { - throw new Error('Cache is disabled'); + return; } - const backend = config.getEnv('cache.backend'); - if ( backend === 'redis' || (backend === 'auto' && config.getEnv('executions.mode') === 'queue') ) { const { redisInsStore } = await import('cache-manager-ioredis-yet'); - - // #region TEMPORARY Redis Client Code - /* - * TODO: remove once redis service is ready - * this code is just temporary - */ - // eslint-disable-next-line @typescript-eslint/naming-convention - const { default: Redis } = await import('ioredis'); - let lastTimer = 0; - let cumulativeTimeout = 0; - const { host, port, username, password, db }: RedisOptions = - config.getEnv('queue.bull.redis'); - const clusterNodes = getRedisClusterNodes(); - const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); - const usesRedisCluster = clusterNodes.length > 0; - LoggerProxy.debug( - usesRedisCluster - ? `(Cache Service) Initialising Redis cluster connection with nodes: ${clusterNodes - .map((e) => `${e.host}:${e.port}`) - .join(',')}` - : `(Cache Service) Initialising Redis client connection with host: ${ - host ?? 'localhost' - } and port: ${port ?? '6379'}`, - ); - const sharedRedisOptions: RedisOptions = { - username, - password, - db, - enableReadyCheck: false, - maxRetriesPerRequest: null, - }; - const redisClient = usesRedisCluster - ? new Redis.Cluster( - clusterNodes.map((node) => ({ host: node.host, port: node.port })), - { - redisOptions: sharedRedisOptions, - }, - ) - : new Redis({ - host, - port, - ...sharedRedisOptions, - retryStrategy: (): number | null => { - const now = Date.now(); - if (now - lastTimer > 30000) { - // Means we had no timeout at all or last timeout was temporary and we recovered - lastTimer = now; - cumulativeTimeout = 0; - } else { - cumulativeTimeout += now - lastTimer; - lastTimer = now; - if (cumulativeTimeout > redisConnectionTimeoutLimit) { - LoggerProxy.error( - `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, - ); - process.exit(1); - } - } - return 500; - }, - }); - // #endregion TEMPORARY Redis Client Code + const redisPrefix = getRedisPrefix(config.getEnv('redis.prefix')); + const cachePrefix = config.getEnv('cache.redis.prefix'); + const keyPrefix = `${redisPrefix}:${cachePrefix}:`; + const redisClient = await getDefaultRedisClient({ keyPrefix }, 'client(cache)'); const redisStore = redisInsStore(redisClient, { ttl: config.getEnv('cache.redis.ttl'), }); @@ -106,6 +58,163 @@ export class CacheService { } } + /** + * Get a value from the cache by key. + * + * If the value is not in the cache or expired, the refreshFunction is called if defined, + * which will set the key with the function's result and returns it. If no refreshFunction is set, the fallback value is returned. + * + * If the cache is disabled, refreshFunction's result or fallbackValue is returned. + * + * If cache is not hit, and neither refreshFunction nor fallbackValue are provided, `undefined` is returned. + * @param key The key to fetch from the cache + * @param options.refreshFunction Optional function to call to set the cache if the key is not found + * @param options.refreshTtl Optional ttl for the refreshFunction's set call + * @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided + */ + async get( + key: string, + options: { + fallbackValue?: unknown; + refreshFunction?: (key: string) => Promise; + refreshTtl?: number; + } = {}, + ): Promise { + const value = await this.cache?.store.get(key); + if (value !== undefined) { + return value; + } + if (options.refreshFunction) { + const refreshValue = await options.refreshFunction(key); + await this.set(key, refreshValue, options.refreshTtl); + return refreshValue; + } + return options.fallbackValue ?? undefined; + } + + /** + * Get many values from a list of keys. + * + * If a value is not in the cache or expired, the returned list will have `undefined` at that index. + * If the cache is disabled, refreshFunction's result or fallbackValue is returned. + * If cache is not hit, and neither refreshFunction nor fallbackValue are provided, a list of `undefined` is returned. + * @param keys A list of keys to fetch from the cache + * @param options.refreshFunctionEach Optional, if defined, undefined values will be replaced with the result of the refreshFunctionEach call and the cache will be updated + * @param options.refreshFunctionMany Optional, if defined, all values will be replaced with the result of the refreshFunctionMany call and the cache will be updated + * @param options.refreshTtl Optional ttl for the refreshFunction's set call + * @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided + */ + async getMany( + keys: string[], + options: { + fallbackValues?: unknown[]; + refreshFunctionEach?: (key: string) => Promise; + refreshFunctionMany?: (keys: string[]) => Promise; + refreshTtl?: number; + } = {}, + ): Promise { + let values = await this.cache?.store.mget(...keys); + if (values === undefined) { + values = keys.map(() => undefined); + } + if (!values.includes(undefined)) { + return values; + } + if (options.refreshFunctionEach) { + for (let i = 0; i < keys.length; i++) { + if (values[i] === undefined) { + const key = keys[i]; + let fallback = undefined; + if (options.fallbackValues && options.fallbackValues.length > i) { + fallback = options.fallbackValues[i]; + } + const refreshValue = await this.get(key, { + refreshFunction: options.refreshFunctionEach, + refreshTtl: options.refreshTtl, + fallbackValue: fallback, + }); + values[i] = refreshValue; + } + } + return values; + } + if (options.refreshFunctionMany) { + const refreshValues: unknown[] = await options.refreshFunctionMany(keys); + if (keys.length !== refreshValues.length) { + throw new Error('refreshFunctionMany must return the same number of values as keys'); + } + const newKV: Array<[string, unknown]> = []; + for (let i = 0; i < keys.length; i++) { + newKV.push([keys[i], refreshValues[i]]); + } + await this.setMany(newKV, options.refreshTtl); + return refreshValues; + } + return options.fallbackValues ?? values; + } + + /** + * Set a value in the cache by key. + * @param key The key to set + * @param value The value to set + * @param ttl Optional time to live in ms + */ + async set(key: string, value: unknown, ttl?: number): Promise { + if (!this.cache) { + await this.init(); + } + if (value === undefined || value === null) { + return; + } + if (this.isRedisCache()) { + if (!(this.cache as RedisCache)?.store?.isCacheable(value)) { + throw new Error('Value is not cacheable'); + } + } + await this.cache?.store.set(key, value, ttl); + } + + /** + * Set a multiple values in the cache at once. + * @param values An array of [key, value] tuples to set + * @param ttl Optional time to live in ms + */ + async setMany(values: Array<[string, unknown]>, ttl?: number): Promise { + if (!this.cache) { + await this.init(); + } + // eslint-disable-next-line @typescript-eslint/naming-convention + const nonNullValues = values.filter(([_key, value]) => value !== undefined && value !== null); + if (this.isRedisCache()) { + // eslint-disable-next-line @typescript-eslint/naming-convention + nonNullValues.forEach(([_key, value]) => { + if (!(this.cache as RedisCache)?.store?.isCacheable(value)) { + throw new Error('Value is not cacheable'); + } + }); + } + await this.cache?.store.mset(nonNullValues, ttl); + } + + /** + * Delete a value from the cache by key. + * @param key The key to delete + */ + async delete(key: string): Promise { + await this.cache?.store.del(key); + } + + /** + * Delete multiple values from the cache. + * @param keys List of keys to delete + */ + async deleteMany(keys: string[]): Promise { + return this.cache?.store.mdel(...keys); + } + + /** + * Delete all values and uninitialized the cache. + */ async destroy() { if (this.cache) { await this.reset(); @@ -113,6 +222,22 @@ export class CacheService { } } + /** + * Enable and initialize the cache. + */ + async enable() { + config.set('cache.enabled', true); + await this.init(); + } + + /** + * Disable and destroy the cache. + */ + async disable() { + config.set('cache.enabled', false); + await this.destroy(); + } + async getCache(): Promise { if (!this.cache) { await this.init(); @@ -120,59 +245,35 @@ export class CacheService { return this.cache; } - async get(key: string): Promise { - if (!this.cache) { - await this.init(); - } - return this.cache?.store.get(key) as T; - } - - async set(key: string, value: T, ttl?: number): Promise { - if (!this.cache) { - await this.init(); - } - return this.cache?.store.set(key, value, ttl); - } - - async delete(key: string): Promise { - if (!this.cache) { - await this.init(); - } - return this.cache?.store.del(key); - } - + /** + * Delete all values from the cache, but leave the cache initialized. + */ async reset(): Promise { - if (!this.cache) { - await this.init(); - } - return this.cache?.store.reset(); + await this.cache?.store.reset(); } + /** + * Return all keys in the cache. + */ async keys(): Promise { - if (!this.cache) { - await this.init(); - } return this.cache?.store.keys() ?? []; } - async setMany(values: Array<[string, T]>, ttl?: number): Promise { - if (!this.cache) { - await this.init(); + /** + * Return all key/value pairs in the cache. This is a potentially very expensive operation and is only meant to be used for debugging + */ + async keyValues(): Promise> { + const keys = await this.keys(); + const values = await this.getMany(keys); + const map = new Map(); + if (keys.length === values.length) { + for (let i = 0; i < keys.length; i++) { + map.set(keys[i], values[i]); + } + return map; } - return this.cache?.store.mset(values, ttl); - } - - async getMany(keys: string[]): Promise> { - if (!this.cache) { - await this.init(); - } - return this.cache?.store.mget(...keys) as Promise>; - } - - async deleteMany(keys: string[]): Promise { - if (!this.cache) { - await this.init(); - } - return this.cache?.store.mdel(...keys); + throw new Error( + 'Keys and values do not match, this should not happen and appears to result from some cache corruption.', + ); } } diff --git a/packages/cli/src/services/ownership.service.ts b/packages/cli/src/services/ownership.service.ts index ec01bcca01..9602dc6241 100644 --- a/packages/cli/src/services/ownership.service.ts +++ b/packages/cli/src/services/ownership.service.ts @@ -16,7 +16,7 @@ export class OwnershipService { * Retrieve the user who owns the workflow. Note that workflow ownership is **immutable**. */ async getWorkflowOwnerCached(workflowId: string) { - const cachedValue = await this.cacheService.get(`cache:workflow-owner:${workflowId}`); + const cachedValue = (await this.cacheService.get(`cache:workflow-owner:${workflowId}`)) as User; if (cachedValue) return this.userRepository.create(cachedValue); diff --git a/packages/cli/src/services/redis.service.ts b/packages/cli/src/services/redis.service.ts new file mode 100644 index 0000000000..46557fb0df --- /dev/null +++ b/packages/cli/src/services/redis.service.ts @@ -0,0 +1,52 @@ +import { Service } from 'typedi'; +import { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; +import { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; +import { RedisServiceListReceiver } from './redis/RedisServiceListReceiver'; +import { RedisServiceListSender } from './redis/RedisServiceListSender'; +import { RedisServiceStreamConsumer } from './redis/RedisServiceStreamConsumer'; +import { RedisServiceStreamProducer } from './redis/RedisServiceStreamProducer'; + +/* + * This is a convenience service that provides access to all the Redis clients. + */ +@Service() +export class RedisService { + constructor( + private redisServicePubSubSubscriber: RedisServicePubSubSubscriber, + private redisServicePubSubPublisher: RedisServicePubSubPublisher, + private redisServiceListReceiver: RedisServiceListReceiver, + private redisServiceListSender: RedisServiceListSender, + private redisServiceStreamConsumer: RedisServiceStreamConsumer, + private redisServiceStreamProducer: RedisServiceStreamProducer, + ) {} + + async getPubSubSubscriber() { + await this.redisServicePubSubSubscriber.init(); + return this.redisServicePubSubSubscriber; + } + + async getPubSubPublisher() { + await this.redisServicePubSubPublisher.init(); + return this.redisServicePubSubPublisher; + } + + async getListSender() { + await this.redisServiceListSender.init(); + return this.redisServiceListSender; + } + + async getListReceiver() { + await this.redisServiceListReceiver.init(); + return this.redisServiceListReceiver; + } + + async getStreamProducer() { + await this.redisServiceStreamProducer.init(); + return this.redisServiceStreamProducer; + } + + async getStreamConsumer() { + await this.redisServiceStreamConsumer.init(); + return this.redisServiceStreamConsumer; + } +} diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts new file mode 100644 index 0000000000..2ed9d94eee --- /dev/null +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -0,0 +1,74 @@ +import type Redis from 'ioredis'; +import type { Cluster } from 'ioredis'; +import { getDefaultRedisClient } from './RedisServiceHelper'; +import { LoggerProxy } from 'n8n-workflow'; + +export type RedisClientType = + | 'subscriber' + | 'client' + | 'bclient' + | 'subscriber(bull)' + | 'client(bull)' + | 'bclient(bull)' + | 'client(cache)' + | 'publisher' + | 'consumer' + | 'producer' + | 'list-sender' + | 'list-receiver'; + +export type RedisServiceMessageHandler = + | ((channel: string, message: string) => void) + | ((stream: string, id: string, message: string[]) => void); + +class RedisServiceBase { + redisClient: Redis | Cluster | undefined; + + isInitialized = false; + + async init(type: RedisClientType = 'client'): Promise { + if (this.redisClient && this.isInitialized) { + return; + } + this.redisClient = await getDefaultRedisClient(undefined, type); + + this.redisClient.on('close', () => { + LoggerProxy.warn('Redis unavailable - trying to reconnect...'); + }); + + this.redisClient.on('error', (error) => { + if (!String(error).includes('ECONNREFUSED')) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + LoggerProxy.warn('Error with Redis: ', error); + } + }); + } + + async destroy(): Promise { + if (!this.redisClient) { + return; + } + await this.redisClient.quit(); + this.redisClient = undefined; + } +} + +export abstract class RedisServiceBaseSender extends RedisServiceBase { + senderId: string; + + setSenderId(senderId?: string): void { + this.senderId = senderId ?? ''; + } +} + +export abstract class RedisServiceBaseReceiver extends RedisServiceBase { + messageHandlers: Map = new Map(); + + addMessageHandler(handlerName: string, handler: RedisServiceMessageHandler): void { + this.messageHandlers.set(handlerName, handler); + } + + removeMessageHandler(handlerName: string): void { + this.messageHandlers.delete(handlerName); + } +} diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts new file mode 100644 index 0000000000..cd70a32d6e --- /dev/null +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -0,0 +1,22 @@ +export type RedisServiceCommand = 'getStatus' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands + +/** + * An object to be sent via Redis pub/sub from the main process to the workers. + * @field command: The command to be executed. + * @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids. + * @field args: Optional arguments to be passed to the command. + */ +type RedisServiceBaseCommand = { + command: RedisServiceCommand; + payload?: { + [key: string]: string | number | boolean | string[] | number[] | boolean[]; + }; +}; + +export type RedisServiceWorkerResponseObject = { + workerId: string; +} & RedisServiceBaseCommand; + +export type RedisServiceCommandObject = { + targets?: string[]; +} & RedisServiceBaseCommand; diff --git a/packages/cli/src/services/redis/RedisServiceHelper.ts b/packages/cli/src/services/redis/RedisServiceHelper.ts new file mode 100644 index 0000000000..d996e149ce --- /dev/null +++ b/packages/cli/src/services/redis/RedisServiceHelper.ts @@ -0,0 +1,146 @@ +import type Redis from 'ioredis'; +import type { Cluster, RedisOptions } from 'ioredis'; +import config from '@/config'; +import { LoggerProxy } from 'n8n-workflow'; +import type { RedisClientType } from './RedisServiceBaseClasses'; + +export const EVENT_BUS_REDIS_STREAM = 'n8n:eventstream'; +export const COMMAND_REDIS_STREAM = 'n8n:commandstream'; +export const WORKER_RESPONSE_REDIS_STREAM = 'n8n:workerstream'; +export const EVENT_BUS_REDIS_CHANNEL = 'n8n.events'; +export const COMMAND_REDIS_CHANNEL = 'n8n.commands'; +export const WORKER_RESPONSE_REDIS_CHANNEL = 'n8n.worker-response'; +export const WORKER_RESPONSE_REDIS_LIST = 'n8n:list:worker-response'; + +export function getRedisClusterNodes(): Array<{ host: string; port: number }> { + const clusterNodePairs = config + .getEnv('queue.bull.redis.clusterNodes') + .split(',') + .filter((e) => e); + return clusterNodePairs.map((pair) => { + const [host, port] = pair.split(':'); + return { host, port: parseInt(port) }; + }); +} + +export function getRedisPrefix(customPrefix?: string): string { + let prefix = customPrefix ?? config.getEnv('redis.prefix'); + if (prefix && getRedisClusterNodes().length > 0) { + if (!prefix.startsWith('{')) { + prefix = '{' + prefix; + } + if (!prefix.endsWith('}')) { + prefix += '}'; + } + } + return prefix; +} + +export function getRedisStandardClient( + redis: typeof Redis, + redisOptions?: RedisOptions, + redisType?: RedisClientType, +): Redis | Cluster { + let lastTimer = 0; + let cumulativeTimeout = 0; + const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis'); + const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); + const sharedRedisOptions: RedisOptions = { + host, + port, + username, + password, + db, + enableReadyCheck: false, + maxRetriesPerRequest: null, + ...redisOptions, + }; + LoggerProxy.debug( + `Initialising Redis client${redisType ? ` of type ${redisType}` : ''} connection with host: ${ + host ?? 'localhost' + } and port: ${port ?? '6379'}`, + ); + return new redis({ + ...sharedRedisOptions, + retryStrategy: (): number | null => { + const now = Date.now(); + if (now - lastTimer > 30000) { + // Means we had no timeout at all or last timeout was temporary and we recovered + lastTimer = now; + cumulativeTimeout = 0; + } else { + cumulativeTimeout += now - lastTimer; + lastTimer = now; + if (cumulativeTimeout > redisConnectionTimeoutLimit) { + LoggerProxy.error( + `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, + ); + process.exit(1); + } + } + return 500; + }, + }); +} + +export function getRedisClusterClient( + redis: typeof Redis, + redisOptions?: RedisOptions, + redisType?: RedisClientType, +): Cluster { + let lastTimer = 0; + let cumulativeTimeout = 0; + const clusterNodes = getRedisClusterNodes(); + const { username, password, db }: RedisOptions = config.getEnv('queue.bull.redis'); + const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); + const sharedRedisOptions: RedisOptions = { + username, + password, + db, + enableReadyCheck: false, + maxRetriesPerRequest: null, + ...redisOptions, + }; + LoggerProxy.debug( + `Initialising Redis cluster${ + redisType ? ` of type ${redisType}` : '' + } connection with nodes: ${clusterNodes.map((e) => `${e.host}:${e.port}`).join(',')}`, + ); + return new redis.Cluster( + clusterNodes.map((node) => ({ host: node.host, port: node.port })), + { + redisOptions: sharedRedisOptions, + clusterRetryStrategy: (): number | null => { + const now = Date.now(); + if (now - lastTimer > 30000) { + // Means we had no timeout at all or last timeout was temporary and we recovered + lastTimer = now; + cumulativeTimeout = 0; + } else { + cumulativeTimeout += now - lastTimer; + lastTimer = now; + if (cumulativeTimeout > redisConnectionTimeoutLimit) { + LoggerProxy.error( + `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, + ); + process.exit(1); + } + } + return 500; + }, + }, + ); +} + +export async function getDefaultRedisClient( + additionalRedisOptions?: RedisOptions, + redisType?: RedisClientType, +): Promise { + // eslint-disable-next-line @typescript-eslint/naming-convention + const { default: Redis } = await import('ioredis'); + const clusterNodes = getRedisClusterNodes(); + const usesRedisCluster = clusterNodes.length > 0; + return usesRedisCluster + ? getRedisClusterClient(Redis, additionalRedisOptions, redisType) + : getRedisStandardClient(Redis, additionalRedisOptions, redisType); +} diff --git a/packages/cli/src/services/redis/RedisServiceListReceiver.ts b/packages/cli/src/services/redis/RedisServiceListReceiver.ts new file mode 100644 index 0000000000..8526409003 --- /dev/null +++ b/packages/cli/src/services/redis/RedisServiceListReceiver.ts @@ -0,0 +1,57 @@ +import { Service } from 'typedi'; +import { WORKER_RESPONSE_REDIS_LIST } from './RedisServiceHelper'; +import type { RedisServiceWorkerResponseObject } from './RedisServiceCommands'; +import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses'; +import { LoggerProxy, jsonParse } from 'n8n-workflow'; + +@Service() +export class RedisServiceListReceiver extends RedisServiceBaseReceiver { + async init(): Promise { + await super.init('list-receiver'); + } + + async popFromHead(list: string): Promise { + if (!this.redisClient) { + await this.init(); + } + return this.redisClient?.lpop(list); + } + + async popFromTail(list: string): Promise { + if (!this.redisClient) { + await this.init(); + } + return this.redisClient?.rpop(list); + } + + private poppedResultToWorkerResponse( + poppedResult: string | null | undefined, + list: string = WORKER_RESPONSE_REDIS_LIST, + ): RedisServiceWorkerResponseObject | null { + if (poppedResult) { + try { + const workerResponse = jsonParse(poppedResult); + if (workerResponse) { + // TODO: Handle worker response + console.log('Received worker response', workerResponse); + } + return workerResponse; + } catch (error) { + LoggerProxy.warn( + `Error parsing worker response on list ${list}: ${(error as Error).message}`, + ); + } + } + return null; + } + + async popOldestWorkerResponse(): Promise { + const poppedResult = await this.popFromTail(WORKER_RESPONSE_REDIS_LIST); + return this.poppedResultToWorkerResponse(poppedResult); + } + + async popLatestWorkerResponse(): Promise { + const poppedResult = await this.popFromHead(WORKER_RESPONSE_REDIS_LIST); + return this.poppedResultToWorkerResponse(poppedResult); + } +} diff --git a/packages/cli/src/services/redis/RedisServiceListSender.ts b/packages/cli/src/services/redis/RedisServiceListSender.ts new file mode 100644 index 0000000000..bb91c1325d --- /dev/null +++ b/packages/cli/src/services/redis/RedisServiceListSender.ts @@ -0,0 +1,30 @@ +import { Service } from 'typedi'; +import { WORKER_RESPONSE_REDIS_LIST } from './RedisServiceHelper'; +import type { RedisServiceWorkerResponseObject } from './RedisServiceCommands'; +import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; + +@Service() +export class RedisServiceListSender extends RedisServiceBaseSender { + async init(senderId?: string): Promise { + await super.init('list-sender'); + this.setSenderId(senderId); + } + + async prepend(list: string, message: string): Promise { + if (!this.redisClient) { + await this.init(); + } + await this.redisClient?.lpush(list, message); + } + + async append(list: string, message: string): Promise { + if (!this.redisClient) { + await this.init(); + } + await this.redisClient?.rpush(list, message); + } + + async appendWorkerResponse(message: RedisServiceWorkerResponseObject): Promise { + await this.prepend(WORKER_RESPONSE_REDIS_LIST, JSON.stringify(message)); + } +} diff --git a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts new file mode 100644 index 0000000000..a0a850133c --- /dev/null +++ b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts @@ -0,0 +1,39 @@ +import { Service } from 'typedi'; +import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from './RedisServiceHelper'; +import type { + RedisServiceCommandObject, + RedisServiceWorkerResponseObject, +} from './RedisServiceCommands'; +import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; + +@Service() +export class RedisServicePubSubPublisher extends RedisServiceBaseSender { + async init(senderId?: string): Promise { + await super.init('publisher'); + this.setSenderId(senderId); + } + + async publish(channel: string, message: string): Promise { + if (!this.redisClient) { + await this.init(); + } + await this.redisClient?.publish(channel, message); + } + + async publishToEventLog(message: AbstractEventMessage): Promise { + await this.publish(EVENT_BUS_REDIS_CHANNEL, message.toString()); + } + + async publishToCommandChannel(message: RedisServiceCommandObject): Promise { + await this.publish(COMMAND_REDIS_CHANNEL, JSON.stringify(message)); + } + + async publishToWorkerChannel(message: RedisServiceWorkerResponseObject): Promise { + await this.publish(WORKER_RESPONSE_REDIS_CHANNEL, JSON.stringify(message)); + } +} diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts new file mode 100644 index 0000000000..cb7b05d41f --- /dev/null +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -0,0 +1,46 @@ +import { Service } from 'typedi'; +import { LoggerProxy as Logger } from 'n8n-workflow'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from './RedisServiceHelper'; +import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses'; + +@Service() +export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { + async init(): Promise { + await super.init('subscriber'); + + this.redisClient?.on('message', (channel: string, message: string) => { + this.messageHandlers.forEach((handler: (channel: string, message: string) => void) => + handler(channel, message), + ); + }); + } + + async subscribe(channel: string): Promise { + if (!this.redisClient) { + await this.init(); + } + await this.redisClient?.subscribe(channel, (error, count: number) => { + if (error) { + Logger.error(`Error subscribing to channel ${channel}`); + } else { + Logger.debug(`Subscribed ${count.toString()} to eventlog channel`); + } + }); + } + + async subscribeToEventLog(): Promise { + await this.subscribe(EVENT_BUS_REDIS_CHANNEL); + } + + async subscribeToCommandChannel(): Promise { + await this.subscribe(COMMAND_REDIS_CHANNEL); + } + + async subscribeToWorkerResponseChannel(): Promise { + await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL); + } +} diff --git a/packages/cli/src/services/redis/RedisServiceStreamConsumer.ts b/packages/cli/src/services/redis/RedisServiceStreamConsumer.ts new file mode 100644 index 0000000000..39646cb713 --- /dev/null +++ b/packages/cli/src/services/redis/RedisServiceStreamConsumer.ts @@ -0,0 +1,92 @@ +import { Service } from 'typedi'; +import { LoggerProxy } from 'n8n-workflow'; +import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses'; + +type LastId = string; + +type StreamName = string; + +type StreamDetails = { + lastId: LastId; + pollingInterval: number; + waiter: NodeJS.Timer | undefined; +}; + +@Service() +export class RedisServiceStreamConsumer extends RedisServiceBaseReceiver { + // while actively listening, the stream name and last id are stored here + // removing the entry will stop the listener + streams: Map = new Map(); + + async init(): Promise { + await super.init('consumer'); + } + + async listenToStream(stream: StreamName, lastId = '$'): Promise { + if (!this.redisClient) { + await this.init(); + } + LoggerProxy.debug(`Redis client now listening to stream ${stream} starting with id ${lastId}`); + this.setLastId(stream, lastId); + const interval = this.streams.get(stream)?.pollingInterval ?? 1000; + const waiter = setInterval(async () => { + const currentLastId = this.streams.get(stream)?.lastId ?? '$'; + const results = await this.redisClient?.xread( + 'BLOCK', + interval, + 'STREAMS', + stream, + currentLastId, + ); + if (results && results.length > 0) { + const [_key, messages] = results[0]; + if (messages.length > 0) { + messages.forEach(([id, message]) => { + this.messageHandlers.forEach((handler) => handler(stream, id, message)); + }); + // Pass the last id of the results to the next round. + const newLastId = messages[messages.length - 1][0]; + this.setLastId(stream, newLastId); + } + } + }, interval); + this.setWaiter(stream, waiter); + } + + stopListeningToStream(stream: StreamName): void { + LoggerProxy.debug(`Redis client stopped listening to stream ${stream}`); + const existing = this.streams.get(stream); + if (existing?.waiter) { + clearInterval(existing.waiter); + } + this.streams.delete(stream); + } + + private updateStreamDetails(stream: StreamName, details: Partial): void { + const existing = this.streams.get(stream); + this.streams.set(stream, { + lastId: details.lastId ?? existing?.lastId ?? '$', + waiter: details.waiter ?? existing?.waiter, + pollingInterval: details.pollingInterval ?? existing?.pollingInterval ?? 1000, + }); + } + + async setPollingInterval(stream: StreamName, pollingInterval: number): Promise { + this.updateStreamDetails(stream, { pollingInterval }); + if (this.streams.get(stream)?.waiter) { + this.stopListeningToStream(stream); + await this.listenToStream(stream); + } + } + + setLastId(stream: StreamName, lastId: string): void { + this.updateStreamDetails(stream, { lastId }); + } + + setWaiter(stream: StreamName, waiter: NodeJS.Timeout): void { + // only update the waiter if the stream is still being listened to + if (this.streams.get(stream)) { + this.updateStreamDetails(stream, { waiter }); + } + } +} diff --git a/packages/cli/src/services/redis/RedisServiceStreamProducer.ts b/packages/cli/src/services/redis/RedisServiceStreamProducer.ts new file mode 100644 index 0000000000..51578d5d80 --- /dev/null +++ b/packages/cli/src/services/redis/RedisServiceStreamProducer.ts @@ -0,0 +1,42 @@ +import type { RedisValue } from 'ioredis'; +import { Service } from 'typedi'; +import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage'; +import { + COMMAND_REDIS_STREAM, + EVENT_BUS_REDIS_STREAM, + WORKER_RESPONSE_REDIS_STREAM, +} from './RedisServiceHelper'; +import type { + RedisServiceCommandObject, + RedisServiceWorkerResponseObject, +} from './RedisServiceCommands'; +import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; + +@Service() +export class RedisServiceStreamProducer extends RedisServiceBaseSender { + async init(senderId?: string): Promise { + await super.init('producer'); + this.setSenderId(senderId); + } + + async add(streamName: string, values: RedisValue[]): Promise { + await this.redisClient?.xadd(streamName, '*', 'senderId', this.senderId, ...values); + } + + async addToEventStream(message: AbstractEventMessage): Promise { + await this.add(EVENT_BUS_REDIS_STREAM, [ + 'message', + message.eventName, + 'event', + message.toString(), + ]); + } + + async addToCommandChannel(message: RedisServiceCommandObject): Promise { + await this.add(COMMAND_REDIS_STREAM, ['command', JSON.stringify(message)]); + } + + async addToWorkerChannel(message: RedisServiceWorkerResponseObject): Promise { + await this.add(WORKER_RESPONSE_REDIS_STREAM, ['response', JSON.stringify(message)]); + } +} diff --git a/packages/cli/test/unit/services/cache.service.test.ts b/packages/cli/test/unit/services/cache.service.test.ts index ed68b239d8..601a624a60 100644 --- a/packages/cli/test/unit/services/cache.service.test.ts +++ b/packages/cli/test/unit/services/cache.service.test.ts @@ -1,18 +1,59 @@ import Container from 'typedi'; import { CacheService } from '@/services/cache.service'; import type { MemoryCache } from 'cache-manager'; -// import type { RedisCache } from 'cache-manager-ioredis-yet'; +import type { RedisCache } from 'cache-manager-ioredis-yet'; import config from '@/config'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; const cacheService = Container.get(CacheService); function setDefaultConfig() { config.set('executions.mode', 'regular'); - config.set('cache.backend', 'auto'); + config.set('cache.enabled', true); + config.set('cache.backend', 'memory'); config.set('cache.memory.maxSize', 1 * 1024 * 1024); } +interface TestObject { + test: string; + test2: number; + test3?: TestObject & { test4: TestObject }; +} + +const testObject: TestObject = { + test: 'test', + test2: 123, + test3: { + test: 'test3', + test2: 123, + test4: { + test: 'test4', + test2: 123, + }, + }, +}; + describe('cacheService', () => { + beforeAll(async () => { + LoggerProxy.init(getLogger()); + jest.mock('ioredis', () => { + const Redis = require('ioredis-mock'); + if (typeof Redis === 'object') { + // the first mock is an ioredis shim because ioredis-mock depends on it + // https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111 + return { + Command: { _transformer: { argument: {}, reply: {} } }, + }; + } + // second mock for our code + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return function (...args: any) { + return new Redis(args); + }; + }); + }); + beforeEach(async () => { setDefaultConfig(); await Container.get(CacheService).destroy(); @@ -29,43 +70,43 @@ describe('cacheService', () => { test('should cache and retrieve a value', async () => { await cacheService.init(); await expect(cacheService.getCache()).resolves.toBeDefined(); - await cacheService.set('testString', 'test'); - await cacheService.set('testNumber', 123); + await cacheService.set('testString', 'test'); + await cacheService.set('testNumber1', 123); - await expect(cacheService.get('testString')).resolves.toBe('test'); - expect(typeof (await cacheService.get('testString'))).toBe('string'); - await expect(cacheService.get('testNumber')).resolves.toBe(123); - expect(typeof (await cacheService.get('testNumber'))).toBe('number'); + await expect(cacheService.get('testString')).resolves.toBe('test'); + expect(typeof (await cacheService.get('testString'))).toBe('string'); + await expect(cacheService.get('testNumber1')).resolves.toBe(123); + expect(typeof (await cacheService.get('testNumber1'))).toBe('number'); }); test('should honour ttl values', async () => { // set default TTL to 10ms config.set('cache.memory.ttl', 10); - await cacheService.set('testString', 'test'); - await cacheService.set('testNumber', 123, 1000); + await cacheService.set('testString', 'test'); + await cacheService.set('testNumber1', 123, 1000); const store = (await cacheService.getCache())?.store; expect(store).toBeDefined(); await expect(store!.ttl('testString')).resolves.toBeLessThanOrEqual(100); - await expect(store!.ttl('testNumber')).resolves.toBeLessThanOrEqual(1000); + await expect(store!.ttl('testNumber1')).resolves.toBeLessThanOrEqual(1000); - await expect(cacheService.get('testString')).resolves.toBe('test'); - await expect(cacheService.get('testNumber')).resolves.toBe(123); + await expect(cacheService.get('testString')).resolves.toBe('test'); + await expect(cacheService.get('testNumber1')).resolves.toBe(123); await new Promise((resolve) => setTimeout(resolve, 20)); - await expect(cacheService.get('testString')).resolves.toBeUndefined(); - await expect(cacheService.get('testNumber')).resolves.toBe(123); + await expect(cacheService.get('testString')).resolves.toBeUndefined(); + await expect(cacheService.get('testNumber1')).resolves.toBe(123); }); test('should set and remove values', async () => { - await cacheService.set('testString', 'test'); - await expect(cacheService.get('testString')).resolves.toBe('test'); + await cacheService.set('testString', 'test'); + await expect(cacheService.get('testString')).resolves.toBe('test'); await cacheService.delete('testString'); - await expect(cacheService.get('testString')).resolves.toBeUndefined(); + await expect(cacheService.get('testString')).resolves.toBeUndefined(); }); test('should calculate maxSize', async () => { @@ -73,65 +114,228 @@ describe('cacheService', () => { await cacheService.destroy(); // 16 bytes because stringify wraps the string in quotes, so 2 bytes for the quotes - await cacheService.set('testString', 'withoutUnicode'); - await expect(cacheService.get('testString')).resolves.toBe('withoutUnicode'); + await cacheService.set('testString', 'withoutUnicode'); + await expect(cacheService.get('testString')).resolves.toBe('withoutUnicode'); await cacheService.destroy(); // should not fit! - await cacheService.set('testString', 'withUnicodeԱԲԳ'); - await expect(cacheService.get('testString')).resolves.toBeUndefined(); + await cacheService.set('testString', 'withUnicodeԱԲԳ'); + await expect(cacheService.get('testString')).resolves.toBeUndefined(); }); test('should set and get complex objects', async () => { - interface TestObject { - test: string; - test2: number; - test3?: TestObject & { test4: TestObject }; - } - - const testObject: TestObject = { - test: 'test', - test2: 123, - test3: { - test: 'test3', - test2: 123, - test4: { - test: 'test4', - test2: 123, - }, - }, - }; - - await cacheService.set('testObject', testObject); - await expect(cacheService.get('testObject')).resolves.toMatchObject(testObject); + await cacheService.set('testObject', testObject); + await expect(cacheService.get('testObject')).resolves.toMatchObject(testObject); }); test('should set and get multiple values', async () => { - config.set('executions.mode', 'regular'); - config.set('cache.backend', 'auto'); + await cacheService.destroy(); + expect(cacheService.isRedisCache()).toBe(false); - await cacheService.setMany([ + await cacheService.setMany([ ['testString', 'test'], ['testString2', 'test2'], ]); - await cacheService.setMany([ - ['testNumber', 123], + await cacheService.setMany([ + ['testNumber1', 123], + ['testNumber2', 456], + ]); + await expect(cacheService.getMany(['testString', 'testString2'])).resolves.toStrictEqual([ + 'test', + 'test2', + ]); + await expect(cacheService.getMany(['testNumber1', 'testNumber2'])).resolves.toStrictEqual([ + 123, 456, + ]); + }); + + test('should create a redis in queue mode', async () => { + config.set('cache.backend', 'auto'); + config.set('executions.mode', 'queue'); + await cacheService.destroy(); + await cacheService.init(); + + const cache = await cacheService.getCache(); + await expect(cacheService.getCache()).resolves.toBeDefined(); + const candidate = (await cacheService.getCache()) as RedisCache; + expect(candidate.store.client).toBeDefined(); + }); + + test('should create a redis cache if asked', async () => { + config.set('cache.backend', 'redis'); + config.set('executions.mode', 'queue'); + await cacheService.destroy(); + await cacheService.init(); + + const cache = await cacheService.getCache(); + await expect(cacheService.getCache()).resolves.toBeDefined(); + const candidate = (await cacheService.getCache()) as RedisCache; + expect(candidate.store.client).toBeDefined(); + }); + + test('should get/set/delete redis cache', async () => { + config.set('cache.backend', 'redis'); + config.set('executions.mode', 'queue'); + await cacheService.destroy(); + await cacheService.init(); + + await cacheService.set('testObject', testObject); + await expect(cacheService.get('testObject')).resolves.toMatchObject(testObject); + await cacheService.delete('testObject'); + await expect(cacheService.get('testObject')).resolves.toBeUndefined(); + }); + + // NOTE: mset and mget are not supported by ioredis-mock + // test('should set and get multiple values with redis', async () => { + // }); + + test('should return fallback value if key is not set', async () => { + await cacheService.reset(); + await expect(cacheService.get('testString')).resolves.toBeUndefined(); + await expect( + cacheService.get('testString', { + fallbackValue: 'fallback', + }), + ).resolves.toBe('fallback'); + }); + + test('should call refreshFunction if key is not set', async () => { + await cacheService.reset(); + await expect(cacheService.get('testString')).resolves.toBeUndefined(); + await expect( + cacheService.get('testString', { + refreshFunction: async () => 'refreshed', + fallbackValue: 'this should not be returned', + }), + ).resolves.toBe('refreshed'); + }); + + test('should transparently handle disabled cache', async () => { + await cacheService.disable(); + await expect(cacheService.get('testString')).resolves.toBeUndefined(); + await cacheService.set('testString', 'whatever'); + await expect(cacheService.get('testString')).resolves.toBeUndefined(); + await expect( + cacheService.get('testString', { + fallbackValue: 'fallback', + }), + ).resolves.toBe('fallback'); + await expect( + cacheService.get('testString', { + refreshFunction: async () => 'refreshed', + fallbackValue: 'this should not be returned', + }), + ).resolves.toBe('refreshed'); + }); + + test('should set and get partial results', async () => { + await cacheService.setMany([ + ['testNumber1', 123], + ['testNumber2', 456], + ]); + await expect(cacheService.getMany(['testNumber1', 'testNumber2'])).resolves.toStrictEqual([ + 123, 456, + ]); + await expect(cacheService.getMany(['testNumber3', 'testNumber2'])).resolves.toStrictEqual([ + undefined, + 456, + ]); + }); + + test('should getMany and fix partial results and set single key', async () => { + await cacheService.setMany([ + ['testNumber1', 123], ['testNumber2', 456], ]); await expect( - cacheService.getMany(['testString', 'testString2']), - ).resolves.toStrictEqual(['test', 'test2']); + cacheService.getMany(['testNumber1', 'testNumber2', 'testNumber3']), + ).resolves.toStrictEqual([123, 456, undefined]); + await expect(cacheService.get('testNumber3')).resolves.toBeUndefined(); await expect( - cacheService.getMany(['testNumber', 'testNumber2']), + cacheService.getMany(['testNumber1', 'testNumber2', 'testNumber3'], { + async refreshFunctionEach(key) { + return key === 'testNumber3' ? 789 : undefined; + }, + }), + ).resolves.toStrictEqual([123, 456, 789]); + await expect(cacheService.get('testNumber3')).resolves.toBe(789); + }); + + test('should getMany and set all keys', async () => { + await cacheService.setMany([ + ['testNumber1', 123], + ['testNumber2', 456], + ]); + await expect( + cacheService.getMany(['testNumber1', 'testNumber2', 'testNumber3']), + ).resolves.toStrictEqual([123, 456, undefined]); + await expect(cacheService.get('testNumber3')).resolves.toBeUndefined(); + await expect( + cacheService.getMany(['testNumber1', 'testNumber2', 'testNumber3'], { + async refreshFunctionMany(keys) { + return [111, 222, 333]; + }, + }), + ).resolves.toStrictEqual([111, 222, 333]); + await expect(cacheService.get('testNumber1')).resolves.toBe(111); + await expect(cacheService.get('testNumber2')).resolves.toBe(222); + await expect(cacheService.get('testNumber3')).resolves.toBe(333); + }); + + test('should set and get multiple values with fallbackValue', async () => { + await cacheService.disable(); + await cacheService.setMany([ + ['testNumber1', 123], + ['testNumber2', 456], + ]); + await expect(cacheService.getMany(['testNumber1', 'testNumber2'])).resolves.toStrictEqual([ + undefined, + undefined, + ]); + await expect( + cacheService.getMany(['testNumber1', 'testNumber2'], { + fallbackValues: [123, 456], + }), + ).resolves.toStrictEqual([123, 456]); + await expect( + cacheService.getMany(['testNumber1', 'testNumber2'], { + refreshFunctionMany: async () => [123, 456], + fallbackValues: [0, 1], + }), ).resolves.toStrictEqual([123, 456]); }); - // This test is skipped because it requires the Redis service - // test('should create a redis cache if asked', async () => { - // config.set('cache.backend', 'redis'); - // await cacheService.init(); - // expect(cacheService.getCacheInstance()).toBeDefined(); - // const candidate = cacheService.getCacheInstance() as RedisCache; - // expect(candidate.store.client).toBeDefined(); - // }); + + test('should deal with unicode keys', async () => { + const key = '? > ":< ! withUnicodeԱԲԳ'; + await cacheService.set(key, 'test'); + await expect(cacheService.get(key)).resolves.toBe('test'); + await cacheService.delete(key); + await expect(cacheService.get(key)).resolves.toBeUndefined(); + }); + + test('should deal with unicode keys in redis', async () => { + config.set('cache.backend', 'redis'); + config.set('executions.mode', 'queue'); + await cacheService.destroy(); + await cacheService.init(); + const key = '? > ":< ! withUnicodeԱԲԳ'; + + expect(((await cacheService.getCache()) as RedisCache).store.client).toBeDefined(); + + await cacheService.set(key, 'test'); + await expect(cacheService.get(key)).resolves.toBe('test'); + await cacheService.delete(key); + await expect(cacheService.get(key)).resolves.toBeUndefined(); + }); + + test('should not cache null or undefined values', async () => { + await cacheService.set('nullValue', null); + await cacheService.set('undefValue', undefined); + await cacheService.set('normalValue', 'test'); + + await expect(cacheService.get('normalValue')).resolves.toBe('test'); + await expect(cacheService.get('undefValue')).resolves.toBeUndefined(); + await expect(cacheService.get('nullValue')).resolves.toBeUndefined(); + }); }); diff --git a/packages/cli/test/unit/services/redis.service.test.ts b/packages/cli/test/unit/services/redis.service.test.ts new file mode 100644 index 0000000000..ee541a2d25 --- /dev/null +++ b/packages/cli/test/unit/services/redis.service.test.ts @@ -0,0 +1,138 @@ +import Container from 'typedi'; +import config from '@/config'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; +import { RedisService } from '@/services/redis.service'; + +const redisService = Container.get(RedisService); + +function setDefaultConfig() { + config.set('executions.mode', 'queue'); +} + +interface TestObject { + test: string; + test2: number; + test3?: TestObject & { test4: TestObject }; +} + +const testObject: TestObject = { + test: 'test', + test2: 123, + test3: { + test: 'test3', + test2: 123, + test4: { + test: 'test4', + test2: 123, + }, + }, +}; + +const PUBSUB_CHANNEL = 'testchannel'; +const LIST_CHANNEL = 'testlist'; +const STREAM_CHANNEL = 'teststream'; + +describe('cacheService', () => { + beforeAll(async () => { + LoggerProxy.init(getLogger()); + jest.mock('ioredis', () => { + const Redis = require('ioredis-mock'); + if (typeof Redis === 'object') { + // the first mock is an ioredis shim because ioredis-mock depends on it + // https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111 + return { + Command: { _transformer: { argument: {}, reply: {} } }, + }; + } + // second mock for our code + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return function (...args: any) { + return new Redis(args); + }; + }); + setDefaultConfig(); + }); + + test('should create pubsub publisher and subscriber with handler', async () => { + const pub = await redisService.getPubSubPublisher(); + const sub = await redisService.getPubSubSubscriber(); + expect(pub).toBeDefined(); + expect(sub).toBeDefined(); + + const mockHandler = jest.fn(); + mockHandler.mockImplementation((channel: string, message: string) => {}); + sub.addMessageHandler(PUBSUB_CHANNEL, mockHandler); + await sub.subscribe(PUBSUB_CHANNEL); + await pub.publish(PUBSUB_CHANNEL, 'test'); + await new Promise((resolve) => + setTimeout(async () => { + resolve(0); + }, 50), + ); + expect(mockHandler).toHaveBeenCalled(); + await sub.destroy(); + await pub.destroy(); + }); + + test('should create list sender and receiver', async () => { + const sender = await redisService.getListSender(); + const receiver = await redisService.getListReceiver(); + expect(sender).toBeDefined(); + expect(receiver).toBeDefined(); + await sender.prepend(LIST_CHANNEL, 'middle'); + await sender.prepend(LIST_CHANNEL, 'first'); + await sender.append(LIST_CHANNEL, 'end'); + let popResult = await receiver.popFromHead(LIST_CHANNEL); + expect(popResult).toBe('first'); + popResult = await receiver.popFromTail(LIST_CHANNEL); + expect(popResult).toBe('end'); + await sender.prepend(LIST_CHANNEL, 'somevalue'); + popResult = await receiver.popFromTail(LIST_CHANNEL); + expect(popResult).toBe('middle'); + await sender.destroy(); + await receiver.destroy(); + }); + + // NOTE: This test is failing because the mock Redis client does not support streams apparently + // eslint-disable-next-line n8n-local-rules/no-skipped-tests + test.skip('should create stream producer and consumer', async () => { + const consumer = await redisService.getStreamConsumer(); + const producer = await redisService.getStreamProducer(); + + expect(consumer).toBeDefined(); + expect(producer).toBeDefined(); + + const mockHandler = jest.fn(); + mockHandler.mockImplementation((stream: string, id: string, message: string[]) => { + console.log('Received message', stream, id, message); + }); + consumer.addMessageHandler('some handler', mockHandler); + + await consumer.setPollingInterval(STREAM_CHANNEL, 50); + await consumer.listenToStream(STREAM_CHANNEL); + + let timeout; + await new Promise((resolve) => { + timeout = setTimeout(async () => { + await producer.add(STREAM_CHANNEL, ['message', 'testMessage', 'event', 'testEveny']); + resolve(0); + }, 50); + }); + + await new Promise((resolve) => + setTimeout(async () => { + resolve(0); + }, 100), + ); + + clearInterval(timeout); + + consumer.stopListeningToStream(STREAM_CHANNEL); + + expect(mockHandler).toHaveBeenCalled(); + + await consumer.destroy(); + await producer.destroy(); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a92f51ed4a..eee5f7a989 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -557,6 +557,9 @@ importers: concurrently: specifier: ^8.2.0 version: 8.2.0 + ioredis-mock: + specifier: ^8.8.1 + version: 8.8.1(@types/ioredis-mock@8.2.2)(ioredis@5.2.4) ts-essentials: specifier: ^7.0.3 version: 7.0.3(typescript@5.1.6) @@ -3599,9 +3602,12 @@ packages: '@intlify/shared': 9.2.2 dev: false + /@ioredis/as-callback@3.0.0: + resolution: {integrity: sha512-Kqv1rZ3WbgOrS+hgzJ5xG5WQuhvzzSTRYvNeyPMLOAM78MHSnuKI20JeJGbpuAt//LCuP0vsexZcorqW7kWhJg==} + dev: true + /@ioredis/commands@1.2.0: resolution: {integrity: sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==} - dev: false /@isaacs/cliui@8.0.2: resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==} @@ -6199,6 +6205,14 @@ packages: rxjs: 6.6.7 dev: true + /@types/ioredis-mock@8.2.2: + resolution: {integrity: sha512-bnbPHOjxy4TUDjRh61MMoK2QvDNZqrMDXJYrEDZP/HPFvBubR24CQ0DBi5lgWhLxG4lvVsXPRDXtZ03+JgonoQ==} + dependencies: + ioredis: 5.3.2 + transitivePeerDependencies: + - supports-color + dev: true + /@types/is-function@1.0.1: resolution: {integrity: sha512-A79HEEiwXTFtfY+Bcbo58M2GRYzCr9itHWzbzHVFNEYCcoU/MMGwYYf721gBrnhpj1s6RGVVha/IgNFnR0Iw/Q==} dev: true @@ -8060,7 +8074,7 @@ packages: /axios@0.21.4: resolution: {integrity: sha512-ut5vewkiu8jjGBdqpM44XxjuCjq9LAKeHVmoVfHVzy8eHgxxq8SbAVQNovDA8mVi05kP0Ea/n/UzcSHcTJQfNg==} dependencies: - follow-redirects: 1.15.2(debug@4.3.4) + follow-redirects: 1.15.2(debug@3.2.7) transitivePeerDependencies: - debug dev: false @@ -8089,6 +8103,7 @@ packages: form-data: 4.0.0 transitivePeerDependencies: - debug + dev: true /babel-core@7.0.0-bridge.0(@babel/core@7.22.9): resolution: {integrity: sha512-poPX9mZH/5CSanm50Q+1toVci6pv5KSRv/5TWCwtzQS5XEwn40BcCrgIeMFWP9CKKIniKXNxoIOnOq4VVlGXhg==} @@ -9027,7 +9042,6 @@ packages: /cluster-key-slot@1.1.1: resolution: {integrity: sha512-rwHwUfXL40Chm1r08yrhU3qpUvdVlgkKNeyeGPOxnW8/SyVDvgRaed/Uz54AqWNaTCAThlj6QAs3TZcKI0xDEw==} engines: {node: '>=0.10.0'} - dev: false /co@4.6.0: resolution: {integrity: sha512-QVb0dM5HvG+uaxitm8wONl7jltx8dqhfU33DcqtOZcLSVIKSDDLDi7+0LbAKiyI8hD9u42m2YxXSkMGWThaecQ==} @@ -9943,7 +9957,6 @@ packages: /denque@2.1.0: resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} engines: {node: '>=0.10'} - dev: false /depd@1.1.2: resolution: {integrity: sha512-7emPTl6Dpo6JRXOXjLRxck+FlLRX5847cLKEn00PLAgc3g2hTZZgr+e4c2v6QpSmLeFP3n5yUo7ft6avBK/5jQ==} @@ -11415,6 +11428,22 @@ packages: resolution: {integrity: sha512-OP2IUU6HeYKJi3i0z4A19kHMQoLVs4Hc+DPqqxI2h/DPZHTm/vjsfC6P0b4jCMy14XizLBqvndQ+UilD7707Jw==} dev: false + /fengari-interop@0.1.3(fengari@0.1.4): + resolution: {integrity: sha512-EtZ+oTu3kEwVJnoymFPBVLIbQcCoy9uWCVnMA6h3M/RqHkUBsLYp29+RRHf9rKr6GwjubWREU1O7RretFIXjHw==} + peerDependencies: + fengari: ^0.1.0 + dependencies: + fengari: 0.1.4 + dev: true + + /fengari@0.1.4: + resolution: {integrity: sha512-6ujqUuiIYmcgkGz8MGAdERU57EIluGGPSUgGPTsco657EHa+srq0S3/YUl/r9kx1+D+d4rGfYObd+m8K22gB1g==} + dependencies: + readline-sync: 1.4.10 + sprintf-js: 1.1.2 + tmp: 0.0.33 + dev: true + /fetch-retry@5.0.3: resolution: {integrity: sha512-uJQyMrX5IJZkhoEUBQ3EjxkeiZkppBd5jS/fMTJmfZxLSiaQjv2zD0kTvuvkSH89uFvgSlB6ueGpjD3HWN7Bxw==} dev: true @@ -11661,6 +11690,7 @@ packages: optional: true dependencies: debug: 4.3.4(supports-color@8.1.1) + dev: true /for-each@0.3.3: resolution: {integrity: sha512-jqYfLp7mo9vIyQf8ykW2v7A+2N4QjeCeI5+Dz9XraiO1ign81wjiH7Fb9vSOWvQfNtmSa4H2RoQTrrXivdUZmw==} @@ -12837,6 +12867,22 @@ packages: engines: {node: '>=0.10.0'} dev: true + /ioredis-mock@8.8.1(@types/ioredis-mock@8.2.2)(ioredis@5.2.4): + resolution: {integrity: sha512-zXSaDf86EcDFVf8jMOirWU6Js4WcwLd/cxwJiCh9EbD1GoHfeE/fVqLhLz/l1MkyL85Fb6MwfF2Fr/9819Ul9Q==} + engines: {node: '>=12.22'} + peerDependencies: + '@types/ioredis-mock': ^8 + ioredis: ^5 + dependencies: + '@ioredis/as-callback': 3.0.0 + '@ioredis/commands': 1.2.0 + '@types/ioredis-mock': 8.2.2 + fengari: 0.1.4 + fengari-interop: 0.1.3(fengari@0.1.4) + ioredis: 5.2.4 + semver: 7.5.4 + dev: true + /ioredis@4.28.5: resolution: {integrity: sha512-3GYo0GJtLqgNXj4YhrisLaNNvWSNwSS2wS4OELGfGxH8I69+XfNdnmV1AyN+ZqMh0i7eX+SWjrwFKDBDgfBC1A==} engines: {node: '>=6'} @@ -12871,7 +12917,6 @@ packages: standard-as-callback: 2.1.0 transitivePeerDependencies: - supports-color - dev: false /ioredis@5.3.2: resolution: {integrity: sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==} @@ -12888,7 +12933,6 @@ packages: standard-as-callback: 2.1.0 transitivePeerDependencies: - supports-color - dev: false /ip@1.1.8: resolution: {integrity: sha512-PuExPYUiu6qMBQb4l06ecm6T6ujzhmh+MeJcW9wa89PoAz5pvd4zPgN5WJV104mb6S2T1AwNIAaB70JNrLQWhg==} @@ -14712,7 +14756,6 @@ packages: /lodash.defaults@4.2.0: resolution: {integrity: sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==} - dev: false /lodash.find@4.6.0: resolution: {integrity: sha512-yaRZoAV3Xq28F1iafWN1+a0rflOej93l1DQUejs3SZ41h2O9UJBoS9aueGjPDgAl4B6tPC0NuuchLKaDQQ3Isg==} @@ -14738,7 +14781,6 @@ packages: /lodash.isarguments@3.1.0: resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} - dev: false /lodash.isempty@4.4.0: resolution: {integrity: sha512-oKMuF3xEeqDltrGMfDxAPGIVMSSRv8tbRSODbrs4KGsRRLEhrW8N8Rd4DRgB2+621hY8A8XwwrTVhXWpxFvMzg==} @@ -16222,7 +16264,6 @@ packages: /os-tmpdir@1.0.2: resolution: {integrity: sha512-D2FR03Vir7FIu45XBY20mTb+/ZSWB00sjU9jdQXt83gDrI4Ztz5Fs7/yy74g2N5SVQY4xY1qDr4rNddwYRVX0g==} engines: {node: '>=0.10.0'} - dev: false /ospath@1.2.2: resolution: {integrity: sha512-o6E5qJV5zkAbIDNhGSIlyOhScKXgQrSRMilfph0clDfM0nEnBOlKlH4sWDmG95BW/CvwNz0vmm7dJVtU2KlMiA==} @@ -16990,7 +17031,7 @@ packages: resolution: {integrity: sha512-aXYe/D+28kF63W8Cz53t09ypEORz+ULeDCahdAqhVrRm2scbOXFbtnn0GGhvMpYe45grepLKuwui9KxrZ2ZuMw==} engines: {node: '>=14.17.0'} dependencies: - axios: 0.27.2(debug@4.3.4) + axios: 0.27.2(debug@3.2.7) transitivePeerDependencies: - debug dev: false @@ -17624,6 +17665,11 @@ packages: dependencies: picomatch: 2.3.1 + /readline-sync@1.4.10: + resolution: {integrity: sha512-gNva8/6UAe8QYepIQH/jQ2qn91Qj0B9sYjMBBs3QOB8F2CXcKgLxQaJRP76sWVRQt+QU+8fAkCbCvjjMFu7Ycw==} + engines: {node: '>= 0.8.0'} + dev: true + /recast@0.21.5: resolution: {integrity: sha512-hjMmLaUXAm1hIuTqOdeYObMslq/q+Xff6QE3Y2P+uoHAg2nmVlLBps2hzh1UJDdMtDTMXOFewK6ky51JQIeECg==} engines: {node: '>= 4'} @@ -17681,14 +17727,12 @@ packages: /redis-errors@1.2.0: resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==} engines: {node: '>=4'} - dev: false /redis-parser@3.0.0: resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==} engines: {node: '>=4'} dependencies: redis-errors: 1.2.0 - dev: false /redis@3.1.2: resolution: {integrity: sha512-grn5KoZLr/qrRQVwoSkmzdbw6pwF+/rwODtrOr6vuBRiR/f3rjSTGupbF90Zpqm2oenix8Do6RV7pYEkGwlKkw==} @@ -18755,7 +18799,6 @@ packages: /sprintf-js@1.1.2: resolution: {integrity: sha512-VE0SOVEHCk7Qc8ulkWw3ntAzXuqf7S2lvwQaDLRnUeIEaKNQJzV6BwmLKhOqT61aGhfUMrXeaBk+oDGCzvhcug==} - dev: false /sqlite3@5.1.6: resolution: {integrity: sha512-olYkWoKFVNSSSQNvxVUfjiVbz3YtBwTJj+mfV5zpHmqW3sELx2Cf4QCdirMelhM5Zh+KDVaKgQHqCxrqiWHybw==} @@ -18843,7 +18886,6 @@ packages: /standard-as-callback@2.1.0: resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==} - dev: false /start-server-and-test@2.0.0: resolution: {integrity: sha512-UqKLw0mJbfrsG1jcRLTUlvuRi9sjNuUiDOLI42r7R5fA9dsFoywAy9DoLXNYys9B886E4RCKb+qM1Gzu96h7DQ==} @@ -19561,7 +19603,6 @@ packages: engines: {node: '>=0.6.0'} dependencies: os-tmpdir: 1.0.2 - dev: false /tmp@0.1.0: resolution: {integrity: sha512-J7Z2K08jbGcdA1kkQpJSqLF6T0tdQqpR2pnSUXsIchbPdTI9v3e85cLW0d6WDhwuAleOV71j2xWs8qMPfK7nKw==}