feat(core): Expose data store service to Data Store Node (no-changelog) (#17970)

Co-authored-by: Daria Staferova <daria.staferova@n8n.io>
This commit is contained in:
Charlie Kolb
2025-08-19 17:43:19 +02:00
committed by GitHub
parent 970351bf23
commit 169acd12bd
20 changed files with 547 additions and 15 deletions

View File

@@ -51,9 +51,7 @@ export {
type DataStore, type DataStore,
type DataStoreColumn, type DataStoreColumn,
type DataStoreCreateColumnSchema, type DataStoreCreateColumnSchema,
type DataStoreColumnJsType,
type DataStoreListFilter, type DataStoreListFilter,
type DataStoreRows,
type DataStoreListOptions, type DataStoreListOptions,
type DataStoreUserTableName, type DataStoreUserTableName,
dateTimeSchema, dateTimeSchema,

View File

@@ -59,7 +59,3 @@ export const dateTimeSchema = z
// Dates are received as date strings and validated before insertion // Dates are received as date strings and validated before insertion
export const dataStoreColumnValueSchema = z.union([z.string(), z.number(), z.boolean(), z.null()]); export const dataStoreColumnValueSchema = z.union([z.string(), z.number(), z.boolean(), z.null()]);
export type DataStoreColumnJsType = string | number | boolean | Date;
export type DataStoreRows = Array<Record<string, DataStoreColumnJsType | null>>;

View File

@@ -24,6 +24,7 @@ import {
SubworkflowPolicyChecker, SubworkflowPolicyChecker,
} from '@/executions/pre-execution-checks'; } from '@/executions/pre-execution-checks';
import { ExternalHooks } from '@/external-hooks'; import { ExternalHooks } from '@/external-hooks';
import { DataStoreProxyService } from '@/modules/data-store/data-store-proxy.service';
import { UrlService } from '@/services/url.service'; import { UrlService } from '@/services/url.service';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { Telemetry } from '@/telemetry'; import { Telemetry } from '@/telemetry';
@@ -98,6 +99,7 @@ describe('WorkflowExecuteAdditionalData', () => {
mockInstance(CredentialsPermissionChecker); mockInstance(CredentialsPermissionChecker);
mockInstance(SubworkflowPolicyChecker); mockInstance(SubworkflowPolicyChecker);
mockInstance(WorkflowStatisticsService); mockInstance(WorkflowStatisticsService);
mockInstance(DataStoreProxyService);
const urlService = mockInstance(UrlService); const urlService = mockInstance(UrlService);
Container.set(UrlService, urlService); Container.set(UrlService, urlService);

View File

@@ -0,0 +1,230 @@
import type { Logger } from '@n8n/backend-common';
import { testDb, testModules } from '@n8n/backend-test-utils';
import type { Project } from '@n8n/db';
import { mock } from 'jest-mock-extended';
import type {
AddDataStoreColumnOptions,
INode,
ListDataStoreRowsOptions,
MoveDataStoreColumnOptions,
UpsertDataStoreRowsOptions,
Workflow,
} from 'n8n-workflow';
import type { OwnershipService } from '@/services/ownership.service';
import { DataStoreProxyService } from '../data-store-proxy.service';
import type { DataStoreService } from '../data-store.service';
const PROJECT_ID = 'project-id';
beforeAll(async () => {
await testModules.loadModules(['data-store']);
await testDb.init();
});
describe('DataStoreProxyService', () => {
let dataStoreServiceMock = mock<DataStoreService>();
let ownershipServiceMock = mock<OwnershipService>();
let loggerMock = mock<Logger>();
let dataStoreProxyService: DataStoreProxyService;
let workflow: Workflow;
let node: INode;
let project: Project;
beforeEach(() => {
dataStoreServiceMock = mock<DataStoreService>();
ownershipServiceMock = mock<OwnershipService>();
loggerMock = mock<Logger>();
dataStoreProxyService = new DataStoreProxyService(
dataStoreServiceMock,
ownershipServiceMock,
loggerMock,
);
workflow = mock<Workflow>({
id: 'workflow-id',
});
project = mock<Project>({
id: PROJECT_ID,
});
node = mock<INode>({
type: 'n8n-nodes-base.dataStore',
});
ownershipServiceMock.getWorkflowProjectCached.mockResolvedValueOnce(project);
});
describe('makeAggregateOperations', () => {
it('should call getManyAndCount with correct parameters', async () => {
const options = { filter: { name: 'test' } };
const aggregateOperations = await dataStoreProxyService.getDataStoreAggregateProxy(
workflow,
node,
);
await aggregateOperations.getManyAndCount(options);
expect(dataStoreServiceMock.getManyAndCount).toBeCalledWith({
filter: { name: 'test', projectId: PROJECT_ID },
});
});
it('should call createDataStore with correct parameters', async () => {
const options = { name: 'newDataStore', columns: [] };
const aggregateOperations = await dataStoreProxyService.getDataStoreAggregateProxy(
workflow,
node,
);
await aggregateOperations.createDataStore(options);
expect(dataStoreServiceMock.createDataStore).toBeCalledWith(PROJECT_ID, options);
});
it('should call deleteDataStoreByProject when proxy calls deleteDataStoreAll', async () => {
const aggregateOperations = await dataStoreProxyService.getDataStoreAggregateProxy(
workflow,
node,
);
await aggregateOperations.deleteDataStoreAll();
expect(dataStoreServiceMock.deleteDataStoreByProjectId).toBeCalledWith(PROJECT_ID);
});
});
it('should call updateDataStore with correct parameters', async () => {
const options = { name: 'updatedDataStore' };
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
workflow,
node,
'dataStore-id',
);
await dataStoreOperations.updateDataStore(options);
expect(dataStoreServiceMock.updateDataStore).toBeCalledWith(
'dataStore-id',
PROJECT_ID,
options,
);
});
it('should call deleteDataStore with correct parameters', async () => {
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
workflow,
node,
'dataStore-id',
);
await dataStoreOperations.deleteDataStore();
expect(dataStoreServiceMock.deleteDataStore).toBeCalledWith('dataStore-id', PROJECT_ID);
});
it('should call getColumns with correct parameters', async () => {
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
workflow,
node,
'dataStore-id',
);
await dataStoreOperations.getColumns();
expect(dataStoreServiceMock.getColumns).toBeCalledWith('dataStore-id', PROJECT_ID);
});
it('should call addColumn with correct parameters', async () => {
const options: AddDataStoreColumnOptions = { name: 'newColumn', type: 'string' };
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
workflow,
node,
'dataStore-id',
);
await dataStoreOperations.addColumn(options);
expect(dataStoreServiceMock.addColumn).toBeCalledWith('dataStore-id', PROJECT_ID, options);
});
it('should call moveColumn with correct parameters', async () => {
const columnId = 'column-id';
const options: MoveDataStoreColumnOptions = { targetIndex: 1 };
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
workflow,
node,
'dataStore-id',
);
await dataStoreOperations.moveColumn(columnId, options);
expect(dataStoreServiceMock.moveColumn).toBeCalledWith(
'dataStore-id',
PROJECT_ID,
columnId,
options,
);
});
it('should call deleteColumn with correct parameters', async () => {
const columnId = 'column-id';
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
workflow,
node,
'dataStore-id',
);
await dataStoreOperations.deleteColumn(columnId);
expect(dataStoreServiceMock.deleteColumn).toBeCalledWith('dataStore-id', PROJECT_ID, columnId);
});
it('should call getManyRowsAndCount with correct parameters', async () => {
const options: ListDataStoreRowsOptions = {
filter: {
filters: [{ columnName: 'x', condition: 'eq', value: 'testRow' }],
type: 'and',
},
};
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
workflow,
node,
'dataStore-id',
);
await dataStoreOperations.getManyRowsAndCount(options);
expect(dataStoreServiceMock.getManyRowsAndCount).toBeCalledWith(
'dataStore-id',
PROJECT_ID,
options,
);
});
it('should call insertRows with correct parameters', async () => {
const rows = [{ id: 1, name: 'row1' }];
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
workflow,
node,
'dataStore-id',
);
await dataStoreOperations.insertRows(rows);
expect(dataStoreServiceMock.insertRows).toBeCalledWith('dataStore-id', PROJECT_ID, rows);
});
it('should call upsertRows with correct parameters', async () => {
const options: UpsertDataStoreRowsOptions = {
matchFields: ['name'],
rows: [{ id: 1, name: 'row1' }],
};
const dataStoreOperations = await dataStoreProxyService.getDataStoreProxy(
workflow,
node,
'dataStore-id',
);
await dataStoreOperations.upsertRows(options);
expect(dataStoreServiceMock.upsertRows).toBeCalledWith('dataStore-id', PROJECT_ID, options);
});
});

