feat(core): Support print RPC call in native Python runner (no-changelog) (#18630)

This commit is contained in:
Iván Ovejero
2025-08-26 11:40:55 +02:00
committed by GitHub
parent 85e3bfd3e4
commit 1da5acee30
9 changed files with 172 additions and 17 deletions

View File

@@ -4,12 +4,14 @@ BROKER_RUNNER_REGISTERED = "broker:runnerregistered"
BROKER_TASK_OFFER_ACCEPT = "broker:taskofferaccept"
BROKER_TASK_SETTINGS = "broker:tasksettings"
BROKER_TASK_CANCEL = "broker:taskcancel"
BROKER_RPC_RESPONSE = "broker:rpcresponse"
RUNNER_INFO = "runner:info"
RUNNER_TASK_OFFER = "runner:taskoffer"
RUNNER_TASK_ACCEPTED = "runner:taskaccepted"
RUNNER_TASK_REJECTED = "runner:taskrejected"
RUNNER_TASK_DONE = "runner:taskdone"
RUNNER_TASK_ERROR = "runner:taskerror"
RUNNER_RPC_CALL = "runner:rpc"
# Runner
TASK_TYPE_PYTHON = "python"
@@ -24,6 +26,7 @@ OFFER_VALIDITY_LATENCY_BUFFER = 0.1 # 100ms
# Executor
EXECUTOR_USER_OUTPUT_KEY = "__n8n_internal_user_output__"
EXECUTOR_CIRCULAR_REFERENCE_KEY = "__n8n_internal_circular_ref__"
# Broker
DEFAULT_TASK_BROKER_URI = "http://127.0.0.1:5679"
@@ -41,6 +44,9 @@ ENV_HIDE_TASK_OFFER_LOGS = "N8N_RUNNERS_HIDE_TASK_OFFER_LOGS"
LOG_FORMAT = "%(asctime)s.%(msecs)03d\t%(levelname)s\t%(message)s"
LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
# RPC
RPC_BROWSER_CONSOLE_LOG_METHOD = "logNodeOutput"
# Rejection reasons
TASK_REJECTED_REASON_OFFER_EXPIRED = (
"Offer expired - not accepted within validity window"

View File

@@ -1,6 +1,10 @@
import logging
import os
from .constants import LOG_FORMAT, LOG_TIMESTAMP_FORMAT, ENV_HIDE_TASK_OFFER_LOGS
from .constants import (
LOG_FORMAT,
LOG_TIMESTAMP_FORMAT,
ENV_HIDE_TASK_OFFER_LOGS,
)
COLORS = {
"DEBUG": "\033[34m", # blue

View File

@@ -9,6 +9,7 @@ from .constants import (
BROKER_TASK_CANCEL,
BROKER_TASK_OFFER_ACCEPT,
BROKER_TASK_SETTINGS,
BROKER_RPC_RESPONSE,
)
from .message_types import (
BrokerMessage,
@@ -18,6 +19,7 @@ from .message_types import (
BrokerTaskOfferAccept,
BrokerTaskSettings,
BrokerTaskCancel,
BrokerRpcResponse,
)
@@ -75,12 +77,24 @@ def _parse_task_cancel(d: dict) -> BrokerTaskCancel:
return BrokerTaskCancel(task_id=task_id, reason=reason)
def _parse_rpc_response(d: dict) -> BrokerRpcResponse:
try:
call_id = d["callId"]
task_id = d["taskId"]
status = d["status"]
except KeyError as e:
raise ValueError(f"Missing field in RPC response message: {e}")
return BrokerRpcResponse(call_id, task_id, status)
MESSAGE_TYPE_MAP = {
BROKER_INFO_REQUEST: lambda _: BrokerInfoRequest(),
BROKER_RUNNER_REGISTERED: lambda _: BrokerRunnerRegistered(),
BROKER_TASK_OFFER_ACCEPT: _parse_task_offer_accept,
BROKER_TASK_SETTINGS: _parse_task_settings,
BROKER_TASK_CANCEL: _parse_task_cancel,
BROKER_RPC_RESPONSE: _parse_rpc_response,
}

View File

@@ -5,6 +5,7 @@ from .broker import (
BrokerTaskOfferAccept,
BrokerTaskSettings,
BrokerTaskCancel,
BrokerRpcResponse,
)
from .runner import (
RunnerMessage,
@@ -14,6 +15,7 @@ from .runner import (
RunnerTaskRejected,
RunnerTaskDone,
RunnerTaskError,
RunnerRpcCall,
)
__all__ = [
@@ -23,6 +25,7 @@ __all__ = [
"BrokerTaskOfferAccept",
"BrokerTaskSettings",
"BrokerTaskCancel",
"BrokerRpcResponse",
"RunnerMessage",
"RunnerInfo",
"RunnerTaskOffer",
@@ -30,4 +33,5 @@ __all__ = [
"RunnerTaskRejected",
"RunnerTaskDone",
"RunnerTaskError",
"RunnerRpcCall",
]

View File

@@ -7,6 +7,7 @@ from ..constants import (
BROKER_TASK_CANCEL,
BROKER_TASK_OFFER_ACCEPT,
BROKER_TASK_SETTINGS,
BROKER_RPC_RESPONSE,
)
@@ -54,10 +55,19 @@ class BrokerTaskCancel:
type: Literal["broker:taskcancel"] = BROKER_TASK_CANCEL
@dataclass
class BrokerRpcResponse:
call_id: str
task_id: str
status: str
type: Literal["broker:rpcresponse"] = BROKER_RPC_RESPONSE
BrokerMessage = Union[
BrokerInfoRequest,
BrokerRunnerRegistered,
BrokerTaskOfferAccept,
BrokerTaskSettings,
BrokerTaskCancel,
BrokerRpcResponse,
]

View File

@@ -1,5 +1,6 @@
from dataclasses import dataclass
from typing import List, Literal, Union, Any, Dict
from ..constants import RUNNER_RPC_CALL
from ..constants import (
RUNNER_INFO,
@@ -53,6 +54,15 @@ class RunnerTaskError:
type: Literal["runner:taskerror"] = RUNNER_TASK_ERROR
@dataclass
class RunnerRpcCall:
call_id: str
task_id: str
name: str
params: List[Any]
type: Literal["runner:rpc"] = RUNNER_RPC_CALL
RunnerMessage = Union[
RunnerInfo,
RunnerTaskOffer,
@@ -60,4 +70,5 @@ RunnerMessage = Union[
RunnerTaskRejected,
RunnerTaskDone,
RunnerTaskError,
RunnerRpcCall,
]

View File

@@ -2,6 +2,7 @@ from queue import Empty
import multiprocessing
import traceback
import textwrap
import json
from .errors import (
TaskResultMissingError,
@@ -11,12 +12,15 @@ from .errors import (
)
from .message_types.broker import NodeMode, Items
from .constants import EXECUTOR_USER_OUTPUT_KEY
from .constants import EXECUTOR_CIRCULAR_REFERENCE_KEY, EXECUTOR_USER_OUTPUT_KEY
from typing import Any
from multiprocessing.context import SpawnProcess
MULTIPROCESSING_CONTEXT = multiprocessing.get_context("spawn")
PrintArgs = list[list[Any]] # Args to all `print()` calls in a Python code task
class TaskExecutor:
"""Responsible for executing Python code tasks in isolated subprocesses."""
@@ -42,9 +46,11 @@ class TaskExecutor:
queue: multiprocessing.Queue,
task_timeout: int,
continue_on_fail: bool,
):
) -> tuple[list, PrintArgs]:
"""Execute a subprocess for a Python code task."""
print_args: PrintArgs = []
try:
process.start()
process.join(timeout=task_timeout)
@@ -65,11 +71,14 @@ class TaskExecutor:
if "error" in returned:
raise TaskRuntimeError(returned["error"])
return returned["result"] or []
result = returned.get("result", [])
print_args = returned.get("print_args", [])
return result, print_args
except Exception as e:
if continue_on_fail:
return [{"json": {"error": str(e)}}]
return [{"json": {"error": str(e)}}], print_args
raise
@staticmethod
@@ -89,27 +98,46 @@ class TaskExecutor:
def _all_items(raw_code: str, items: Items, queue: multiprocessing.Queue):
"""Execute a Python code task in all-items mode."""
print_args: PrintArgs = []
try:
code = TaskExecutor._wrap_code(raw_code)
globals = {"__builtins__": __builtins__, "_items": items}
globals = {
"__builtins__": __builtins__,
"_items": items,
"print": TaskExecutor._create_custom_print(print_args),
}
exec(code, globals)
queue.put({"result": globals[EXECUTOR_USER_OUTPUT_KEY]})
queue.put(
{"result": globals[EXECUTOR_USER_OUTPUT_KEY], "print_args": print_args}
)
except Exception as e:
TaskExecutor._put_error(queue, e)
TaskExecutor._put_error(queue, e, print_args)
@staticmethod
def _per_item(raw_code: str, items: Items, queue: multiprocessing.Queue):
"""Execute a Python code task in per-item mode."""
print_args: PrintArgs = []
try:
wrapped_code = TaskExecutor._wrap_code(raw_code)
compiled_code = compile(wrapped_code, "<per_item_task_execution>", "exec")
result = []
for index, item in enumerate(items):
globals = {"__builtins__": __builtins__, "_item": item}
globals = {
"__builtins__": __builtins__,
"_item": item,
"print": TaskExecutor._create_custom_print(print_args),
}
exec(compiled_code, globals)
user_output = globals[EXECUTOR_USER_OUTPUT_KEY]
if user_output is None:
@@ -118,10 +146,10 @@ class TaskExecutor:
user_output["pairedItem"] = {"item": index}
result.append(user_output)
queue.put({"result": result})
queue.put({"result": result, "print_args": print_args})
except Exception as e:
TaskExecutor._put_error(queue, e)
TaskExecutor._put_error(queue, e, print_args)
@staticmethod
def _wrap_code(raw_code: str) -> str:
@@ -129,5 +157,63 @@ class TaskExecutor:
return f"def _user_function():\n{indented_code}\n\n{EXECUTOR_USER_OUTPUT_KEY} = _user_function()"
@staticmethod
def _put_error(queue: multiprocessing.Queue, e: Exception):
queue.put({"error": {"message": str(e), "stack": traceback.format_exc()}})
def _put_error(queue: multiprocessing.Queue, e: Exception, print_args: PrintArgs):
queue.put(
{
"error": {"message": str(e), "stack": traceback.format_exc()},
"print_args": print_args,
}
)
# ========== print() ==========
@staticmethod
def _create_custom_print(print_args: PrintArgs):
def custom_print(*args):
serializable_args = []
for arg in args:
try:
json.dumps(arg, default=str, ensure_ascii=False)
serializable_args.append(arg)
except Exception as _:
# Ensure args are serializable so they are transmissible
# through the multiprocessing queue and via websockets.
serializable_args.append(
{
EXECUTOR_CIRCULAR_REFERENCE_KEY: repr(arg),
"__type__": type(arg).__name__,
}
)
formatted = TaskExecutor._format_print_args(*serializable_args)
print_args.append(formatted)
print("[user code]", *args)
return custom_print
@staticmethod
def _format_print_args(*args) -> list[str]:
"""
Takes the arguments passed to a `print()` call in user code and converts them
to string representations suitable for display in a browser console.
Expects all args to be serializable.
"""
formatted = []
for arg in args:
if isinstance(arg, str):
formatted.append(f"'{arg}'")
elif arg is None or isinstance(arg, (int, float, bool)):
formatted.append(str(arg))
elif isinstance(arg, dict) and EXECUTOR_CIRCULAR_REFERENCE_KEY in arg:
formatted.append(f"[Circular {arg.get('__type__', 'Object')}]")
else:
formatted.append(json.dumps(arg, default=str, ensure_ascii=False))
return formatted

View File

@@ -7,7 +7,6 @@ from urllib.parse import urlparse
import websockets
import random
from .errors import WebsocketConnectionError, TaskMissingError
from .message_types.broker import TaskSettings
from .nanoid import nanoid
@@ -22,6 +21,7 @@ from .constants import (
OFFER_VALIDITY_MAX_JITTER,
OFFER_VALIDITY_LATENCY_BUFFER,
TASK_BROKER_WS_PATH,
RPC_BROWSER_CONSOLE_LOG_METHOD,
)
from .message_types import (
BrokerMessage,
@@ -31,12 +31,14 @@ from .message_types import (
BrokerTaskOfferAccept,
BrokerTaskSettings,
BrokerTaskCancel,
BrokerRpcResponse,
RunnerInfo,
RunnerTaskOffer,
RunnerTaskAccepted,
RunnerTaskRejected,
RunnerTaskDone,
RunnerTaskError,
RunnerRpcCall,
)
from .message_serde import MessageSerde
from .task_state import TaskState, TaskStatus
@@ -140,6 +142,8 @@ class TaskRunner:
await self._handle_task_settings(message)
case BrokerTaskCancel():
await self._handle_task_cancel(message)
case BrokerRpcResponse():
pass # currently only logging, already handled by browser
case _:
self.logger.warning(f"Unhandled message type: {type(message)}")
@@ -208,10 +212,15 @@ class TaskRunner:
task_state.process = process
result = self.executor.execute_process(
result, print_args = self.executor.execute_process(
process, queue, self.opts.task_timeout, task_settings.continue_on_fail
)
for print_args_per_call in print_args:
await self._send_rpc_message(
task_id, RPC_BROWSER_CONSOLE_LOG_METHOD, print_args_per_call
)
response = RunnerTaskDone(task_id=task_id, data={"result": result})
await self._send_message(response)
self.logger.info(f"Completed task {task_id}")
@@ -241,6 +250,13 @@ class TaskRunner:
task_state.status = TaskStatus.ABORTING
self.executor.stop_process(task_state.process)
async def _send_rpc_message(self, task_id: str, method_name: str, params: list):
message = RunnerRpcCall(
call_id=nanoid(), task_id=task_id, name=method_name, params=params
)
await self._send_message(message)
async def _send_message(self, message: RunnerMessage) -> None:
if self.websocket_connection is None:
raise WebsocketConnectionError(self.task_broker_uri)

View File

@@ -30,9 +30,13 @@ export const useCompleter = (
const word = context.matchBefore(/\w*/);
if (!word) return null;
const label = toValue(mode) === 'runOnceForEachItem' ? '_item' : '_items';
const options = [];
return { from: word.from, options: [{ label, type: 'variable' }] };
const label = toValue(mode) === 'runOnceForEachItem' ? '_item' : '_items';
options.push({ label, type: 'variable' });
options.push({ label: 'print', type: 'function' });
return { from: word.from, options };
};
return autocompletion({ icons: false, override: [completions] });