mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 02:21:13 +00:00
feat: Add support for AI log streaming (#8526)
Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
@@ -23,6 +23,7 @@ import type {
|
||||
WorkflowExecuteMode,
|
||||
ExecutionStatus,
|
||||
ExecutionError,
|
||||
EventNamesAiNodesType,
|
||||
} from 'n8n-workflow';
|
||||
import {
|
||||
ApplicationError,
|
||||
@@ -68,6 +69,7 @@ import { WorkflowStaticDataService } from './workflows/workflowStaticData.servic
|
||||
import { WorkflowRepository } from './databases/repositories/workflow.repository';
|
||||
import { UrlService } from './services/url.service';
|
||||
import { WorkflowExecutionService } from './workflows/workflowExecution.service';
|
||||
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
|
||||
|
||||
const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
|
||||
|
||||
@@ -982,6 +984,22 @@ export async function getBase(
|
||||
setExecutionStatus,
|
||||
variables,
|
||||
secretsHelpers: Container.get(SecretsHelper),
|
||||
logAiEvent: async (
|
||||
eventName: EventNamesAiNodesType,
|
||||
payload: {
|
||||
msg?: string | undefined;
|
||||
executionId: string;
|
||||
nodeName: string;
|
||||
workflowId?: string | undefined;
|
||||
workflowName: string;
|
||||
nodeType?: string | undefined;
|
||||
},
|
||||
) => {
|
||||
return await Container.get(MessageEventBus).sendAiNodeEvent({
|
||||
eventName,
|
||||
payload,
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage';
|
||||
import type { EventNamesAiNodesType, JsonObject } from 'n8n-workflow';
|
||||
import { EventMessageTypeNames } from 'n8n-workflow';
|
||||
import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions';
|
||||
import type { AbstractEventPayload } from './AbstractEventPayload';
|
||||
|
||||
// --------------------------------------
|
||||
// EventMessage class for Node events
|
||||
// --------------------------------------
|
||||
export interface EventPayloadAiNode extends AbstractEventPayload {
|
||||
msg?: string;
|
||||
executionId: string;
|
||||
nodeName: string;
|
||||
workflowId?: string;
|
||||
workflowName: string;
|
||||
nodeType?: string;
|
||||
}
|
||||
|
||||
export interface EventMessageAiNodeOptions extends AbstractEventMessageOptions {
|
||||
eventName: EventNamesAiNodesType;
|
||||
|
||||
payload?: EventPayloadAiNode | undefined;
|
||||
}
|
||||
|
||||
export class EventMessageAiNode extends AbstractEventMessage {
|
||||
readonly __type = EventMessageTypeNames.aiNode;
|
||||
|
||||
eventName: EventNamesAiNodesType;
|
||||
|
||||
payload: EventPayloadAiNode;
|
||||
|
||||
constructor(options: EventMessageAiNodeOptions) {
|
||||
super(options);
|
||||
if (options.payload) this.setPayload(options.payload);
|
||||
if (options.anonymize) {
|
||||
this.anonymize();
|
||||
}
|
||||
}
|
||||
|
||||
setPayload(payload: EventPayloadAiNode): this {
|
||||
this.payload = payload;
|
||||
return this;
|
||||
}
|
||||
|
||||
deserialize(data: JsonObject): this {
|
||||
if (isEventMessageOptionsWithType(data, this.__type)) {
|
||||
this.setOptionsOrDefault(data);
|
||||
if (data.payload) this.setPayload(data.payload as EventPayloadAiNode);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,9 @@
|
||||
import type { EventMessageAiNode } from './EventMessageAiNode';
|
||||
import type { EventMessageAudit } from './EventMessageAudit';
|
||||
import type { EventMessageGeneric } from './EventMessageGeneric';
|
||||
import type { EventMessageNode } from './EventMessageNode';
|
||||
import type { EventMessageWorkflow } from './EventMessageWorkflow';
|
||||
import { eventNamesAiNodes, type EventNamesAiNodesType } from 'n8n-workflow';
|
||||
|
||||
export const eventNamesWorkflow = [
|
||||
'n8n.workflow.started',
|
||||
@@ -45,6 +47,7 @@ export type EventNamesTypes =
|
||||
| EventNamesWorkflowType
|
||||
| EventNamesNodeType
|
||||
| EventNamesGenericType
|
||||
| EventNamesAiNodesType
|
||||
| 'n8n.destination.test';
|
||||
|
||||
export const eventNamesAll = [
|
||||
@@ -52,13 +55,15 @@ export const eventNamesAll = [
|
||||
...eventNamesWorkflow,
|
||||
...eventNamesNode,
|
||||
...eventNamesGeneric,
|
||||
...eventNamesAiNodes,
|
||||
];
|
||||
|
||||
export type EventMessageTypes =
|
||||
| EventMessageGeneric
|
||||
| EventMessageWorkflow
|
||||
| EventMessageAudit
|
||||
| EventMessageNode;
|
||||
| EventMessageNode
|
||||
| EventMessageAiNode;
|
||||
|
||||
export interface FailedEventSummary {
|
||||
lastNodeExecuted: string;
|
||||
|
||||
@@ -37,6 +37,10 @@ import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
|
||||
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
|
||||
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
||||
import { ExecutionDataRecoveryService } from '../executionDataRecovery.service';
|
||||
import {
|
||||
EventMessageAiNode,
|
||||
type EventMessageAiNodeOptions,
|
||||
} from '../EventMessageClasses/EventMessageAiNode';
|
||||
|
||||
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
|
||||
|
||||
@@ -457,4 +461,8 @@ export class MessageEventBus extends EventEmitter {
|
||||
async sendNodeEvent(options: EventMessageNodeOptions) {
|
||||
await this.send(new EventMessageNode(options));
|
||||
}
|
||||
|
||||
async sendAiNodeEvent(options: EventMessageAiNodeOptions) {
|
||||
await this.send(new EventMessageAiNode(options));
|
||||
}
|
||||
}
|
||||
|
||||
34
packages/cli/test/unit/ExecutionMetadataService.test.ts
Normal file
34
packages/cli/test/unit/ExecutionMetadataService.test.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import { Container } from 'typedi';
|
||||
import { ExecutionMetadataRepository } from '@db/repositories/executionMetadata.repository';
|
||||
import { ExecutionMetadataService } from '@/services/executionMetadata.service';
|
||||
import { mockInstance } from '../shared/mocking';
|
||||
|
||||
describe('ExecutionMetadataService', () => {
|
||||
const repository = mockInstance(ExecutionMetadataRepository);
|
||||
|
||||
test('Execution metadata is saved in a batch', async () => {
|
||||
const toSave = {
|
||||
test1: 'value1',
|
||||
test2: 'value2',
|
||||
};
|
||||
const executionId = '1234';
|
||||
|
||||
await Container.get(ExecutionMetadataService).save(executionId, toSave);
|
||||
|
||||
expect(repository.save).toHaveBeenCalledTimes(1);
|
||||
expect(repository.save.mock.calls[0]).toEqual([
|
||||
[
|
||||
{
|
||||
execution: { id: executionId },
|
||||
key: 'test1',
|
||||
value: 'value1',
|
||||
},
|
||||
{
|
||||
execution: { id: executionId },
|
||||
key: 'test2',
|
||||
value: 'value2',
|
||||
},
|
||||
],
|
||||
]);
|
||||
});
|
||||
});
|
||||
@@ -1,34 +1,41 @@
|
||||
import { Container } from 'typedi';
|
||||
import { ExecutionMetadataRepository } from '@db/repositories/executionMetadata.repository';
|
||||
import { ExecutionMetadataService } from '@/services/executionMetadata.service';
|
||||
import { VariablesService } from '@/environments/variables/variables.service.ee';
|
||||
import { mockInstance } from '../shared/mocking';
|
||||
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
|
||||
import { getBase } from '@/WorkflowExecuteAdditionalData';
|
||||
import Container from 'typedi';
|
||||
import { CredentialsHelper } from '@/CredentialsHelper';
|
||||
import { SecretsHelper } from '@/SecretsHelpers';
|
||||
|
||||
describe('WorkflowExecuteAdditionalData', () => {
|
||||
const repository = mockInstance(ExecutionMetadataRepository);
|
||||
const messageEventBus = mockInstance(MessageEventBus);
|
||||
const variablesService = mockInstance(VariablesService);
|
||||
variablesService.getAllCached.mockResolvedValue([]);
|
||||
const credentialsHelper = mockInstance(CredentialsHelper);
|
||||
const secretsHelper = mockInstance(SecretsHelper);
|
||||
Container.set(MessageEventBus, messageEventBus);
|
||||
Container.set(VariablesService, variablesService);
|
||||
Container.set(CredentialsHelper, credentialsHelper);
|
||||
Container.set(SecretsHelper, secretsHelper);
|
||||
|
||||
test('Execution metadata is saved in a batch', async () => {
|
||||
const toSave = {
|
||||
test1: 'value1',
|
||||
test2: 'value2',
|
||||
test('logAiEvent should call MessageEventBus', async () => {
|
||||
const additionalData = await getBase('user-id');
|
||||
|
||||
const eventName = 'n8n.ai.memory.get.messages';
|
||||
const payload = {
|
||||
msg: 'test message',
|
||||
executionId: '123',
|
||||
nodeName: 'n8n-memory',
|
||||
workflowId: 'workflow-id',
|
||||
workflowName: 'workflow-name',
|
||||
nodeType: 'n8n-memory',
|
||||
};
|
||||
const executionId = '1234';
|
||||
|
||||
await Container.get(ExecutionMetadataService).save(executionId, toSave);
|
||||
await additionalData.logAiEvent(eventName, payload);
|
||||
|
||||
expect(repository.save).toHaveBeenCalledTimes(1);
|
||||
expect(repository.save.mock.calls[0]).toEqual([
|
||||
[
|
||||
{
|
||||
execution: { id: executionId },
|
||||
key: 'test1',
|
||||
value: 'value1',
|
||||
},
|
||||
{
|
||||
execution: { id: executionId },
|
||||
key: 'test2',
|
||||
value: 'value2',
|
||||
},
|
||||
],
|
||||
]);
|
||||
expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledTimes(1);
|
||||
expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledWith({
|
||||
eventName,
|
||||
payload,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user