diff --git a/packages/@n8n/task-runner-python/src/constants.py b/packages/@n8n/task-runner-python/src/constants.py index 641596fa82..cdf15d7225 100644 --- a/packages/@n8n/task-runner-python/src/constants.py +++ b/packages/@n8n/task-runner-python/src/constants.py @@ -4,12 +4,14 @@ BROKER_RUNNER_REGISTERED = "broker:runnerregistered" BROKER_TASK_OFFER_ACCEPT = "broker:taskofferaccept" BROKER_TASK_SETTINGS = "broker:tasksettings" BROKER_TASK_CANCEL = "broker:taskcancel" +BROKER_RPC_RESPONSE = "broker:rpcresponse" RUNNER_INFO = "runner:info" RUNNER_TASK_OFFER = "runner:taskoffer" RUNNER_TASK_ACCEPTED = "runner:taskaccepted" RUNNER_TASK_REJECTED = "runner:taskrejected" RUNNER_TASK_DONE = "runner:taskdone" RUNNER_TASK_ERROR = "runner:taskerror" +RUNNER_RPC_CALL = "runner:rpc" # Runner TASK_TYPE_PYTHON = "python" @@ -24,6 +26,7 @@ OFFER_VALIDITY_LATENCY_BUFFER = 0.1 # 100ms # Executor EXECUTOR_USER_OUTPUT_KEY = "__n8n_internal_user_output__" +EXECUTOR_CIRCULAR_REFERENCE_KEY = "__n8n_internal_circular_ref__" # Broker DEFAULT_TASK_BROKER_URI = "http://127.0.0.1:5679" @@ -41,6 +44,9 @@ ENV_HIDE_TASK_OFFER_LOGS = "N8N_RUNNERS_HIDE_TASK_OFFER_LOGS" LOG_FORMAT = "%(asctime)s.%(msecs)03d\t%(levelname)s\t%(message)s" LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S" +# RPC +RPC_BROWSER_CONSOLE_LOG_METHOD = "logNodeOutput" + # Rejection reasons TASK_REJECTED_REASON_OFFER_EXPIRED = ( "Offer expired - not accepted within validity window" diff --git a/packages/@n8n/task-runner-python/src/logs.py b/packages/@n8n/task-runner-python/src/logs.py index 62879adcbf..df537e2bb0 100644 --- a/packages/@n8n/task-runner-python/src/logs.py +++ b/packages/@n8n/task-runner-python/src/logs.py @@ -1,6 +1,10 @@ import logging import os -from .constants import LOG_FORMAT, LOG_TIMESTAMP_FORMAT, ENV_HIDE_TASK_OFFER_LOGS +from .constants import ( + LOG_FORMAT, + LOG_TIMESTAMP_FORMAT, + ENV_HIDE_TASK_OFFER_LOGS, +) COLORS = { "DEBUG": "\033[34m", # blue diff --git a/packages/@n8n/task-runner-python/src/message_serde.py b/packages/@n8n/task-runner-python/src/message_serde.py index 59ee45c7ad..01c0be8e89 100644 --- a/packages/@n8n/task-runner-python/src/message_serde.py +++ b/packages/@n8n/task-runner-python/src/message_serde.py @@ -9,6 +9,7 @@ from .constants import ( BROKER_TASK_CANCEL, BROKER_TASK_OFFER_ACCEPT, BROKER_TASK_SETTINGS, + BROKER_RPC_RESPONSE, ) from .message_types import ( BrokerMessage, @@ -18,6 +19,7 @@ from .message_types import ( BrokerTaskOfferAccept, BrokerTaskSettings, BrokerTaskCancel, + BrokerRpcResponse, ) @@ -75,12 +77,24 @@ def _parse_task_cancel(d: dict) -> BrokerTaskCancel: return BrokerTaskCancel(task_id=task_id, reason=reason) +def _parse_rpc_response(d: dict) -> BrokerRpcResponse: + try: + call_id = d["callId"] + task_id = d["taskId"] + status = d["status"] + except KeyError as e: + raise ValueError(f"Missing field in RPC response message: {e}") + + return BrokerRpcResponse(call_id, task_id, status) + + MESSAGE_TYPE_MAP = { BROKER_INFO_REQUEST: lambda _: BrokerInfoRequest(), BROKER_RUNNER_REGISTERED: lambda _: BrokerRunnerRegistered(), BROKER_TASK_OFFER_ACCEPT: _parse_task_offer_accept, BROKER_TASK_SETTINGS: _parse_task_settings, BROKER_TASK_CANCEL: _parse_task_cancel, + BROKER_RPC_RESPONSE: _parse_rpc_response, } diff --git a/packages/@n8n/task-runner-python/src/message_types/__init__.py b/packages/@n8n/task-runner-python/src/message_types/__init__.py index bddda2eabb..1fe0139010 100644 --- a/packages/@n8n/task-runner-python/src/message_types/__init__.py +++ b/packages/@n8n/task-runner-python/src/message_types/__init__.py @@ -5,6 +5,7 @@ from .broker import ( BrokerTaskOfferAccept, BrokerTaskSettings, BrokerTaskCancel, + BrokerRpcResponse, ) from .runner import ( RunnerMessage, @@ -14,6 +15,7 @@ from .runner import ( RunnerTaskRejected, RunnerTaskDone, RunnerTaskError, + RunnerRpcCall, ) __all__ = [ @@ -23,6 +25,7 @@ __all__ = [ "BrokerTaskOfferAccept", "BrokerTaskSettings", "BrokerTaskCancel", + "BrokerRpcResponse", "RunnerMessage", "RunnerInfo", "RunnerTaskOffer", @@ -30,4 +33,5 @@ __all__ = [ "RunnerTaskRejected", "RunnerTaskDone", "RunnerTaskError", + "RunnerRpcCall", ] diff --git a/packages/@n8n/task-runner-python/src/message_types/broker.py b/packages/@n8n/task-runner-python/src/message_types/broker.py index bbf1c64f8f..0d303edcae 100644 --- a/packages/@n8n/task-runner-python/src/message_types/broker.py +++ b/packages/@n8n/task-runner-python/src/message_types/broker.py @@ -7,6 +7,7 @@ from ..constants import ( BROKER_TASK_CANCEL, BROKER_TASK_OFFER_ACCEPT, BROKER_TASK_SETTINGS, + BROKER_RPC_RESPONSE, ) @@ -54,10 +55,19 @@ class BrokerTaskCancel: type: Literal["broker:taskcancel"] = BROKER_TASK_CANCEL +@dataclass +class BrokerRpcResponse: + call_id: str + task_id: str + status: str + type: Literal["broker:rpcresponse"] = BROKER_RPC_RESPONSE + + BrokerMessage = Union[ BrokerInfoRequest, BrokerRunnerRegistered, BrokerTaskOfferAccept, BrokerTaskSettings, BrokerTaskCancel, + BrokerRpcResponse, ] diff --git a/packages/@n8n/task-runner-python/src/message_types/runner.py b/packages/@n8n/task-runner-python/src/message_types/runner.py index 8f54994fa5..dec0001123 100644 --- a/packages/@n8n/task-runner-python/src/message_types/runner.py +++ b/packages/@n8n/task-runner-python/src/message_types/runner.py @@ -1,5 +1,6 @@ from dataclasses import dataclass from typing import List, Literal, Union, Any, Dict +from ..constants import RUNNER_RPC_CALL from ..constants import ( RUNNER_INFO, @@ -53,6 +54,15 @@ class RunnerTaskError: type: Literal["runner:taskerror"] = RUNNER_TASK_ERROR +@dataclass +class RunnerRpcCall: + call_id: str + task_id: str + name: str + params: List[Any] + type: Literal["runner:rpc"] = RUNNER_RPC_CALL + + RunnerMessage = Union[ RunnerInfo, RunnerTaskOffer, @@ -60,4 +70,5 @@ RunnerMessage = Union[ RunnerTaskRejected, RunnerTaskDone, RunnerTaskError, + RunnerRpcCall, ] diff --git a/packages/@n8n/task-runner-python/src/task_executor.py b/packages/@n8n/task-runner-python/src/task_executor.py index abe440307f..6bbbb06791 100644 --- a/packages/@n8n/task-runner-python/src/task_executor.py +++ b/packages/@n8n/task-runner-python/src/task_executor.py @@ -2,6 +2,7 @@ from queue import Empty import multiprocessing import traceback import textwrap +import json from .errors import ( TaskResultMissingError, @@ -11,12 +12,15 @@ from .errors import ( ) from .message_types.broker import NodeMode, Items -from .constants import EXECUTOR_USER_OUTPUT_KEY +from .constants import EXECUTOR_CIRCULAR_REFERENCE_KEY, EXECUTOR_USER_OUTPUT_KEY +from typing import Any from multiprocessing.context import SpawnProcess MULTIPROCESSING_CONTEXT = multiprocessing.get_context("spawn") +PrintArgs = list[list[Any]] # Args to all `print()` calls in a Python code task + class TaskExecutor: """Responsible for executing Python code tasks in isolated subprocesses.""" @@ -42,9 +46,11 @@ class TaskExecutor: queue: multiprocessing.Queue, task_timeout: int, continue_on_fail: bool, - ): + ) -> tuple[list, PrintArgs]: """Execute a subprocess for a Python code task.""" + print_args: PrintArgs = [] + try: process.start() process.join(timeout=task_timeout) @@ -65,11 +71,14 @@ class TaskExecutor: if "error" in returned: raise TaskRuntimeError(returned["error"]) - return returned["result"] or [] + result = returned.get("result", []) + print_args = returned.get("print_args", []) + + return result, print_args except Exception as e: if continue_on_fail: - return [{"json": {"error": str(e)}}] + return [{"json": {"error": str(e)}}], print_args raise @staticmethod @@ -89,27 +98,46 @@ class TaskExecutor: def _all_items(raw_code: str, items: Items, queue: multiprocessing.Queue): """Execute a Python code task in all-items mode.""" + print_args: PrintArgs = [] + try: code = TaskExecutor._wrap_code(raw_code) - globals = {"__builtins__": __builtins__, "_items": items} + + globals = { + "__builtins__": __builtins__, + "_items": items, + "print": TaskExecutor._create_custom_print(print_args), + } + exec(code, globals) - queue.put({"result": globals[EXECUTOR_USER_OUTPUT_KEY]}) + + queue.put( + {"result": globals[EXECUTOR_USER_OUTPUT_KEY], "print_args": print_args} + ) except Exception as e: - TaskExecutor._put_error(queue, e) + TaskExecutor._put_error(queue, e, print_args) @staticmethod def _per_item(raw_code: str, items: Items, queue: multiprocessing.Queue): """Execute a Python code task in per-item mode.""" + print_args: PrintArgs = [] + try: wrapped_code = TaskExecutor._wrap_code(raw_code) compiled_code = compile(wrapped_code, "", "exec") result = [] for index, item in enumerate(items): - globals = {"__builtins__": __builtins__, "_item": item} + globals = { + "__builtins__": __builtins__, + "_item": item, + "print": TaskExecutor._create_custom_print(print_args), + } + exec(compiled_code, globals) + user_output = globals[EXECUTOR_USER_OUTPUT_KEY] if user_output is None: @@ -118,10 +146,10 @@ class TaskExecutor: user_output["pairedItem"] = {"item": index} result.append(user_output) - queue.put({"result": result}) + queue.put({"result": result, "print_args": print_args}) except Exception as e: - TaskExecutor._put_error(queue, e) + TaskExecutor._put_error(queue, e, print_args) @staticmethod def _wrap_code(raw_code: str) -> str: @@ -129,5 +157,63 @@ class TaskExecutor: return f"def _user_function():\n{indented_code}\n\n{EXECUTOR_USER_OUTPUT_KEY} = _user_function()" @staticmethod - def _put_error(queue: multiprocessing.Queue, e: Exception): - queue.put({"error": {"message": str(e), "stack": traceback.format_exc()}}) + def _put_error(queue: multiprocessing.Queue, e: Exception, print_args: PrintArgs): + queue.put( + { + "error": {"message": str(e), "stack": traceback.format_exc()}, + "print_args": print_args, + } + ) + + # ========== print() ========== + + @staticmethod + def _create_custom_print(print_args: PrintArgs): + def custom_print(*args): + serializable_args = [] + + for arg in args: + try: + json.dumps(arg, default=str, ensure_ascii=False) + serializable_args.append(arg) + except Exception as _: + # Ensure args are serializable so they are transmissible + # through the multiprocessing queue and via websockets. + serializable_args.append( + { + EXECUTOR_CIRCULAR_REFERENCE_KEY: repr(arg), + "__type__": type(arg).__name__, + } + ) + + formatted = TaskExecutor._format_print_args(*serializable_args) + print_args.append(formatted) + print("[user code]", *args) + + return custom_print + + @staticmethod + def _format_print_args(*args) -> list[str]: + """ + Takes the arguments passed to a `print()` call in user code and converts them + to string representations suitable for display in a browser console. + + Expects all args to be serializable. + """ + + formatted = [] + + for arg in args: + if isinstance(arg, str): + formatted.append(f"'{arg}'") + + elif arg is None or isinstance(arg, (int, float, bool)): + formatted.append(str(arg)) + + elif isinstance(arg, dict) and EXECUTOR_CIRCULAR_REFERENCE_KEY in arg: + formatted.append(f"[Circular {arg.get('__type__', 'Object')}]") + + else: + formatted.append(json.dumps(arg, default=str, ensure_ascii=False)) + + return formatted diff --git a/packages/@n8n/task-runner-python/src/task_runner.py b/packages/@n8n/task-runner-python/src/task_runner.py index f41bcaec4e..d73b9ddd5a 100644 --- a/packages/@n8n/task-runner-python/src/task_runner.py +++ b/packages/@n8n/task-runner-python/src/task_runner.py @@ -7,7 +7,6 @@ from urllib.parse import urlparse import websockets import random - from .errors import WebsocketConnectionError, TaskMissingError from .message_types.broker import TaskSettings from .nanoid import nanoid @@ -22,6 +21,7 @@ from .constants import ( OFFER_VALIDITY_MAX_JITTER, OFFER_VALIDITY_LATENCY_BUFFER, TASK_BROKER_WS_PATH, + RPC_BROWSER_CONSOLE_LOG_METHOD, ) from .message_types import ( BrokerMessage, @@ -31,12 +31,14 @@ from .message_types import ( BrokerTaskOfferAccept, BrokerTaskSettings, BrokerTaskCancel, + BrokerRpcResponse, RunnerInfo, RunnerTaskOffer, RunnerTaskAccepted, RunnerTaskRejected, RunnerTaskDone, RunnerTaskError, + RunnerRpcCall, ) from .message_serde import MessageSerde from .task_state import TaskState, TaskStatus @@ -140,6 +142,8 @@ class TaskRunner: await self._handle_task_settings(message) case BrokerTaskCancel(): await self._handle_task_cancel(message) + case BrokerRpcResponse(): + pass # currently only logging, already handled by browser case _: self.logger.warning(f"Unhandled message type: {type(message)}") @@ -208,10 +212,15 @@ class TaskRunner: task_state.process = process - result = self.executor.execute_process( + result, print_args = self.executor.execute_process( process, queue, self.opts.task_timeout, task_settings.continue_on_fail ) + for print_args_per_call in print_args: + await self._send_rpc_message( + task_id, RPC_BROWSER_CONSOLE_LOG_METHOD, print_args_per_call + ) + response = RunnerTaskDone(task_id=task_id, data={"result": result}) await self._send_message(response) self.logger.info(f"Completed task {task_id}") @@ -241,6 +250,13 @@ class TaskRunner: task_state.status = TaskStatus.ABORTING self.executor.stop_process(task_state.process) + async def _send_rpc_message(self, task_id: str, method_name: str, params: list): + message = RunnerRpcCall( + call_id=nanoid(), task_id=task_id, name=method_name, params=params + ) + + await self._send_message(message) + async def _send_message(self, message: RunnerMessage) -> None: if self.websocket_connection is None: raise WebsocketConnectionError(self.task_broker_uri) diff --git a/packages/frontend/editor-ui/src/components/CodeNodeEditor/completer.ts b/packages/frontend/editor-ui/src/components/CodeNodeEditor/completer.ts index cf45ad972c..77bf74033c 100644 --- a/packages/frontend/editor-ui/src/components/CodeNodeEditor/completer.ts +++ b/packages/frontend/editor-ui/src/components/CodeNodeEditor/completer.ts @@ -30,9 +30,13 @@ export const useCompleter = ( const word = context.matchBefore(/\w*/); if (!word) return null; - const label = toValue(mode) === 'runOnceForEachItem' ? '_item' : '_items'; + const options = []; - return { from: word.from, options: [{ label, type: 'variable' }] }; + const label = toValue(mode) === 'runOnceForEachItem' ? '_item' : '_items'; + options.push({ label, type: 'variable' }); + options.push({ label: 'print', type: 'function' }); + + return { from: word.from, options }; }; return autocompletion({ icons: false, override: [completions] });