diff --git a/.github/workflows/test-workflows.yml b/.github/workflows/test-workflows.yml new file mode 100644 index 0000000000..909264e0ce --- /dev/null +++ b/.github/workflows/test-workflows.yml @@ -0,0 +1,69 @@ +name: Run test workflows + +on: + schedule: + - cron: "0 2 * * *" + workflow_dispatch: + + +jobs: + run-test-workflows: + + runs-on: ubuntu-latest + + strategy: + matrix: + node-version: [14.x] + steps: + - + name: Checkout + uses: actions/checkout@v2 + with: + path: n8n + - + name: Checkout workflows repo + uses: actions/checkout@v2 + with: + repository: n8n-io/test-workflows + path: test-workflows + - + name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node-version }} + - + name: npm install and build + run: | + cd n8n + npm install + npm run bootstrap + npm run build --if-present + env: + CI: true + shell: bash + - + name: Import credentials + run: n8n/packages/cli/bin/n8n import:credentials --input=test-workflows/credentials.json + shell: bash + env: + N8N_ENCRYPTION_KEY: ${{secrets.ENCRYPTION_KEY}} + - + name: Import workflows + run: n8n/packages/cli/bin/n8n import:workflow --separate --input=test-workflows/workflows + shell: bash + env: + N8N_ENCRYPTION_KEY: ${{secrets.ENCRYPTION_KEY}} + - + name: Copy static assets + run: | + cp n8n/assets/n8n-logo.png /tmp/n8n-logo.png + cp n8n/assets/n8n-screenshot.png /tmp/n8n-screenshot.png + cp n8n/node_modules/pdf-parse/test/data/05-versions-space.pdf /tmp/05-versions-space.pdf + cp n8n/node_modules/pdf-parse/test/data/04-valid.pdf /tmp/04-valid.pdf + shell: bash + - + name: Run tests + run: n8n/packages/cli/bin/n8n executeBatch --shallow --skipList=test-workflows/skipList.txt --shortOutput --concurrency=16 --compare=test-workflows/snapshots + shell: bash + env: + N8N_ENCRYPTION_KEY: ${{secrets.ENCRYPTION_KEY}} diff --git a/packages/cli/commands/Interfaces.d.ts b/packages/cli/commands/Interfaces.d.ts new file mode 100644 index 0000000000..aedd194539 --- /dev/null +++ b/packages/cli/commands/Interfaces.d.ts @@ -0,0 +1,54 @@ +interface IResult { + totalWorkflows: number; + summary: { + failedExecutions: number, + successfulExecutions: number, + warningExecutions: number, + errors: IExecutionError[], + warnings: IExecutionError[], + }; + coveredNodes: { + [nodeType: string]: number + }; + executions: IExecutionResult[]; +} +interface IExecutionResult { + workflowId: string | number; + workflowName: string; + executionTime: number; // Given in seconds with decimals for milisseconds + finished: boolean; + executionStatus: ExecutionStatus; + error?: string; + changes?: string; + coveredNodes: { + [nodeType: string]: number + }; +} + +interface IExecutionError { + workflowId: string | number; + error: string; +} + +interface IWorkflowExecutionProgress { + workflowId: string | number; + status: ExecutionStatus; +} + +interface INodeSpecialCases { + [nodeName: string]: INodeSpecialCase; +} + +interface INodeSpecialCase { + ignoredProperties?: string[]; + capResults?: number; +} + +type ExecutionStatus = 'success' | 'error' | 'warning' | 'running'; + +declare module 'json-diff' { + interface IDiffOptions { + keysOnly?: boolean; + } + export function diff(obj1: unknown, obj2: unknown, diffOptions: IDiffOptions): string; +} diff --git a/packages/cli/commands/execute.ts b/packages/cli/commands/execute.ts index 9eafd28c9e..48ae6a19f0 100644 --- a/packages/cli/commands/execute.ts +++ b/packages/cli/commands/execute.ts @@ -22,7 +22,7 @@ import { WorkflowRunner, } from '../src'; -import { +import { getLogger, } from '../src/Logger'; @@ -46,6 +46,9 @@ export class Execute extends Command { id: flags.string({ description: 'id of the workflow to execute', }), + rawOutput: flags.boolean({ + description: 'Outputs only JSON data, with no other text', + }), }; @@ -183,10 +186,11 @@ export class Execute extends Command { stack: error.stack, }; } - - console.info('Execution was successful:'); - console.info('===================================='); - console.info(JSON.stringify(data, null, 2)); + if (flags.rawOutput === undefined) { + this.log('Execution was successful:'); + this.log('===================================='); + } + this.log(JSON.stringify(data, null, 2)); } catch (e) { console.error('Error executing workflow. See log messages for details.'); logger.error('\nExecution error:'); diff --git a/packages/cli/commands/executeBatch.ts b/packages/cli/commands/executeBatch.ts new file mode 100644 index 0000000000..7bc6bb858d --- /dev/null +++ b/packages/cli/commands/executeBatch.ts @@ -0,0 +1,796 @@ +import * as fs from 'fs'; +import { + Command, + flags, +} from '@oclif/command'; + +import { + UserSettings, +} from 'n8n-core'; + +import { + INode, + INodeExecutionData, + ITaskData, +} from 'n8n-workflow'; + +import { + ActiveExecutions, + CredentialsOverwrites, + CredentialTypes, + Db, + ExternalHooks, + IExecutionsCurrentSummary, + IWorkflowDb, + IWorkflowExecutionDataProcess, + LoadNodesAndCredentials, + NodeTypes, + WorkflowCredentials, + WorkflowRunner, +} from '../src'; + +import { + sep, +} from 'path'; + +import { + diff, +} from 'json-diff'; + +import { + getLogger, +} from '../src/Logger'; + +import { + LoggerProxy, +} from 'n8n-workflow'; + +export class ExecuteBatch extends Command { + static description = '\nExecutes multiple workflows once'; + + static cancelled = false; + + static workflowExecutionsProgress: IWorkflowExecutionProgress[][]; + + static shallow = false; + + static compare: string; + + static snapshot: string; + + static concurrency = 1; + + static debug = false; + + static executionTimeout = 3 * 60 * 1000; + + static examples = [ + `$ n8n executeAll`, + `$ n8n executeAll --concurrency=10 --skipList=/data/skipList.txt`, + `$ n8n executeAll --debug --output=/data/output.json`, + `$ n8n executeAll --ids=10,13,15 --shortOutput`, + `$ n8n executeAll --snapshot=/data/snapshots --shallow`, + `$ n8n executeAll --compare=/data/previousExecutionData --retries=2`, + ]; + + static flags = { + help: flags.help({ char: 'h' }), + debug: flags.boolean({ + description: 'Toggles on displaying all errors and debug messages.', + }), + ids: flags.string({ + description: 'Specifies workflow IDs to get executed, separated by a comma.', + }), + concurrency: flags.integer({ + default: 1, + description: 'How many workflows can run in parallel. Defaults to 1 which means no concurrency.', + }), + output: flags.string({ + description: 'Enable execution saving, You must inform an existing folder to save execution via this param', + }), + snapshot: flags.string({ + description: 'Enables snapshot saving. You must inform an existing folder to save snapshots via this param.', + }), + compare: flags.string({ + description: 'Compares current execution with an existing snapshot. You must inform an existing folder where the snapshots are saved.', + }), + shallow: flags.boolean({ + description: 'Compares only if attributes output from node are the same, with no regards to neste JSON objects.', + }), + skipList: flags.string({ + description: 'File containing a comma separated list of workflow IDs to skip.', + }), + retries: flags.integer({ + description: 'Retries failed workflows up to N tries. Default is 1. Set 0 to disable.', + default: 1, + }), + shortOutput: flags.boolean({ + description: 'Omits the full execution information from output, displaying only summary.', + }), + }; + + /** + * Gracefully handles exit. + * @param {boolean} skipExit Whether to skip exit or number according to received signal + */ + static async stopProcess(skipExit: boolean | number = false) { + + if (ExecuteBatch.cancelled === true) { + process.exit(0); + } + + ExecuteBatch.cancelled = true; + const activeExecutionsInstance = ActiveExecutions.getInstance(); + const stopPromises = activeExecutionsInstance.getActiveExecutions().map(async execution => { + activeExecutionsInstance.stopExecution(execution.id); + }); + + await Promise.allSettled(stopPromises); + + setTimeout(() => { + process.exit(0); + }, 30000); + + let executingWorkflows = activeExecutionsInstance.getActiveExecutions() as IExecutionsCurrentSummary[]; + + let count = 0; + while (executingWorkflows.length !== 0) { + if (count++ % 4 === 0) { + console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`); + executingWorkflows.map(execution => { + console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`); + }); + } + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + executingWorkflows = activeExecutionsInstance.getActiveExecutions(); + } + // We may receive true but when called from `process.on` + // we get the signal (SIGNIT, etc.) + if (skipExit !== true) { + process.exit(0); + } + } + + formatJsonOutput(data: object) { + return JSON.stringify(data, null, 2); + } + + shouldBeConsideredAsWarning(errorMessage: string) { + + const warningStrings = [ + 'refresh token is invalid', + 'unable to connect to', + 'econnreset', + '429', + 'econnrefused', + 'missing a required parameter', + ]; + + errorMessage = errorMessage.toLowerCase(); + + for (let i = 0; i < warningStrings.length; i++) { + if (errorMessage.includes(warningStrings[i])) { + return true; + } + } + + return false; + } + + + async run() { + + process.on('SIGTERM', ExecuteBatch.stopProcess); + process.on('SIGINT', ExecuteBatch.stopProcess); + + const logger = getLogger(); + LoggerProxy.init(logger); + + const { flags } = this.parse(ExecuteBatch); + + ExecuteBatch.debug = flags.debug === true; + ExecuteBatch.concurrency = flags.concurrency || 1; + + const ids: number[] = []; + const skipIds: number[] = []; + + if (flags.snapshot !== undefined) { + if (fs.existsSync(flags.snapshot)) { + if (!fs.lstatSync(flags.snapshot).isDirectory()) { + console.log(`The parameter --snapshot must be an existing directory`); + return; + } + } else { + console.log(`The parameter --snapshot must be an existing directory`); + return; + } + + ExecuteBatch.snapshot = flags.snapshot; + } + if (flags.compare !== undefined) { + if (fs.existsSync(flags.compare)) { + if (!fs.lstatSync(flags.compare).isDirectory()) { + console.log(`The parameter --compare must be an existing directory`); + return; + } + } else { + console.log(`The parameter --compare must be an existing directory`); + return; + } + + ExecuteBatch.compare = flags.compare; + } + + if (flags.output !== undefined) { + if (fs.existsSync(flags.output)) { + if (fs.lstatSync(flags.output).isDirectory()) { + console.log(`The parameter --output must be a writable file`); + return; + } + } + } + + if (flags.ids !== undefined) { + const paramIds = flags.ids.split(','); + const re = /\d+/; + const matchedIds = paramIds.filter(id => id.match(re)).map(id => parseInt(id.trim(), 10)); + + if (matchedIds.length === 0) { + console.log(`The parameter --ids must be a list of numeric IDs separated by a comma.`); + return; + } + + ids.push(...matchedIds); + } + + if (flags.skipList !== undefined) { + if (fs.existsSync(flags.skipList)) { + const contents = fs.readFileSync(flags.skipList, { encoding: 'utf-8' }); + skipIds.push(...contents.split(',').map(id => parseInt(id.trim(), 10))); + } else { + console.log('Skip list file not found. Exiting.'); + return; + } + } + + if (flags.shallow === true) { + ExecuteBatch.shallow = true; + } + + + // Start directly with the init of the database to improve startup time + const startDbInitPromise = Db.init(); + + // Load all node and credential types + const loadNodesAndCredentials = LoadNodesAndCredentials(); + const loadNodesAndCredentialsPromise = loadNodesAndCredentials.init(); + + // Make sure the settings exist + await UserSettings.prepareUserSettings(); + + // Wait till the database is ready + await startDbInitPromise; + + let allWorkflows; + + const query = Db.collections!.Workflow!.createQueryBuilder('workflows'); + + if (ids.length > 0) { + query.andWhere(`workflows.id in (:...ids)`, { ids }); + } + + if (skipIds.length > 0) { + query.andWhere(`workflows.id not in (:...skipIds)`, { skipIds }); + } + + allWorkflows = await query.getMany() as IWorkflowDb[]; + + if (ExecuteBatch.debug === true) { + process.stdout.write(`Found ${allWorkflows.length} workflows to execute.\n`); + } + + // Wait till the n8n-packages have been read + await loadNodesAndCredentialsPromise; + + // Load the credentials overwrites if any exist + await CredentialsOverwrites().init(); + + // Load all external hooks + const externalHooks = ExternalHooks(); + await externalHooks.init(); + + // Add the found types to an instance other parts of the application can use + const nodeTypes = NodeTypes(); + await nodeTypes.init(loadNodesAndCredentials.nodeTypes); + const credentialTypes = CredentialTypes(); + await credentialTypes.init(loadNodesAndCredentials.credentialTypes); + + // Send a shallow copy of allWorkflows so we still have all workflow data. + const results = await this.runTests([...allWorkflows]); + + let { retries } = flags; + + while (retries > 0 && (results.summary.warningExecutions + results.summary.failedExecutions > 0) && ExecuteBatch.cancelled === false) { + const failedWorkflowIds = results.summary.errors.map(execution => execution.workflowId); + failedWorkflowIds.push(...results.summary.warnings.map(execution => execution.workflowId)); + + const newWorkflowList = allWorkflows.filter(workflow => failedWorkflowIds.includes(workflow.id)); + + const retryResults = await this.runTests(newWorkflowList); + + this.mergeResults(results, retryResults); + // By now, `results` has been updated with the new successful executions. + retries--; + } + + if (flags.output !== undefined) { + fs.writeFileSync(flags.output, this.formatJsonOutput(results)); + console.log('\nExecution finished.'); + console.log('Summary:'); + console.log(`\tSuccess: ${results.summary.successfulExecutions}`); + console.log(`\tFailures: ${results.summary.failedExecutions}`); + console.log(`\tWarnings: ${results.summary.warningExecutions}`); + console.log('\nNodes successfully tested:'); + Object.entries(results.coveredNodes).forEach(([nodeName, nodeCount]) => { + console.log(`\t${nodeName}: ${nodeCount}`); + }); + console.log('\nCheck the JSON file for more details.'); + } else { + if (flags.shortOutput === true) { + console.log(this.formatJsonOutput({ ...results, executions: results.executions.filter(execution => execution.executionStatus !== 'success') })); + } else { + console.log(this.formatJsonOutput(results)); + } + } + + await ExecuteBatch.stopProcess(true); + + if (results.summary.failedExecutions > 0) { + this.exit(1); + } + this.exit(0); + + } + + mergeResults(results: IResult, retryResults: IResult) { + + if (retryResults.summary.successfulExecutions === 0) { + // Nothing to replace. + return; + } + + // Find successful executions and replace them on previous result. + retryResults.executions.forEach(newExecution => { + if (newExecution.executionStatus === 'success') { + // Remove previous execution from list. + results.executions = results.executions.filter(previousExecutions => previousExecutions.workflowId !== newExecution.workflowId); + + const errorIndex = results.summary.errors.findIndex(summaryInformation => summaryInformation.workflowId === newExecution.workflowId); + if (errorIndex !== -1) { + // This workflow errored previously. Decrement error count. + results.summary.failedExecutions--; + // Remove from the list of errors. + results.summary.errors.splice(errorIndex, 1); + } + + const warningIndex = results.summary.warnings.findIndex(summaryInformation => summaryInformation.workflowId === newExecution.workflowId); + if (warningIndex !== -1) { + // This workflow errored previously. Decrement error count. + results.summary.warningExecutions--; + // Remove from the list of errors. + results.summary.warnings.splice(warningIndex, 1); + } + // Increment successful executions count and push it to all executions array. + results.summary.successfulExecutions++; + results.executions.push(newExecution); + } + }); + } + + async runTests(allWorkflows: IWorkflowDb[]): Promise { + const result: IResult = { + totalWorkflows: allWorkflows.length, + summary: { + failedExecutions: 0, + warningExecutions: 0, + successfulExecutions: 0, + errors: [], + warnings: [], + }, + coveredNodes: {}, + executions: [], + }; + + if (ExecuteBatch.debug) { + this.initializeLogs(); + } + + return new Promise(async (res) => { + const promisesArray = []; + for (let i = 0; i < ExecuteBatch.concurrency; i++) { + const promise = new Promise(async (resolve) => { + let workflow: IWorkflowDb | undefined; + while (allWorkflows.length > 0) { + workflow = allWorkflows.shift(); + if (ExecuteBatch.cancelled === true) { + process.stdout.write(`Thread ${i + 1} resolving and quitting.`); + resolve(true); + break; + } + // This if shouldn't be really needed + // but it's a concurrency precaution. + if (workflow === undefined) { + resolve(true); + return; + } + + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].push({ + workflowId: workflow.id, + status: 'running', + }); + this.updateStatus(); + } + + await this.startThread(workflow).then((executionResult) => { + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].pop(); + } + result.executions.push(executionResult); + if (executionResult.executionStatus === 'success') { + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].push({ + workflowId: workflow!.id, + status: 'success', + }); + this.updateStatus(); + } + result.summary.successfulExecutions++; + const nodeNames = Object.keys(executionResult.coveredNodes); + + nodeNames.map(nodeName => { + if (result.coveredNodes[nodeName] === undefined) { + result.coveredNodes[nodeName] = 0; + } + result.coveredNodes[nodeName] += executionResult.coveredNodes[nodeName]; + }); + } else if (executionResult.executionStatus === 'warning') { + result.summary.warningExecutions++; + result.summary.warnings.push({ + workflowId: executionResult.workflowId, + error: executionResult.error!, + }); + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].push({ + workflowId: workflow!.id, + status: 'warning', + }); + this.updateStatus(); + } + } else if (executionResult.executionStatus === 'error') { + result.summary.failedExecutions++; + result.summary.errors.push({ + workflowId: executionResult.workflowId, + error: executionResult.error!, + }); + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].push({ + workflowId: workflow!.id, + status: 'error', + }); + this.updateStatus(); + } + } else { + throw new Error('Wrong execution status - cannot proceed'); + } + }); + } + + resolve(true); + }); + + promisesArray.push(promise); + } + + await Promise.allSettled(promisesArray); + + res(result); + }); + } + + updateStatus() { + + if (ExecuteBatch.cancelled === true) { + return; + } + + if (process.stdout.isTTY === true) { + process.stdout.moveCursor(0, - (ExecuteBatch.concurrency)); + process.stdout.cursorTo(0); + process.stdout.clearLine(0); + } + + + ExecuteBatch.workflowExecutionsProgress.map((concurrentThread, index) => { + let message = `${index + 1}: `; + concurrentThread.map((executionItem, workflowIndex) => { + let openColor = '\x1b[0m'; + const closeColor = '\x1b[0m'; + switch (executionItem.status) { + case 'success': + openColor = '\x1b[32m'; + break; + case 'error': + openColor = '\x1b[31m'; + break; + case 'warning': + openColor = '\x1b[33m'; + break; + default: + break; + } + message += (workflowIndex > 0 ? ', ' : '') + `${openColor}${executionItem.workflowId}${closeColor}`; + }); + if (process.stdout.isTTY === true) { + process.stdout.cursorTo(0); + process.stdout.clearLine(0); + } + process.stdout.write(message + '\n'); + }); + } + + initializeLogs() { + process.stdout.write('**********************************************\n'); + process.stdout.write(' n8n test workflows\n'); + process.stdout.write('**********************************************\n'); + process.stdout.write('\n'); + process.stdout.write('Batch number:\n'); + ExecuteBatch.workflowExecutionsProgress = []; + for (let i = 0; i < ExecuteBatch.concurrency; i++) { + ExecuteBatch.workflowExecutionsProgress.push([]); + process.stdout.write(`${i + 1}: \n`); + } + } + + startThread(workflowData: IWorkflowDb): Promise { + // This will be the object returned by the promise. + // It will be updated according to execution progress below. + const executionResult: IExecutionResult = { + workflowId: workflowData.id, + workflowName: workflowData.name, + executionTime: 0, + finished: false, + executionStatus: 'running', + coveredNodes: {}, + }; + + + + const requiredNodeTypes = ['n8n-nodes-base.start']; + let startNode: INode | undefined = undefined; + for (const node of workflowData.nodes) { + if (requiredNodeTypes.includes(node.type)) { + startNode = node; + break; + } + } + + // We have a cool feature here. + // On each node, on the Settings tab in the node editor you can change + // the `Notes` field to add special cases for comparison and snapshots. + // You need to set one configuration per line with the following possible keys: + // CAP_RESULTS_LENGTH=x where x is a number. Cap the number of rows from this node to x. + // This means if you set CAP_RESULTS_LENGTH=1 we will have only 1 row in the output + // IGNORED_PROPERTIES=x,y,z where x, y and z are JSON property names. Removes these + // properties from the JSON object (useful for optional properties that can + // cause the comparison to detect changes when not true). + const nodeEdgeCases = {} as INodeSpecialCases; + workflowData.nodes.forEach(node => { + executionResult.coveredNodes[node.type] = (executionResult.coveredNodes[node.type] || 0) + 1; + if (node.notes !== undefined && node.notes !== '') { + node.notes.split('\n').forEach(note => { + const parts = note.split('='); + if (parts.length === 2) { + if (nodeEdgeCases[node.name] === undefined) { + nodeEdgeCases[node.name] = {} as INodeSpecialCase; + } + if (parts[0] === 'CAP_RESULTS_LENGTH') { + nodeEdgeCases[node.name].capResults = parseInt(parts[1], 10); + } else if (parts[0] === 'IGNORED_PROPERTIES') { + nodeEdgeCases[node.name].ignoredProperties = parts[1].split(',').map(property => property.trim()); + } + } + }); + } + }); + + return new Promise(async (resolve) => { + if (startNode === undefined) { + // If the workflow does not contain a start-node we can not know what + // should be executed and with which data to start. + executionResult.error = 'Workflow cannot be started as it does not contain a "Start" node.'; + executionResult.executionStatus = 'warning'; + resolve(executionResult); + } + + let gotCancel = false; + + // Timeouts execution after 5 minutes. + const timeoutTimer = setTimeout(() => { + gotCancel = true; + executionResult.error = 'Workflow execution timed out.'; + executionResult.executionStatus = 'warning'; + resolve(executionResult); + }, ExecuteBatch.executionTimeout); + + + try { + const credentials = await WorkflowCredentials(workflowData!.nodes); + + const runData: IWorkflowExecutionDataProcess = { + credentials, + executionMode: 'cli', + startNodes: [startNode!.name], + workflowData: workflowData!, + }; + + const workflowRunner = new WorkflowRunner(); + const executionId = await workflowRunner.run(runData); + + const activeExecutions = ActiveExecutions.getInstance(); + const data = await activeExecutions.getPostExecutePromise(executionId); + if (gotCancel || ExecuteBatch.cancelled === true) { + clearTimeout(timeoutTimer); + // The promise was settled already so we simply ignore. + return; + } + + if (data === undefined) { + executionResult.error = 'Workflow did not return any data.'; + executionResult.executionStatus = 'error'; + } else { + executionResult.executionTime = (Date.parse(data.stoppedAt as unknown as string) - Date.parse(data.startedAt as unknown as string)) / 1000; + executionResult.finished = (data?.finished !== undefined) as boolean; + + if (data.data.resultData.error) { + executionResult.error = + data.data.resultData.error.hasOwnProperty('description') ? + // @ts-ignore + data.data.resultData.error.description : data.data.resultData.error.message; + if (data.data.resultData.lastNodeExecuted !== undefined) { + executionResult.error += ` on node ${data.data.resultData.lastNodeExecuted}`; + } + executionResult.executionStatus = 'error'; + + if (this.shouldBeConsideredAsWarning(executionResult.error || '')) { + executionResult.executionStatus = 'warning'; + } + } else { + if (ExecuteBatch.shallow === true) { + // What this does is guarantee that top-level attributes + // from the JSON are kept and the are the same type. + + // We convert nested JSON objects to a simple {object:true} + // and we convert nested arrays to ['json array'] + + // This reduces the chance of false positives but may + // result in not detecting deeper changes. + Object.keys(data.data.resultData.runData).map((nodeName: string) => { + data.data.resultData.runData[nodeName].map((taskData: ITaskData) => { + if (taskData.data === undefined) { + return; + } + Object.keys(taskData.data).map(connectionName => { + const connection = taskData.data![connectionName] as Array; + connection.map(executionDataArray => { + if (executionDataArray === null) { + return; + } + + if (nodeEdgeCases[nodeName] !== undefined && nodeEdgeCases[nodeName].capResults !== undefined) { + executionDataArray.splice(nodeEdgeCases[nodeName].capResults!); + } + + executionDataArray.map(executionData => { + if (executionData.json === undefined) { + return; + } + if (nodeEdgeCases[nodeName] !== undefined && nodeEdgeCases[nodeName].ignoredProperties !== undefined) { + nodeEdgeCases[nodeName].ignoredProperties!.forEach(ignoredProperty => delete executionData.json[ignoredProperty]); + } + + const jsonProperties = executionData.json; + + const nodeOutputAttributes = Object.keys(jsonProperties); + nodeOutputAttributes.map(attributeName => { + if (Array.isArray(jsonProperties[attributeName])) { + jsonProperties[attributeName] = ['json array']; + } else if (typeof jsonProperties[attributeName] === 'object') { + jsonProperties[attributeName] = { object: true }; + } + }); + }); + }); + + }); + }); + }); + } else { + // If not using shallow comparison then we only treat nodeEdgeCases. + const specialCases = Object.keys(nodeEdgeCases); + + specialCases.forEach(nodeName => { + data.data.resultData.runData[nodeName].map((taskData: ITaskData) => { + if (taskData.data === undefined) { + return; + } + Object.keys(taskData.data).map(connectionName => { + const connection = taskData.data![connectionName] as Array; + connection.map(executionDataArray => { + if (executionDataArray === null) { + return; + } + + if (nodeEdgeCases[nodeName].capResults !== undefined) { + executionDataArray.splice(nodeEdgeCases[nodeName].capResults!); + } + + if (nodeEdgeCases[nodeName].ignoredProperties !== undefined) { + executionDataArray.map(executionData => { + if (executionData.json === undefined) { + return; + } + nodeEdgeCases[nodeName].ignoredProperties!.forEach(ignoredProperty => delete executionData.json[ignoredProperty]); + }); + } + }); + + }); + }); + }); + } + + const serializedData = this.formatJsonOutput(data); + if (ExecuteBatch.compare === undefined) { + executionResult.executionStatus = 'success'; + } else { + const fileName = (ExecuteBatch.compare.endsWith(sep) ? ExecuteBatch.compare : ExecuteBatch.compare + sep) + `${workflowData.id}-snapshot.json`; + if (fs.existsSync(fileName) === true) { + + const contents = fs.readFileSync(fileName, { encoding: 'utf-8' }); + + const changes = diff(JSON.parse(contents), data, { keysOnly: true }); + + if (changes !== undefined) { + // we have structural changes. Report them. + executionResult.error = `Workflow may contain breaking changes`; + executionResult.changes = changes; + executionResult.executionStatus = 'error'; + } else { + executionResult.executionStatus = 'success'; + } + } else { + executionResult.error = 'Snapshot for not found.'; + executionResult.executionStatus = 'warning'; + } + } + // Save snapshots only after comparing - this is to make sure we're updating + // After comparing to existing verion. + if (ExecuteBatch.snapshot !== undefined) { + const fileName = (ExecuteBatch.snapshot.endsWith(sep) ? ExecuteBatch.snapshot : ExecuteBatch.snapshot + sep) + `${workflowData.id}-snapshot.json`; + fs.writeFileSync(fileName, serializedData); + } + } + } + } catch (e) { + executionResult.error = 'Workflow failed to execute.'; + executionResult.executionStatus = 'error'; + } + clearTimeout(timeoutTimer); + resolve(executionResult); + }); + } + +} diff --git a/packages/cli/commands/import/credentials.ts b/packages/cli/commands/import/credentials.ts index b038f33693..af2d7e0d46 100644 --- a/packages/cli/commands/import/credentials.ts +++ b/packages/cli/commands/import/credentials.ts @@ -65,6 +65,9 @@ export class ImportCredentialsCommand extends Command { try { await Db.init(); + + // Make sure the settings exist + await UserSettings.prepareUserSettings(); let i; const encryptionKey = await UserSettings.getEncryptionKey(); diff --git a/packages/cli/commands/import/workflow.ts b/packages/cli/commands/import/workflow.ts index 5b31041a44..65ddb77000 100644 --- a/packages/cli/commands/import/workflow.ts +++ b/packages/cli/commands/import/workflow.ts @@ -18,6 +18,9 @@ import { import * as fs from 'fs'; import * as glob from 'glob-promise'; import * as path from 'path'; +import { + UserSettings, +} from 'n8n-core'; export class ImportWorkflowsCommand extends Command { static description = 'Import workflows'; @@ -60,6 +63,9 @@ export class ImportWorkflowsCommand extends Command { try { await Db.init(); + + // Make sure the settings exist + await UserSettings.prepareUserSettings(); let i; if (flags.separate) { const files = await glob((flags.input.endsWith(path.sep) ? flags.input : flags.input + path.sep) + '*.json'); diff --git a/packages/cli/commands/list/workflow.ts b/packages/cli/commands/list/workflow.ts new file mode 100644 index 0000000000..6fdca2e253 --- /dev/null +++ b/packages/cli/commands/list/workflow.ts @@ -0,0 +1,67 @@ +import { + Command, + flags, +} from '@oclif/command'; + +import { + IDataObject +} from 'n8n-workflow'; + +import { + Db, +} from "../../src"; + + +export class ListWorkflowCommand extends Command { + static description = '\nList workflows'; + + static examples = [ + '$ n8n list:workflow', + '$ n8n list:workflow --active=true --onlyId', + '$ n8n list:workflow --active=false', + ]; + + static flags = { + help: flags.help({ char: 'h' }), + active: flags.string({ + description: 'Filters workflows by active status. Can be true or false', + }), + onlyId: flags.boolean({ + description: 'Outputs workflow IDs only, one per line.', + }), + }; + + async run() { + const { flags } = this.parse(ListWorkflowCommand); + + if (flags.active !== undefined && !['true', 'false'].includes(flags.active)) { + this.error('The --active flag has to be passed using true or false'); + } + + try { + await Db.init(); + + const findQuery: IDataObject = {}; + if (flags.active !== undefined) { + findQuery.active = flags.active === 'true'; + } + + const workflows = await Db.collections.Workflow!.find(findQuery); + if (flags.onlyId) { + workflows.forEach(workflow => console.log(workflow.id)); + } else { + workflows.forEach(workflow => console.log(workflow.id + "|" + workflow.name)); + } + + + } catch (e) { + console.error('\nGOT ERROR'); + console.log('===================================='); + console.error(e.message); + console.error(e.stack); + this.exit(1); + } + + this.exit(); + } +} diff --git a/packages/cli/package.json b/packages/cli/package.json index ca4a198df1..ce0467aa64 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -82,6 +82,7 @@ "dependencies": { "@oclif/command": "^1.5.18", "@oclif/errors": "^1.2.2", + "@types/json-diff": "^0.5.1", "@types/jsonwebtoken": "^8.5.2", "basic-auth": "^2.0.1", "bcryptjs": "^2.4.3", @@ -101,6 +102,7 @@ "glob-promise": "^3.4.0", "google-timezones-json": "^1.0.2", "inquirer": "^7.0.1", + "json-diff": "^0.5.4", "jsonwebtoken": "^8.5.1", "jwks-rsa": "~1.12.1", "localtunnel": "^2.0.0", diff --git a/packages/cli/tsconfig.json b/packages/cli/tsconfig.json index 4aaf4747fc..aa44bc610f 100644 --- a/packages/cli/tsconfig.json +++ b/packages/cli/tsconfig.json @@ -1,7 +1,8 @@ { "compilerOptions": { "lib": [ - "es2017" + "es2017", + "ES2020.Promise" ], "types": [ "node", diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index e76da5fb2c..f48d605e2d 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -348,6 +348,7 @@ export interface INode { type: string; position: [number, number]; disabled?: boolean; + notes?: string; notesInFlow?: boolean; retryOnFail?: boolean; maxTries?: number;