refactor(core): Use DI in source-control. add more tests (#12554)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2025-01-10 16:10:19 +01:00
committed by GitHub
parent b2cbed9865
commit 25a79ccf40
15 changed files with 590 additions and 217 deletions

View File

@@ -1,5 +1,5 @@
import type { SourceControlledFile } from '@n8n/api-types';
import { Container, Service } from '@n8n/di';
import { Service } from '@n8n/di';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import { In } from '@n8n/typeorm';
import glob from 'fast-glob';
@@ -53,7 +53,15 @@ export class SourceControlImportService {
private readonly errorReporter: ErrorReporter,
private readonly variablesService: VariablesService,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly credentialsRepository: CredentialsRepository,
private readonly projectRepository: ProjectRepository,
private readonly tagRepository: TagRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly sharedCredentialsRepository: SharedCredentialsRepository,
private readonly userRepository: UserRepository,
private readonly variablesRepository: VariablesRepository,
private readonly workflowRepository: WorkflowRepository,
private readonly workflowTagMappingRepository: WorkflowTagMappingRepository,
instanceSettings: InstanceSettings,
) {
this.gitFolder = path.join(instanceSettings.n8nFolder, SOURCE_CONTROL_GIT_FOLDER);
@@ -91,7 +99,7 @@ export class SourceControlImportService {
}
async getLocalVersionIdsFromDb(): Promise<SourceControlWorkflowVersionId[]> {
const localWorkflows = await Container.get(WorkflowRepository).find({
const localWorkflows = await this.workflowRepository.find({
select: ['id', 'name', 'versionId', 'updatedAt'],
});
return localWorkflows.map((local) => {
@@ -146,7 +154,7 @@ export class SourceControlImportService {
}
async getLocalCredentialsFromDb(): Promise<Array<ExportableCredential & { filename: string }>> {
const localCredentials = await Container.get(CredentialsRepository).find({
const localCredentials = await this.credentialsRepository.find({
select: ['id', 'name', 'type'],
});
return localCredentials.map((local) => ({
@@ -201,24 +209,22 @@ export class SourceControlImportService {
const localTags = await this.tagRepository.find({
select: ['id', 'name'],
});
const localMappings = await Container.get(WorkflowTagMappingRepository).find({
const localMappings = await this.workflowTagMappingRepository.find({
select: ['workflowId', 'tagId'],
});
return { tags: localTags, mappings: localMappings };
}
async importWorkflowFromWorkFolder(candidates: SourceControlledFile[], userId: string) {
const personalProject =
await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(userId);
const personalProject = await this.projectRepository.getPersonalProjectForUserOrFail(userId);
const workflowManager = this.activeWorkflowManager;
const candidateIds = candidates.map((c) => c.id);
const existingWorkflows = await Container.get(WorkflowRepository).findByIds(candidateIds, {
const existingWorkflows = await this.workflowRepository.findByIds(candidateIds, {
fields: ['id', 'name', 'versionId', 'active'],
});
const allSharedWorkflows = await Container.get(SharedWorkflowRepository).findWithFields(
candidateIds,
{ select: ['workflowId', 'role', 'projectId'] },
);
const allSharedWorkflows = await this.sharedWorkflowRepository.findWithFields(candidateIds, {
select: ['workflowId', 'role', 'projectId'],
});
const importWorkflowsResult = [];
// Due to SQLite concurrency issues, we cannot save all workflows at once
@@ -235,9 +241,7 @@ export class SourceControlImportService {
const existingWorkflow = existingWorkflows.find((e) => e.id === importedWorkflow.id);
importedWorkflow.active = existingWorkflow?.active ?? false;
this.logger.debug(`Updating workflow id ${importedWorkflow.id ?? 'new'}`);
const upsertResult = await Container.get(WorkflowRepository).upsert({ ...importedWorkflow }, [
'id',
]);
const upsertResult = await this.workflowRepository.upsert({ ...importedWorkflow }, ['id']);
if (upsertResult?.identifiers?.length !== 1) {
throw new ApplicationError('Failed to upsert workflow', {
extra: { workflowId: importedWorkflow.id ?? 'new' },
@@ -253,7 +257,7 @@ export class SourceControlImportService {
? await this.findOrCreateOwnerProject(importedWorkflow.owner)
: null;
await Container.get(SharedWorkflowRepository).upsert(
await this.sharedWorkflowRepository.upsert(
{
workflowId: importedWorkflow.id,
projectId: remoteOwnerProject?.id ?? personalProject.id,
@@ -276,7 +280,7 @@ export class SourceControlImportService {
const error = ensureError(e);
this.logger.error(`Failed to activate workflow ${existingWorkflow.id}`, { error });
} finally {
await Container.get(WorkflowRepository).update(
await this.workflowRepository.update(
{ id: existingWorkflow.id },
{ versionId: importedWorkflow.versionId },
);
@@ -295,16 +299,15 @@ export class SourceControlImportService {
}
async importCredentialsFromWorkFolder(candidates: SourceControlledFile[], userId: string) {
const personalProject =
await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(userId);
const personalProject = await this.projectRepository.getPersonalProjectForUserOrFail(userId);
const candidateIds = candidates.map((c) => c.id);
const existingCredentials = await Container.get(CredentialsRepository).find({
const existingCredentials = await this.credentialsRepository.find({
where: {
id: In(candidateIds),
},
select: ['id', 'name', 'type', 'data'],
});
const existingSharedCredentials = await Container.get(SharedCredentialsRepository).find({
const existingSharedCredentials = await this.sharedCredentialsRepository.find({
select: ['credentialsId', 'role'],
where: {
credentialsId: In(candidateIds),
@@ -336,7 +339,7 @@ export class SourceControlImportService {
}
this.logger.debug(`Updating credential id ${newCredentialObject.id as string}`);
await Container.get(CredentialsRepository).upsert(newCredentialObject, ['id']);
await this.credentialsRepository.upsert(newCredentialObject, ['id']);
const isOwnedLocally = existingSharedCredentials.some(
(c) => c.credentialsId === credential.id && c.role === 'credential:owner',
@@ -352,7 +355,7 @@ export class SourceControlImportService {
newSharedCredential.projectId = remoteOwnerProject?.id ?? personalProject.id;
newSharedCredential.role = 'credential:owner';
await Container.get(SharedCredentialsRepository).upsert({ ...newSharedCredential }, [
await this.sharedCredentialsRepository.upsert({ ...newSharedCredential }, [
'credentialsId',
'projectId',
]);
@@ -388,7 +391,7 @@ export class SourceControlImportService {
const existingWorkflowIds = new Set(
(
await Container.get(WorkflowRepository).find({
await this.workflowRepository.find({
select: ['id'],
})
).map((e) => e.id),
@@ -417,7 +420,7 @@ export class SourceControlImportService {
await Promise.all(
mappedTags.mappings.map(async (mapping) => {
if (!existingWorkflowIds.has(String(mapping.workflowId))) return;
await Container.get(WorkflowTagMappingRepository).upsert(
await this.workflowTagMappingRepository.upsert(
{ tagId: String(mapping.tagId), workflowId: String(mapping.workflowId) },
{
skipUpdateIfNoValuesChanged: true,
@@ -464,12 +467,12 @@ export class SourceControlImportService {
overriddenKeys.splice(overriddenKeys.indexOf(variable.key), 1);
}
try {
await Container.get(VariablesRepository).upsert({ ...variable }, ['id']);
await this.variablesRepository.upsert({ ...variable }, ['id']);
} catch (errorUpsert) {
if (isUniqueConstraintError(errorUpsert as Error)) {
this.logger.debug(`Variable ${variable.key} already exists, updating instead`);
try {
await Container.get(VariablesRepository).update({ key: variable.key }, { ...variable });
await this.variablesRepository.update({ key: variable.key }, { ...variable });
} catch (errorUpdate) {
this.logger.debug(`Failed to update variable ${variable.key}, skipping`);
this.logger.debug((errorUpdate as Error).message);
@@ -484,11 +487,11 @@ export class SourceControlImportService {
if (overriddenKeys.length > 0 && valueOverrides) {
for (const key of overriddenKeys) {
result.imported.push(key);
const newVariable = Container.get(VariablesRepository).create({
const newVariable = this.variablesRepository.create({
key,
value: valueOverrides[key],
});
await Container.get(VariablesRepository).save(newVariable, { transaction: false });
await this.variablesRepository.save(newVariable, { transaction: false });
}
}
@@ -498,32 +501,30 @@ export class SourceControlImportService {
}
private async findOrCreateOwnerProject(owner: ResourceOwner): Promise<Project | null> {
const projectRepository = Container.get(ProjectRepository);
const userRepository = Container.get(UserRepository);
if (typeof owner === 'string' || owner.type === 'personal') {
const email = typeof owner === 'string' ? owner : owner.personalEmail;
const user = await userRepository.findOne({
const user = await this.userRepository.findOne({
where: { email },
});
if (!user) {
return null;
}
return await projectRepository.getPersonalProjectForUserOrFail(user.id);
return await this.projectRepository.getPersonalProjectForUserOrFail(user.id);
} else if (owner.type === 'team') {
let teamProject = await projectRepository.findOne({
let teamProject = await this.projectRepository.findOne({
where: { id: owner.teamId },
});
if (!teamProject) {
try {
teamProject = await projectRepository.save(
projectRepository.create({
teamProject = await this.projectRepository.save(
this.projectRepository.create({
id: owner.teamId,
name: owner.teamName,
type: 'team',
}),
);
} catch (e) {
teamProject = await projectRepository.findOne({
teamProject = await this.projectRepository.findOne({
where: { id: owner.teamId },
});
if (!teamProject) {