Add an opt-in health check server to the Python task runner.

Summary of the changes in this patch:
- constants.py: default host/port for the health check server and the names
  of the N8N_RUNNERS_HEALTH_CHECK_SERVER_* environment variables.
- env.py: new HealthCheckOpts dataclass; parse_env_vars() now returns a
  (TaskRunnerOpts, HealthCheckOpts) tuple instead of TaskRunnerOpts alone.
- health.py (new file): HealthCheckServer, an asyncio server that answers
  every connection with a fixed "200 OK" response.
- logs.py: the short log form is now selected by `not sys.stdout.isatty()`
  instead of by the health check env var.
- main.py: starts the health check server when enabled and stops it after
  the task runner on shutdown.
- task_runner.py: "Runner stopped" is now logged inside TaskRunner (after the
  broker disconnect message) rather than in main().

NOTE(review): health.py - _handle_request never reads from _reader, and
writer.wait_closed() runs in `finally`, outside the `try`, so an error raised
there is not covered by the `except Exception`.
NOTE(review): env.py - the port is converted with int() and not range-checked;
the enabled flag is true only for the exact string "true".
NOTE(review): logs.py - the new line uses `sys`; the hunk does not show an
`import sys` in logs.py, confirm it is already imported there.

diff --git a/packages/@n8n/task-runner-python/src/constants.py b/packages/@n8n/task-runner-python/src/constants.py
index 196a2cc85d..95433921fb 100644
--- a/packages/@n8n/task-runner-python/src/constants.py
+++ b/packages/@n8n/task-runner-python/src/constants.py
@@ -33,6 +33,10 @@ EXECUTOR_CIRCULAR_REFERENCE_KEY = "__n8n_internal_circular_ref__"
 DEFAULT_TASK_BROKER_URI = "http://127.0.0.1:5679"
 TASK_BROKER_WS_PATH = "/runners/_ws"
 
+# Health check
+DEFAULT_HEALTH_CHECK_SERVER_HOST = "127.0.0.1"
+DEFAULT_HEALTH_CHECK_SERVER_PORT = 5681
+
 # Env vars
 ENV_TASK_BROKER_URI = "N8N_RUNNERS_TASK_BROKER_URI"
 ENV_GRANT_TOKEN = "N8N_RUNNERS_GRANT_TOKEN"
@@ -42,6 +46,9 @@ ENV_TASK_TIMEOUT = "N8N_RUNNERS_TASK_TIMEOUT"
 ENV_STDLIB_ALLOW = "N8N_RUNNERS_STDLIB_ALLOW"
 ENV_EXTERNAL_ALLOW = "N8N_RUNNERS_EXTERNAL_ALLOW"
 ENV_BUILTINS_DENY = "N8N_RUNNERS_BUILTINS_DENY"
+ENV_HEALTH_CHECK_SERVER_ENABLED = "N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED"
+ENV_HEALTH_CHECK_SERVER_HOST = "N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST"
+ENV_HEALTH_CHECK_SERVER_PORT = "N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT"
 
 # Logging
 LOG_FORMAT = "%(asctime)s.%(msecs)03d\t%(levelname)s\t%(message)s"
diff --git a/packages/@n8n/task-runner-python/src/env.py b/packages/@n8n/task-runner-python/src/env.py
index db61daff12..3b4780fa5e 100644
--- a/packages/@n8n/task-runner-python/src/env.py
+++ b/packages/@n8n/task-runner-python/src/env.py
@@ -1,11 +1,14 @@
 import os