View File

@@ -1,4 +1,4 @@
import type { DataStoreRows } from '@n8n/api-types'; import type { DataStoreRows } from 'n8n-workflow';
import { import {
addColumnQuery, addColumnQuery,

View File

@@ -0,0 +1,136 @@
import type { DataStoreListOptions } from '@n8n/api-types';
import { Logger } from '@n8n/backend-common';
import { Service } from '@n8n/di';
import {
AddDataStoreColumnOptions,
CreateDataStoreOptions,
DataStore,
DataStoreColumn,
DataStoreProxyProvider,
DataStoreRows,
IDataStoreProjectAggregateService,
IDataStoreProjectService,
INode,
ListDataStoreOptions,
ListDataStoreRowsOptions,
MoveDataStoreColumnOptions,
UpdateDataStoreOptions,
UpsertDataStoreRowsOptions,
Workflow,
} from 'n8n-workflow';
import { DataStoreService } from './data-store.service';
import { OwnershipService } from '@/services/ownership.service';
@Service()
export class DataStoreProxyService implements DataStoreProxyProvider {
constructor(
private readonly dataStoreService: DataStoreService,
private readonly ownershipService: OwnershipService,
private readonly logger: Logger,
) {
this.logger = this.logger.scoped('data-store');
}
private validateRequest(node: INode) {
if (node.type !== 'n8n-nodes-base.dataStore') {
throw new Error('This proxy is only available for data store nodes');
}
}
private async getProjectId(workflow: Workflow) {
const homeProject = await this.ownershipService.getWorkflowProjectCached(workflow.id);
return homeProject.id;
}
async getDataStoreAggregateProxy(
workflow: Workflow,
node: INode,
): Promise<IDataStoreProjectAggregateService> {
this.validateRequest(node);
const projectId = await this.getProjectId(workflow);
return this.makeAggregateOperations(projectId);
}
async getDataStoreProxy(
workflow: Workflow,
node: INode,
dataStoreId: string,
): Promise<IDataStoreProjectService> {
this.validateRequest(node);
const projectId = await this.getProjectId(workflow);
return this.makeDataStoreOperations(projectId, dataStoreId);
}
private makeAggregateOperations(projectId: string): IDataStoreProjectAggregateService {
const dataStoreService = this.dataStoreService;
return {
async getManyAndCount(options: ListDataStoreOptions = {}) {
const serviceOptions: DataStoreListOptions = {
...options,
filter: { projectId, ...(options.filter ?? {}) },
};
return await dataStoreService.getManyAndCount(serviceOptions);
},
async createDataStore(options: CreateDataStoreOptions): Promise<DataStore> {
return await dataStoreService.createDataStore(projectId, options);
},
async deleteDataStoreAll(): Promise<boolean> {
return await dataStoreService.deleteDataStoreByProjectId(projectId);
},
};
}
private makeDataStoreOperations(
projectId: string,
dataStoreId: string,
): Omit<IDataStoreProjectService, keyof IDataStoreProjectAggregateService> {
const dataStoreService = this.dataStoreService;
return {
// DataStore management
async updateDataStore(options: UpdateDataStoreOptions): Promise<boolean> {
return await dataStoreService.updateDataStore(dataStoreId, projectId, options);
},
async deleteDataStore(): Promise<boolean> {
return await dataStoreService.deleteDataStore(dataStoreId, projectId);
},
// Column operations
async getColumns(): Promise<DataStoreColumn[]> {
return await dataStoreService.getColumns(dataStoreId, projectId);
},
async addColumn(options: AddDataStoreColumnOptions): Promise<DataStoreColumn> {
return await dataStoreService.addColumn(dataStoreId, projectId, options);
},
async moveColumn(columnId: string, options: MoveDataStoreColumnOptions): Promise<boolean> {
return await dataStoreService.moveColumn(dataStoreId, projectId, columnId, options);
},
async deleteColumn(columnId: string): Promise<boolean> {
return await dataStoreService.deleteColumn(dataStoreId, projectId, columnId);
},
// Row operations
async getManyRowsAndCount(options: Partial<ListDataStoreRowsOptions>) {
return await dataStoreService.getManyRowsAndCount(dataStoreId, projectId, options);
},
async insertRows(rows: DataStoreRows) {
return await dataStoreService.insertRows(dataStoreId, projectId, rows);
},
async upsertRows(options: UpsertDataStoreRowsOptions) {
return await dataStoreService.upsertRows(dataStoreId, projectId, options);
},
};
}
}

View File

@@ -2,7 +2,6 @@ import type {
ListDataStoreContentQueryDto, ListDataStoreContentQueryDto,
ListDataStoreContentFilter, ListDataStoreContentFilter,
DataStoreUserTableName, DataStoreUserTableName,
DataStoreRows,
UpsertDataStoreRowsDto, UpsertDataStoreRowsDto,
} from '@n8n/api-types'; } from '@n8n/api-types';
import { CreateTable, DslColumn } from '@n8n/db'; import { CreateTable, DslColumn } from '@n8n/db';
@@ -27,6 +26,7 @@ import {
toDslColumns, toDslColumns,
toTableName, toTableName,
} from './utils/sql-utils'; } from './utils/sql-utils';
import { DataStoreRows } from 'n8n-workflow';
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
type QueryBuilder = SelectQueryBuilder<any>; type QueryBuilder = SelectQueryBuilder<any>;

