diff --git a/.github/workflows/ci-python.yml b/.github/workflows/ci-python.yml index 07527c42fc..2422c024f9 100644 --- a/.github/workflows/ci-python.yml +++ b/.github/workflows/ci-python.yml @@ -29,7 +29,7 @@ jobs: run: uv python install 3.13 - name: Install project dependencies - run: uv sync + run: uv sync --all-extras - name: Format check run: uv run ruff format --check diff --git a/packages/@n8n/task-runner-python/README.md b/packages/@n8n/task-runner-python/README.md index 28ff4452ba..8a172ee063 100644 --- a/packages/@n8n/task-runner-python/README.md +++ b/packages/@n8n/task-runner-python/README.md @@ -12,8 +12,9 @@ Install: Set up dependencies: -``` -just sync +```sh +just sync # or +just sync-all ``` See `justfile` for available commands. diff --git a/packages/@n8n/task-runner-python/justfile b/packages/@n8n/task-runner-python/justfile index f5df259f23..1e6fa562a7 100644 --- a/packages/@n8n/task-runner-python/justfile +++ b/packages/@n8n/task-runner-python/justfile @@ -9,6 +9,9 @@ run: sync: uv sync +sync-all: + uv sync --all-extras + lint: uv run ruff check diff --git a/packages/@n8n/task-runner-python/pyproject.toml b/packages/@n8n/task-runner-python/pyproject.toml index 8db52b5cff..500afb614f 100644 --- a/packages/@n8n/task-runner-python/pyproject.toml +++ b/packages/@n8n/task-runner-python/pyproject.toml @@ -8,6 +8,9 @@ dependencies = [ "websockets>=15.0.1", ] +[project.optional-dependencies] +sentry = ["sentry-sdk>=2.35.2"] + [dependency-groups] dev = [ "ruff>=0.12.8", diff --git a/packages/@n8n/task-runner-python/src/config/health_check_config.py b/packages/@n8n/task-runner-python/src/config/health_check_config.py new file mode 100644 index 0000000000..adc03a3058 --- /dev/null +++ b/packages/@n8n/task-runner-python/src/config/health_check_config.py @@ -0,0 +1,35 @@ +import os +from dataclasses import dataclass + +from src.constants import ( + DEFAULT_HEALTH_CHECK_SERVER_HOST, + DEFAULT_HEALTH_CHECK_SERVER_PORT, + ENV_HEALTH_CHECK_SERVER_ENABLED, + ENV_HEALTH_CHECK_SERVER_HOST, + ENV_HEALTH_CHECK_SERVER_PORT, +) + + +@dataclass +class HealthCheckConfig: + enabled: bool + host: str + port: int + + @classmethod + def from_env(cls): + port_str = os.getenv( + ENV_HEALTH_CHECK_SERVER_PORT, str(DEFAULT_HEALTH_CHECK_SERVER_PORT) + ) + port = int(port_str) + if port < 1 or port > 65535: + raise ValueError(f"Port must be between 1 and 65535, got {port}") + + return cls( + enabled=os.getenv(ENV_HEALTH_CHECK_SERVER_ENABLED, "false").lower() + == "true", + host=os.getenv( + ENV_HEALTH_CHECK_SERVER_HOST, DEFAULT_HEALTH_CHECK_SERVER_HOST + ), + port=port, + ) diff --git a/packages/@n8n/task-runner-python/src/config/sentry_config.py b/packages/@n8n/task-runner-python/src/config/sentry_config.py new file mode 100644 index 0000000000..33e254a035 --- /dev/null +++ b/packages/@n8n/task-runner-python/src/config/sentry_config.py @@ -0,0 +1,30 @@ +import os +from dataclasses import dataclass + +from src.constants import ( + ENV_DEPLOYMENT_NAME, + ENV_ENVIRONMENT, + ENV_N8N_VERSION, + ENV_SENTRY_DSN, +) + + +@dataclass +class SentryConfig: + dsn: str + n8n_version: str + environment: str + deployment_name: str + + @property + def enabled(self) -> bool: + return bool(self.dsn) + + @classmethod + def from_env(cls): + return cls( + dsn=os.getenv(ENV_SENTRY_DSN, ""), + n8n_version=os.getenv(ENV_N8N_VERSION, ""), + environment=os.getenv(ENV_ENVIRONMENT, ""), + deployment_name=os.getenv(ENV_DEPLOYMENT_NAME, ""), + ) diff --git a/packages/@n8n/task-runner-python/src/config/task_runner_config.py b/packages/@n8n/task-runner-python/src/config/task_runner_config.py new file mode 100644 index 0000000000..9d24bf6158 --- /dev/null +++ b/packages/@n8n/task-runner-python/src/config/task_runner_config.py @@ -0,0 +1,84 @@ +import os +from dataclasses import dataclass +from typing import Set + +from src.constants import ( + BUILTINS_DENY_DEFAULT, + DEFAULT_MAX_CONCURRENCY, + DEFAULT_MAX_PAYLOAD_SIZE, + DEFAULT_TASK_BROKER_URI, + DEFAULT_TASK_TIMEOUT, + ENV_BUILTINS_DENY, + ENV_EXTERNAL_ALLOW, + ENV_GRANT_TOKEN, + ENV_MAX_CONCURRENCY, + ENV_MAX_PAYLOAD_SIZE, + ENV_STDLIB_ALLOW, + ENV_TASK_BROKER_URI, + ENV_TASK_TIMEOUT, +) + + +def parse_allowlist(allowlist_str: str, list_name: str) -> Set[str]: + if not allowlist_str: + return set() + + modules = { + module + for raw_module in allowlist_str.split(",") + if (module := raw_module.strip()) + } + + if "*" in modules and len(modules) > 1: + raise ValueError( + f"Wildcard '*' in {list_name} must be used alone, not with other modules. " + f"Got: {', '.join(sorted(modules))}" + ) + + return modules + + +@dataclass +class TaskRunnerConfig: + grant_token: str + task_broker_uri: str + max_concurrency: int + max_payload_size: int + task_timeout: int + stdlib_allow: Set[str] + external_allow: Set[str] + builtins_deny: Set[str] + + @classmethod + def from_env(cls): + grant_token = os.getenv(ENV_GRANT_TOKEN, "") + if not grant_token: + raise ValueError("Environment variable N8N_RUNNERS_GRANT_TOKEN is required") + + task_timeout = int(os.getenv(ENV_TASK_TIMEOUT, str(DEFAULT_TASK_TIMEOUT))) + if task_timeout <= 0: + raise ValueError(f"Task timeout must be positive, got {task_timeout}") + + return cls( + grant_token=grant_token, + task_broker_uri=os.getenv(ENV_TASK_BROKER_URI, DEFAULT_TASK_BROKER_URI), + max_concurrency=int( + os.getenv(ENV_MAX_CONCURRENCY, str(DEFAULT_MAX_CONCURRENCY)) + ), + max_payload_size=int( + os.getenv(ENV_MAX_PAYLOAD_SIZE, str(DEFAULT_MAX_PAYLOAD_SIZE)) + ), + task_timeout=task_timeout, + stdlib_allow=parse_allowlist( + os.getenv(ENV_STDLIB_ALLOW, ""), ENV_STDLIB_ALLOW + ), + external_allow=parse_allowlist( + os.getenv(ENV_EXTERNAL_ALLOW, ""), ENV_EXTERNAL_ALLOW + ), + builtins_deny=set( + module.strip() + for module in os.getenv(ENV_BUILTINS_DENY, BUILTINS_DENY_DEFAULT).split( + "," + ) + ), + ) diff --git a/packages/@n8n/task-runner-python/src/constants.py b/packages/@n8n/task-runner-python/src/constants.py index 95433921fb..6a2b991429 100644 --- a/packages/@n8n/task-runner-python/src/constants.py +++ b/packages/@n8n/task-runner-python/src/constants.py @@ -28,6 +28,9 @@ MAX_VALIDATION_CACHE_SIZE = 500 # cached validation results # Executor EXECUTOR_USER_OUTPUT_KEY = "__n8n_internal_user_output__" EXECUTOR_CIRCULAR_REFERENCE_KEY = "__n8n_internal_circular_ref__" +EXECUTOR_ALL_ITEMS_FILENAME = "" +EXECUTOR_PER_ITEM_FILENAME = "" +EXECUTOR_FILENAMES = {EXECUTOR_ALL_ITEMS_FILENAME, EXECUTOR_PER_ITEM_FILENAME} # Broker DEFAULT_TASK_BROKER_URI = "http://127.0.0.1:5679" @@ -49,6 +52,14 @@ ENV_BUILTINS_DENY = "N8N_RUNNERS_BUILTINS_DENY" ENV_HEALTH_CHECK_SERVER_ENABLED = "N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED" ENV_HEALTH_CHECK_SERVER_HOST = "N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST" ENV_HEALTH_CHECK_SERVER_PORT = "N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT" +ENV_SENTRY_DSN = "N8N_SENTRY_DSN" +ENV_N8N_VERSION = "N8N_VERSION" +ENV_ENVIRONMENT = "ENVIRONMENT" +ENV_DEPLOYMENT_NAME = "DEPLOYMENT_NAME" + +# Sentry +SENTRY_TAG_SERVER_TYPE = "server_type" +SENTRY_TAG_SERVER_TYPE_VALUE = "task_runner_python" # Logging LOG_FORMAT = "%(asctime)s.%(msecs)03d\t%(levelname)s\t%(message)s" @@ -59,6 +70,7 @@ LOG_TASK_CANCEL_UNKNOWN = ( "Received cancel for unknown task: {task_id}. Discarding message." ) LOG_TASK_CANCEL_WAITING = "Cancelled task {task_id} (waiting for settings)" +LOG_SENTRY_MISSING = "Sentry is enabled but sentry-sdk is not installed. Install with: uv sync --all-extras" # RPC RPC_BROWSER_CONSOLE_LOG_METHOD = "logNodeOutput" diff --git a/packages/@n8n/task-runner-python/src/env.py b/packages/@n8n/task-runner-python/src/env.py deleted file mode 100644 index 3b4780fa5e..0000000000 --- a/packages/@n8n/task-runner-python/src/env.py +++ /dev/null @@ -1,100 +0,0 @@ -import os -from dataclasses import dataclass -from typing import Set, Tuple - -from src.constants import ( - DEFAULT_MAX_CONCURRENCY, - DEFAULT_TASK_TIMEOUT, - DEFAULT_TASK_BROKER_URI, - DEFAULT_MAX_PAYLOAD_SIZE, - DEFAULT_HEALTH_CHECK_SERVER_HOST, - DEFAULT_HEALTH_CHECK_SERVER_PORT, - BUILTINS_DENY_DEFAULT, - ENV_MAX_CONCURRENCY, - ENV_MAX_PAYLOAD_SIZE, - ENV_TASK_BROKER_URI, - ENV_GRANT_TOKEN, - ENV_TASK_TIMEOUT, - ENV_BUILTINS_DENY, - ENV_STDLIB_ALLOW, - ENV_EXTERNAL_ALLOW, - ENV_HEALTH_CHECK_SERVER_ENABLED, - ENV_HEALTH_CHECK_SERVER_HOST, - ENV_HEALTH_CHECK_SERVER_PORT, -) -from src.task_runner import TaskRunnerOpts - - -@dataclass -class HealthCheckOpts: - enabled: bool - host: str - port: int - - -def parse_allowlist(allowlist_str: str, list_name: str) -> Set[str]: - if not allowlist_str: - return set() - - modules = { - module - for raw_module in allowlist_str.split(",") - if (module := raw_module.strip()) - } - - if "*" in modules and len(modules) > 1: - raise ValueError( - f"Wildcard '*' in {list_name} must be used alone, not with other modules. " - f"Got: {', '.join(sorted(modules))}" - ) - - return modules - - -def parse_denylist(denylist_str: str) -> Set[str]: - if not denylist_str: - return set() - - return {name for raw_name in denylist_str.split(",") if (name := raw_name.strip())} - - -def parse_env_vars() -> Tuple[TaskRunnerOpts, HealthCheckOpts]: - grant_token = os.getenv(ENV_GRANT_TOKEN, "") - - if not grant_token: - raise ValueError(f"{ENV_GRANT_TOKEN} environment variable is required") - - builtins_deny_str = os.getenv(ENV_BUILTINS_DENY, BUILTINS_DENY_DEFAULT) - builtins_deny = parse_denylist(builtins_deny_str) - - stdlib_allow_str = os.getenv(ENV_STDLIB_ALLOW, "") - stdlib_allow = parse_allowlist(stdlib_allow_str, "stdlib allowlist") - - external_allow_str = os.getenv(ENV_EXTERNAL_ALLOW, "") - external_allow = parse_allowlist(external_allow_str, "external allowlist") - - task_runner_opts = TaskRunnerOpts( - grant_token=grant_token, - task_broker_uri=os.getenv(ENV_TASK_BROKER_URI, DEFAULT_TASK_BROKER_URI), - max_concurrency=int( - os.getenv(ENV_MAX_CONCURRENCY) or str(DEFAULT_MAX_CONCURRENCY) - ), - max_payload_size=int( - os.getenv(ENV_MAX_PAYLOAD_SIZE) or str(DEFAULT_MAX_PAYLOAD_SIZE) - ), - task_timeout=int(os.getenv(ENV_TASK_TIMEOUT) or str(DEFAULT_TASK_TIMEOUT)), - stdlib_allow=stdlib_allow, - external_allow=external_allow, - builtins_deny=builtins_deny, - ) - - health_check_opts = HealthCheckOpts( - enabled=os.getenv(ENV_HEALTH_CHECK_SERVER_ENABLED, "") == "true", - host=os.getenv(ENV_HEALTH_CHECK_SERVER_HOST, DEFAULT_HEALTH_CHECK_SERVER_HOST), - port=int( - os.getenv(ENV_HEALTH_CHECK_SERVER_PORT) - or str(DEFAULT_HEALTH_CHECK_SERVER_PORT) - ), - ) - - return task_runner_opts, health_check_opts diff --git a/packages/@n8n/task-runner-python/src/health.py b/packages/@n8n/task-runner-python/src/health_check_server.py similarity index 68% rename from packages/@n8n/task-runner-python/src/health.py rename to packages/@n8n/task-runner-python/src/health_check_server.py index 7df98518aa..fca80f61da 100644 --- a/packages/@n8n/task-runner-python/src/health.py +++ b/packages/@n8n/task-runner-python/src/health_check_server.py @@ -3,6 +3,8 @@ import errno import logging from typing import Optional +from src.config.health_check_config import HealthCheckConfig + HEALTH_CHECK_RESPONSE = ( b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 2\r\n\r\nOK" ) @@ -13,13 +15,17 @@ class HealthCheckServer: self.server: Optional[asyncio.Server] = None self.logger = logging.getLogger(__name__) - async def start(self, host: str, port: int) -> None: + async def start(self, config: HealthCheckConfig) -> None: try: - self.server = await asyncio.start_server(self._handle_request, host, port) - self.logger.info(f"Health check server listening on {host}, port {port}") + self.server = await asyncio.start_server( + self._handle_request, config.host, config.port + ) + self.logger.info( + f"Health check server listening on {config.host}, port {config.port}" + ) except OSError as e: if e.errno == errno.EADDRINUSE: - raise OSError(f"Port {port} is already in use") from e + raise OSError(f"Port {config.port} is already in use") from e else: raise diff --git a/packages/@n8n/task-runner-python/src/main.py b/packages/@n8n/task-runner-python/src/main.py index b640c8654b..88efa4c155 100644 --- a/packages/@n8n/task-runner-python/src/main.py +++ b/packages/@n8n/task-runner-python/src/main.py @@ -3,7 +3,9 @@ import logging import sys from typing import Optional -from src.env import parse_env_vars +from src.config.health_check_config import HealthCheckConfig +from src.config.sentry_config import SentryConfig +from src.config.task_runner_config import TaskRunnerConfig from src.logs import setup_logging from src.task_runner import TaskRunner @@ -12,39 +14,56 @@ async def main(): setup_logging() logger = logging.getLogger(__name__) - logger.info("Starting runner...") + sentry = None + sentry_config = SentryConfig.from_env() + + if sentry_config.enabled: + from src.sentry import setup_sentry + + sentry = setup_sentry(sentry_config) try: - task_runner_opts, health_check_opts = parse_env_vars() + health_check_config = HealthCheckConfig.from_env() except ValueError as e: - logger.error(str(e)) + logger.error(f"Invalid health check configuration: {e}") sys.exit(1) - task_runner = TaskRunner(task_runner_opts) health_check_server: Optional["HealthCheckServer"] = None - - if health_check_opts.enabled: - from src.health import HealthCheckServer + if health_check_config.enabled: + from src.health_check_server import HealthCheckServer health_check_server = HealthCheckServer() try: - await health_check_server.start( - health_check_opts.host, health_check_opts.port - ) + await health_check_server.start(health_check_config) except OSError as e: logger.error(f"Failed to start health check server: {e}") sys.exit(1) + try: + task_runner_config = TaskRunnerConfig.from_env() + except ValueError as e: + logger.error(str(e)) + sys.exit(1) + + task_runner = TaskRunner(task_runner_config) + logger.info("Starting runner...") + try: await task_runner.start() except (KeyboardInterrupt, asyncio.CancelledError): logger.info("Shutting down runner...") + except Exception: + logger.error("Unexpected error", exc_info=True) + raise finally: await task_runner.stop() if health_check_server: await health_check_server.stop() + if sentry: + sentry.shutdown() + if __name__ == "__main__": asyncio.run(main()) diff --git a/packages/@n8n/task-runner-python/src/sentry.py b/packages/@n8n/task-runner-python/src/sentry.py new file mode 100644 index 0000000000..7b6fe17362 --- /dev/null +++ b/packages/@n8n/task-runner-python/src/sentry.py @@ -0,0 +1,78 @@ +import logging +from typing import Any, Optional + +from src.errors.task_runtime_error import TaskRuntimeError +from src.config.sentry_config import SentryConfig +from src.constants import ( + EXECUTOR_FILENAMES, + LOG_SENTRY_MISSING, + SENTRY_TAG_SERVER_TYPE, + SENTRY_TAG_SERVER_TYPE_VALUE, +) + + +class TaskRunnerSentry: + def __init__(self, config: SentryConfig): + self.config = config + self.logger = logging.getLogger(__name__) + + def init(self) -> None: + import sentry_sdk + from sentry_sdk.integrations.logging import LoggingIntegration + + sentry_sdk.init( + dsn=self.config.dsn, + release=f"n8n@{self.config.n8n_version}", + environment=self.config.environment, + server_name=self.config.deployment_name, + before_send=self._filter_out_user_code_errors, + attach_stacktrace=True, + send_default_pii=False, + auto_enabling_integrations=False, + default_integrations=True, + integrations=[LoggingIntegration(level=logging.ERROR)], + ) + sentry_sdk.set_tag(SENTRY_TAG_SERVER_TYPE, SENTRY_TAG_SERVER_TYPE_VALUE) + self.logger.info("Sentry ready") + + def shutdown(self) -> None: + import sentry_sdk + + sentry_sdk.flush(timeout=2.0) + self.logger.info("Sentry stopped") + + def _filter_out_user_code_errors(self, event: Any, hint: Any) -> Optional[Any]: + if "exc_info" in hint: + exc_type, _, _ = hint["exc_info"] + if exc_type is TaskRuntimeError: + return None + + for exception in event.get("exception", {}).get("values", []): + if self._is_from_user_code(exception): + return None + + return event + + def _is_from_user_code(self, exception: dict[str, Any]): + for frame in exception.get("stacktrace", {}).get("frames", []): + if frame.get("filename", "") in EXECUTOR_FILENAMES: + return True + return False + + +def setup_sentry(sentry_config: SentryConfig) -> Optional[TaskRunnerSentry]: + if not sentry_config.enabled: + return None + + try: + sentry = TaskRunnerSentry(sentry_config) + sentry.init() + return sentry + except ImportError: + logger = logging.getLogger(__name__) + logger.warning(LOG_SENTRY_MISSING) + return None + except Exception as e: + logger = logging.getLogger(__name__) + logger.warning(f"Failed to initialize Sentry: {e}") + return None diff --git a/packages/@n8n/task-runner-python/src/task_executor.py b/packages/@n8n/task-runner-python/src/task_executor.py index 2291ace814..a07afc1fc0 100644 --- a/packages/@n8n/task-runner-python/src/task_executor.py +++ b/packages/@n8n/task-runner-python/src/task_executor.py @@ -14,7 +14,12 @@ from src.errors import ( ) from src.message_types.broker import NodeMode, Items -from src.constants import EXECUTOR_CIRCULAR_REFERENCE_KEY, EXECUTOR_USER_OUTPUT_KEY +from src.constants import ( + EXECUTOR_CIRCULAR_REFERENCE_KEY, + EXECUTOR_USER_OUTPUT_KEY, + EXECUTOR_ALL_ITEMS_FILENAME, + EXECUTOR_PER_ITEM_FILENAME, +) from typing import Any, Set from multiprocessing.context import SpawnProcess @@ -134,7 +139,8 @@ class TaskExecutor: print_args: PrintArgs = [] try: - code = TaskExecutor._wrap_code(raw_code) + wrapped_code = TaskExecutor._wrap_code(raw_code) + compiled_code = compile(wrapped_code, EXECUTOR_ALL_ITEMS_FILENAME, "exec") globals = { "__builtins__": TaskExecutor._filter_builtins(builtins_deny), @@ -144,7 +150,7 @@ class TaskExecutor: else print, } - exec(code, globals) + exec(compiled_code, globals) queue.put( {"result": globals[EXECUTOR_USER_OUTPUT_KEY], "print_args": print_args} @@ -173,7 +179,7 @@ class TaskExecutor: try: wrapped_code = TaskExecutor._wrap_code(raw_code) - compiled_code = compile(wrapped_code, "", "exec") + compiled_code = compile(wrapped_code, EXECUTOR_PER_ITEM_FILENAME, "exec") result = [] for index, item in enumerate(items): diff --git a/packages/@n8n/task-runner-python/src/task_runner.py b/packages/@n8n/task-runner-python/src/task_runner.py index 84c86e9689..66333f6b3e 100644 --- a/packages/@n8n/task-runner-python/src/task_runner.py +++ b/packages/@n8n/task-runner-python/src/task_runner.py @@ -1,13 +1,13 @@ import asyncio -from dataclasses import dataclass import logging import time -from typing import Dict, Optional, Any, Set +from typing import Dict, Optional, Any from urllib.parse import urlparse import websockets import random +from src.config.task_runner_config import TaskRunnerConfig from src.errors import ( WebsocketConnectionError, TaskMissingError, @@ -64,28 +64,14 @@ class TaskOffer: return time.time() > self.valid_until -@dataclass -class TaskRunnerOpts: - grant_token: str - task_broker_uri: str - max_concurrency: int - max_payload_size: int - task_timeout: int - stdlib_allow: Set[str] - external_allow: Set[str] - builtins_deny: Set[str] - - class TaskRunner: def __init__( self, - opts: TaskRunnerOpts, + config: TaskRunnerConfig, ): self.runner_id = nanoid() self.name = RUNNER_NAME - - self.grant_token = opts.grant_token - self.opts = opts + self.config = config self.websocket_connection: Optional[Any] = None self.can_send_offers = False @@ -96,23 +82,23 @@ class TaskRunner: self.offers_coroutine: Optional[asyncio.Task] = None self.serde = MessageSerde() self.executor = TaskExecutor() - self.analyzer = TaskAnalyzer(opts.stdlib_allow, opts.external_allow) + self.analyzer = TaskAnalyzer(config.stdlib_allow, config.external_allow) self.logger = logging.getLogger(__name__) - self.task_broker_uri = opts.task_broker_uri - websocket_host = urlparse(opts.task_broker_uri).netloc + self.task_broker_uri = config.task_broker_uri + websocket_host = urlparse(config.task_broker_uri).netloc self.websocket_url = ( f"ws://{websocket_host}{TASK_BROKER_WS_PATH}?id={self.runner_id}" ) async def start(self) -> None: - headers = {"Authorization": f"Bearer {self.grant_token}"} + headers = {"Authorization": f"Bearer {self.config.grant_token}"} try: self.websocket_connection = await websockets.connect( self.websocket_url, additional_headers=headers, - max_size=self.opts.max_payload_size, + max_size=self.config.max_payload_size, ) self.logger.info("Connected to broker") @@ -182,7 +168,7 @@ class TaskRunner: await self._send_message(response) return - if len(self.running_tasks) >= self.opts.max_concurrency: + if len(self.running_tasks) >= self.config.max_concurrency: response = RunnerTaskRejected( task_id=message.task_id, reason=TASK_REJECTED_REASON_AT_CAPACITY, @@ -234,9 +220,9 @@ class TaskRunner: code=task_settings.code, node_mode=task_settings.node_mode, items=task_settings.items, - stdlib_allow=self.opts.stdlib_allow, - external_allow=self.opts.external_allow, - builtins_deny=self.opts.builtins_deny, + stdlib_allow=self.config.stdlib_allow, + external_allow=self.config.external_allow, + builtins_deny=self.config.builtins_deny, can_log=task_settings.can_log, ) @@ -245,7 +231,7 @@ class TaskRunner: result, print_args = self.executor.execute_process( process=process, queue=queue, - task_timeout=self.opts.task_timeout, + task_timeout=self.config.task_timeout, continue_on_fail=task_settings.continue_on_fail, ) @@ -266,6 +252,7 @@ class TaskRunner: ) except Exception as e: + self.logger.error(f"Task {task_id} failed", exc_info=True) response = RunnerTaskError(task_id=task_id, error={"message": str(e)}) await self._send_message(response) @@ -344,7 +331,7 @@ class TaskRunner: for offer_id in expired_offer_ids: self.open_offers.pop(offer_id, None) - offers_to_send = self.opts.max_concurrency - ( + offers_to_send = self.config.max_concurrency - ( len(self.open_offers) + len(self.running_tasks) ) diff --git a/packages/@n8n/task-runner-python/uv.lock b/packages/@n8n/task-runner-python/uv.lock index 65aee86e7c..ab322d1e22 100644 --- a/packages/@n8n/task-runner-python/uv.lock +++ b/packages/@n8n/task-runner-python/uv.lock @@ -2,6 +2,15 @@ version = 1 revision = 3 requires-python = ">=3.13" +[[package]] +name = "certifi" +version = "2025.8.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/dc/67/960ebe6bf230a96cda2e0abcf73af550ec4f090005363542f0765df162e0/certifi-2025.8.3.tar.gz", hash = "sha256:e564105f78ded564e3ae7c923924435e1daa7463faeab5bb932bc53ffae63407", size = 162386, upload-time = "2025-08-03T03:07:47.08Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/48/1549795ba7742c948d2ad169c1c8cdbae65bc450d6cd753d124b17c8cd32/certifi-2025.8.3-py3-none-any.whl", hash = "sha256:f6c12493cfb1b06ba2ff328595af9350c65d6644968e5d3a2ffd78699af217a5", size = 161216, upload-time = "2025-08-03T03:07:45.777Z" }, +] + [[package]] name = "ruff" version = "0.12.8" @@ -27,6 +36,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cb/5c/799a1efb8b5abab56e8a9f2a0b72d12bd64bb55815e9476c7d0a2887d2f7/ruff-0.12.8-py3-none-win_arm64.whl", hash = "sha256:c90e1a334683ce41b0e7a04f41790c429bf5073b62c1ae701c9dc5b3d14f0749", size = 11884718, upload-time = "2025-08-07T19:05:42.866Z" }, ] +[[package]] +name = "sentry-sdk" +version = "2.35.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/bd/79/0ecb942f3f1ad26c40c27f81ff82392d85c01d26a45e3c72c2b37807e680/sentry_sdk-2.35.2.tar.gz", hash = "sha256:e9e8f3c795044beb59f2c8f4c6b9b0f9779e5e604099882df05eec525e782cc6", size = 343377, upload-time = "2025-09-01T11:00:58.633Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c0/91/a43308dc82a0e32d80cd0dfdcfca401ecbd0f431ab45f24e48bb97b7800d/sentry_sdk-2.35.2-py2.py3-none-any.whl", hash = "sha256:38c98e3cbb620dd3dd80a8d6e39c753d453dd41f8a9df581b0584c19a52bc926", size = 363975, upload-time = "2025-09-01T11:00:56.574Z" }, +] + [[package]] name = "task-runner-python" version = "0.1.0" @@ -35,6 +57,11 @@ dependencies = [ { name = "websockets" }, ] +[package.optional-dependencies] +sentry = [ + { name = "sentry-sdk" }, +] + [package.dev-dependencies] dev = [ { name = "ruff" }, @@ -42,7 +69,11 @@ dev = [ ] [package.metadata] -requires-dist = [{ name = "websockets", specifier = ">=15.0.1" }] +requires-dist = [ + { name = "sentry-sdk", marker = "extra == 'sentry'", specifier = ">=2.35.2" }, + { name = "websockets", specifier = ">=15.0.1" }, +] +provides-extras = ["sentry"] [package.metadata.requires-dev] dev = [ @@ -75,6 +106,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/98/c6/207bbc2f3bb71df4b1aeabe8e9c31a1cd22c72aff0ab9c1a832b9ae54f6e/ty-0.0.1a17-py3-none-win_arm64.whl", hash = "sha256:636eacc1dceaf09325415a70a03cd57eae53e5c7f281813aaa943a698a45cddb", size = 7782847, upload-time = "2025-08-06T12:13:54.243Z" }, ] +[[package]] +name = "urllib3" +version = "2.5.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/15/22/9ee70a2574a4f4599c47dd506532914ce044817c7752a79b6a51286319bc/urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760", size = 393185, upload-time = "2025-06-18T14:07:41.644Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a7/c2/fe1e52489ae3122415c51f387e221dd0773709bad6c6cdaa599e8a2c5185/urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc", size = 129795, upload-time = "2025-06-18T14:07:40.39Z" }, +] + [[package]] name = "websockets" version = "15.0.1"