mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 09:36:44 +00:00
fix(Salesforce Trigger Node): Update polling logic to account for Salesforce processing delay (#19377)
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import jwt from 'jsonwebtoken';
|
||||
import moment from 'moment-timezone';
|
||||
import { DateTime } from 'luxon';
|
||||
import type {
|
||||
IExecuteFunctions,
|
||||
ILoadOptionsFunctions,
|
||||
@@ -242,3 +243,44 @@ export function getQuery(options: IDataObject, sobject: string, returnAll: boole
|
||||
|
||||
return query;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the polling start date with safety margin to account for Salesforce indexing delays
|
||||
*/
|
||||
export function getPollStartDate(lastTimeChecked: string | undefined): string {
|
||||
if (!lastTimeChecked) {
|
||||
return DateTime.now().toISO();
|
||||
}
|
||||
const safetyMarginMinutes = 15;
|
||||
return DateTime.fromISO(lastTimeChecked).minus({ minutes: safetyMarginMinutes }).toISO();
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters out already processed items and manages the processed IDs list
|
||||
*/
|
||||
export function filterAndManageProcessedItems(
|
||||
responseData: IDataObject[],
|
||||
processedIds: string[],
|
||||
): { newItems: IDataObject[]; updatedProcessedIds: string[] } {
|
||||
const processedIdsSet = new Set(processedIds);
|
||||
|
||||
const newItems: IDataObject[] = [];
|
||||
const newItemIds: string[] = [];
|
||||
|
||||
for (const item of responseData) {
|
||||
if (typeof item.Id !== 'string') continue;
|
||||
|
||||
const itemId = item.Id;
|
||||
if (!processedIdsSet.has(itemId)) {
|
||||
newItems.push(item);
|
||||
newItemIds.push(itemId);
|
||||
} else {
|
||||
processedIdsSet.delete(itemId);
|
||||
}
|
||||
}
|
||||
|
||||
const remainingProcessedIds = Array.from(processedIdsSet);
|
||||
const updatedProcessedIds = remainingProcessedIds.concat(newItemIds);
|
||||
|
||||
return { newItems, updatedProcessedIds };
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ import {
|
||||
salesforceApiRequest,
|
||||
salesforceApiRequestAllItems,
|
||||
sortOptions,
|
||||
getPollStartDate,
|
||||
filterAndManageProcessedItems,
|
||||
} from './GenericFunctions';
|
||||
|
||||
export class SalesforceTrigger implements INodeType {
|
||||
@@ -185,7 +187,8 @@ export class SalesforceTrigger implements INodeType {
|
||||
};
|
||||
|
||||
async poll(this: IPollFunctions): Promise<INodeExecutionData[][] | null> {
|
||||
const workflowData = this.getWorkflowStaticData('node');
|
||||
const workflowData: { processedIds?: string[]; lastTimeChecked?: string } =
|
||||
this.getWorkflowStaticData('node');
|
||||
let responseData;
|
||||
const qs: IDataObject = {};
|
||||
const triggerOn = this.getNodeParameter('triggerOn') as string;
|
||||
@@ -196,11 +199,15 @@ export class SalesforceTrigger implements INodeType {
|
||||
triggerResource = this.getNodeParameter('customObject') as string;
|
||||
}
|
||||
|
||||
const now = DateTime.now().toISO();
|
||||
const startDate = (workflowData.lastTimeChecked as string) || now;
|
||||
const endDate = now;
|
||||
const endDate = DateTime.now().toISO();
|
||||
|
||||
if (!workflowData.processedIds) {
|
||||
workflowData.processedIds = [];
|
||||
}
|
||||
const processedIds = workflowData.processedIds;
|
||||
|
||||
try {
|
||||
const pollStartDate = startDate;
|
||||
const pollStartDate = getPollStartDate(workflowData.lastTimeChecked);
|
||||
const pollEndDate = endDate;
|
||||
|
||||
const options = {
|
||||
@@ -262,6 +269,20 @@ export class SalesforceTrigger implements INodeType {
|
||||
workflowData.lastTimeChecked = endDate;
|
||||
return null;
|
||||
}
|
||||
|
||||
const { newItems, updatedProcessedIds } = filterAndManageProcessedItems(
|
||||
responseData,
|
||||
processedIds,
|
||||
);
|
||||
|
||||
workflowData.processedIds = updatedProcessedIds;
|
||||
workflowData.lastTimeChecked = endDate;
|
||||
|
||||
if (newItems.length > 0) {
|
||||
return [this.helpers.returnJsonArray(newItems as IDataObject[])];
|
||||
}
|
||||
|
||||
return null;
|
||||
} catch (error) {
|
||||
if (this.getMode() === 'manual' || !workflowData.lastTimeChecked) {
|
||||
throw error;
|
||||
@@ -278,12 +299,5 @@ export class SalesforceTrigger implements INodeType {
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
workflowData.lastTimeChecked = endDate;
|
||||
|
||||
if (Array.isArray(responseData) && responseData.length) {
|
||||
return [this.helpers.returnJsonArray(responseData as IDataObject[])];
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
sortOptions,
|
||||
getDefaultFields,
|
||||
getQuery,
|
||||
filterAndManageProcessedItems,
|
||||
} from '../GenericFunctions';
|
||||
|
||||
describe('Salesforce -> GenericFunctions', () => {
|
||||
@@ -217,4 +218,136 @@ describe('Salesforce -> GenericFunctions', () => {
|
||||
expect(result).toBe('SELECT id,name,email FROM Account WHERE id = 123 LIMIT 5');
|
||||
});
|
||||
});
|
||||
|
||||
describe('filterAndManageProcessedItems', () => {
|
||||
it('should filter out already processed items', () => {
|
||||
const responseData: IDataObject[] = [
|
||||
{ Id: '001', Name: 'Item 1' },
|
||||
{ Id: '002', Name: 'Item 2' },
|
||||
{ Id: '003', Name: 'Item 3' },
|
||||
];
|
||||
const processedIds = ['002'];
|
||||
|
||||
const result = filterAndManageProcessedItems(responseData, processedIds);
|
||||
|
||||
expect(result.newItems).toEqual([
|
||||
{ Id: '001', Name: 'Item 1' },
|
||||
{ Id: '003', Name: 'Item 3' },
|
||||
]);
|
||||
// '002' is removed since it was encountered, only new items remain
|
||||
expect(result.updatedProcessedIds).toEqual(['001', '003']);
|
||||
});
|
||||
|
||||
it('should handle empty response data', () => {
|
||||
const responseData: IDataObject[] = [];
|
||||
const processedIds = ['001', '002'];
|
||||
|
||||
const result = filterAndManageProcessedItems(responseData, processedIds);
|
||||
|
||||
expect(result.newItems).toEqual([]);
|
||||
expect(result.updatedProcessedIds).toEqual(['001', '002']);
|
||||
});
|
||||
|
||||
it('should handle empty processed IDs', () => {
|
||||
const responseData: IDataObject[] = [
|
||||
{ Id: '001', Name: 'Item 1' },
|
||||
{ Id: '002', Name: 'Item 2' },
|
||||
];
|
||||
const processedIds: string[] = [];
|
||||
|
||||
const result = filterAndManageProcessedItems(responseData, processedIds);
|
||||
|
||||
expect(result.newItems).toEqual(responseData);
|
||||
expect(result.updatedProcessedIds).toEqual(['001', '002']);
|
||||
});
|
||||
|
||||
it('should handle all items already processed', () => {
|
||||
const responseData: IDataObject[] = [
|
||||
{ Id: '001', Name: 'Item 1' },
|
||||
{ Id: '002', Name: 'Item 2' },
|
||||
];
|
||||
const processedIds = ['001', '002', '003'];
|
||||
|
||||
const result = filterAndManageProcessedItems(responseData, processedIds);
|
||||
|
||||
expect(result.newItems).toEqual([]);
|
||||
// Should only keep '003' since '001' and '002' were encountered and removed
|
||||
expect(result.updatedProcessedIds).toEqual(['003']);
|
||||
});
|
||||
|
||||
it('should handle large numbers of processed IDs efficiently', () => {
|
||||
// Create 995 existing processed IDs
|
||||
const processedIds = Array.from({ length: 995 }, (_, i) => `existing-${i}`);
|
||||
|
||||
// Add 10 new items
|
||||
const responseData: IDataObject[] = Array.from({ length: 10 }, (_, i) => ({
|
||||
Id: `new-${i}`,
|
||||
Name: `New Item ${i}`,
|
||||
}));
|
||||
|
||||
const result = filterAndManageProcessedItems(responseData, processedIds);
|
||||
|
||||
expect(result.newItems).toHaveLength(10);
|
||||
// Should keep all existing IDs + 10 new IDs (no artificial limit)
|
||||
expect(result.updatedProcessedIds).toHaveLength(1005);
|
||||
|
||||
// Should keep all existing IDs + new IDs
|
||||
expect(result.updatedProcessedIds.slice(0, 995)).toEqual(processedIds);
|
||||
expect(result.updatedProcessedIds.slice(-10)).toEqual(
|
||||
responseData.map((item) => item.Id as string),
|
||||
);
|
||||
});
|
||||
|
||||
it('should handle very large batches of new items', () => {
|
||||
const processedIds = ['existing-1', 'existing-2'];
|
||||
|
||||
// Create 1005 new items
|
||||
const responseData: IDataObject[] = Array.from({ length: 1005 }, (_, i) => ({
|
||||
Id: `new-${i}`,
|
||||
Name: `New Item ${i}`,
|
||||
}));
|
||||
|
||||
const result = filterAndManageProcessedItems(responseData, processedIds);
|
||||
|
||||
expect(result.newItems).toHaveLength(1005);
|
||||
// Should keep all existing IDs + all new IDs (no artificial limit)
|
||||
expect(result.updatedProcessedIds).toHaveLength(1007);
|
||||
|
||||
// Should keep all existing IDs + all new IDs
|
||||
const expectedIds = processedIds.concat(responseData.map((item) => item.Id as string));
|
||||
expect(result.updatedProcessedIds).toEqual(expectedIds);
|
||||
});
|
||||
|
||||
it('should handle duplicate IDs in response data correctly', () => {
|
||||
const responseData: IDataObject[] = [
|
||||
{ Id: '001', Name: 'Item 1' },
|
||||
{ Id: '002', Name: 'Item 2' },
|
||||
{ Id: '001', Name: 'Item 1 Duplicate' }, // Duplicate ID
|
||||
];
|
||||
const processedIds = ['003'];
|
||||
|
||||
const result = filterAndManageProcessedItems(responseData, processedIds);
|
||||
|
||||
// Should include both items with ID '001' since they're not in processedIds
|
||||
expect(result.newItems).toEqual([
|
||||
{ Id: '001', Name: 'Item 1' },
|
||||
{ Id: '002', Name: 'Item 2' },
|
||||
{ Id: '001', Name: 'Item 1 Duplicate' },
|
||||
]);
|
||||
expect(result.updatedProcessedIds).toEqual(['003', '001', '002', '001']);
|
||||
});
|
||||
|
||||
it('should maintain order of processed IDs', () => {
|
||||
const responseData: IDataObject[] = [
|
||||
{ Id: '003', Name: 'Item 3' },
|
||||
{ Id: '001', Name: 'Item 1' },
|
||||
{ Id: '004', Name: 'Item 4' },
|
||||
];
|
||||
const processedIds = ['100', '200'];
|
||||
|
||||
const result = filterAndManageProcessedItems(responseData, processedIds);
|
||||
|
||||
expect(result.updatedProcessedIds).toEqual(['100', '200', '003', '001', '004']);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user