-from typing import Set
+from dataclasses import dataclass
+from typing import Set, Tuple
 
 from src.constants import (
     DEFAULT_MAX_CONCURRENCY,
     DEFAULT_TASK_TIMEOUT,
     DEFAULT_TASK_BROKER_URI,
     DEFAULT_MAX_PAYLOAD_SIZE,
+    DEFAULT_HEALTH_CHECK_SERVER_HOST,
+    DEFAULT_HEALTH_CHECK_SERVER_PORT,
     BUILTINS_DENY_DEFAULT,
     ENV_MAX_CONCURRENCY,
     ENV_MAX_PAYLOAD_SIZE,
@@ -15,10 +18,20 @@ from src.constants import (
     ENV_BUILTINS_DENY,
     ENV_STDLIB_ALLOW,
     ENV_EXTERNAL_ALLOW,
+    ENV_HEALTH_CHECK_SERVER_ENABLED,
+    ENV_HEALTH_CHECK_SERVER_HOST,
+    ENV_HEALTH_CHECK_SERVER_PORT,
 )
 from src.task_runner import TaskRunnerOpts
 
 
+@dataclass
+class HealthCheckOpts:
+    enabled: bool
+    host: str
+    port: int
+
+
 def parse_allowlist(allowlist_str: str, list_name: str) -> Set[str]:
     if not allowlist_str:
         return set()
@@ -45,7 +58,7 @@ def parse_denylist(denylist_str: str) -> Set[str]:
     return {name for raw_name in denylist_str.split(",") if (name := raw_name.strip())}
 
 
-def parse_env_vars() -> TaskRunnerOpts:
+def parse_env_vars() -> Tuple[TaskRunnerOpts, HealthCheckOpts]:
     grant_token = os.getenv(ENV_GRANT_TOKEN, "")
 
     if not grant_token:
@@ -60,7 +73,7 @@ def parse_env_vars() -> TaskRunnerOpts:
     external_allow_str = os.getenv(ENV_EXTERNAL_ALLOW, "")
     external_allow = parse_allowlist(external_allow_str, "external allowlist")
 
-    return TaskRunnerOpts(
+    task_runner_opts = TaskRunnerOpts(
         grant_token=grant_token,
         task_broker_uri=os.getenv(ENV_TASK_BROKER_URI, DEFAULT_TASK_BROKER_URI),
         max_concurrency=int(
@@ -74,3 +87,14 @@ def parse_env_vars() -> TaskRunnerOpts:
         external_allow=external_allow,
         builtins_deny=builtins_deny,
     )
+
+    health_check_opts = HealthCheckOpts(
+        enabled=os.getenv(ENV_HEALTH_CHECK_SERVER_ENABLED, "") == "true",
+        host=os.getenv(ENV_HEALTH_CHECK_SERVER_HOST, DEFAULT_HEALTH_CHECK_SERVER_HOST),
+        port=int(
+            os.getenv(ENV_HEALTH_CHECK_SERVER_PORT)
+            or str(DEFAULT_HEALTH_CHECK_SERVER_PORT)
+        ),
+    )
+
+    return task_runner_opts, health_check_opts
diff --git a/packages/@n8n/task-runner-python/src/health.py b/packages/@n8n/task-runner-python/src/health.py
new file mode 100644
index 0000000000..7df98518aa
--- /dev/null
+++ b/packages/@n8n/task-runner-python/src/health.py
@@ -0,0 +1,43 @@
+import asyncio
+import errno
+import logging
+from typing import Optional
+
+HEALTH_CHECK_RESPONSE = (
+    b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 2\r\n\r\nOK"
+)
+
+
+class HealthCheckServer:
+    def __init__(self):
+        self.server: Optional[asyncio.Server] = None
+        self.logger = logging.getLogger(__name__)
+
+    async def start(self, host: str, port: int) -> None:
+        try:
+            self.server = await asyncio.start_server(self._handle_request, host, port)
+            self.logger.info(f"Health check server listening on {host}, port {port}")
+        except OSError as e:
+            if e.errno == errno.EADDRINUSE:
+                raise OSError(f"Port {port} is already in use") from e
+            else:
+                raise
+
+    async def stop(self) -> None:
+        if self.server:
+            self.server.close()
+            await self.server.wait_closed()
+            self.server = None
+            self.logger.info("Health check server stopped")
+
+    async def _handle_request(
+        self, _reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+    ) -> None:
+        try:
+            writer.write(HEALTH_CHECK_RESPONSE)
+            await writer.drain()
+        except Exception:
+            pass
+        finally:
+            writer.close()
+            await writer.wait_closed()
diff --git a/packages/@n8n/task-runner-python/src/logs.py b/packages/@n8n/task-runner-python/src/logs.py
index 0a9b1450a3..70a20cd4bd 100644
--- a/packages/@n8n/task-runner-python/src/logs.py
+++ b/packages/@n8n/task-runner-python/src/logs.py
@@ -21,7 +21,7 @@ class ColorFormatter(logging.Formatter):
         self.use_colors = os.getenv("NO_COLOR") is None
 
         # When started by launcher, log level and timestamp are handled by launcher.
-        self.short_form = os.getenv("N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED") == "true"
+        self.short_form = not sys.stdout.isatty()
 
     def format(self, record):
         if self.short_form:
diff --git a/packages/@n8n/task-runner-python/src/main.py b/packages/@n8n/task-runner-python/src/main.py
index 072514d0a3..b640c8654b 100644
--- a/packages/@n8n/task-runner-python/src/main.py
+++ b/packages/@n8n/task-runner-python/src/main.py
@@ -1,6 +1,7 @@
 import asyncio
 import logging
 import sys
+from typing import Optional
 
 from src.env import parse_env_vars
 from src.logs import setup_logging
@@ -14,12 +15,25 @@ async def main():
     logger.info("Starting runner...")
 
     try:
-        opts = parse_env_vars()
+        task_runner_opts, health_check_opts = parse_env_vars()
     except ValueError as e:
         logger.error(str(e))
         sys.exit(1)
 
-    task_runner = TaskRunner(opts)
+    task_runner = TaskRunner(task_runner_opts)
+    health_check_server: Optional["HealthCheckServer"] = None
+
+    if health_check_opts.enabled:
+        from src.health import HealthCheckServer
+
+        health_check_server = HealthCheckServer()
+        try:
+            await health_check_server.start(
+                health_check_opts.host, health_check_opts.port
+            )
+        except OSError as e:
+            logger.error(f"Failed to start health check server: {e}")
+            sys.exit(1)
 
     try:
         await task_runner.start()
@@ -27,7 +41,9 @@ async def main():
         logger.info("Shutting down runner...")
     finally:
         await task_runner.stop()
-        logger.info("Runner stopped")
+
+        if health_check_server:
+            await health_check_server.stop()
 
 
 if __name__ == "__main__":
diff --git a/packages/@n8n/task-runner-python/src/task_runner.py b/packages/@n8n/task-runner-python/src/task_runner.py
index 1e8330b7c1..73db8db3c4 100644
--- a/packages/@n8n/task-runner-python/src/task_runner.py
+++ b/packages/@n8n/task-runner-python/src/task_runner.py
@@ -130,6 +130,8 @@ class TaskRunner:
             await self.websocket_connection.close()
             self.logger.info("Disconnected from broker")
 
+        self.logger.info("Runner stopped")
+
     # ========== Messages ==========
 
     async def _listen_for_messages(self) -> None: