fix(core): Flush instance stopped event immediately (#10238)

This commit is contained in:
Tomi Turtiainen
2024-07-30 14:49:41 +03:00
committed by GitHub
parent a2d08846d0
commit d6770b5fca
21 changed files with 223 additions and 274 deletions

View File

@@ -88,30 +88,28 @@ export class Telemetry {
); // every 6 hours
}
private async pulse(): Promise<unknown> {
private async pulse() {
if (!this.rudderStack) {
return;
}
const allPromises = Object.keys(this.executionCountsBuffer)
.filter((workflowId) => {
const data = this.executionCountsBuffer[workflowId];
const sum =
(data.manual_error?.count ?? 0) +
(data.manual_success?.count ?? 0) +
(data.prod_error?.count ?? 0) +
(data.prod_success?.count ?? 0);
return sum > 0;
})
.map(async (workflowId) => {
const promise = this.track('Workflow execution count', {
event_version: '2',
workflow_id: workflowId,
...this.executionCountsBuffer[workflowId],
});
const workflowIdsToReport = Object.keys(this.executionCountsBuffer).filter((workflowId) => {
const data = this.executionCountsBuffer[workflowId];
const sum =
(data.manual_error?.count ?? 0) +
(data.manual_success?.count ?? 0) +
(data.prod_error?.count ?? 0) +
(data.prod_success?.count ?? 0);
return sum > 0;
});
return await promise;
for (const workflowId of workflowIdsToReport) {
this.track('Workflow execution count', {
event_version: '2',
workflow_id: workflowId,
...this.executionCountsBuffer[workflowId],
});
}
this.executionCountsBuffer = {};
@@ -131,11 +129,11 @@ export class Telemetry {
team_projects: (await Container.get(ProjectRepository).getProjectCounts()).team,
project_role_count: await Container.get(ProjectRelationRepository).countUsersByRole(),
};
allPromises.push(this.track('pulse', pulsePacket));
return await Promise.all(allPromises);
this.track('pulse', pulsePacket);
}
async trackWorkflowExecution(properties: IExecutionTrackProperties): Promise<void> {
trackWorkflowExecution(properties: IExecutionTrackProperties) {
if (this.rudderStack) {
const execTime = new Date();
const workflowId = properties.workflow_id;
@@ -164,66 +162,60 @@ export class Telemetry {
properties.is_manual &&
properties.error_node_type?.startsWith('n8n-nodes-base')
) {
void this.track('Workflow execution errored', properties);
this.track('Workflow execution errored', properties);
}
}
}
async trackN8nStop(): Promise<void> {
clearInterval(this.pulseIntervalReference);
await this.track('User instance stopped');
void Promise.all([this.postHog.stop(), this.rudderStack?.flush()]);
this.track('User instance stopped');
await Promise.all([this.postHog.stop(), this.rudderStack?.flush()]);
}
async identify(traits?: {
[key: string]: string | number | boolean | object | undefined | null;
}): Promise<void> {
identify(traits?: { [key: string]: string | number | boolean | object | undefined | null }) {
if (!this.rudderStack) {
return;
}
const { instanceId } = this.instanceSettings;
return await new Promise<void>((resolve) => {
if (this.rudderStack) {
this.rudderStack.identify(
{
userId: instanceId,
traits: { ...traits, instanceId },
},
resolve,
);
} else {
resolve();
}
this.rudderStack.identify({
userId: instanceId,
traits: { ...traits, instanceId },
});
}
async track(
track(
eventName: string,
properties: ITelemetryTrackProperties = {},
{ withPostHog } = { withPostHog: false }, // whether to additionally track with PostHog
): Promise<void> {
) {
if (!this.rudderStack) {
return;
}
const { instanceId } = this.instanceSettings;
return await new Promise<void>((resolve) => {
if (this.rudderStack) {
const { user_id } = properties;
const updatedProperties = {
...properties,
instance_id: instanceId,
version_cli: N8N_VERSION,
};
const { user_id } = properties;
const updatedProperties = {
...properties,
instance_id: instanceId,
version_cli: N8N_VERSION,
};
const payload = {
userId: `${instanceId}${user_id ? `#${user_id}` : ''}`,
event: eventName,
properties: updatedProperties,
};
const payload = {
userId: `${instanceId}${user_id ? `#${user_id}` : ''}`,
event: eventName,
properties: updatedProperties,
};
if (withPostHog) {
this.postHog?.track(payload);
}
if (withPostHog) {
this.postHog?.track(payload);
}
return this.rudderStack.track(payload, resolve);
}
return resolve();
});
return this.rudderStack.track(payload);
}
// test helpers