From b480f495d96fd170f0b8efdc59511e4ea6834368 Mon Sep 17 00:00:00 2001 From: Michael Kret <88898367+michael-radency@users.noreply.github.com> Date: Tue, 16 Sep 2025 15:52:00 +0300 Subject: [PATCH] fix(Salesforce Trigger Node): Update polling logic to account for Salesforce processing delay (#19377) --- .../nodes/Salesforce/GenericFunctions.ts | 42 ++++++ .../Salesforce/SalesforceTrigger.node.ts | 38 +++-- .../__test__/GenericFunctions.test.ts | 133 ++++++++++++++++++ 3 files changed, 201 insertions(+), 12 deletions(-) diff --git a/packages/nodes-base/nodes/Salesforce/GenericFunctions.ts b/packages/nodes-base/nodes/Salesforce/GenericFunctions.ts index 275f92b6f6..3e6a9a5d05 100644 --- a/packages/nodes-base/nodes/Salesforce/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Salesforce/GenericFunctions.ts @@ -1,5 +1,6 @@ import jwt from 'jsonwebtoken'; import moment from 'moment-timezone'; +import { DateTime } from 'luxon'; import type { IExecuteFunctions, ILoadOptionsFunctions, @@ -242,3 +243,44 @@ export function getQuery(options: IDataObject, sobject: string, returnAll: boole return query; } + +/** + * Calculates the polling start date with safety margin to account for Salesforce indexing delays + */ +export function getPollStartDate(lastTimeChecked: string | undefined): string { + if (!lastTimeChecked) { + return DateTime.now().toISO(); + } + const safetyMarginMinutes = 15; + return DateTime.fromISO(lastTimeChecked).minus({ minutes: safetyMarginMinutes }).toISO(); +} + +/** + * Filters out already processed items and manages the processed IDs list + */ +export function filterAndManageProcessedItems( + responseData: IDataObject[], + processedIds: string[], +): { newItems: IDataObject[]; updatedProcessedIds: string[] } { + const processedIdsSet = new Set(processedIds); + + const newItems: IDataObject[] = []; + const newItemIds: string[] = []; + + for (const item of responseData) { + if (typeof item.Id !== 'string') continue; + + const itemId = item.Id; + if (!processedIdsSet.has(itemId)) { + newItems.push(item); + newItemIds.push(itemId); + } else { + processedIdsSet.delete(itemId); + } + } + + const remainingProcessedIds = Array.from(processedIdsSet); + const updatedProcessedIds = remainingProcessedIds.concat(newItemIds); + + return { newItems, updatedProcessedIds }; +} diff --git a/packages/nodes-base/nodes/Salesforce/SalesforceTrigger.node.ts b/packages/nodes-base/nodes/Salesforce/SalesforceTrigger.node.ts index 42c04bebb3..6509be3278 100644 --- a/packages/nodes-base/nodes/Salesforce/SalesforceTrigger.node.ts +++ b/packages/nodes-base/nodes/Salesforce/SalesforceTrigger.node.ts @@ -16,6 +16,8 @@ import { salesforceApiRequest, salesforceApiRequestAllItems, sortOptions, + getPollStartDate, + filterAndManageProcessedItems, } from './GenericFunctions'; export class SalesforceTrigger implements INodeType { @@ -185,7 +187,8 @@ export class SalesforceTrigger implements INodeType { }; async poll(this: IPollFunctions): Promise { - const workflowData = this.getWorkflowStaticData('node'); + const workflowData: { processedIds?: string[]; lastTimeChecked?: string } = + this.getWorkflowStaticData('node'); let responseData; const qs: IDataObject = {}; const triggerOn = this.getNodeParameter('triggerOn') as string; @@ -196,11 +199,15 @@ export class SalesforceTrigger implements INodeType { triggerResource = this.getNodeParameter('customObject') as string; } - const now = DateTime.now().toISO(); - const startDate = (workflowData.lastTimeChecked as string) || now; - const endDate = now; + const endDate = DateTime.now().toISO(); + + if (!workflowData.processedIds) { + workflowData.processedIds = []; + } + const processedIds = workflowData.processedIds; + try { - const pollStartDate = startDate; + const pollStartDate = getPollStartDate(workflowData.lastTimeChecked); const pollEndDate = endDate; const options = { @@ -262,6 +269,20 @@ export class SalesforceTrigger implements INodeType { workflowData.lastTimeChecked = endDate; return null; } + + const { newItems, updatedProcessedIds } = filterAndManageProcessedItems( + responseData, + processedIds, + ); + + workflowData.processedIds = updatedProcessedIds; + workflowData.lastTimeChecked = endDate; + + if (newItems.length > 0) { + return [this.helpers.returnJsonArray(newItems as IDataObject[])]; + } + + return null; } catch (error) { if (this.getMode() === 'manual' || !workflowData.lastTimeChecked) { throw error; @@ -278,12 +299,5 @@ export class SalesforceTrigger implements INodeType { ); throw error; } - workflowData.lastTimeChecked = endDate; - - if (Array.isArray(responseData) && responseData.length) { - return [this.helpers.returnJsonArray(responseData as IDataObject[])]; - } - - return null; } } diff --git a/packages/nodes-base/nodes/Salesforce/__test__/GenericFunctions.test.ts b/packages/nodes-base/nodes/Salesforce/__test__/GenericFunctions.test.ts index e75cd8e806..39de6e1225 100644 --- a/packages/nodes-base/nodes/Salesforce/__test__/GenericFunctions.test.ts +++ b/packages/nodes-base/nodes/Salesforce/__test__/GenericFunctions.test.ts @@ -6,6 +6,7 @@ import { sortOptions, getDefaultFields, getQuery, + filterAndManageProcessedItems, } from '../GenericFunctions'; describe('Salesforce -> GenericFunctions', () => { @@ -217,4 +218,136 @@ describe('Salesforce -> GenericFunctions', () => { expect(result).toBe('SELECT id,name,email FROM Account WHERE id = 123 LIMIT 5'); }); }); + + describe('filterAndManageProcessedItems', () => { + it('should filter out already processed items', () => { + const responseData: IDataObject[] = [ + { Id: '001', Name: 'Item 1' }, + { Id: '002', Name: 'Item 2' }, + { Id: '003', Name: 'Item 3' }, + ]; + const processedIds = ['002']; + + const result = filterAndManageProcessedItems(responseData, processedIds); + + expect(result.newItems).toEqual([ + { Id: '001', Name: 'Item 1' }, + { Id: '003', Name: 'Item 3' }, + ]); + // '002' is removed since it was encountered, only new items remain + expect(result.updatedProcessedIds).toEqual(['001', '003']); + }); + + it('should handle empty response data', () => { + const responseData: IDataObject[] = []; + const processedIds = ['001', '002']; + + const result = filterAndManageProcessedItems(responseData, processedIds); + + expect(result.newItems).toEqual([]); + expect(result.updatedProcessedIds).toEqual(['001', '002']); + }); + + it('should handle empty processed IDs', () => { + const responseData: IDataObject[] = [ + { Id: '001', Name: 'Item 1' }, + { Id: '002', Name: 'Item 2' }, + ]; + const processedIds: string[] = []; + + const result = filterAndManageProcessedItems(responseData, processedIds); + + expect(result.newItems).toEqual(responseData); + expect(result.updatedProcessedIds).toEqual(['001', '002']); + }); + + it('should handle all items already processed', () => { + const responseData: IDataObject[] = [ + { Id: '001', Name: 'Item 1' }, + { Id: '002', Name: 'Item 2' }, + ]; + const processedIds = ['001', '002', '003']; + + const result = filterAndManageProcessedItems(responseData, processedIds); + + expect(result.newItems).toEqual([]); + // Should only keep '003' since '001' and '002' were encountered and removed + expect(result.updatedProcessedIds).toEqual(['003']); + }); + + it('should handle large numbers of processed IDs efficiently', () => { + // Create 995 existing processed IDs + const processedIds = Array.from({ length: 995 }, (_, i) => `existing-${i}`); + + // Add 10 new items + const responseData: IDataObject[] = Array.from({ length: 10 }, (_, i) => ({ + Id: `new-${i}`, + Name: `New Item ${i}`, + })); + + const result = filterAndManageProcessedItems(responseData, processedIds); + + expect(result.newItems).toHaveLength(10); + // Should keep all existing IDs + 10 new IDs (no artificial limit) + expect(result.updatedProcessedIds).toHaveLength(1005); + + // Should keep all existing IDs + new IDs + expect(result.updatedProcessedIds.slice(0, 995)).toEqual(processedIds); + expect(result.updatedProcessedIds.slice(-10)).toEqual( + responseData.map((item) => item.Id as string), + ); + }); + + it('should handle very large batches of new items', () => { + const processedIds = ['existing-1', 'existing-2']; + + // Create 1005 new items + const responseData: IDataObject[] = Array.from({ length: 1005 }, (_, i) => ({ + Id: `new-${i}`, + Name: `New Item ${i}`, + })); + + const result = filterAndManageProcessedItems(responseData, processedIds); + + expect(result.newItems).toHaveLength(1005); + // Should keep all existing IDs + all new IDs (no artificial limit) + expect(result.updatedProcessedIds).toHaveLength(1007); + + // Should keep all existing IDs + all new IDs + const expectedIds = processedIds.concat(responseData.map((item) => item.Id as string)); + expect(result.updatedProcessedIds).toEqual(expectedIds); + }); + + it('should handle duplicate IDs in response data correctly', () => { + const responseData: IDataObject[] = [ + { Id: '001', Name: 'Item 1' }, + { Id: '002', Name: 'Item 2' }, + { Id: '001', Name: 'Item 1 Duplicate' }, // Duplicate ID + ]; + const processedIds = ['003']; + + const result = filterAndManageProcessedItems(responseData, processedIds); + + // Should include both items with ID '001' since they're not in processedIds + expect(result.newItems).toEqual([ + { Id: '001', Name: 'Item 1' }, + { Id: '002', Name: 'Item 2' }, + { Id: '001', Name: 'Item 1 Duplicate' }, + ]); + expect(result.updatedProcessedIds).toEqual(['003', '001', '002', '001']); + }); + + it('should maintain order of processed IDs', () => { + const responseData: IDataObject[] = [ + { Id: '003', Name: 'Item 3' }, + { Id: '001', Name: 'Item 1' }, + { Id: '004', Name: 'Item 4' }, + ]; + const processedIds = ['100', '200']; + + const result = filterAndManageProcessedItems(responseData, processedIds); + + expect(result.updatedProcessedIds).toEqual(['100', '200', '003', '001', '004']); + }); + }); });