refactor(core): Make workflow services injectable (no-changelog) (#8033)

Refactor static workflow service classes into DI-compatible classes

Context: https://n8nio.slack.com/archives/C069HS026UF/p1702466571648889

Up next:
- Inject dependencies into workflow services
- Consolidate workflow controllers into one
- Make workflow controller injectable
- Inject dependencies into workflow controller

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Iván Ovejero
2023-12-15 12:59:56 +01:00
committed by GitHub
parent 2da15d0264
commit 1e7a309e63
13 changed files with 125 additions and 80 deletions

View File

@@ -2,7 +2,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Service } from 'typedi'; import { Container, Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import type { import type {
@@ -59,7 +59,7 @@ import { NodeTypes } from '@/NodeTypes';
import { WorkflowRunner } from '@/WorkflowRunner'; import { WorkflowRunner } from '@/WorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks'; import { ExternalHooks } from '@/ExternalHooks';
import { whereClause } from './UserManagement/UserManagementHelper'; import { whereClause } from './UserManagement/UserManagementHelper';
import { WorkflowsService } from './workflows/workflows.services'; import { WorkflowService } from './workflows/workflow.service';
import { webhookNotFoundErrorMessage } from './utils'; import { webhookNotFoundErrorMessage } from './utils';
import { In } from 'typeorm'; import { In } from 'typeorm';
import { WebhookService } from './services/webhook.service'; import { WebhookService } from './services/webhook.service';
@@ -418,8 +418,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
} }
} }
await this.webhookService.populateCache(); await this.webhookService.populateCache();
// Save static data!
await WorkflowsService.saveStaticData(workflow); await Container.get(WorkflowService).saveStaticData(workflow);
} }
/** /**
@@ -458,7 +458,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await workflow.deleteWebhook(webhookData, NodeExecuteFunctions, mode, 'update', false); await workflow.deleteWebhook(webhookData, NodeExecuteFunctions, mode, 'update', false);
} }
await WorkflowsService.saveStaticData(workflow); await Container.get(WorkflowService).saveStaticData(workflow);
await this.webhookService.deleteWorkflowWebhooks(workflowId); await this.webhookService.deleteWorkflowWebhooks(workflowId);
} }
@@ -531,7 +531,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
donePromise?: IDeferredPromise<IRun | undefined>, donePromise?: IDeferredPromise<IRun | undefined>,
): void => { ): void => {
this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`); this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`);
void WorkflowsService.saveStaticData(workflow); void Container.get(WorkflowService).saveStaticData(workflow);
const executePromise = this.runWorkflow( const executePromise = this.runWorkflow(
workflowData, workflowData,
node, node,
@@ -586,7 +586,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
donePromise?: IDeferredPromise<IRun | undefined>, donePromise?: IDeferredPromise<IRun | undefined>,
): void => { ): void => {
this.logger.debug(`Received trigger for workflow "${workflow.name}"`); this.logger.debug(`Received trigger for workflow "${workflow.name}"`);
void WorkflowsService.saveStaticData(workflow); void Container.get(WorkflowService).saveStaticData(workflow);
const executePromise = this.runWorkflow( const executePromise = this.runWorkflow(
workflowData, workflowData,
@@ -821,7 +821,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await this.activationErrorsService.unset(workflowId); await this.activationErrorsService.unset(workflowId);
const triggerCount = this.countTriggers(workflow, additionalData); const triggerCount = this.countTriggers(workflow, additionalData);
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount); await Container.get(WorkflowService).updateWorkflowTriggerCount(workflow.id, triggerCount);
} catch (e) { } catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`); const error = e instanceof Error ? e : new Error(`${e}`);
await this.activationErrorsService.set(workflowId, error.message); await this.activationErrorsService.set(workflowId, error.message);
@@ -831,7 +831,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// If for example webhooks get created it sometimes has to save the // If for example webhooks get created it sometimes has to save the
// id of them in the static data. So make sure that data gets persisted. // id of them in the static data. So make sure that data gets persisted.
await WorkflowsService.saveStaticData(workflow); await Container.get(WorkflowService).saveStaticData(workflow);
} }
/** /**

View File

@@ -24,7 +24,7 @@ import {
parseTagNames, parseTagNames,
getWorkflowsAndCount, getWorkflowsAndCount,
} from './workflows.service'; } from './workflows.service';
import { WorkflowsService } from '@/workflows/workflows.services'; import { WorkflowService } from '@/workflows/workflow.service';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { RoleService } from '@/services/role.service'; import { RoleService } from '@/services/role.service';
import { WorkflowHistoryService } from '@/workflows/workflowHistory/workflowHistory.service.ee'; import { WorkflowHistoryService } from '@/workflows/workflowHistory/workflowHistory.service.ee';
@@ -63,7 +63,7 @@ export = {
async (req: WorkflowRequest.Get, res: express.Response): Promise<express.Response> => { async (req: WorkflowRequest.Get, res: express.Response): Promise<express.Response> => {
const { id: workflowId } = req.params; const { id: workflowId } = req.params;
const workflow = await WorkflowsService.delete(req.user, workflowId); const workflow = await Container.get(WorkflowService).delete(req.user, workflowId);
if (!workflow) { if (!workflow) {
// user trying to access a workflow they do not own // user trying to access a workflow they do not own
// or workflow does not exist // or workflow does not exist

View File

@@ -60,7 +60,7 @@ import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { EventsService } from '@/services/events.service'; import { EventsService } from '@/services/events.service';
import { OwnershipService } from './services/ownership.service'; import { OwnershipService } from './services/ownership.service';
import { parseBody } from './middlewares'; import { parseBody } from './middlewares';
import { WorkflowsService } from './workflows/workflows.services'; import { WorkflowService } from './workflows/workflow.service';
import { Logger } from './Logger'; import { Logger } from './Logger';
import { NotFoundError } from './errors/response-errors/not-found.error'; import { NotFoundError } from './errors/response-errors/not-found.error';
import { InternalServerError } from './errors/response-errors/internal-server.error'; import { InternalServerError } from './errors/response-errors/internal-server.error';
@@ -387,7 +387,7 @@ export async function executeWebhook(
} }
// Save static data if it changed // Save static data if it changed
await WorkflowsService.saveStaticData(workflow); await Container.get(WorkflowService).saveStaticData(workflow);
const additionalKeys: IWorkflowDataProxyAdditionalKeys = { const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
$executionId: executionId, $executionId: executionId,

View File

@@ -52,7 +52,7 @@ import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowHelpers from '@/WorkflowHelpers';
import { findSubworkflowStart, isWorkflowIdValid } from '@/utils'; import { findSubworkflowStart, isWorkflowIdValid } from '@/utils';
import { PermissionChecker } from './UserManagement/PermissionChecker'; import { PermissionChecker } from './UserManagement/PermissionChecker';
import { WorkflowsService } from './workflows/workflows.services'; import { WorkflowService } from './workflows/workflow.service';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository';
import { EventsService } from '@/services/events.service'; import { EventsService } from '@/services/events.service';
@@ -418,7 +418,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database // Workflow is saved so update in database
try { try {
await WorkflowsService.saveStaticDataById( await Container.get(WorkflowService).saveStaticDataById(
this.workflowData.id as string, this.workflowData.id as string,
newStaticData, newStaticData,
); );
@@ -564,7 +564,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
if (isWorkflowIdValid(this.workflowData.id) && newStaticData) { if (isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database // Workflow is saved so update in database
try { try {
await WorkflowsService.saveStaticDataById( await Container.get(WorkflowService).saveStaticDataById(
this.workflowData.id as string, this.workflowData.id as string,
newStaticData, newStaticData,
); );
@@ -714,7 +714,7 @@ export async function getWorkflowData(
if (workflowInfo.id !== undefined) { if (workflowInfo.id !== undefined) {
const relations = config.getEnv('workflowTagsDisabled') ? [] : ['tags']; const relations = config.getEnv('workflowTagsDisabled') ? [] : ['tags'];
workflowData = await WorkflowsService.get({ id: workflowInfo.id }, { relations }); workflowData = await Container.get(WorkflowService).get({ id: workflowInfo.id }, { relations });
if (workflowData === undefined || workflowData === null) { if (workflowData === undefined || workflowData === null) {
throw new ApplicationError('Workflow does not exist.', { throw new ApplicationError('Workflow does not exist.', {

View File

@@ -3,8 +3,9 @@ import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { ExecutionsService } from './executions.service'; import { ExecutionsService } from './executions.service';
import type { ExecutionRequest } from '@/requests'; import type { ExecutionRequest } from '@/requests';
import type { IExecutionResponse, IExecutionFlattedResponse } from '@/Interfaces'; import type { IExecutionResponse, IExecutionFlattedResponse } from '@/Interfaces';
import { EEWorkflowsService as EEWorkflows } from '../workflows/workflows.services.ee'; import { EnterpriseWorkflowService } from '../workflows/workflow.service.ee';
import type { WorkflowWithSharingsAndCredentials } from '@/workflows/workflows.types'; import type { WorkflowWithSharingsAndCredentials } from '@/workflows/workflows.types';
import Container from 'typedi';
export class EEExecutionsService extends ExecutionsService { export class EEExecutionsService extends ExecutionsService {
/** /**
@@ -23,14 +24,16 @@ export class EEExecutionsService extends ExecutionsService {
if (!execution) return; if (!execution) return;
const relations = ['shared', 'shared.user', 'shared.role']; const relations = ['shared', 'shared.user', 'shared.role'];
const workflow = (await EEWorkflows.get( const enterpriseWorkflowService = Container.get(EnterpriseWorkflowService);
const workflow = (await enterpriseWorkflowService.get(
{ id: execution.workflowId }, { id: execution.workflowId },
{ relations }, { relations },
)) as WorkflowWithSharingsAndCredentials; )) as WorkflowWithSharingsAndCredentials;
if (!workflow) return; if (!workflow) return;
EEWorkflows.addOwnerAndSharings(workflow); enterpriseWorkflowService.addOwnerAndSharings(workflow);
await EEWorkflows.addCredentialsToWorkflow(workflow, req.user); await enterpriseWorkflowService.addCredentialsToWorkflow(workflow, req.user);
execution.workflowData = { execution.workflowData = {
...execution.workflowData, ...execution.workflowData,

View File

@@ -5,7 +5,7 @@ import { SharedWorkflow } from '@db/entities/SharedWorkflow';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
import { WorkflowEntity } from '@db/entities/WorkflowEntity'; import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { UserService } from '@/services/user.service'; import { UserService } from '@/services/user.service';
import { WorkflowsService } from './workflows.services'; import { WorkflowService } from './workflow.service';
import type { import type {
CredentialUsedByWorkflow, CredentialUsedByWorkflow,
WorkflowWithSharingsAndCredentials, WorkflowWithSharingsAndCredentials,
@@ -13,14 +13,15 @@ import type {
import { CredentialsService } from '@/credentials/credentials.service'; import { CredentialsService } from '@/credentials/credentials.service';
import { ApplicationError, NodeOperationError } from 'n8n-workflow'; import { ApplicationError, NodeOperationError } from 'n8n-workflow';
import { RoleService } from '@/services/role.service'; import { RoleService } from '@/services/role.service';
import Container from 'typedi'; import Container, { Service } from 'typedi';
import type { CredentialsEntity } from '@db/entities/CredentialsEntity'; import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { BadRequestError } from '@/errors/response-errors/bad-request.error'; import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error';
export class EEWorkflowsService extends WorkflowsService { @Service()
static async isOwned( export class EnterpriseWorkflowService extends WorkflowService {
async isOwned(
user: User, user: User,
workflowId: string, workflowId: string,
): Promise<{ ownsWorkflow: boolean; workflow?: WorkflowEntity }> { ): Promise<{ ownsWorkflow: boolean; workflow?: WorkflowEntity }> {
@@ -36,7 +37,7 @@ export class EEWorkflowsService extends WorkflowsService {
return { ownsWorkflow: true, workflow }; return { ownsWorkflow: true, workflow };
} }
static async getSharings( async getSharings(
transaction: EntityManager, transaction: EntityManager,
workflowId: string, workflowId: string,
relations = ['shared'], relations = ['shared'],
@@ -48,7 +49,7 @@ export class EEWorkflowsService extends WorkflowsService {
return workflow?.shared ?? []; return workflow?.shared ?? [];
} }
static async pruneSharings( async pruneSharings(
transaction: EntityManager, transaction: EntityManager,
workflowId: string, workflowId: string,
userIds: string[], userIds: string[],
@@ -59,7 +60,7 @@ export class EEWorkflowsService extends WorkflowsService {
}); });
} }
static async share( async share(
transaction: EntityManager, transaction: EntityManager,
workflow: WorkflowEntity, workflow: WorkflowEntity,
shareWithIds: string[], shareWithIds: string[],
@@ -83,7 +84,7 @@ export class EEWorkflowsService extends WorkflowsService {
return transaction.save(newSharedWorkflows); return transaction.save(newSharedWorkflows);
} }
static addOwnerAndSharings(workflow: WorkflowWithSharingsAndCredentials): void { addOwnerAndSharings(workflow: WorkflowWithSharingsAndCredentials): void {
workflow.ownedBy = null; workflow.ownedBy = null;
workflow.sharedWith = []; workflow.sharedWith = [];
if (!workflow.usedCredentials) { if (!workflow.usedCredentials) {
@@ -104,7 +105,7 @@ export class EEWorkflowsService extends WorkflowsService {
delete workflow.shared; delete workflow.shared;
} }
static async addCredentialsToWorkflow( async addCredentialsToWorkflow(
workflow: WorkflowWithSharingsAndCredentials, workflow: WorkflowWithSharingsAndCredentials,
currentUser: User, currentUser: User,
): Promise<void> { ): Promise<void> {
@@ -150,7 +151,7 @@ export class EEWorkflowsService extends WorkflowsService {
}); });
} }
static validateCredentialPermissionsToUser( validateCredentialPermissionsToUser(
workflow: WorkflowEntity, workflow: WorkflowEntity,
allowedCredentials: CredentialsEntity[], allowedCredentials: CredentialsEntity[],
) { ) {
@@ -171,8 +172,8 @@ export class EEWorkflowsService extends WorkflowsService {
}); });
} }
static async preventTampering(workflow: WorkflowEntity, workflowId: string, user: User) { async preventTampering(workflow: WorkflowEntity, workflowId: string, user: User) {
const previousVersion = await EEWorkflowsService.get({ id: workflowId }); const previousVersion = await this.get({ id: workflowId });
if (!previousVersion) { if (!previousVersion) {
throw new NotFoundError('Workflow not found'); throw new NotFoundError('Workflow not found');

View File

@@ -1,4 +1,4 @@
import { Container } from 'typedi'; import { Container, Service } from 'typedi';
import type { IDataObject, INode, IPinData } from 'n8n-workflow'; import type { IDataObject, INode, IPinData } from 'n8n-workflow';
import { NodeApiError, ErrorReporterProxy as ErrorReporter, Workflow } from 'n8n-workflow'; import { NodeApiError, ErrorReporterProxy as ErrorReporter, Workflow } from 'n8n-workflow';
import type { FindManyOptions, FindOptionsSelect, FindOptionsWhere, UpdateResult } from 'typeorm'; import type { FindManyOptions, FindOptionsSelect, FindOptionsWhere, UpdateResult } from 'typeorm';
@@ -41,8 +41,9 @@ export type WorkflowsGetSharedOptions =
| { allowGlobalScope: true; globalScope: Scope } | { allowGlobalScope: true; globalScope: Scope }
| { allowGlobalScope: false }; | { allowGlobalScope: false };
export class WorkflowsService { @Service()
static async getSharing( export class WorkflowService {
async getSharing(
user: User, user: User,
workflowId: string, workflowId: string,
options: WorkflowsGetSharedOptions, options: WorkflowsGetSharedOptions,
@@ -68,7 +69,7 @@ export class WorkflowsService {
* - select the _first_ pinned trigger that leads to the executed node, * - select the _first_ pinned trigger that leads to the executed node,
* - else select the executed pinned trigger. * - else select the executed pinned trigger.
*/ */
static findPinnedTrigger(workflow: IWorkflowDb, startNodes?: string[], pinData?: IPinData) { findPinnedTrigger(workflow: IWorkflowDb, startNodes?: string[], pinData?: IPinData) {
if (!pinData || !startNodes) return null; if (!pinData || !startNodes) return null;
const isTrigger = (nodeTypeName: string) => const isTrigger = (nodeTypeName: string) =>
@@ -102,14 +103,14 @@ export class WorkflowsService {
return pinnedTriggers.find((pt) => pt.name === checkNodeName) ?? null; // partial execution return pinnedTriggers.find((pt) => pt.name === checkNodeName) ?? null; // partial execution
} }
static async get(workflow: FindOptionsWhere<WorkflowEntity>, options?: { relations: string[] }) { async get(workflow: FindOptionsWhere<WorkflowEntity>, options?: { relations: string[] }) {
return Container.get(WorkflowRepository).findOne({ return Container.get(WorkflowRepository).findOne({
where: workflow, where: workflow,
relations: options?.relations, relations: options?.relations,
}); });
} }
static async getMany(sharedWorkflowIds: string[], options?: ListQuery.Options) { async getMany(sharedWorkflowIds: string[], options?: ListQuery.Options) {
if (sharedWorkflowIds.length === 0) return { workflows: [], count: 0 }; if (sharedWorkflowIds.length === 0) return { workflows: [], count: 0 };
const where: FindOptionsWhere<WorkflowEntity> = { const where: FindOptionsWhere<WorkflowEntity> = {
@@ -188,7 +189,7 @@ export class WorkflowsService {
: { workflows, count }; : { workflows, count };
} }
static async update( async update(
user: User, user: User,
workflow: WorkflowEntity, workflow: WorkflowEntity,
workflowId: string, workflowId: string,
@@ -381,7 +382,7 @@ export class WorkflowsService {
return updatedWorkflow; return updatedWorkflow;
} }
static async runManually( async runManually(
{ {
workflowData, workflowData,
runData, runData,
@@ -395,7 +396,7 @@ export class WorkflowsService {
const EXECUTION_MODE = 'manual'; const EXECUTION_MODE = 'manual';
const ACTIVATION_MODE = 'manual'; const ACTIVATION_MODE = 'manual';
const pinnedTrigger = WorkflowsService.findPinnedTrigger(workflowData, startNodes, pinData); const pinnedTrigger = this.findPinnedTrigger(workflowData, startNodes, pinData);
// If webhooks nodes exist and are active we have to wait for till we receive a call // If webhooks nodes exist and are active we have to wait for till we receive a call
if ( if (
@@ -463,7 +464,7 @@ export class WorkflowsService {
}; };
} }
static async delete(user: User, workflowId: string): Promise<WorkflowEntity | undefined> { async delete(user: User, workflowId: string): Promise<WorkflowEntity | undefined> {
await Container.get(ExternalHooks).run('workflow.delete', [workflowId]); await Container.get(ExternalHooks).run('workflow.delete', [workflowId]);
const sharedWorkflow = await Container.get(SharedWorkflowRepository).findOne({ const sharedWorkflow = await Container.get(SharedWorkflowRepository).findOne({
@@ -502,7 +503,7 @@ export class WorkflowsService {
return sharedWorkflow.workflow; return sharedWorkflow.workflow;
} }
static async updateWorkflowTriggerCount(id: string, triggerCount: number): Promise<UpdateResult> { async updateWorkflowTriggerCount(id: string, triggerCount: number): Promise<UpdateResult> {
const qb = Container.get(WorkflowRepository).createQueryBuilder('workflow'); const qb = Container.get(WorkflowRepository).createQueryBuilder('workflow');
return qb return qb
.update() .update()
@@ -522,19 +523,19 @@ export class WorkflowsService {
/** /**
* Saves the static data if it changed * Saves the static data if it changed
*/ */
static async saveStaticData(workflow: Workflow): Promise<void> { async saveStaticData(workflow: Workflow): Promise<void> {
if (workflow.staticData.__dataChanged === true) { if (workflow.staticData.__dataChanged === true) {
// Static data of workflow changed and so has to be saved // Static data of workflow changed and so has to be saved
if (isWorkflowIdValid(workflow.id)) { if (isWorkflowIdValid(workflow.id)) {
// Workflow is saved so update in database // Workflow is saved so update in database
try { try {
await WorkflowsService.saveStaticDataById(workflow.id, workflow.staticData); await this.saveStaticDataById(workflow.id, workflow.staticData);
workflow.staticData.__dataChanged = false; workflow.staticData.__dataChanged = false;
} catch (error) { } catch (error) {
ErrorReporter.error(error); ErrorReporter.error(error);
Container.get(Logger).error( Container.get(Logger).error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem saving the workflow with id "${workflow.id}" to save changed staticData: "${error.message}"`, `There was a problem saving the workflow with id "${workflow.id}" to save changed Data: "${error.message}"`,
{ workflowId: workflow.id }, { workflowId: workflow.id },
); );
} }
@@ -548,7 +549,7 @@ export class WorkflowsService {
* @param {(string)} workflowId The id of the workflow to save data on * @param {(string)} workflowId The id of the workflow to save data on
* @param {IDataObject} newStaticData The static data to save * @param {IDataObject} newStaticData The static data to save
*/ */
static async saveStaticDataById(workflowId: string, newStaticData: IDataObject): Promise<void> { async saveStaticDataById(workflowId: string, newStaticData: IDataObject): Promise<void> {
await Container.get(WorkflowRepository).update(workflowId, { await Container.get(WorkflowRepository).update(workflowId, {
staticData: newStaticData, staticData: newStaticData,
}); });

View File

@@ -8,7 +8,7 @@ import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { validateEntity } from '@/GenericHelpers'; import { validateEntity } from '@/GenericHelpers';
import type { ListQuery, WorkflowRequest } from '@/requests'; import type { ListQuery, WorkflowRequest } from '@/requests';
import { isSharingEnabled, rightDiff } from '@/UserManagement/UserManagementHelper'; import { isSharingEnabled, rightDiff } from '@/UserManagement/UserManagementHelper';
import { EEWorkflowsService as EEWorkflows } from './workflows.services.ee'; import { EnterpriseWorkflowService } from './workflow.service.ee';
import { ExternalHooks } from '@/ExternalHooks'; import { ExternalHooks } from '@/ExternalHooks';
import { SharedWorkflow } from '@db/entities/SharedWorkflow'; import { SharedWorkflow } from '@db/entities/SharedWorkflow';
import { CredentialsService } from '../credentials/credentials.service'; import { CredentialsService } from '../credentials/credentials.service';
@@ -59,7 +59,7 @@ EEWorkflowController.put(
throw new BadRequestError('Bad request'); throw new BadRequestError('Bad request');
} }
const isOwnedRes = await EEWorkflows.isOwned(req.user, workflowId); const isOwnedRes = await Container.get(EnterpriseWorkflowService).isOwned(req.user, workflowId);
const { ownsWorkflow } = isOwnedRes; const { ownsWorkflow } = isOwnedRes;
let { workflow } = isOwnedRes; let { workflow } = isOwnedRes;
@@ -67,10 +67,14 @@ EEWorkflowController.put(
workflow = undefined; workflow = undefined;
// Allow owners/admins to share // Allow owners/admins to share
if (await req.user.hasGlobalScope('workflow:share')) { if (await req.user.hasGlobalScope('workflow:share')) {
const sharedRes = await EEWorkflows.getSharing(req.user, workflowId, { const sharedRes = await Container.get(EnterpriseWorkflowService).getSharing(
allowGlobalScope: true, req.user,
globalScope: 'workflow:share', workflowId,
}); {
allowGlobalScope: true,
globalScope: 'workflow:share',
},
);
workflow = sharedRes?.workflow; workflow = sharedRes?.workflow;
} }
if (!workflow) { if (!workflow) {
@@ -79,10 +83,11 @@ EEWorkflowController.put(
} }
const ownerIds = ( const ownerIds = (
await EEWorkflows.getSharings(Db.getConnection().createEntityManager(), workflowId, [ await Container.get(EnterpriseWorkflowService).getSharings(
'shared', Db.getConnection().createEntityManager(),
'shared.role', workflowId,
]) ['shared', 'shared.role'],
)
) )
.filter((e) => e.role.name === 'owner') .filter((e) => e.role.name === 'owner')
.map((e) => e.userId); .map((e) => e.userId);
@@ -90,9 +95,12 @@ EEWorkflowController.put(
let newShareeIds: string[] = []; let newShareeIds: string[] = [];
await Db.transaction(async (trx) => { await Db.transaction(async (trx) => {
// remove all sharings that are not supposed to exist anymore // remove all sharings that are not supposed to exist anymore
await EEWorkflows.pruneSharings(trx, workflowId, [...ownerIds, ...shareWithIds]); await Container.get(EnterpriseWorkflowService).pruneSharings(trx, workflowId, [
...ownerIds,
...shareWithIds,
]);
const sharings = await EEWorkflows.getSharings(trx, workflowId); const sharings = await Container.get(EnterpriseWorkflowService).getSharings(trx, workflowId);
// extract the new sharings that need to be added // extract the new sharings that need to be added
newShareeIds = rightDiff( newShareeIds = rightDiff(
@@ -101,7 +109,7 @@ EEWorkflowController.put(
); );
if (newShareeIds.length) { if (newShareeIds.length) {
await EEWorkflows.share(trx, workflow!, newShareeIds); await Container.get(EnterpriseWorkflowService).share(trx, workflow!, newShareeIds);
} }
}); });
@@ -124,7 +132,9 @@ EEWorkflowController.get(
relations.push('tags'); relations.push('tags');
} }
const workflow = await EEWorkflows.get({ id: workflowId }, { relations }); const enterpriseWorkflowService = Container.get(EnterpriseWorkflowService);
const workflow = await enterpriseWorkflowService.get({ id: workflowId }, { relations });
if (!workflow) { if (!workflow) {
throw new NotFoundError(`Workflow with ID "${workflowId}" does not exist`); throw new NotFoundError(`Workflow with ID "${workflowId}" does not exist`);
@@ -137,8 +147,8 @@ EEWorkflowController.get(
); );
} }
EEWorkflows.addOwnerAndSharings(workflow); enterpriseWorkflowService.addOwnerAndSharings(workflow);
await EEWorkflows.addCredentialsToWorkflow(workflow, req.user); await enterpriseWorkflowService.addCredentialsToWorkflow(workflow, req.user);
return workflow; return workflow;
}), }),
); );
@@ -179,7 +189,10 @@ EEWorkflowController.post(
const allCredentials = await CredentialsService.getMany(req.user); const allCredentials = await CredentialsService.getMany(req.user);
try { try {
EEWorkflows.validateCredentialPermissionsToUser(newWorkflow, allCredentials); Container.get(EnterpriseWorkflowService).validateCredentialPermissionsToUser(
newWorkflow,
allCredentials,
);
} catch (error) { } catch (error) {
throw new BadRequestError( throw new BadRequestError(
'The workflow you are trying to save contains credentials that are not shared with you', 'The workflow you are trying to save contains credentials that are not shared with you',
@@ -240,7 +253,7 @@ EEWorkflowController.get(
try { try {
const sharedWorkflowIds = await WorkflowHelpers.getSharedWorkflowIds(req.user); const sharedWorkflowIds = await WorkflowHelpers.getSharedWorkflowIds(req.user);
const { workflows: data, count } = await EEWorkflows.getMany( const { workflows: data, count } = await Container.get(EnterpriseWorkflowService).getMany(
sharedWorkflowIds, sharedWorkflowIds,
req.listQueryOptions, req.listQueryOptions,
); );
@@ -264,9 +277,13 @@ EEWorkflowController.patch(
const { tags, ...rest } = req.body; const { tags, ...rest } = req.body;
Object.assign(updateData, rest); Object.assign(updateData, rest);
const safeWorkflow = await EEWorkflows.preventTampering(updateData, workflowId, req.user); const safeWorkflow = await Container.get(EnterpriseWorkflowService).preventTampering(
updateData,
workflowId,
req.user,
);
const updatedWorkflow = await EEWorkflows.update( const updatedWorkflow = await Container.get(EnterpriseWorkflowService).update(
req.user, req.user,
safeWorkflow, safeWorkflow,
workflowId, workflowId,
@@ -288,10 +305,18 @@ EEWorkflowController.post(
Object.assign(workflow, req.body.workflowData); Object.assign(workflow, req.body.workflowData);
if (req.body.workflowData.id !== undefined) { if (req.body.workflowData.id !== undefined) {
const safeWorkflow = await EEWorkflows.preventTampering(workflow, workflow.id, req.user); const safeWorkflow = await Container.get(EnterpriseWorkflowService).preventTampering(
workflow,
workflow.id,
req.user,
);
req.body.workflowData.nodes = safeWorkflow.nodes; req.body.workflowData.nodes = safeWorkflow.nodes;
} }
return EEWorkflows.runManually(req.body, req.user, GenericHelpers.getSessionId(req)); return Container.get(EnterpriseWorkflowService).runManually(
req.body,
req.user,
GenericHelpers.getSessionId(req),
);
}), }),
); );

View File

@@ -15,7 +15,7 @@ import { ExternalHooks } from '@/ExternalHooks';
import type { ListQuery, WorkflowRequest } from '@/requests'; import type { ListQuery, WorkflowRequest } from '@/requests';
import { isBelowOnboardingThreshold } from '@/WorkflowHelpers'; import { isBelowOnboardingThreshold } from '@/WorkflowHelpers';
import { EEWorkflowController } from './workflows.controller.ee'; import { EEWorkflowController } from './workflows.controller.ee';
import { WorkflowsService } from './workflows.services'; import { WorkflowService } from './workflow.service';
import { whereClause } from '@/UserManagement/UserManagementHelper'; import { whereClause } from '@/UserManagement/UserManagementHelper';
import { In } from 'typeorm'; import { In } from 'typeorm';
import { Container } from 'typedi'; import { Container } from 'typedi';
@@ -120,7 +120,7 @@ workflowsController.get(
try { try {
const sharedWorkflowIds = await WorkflowHelpers.getSharedWorkflowIds(req.user, ['owner']); const sharedWorkflowIds = await WorkflowHelpers.getSharedWorkflowIds(req.user, ['owner']);
const { workflows: data, count } = await WorkflowsService.getMany( const { workflows: data, count } = await Container.get(WorkflowService).getMany(
sharedWorkflowIds, sharedWorkflowIds,
req.listQueryOptions, req.listQueryOptions,
); );
@@ -245,7 +245,7 @@ workflowsController.patch(
const { tags, ...rest } = req.body; const { tags, ...rest } = req.body;
Object.assign(updateData, rest); Object.assign(updateData, rest);
const updatedWorkflow = await WorkflowsService.update( const updatedWorkflow = await Container.get(WorkflowService).update(
req.user, req.user,
updateData, updateData,
workflowId, workflowId,
@@ -267,7 +267,7 @@ workflowsController.delete(
ResponseHelper.send(async (req: WorkflowRequest.Delete) => { ResponseHelper.send(async (req: WorkflowRequest.Delete) => {
const { id: workflowId } = req.params; const { id: workflowId } = req.params;
const workflow = await WorkflowsService.delete(req.user, workflowId); const workflow = await Container.get(WorkflowService).delete(req.user, workflowId);
if (!workflow) { if (!workflow) {
Container.get(Logger).verbose('User attempted to delete a workflow without permissions', { Container.get(Logger).verbose('User attempted to delete a workflow without permissions', {
workflowId, workflowId,
@@ -288,6 +288,10 @@ workflowsController.delete(
workflowsController.post( workflowsController.post(
'/run', '/run',
ResponseHelper.send(async (req: WorkflowRequest.ManualRun): Promise<IExecutionPushResponse> => { ResponseHelper.send(async (req: WorkflowRequest.ManualRun): Promise<IExecutionPushResponse> => {
return WorkflowsService.runManually(req.body, req.user, GenericHelpers.getSessionId(req)); return Container.get(WorkflowService).runManually(
req.body,
req.user,
GenericHelpers.getSessionId(req),
);
}), }),
); );

View File

@@ -24,11 +24,13 @@ import { setSchedulerAsLoadedNode } from './shared/utils';
import * as testDb from './shared/testDb'; import * as testDb from './shared/testDb';
import { createOwner } from './shared/db/users'; import { createOwner } from './shared/db/users';
import { createWorkflow } from './shared/db/workflows'; import { createWorkflow } from './shared/db/workflows';
import { WorkflowService } from '@/workflows/workflow.service';
mockInstance(ActiveExecutions); mockInstance(ActiveExecutions);
mockInstance(ActiveWorkflows); mockInstance(ActiveWorkflows);
mockInstance(Push); mockInstance(Push);
mockInstance(SecretsHelper); mockInstance(SecretsHelper);
mockInstance(WorkflowService);
const webhookService = mockInstance(WebhookService); const webhookService = mockInstance(WebhookService);
const multiMainSetup = mockInstance(MultiMainSetup, { const multiMainSetup = mockInstance(MultiMainSetup, {

View File

@@ -7,7 +7,7 @@ import type { TagEntity } from '@db/entities/TagEntity';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowHistoryRepository } from '@db/repositories/workflowHistory.repository'; import { WorkflowHistoryRepository } from '@db/repositories/workflowHistory.repository';
import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { randomApiKey } from '../shared/random'; import { randomApiKey } from '../shared/random';
import * as utils from '../shared/utils/'; import * as utils from '../shared/utils/';
@@ -43,7 +43,10 @@ beforeAll(async () => {
}); });
await utils.initNodeTypes(); await utils.initNodeTypes();
workflowRunner = await utils.initActiveWorkflowRunner();
workflowRunner = Container.get(ActiveWorkflowRunner);
await workflowRunner.init();
}); });
beforeEach(async () => { beforeEach(async () => {

View File

@@ -1,6 +1,7 @@
import Container from 'typedi';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as testDb from './shared/testDb'; import * as testDb from './shared/testDb';
import { WorkflowsService } from '@/workflows/workflows.services'; import { WorkflowService } from '@/workflows/workflow.service';
import { mockInstance } from '../shared/mocking'; import { mockInstance } from '../shared/mocking';
import { Telemetry } from '@/telemetry'; import { Telemetry } from '@/telemetry';
import { createOwner } from './shared/db/users'; import { createOwner } from './shared/db/users';
@@ -31,7 +32,7 @@ describe('update()', () => {
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove'); const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
const addSpy = jest.spyOn(activeWorkflowRunner, 'add'); const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
await WorkflowsService.update(owner, workflow, workflow.id); await Container.get(WorkflowService).update(owner, workflow, workflow.id);
expect(removeSpy).toHaveBeenCalledTimes(1); expect(removeSpy).toHaveBeenCalledTimes(1);
const [removedWorkflowId] = removeSpy.mock.calls[0]; const [removedWorkflowId] = removeSpy.mock.calls[0];
@@ -51,7 +52,7 @@ describe('update()', () => {
const addSpy = jest.spyOn(activeWorkflowRunner, 'add'); const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
workflow.active = false; workflow.active = false;
await WorkflowsService.update(owner, workflow, workflow.id); await Container.get(WorkflowService).update(owner, workflow, workflow.id);
expect(removeSpy).toHaveBeenCalledTimes(1); expect(removeSpy).toHaveBeenCalledTimes(1);
const [removedWorkflowId] = removeSpy.mock.calls[0]; const [removedWorkflowId] = removeSpy.mock.calls[0];

View File

@@ -20,6 +20,8 @@ import { getCredentialOwnerRole, getGlobalMemberRole, getGlobalOwnerRole } from
import { createUser } from './shared/db/users'; import { createUser } from './shared/db/users';
import { createWorkflow, getWorkflowSharing, shareWorkflowWithUsers } from './shared/db/workflows'; import { createWorkflow, getWorkflowSharing, shareWorkflowWithUsers } from './shared/db/workflows';
import type { Role } from '@/databases/entities/Role'; import type { Role } from '@/databases/entities/Role';
import { EnterpriseWorkflowService } from '@/workflows/workflow.service.ee';
import { WorkflowService } from '@/workflows/workflow.service';
let globalMemberRole: Role; let globalMemberRole: Role;
let owner: User; let owner: User;
@@ -55,6 +57,9 @@ beforeAll(async () => {
saveCredential = affixRoleToSaveCredential(credentialOwnerRole); saveCredential = affixRoleToSaveCredential(credentialOwnerRole);
await utils.initNodeTypes(); await utils.initNodeTypes();
Container.set(WorkflowService, new WorkflowService());
Container.set(EnterpriseWorkflowService, new EnterpriseWorkflowService());
}); });
beforeEach(async () => { beforeEach(async () => {