feat(Chat Trigger Node): Add support for file uploads & harmonize public and development chat (#9802)

Signed-off-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
oleg
2024-07-09 13:45:41 +02:00
committed by GitHub
parent 501bcd80ff
commit df783151b8
32 changed files with 2309 additions and 940 deletions

View File

@@ -1,6 +1,6 @@
import { pipeline } from 'stream/promises';
import { createWriteStream } from 'fs';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import type { IBinaryData, IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { NodeOperationError, BINARY_ENCODING } from 'n8n-workflow';
import type { TextSplitter } from '@langchain/textsplitters';
@@ -60,21 +60,10 @@ export class N8nBinaryLoader {
return docs;
}
async processItem(item: INodeExecutionData, itemIndex: number): Promise<Document[]> {
const selectedLoader: keyof typeof SUPPORTED_MIME_TYPES = this.context.getNodeParameter(
'loader',
itemIndex,
'auto',
) as keyof typeof SUPPORTED_MIME_TYPES;
const docs: Document[] = [];
const metadata = getMetadataFiltersValues(this.context, itemIndex);
if (!item) return [];
const binaryData = this.context.helpers.assertBinaryData(itemIndex, this.binaryDataKey);
const { mimeType } = binaryData;
private async validateMimeType(
mimeType: string,
selectedLoader: keyof typeof SUPPORTED_MIME_TYPES,
): Promise<void> {
// Check if loader matches the mime-type of the data
if (selectedLoader !== 'auto' && !SUPPORTED_MIME_TYPES[selectedLoader].includes(mimeType)) {
const neededLoader = Object.keys(SUPPORTED_MIME_TYPES).find((loader) =>
@@ -90,6 +79,7 @@ export class N8nBinaryLoader {
if (!Object.values(SUPPORTED_MIME_TYPES).flat().includes(mimeType)) {
throw new NodeOperationError(this.context.getNode(), `Unsupported mime type: ${mimeType}`);
}
if (
!SUPPORTED_MIME_TYPES[selectedLoader].includes(mimeType) &&
selectedLoader !== 'textLoader' &&
@@ -100,24 +90,31 @@ export class N8nBinaryLoader {
`Unsupported mime type: ${mimeType} for selected loader: ${selectedLoader}`,
);
}
}
let filePathOrBlob: string | Blob;
private async getFilePathOrBlob(
binaryData: IBinaryData,
mimeType: string,
): Promise<string | Blob> {
if (binaryData.id) {
const binaryBuffer = await this.context.helpers.binaryToBuffer(
await this.context.helpers.getBinaryStream(binaryData.id),
);
filePathOrBlob = new Blob([binaryBuffer], {
return new Blob([binaryBuffer], {
type: mimeType,
});
} else {
filePathOrBlob = new Blob([Buffer.from(binaryData.data, BINARY_ENCODING)], {
return new Blob([Buffer.from(binaryData.data, BINARY_ENCODING)], {
type: mimeType,
});
}
}
let loader: PDFLoader | CSVLoader | EPubLoader | DocxLoader | TextLoader | JSONLoader;
let cleanupTmpFile: DirectoryResult['cleanup'] | undefined = undefined;
private async getLoader(
mimeType: string,
filePathOrBlob: string | Blob,
itemIndex: number,
): Promise<PDFLoader | CSVLoader | EPubLoader | DocxLoader | TextLoader | JSONLoader> {
switch (mimeType) {
case 'application/pdf':
const splitPages = this.context.getNodeParameter(
@@ -125,10 +122,7 @@ export class N8nBinaryLoader {
itemIndex,
false,
) as boolean;
loader = new PDFLoader(filePathOrBlob, {
splitPages,
});
break;
return new PDFLoader(filePathOrBlob, { splitPages });
case 'text/csv':
const column = this.context.getNodeParameter(
`${this.optionsPrefix}column`,
@@ -140,38 +134,23 @@ export class N8nBinaryLoader {
itemIndex,
',',
) as string;
loader = new CSVLoader(filePathOrBlob, {
column: column ?? undefined,
separator,
});
break;
return new CSVLoader(filePathOrBlob, { column: column ?? undefined, separator });
case 'application/epub+zip':
// EPubLoader currently does not accept Blobs https://github.com/langchain-ai/langchainjs/issues/1623
let filePath: string;
if (filePathOrBlob instanceof Blob) {
const tmpFileData = await tmpFile({ prefix: 'epub-loader-' });
cleanupTmpFile = tmpFileData.cleanup;
try {
const bufferData = await filePathOrBlob.arrayBuffer();
await pipeline([new Uint8Array(bufferData)], createWriteStream(tmpFileData.path));
loader = new EPubLoader(tmpFileData.path);
break;
} catch (error) {
await cleanupTmpFile();
throw new NodeOperationError(this.context.getNode(), error as Error);
}
const bufferData = await filePathOrBlob.arrayBuffer();
await pipeline([new Uint8Array(bufferData)], createWriteStream(tmpFileData.path));
return new EPubLoader(tmpFileData.path);
} else {
filePath = filePathOrBlob;
}
loader = new EPubLoader(filePath);
break;
return new EPubLoader(filePath);
case 'application/vnd.openxmlformats-officedocument.wordprocessingml.document':
loader = new DocxLoader(filePathOrBlob);
break;
return new DocxLoader(filePathOrBlob);
case 'text/plain':
loader = new TextLoader(filePathOrBlob);
break;
return new TextLoader(filePathOrBlob);
case 'application/json':
const pointers = this.context.getNodeParameter(
`${this.optionsPrefix}pointers`,
@@ -179,15 +158,77 @@ export class N8nBinaryLoader {
'',
) as string;
const pointersArray = pointers.split(',').map((pointer) => pointer.trim());
loader = new JSONLoader(filePathOrBlob, pointersArray);
break;
return new JSONLoader(filePathOrBlob, pointersArray);
default:
loader = new TextLoader(filePathOrBlob);
return new TextLoader(filePathOrBlob);
}
}
const loadedDoc = this.textSplitter
private async loadDocuments(
loader: PDFLoader | CSVLoader | EPubLoader | DocxLoader | TextLoader | JSONLoader,
): Promise<Document[]> {
return this.textSplitter
? await this.textSplitter.splitDocuments(await loader.load())
: await loader.load();
}
private async cleanupTmpFileIfNeeded(
cleanupTmpFile: DirectoryResult['cleanup'] | undefined,
): Promise<void> {
if (cleanupTmpFile) {
await cleanupTmpFile();
}
}
async processItem(item: INodeExecutionData, itemIndex: number): Promise<Document[]> {
const docs: Document[] = [];
const binaryMode = this.context.getNodeParameter('binaryMode', itemIndex, 'allInputData');
if (binaryMode === 'allInputData') {
const binaryData = this.context.getInputData();
for (const data of binaryData) {
if (data.binary) {
const binaryDataKeys = Object.keys(data.binary);
for (const fileKey of binaryDataKeys) {
const processedDocuments = await this.processItemByKey(item, itemIndex, fileKey);
docs.push(...processedDocuments);
}
}
}
} else {
const processedDocuments = await this.processItemByKey(item, itemIndex, this.binaryDataKey);
docs.push(...processedDocuments);
}
return docs;
}
async processItemByKey(
item: INodeExecutionData,
itemIndex: number,
binaryKey: string,
): Promise<Document[]> {
const selectedLoader: keyof typeof SUPPORTED_MIME_TYPES = this.context.getNodeParameter(
'loader',
itemIndex,
'auto',
) as keyof typeof SUPPORTED_MIME_TYPES;
const docs: Document[] = [];
const metadata = getMetadataFiltersValues(this.context, itemIndex);
if (!item) return [];
const binaryData = this.context.helpers.assertBinaryData(itemIndex, binaryKey);
const { mimeType } = binaryData;
await this.validateMimeType(mimeType, selectedLoader);
const filePathOrBlob = await this.getFilePathOrBlob(binaryData, mimeType);
const cleanupTmpFile: DirectoryResult['cleanup'] | undefined = undefined;
const loader = await this.getLoader(mimeType, filePathOrBlob, itemIndex);
const loadedDoc = await this.loadDocuments(loader);
docs.push(...loadedDoc);
@@ -200,9 +241,8 @@ export class N8nBinaryLoader {
});
}
if (cleanupTmpFile) {
await cleanupTmpFile();
}
await this.cleanupTmpFileIfNeeded(cleanupTmpFile);
return docs;
}
}

View File

@@ -1,5 +1,10 @@
import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow';
import type { EventNamesAiNodesType, IDataObject, IExecuteFunctions } from 'n8n-workflow';
import type {
EventNamesAiNodesType,
IDataObject,
IExecuteFunctions,
IWebhookFunctions,
} from 'n8n-workflow';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { BaseOutputParser } from '@langchain/core/output_parsers';
import type { BaseMessage } from '@langchain/core/messages';
@@ -81,7 +86,7 @@ export function getPromptInputByType(options: {
}
export function getSessionId(
ctx: IExecuteFunctions,
ctx: IExecuteFunctions | IWebhookFunctions,
itemIndex: number,
selectorKey = 'sessionIdType',
autoSelect = 'fromInput',
@@ -91,7 +96,15 @@ export function getSessionId(
const selectorType = ctx.getNodeParameter(selectorKey, itemIndex) as string;
if (selectorType === autoSelect) {
sessionId = ctx.evaluateExpression('{{ $json.sessionId }}', itemIndex) as string;
// If memory node is used in webhook like node(like chat trigger node), it doesn't have access to evaluateExpression
// so we try to extract sessionId from the bodyData
if ('getBodyData' in ctx) {
const bodyData = ctx.getBodyData() ?? {};
sessionId = bodyData.sessionId as string;
} else {
sessionId = ctx.evaluateExpression('{{ $json.sessionId }}', itemIndex) as string;
}
if (sessionId === '' || sessionId === undefined) {
throw new NodeOperationError(ctx.getNode(), 'No session ID found', {
description: