feat(Email Trigger (IMAP) Node): Limit new mails fetched (#16926)

Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
Co-authored-by: Michael Kret <michael.k@radency.com>
This commit is contained in:
Elias Meire
2025-07-09 13:04:10 +02:00
committed by GitHub
parent c8b3ac6ab0
commit d1ac292709
18 changed files with 552 additions and 222 deletions

View File

@@ -1,6 +1,15 @@
import type { ImapSimple, ImapSimpleOptions, Message, MessagePart } from '@n8n/imap';
import type { ICredentialsDataImap } from '@credentials/Imap.credentials';
import { isCredentialsDataImap } from '@credentials/Imap.credentials';
import type {
ImapSimple,
ImapSimpleOptions,
Message,
MessagePart,
SearchCriteria,
} from '@n8n/imap';
import { connect as imapConnect } from '@n8n/imap';
import isEmpty from 'lodash/isEmpty';
import { DateTime } from 'luxon';
import type {
ITriggerFunctions,
IBinaryData,
@@ -13,13 +22,11 @@ import type {
INodeTypeDescription,
ITriggerResponse,
JsonObject,
INodeExecutionData,
} from 'n8n-workflow';
import { NodeConnectionTypes, NodeOperationError, TriggerCloseError } from 'n8n-workflow';
import rfc2047 from 'rfc2047';
import type { ICredentialsDataImap } from '@credentials/Imap.credentials';
import { isCredentialsDataImap } from '@credentials/Imap.credentials';
import { getNewEmails } from './utils';
const versionDescription: INodeTypeDescription = {
@@ -28,7 +35,7 @@ const versionDescription: INodeTypeDescription = {
icon: 'fa:inbox',
iconColor: 'green',
group: ['trigger'],
version: 2,
version: [2, 2.1],
description: 'Triggers the workflow when a new email is received',
eventTriggerDescription: 'Waiting for you to receive an email',
defaults: {
@@ -248,6 +255,7 @@ export class EmailReadImapV2 implements INodeType {
const mailbox = this.getNodeParameter('mailbox') as string;
const postProcessAction = this.getNodeParameter('postProcessAction') as string;
const options = this.getNodeParameter('options', {}) as IDataObject;
const activatedAt = DateTime.now();
const staticData = this.getWorkflowStaticData('node');
this.logger.debug('Loaded static data for node "EmailReadImap"', { staticData });
@@ -333,12 +341,10 @@ export class EmailReadImapV2 implements INodeType {
const returnedPromise = this.helpers.createDeferredPromise();
const establishConnection = async (): Promise<ImapSimple> => {
let searchCriteria = ['UNSEEN'] as Array<string | string[]>;
let searchCriteria: SearchCriteria[] = ['UNSEEN'];
if (options.customEmailConfig !== undefined) {
try {
searchCriteria = JSON.parse(options.customEmailConfig as string) as Array<
string | string[]
>;
searchCriteria = JSON.parse(options.customEmailConfig as string) as SearchCriteria[];
} catch (error) {
throw new NodeOperationError(this.getNode(), 'Custom email config is not valid JSON.');
}
@@ -353,10 +359,21 @@ export class EmailReadImapV2 implements INodeType {
tls: credentials.secure,
authTimeout: 20000,
},
onMail: async () => {
onMail: async (numEmails) => {
this.logger.debug('New emails received in node "EmailReadImap"', {
numEmails,
});
if (connection) {
/**
* Only process new emails:
* - If we've seen emails before (lastMessageUid is set), fetch messages higher UID.
* - Otherwise, fetch emails received since the workflow activation date.
*
* Note: IMAP 'SINCE' only filters by date (not time),
* so it may include emails from earlier on the activation day.
*/
if (staticData.lastMessageUid !== undefined) {
searchCriteria.push(['UID', `${staticData.lastMessageUid as number}:*`]);
/**
* A short explanation about UIDs and how they work
* can be found here: https://dev.to/kehers/imap-new-messages-since-last-check-44gm
@@ -369,24 +386,28 @@ export class EmailReadImapV2 implements INodeType {
* - You can check if UIDs changed in the above example
* by checking UIDValidity.
*/
this.logger.debug('Querying for new messages on node "EmailReadImap"', {
searchCriteria,
});
searchCriteria.push(['UID', `${staticData.lastMessageUid as number}:*`]);
} else {
searchCriteria.push(['SINCE', activatedAt.toFormat('dd-LLL-yyyy')]);
}
this.logger.debug('Querying for new messages on node "EmailReadImap"', {
searchCriteria,
});
try {
const returnData = await getNewEmails.call(
this,
connection,
await getNewEmails.call(this, {
imapConnection: connection,
searchCriteria,
staticData,
postProcessAction,
getText,
getAttachment,
);
if (returnData.length) {
this.emit([returnData]);
}
onEmailBatch: async (returnData: INodeExecutionData[]) => {
if (returnData.length) {
this.emit([returnData]);
}
},
});
} catch (error) {
this.logger.error('Email Read Imap node encountered an error fetching new emails', {
error: error as Error,
@@ -399,7 +420,7 @@ export class EmailReadImapV2 implements INodeType {
}
}
},
onUpdate: async (seqNo: number, info) => {
onUpdate: (seqNo: number, info) => {
this.logger.debug(`Email Read Imap:update ${seqNo}`, info);
},
};
@@ -420,8 +441,8 @@ export class EmailReadImapV2 implements INodeType {
// Connect to the IMAP server and open the mailbox
// that we get informed whenever a new email arrives
return await imapConnect(config).then(async (conn) => {
conn.on('close', async (_hadError: boolean) => {
return await imapConnect(config).then((conn) => {
conn.on('close', (_hadError: boolean) => {
if (isCurrentlyReconnecting) {
this.logger.debug('Email Read Imap: Connected closed for forced reconnecting');
} else if (closeFunctionWasCalled) {
@@ -431,7 +452,7 @@ export class EmailReadImapV2 implements INodeType {
this.emitError(new Error('Imap connection closed unexpectedly'));
}
});
conn.on('error', async (error) => {
conn.on('error', (error) => {
const errorCode = ((error as JsonObject).code as string).toUpperCase();
this.logger.debug(`IMAP connection experienced an error: (${errorCode})`, {
error: error as Error,

View File

@@ -1,4 +1,10 @@
import { getParts, type ImapSimple, type Message, type MessagePart } from '@n8n/imap';
import {
getParts,
type ImapSimple,
type Message,
type MessagePart,
type SearchCriteria,
} from '@n8n/imap';
import find from 'lodash/find';
import { simpleParser, type Source as ParserSource } from 'mailparser';
import {
@@ -46,19 +52,30 @@ async function parseRawEmail(
} as INodeExecutionData;
}
const EMAIL_BATCH_SIZE = 20;
export async function getNewEmails(
this: ITriggerFunctions,
imapConnection: ImapSimple,
searchCriteria: Array<string | string[]>,
staticData: IDataObject,
postProcessAction: string,
getText: (parts: MessagePart[], message: Message, subtype: string) => Promise<string>,
getAttachment: (
imapConnection: ImapSimple,
parts: MessagePart[],
message: Message,
) => Promise<IBinaryData[]>,
): Promise<INodeExecutionData[]> {
{
getAttachment,
getText,
onEmailBatch,
imapConnection,
postProcessAction,
searchCriteria,
}: {
imapConnection: ImapSimple;
searchCriteria: SearchCriteria[];
postProcessAction: string;
getText: (parts: MessagePart[], message: Message, subtype: string) => Promise<string>;
getAttachment: (
imapConnection: ImapSimple,
parts: MessagePart[],
message: Message,
) => Promise<IBinaryData[]>;
onEmailBatch: (data: INodeExecutionData[]) => Promise<void>;
},
) {
const format = this.getNodeParameter('format', 0) as string;
let fetchOptions = {};
@@ -77,150 +94,167 @@ export async function getNewEmails(
};
}
const results = await imapConnection.search(searchCriteria, fetchOptions);
let results: Message[] = [];
let maxUid = 0;
const newEmails: INodeExecutionData[] = [];
let newEmail: INodeExecutionData;
let attachments: IBinaryData[];
let propertyName: string;
const staticData = this.getWorkflowStaticData('node');
const limit = this.getNode().typeVersion >= 2.1 ? EMAIL_BATCH_SIZE : undefined;
// All properties get by default moved to metadata except the ones
// which are defined here which get set on the top level.
const topLevelProperties = ['cc', 'date', 'from', 'subject', 'to'];
do {
if (maxUid) {
searchCriteria = searchCriteria.filter((criteria: SearchCriteria) => {
if (Array.isArray(criteria)) {
return !['UID', 'SINCE'].includes(criteria[0]);
}
return true;
});
if (format === 'resolved') {
const dataPropertyAttachmentsPrefixName = this.getNodeParameter(
'dataPropertyAttachmentsPrefixName',
) as string;
for (const message of results) {
if (
staticData.lastMessageUid !== undefined &&
message.attributes.uid <= (staticData.lastMessageUid as number)
) {
continue;
}
if (
staticData.lastMessageUid === undefined ||
(staticData.lastMessageUid as number) < message.attributes.uid
) {
staticData.lastMessageUid = message.attributes.uid;
}
const part = find(message.parts, { which: '' });
if (part === undefined) {
throw new NodeOperationError(this.getNode(), 'Email part could not be parsed.');
}
const parsedEmail = await parseRawEmail.call(
this,
part.body as Buffer,
dataPropertyAttachmentsPrefixName,
);
parsedEmail.json.attributes = {
uid: message.attributes.uid,
};
newEmails.push(parsedEmail);
searchCriteria.push(['UID', `${maxUid}:*`]);
}
} else if (format === 'simple') {
const downloadAttachments = this.getNodeParameter('downloadAttachments') as boolean;
results = await imapConnection.search(searchCriteria, fetchOptions, limit);
let dataPropertyAttachmentsPrefixName = '';
if (downloadAttachments) {
dataPropertyAttachmentsPrefixName = this.getNodeParameter(
this.logger.debug(`Process ${results.length} new emails in node "EmailReadImap"`);
const newEmails: INodeExecutionData[] = [];
let newEmail: INodeExecutionData;
let attachments: IBinaryData[];
let propertyName: string;
// All properties get by default moved to metadata except the ones
// which are defined here which get set on the top level.
const topLevelProperties = ['cc', 'date', 'from', 'subject', 'to'];
if (format === 'resolved') {
const dataPropertyAttachmentsPrefixName = this.getNodeParameter(
'dataPropertyAttachmentsPrefixName',
) as string;
}
for (const message of results) {
if (
staticData.lastMessageUid !== undefined &&
message.attributes.uid <= (staticData.lastMessageUid as number)
) {
continue;
}
if (
staticData.lastMessageUid === undefined ||
(staticData.lastMessageUid as number) < message.attributes.uid
) {
staticData.lastMessageUid = message.attributes.uid;
}
const parts = getParts(message.attributes.struct as IDataObject[]);
newEmail = {
json: {
textHtml: await getText(parts, message, 'html'),
textPlain: await getText(parts, message, 'plain'),
metadata: {} as IDataObject,
attributes: {
uid: message.attributes.uid,
} as IDataObject,
},
};
const messageHeader = message.parts.filter((part) => part.which === 'HEADER');
const messageBody = messageHeader[0].body as Record<string, string[]>;
for (propertyName of Object.keys(messageBody)) {
if (messageBody[propertyName].length) {
if (topLevelProperties.includes(propertyName)) {
newEmail.json[propertyName] = messageBody[propertyName][0];
} else {
(newEmail.json.metadata as IDataObject)[propertyName] = messageBody[propertyName][0];
}
for (const message of results) {
const lastMessageUid = this.getWorkflowStaticData('node').lastMessageUid as number;
if (lastMessageUid !== undefined && message.attributes.uid <= lastMessageUid) {
continue;
}
}
// Track the maximum UID to update staticData later
if (message.attributes.uid > maxUid) {
maxUid = message.attributes.uid;
}
const part = find(message.parts, { which: '' });
if (part === undefined) {
throw new NodeOperationError(this.getNode(), 'Email part could not be parsed.');
}
const parsedEmail = await parseRawEmail.call(
this,
part.body as Buffer,
dataPropertyAttachmentsPrefixName,
);
parsedEmail.json.attributes = {
uid: message.attributes.uid,
};
newEmails.push(parsedEmail);
}
} else if (format === 'simple') {
const downloadAttachments = this.getNodeParameter('downloadAttachments') as boolean;
let dataPropertyAttachmentsPrefixName = '';
if (downloadAttachments) {
// Get attachments and add them if any get found
attachments = await getAttachment(imapConnection, parts, message);
if (attachments.length) {
newEmail.binary = {};
for (let i = 0; i < attachments.length; i++) {
newEmail.binary[`${dataPropertyAttachmentsPrefixName}${i}`] = attachments[i];
dataPropertyAttachmentsPrefixName = this.getNodeParameter(
'dataPropertyAttachmentsPrefixName',
) as string;
}
for (const message of results) {
const lastMessageUid = this.getWorkflowStaticData('node').lastMessageUid as number;
if (lastMessageUid !== undefined && message.attributes.uid <= lastMessageUid) {
continue;
}
// Track the maximum UID to update staticData later
if (message.attributes.uid > maxUid) {
maxUid = message.attributes.uid;
}
const parts = getParts(message.attributes.struct as IDataObject[]);
newEmail = {
json: {
textHtml: await getText(parts, message, 'html'),
textPlain: await getText(parts, message, 'plain'),
metadata: {} as IDataObject,
attributes: {
uid: message.attributes.uid,
} as IDataObject,
},
};
const messageHeader = message.parts.filter((part) => part.which === 'HEADER');
const messageBody = messageHeader[0].body as Record<string, string[]>;
for (propertyName of Object.keys(messageBody)) {
if (messageBody[propertyName].length) {
if (topLevelProperties.includes(propertyName)) {
newEmail.json[propertyName] = messageBody[propertyName][0];
} else {
(newEmail.json.metadata as IDataObject)[propertyName] = messageBody[propertyName][0];
}
}
}
}
newEmails.push(newEmail);
if (downloadAttachments) {
// Get attachments and add them if any get found
attachments = await getAttachment(imapConnection, parts, message);
if (attachments.length) {
newEmail.binary = {};
for (let i = 0; i < attachments.length; i++) {
newEmail.binary[`${dataPropertyAttachmentsPrefixName}${i}`] = attachments[i];
}
}
}
newEmails.push(newEmail);
}
} else if (format === 'raw') {
for (const message of results) {
const lastMessageUid = this.getWorkflowStaticData('node').lastMessageUid as number;
if (lastMessageUid !== undefined && message.attributes.uid <= lastMessageUid) {
continue;
}
// Track the maximum UID to update staticData later
if (message.attributes.uid > maxUid) {
maxUid = message.attributes.uid;
}
const part = find(message.parts, { which: 'TEXT' });
if (part === undefined) {
throw new NodeOperationError(this.getNode(), 'Email part could not be parsed.');
}
// Return base64 string
newEmail = {
json: {
raw: part.body as string,
},
};
newEmails.push(newEmail);
}
}
} else if (format === 'raw') {
for (const message of results) {
if (
staticData.lastMessageUid !== undefined &&
message.attributes.uid <= (staticData.lastMessageUid as number)
) {
continue;
}
if (
staticData.lastMessageUid === undefined ||
(staticData.lastMessageUid as number) < message.attributes.uid
) {
staticData.lastMessageUid = message.attributes.uid;
}
const part = find(message.parts, { which: 'TEXT' });
if (part === undefined) {
throw new NodeOperationError(this.getNode(), 'Email part could not be parsed.');
// only mark messages as seen once processing has finished
if (postProcessAction === 'read') {
const uidList = results.map((e) => e.attributes.uid);
if (uidList.length > 0) {
await imapConnection.addFlags(uidList, '\\SEEN');
}
// Return base64 string
newEmail = {
json: {
raw: part.body as string,
},
};
newEmails.push(newEmail);
}
await onEmailBatch(newEmails);
} while (results.length >= EMAIL_BATCH_SIZE);
// Update lastMessageUid after processing all messages
if (maxUid > ((staticData.lastMessageUid as number) ?? 0)) {
this.getWorkflowStaticData('node').lastMessageUid = maxUid;
}
// only mark messages as seen once processing has finished
if (postProcessAction === 'read') {
const uidList = results.map((e) => e.attributes.uid);
if (uidList.length > 0) {
await imapConnection.addFlags(uidList, '\\SEEN');
}
}
return newEmails;
}