fix(core): Ensure executions list is properly filtered for all users (#4765)

Also updates executions API to have EE version
This commit is contained in:
freya
2022-11-30 13:00:28 +00:00
committed by GitHub
parent ada73ed41d
commit ddf787c087
5 changed files with 274 additions and 172 deletions

View File

@@ -109,7 +109,7 @@ import type {
import { userManagementRouter } from '@/UserManagement'; import { userManagementRouter } from '@/UserManagement';
import { resolveJwt } from '@/UserManagement/auth/jwt'; import { resolveJwt } from '@/UserManagement/auth/jwt';
import { executionsController } from '@/api/executions.api'; import { executionsController } from '@/executions/executions.controller';
import { nodeTypesController } from '@/api/nodeTypes.api'; import { nodeTypesController } from '@/api/nodeTypes.api';
import { tagsController } from '@/api/tags.api'; import { tagsController } from '@/api/tags.api';
import { loadPublicApiVersions } from '@/PublicApi'; import { loadPublicApiVersions } from '@/PublicApi';

View File

@@ -0,0 +1,70 @@
import express from 'express';
import config from '@/config';
import {
IExecutionFlattedResponse,
IExecutionResponse,
IExecutionsListResponse,
} from '@/Interfaces';
import type { ExecutionRequest } from '@/requests';
import * as ResponseHelper from '@/ResponseHelper';
import { isSharingEnabled } from '@/UserManagement/UserManagementHelper';
import { EEExecutionsService } from './executions.service.ee';
// eslint-disable-next-line @typescript-eslint/naming-convention
export const EEExecutionsController = express.Router();
EEExecutionsController.use((req, res, next) => {
if (!isSharingEnabled() || !config.getEnv('enterprise.workflowSharingEnabled')) {
// skip ee router and use free one
next('router');
return;
}
// use ee router
next();
});
/**
* GET /executions
*/
EEExecutionsController.get(
'/',
ResponseHelper.send(async (req: ExecutionRequest.GetAll): Promise<IExecutionsListResponse> => {
return EEExecutionsService.getExecutionsList(req);
}),
);
/**
* GET /executions/:id
*/
EEExecutionsController.get(
'/:id',
ResponseHelper.send(
async (
req: ExecutionRequest.Get,
): Promise<IExecutionResponse | IExecutionFlattedResponse | undefined> => {
return EEExecutionsService.getExecution(req);
},
),
);
/**
* POST /executions/:id/retry
*/
EEExecutionsController.post(
'/:id/retry',
ResponseHelper.send(async (req: ExecutionRequest.Retry): Promise<boolean> => {
return EEExecutionsService.retryExecution(req);
}),
);
/**
* POST /executions/delete
* INFORMATION: We use POST instead of DELETE to not run into any issues with the query data
* getting too long
*/
EEExecutionsController.post(
'/delete',
ResponseHelper.send(async (req: ExecutionRequest.Delete): Promise<void> => {
await EEExecutionsService.deleteExecutions(req);
}),
);

View File

@@ -0,0 +1,74 @@
import express from 'express';
import { LoggerProxy } from 'n8n-workflow';
import {
IExecutionFlattedResponse,
IExecutionResponse,
IExecutionsListResponse,
} from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
import { getLogger } from '@/Logger';
import type { ExecutionRequest } from '@/requests';
import { EEExecutionsController } from './executions.controller.ee';
import { ExecutionsService } from './executions.service';
export const executionsController = express.Router();
/**
* Initialise Logger if needed
*/
executionsController.use((req, res, next) => {
try {
LoggerProxy.getInstance();
} catch (error) {
LoggerProxy.init(getLogger());
}
next();
});
executionsController.use('/', EEExecutionsController);
/**
* GET /executions
*/
executionsController.get(
'/',
ResponseHelper.send(async (req: ExecutionRequest.GetAll): Promise<IExecutionsListResponse> => {
return ExecutionsService.getExecutionsList(req);
}),
);
/**
* GET /executions/:id
*/
executionsController.get(
'/:id',
ResponseHelper.send(
async (
req: ExecutionRequest.Get,
): Promise<IExecutionResponse | IExecutionFlattedResponse | undefined> => {
return ExecutionsService.getExecution(req);
},
),
);
/**
* POST /executions/:id/retry
*/
executionsController.post(
'/:id/retry',
ResponseHelper.send(async (req: ExecutionRequest.Retry): Promise<boolean> => {
return ExecutionsService.retryExecution(req);
}),
);
/**
* POST /executions/delete
* INFORMATION: We use POST instead of DELETE to not run into any issues with the query data
* getting too long
*/
executionsController.post(
'/delete',
ResponseHelper.send(async (req: ExecutionRequest.Delete): Promise<void> => {
await ExecutionsService.deleteExecutions(req);
}),
);

View File

@@ -0,0 +1,13 @@
import { User } from '@/databases/entities/User';
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { ExecutionsService } from './executions.service';
export class EEExecutionsService extends ExecutionsService {
/**
* Function to get the workflow Ids for a User regardless of role
*/
static async getWorkflowIdsForUser(user: User): Promise<number[]> {
// Get all workflows
return getSharedWorkflowIds(user);
}
}

View File

@@ -1,47 +1,39 @@
/* eslint-disable no-restricted-syntax */
/* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-non-null-assertion */ /* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/no-unsafe-return */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unused-vars */
import express from 'express';
import { validate as jsonSchemaValidate } from 'jsonschema'; import { validate as jsonSchemaValidate } from 'jsonschema';
import { BinaryDataManager } from 'n8n-core'; import { BinaryDataManager } from 'n8n-core';
import { import { deepCopy, IDataObject, LoggerProxy, JsonObject, jsonParse, Workflow } from 'n8n-workflow';
deepCopy,
IDataObject,
IWorkflowBase,
JsonObject,
jsonParse,
LoggerProxy,
Workflow,
} from 'n8n-workflow';
import { FindOperator, In, IsNull, LessThanOrEqual, Not, Raw } from 'typeorm'; import { FindOperator, In, IsNull, LessThanOrEqual, Not, Raw } from 'typeorm';
import * as ActiveExecutions from '@/ActiveExecutions'; import * as ActiveExecutions from '@/ActiveExecutions';
import * as Db from '@/Db'; import config from '@/config';
import * as GenericHelpers from '@/GenericHelpers'; import { User } from '@/databases/entities/User';
import { DEFAULT_EXECUTIONS_GET_ALL_LIMIT } from '@/GenericHelpers';
import { import {
DatabaseType,
IExecutionFlattedResponse, IExecutionFlattedResponse,
IExecutionResponse, IExecutionResponse,
IExecutionsListResponse, IExecutionsListResponse,
IWorkflowBase,
IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcess,
} from '@/Interfaces'; } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes'; import { NodeTypes } from '@/NodeTypes';
import * as ResponseHelper from '@/ResponseHelper';
import { WorkflowRunner } from '@/WorkflowRunner';
import config from '@/config';
import { User } from '@db/entities/User';
import { DEFAULT_EXECUTIONS_GET_ALL_LIMIT } from '@/GenericHelpers';
import { getLogger } from '@/Logger';
import * as Queue from '@/Queue'; import * as Queue from '@/Queue';
import type { ExecutionRequest } from '@/requests'; import type { ExecutionRequest } from '@/requests';
import * as ResponseHelper from '@/ResponseHelper';
import { getSharedWorkflowIds } from '@/WorkflowHelpers'; import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { WorkflowRunner } from '@/WorkflowRunner';
import { DatabaseType, Db, GenericHelpers } from '..';
export const executionsController = express.Router(); interface IGetExecutionsQueryFilter {
id?: FindOperator<string>;
finished?: boolean;
mode?: string;
retryOf?: string;
retrySuccessId?: string;
workflowId?: number | string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
waitTill?: FindOperator<any> | boolean;
}
const schemaGetExecutionsQueryFilter = { const schemaGetExecutionsQueryFilter = {
$id: '/IGetExecutionsQueryFilter', $id: '/IGetExecutionsQueryFilter',
@@ -58,91 +50,68 @@ const schemaGetExecutionsQueryFilter = {
const allowedExecutionsQueryFilterFields = Object.keys(schemaGetExecutionsQueryFilter.properties); const allowedExecutionsQueryFilterFields = Object.keys(schemaGetExecutionsQueryFilter.properties);
interface IGetExecutionsQueryFilter { export class ExecutionsService {
id?: FindOperator<string>; /**
finished?: boolean; * Function to get the workflow Ids for a User
mode?: string; * Overridden in EE version to ignore roles
retryOf?: string; */
retrySuccessId?: string; static async getWorkflowIdsForUser(user: User): Promise<number[]> {
workflowId?: number | string; // Get all workflows using owner role
// eslint-disable-next-line @typescript-eslint/no-explicit-any return getSharedWorkflowIds(user, ['owner']);
waitTill?: FindOperator<any> | boolean;
}
/**
* Initialise Logger if needed
*/
executionsController.use((req, res, next) => {
try {
LoggerProxy.getInstance();
} catch (error) {
LoggerProxy.init(getLogger());
} }
next();
});
/** /**
* Helper function to retrieve count of Executions * Helper function to retrieve count of Executions
*/ */
async function getExecutionsCount( static async getExecutionsCount(
countFilter: IDataObject, countFilter: IDataObject,
user: User, user: User,
): Promise<{ count: number; estimated: boolean }> { ): Promise<{ count: number; estimated: boolean }> {
const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType; const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType;
const filteredFields = Object.keys(countFilter).filter((field) => field !== 'id'); const filteredFields = Object.keys(countFilter).filter((field) => field !== 'id');
// For databases other than Postgres, do a regular count
// when filtering based on `workflowId` or `finished` fields.
if (dbType !== 'postgresdb' || filteredFields.length > 0 || user.globalRole.name !== 'owner') {
const sharedWorkflowIds = await this.getWorkflowIdsForUser(user);
const countParams = { where: { workflowId: In(sharedWorkflowIds), ...countFilter } };
const count = await Db.collections.Execution.count(countParams);
return { count, estimated: false };
}
try {
// Get an estimate of rows count.
const estimateRowsNumberSql =
"SELECT n_live_tup FROM pg_stat_all_tables WHERE relname = 'execution_entity';";
const rows: Array<{ n_live_tup: string }> = await Db.collections.Execution.query(
estimateRowsNumberSql,
);
const estimate = parseInt(rows[0].n_live_tup, 10);
// If over 100k, return just an estimate.
if (estimate > 100_000) {
// if less than 100k, we get the real count as even a full
// table scan should not take so long.
return { count: estimate, estimated: true };
}
} catch (error) {
LoggerProxy.warn(`Failed to get executions count from Postgres: ${error}`);
}
// For databases other than Postgres, do a regular count
// when filtering based on `workflowId` or `finished` fields.
if (dbType !== 'postgresdb' || filteredFields.length > 0 || user.globalRole.name !== 'owner') {
const sharedWorkflowIds = await getSharedWorkflowIds(user); const sharedWorkflowIds = await getSharedWorkflowIds(user);
const count = await Db.collections.Execution.count({ const count = await Db.collections.Execution.count({
where: { where: {
workflowId: In(sharedWorkflowIds), workflowId: In(sharedWorkflowIds),
...countFilter,
}, },
}); });
return { count, estimated: false }; return { count, estimated: false };
} }
try { static async getExecutionsList(req: ExecutionRequest.GetAll): Promise<IExecutionsListResponse> {
// Get an estimate of rows count. const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
const estimateRowsNumberSql =
"SELECT n_live_tup FROM pg_stat_all_tables WHERE relname = 'execution_entity';";
const rows: Array<{ n_live_tup: string }> = await Db.collections.Execution.query(
estimateRowsNumberSql,
);
const estimate = parseInt(rows[0].n_live_tup, 10);
// If over 100k, return just an estimate.
if (estimate > 100_000) {
// if less than 100k, we get the real count as even a full
// table scan should not take so long.
return { count: estimate, estimated: true };
}
} catch (error) {
LoggerProxy.warn(`Failed to get executions count from Postgres: ${error}`);
}
const sharedWorkflowIds = await getSharedWorkflowIds(user);
const count = await Db.collections.Execution.count({
where: {
workflowId: In(sharedWorkflowIds),
},
});
return { count, estimated: false };
}
/**
* GET /executions
*/
executionsController.get(
'/',
ResponseHelper.send(async (req: ExecutionRequest.GetAll): Promise<IExecutionsListResponse> => {
const sharedWorkflowIds = await getSharedWorkflowIds(req.user);
if (sharedWorkflowIds.length === 0) { if (sharedWorkflowIds.length === 0) {
// return early since without shared workflows there can be no hits // return early since without shared workflows there can be no hits
// (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners) // (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
@@ -263,7 +232,10 @@ executionsController.get(
const executions = await query.getMany(); const executions = await query.getMany();
const { count, estimated } = await getExecutionsCount(countFilter as IDataObject, req.user); const { count, estimated } = await this.getExecutionsCount(
countFilter as IDataObject,
req.user,
);
const formattedExecutions = executions.map((execution) => { const formattedExecutions = executions.map((execution) => {
return { return {
@@ -285,66 +257,48 @@ executionsController.get(
results: formattedExecutions, results: formattedExecutions,
estimated, estimated,
}; };
}), }
);
/** static async getExecution(
* GET /executions/:id req: ExecutionRequest.Get,
*/ ): Promise<IExecutionResponse | IExecutionFlattedResponse | undefined> {
executionsController.get( const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
'/:id', if (!sharedWorkflowIds.length) return undefined;
ResponseHelper.send(
async (
req: ExecutionRequest.Get,
): Promise<IExecutionResponse | IExecutionFlattedResponse | undefined> => {
const { id: executionId } = req.params;
const sharedWorkflowIds = await getSharedWorkflowIds(req.user);
if (!sharedWorkflowIds.length) return undefined;
const execution = await Db.collections.Execution.findOne({
where: {
id: executionId,
workflowId: In(sharedWorkflowIds),
},
});
if (!execution) {
LoggerProxy.info('Attempt to read execution was blocked due to insufficient permissions', {
userId: req.user.id,
executionId,
});
return undefined;
}
if (req.query.unflattedResponse === 'true') {
return ResponseHelper.unflattenExecutionData(execution);
}
const { id, ...rest } = execution;
// @ts-ignore
return {
id: id.toString(),
...rest,
};
},
),
);
/**
* POST /executions/:id/retry
*/
executionsController.post(
'/:id/retry',
ResponseHelper.send(async (req: ExecutionRequest.Retry): Promise<boolean> => {
const { id: executionId } = req.params; const { id: executionId } = req.params;
const execution = await Db.collections.Execution.findOne({
where: {
id: executionId,
workflowId: In(sharedWorkflowIds),
},
});
const sharedWorkflowIds = await getSharedWorkflowIds(req.user); if (!execution) {
LoggerProxy.info('Attempt to read execution was blocked due to insufficient permissions', {
userId: req.user.id,
executionId,
});
return undefined;
}
if (req.query.unflattedResponse === 'true') {
return ResponseHelper.unflattenExecutionData(execution);
}
const { id, ...rest } = execution;
// @ts-ignore
return {
id: id.toString(),
...rest,
};
}
static async retryExecution(req: ExecutionRequest.Retry): Promise<boolean> {
const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
if (!sharedWorkflowIds.length) return false; if (!sharedWorkflowIds.length) return false;
const { id: executionId } = req.params;
const execution = await Db.collections.Execution.findOne({ const execution = await Db.collections.Execution.findOne({
where: { where: {
id: executionId, id: executionId,
@@ -404,7 +358,7 @@ executionsController.post(
if (req.body.loadWorkflow) { if (req.body.loadWorkflow) {
// Loads the currently saved workflow to execute instead of the // Loads the currently saved workflow to execute instead of the
// one saved at the time of the execution. // one saved at the time of the execution.
const workflowId = fullExecutionData.workflowData.id; const workflowId = fullExecutionData.workflowData.id as string;
const workflowData = (await Db.collections.Workflow.findOne(workflowId)) as IWorkflowBase; const workflowData = (await Db.collections.Workflow.findOne(workflowId)) as IWorkflowBase;
if (workflowData === undefined) { if (workflowData === undefined) {
@@ -458,17 +412,15 @@ executionsController.post(
} }
return !!executionData.finished; return !!executionData.finished;
}), }
);
/** static async deleteExecutions(req: ExecutionRequest.Delete): Promise<void> {
* POST /executions/delete const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
* INFORMATION: We use POST instead of DELETE to not run into any issues with the query data if (sharedWorkflowIds.length === 0) {
* getting too long // return early since without shared workflows there can be no hits
*/ // (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
executionsController.post( return;
'/delete', }
ResponseHelper.send(async (req: ExecutionRequest.Delete): Promise<void> => {
const { deleteBefore, ids, filters: requestFiltersRaw } = req.body; const { deleteBefore, ids, filters: requestFiltersRaw } = req.body;
let requestFilters; let requestFilters;
if (requestFiltersRaw) { if (requestFiltersRaw) {
@@ -490,13 +442,6 @@ executionsController.post(
throw new Error('Either "deleteBefore" or "ids" must be present in the request body'); throw new Error('Either "deleteBefore" or "ids" must be present in the request body');
} }
const sharedWorkflowIds = await getSharedWorkflowIds(req.user);
if (sharedWorkflowIds.length === 0) {
// return early since without shared workflows there can be no hits
// (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
return;
}
const binaryDataManager = BinaryDataManager.getInstance(); const binaryDataManager = BinaryDataManager.getInstance();
// delete executions by date, if user may access the underlying workflows // delete executions by date, if user may access the underlying workflows
@@ -558,5 +503,5 @@ executionsController.post(
await Db.collections.Execution.delete(idsToDelete); await Db.collections.Execution.delete(idsToDelete);
} }
}), }
); }