fix(core): Resolve Python multiprocessing queue deadlock (#19084)

This commit is contained in:
Iván Ovejero
2025-09-02 13:12:48 +02:00
committed by GitHub
parent bd8ad0a0c5
commit de30ecc359

View File

@@ -5,6 +5,8 @@ import textwrap
import json
import os
import sys
import tempfile
import pickle
from src.errors import (
TaskResultMissingError,
@@ -25,6 +27,7 @@ from typing import Any, Set
from multiprocessing.context import SpawnProcess
MULTIPROCESSING_CONTEXT = multiprocessing.get_context("spawn")
MAX_PRINT_STATEMENTS_ALLOWED = 100
PrintArgs = list[list[Any]] # Args to all `print()` calls in a Python code task
@@ -97,7 +100,16 @@ class TaskExecutor:
if "error" in returned:
raise TaskRuntimeError(returned["error"])
result = returned.get("result", [])
if "result_file" not in returned:
raise TaskResultMissingError()
result_file = returned["result_file"]
try:
with open(result_file, "rb") as f:
result = pickle.load(f)
finally:
os.unlink(result_file)
print_args = returned.get("print_args", [])
return result, print_args
@@ -152,9 +164,8 @@ class TaskExecutor:
exec(compiled_code, globals)
queue.put(
{"result": globals[EXECUTOR_USER_OUTPUT_KEY], "print_args": print_args}
)
result = globals[EXECUTOR_USER_OUTPUT_KEY]
TaskExecutor._put_result(queue, result, print_args)
except Exception as e:
TaskExecutor._put_error(queue, e, print_args)
@@ -201,7 +212,7 @@ class TaskExecutor:
user_output["pairedItem"] = {"item": index}
result.append(user_output)
queue.put({"result": result, "print_args": print_args})
TaskExecutor._put_result(queue, result, print_args)
except Exception as e:
TaskExecutor._put_error(queue, e, print_args)
@@ -211,12 +222,26 @@ class TaskExecutor:
indented_code = textwrap.indent(raw_code, " ")
return f"def _user_function():\n{indented_code}\n\n{EXECUTOR_USER_OUTPUT_KEY} = _user_function()"
@staticmethod
def _put_result(
queue: multiprocessing.Queue, result: list[Any], print_args: PrintArgs
):
with tempfile.NamedTemporaryFile(
mode="wb", delete=False, prefix="n8n_result_"
) as f:
pickle.dump(result, f)
result_file = f.name
print_args_to_send = TaskExecutor._truncate_print_args(print_args)
queue.put({"result_file": result_file, "print_args": print_args_to_send})
@staticmethod
def _put_error(queue: multiprocessing.Queue, e: Exception, print_args: PrintArgs):
print_args_to_send = TaskExecutor._truncate_print_args(print_args)
queue.put(
{
"error": {"message": str(e), "stack": traceback.format_exc()},
"print_args": print_args,
"print_args": print_args_to_send,
}
)
@@ -273,6 +298,22 @@ class TaskExecutor:
return formatted
@staticmethod
def _truncate_print_args(print_args: PrintArgs) -> PrintArgs:
"""Truncate print_args to prevent pipe buffer overflow."""
if not print_args or len(print_args) <= MAX_PRINT_STATEMENTS_ALLOWED:
return print_args
truncated = print_args[:MAX_PRINT_STATEMENTS_ALLOWED]
truncated.append(
[
f"[Output truncated - {len(print_args) - MAX_PRINT_STATEMENTS_ALLOWED} more print statements]"
]
)
return truncated
# ========== security ==========
@staticmethod