Improve typing on Queue and Jobs (#3892)

also, move all things related to `bull` into a single place.
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2022-09-09 15:14:49 +02:00
committed by GitHub
parent 12507d39d6
commit f5c6c21bf4
4 changed files with 47 additions and 56 deletions

View File

@@ -13,23 +13,18 @@ import http from 'http';
import PCancelable from 'p-cancelable';
import { Command, flags } from '@oclif/command';
import { BinaryDataManager, IBinaryDataConfig, UserSettings, WorkflowExecute } from 'n8n-core';
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow';
import { FindOneOptions, getConnectionManager } from 'typeorm';
import Bull from 'bull';
import {
CredentialsOverwrites,
CredentialTypes,
Db,
ExternalHooks,
GenericHelpers,
IBullJobData,
IBullJobResponse,
IBullWebhookResponse,
IExecutionFlattedDb,
InternalHooksManager,
LoadNodesAndCredentials,
NodeTypes,
@@ -64,7 +59,7 @@ export class Worker extends Command {
[key: string]: PCancelable<IRun>;
} = {};
static jobQueue: Bull.Queue;
static jobQueue: Queue.JobQueue;
static processExistCode = 0;
// static activeExecutions = ActiveExecutions.getInstance();
@@ -118,30 +113,28 @@ export class Worker extends Command {
process.exit(Worker.processExistCode);
}
async runJob(job: Bull.Job, nodeTypes: INodeTypes): Promise<IBullJobResponse> {
const jobData = job.data as IBullJobData;
const executionDb = await Db.collections.Execution.findOne(jobData.executionId);
async runJob(job: Queue.Job, nodeTypes: INodeTypes): Promise<Queue.JobResponse> {
const { executionId, loadStaticData } = job.data;
const executionDb = await Db.collections.Execution.findOne(executionId);
if (!executionDb) {
LoggerProxy.error(
`Worker failed to find data of execution "${jobData.executionId}" in database. Cannot continue.`,
{
executionId: jobData.executionId,
},
`Worker failed to find data of execution "${executionId}" in database. Cannot continue.`,
{ executionId },
);
throw new Error(
`Unable to find data of execution "${jobData.executionId}" in database. Aborting execution.`,
`Unable to find data of execution "${executionId}" in database. Aborting execution.`,
);
}
const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb);
LoggerProxy.info(
`Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${jobData.executionId})`,
`Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${executionId})`,
);
const workflowOwner = await getWorkflowOwner(currentExecutionDb.workflowData.id!.toString());
let { staticData } = currentExecutionDb.workflowData;
if (jobData.loadStaticData) {
if (loadStaticData) {
const findOptions = {
select: ['id', 'staticData'],
} as FindOneOptions;
@@ -154,7 +147,7 @@ export class Worker extends Command {
'Worker execution failed because workflow could not be found in database.',
{
workflowId: currentExecutionDb.workflowData.id,
executionId: jobData.executionId,
executionId,
},
);
throw new Error(
@@ -206,14 +199,15 @@ export class Worker extends Command {
additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
await job.progress({
executionId: job.data.executionId as string,
const progress: Queue.WebhookResponse = {
executionId,
response: WebhookHelpers.encodeWebhookResponse(response),
} as IBullWebhookResponse);
};
await job.progress(progress);
},
];
additionalData.executionId = jobData.executionId;
additionalData.executionId = executionId;
let workflowExecute: WorkflowExecute;
let workflowRun: PCancelable<IRun>;