diff --git a/packages/@n8n/api-types/src/index.ts b/packages/@n8n/api-types/src/index.ts index f9f141c80f..381f1ef469 100644 --- a/packages/@n8n/api-types/src/index.ts +++ b/packages/@n8n/api-types/src/index.ts @@ -51,9 +51,7 @@ export { type DataStore, type DataStoreColumn, type DataStoreCreateColumnSchema, - type DataStoreColumnJsType, type DataStoreListFilter, - type DataStoreRows, type DataStoreListOptions, type DataStoreUserTableName, dateTimeSchema, diff --git a/packages/@n8n/api-types/src/schemas/data-store.schema.ts b/packages/@n8n/api-types/src/schemas/data-store.schema.ts index eda3040f09..e77112161a 100644 --- a/packages/@n8n/api-types/src/schemas/data-store.schema.ts +++ b/packages/@n8n/api-types/src/schemas/data-store.schema.ts @@ -59,7 +59,3 @@ export const dateTimeSchema = z // Dates are received as date strings and validated before insertion export const dataStoreColumnValueSchema = z.union([z.string(), z.number(), z.boolean(), z.null()]); - -export type DataStoreColumnJsType = string | number | boolean | Date; - -export type DataStoreRows = Array>; diff --git a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts index cdc1f8b20c..ac9e229ff3 100644 --- a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts +++ b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts @@ -24,6 +24,7 @@ import { SubworkflowPolicyChecker, } from '@/executions/pre-execution-checks'; import { ExternalHooks } from '@/external-hooks'; +import { DataStoreProxyService } from '@/modules/data-store/data-store-proxy.service'; import { UrlService } from '@/services/url.service'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; import { Telemetry } from '@/telemetry'; @@ -98,6 +99,7 @@ describe('WorkflowExecuteAdditionalData', () => { mockInstance(CredentialsPermissionChecker); mockInstance(SubworkflowPolicyChecker); mockInstance(WorkflowStatisticsService); + mockInstance(DataStoreProxyService); const urlService = mockInstance(UrlService); Container.set(UrlService, urlService); diff --git a/packages/cli/src/modules/data-store/__tests__/data-store-proxy.service.test.ts b/packages/cli/src/modules/data-store/__tests__/data-store-proxy.service.test.ts new file mode 100644 index 0000000000..8a083460ee --- /dev/null +++ b/packages/cli/src/modules/data-store/__tests__/data-store-proxy.service.test.ts @@ -0,0 +1,230 @@ +import type { Logger } from '@n8n/backend-common'; +import { testDb, testModules } from '@n8n/backend-test-utils'; +import type { Project } from '@n8n/db'; +import { mock } from 'jest-mock-extended'; +import type { + AddDataStoreColumnOptions, + INode, + ListDataStoreRowsOptions, + MoveDataStoreColumnOptions, + UpsertDataStoreRowsOptions, + Workflow, +} from 'n8n-workflow'; + +import type { OwnershipService } from '@/services/ownership.service'; + +import { DataStoreProxyService } from '../data-store-proxy.service'; +import type { DataStoreService } from '../data-store.service'; + +const PROJECT_ID = 'project-id'; + +beforeAll(async () => { + await testModules.loadModules(['data-store']); + await testDb.init(); +}); +describe('DataStoreProxyService', () => { + let dataStoreServiceMock = mock(); + let ownershipServiceMock = mock(); + let loggerMock = mock(); + let dataStoreProxyService: DataStoreProxyService; + + let workflow: Workflow; + let node: INode; + let project: Project; + + beforeEach(() => { + dataStoreServiceMock = mock(); + ownershipServiceMock = mock(); + loggerMock = mock(); + + dataStoreProxyService = new DataStoreProxyService( + dataStoreServiceMock, + ownershipServiceMock, + loggerMock, + ); + + workflow = mock({ + id: 'workflow-id', + }); + project = mock({ + id: PROJECT_ID, + }); + node = mock({ + type: 'n8n-nodes-base.dataStore', + }); + + ownershipServiceMock.getWorkflowProjectCached.mockResolvedValueOnce(project); + }); + + describe('makeAggregateOperations', () => { + it('should call getManyAndCount with correct parameters', async () => { + const options = { filter: { name: 'test' } }; + + const aggregateOperations = await dataStoreProxyService.getDataStoreAggregateProxy( + workflow, + node, + ); + await aggregateOperations.getManyAndCount(options); + + expect(dataStoreServiceMock.getManyAndCount).toBeCalledWith({ + filter: { name: 'test', projectId: PROJECT_ID }, + }); + }); + + it('should call createDataStore with correct parameters', async () => { + const options = { name: 'newDataStore', columns: [] }; + + const aggregateOperations = await dataStoreProxyService.getDataStoreAggregateProxy( + workflow, + node, + ); + await aggregateOperations.createDataStore(options); + + expect(dataStoreServiceMock.createDataStore).toBeCalledWith(PROJECT_ID, options); + }); + + it('should call deleteDataStoreByProject when proxy calls deleteDataStoreAll', async () => { + const aggregateOperations = await dataStoreProxyService.getDataStoreAggregateProxy( + workflow, + node, + ); + await aggregateOperations.deleteDataStoreAll(); + + expect(dataStoreServiceMock.deleteDataStoreByProjectId).toBeCalledWith(PROJECT_ID); + }); + }); + it('should call updateDataStore with correct parameters', async () => { + const options = { name: 'updatedDataStore' }; + + const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy( + workflow, + node, + 'dataStore-id', + ); + await dataStoreOperations.updateDataStore(options); + + expect(dataStoreServiceMock.updateDataStore).toBeCalledWith( + 'dataStore-id', + PROJECT_ID, + options, + ); + }); + + it('should call deleteDataStore with correct parameters', async () => { + const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy( + workflow, + node, + 'dataStore-id', + ); + await dataStoreOperations.deleteDataStore(); + + expect(dataStoreServiceMock.deleteDataStore).toBeCalledWith('dataStore-id', PROJECT_ID); + }); + + it('should call getColumns with correct parameters', async () => { + const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy( + workflow, + node, + 'dataStore-id', + ); + await dataStoreOperations.getColumns(); + + expect(dataStoreServiceMock.getColumns).toBeCalledWith('dataStore-id', PROJECT_ID); + }); + + it('should call addColumn with correct parameters', async () => { + const options: AddDataStoreColumnOptions = { name: 'newColumn', type: 'string' }; + + const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy( + workflow, + node, + 'dataStore-id', + ); + await dataStoreOperations.addColumn(options); + + expect(dataStoreServiceMock.addColumn).toBeCalledWith('dataStore-id', PROJECT_ID, options); + }); + + it('should call moveColumn with correct parameters', async () => { + const columnId = 'column-id'; + const options: MoveDataStoreColumnOptions = { targetIndex: 1 }; + + const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy( + workflow, + node, + 'dataStore-id', + ); + await dataStoreOperations.moveColumn(columnId, options); + + expect(dataStoreServiceMock.moveColumn).toBeCalledWith( + 'dataStore-id', + PROJECT_ID, + columnId, + options, + ); + }); + + it('should call deleteColumn with correct parameters', async () => { + const columnId = 'column-id'; + + const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy( + workflow, + node, + 'dataStore-id', + ); + await dataStoreOperations.deleteColumn(columnId); + + expect(dataStoreServiceMock.deleteColumn).toBeCalledWith('dataStore-id', PROJECT_ID, columnId); + }); + + it('should call getManyRowsAndCount with correct parameters', async () => { + const options: ListDataStoreRowsOptions = { + filter: { + filters: [{ columnName: 'x', condition: 'eq', value: 'testRow' }], + type: 'and', + }, + }; + + const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy( + workflow, + node, + 'dataStore-id', + ); + await dataStoreOperations.getManyRowsAndCount(options); + + expect(dataStoreServiceMock.getManyRowsAndCount).toBeCalledWith( + 'dataStore-id', + PROJECT_ID, + options, + ); + }); + + it('should call insertRows with correct parameters', async () => { + const rows = [{ id: 1, name: 'row1' }]; + + const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy( + workflow, + node, + 'dataStore-id', + ); + await dataStoreOperations.insertRows(rows); + + expect(dataStoreServiceMock.insertRows).toBeCalledWith('dataStore-id', PROJECT_ID, rows); + }); + + it('should call upsertRows with correct parameters', async () => { + const options: UpsertDataStoreRowsOptions = { + matchFields: ['name'], + rows: [{ id: 1, name: 'row1' }], + }; + + const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy( + workflow, + node, + 'dataStore-id', + ); + await dataStoreOperations.upsertRows(options); + + expect(dataStoreServiceMock.upsertRows).toBeCalledWith('dataStore-id', PROJECT_ID, options); + }); +}); diff --git a/packages/cli/src/modules/data-store/__tests__/sql-utils.test.ts b/packages/cli/src/modules/data-store/__tests__/sql-utils.test.ts index 6f90885a5c..0d144e9189 100644 --- a/packages/cli/src/modules/data-store/__tests__/sql-utils.test.ts +++ b/packages/cli/src/modules/data-store/__tests__/sql-utils.test.ts @@ -1,4 +1,4 @@ -import type { DataStoreRows } from '@n8n/api-types'; +import type { DataStoreRows } from 'n8n-workflow'; import { addColumnQuery, diff --git a/packages/cli/src/modules/data-store/data-store-proxy.service.ts b/packages/cli/src/modules/data-store/data-store-proxy.service.ts new file mode 100644 index 0000000000..c8a284f3f8 --- /dev/null +++ b/packages/cli/src/modules/data-store/data-store-proxy.service.ts @@ -0,0 +1,136 @@ +import type { DataStoreListOptions } from '@n8n/api-types'; +import { Logger } from '@n8n/backend-common'; +import { Service } from '@n8n/di'; +import { + AddDataStoreColumnOptions, + CreateDataStoreOptions, + DataStore, + DataStoreColumn, + DataStoreProxyProvider, + DataStoreRows, + IDataStoreProjectAggregateService, + IDataStoreProjectService, + INode, + ListDataStoreOptions, + ListDataStoreRowsOptions, + MoveDataStoreColumnOptions, + UpdateDataStoreOptions, + UpsertDataStoreRowsOptions, + Workflow, +} from 'n8n-workflow'; + +import { DataStoreService } from './data-store.service'; + +import { OwnershipService } from '@/services/ownership.service'; + +@Service() +export class DataStoreProxyService implements DataStoreProxyProvider { + constructor( + private readonly dataStoreService: DataStoreService, + private readonly ownershipService: OwnershipService, + private readonly logger: Logger, + ) { + this.logger = this.logger.scoped('data-store'); + } + + private validateRequest(node: INode) { + if (node.type !== 'n8n-nodes-base.dataStore') { + throw new Error('This proxy is only available for data store nodes'); + } + } + + private async getProjectId(workflow: Workflow) { + const homeProject = await this.ownershipService.getWorkflowProjectCached(workflow.id); + return homeProject.id; + } + + async getDataStoreAggregateProxy( + workflow: Workflow, + node: INode, + ): Promise { + this.validateRequest(node); + const projectId = await this.getProjectId(workflow); + + return this.makeAggregateOperations(projectId); + } + + async getDataStoreProxy( + workflow: Workflow, + node: INode, + dataStoreId: string, + ): Promise { + this.validateRequest(node); + const projectId = await this.getProjectId(workflow); + + return this.makeDataStoreOperations(projectId, dataStoreId); + } + + private makeAggregateOperations(projectId: string): IDataStoreProjectAggregateService { + const dataStoreService = this.dataStoreService; + return { + async getManyAndCount(options: ListDataStoreOptions = {}) { + const serviceOptions: DataStoreListOptions = { + ...options, + filter: { projectId, ...(options.filter ?? {}) }, + }; + return await dataStoreService.getManyAndCount(serviceOptions); + }, + + async createDataStore(options: CreateDataStoreOptions): Promise { + return await dataStoreService.createDataStore(projectId, options); + }, + + async deleteDataStoreAll(): Promise { + return await dataStoreService.deleteDataStoreByProjectId(projectId); + }, + }; + } + + private makeDataStoreOperations( + projectId: string, + dataStoreId: string, + ): Omit { + const dataStoreService = this.dataStoreService; + + return { + // DataStore management + async updateDataStore(options: UpdateDataStoreOptions): Promise { + return await dataStoreService.updateDataStore(dataStoreId, projectId, options); + }, + + async deleteDataStore(): Promise { + return await dataStoreService.deleteDataStore(dataStoreId, projectId); + }, + + // Column operations + async getColumns(): Promise { + return await dataStoreService.getColumns(dataStoreId, projectId); + }, + + async addColumn(options: AddDataStoreColumnOptions): Promise { + return await dataStoreService.addColumn(dataStoreId, projectId, options); + }, + + async moveColumn(columnId: string, options: MoveDataStoreColumnOptions): Promise { + return await dataStoreService.moveColumn(dataStoreId, projectId, columnId, options); + }, + + async deleteColumn(columnId: string): Promise { + return await dataStoreService.deleteColumn(dataStoreId, projectId, columnId); + }, + + // Row operations + async getManyRowsAndCount(options: Partial) { + return await dataStoreService.getManyRowsAndCount(dataStoreId, projectId, options); + }, + + async insertRows(rows: DataStoreRows) { + return await dataStoreService.insertRows(dataStoreId, projectId, rows); + }, + + async upsertRows(options: UpsertDataStoreRowsOptions) { + return await dataStoreService.upsertRows(dataStoreId, projectId, options); + }, + }; + } +} diff --git a/packages/cli/src/modules/data-store/data-store-rows.repository.ts b/packages/cli/src/modules/data-store/data-store-rows.repository.ts index 7e39c03a4c..23b39ce8fc 100644 --- a/packages/cli/src/modules/data-store/data-store-rows.repository.ts +++ b/packages/cli/src/modules/data-store/data-store-rows.repository.ts @@ -2,7 +2,6 @@ import type { ListDataStoreContentQueryDto, ListDataStoreContentFilter, DataStoreUserTableName, - DataStoreRows, UpsertDataStoreRowsDto, } from '@n8n/api-types'; import { CreateTable, DslColumn } from '@n8n/db'; @@ -27,6 +26,7 @@ import { toDslColumns, toTableName, } from './utils/sql-utils'; +import { DataStoreRows } from 'n8n-workflow'; // eslint-disable-next-line @typescript-eslint/no-explicit-any type QueryBuilder = SelectQueryBuilder; diff --git a/packages/cli/src/modules/data-store/data-store.service.ts b/packages/cli/src/modules/data-store/data-store.service.ts index a8acd23908..f3ed12db18 100644 --- a/packages/cli/src/modules/data-store/data-store.service.ts +++ b/packages/cli/src/modules/data-store/data-store.service.ts @@ -5,7 +5,6 @@ import type { ListDataStoreContentQueryDto, MoveDataStoreColumnDto, DataStoreListOptions, - DataStoreRows, UpsertDataStoreRowsDto, UpdateDataStoreDto, } from '@n8n/api-types'; @@ -20,6 +19,7 @@ import { DataStoreNameConflictError } from './errors/data-store-name-conflict.er import { DataStoreNotFoundError } from './errors/data-store-not-found.error'; import { DataStoreValidationError } from './errors/data-store-validation.error'; import { toTableName, normalizeRows } from './utils/sql-utils'; +import { DataStoreRows } from 'n8n-workflow'; @Service() export class DataStoreService { diff --git a/packages/cli/src/modules/data-store/utils/sql-utils.ts b/packages/cli/src/modules/data-store/utils/sql-utils.ts index 6becf3c5e6..a58d0eb1d2 100644 --- a/packages/cli/src/modules/data-store/utils/sql-utils.ts +++ b/packages/cli/src/modules/data-store/utils/sql-utils.ts @@ -1,12 +1,11 @@ import { DATA_STORE_COLUMN_REGEX, - type DataStoreRows, type DataStoreCreateColumnSchema, type DataStoreColumn, } from '@n8n/api-types'; import { DslColumn } from '@n8n/db'; import type { DataSourceOptions } from '@n8n/typeorm'; -import { UnexpectedError } from 'n8n-workflow'; +import { UnexpectedError, type DataStoreRows } from 'n8n-workflow'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; diff --git a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts index d50653c761..db26083137 100644 --- a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts +++ b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts @@ -23,6 +23,7 @@ import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.serv import { JobProcessor } from '../job-processor'; import type { Job } from '../scaling.types'; +import { DataStoreProxyService } from '@/modules/data-store/data-store-proxy.service'; mockInstance(VariablesService, { getAllCached: jest.fn().mockResolvedValue([]), @@ -32,6 +33,7 @@ mockInstance(ExternalSecretsProxy); mockInstance(WorkflowStaticDataService); mockInstance(WorkflowStatisticsService); mockInstance(ExternalHooks); +mockInstance(DataStoreProxyService); const processRunExecutionDataMock = jest.fn(); jest.mock('n8n-core', () => { diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index b1db74005c..3efda92490 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -3,7 +3,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import type { PushMessage, PushType } from '@n8n/api-types'; -import { Logger } from '@n8n/backend-common'; +import { Logger, ModuleRegistry } from '@n8n/backend-common'; import { GlobalConfig } from '@n8n/config'; import { ExecutionRepository, WorkflowRepository } from '@n8n/db'; import { Container } from '@n8n/di'; @@ -377,7 +377,15 @@ export async function getBase( const eventService = Container.get(EventService); + const moduleRegistry = Container.get(ModuleRegistry); + const dataStoreProxyProvider = moduleRegistry.isActive('data-store') + ? Container.get( + (await import('@/modules/data-store/data-store-proxy.service')).DataStoreProxyService, + ) + : undefined; + return { + dataStoreProxyProvider, currentNodeExecutionIndex: 0, credentialsHelper: Container.get(CredentialsHelper), executeWorkflow, diff --git a/packages/cli/test/integration/shared/db/data-stores.ts b/packages/cli/test/integration/shared/db/data-stores.ts index a552257a12..cb03ef3281 100644 --- a/packages/cli/test/integration/shared/db/data-stores.ts +++ b/packages/cli/test/integration/shared/db/data-stores.ts @@ -1,7 +1,8 @@ -import type { CreateDataStoreColumnDto, DataStoreRows } from '@n8n/api-types'; +import type { CreateDataStoreColumnDto } from '@n8n/api-types'; import { randomName } from '@n8n/backend-test-utils'; import type { Project } from '@n8n/db'; import { Container } from '@n8n/di'; +import type { DataStoreRows } from 'n8n-workflow'; import { DataStoreColumnRepository } from '@/modules/data-store/data-store-column.repository'; import { DataStoreRowsRepository } from '@/modules/data-store/data-store-rows.repository'; diff --git a/packages/core/src/execution-engine/index.ts b/packages/core/src/execution-engine/index.ts index 34a31194f2..1b944b60fc 100644 --- a/packages/core/src/execution-engine/index.ts +++ b/packages/core/src/execution-engine/index.ts @@ -1,3 +1,5 @@ +import type { DataStoreProxyProvider } from 'n8n-workflow'; + import type { ExecutionLifecycleHooks } from './execution-lifecycle-hooks'; import type { ExternalSecretsProxy } from './external-secrets-proxy'; @@ -5,6 +7,7 @@ declare module 'n8n-workflow' { interface IWorkflowExecuteAdditionalData { hooks?: ExecutionLifecycleHooks; externalSecretsProxy: ExternalSecretsProxy; + dataStoreProxyProvider?: DataStoreProxyProvider; } } diff --git a/packages/core/src/execution-engine/node-execution-context/execute-context.ts b/packages/core/src/execution-engine/node-execution-context/execute-context.ts index e754d8ac66..7b0df62381 100644 --- a/packages/core/src/execution-engine/node-execution-context/execute-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/execute-context.ts @@ -37,6 +37,7 @@ import { } from './utils/binary-helper-functions'; import { constructExecutionMetaData } from './utils/construct-execution-metadata'; import { copyInputItems } from './utils/copy-input-items'; +import { getDataStoreHelperFunctions } from './utils/data-store-helper-functions'; import { getDeduplicationHelperFunctions } from './utils/deduplication-helper-functions'; import { getFileSystemHelperFunctions } from './utils/file-system-helper-functions'; import { getInputConnectionData } from './utils/get-input-connection-data'; @@ -94,6 +95,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti connectionInputData, ), ...getBinaryHelperFunctions(additionalData, workflow.id), + ...getDataStoreHelperFunctions(additionalData, workflow, node), ...getSSHTunnelFunctions(), ...getFileSystemHelperFunctions(node), ...getDeduplicationHelperFunctions(workflow, node), diff --git a/packages/core/src/execution-engine/node-execution-context/load-options-context.ts b/packages/core/src/execution-engine/node-execution-context/load-options-context.ts index ac0ab46f67..e7350a96e5 100644 --- a/packages/core/src/execution-engine/node-execution-context/load-options-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/load-options-context.ts @@ -10,6 +10,7 @@ import type { } from 'n8n-workflow'; import { NodeExecutionContext } from './node-execution-context'; +import { getDataStoreHelperFunctions } from './utils/data-store-helper-functions'; import { extractValue } from './utils/extract-value'; import { getRequestHelperFunctions } from './utils/request-helper-functions'; import { getSSHTunnelFunctions } from './utils/ssh-tunnel-helper-functions'; @@ -28,6 +29,7 @@ export class LoadOptionsContext extends NodeExecutionContext implements ILoadOpt this.helpers = { ...getSSHTunnelFunctions(), ...getRequestHelperFunctions(workflow, node, additionalData), + ...getDataStoreHelperFunctions(additionalData, workflow, node), }; } diff --git a/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts b/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts index 500b60d890..e60ac53f3a 100644 --- a/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts @@ -29,6 +29,7 @@ import { } from './utils/binary-helper-functions'; import { constructExecutionMetaData } from './utils/construct-execution-metadata'; import { copyInputItems } from './utils/copy-input-items'; +import { getDataStoreHelperFunctions } from './utils/data-store-helper-functions'; import { getDeduplicationHelperFunctions } from './utils/deduplication-helper-functions'; import { getFileSystemHelperFunctions } from './utils/file-system-helper-functions'; // eslint-disable-next-line import-x/no-cycle @@ -88,6 +89,7 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData ...getSSHTunnelFunctions(), ...getFileSystemHelperFunctions(node), ...getBinaryHelperFunctions(additionalData, workflow.id), + ...getDataStoreHelperFunctions(additionalData, workflow, node), ...getDeduplicationHelperFunctions(workflow, node), assertBinaryData: (itemIndex, propertyName) => assertBinaryData(inputData, node, itemIndex, propertyName, 0), diff --git a/packages/core/src/execution-engine/node-execution-context/utils/data-store-helper-functions.ts b/packages/core/src/execution-engine/node-execution-context/utils/data-store-helper-functions.ts new file mode 100644 index 0000000000..adb09de992 --- /dev/null +++ b/packages/core/src/execution-engine/node-execution-context/utils/data-store-helper-functions.ts @@ -0,0 +1,21 @@ +import type { + DataStoreProxyFunctions, + INode, + Workflow, + IWorkflowExecuteAdditionalData, +} from 'n8n-workflow'; + +export function getDataStoreHelperFunctions( + additionalData: IWorkflowExecuteAdditionalData, + workflow: Workflow, + node: INode, +): Partial { + if (additionalData.dataStoreProxyProvider === undefined) return {}; + const dataStoreProxyProvider = additionalData.dataStoreProxyProvider; + return { + getDataStoreAggregateProxy: async () => + await dataStoreProxyProvider.getDataStoreAggregateProxy(workflow, node), + getDataStoreProxy: async (dataStoreId: string) => + await dataStoreProxyProvider.getDataStoreProxy(workflow, node, dataStoreId), + }; +} diff --git a/packages/workflow/src/data-store.types.ts b/packages/workflow/src/data-store.types.ts new file mode 100644 index 0000000000..3a19883b0e --- /dev/null +++ b/packages/workflow/src/data-store.types.ts @@ -0,0 +1,106 @@ +export type DataStoreColumnType = 'string' | 'number' | 'boolean' | 'date'; + +export type DataStoreColumn = { + id: string; + name: string; + type: DataStoreColumnType; + index: number; + dataStoreId: string; +}; + +export type DataStore = { + id: string; + name: string; + columns: DataStoreColumn[]; + createdAt: Date; + updatedAt: Date; + projectId: string; + sizeBytes: number; +}; + +export type CreateDataStoreColumnOptions = Pick & + Partial>; + +export type CreateDataStoreOptions = Pick & { + columns: CreateDataStoreColumnOptions[]; +}; + +export type UpdateDataStoreOptions = { name: string }; + +export type ListDataStoreOptions = { + filter?: Record; + sortBy?: + | 'name:asc' + | 'name:desc' + | 'createdAt:asc' + | 'createdAt:desc' + | 'updatedAt:asc' + | 'updatedAt:desc' + | 'sizeBytes:asc' + | 'sizeBytes:desc'; + take?: number; + skip?: number; +}; + +export type ListDataStoreContentFilter = { + type: 'and' | 'or'; + filters: Array<{ + columnName: string; + condition: 'eq' | 'neq'; + value: string | number | boolean | Date; + }>; +}; + +export type ListDataStoreRowsOptions = { + filter?: ListDataStoreContentFilter; + sortBy?: [string, 'ASC' | 'DESC']; + take?: number; + skip?: number; +}; + +export type UpsertDataStoreRowsOptions = { + rows: DataStoreRows; + matchFields: string[]; +}; + +export type MoveDataStoreColumnOptions = { + targetIndex: number; +}; + +export type AddDataStoreColumnOptions = Pick & + Partial>; + +export type DataStoreColumnJsType = string | number | boolean | Date; + +export type DataStoreRows = Array>; + +// APIs for a data store service operating on a specific projectId +export interface IDataStoreProjectAggregateService { + createDataStore(options: CreateDataStoreOptions): Promise; + + getManyAndCount(options: ListDataStoreOptions): Promise<{ count: number; data: DataStore[] }>; + + deleteDataStoreAll(): Promise; +} +// APIs for a data store service operating on a specific projectId and dataStoreId +export interface IDataStoreProjectService { + updateDataStore(options: UpdateDataStoreOptions): Promise; + + deleteDataStore(): Promise; + + getColumns(): Promise; + + addColumn(options: AddDataStoreColumnOptions): Promise; + + moveColumn(columnId: string, options: MoveDataStoreColumnOptions): Promise; + + deleteColumn(columnId: string): Promise; + + getManyRowsAndCount( + dto: Partial, + ): Promise<{ count: number; data: DataStoreRows }>; + + insertRows(rows: DataStoreRows): Promise; + + upsertRows(options: UpsertDataStoreRowsOptions): Promise; +} diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 0bd86b1b8e..7dbcc09394 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -7,6 +7,7 @@ export * from './errors'; export * from './constants'; export * from './common'; export * from './cron'; +export * from './data-store.types'; export * from './deferred-promise'; export * from './global-state'; export * from './interfaces'; diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 8ae7ebec91..eb920602e9 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -13,6 +13,10 @@ import type { SecureContextOptions } from 'tls'; import type { URLSearchParams } from 'url'; import type { CODE_EXECUTION_MODES, CODE_LANGUAGES, LOG_LEVELS } from './constants'; +import type { + IDataStoreProjectAggregateService, + IDataStoreProjectService, +} from './data-store.types'; import type { IDeferredPromise } from './deferred-promise'; import type { ExecutionCancelledError } from './errors'; import type { ExpressionError } from './errors/expression.error'; @@ -916,6 +920,24 @@ type FunctionsBaseWithRequiredKeys = Functions export type ContextType = 'flow' | 'node'; +export type DataStoreProxyProvider = { + getDataStoreAggregateProxy( + workflow: Workflow, + node: INode, + ): Promise; + getDataStoreProxy( + workflow: Workflow, + node: INode, + dataStoreId: string, + ): Promise; +}; + +export type DataStoreProxyFunctions = { + // These are optional to account for situations where the data-store module is disabled + getDataStoreAggregateProxy?(): Promise; + getDataStoreProxy?(dataStoreId: string): Promise; +}; + type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & { continueOnFail(): boolean; setMetadata(metadata: ITaskMetadata): void; @@ -978,7 +1000,8 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & BinaryHelperFunctions & DeduplicationHelperFunctions & FileSystemHelperFunctions & - SSHTunnelFunctions & { + SSHTunnelFunctions & + DataStoreProxyFunctions & { normalizeItems(items: INodeExecutionData | INodeExecutionData[]): INodeExecutionData[]; constructExecutionMetaData( inputData: INodeExecutionData[], @@ -1062,7 +1085,7 @@ export interface ILoadOptionsFunctions extends FunctionsBase { ): NodeParameterValueType | object | undefined; getCurrentNodeParameters(): INodeParameters | undefined; - helpers: RequestHelperFunctions & SSHTunnelFunctions; + helpers: RequestHelperFunctions & SSHTunnelFunctions & DataStoreProxyFunctions; } export type FieldValueOption = { name: string; type: FieldType | 'any' };