refactor(core): Move webhook DB access to repository (no-changelog) (#6706)

* refactor(core): Move webhook DB access to repository (no-changelog)

* make sure `DataSource` is initialized before it's dependencies

at some point I hope to replace `DataSource` with a custom `DatabaseConnection` service class that can then disconnect and reconnect from DB without having to update all repositories.

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Iván Ovejero
2023-07-25 18:17:34 +02:00
committed by GitHub
parent ed9f86bb95
commit bcfc5e717b
4 changed files with 19 additions and 14 deletions

View File

@@ -74,9 +74,6 @@ export abstract class AbstractServer {
this.endpointWebhook = config.getEnv('endpoints.webhook'); this.endpointWebhook = config.getEnv('endpoints.webhook');
this.endpointWebhookTest = config.getEnv('endpoints.webhookTest'); this.endpointWebhookTest = config.getEnv('endpoints.webhookTest');
this.endpointWebhookWaiting = config.getEnv('endpoints.webhookWaiting'); this.endpointWebhookWaiting = config.getEnv('endpoints.webhookWaiting');
this.externalHooks = Container.get(ExternalHooks);
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
} }
private async setupErrorHandlers() { private async setupErrorHandlers() {
@@ -438,6 +435,9 @@ export abstract class AbstractServer {
await new Promise<void>((resolve) => this.server.listen(PORT, ADDRESS, () => resolve())); await new Promise<void>((resolve) => this.server.listen(PORT, ADDRESS, () => resolve()));
this.externalHooks = Container.get(ExternalHooks);
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
await this.setupHealthCheck(); await this.setupHealthCheck();
console.log(`n8n ready on ${ADDRESS}, port ${PORT}`); console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);

View File

@@ -67,6 +67,7 @@ import { WorkflowsService } from './workflows/workflows.services';
import { STARTING_NODES } from './constants'; import { STARTING_NODES } from './constants';
import { webhookNotFoundErrorMessage } from './utils'; import { webhookNotFoundErrorMessage } from './utils';
import { In } from 'typeorm'; import { In } from 'typeorm';
import { WebhookRepository } from '@db/repositories';
const WEBHOOK_PROD_UNREGISTERED_HINT = const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)"; "The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
@@ -87,6 +88,7 @@ export class ActiveWorkflowRunner {
private activeExecutions: ActiveExecutions, private activeExecutions: ActiveExecutions,
private externalHooks: ExternalHooks, private externalHooks: ExternalHooks,
private nodeTypes: NodeTypes, private nodeTypes: NodeTypes,
private webhookRepository: WebhookRepository,
) {} ) {}
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
@@ -110,7 +112,7 @@ export class ActiveWorkflowRunner {
// This is not officially supported but there is no reason // This is not officially supported but there is no reason
// it should not work. // it should not work.
// Clear up active workflow table // Clear up active workflow table
await Db.collections.Webhook.clear(); await this.webhookRepository.clear();
} }
if (workflowsData.length !== 0) { if (workflowsData.length !== 0) {
@@ -201,7 +203,7 @@ export class ActiveWorkflowRunner {
path = path.slice(0, -1); path = path.slice(0, -1);
} }
let webhook = await Db.collections.Webhook.findOneBy({ let webhook = await this.webhookRepository.findOneBy({
webhookPath: path, webhookPath: path,
method: httpMethod, method: httpMethod,
}); });
@@ -212,7 +214,7 @@ export class ActiveWorkflowRunner {
// check if a dynamic webhook path exists // check if a dynamic webhook path exists
const pathElements = path.split('/'); const pathElements = path.split('/');
webhookId = pathElements.shift(); webhookId = pathElements.shift();
const dynamicWebhooks = await Db.collections.Webhook.findBy({ const dynamicWebhooks = await this.webhookRepository.findBy({
webhookId, webhookId,
method: httpMethod, method: httpMethod,
pathLength: pathElements.length, pathLength: pathElements.length,
@@ -333,7 +335,7 @@ export class ActiveWorkflowRunner {
* Gets all request methods associated with a single webhook * Gets all request methods associated with a single webhook
*/ */
async getWebhookMethods(path: string): Promise<string[]> { async getWebhookMethods(path: string): Promise<string[]> {
const webhooks = await Db.collections.Webhook.find({ const webhooks = await this.webhookRepository.find({
select: ['method'], select: ['method'],
where: { webhookPath: path }, where: { webhookPath: path },
}); });
@@ -442,7 +444,7 @@ export class ActiveWorkflowRunner {
try { try {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
// TODO: this should happen in a transaction, that way we don't need to manually remove this in `catch` // TODO: this should happen in a transaction, that way we don't need to manually remove this in `catch`
await Db.collections.Webhook.insert(webhook); await this.webhookRepository.insert(webhook);
const webhookExists = await workflow.runWebhookMethod( const webhookExists = await workflow.runWebhookMethod(
'checkExists', 'checkExists',
webhookData, webhookData,
@@ -552,7 +554,7 @@ export class ActiveWorkflowRunner {
await WorkflowHelpers.saveStaticData(workflow); await WorkflowHelpers.saveStaticData(workflow);
await Db.collections.Webhook.delete({ await this.webhookRepository.delete({
workflowId: workflowData.id, workflowId: workflowData.id,
}); });
} }

View File

@@ -104,6 +104,9 @@ class WorkflowRunnerProcess {
this.startedAt = new Date(); this.startedAt = new Date();
// Init db since we need to read the license.
await Db.init();
const userSettings = await UserSettings.prepareUserSettings(); const userSettings = await UserSettings.prepareUserSettings();
const loadNodesAndCredentials = Container.get(LoadNodesAndCredentials); const loadNodesAndCredentials = Container.get(LoadNodesAndCredentials);
@@ -117,9 +120,6 @@ class WorkflowRunnerProcess {
const externalHooks = Container.get(ExternalHooks); const externalHooks = Container.get(ExternalHooks);
await externalHooks.init(); await externalHooks.init();
// Init db since we need to read the license.
await Db.init();
const instanceId = userSettings.instanceId ?? ''; const instanceId = userSettings.instanceId ?? '';
await Container.get(PostHogClient).init(instanceId); await Container.get(PostHogClient).init(instanceId);
await Container.get(InternalHooks).init(instanceId); await Container.get(InternalHooks).init(instanceId);

View File

@@ -24,6 +24,7 @@ import { mockInstance } from '../integration/shared/utils/';
import { Push } from '@/push'; import { Push } from '@/push';
import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveExecutions } from '@/ActiveExecutions';
import { NodeTypes } from '@/NodeTypes'; import { NodeTypes } from '@/NodeTypes';
import type { WebhookRepository } from '@/databases/repositories';
/** /**
* TODO: * TODO:
@@ -139,6 +140,7 @@ const workflowExecuteAdditionalDataExecuteErrorWorkflowSpy = jest.spyOn(
describe('ActiveWorkflowRunner', () => { describe('ActiveWorkflowRunner', () => {
let externalHooks: ExternalHooks; let externalHooks: ExternalHooks;
let activeWorkflowRunner: ActiveWorkflowRunner; let activeWorkflowRunner: ActiveWorkflowRunner;
let webhookRepository = mock<WebhookRepository>();
beforeAll(async () => { beforeAll(async () => {
LoggerProxy.init(getLogger()); LoggerProxy.init(getLogger());
@@ -160,6 +162,7 @@ describe('ActiveWorkflowRunner', () => {
new ActiveExecutions(), new ActiveExecutions(),
externalHooks, externalHooks,
Container.get(NodeTypes), Container.get(NodeTypes),
webhookRepository,
); );
}); });
@@ -174,7 +177,7 @@ describe('ActiveWorkflowRunner', () => {
await activeWorkflowRunner.init(); await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0); expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled(); expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(mocked(Db.collections.Webhook.clear)).toHaveBeenCalled(); expect(mocked(webhookRepository.clear)).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalledTimes(1); expect(externalHooks.run).toHaveBeenCalledTimes(1);
}); });
@@ -185,7 +188,7 @@ describe('ActiveWorkflowRunner', () => {
databaseActiveWorkflowsCount, databaseActiveWorkflowsCount,
); );
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled(); expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(mocked(Db.collections.Webhook.clear)).toHaveBeenCalled(); expect(mocked(webhookRepository.clear)).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalled(); expect(externalHooks.run).toHaveBeenCalled();
}); });