diff --git a/packages/@n8n/task-runner-python/src/task_executor.py b/packages/@n8n/task-runner-python/src/task_executor.py index 82d9c81863..8ae6de67bd 100644 --- a/packages/@n8n/task-runner-python/src/task_executor.py +++ b/packages/@n8n/task-runner-python/src/task_executor.py @@ -25,7 +25,7 @@ from typing import Any, Set from multiprocessing.context import SpawnProcess -MULTIPROCESSING_CONTEXT = multiprocessing.get_context("spawn") +MULTIPROCESSING_CONTEXT = multiprocessing.get_context("fork") MAX_PRINT_ARGS_ALLOWED = 100 PrintArgs = list[list[Any]] # Args to all `print()` calls in a Python code task @@ -95,6 +95,9 @@ class TaskExecutor: returned = queue.get_nowait() except Empty: raise TaskResultMissingError() + finally: + queue.close() + queue.join_thread() if "error" in returned: raise TaskRuntimeError(returned["error"]) diff --git a/packages/@n8n/task-runner-python/tests/integration/test_execution.py b/packages/@n8n/task-runner-python/tests/integration/test_execution.py index 8cd4290605..7bd2090861 100644 --- a/packages/@n8n/task-runner-python/tests/integration/test_execution.py +++ b/packages/@n8n/task-runner-python/tests/integration/test_execution.py @@ -187,7 +187,7 @@ async def test_timeout_during_execution(broker, manager): task_settings = create_task_settings(code=code, node_mode="all_items") await broker.send_task(task_id=task_id, task_settings=task_settings) - error_msg = await wait_for_task_error(broker, task_id, timeout=TASK_TIMEOUT + 0.5) + error_msg = await wait_for_task_error(broker, task_id, timeout=TASK_TIMEOUT + 1.5) assert error_msg["taskId"] == task_id assert "timed out" in error_msg["error"]["message"].lower()