From de30ecc359f3cc8389d1bb24ec7db2533f05dbf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 2 Sep 2025 13:12:48 +0200 Subject: [PATCH] fix(core): Resolve Python multiprocessing queue deadlock (#19084) --- .../task-runner-python/src/task_executor.py | 53 ++++++++++++++++--- 1 file changed, 47 insertions(+), 6 deletions(-) diff --git a/packages/@n8n/task-runner-python/src/task_executor.py b/packages/@n8n/task-runner-python/src/task_executor.py index a07afc1fc0..d5977fcc4e 100644 --- a/packages/@n8n/task-runner-python/src/task_executor.py +++ b/packages/@n8n/task-runner-python/src/task_executor.py @@ -5,6 +5,8 @@ import textwrap import json import os import sys +import tempfile +import pickle from src.errors import ( TaskResultMissingError, @@ -25,6 +27,7 @@ from typing import Any, Set from multiprocessing.context import SpawnProcess MULTIPROCESSING_CONTEXT = multiprocessing.get_context("spawn") +MAX_PRINT_STATEMENTS_ALLOWED = 100 PrintArgs = list[list[Any]] # Args to all `print()` calls in a Python code task @@ -97,7 +100,16 @@ class TaskExecutor: if "error" in returned: raise TaskRuntimeError(returned["error"]) - result = returned.get("result", []) + if "result_file" not in returned: + raise TaskResultMissingError() + + result_file = returned["result_file"] + try: + with open(result_file, "rb") as f: + result = pickle.load(f) + finally: + os.unlink(result_file) + print_args = returned.get("print_args", []) return result, print_args @@ -152,9 +164,8 @@ class TaskExecutor: exec(compiled_code, globals) - queue.put( - {"result": globals[EXECUTOR_USER_OUTPUT_KEY], "print_args": print_args} - ) + result = globals[EXECUTOR_USER_OUTPUT_KEY] + TaskExecutor._put_result(queue, result, print_args) except Exception as e: TaskExecutor._put_error(queue, e, print_args) @@ -201,7 +212,7 @@ class TaskExecutor: user_output["pairedItem"] = {"item": index} result.append(user_output) - queue.put({"result": result, "print_args": print_args}) + TaskExecutor._put_result(queue, result, print_args) except Exception as e: TaskExecutor._put_error(queue, e, print_args) @@ -211,12 +222,26 @@ class TaskExecutor: indented_code = textwrap.indent(raw_code, " ") return f"def _user_function():\n{indented_code}\n\n{EXECUTOR_USER_OUTPUT_KEY} = _user_function()" + @staticmethod + def _put_result( + queue: multiprocessing.Queue, result: list[Any], print_args: PrintArgs + ): + with tempfile.NamedTemporaryFile( + mode="wb", delete=False, prefix="n8n_result_" + ) as f: + pickle.dump(result, f) + result_file = f.name + + print_args_to_send = TaskExecutor._truncate_print_args(print_args) + queue.put({"result_file": result_file, "print_args": print_args_to_send}) + @staticmethod def _put_error(queue: multiprocessing.Queue, e: Exception, print_args: PrintArgs): + print_args_to_send = TaskExecutor._truncate_print_args(print_args) queue.put( { "error": {"message": str(e), "stack": traceback.format_exc()}, - "print_args": print_args, + "print_args": print_args_to_send, } ) @@ -273,6 +298,22 @@ class TaskExecutor: return formatted + @staticmethod + def _truncate_print_args(print_args: PrintArgs) -> PrintArgs: + """Truncate print_args to prevent pipe buffer overflow.""" + + if not print_args or len(print_args) <= MAX_PRINT_STATEMENTS_ALLOWED: + return print_args + + truncated = print_args[:MAX_PRINT_STATEMENTS_ALLOWED] + truncated.append( + [ + f"[Output truncated - {len(print_args) - MAX_PRINT_STATEMENTS_ALLOWED} more print statements]" + ] + ) + + return truncated + # ========== security ========== @staticmethod