refactor(core): Decouple NodeTypes from migrations (#14941)

This commit is contained in:
Iván Ovejero
2025-04-28 12:49:22 +02:00
committed by GitHub
parent ddb688ba30
commit a767ce3d8e
3 changed files with 10 additions and 58 deletions

View File

@@ -1,60 +1,16 @@
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import type { MigrationContext, IrreversibleMigration } from '@/databases/types';
import { UserError } from 'n8n-workflow';
interface Workflow {
id: number;
nodes: WorkflowEntity['nodes'] | string;
connections: WorkflowEntity['connections'] | string;
}
import { WorkflowEntity } from '@/databases/entities/workflow-entity';
import type { IrreversibleMigration, MigrationContext } from '@/databases/types';
export class PurgeInvalidWorkflowConnections1675940580449 implements IrreversibleMigration {
async up({ escape, parseJson, runQuery, nodeTypes }: MigrationContext) {
const workflowsTable = escape.tableName('workflow_entity');
const workflows: Workflow[] = await runQuery(
`SELECT id, nodes, connections FROM ${workflowsTable}`,
async up({ queryRunner }: MigrationContext) {
const workflowCount = await queryRunner.manager.count(WorkflowEntity);
if (workflowCount > 0) {
throw new UserError(
'Migration "PurgeInvalidWorkflowConnections1675940580449" is no longer supported. Please upgrade to n8n@1.0.0 first.',
);
await Promise.all(
workflows.map(async (workflow) => {
const connections = parseJson(workflow.connections);
const nodes = parseJson(workflow.nodes);
const nodesThatCannotReceiveInput = nodes.reduce<string[]>((acc, node) => {
try {
const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
if ((nodeType.description.inputs?.length ?? []) === 0) {
acc.push(node.name);
}
} catch (error) {}
return acc;
}, []);
Object.keys(connections).forEach((sourceNodeName) => {
const connection = connections[sourceNodeName];
const outputs = Object.keys(connection);
outputs.forEach((outputConnectionName /* Like `main` */) => {
const outputConnection = connection[outputConnectionName];
// It filters out all connections that are connected to a node that cannot receive input
outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => {
outputConnection[outputConnectionItemIdx] = (outputConnectionItem ?? []).filter(
(outgoingConnections) =>
!nodesThatCannotReceiveInput.includes(outgoingConnections.node),
);
});
});
});
// Update database with new connections
return await runQuery(
`UPDATE ${workflowsTable} SET connections = :connections WHERE id = :id`,
{
connections: JSON.stringify(connections),
id: workflow.id,
},
);
}),
);
}
}

View File

@@ -1,6 +1,5 @@
import type { QueryRunner, ObjectLiteral } from '@n8n/typeorm';
import type { Logger } from 'n8n-core';
import type { INodeTypes } from 'n8n-workflow';
import type { createSchemaBuilder } from './dsl';
@@ -16,7 +15,6 @@ export interface MigrationContext {
isPostgres: boolean;
dbName: string;
migrationName: string;
nodeTypes: INodeTypes;
schemaBuilder: ReturnType<typeof createSchemaBuilder>;
loadSurveyFromDisk(): string | null;
parseJson<T>(data: string | T): T;

View File

@@ -9,7 +9,6 @@ import { jsonParse, UnexpectedError } from 'n8n-workflow';
import { inTest } from '@/constants';
import { createSchemaBuilder } from '@/databases/dsl';
import type { BaseMigration, Migration, MigrationContext, MigrationFn } from '@/databases/types';
import { NodeTypes } from '@/node-types';
const PERSONALIZATION_SURVEY_FILENAME = 'personalizationSurvey.json';
@@ -108,7 +107,6 @@ const createContext = (queryRunner: QueryRunner, migration: Migration): Migratio
migrationName: migration.name,
queryRunner,
schemaBuilder: createSchemaBuilder(tablePrefix, queryRunner),
nodeTypes: Container.get(NodeTypes),
loadSurveyFromDisk,
parseJson,
escape: {