View File

@@ -5,7 +5,6 @@ import type {
ListDataStoreContentQueryDto, ListDataStoreContentQueryDto,
MoveDataStoreColumnDto, MoveDataStoreColumnDto,
DataStoreListOptions, DataStoreListOptions,
DataStoreRows,
UpsertDataStoreRowsDto, UpsertDataStoreRowsDto,
UpdateDataStoreDto, UpdateDataStoreDto,
} from '@n8n/api-types'; } from '@n8n/api-types';
@@ -20,6 +19,7 @@ import { DataStoreNameConflictError } from './errors/data-store-name-conflict.er
import { DataStoreNotFoundError } from './errors/data-store-not-found.error'; import { DataStoreNotFoundError } from './errors/data-store-not-found.error';
import { DataStoreValidationError } from './errors/data-store-validation.error'; import { DataStoreValidationError } from './errors/data-store-validation.error';
import { toTableName, normalizeRows } from './utils/sql-utils'; import { toTableName, normalizeRows } from './utils/sql-utils';
import { DataStoreRows } from 'n8n-workflow';
@Service() @Service()
export class DataStoreService { export class DataStoreService {

View File

@@ -1,12 +1,11 @@
import { import {
DATA_STORE_COLUMN_REGEX, DATA_STORE_COLUMN_REGEX,
type DataStoreRows,
type DataStoreCreateColumnSchema, type DataStoreCreateColumnSchema,
type DataStoreColumn, type DataStoreColumn,
} from '@n8n/api-types'; } from '@n8n/api-types';
import { DslColumn } from '@n8n/db'; import { DslColumn } from '@n8n/db';
import type { DataSourceOptions } from '@n8n/typeorm'; import type { DataSourceOptions } from '@n8n/typeorm';
import { UnexpectedError } from 'n8n-workflow'; import { UnexpectedError, type DataStoreRows } from 'n8n-workflow';
import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error';

View File

@@ -23,6 +23,7 @@ import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.serv
import { JobProcessor } from '../job-processor'; import { JobProcessor } from '../job-processor';
import type { Job } from '../scaling.types'; import type { Job } from '../scaling.types';
import { DataStoreProxyService } from '@/modules/data-store/data-store-proxy.service';
mockInstance(VariablesService, { mockInstance(VariablesService, {
getAllCached: jest.fn().mockResolvedValue([]), getAllCached: jest.fn().mockResolvedValue([]),
@@ -32,6 +33,7 @@ mockInstance(ExternalSecretsProxy);
mockInstance(WorkflowStaticDataService); mockInstance(WorkflowStaticDataService);
mockInstance(WorkflowStatisticsService); mockInstance(WorkflowStatisticsService);
mockInstance(ExternalHooks); mockInstance(ExternalHooks);
mockInstance(DataStoreProxyService);
const processRunExecutionDataMock = jest.fn(); const processRunExecutionDataMock = jest.fn();
jest.mock('n8n-core', () => { jest.mock('n8n-core', () => {

View File

@@ -3,7 +3,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
import type { PushMessage, PushType } from '@n8n/api-types'; import type { PushMessage, PushType } from '@n8n/api-types';
import { Logger } from '@n8n/backend-common'; import { Logger, ModuleRegistry } from '@n8n/backend-common';
import { GlobalConfig } from '@n8n/config'; import { GlobalConfig } from '@n8n/config';
import { ExecutionRepository, WorkflowRepository } from '@n8n/db'; import { ExecutionRepository, WorkflowRepository } from '@n8n/db';
import { Container } from '@n8n/di'; import { Container } from '@n8n/di';
@@ -377,7 +377,15 @@ export async function getBase(
const eventService = Container.get(EventService); const eventService = Container.get(EventService);
const moduleRegistry = Container.get(ModuleRegistry);
const dataStoreProxyProvider = moduleRegistry.isActive('data-store')
? Container.get(
(await import('@/modules/data-store/data-store-proxy.service')).DataStoreProxyService,
)
: undefined;
return { return {
dataStoreProxyProvider,
currentNodeExecutionIndex: 0, currentNodeExecutionIndex: 0,
credentialsHelper: Container.get(CredentialsHelper), credentialsHelper: Container.get(CredentialsHelper),
executeWorkflow, executeWorkflow,

View File

@@ -1,7 +1,8 @@
import type { CreateDataStoreColumnDto, DataStoreRows } from '@n8n/api-types'; import type { CreateDataStoreColumnDto } from '@n8n/api-types';
import { randomName } from '@n8n/backend-test-utils'; import { randomName } from '@n8n/backend-test-utils';
import type { Project } from '@n8n/db'; import type { Project } from '@n8n/db';
import { Container } from '@n8n/di'; import { Container } from '@n8n/di';
import type { DataStoreRows } from 'n8n-workflow';
import { DataStoreColumnRepository } from '@/modules/data-store/data-store-column.repository'; import { DataStoreColumnRepository } from '@/modules/data-store/data-store-column.repository';
import { DataStoreRowsRepository } from '@/modules/data-store/data-store-rows.repository'; import { DataStoreRowsRepository } from '@/modules/data-store/data-store-rows.repository';

View File

@@ -1,3 +1,5 @@
import type { DataStoreProxyProvider } from 'n8n-workflow';
import type { ExecutionLifecycleHooks } from './execution-lifecycle-hooks'; import type { ExecutionLifecycleHooks } from './execution-lifecycle-hooks';
import type { ExternalSecretsProxy } from './external-secrets-proxy'; import type { ExternalSecretsProxy } from './external-secrets-proxy';
@@ -5,6 +7,7 @@ declare module 'n8n-workflow' {
interface IWorkflowExecuteAdditionalData { interface IWorkflowExecuteAdditionalData {
hooks?: ExecutionLifecycleHooks; hooks?: ExecutionLifecycleHooks;
externalSecretsProxy: ExternalSecretsProxy; externalSecretsProxy: ExternalSecretsProxy;
dataStoreProxyProvider?: DataStoreProxyProvider;
} }
} }

View File

@@ -37,6 +37,7 @@ import {
} from './utils/binary-helper-functions'; } from './utils/binary-helper-functions';
import { constructExecutionMetaData } from './utils/construct-execution-metadata'; import { constructExecutionMetaData } from './utils/construct-execution-metadata';
import { copyInputItems } from './utils/copy-input-items'; import { copyInputItems } from './utils/copy-input-items';
import { getDataStoreHelperFunctions } from './utils/data-store-helper-functions';
import { getDeduplicationHelperFunctions } from './utils/deduplication-helper-functions'; import { getDeduplicationHelperFunctions } from './utils/deduplication-helper-functions';
import { getFileSystemHelperFunctions } from './utils/file-system-helper-functions'; import { getFileSystemHelperFunctions } from './utils/file-system-helper-functions';
import { getInputConnectionData } from './utils/get-input-connection-data'; import { getInputConnectionData } from './utils/get-input-connection-data';
@@ -94,6 +95,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
connectionInputData, connectionInputData,
), ),
...getBinaryHelperFunctions(additionalData, workflow.id), ...getBinaryHelperFunctions(additionalData, workflow.id),
...getDataStoreHelperFunctions(additionalData, workflow, node),
...getSSHTunnelFunctions(), ...getSSHTunnelFunctions(),
...getFileSystemHelperFunctions(node), ...getFileSystemHelperFunctions(node),
...getDeduplicationHelperFunctions(workflow, node), ...getDeduplicationHelperFunctions(workflow, node),

View File

@@ -10,6 +10,7 @@ import type {
} from 'n8n-workflow'; } from 'n8n-workflow';
import { NodeExecutionContext } from './node-execution-context'; import { NodeExecutionContext } from './node-execution-context';
import { getDataStoreHelperFunctions } from './utils/data-store-helper-functions';
import { extractValue } from './utils/extract-value'; import { extractValue } from './utils/extract-value';
import { getRequestHelperFunctions } from './utils/request-helper-functions'; import { getRequestHelperFunctions } from './utils/request-helper-functions';
import { getSSHTunnelFunctions } from './utils/ssh-tunnel-helper-functions'; import { getSSHTunnelFunctions } from './utils/ssh-tunnel-helper-functions';
@@ -28,6 +29,7 @@ export class LoadOptionsContext extends NodeExecutionContext implements ILoadOpt
this.helpers = { this.helpers = {
...getSSHTunnelFunctions(), ...getSSHTunnelFunctions(),
...getRequestHelperFunctions(workflow, node, additionalData), ...getRequestHelperFunctions(workflow, node, additionalData),
...getDataStoreHelperFunctions(additionalData, workflow, node),
}; };
} }

View File

@@ -29,6 +29,7 @@ import {
} from './utils/binary-helper-functions'; } from './utils/binary-helper-functions';
import { constructExecutionMetaData } from './utils/construct-execution-metadata'; import { constructExecutionMetaData } from './utils/construct-execution-metadata';
import { copyInputItems } from './utils/copy-input-items'; import { copyInputItems } from './utils/copy-input-items';
import { getDataStoreHelperFunctions } from './utils/data-store-helper-functions';
import { getDeduplicationHelperFunctions } from './utils/deduplication-helper-functions'; import { getDeduplicationHelperFunctions } from './utils/deduplication-helper-functions';
import { getFileSystemHelperFunctions } from './utils/file-system-helper-functions'; import { getFileSystemHelperFunctions } from './utils/file-system-helper-functions';
// eslint-disable-next-line import-x/no-cycle // eslint-disable-next-line import-x/no-cycle
@@ -88,6 +89,7 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
...getSSHTunnelFunctions(), ...getSSHTunnelFunctions(),
...getFileSystemHelperFunctions(node), ...getFileSystemHelperFunctions(node),
...getBinaryHelperFunctions(additionalData, workflow.id), ...getBinaryHelperFunctions(additionalData, workflow.id),
...getDataStoreHelperFunctions(additionalData, workflow, node),
...getDeduplicationHelperFunctions(workflow, node), ...getDeduplicationHelperFunctions(workflow, node),
assertBinaryData: (itemIndex, propertyName) => assertBinaryData: (itemIndex, propertyName) =>
assertBinaryData(inputData, node, itemIndex, propertyName, 0), assertBinaryData(inputData, node, itemIndex, propertyName, 0),

View File

@@ -0,0 +1,21 @@
import type {
DataStoreProxyFunctions,
INode,
Workflow,
IWorkflowExecuteAdditionalData,
} from 'n8n-workflow';
export function getDataStoreHelperFunctions(
additionalData: IWorkflowExecuteAdditionalData,
workflow: Workflow,
node: INode,
): Partial<DataStoreProxyFunctions> {
if (additionalData.dataStoreProxyProvider === undefined) return {};
const dataStoreProxyProvider = additionalData.dataStoreProxyProvider;
return {
getDataStoreAggregateProxy: async () =>
await dataStoreProxyProvider.getDataStoreAggregateProxy(workflow, node),
getDataStoreProxy: async (dataStoreId: string) =>
await dataStoreProxyProvider.getDataStoreProxy(workflow, node, dataStoreId),
};
}

View File

@@ -0,0 +1,106 @@
export type DataStoreColumnType = 'string' | 'number' | 'boolean' | 'date';
export type DataStoreColumn = {
id: string;
name: string;
type: DataStoreColumnType;
index: number;
dataStoreId: string;
};
export type DataStore = {
id: string;
name: string;
columns: DataStoreColumn[];
createdAt: Date;
updatedAt: Date;
projectId: string;
sizeBytes: number;
};
export type CreateDataStoreColumnOptions = Pick<DataStoreColumn, 'name' | 'type'> &
Partial<Pick<DataStoreColumn, 'index'>>;
export type CreateDataStoreOptions = Pick<DataStore, 'name'> & {
columns: CreateDataStoreColumnOptions[];
};
export type UpdateDataStoreOptions = { name: string };
export type ListDataStoreOptions = {
filter?: Record<string, string | string[]>;
sortBy?:
| 'name:asc'
| 'name:desc'
| 'createdAt:asc'
| 'createdAt:desc'
| 'updatedAt:asc'
| 'updatedAt:desc'
| 'sizeBytes:asc'
| 'sizeBytes:desc';
take?: number;
skip?: number;
};
export type ListDataStoreContentFilter = {
type: 'and' | 'or';
filters: Array<{
columnName: string;
condition: 'eq' | 'neq';
value: string | number | boolean | Date;
}>;
};
export type ListDataStoreRowsOptions = {
filter?: ListDataStoreContentFilter;
sortBy?: [string, 'ASC' | 'DESC'];
take?: number;
skip?: number;
};
export type UpsertDataStoreRowsOptions = {
rows: DataStoreRows;
matchFields: string[];
};
export type MoveDataStoreColumnOptions = {
targetIndex: number;
};
export type AddDataStoreColumnOptions = Pick<DataStoreColumn, 'name' | 'type'> &
Partial<Pick<DataStoreColumn, 'index'>>;
export type DataStoreColumnJsType = string | number | boolean | Date;
export type DataStoreRows = Array<Record<string, DataStoreColumnJsType | null>>;
// APIs for a data store service operating on a specific projectId
export interface IDataStoreProjectAggregateService {
createDataStore(options: CreateDataStoreOptions): Promise<DataStore>;
getManyAndCount(options: ListDataStoreOptions): Promise<{ count: number; data: DataStore[] }>;
deleteDataStoreAll(): Promise<boolean>;
}
// APIs for a data store service operating on a specific projectId and dataStoreId
export interface IDataStoreProjectService {
updateDataStore(options: UpdateDataStoreOptions): Promise<boolean>;
deleteDataStore(): Promise<boolean>;
getColumns(): Promise<DataStoreColumn[]>;
addColumn(options: AddDataStoreColumnOptions): Promise<DataStoreColumn>;
moveColumn(columnId: string, options: MoveDataStoreColumnOptions): Promise<boolean>;
deleteColumn(columnId: string): Promise<boolean>;
getManyRowsAndCount(
dto: Partial<ListDataStoreRowsOptions>,
): Promise<{ count: number; data: DataStoreRows }>;
insertRows(rows: DataStoreRows): Promise<boolean>;
upsertRows(options: UpsertDataStoreRowsOptions): Promise<boolean>;
}

View File

@@ -7,6 +7,7 @@ export * from './errors';
export * from './constants'; export * from './constants';
export * from './common'; export * from './common';
export * from './cron'; export * from './cron';
export * from './data-store.types';
export * from './deferred-promise'; export * from './deferred-promise';
export * from './global-state'; export * from './global-state';
export * from './interfaces'; export * from './interfaces';

View File

@@ -13,6 +13,10 @@ import type { SecureContextOptions } from 'tls';
import type { URLSearchParams } from 'url'; import type { URLSearchParams } from 'url';
import type { CODE_EXECUTION_MODES, CODE_LANGUAGES, LOG_LEVELS } from './constants'; import type { CODE_EXECUTION_MODES, CODE_LANGUAGES, LOG_LEVELS } from './constants';
import type {
IDataStoreProjectAggregateService,
IDataStoreProjectService,
} from './data-store.types';
import type { IDeferredPromise } from './deferred-promise'; import type { IDeferredPromise } from './deferred-promise';
import type { ExecutionCancelledError } from './errors'; import type { ExecutionCancelledError } from './errors';
import type { ExpressionError } from './errors/expression.error'; import type { ExpressionError } from './errors/expression.error';
@@ -916,6 +920,24 @@ type FunctionsBaseWithRequiredKeys<Keys extends keyof FunctionsBase> = Functions
export type ContextType = 'flow' | 'node'; export type ContextType = 'flow' | 'node';
export type DataStoreProxyProvider = {
getDataStoreAggregateProxy(
workflow: Workflow,
node: INode,
): Promise<IDataStoreProjectAggregateService>;
getDataStoreProxy(
workflow: Workflow,
node: INode,
dataStoreId: string,
): Promise<IDataStoreProjectService>;
};
export type DataStoreProxyFunctions = {
// These are optional to account for situations where the data-store module is disabled
getDataStoreAggregateProxy?(): Promise<IDataStoreProjectAggregateService>;
getDataStoreProxy?(dataStoreId: string): Promise<IDataStoreProjectService>;
};
type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & { type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
continueOnFail(): boolean; continueOnFail(): boolean;
setMetadata(metadata: ITaskMetadata): void; setMetadata(metadata: ITaskMetadata): void;
@@ -978,7 +1000,8 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
BinaryHelperFunctions & BinaryHelperFunctions &
DeduplicationHelperFunctions & DeduplicationHelperFunctions &
FileSystemHelperFunctions & FileSystemHelperFunctions &
SSHTunnelFunctions & { SSHTunnelFunctions &
DataStoreProxyFunctions & {
normalizeItems(items: INodeExecutionData | INodeExecutionData[]): INodeExecutionData[]; normalizeItems(items: INodeExecutionData | INodeExecutionData[]): INodeExecutionData[];
constructExecutionMetaData( constructExecutionMetaData(
inputData: INodeExecutionData[], inputData: INodeExecutionData[],
@@ -1062,7 +1085,7 @@ export interface ILoadOptionsFunctions extends FunctionsBase {
): NodeParameterValueType | object | undefined; ): NodeParameterValueType | object | undefined;
getCurrentNodeParameters(): INodeParameters | undefined; getCurrentNodeParameters(): INodeParameters | undefined;
helpers: RequestHelperFunctions & SSHTunnelFunctions; helpers: RequestHelperFunctions & SSHTunnelFunctions & DataStoreProxyFunctions;
} }
export type FieldValueOption = { name: string; type: FieldType | 'any' }; export type FieldValueOption = { name: string; type: FieldType | 'any' };