fix(core): Decouple removing and closing destination from actually deleting it (#17614)

This commit is contained in:
Guillaume Jacquart
2025-07-24 11:21:42 +02:00
committed by GitHub
parent a98ed2ca49
commit b09f73701d
3 changed files with 10 additions and 8 deletions

View File

@@ -269,6 +269,7 @@ export class E2EController {
private async resetLogStreaming() {
for (const id in this.eventBus.destinations) {
await this.eventBus.removeDestination(id, false);
await this.eventBus.deleteDestination(id);
}
}

View File

@@ -124,7 +124,8 @@ export class EventBusController {
@GlobalScope('eventBusDestination:delete')
async deleteDestination(req: AuthenticatedRequest) {
if (isWithIdString(req.query)) {
return await this.eventBus.removeDestination(req.query.id);
await this.eventBus.removeDestination(req.query.id);
return await this.eventBus.deleteDestination(req.query.id);
} else {
throw new BadRequestError('Query is missing id');
}

View File

@@ -240,20 +240,20 @@ export class MessageEventBus extends EventEmitter {
return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? ''));
}
async removeDestination(
id: string,
notifyWorkers: boolean = true,
): Promise<DeleteResult | undefined> {
let result;
async removeDestination(id: string, notifyWorkers: boolean = true) {
if (Object.keys(this.destinations).includes(id)) {
await this.destinations[id].close();
result = await this.destinations[id].deleteFromDb();
delete this.destinations[id];
}
if (notifyWorkers) {
void this.publisher.publishCommand({ command: 'restart-event-bus' });
}
return result;
}
async deleteDestination(id: string): Promise<DeleteResult | undefined> {
return await this.eventDestinationsRepository.delete({
id,
});
}
private async trySendingUnsent(msgs?: EventMessageTypes[]) {