mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 10:02:05 +00:00
refactor(core): Make native Python runner compatible with launcher (#18788)
This commit is contained in:
@@ -29,4 +29,4 @@ typecheck:
|
||||
|
||||
# For debugging only, start the runner with a manually fetched grant token.
|
||||
debug:
|
||||
GRANT_TOKEN=$(curl -s -X POST http://127.0.0.1:5679/runners/auth -H "Content-Type: application/json" -d '{"token":"test"}' | jq -r '.data.token') && N8N_RUNNERS_GRANT_TOKEN="$GRANT_TOKEN" N8N_RUNNERS_HIDE_TASK_OFFER_LOGS=true just run
|
||||
GRANT_TOKEN=$(curl -s -X POST http://127.0.0.1:5679/runners/auth -H "Content-Type: application/json" -d '{"token":"test"}' | jq -r '.data.token') && N8N_RUNNERS_GRANT_TOKEN="$GRANT_TOKEN" just run
|
||||
|
||||
@@ -38,7 +38,6 @@ ENV_GRANT_TOKEN = "N8N_RUNNERS_GRANT_TOKEN"
|
||||
ENV_MAX_CONCURRENCY = "N8N_RUNNERS_MAX_CONCURRENCY"
|
||||
ENV_MAX_PAYLOAD_SIZE = "N8N_RUNNERS_MAX_PAYLOAD"
|
||||
ENV_TASK_TIMEOUT = "N8N_RUNNERS_TASK_TIMEOUT"
|
||||
ENV_HIDE_TASK_OFFER_LOGS = "N8N_RUNNERS_HIDE_TASK_OFFER_LOGS"
|
||||
|
||||
# Logging
|
||||
LOG_FORMAT = "%(asctime)s.%(msecs)03d\t%(levelname)s\t%(message)s"
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
import sys
|
||||
import logging
|
||||
import os
|
||||
from .constants import (
|
||||
LOG_FORMAT,
|
||||
LOG_TIMESTAMP_FORMAT,
|
||||
ENV_HIDE_TASK_OFFER_LOGS,
|
||||
)
|
||||
from src.constants import LOG_FORMAT, LOG_TIMESTAMP_FORMAT
|
||||
|
||||
COLORS = {
|
||||
"DEBUG": "\033[34m", # blue
|
||||
@@ -20,9 +17,16 @@ RESET = "\033[0m"
|
||||
class ColorFormatter(logging.Formatter):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.use_colors = os.getenv("NO_COLOR") is None
|
||||
|
||||
# When started by launcher, log level and timestamp are handled by launcher.
|
||||
self.short_form = os.getenv("N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED") == "true"
|
||||
|
||||
def format(self, record):
|
||||
if self.short_form:
|
||||
return record.getMessage()
|
||||
|
||||
formatted = super().format(record)
|
||||
|
||||
if not self.use_colors:
|
||||
@@ -45,32 +49,18 @@ class ColorFormatter(logging.Formatter):
|
||||
return formatted
|
||||
|
||||
|
||||
class TaskOfferFilter(logging.Filter):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.hide_offers = os.getenv(ENV_HIDE_TASK_OFFER_LOGS, "").lower() == "true"
|
||||
|
||||
def filter(self, record):
|
||||
"""Filter out task offers if N8N_RUNNERS_HIDE_TASK_OFFER_LOGS is 'true'."""
|
||||
|
||||
return not (self.hide_offers and self._is_task_offer_message(record))
|
||||
|
||||
def _is_task_offer_message(self, record):
|
||||
return (
|
||||
record.levelname == "DEBUG"
|
||||
and "websockets" in record.name
|
||||
and '"runner:taskoffer"' in record.getMessage()
|
||||
)
|
||||
|
||||
|
||||
def setup_logging():
|
||||
logger = logging.getLogger()
|
||||
|
||||
logger.setLevel(logging.INFO)
|
||||
log_level_str = os.getenv("N8N_RUNNERS_LAUNCHER_LOG_LEVEL", "INFO").upper()
|
||||
log_level = getattr(logging, log_level_str, logging.INFO)
|
||||
logger.setLevel(log_level)
|
||||
|
||||
stream_handler = logging.StreamHandler()
|
||||
stream_handler = logging.StreamHandler(sys.stdout)
|
||||
stream_handler.setFormatter(ColorFormatter(LOG_FORMAT, LOG_TIMESTAMP_FORMAT))
|
||||
stream_handler.addFilter(TaskOfferFilter())
|
||||
logger.addHandler(stream_handler)
|
||||
|
||||
logging.getLogger("websockets.client").setLevel(logging.DEBUG)
|
||||
# Hardcoded to INFO as websocket logs are too verbose
|
||||
logging.getLogger("websockets.client").setLevel(logging.INFO)
|
||||
logging.getLogger("websockets.server").setLevel(logging.INFO)
|
||||
logging.getLogger("websockets").setLevel(logging.INFO)
|
||||
|
||||
@@ -5,7 +5,7 @@ import sys
|
||||
|
||||
os.environ["WEBSOCKETS_MAX_LOG_SIZE"] = "256"
|
||||
|
||||
from .constants import (
|
||||
from src.constants import (
|
||||
DEFAULT_MAX_CONCURRENCY,
|
||||
DEFAULT_TASK_TIMEOUT,
|
||||
ENV_MAX_CONCURRENCY,
|
||||
@@ -16,8 +16,8 @@ from .constants import (
|
||||
DEFAULT_MAX_PAYLOAD_SIZE,
|
||||
ENV_TASK_TIMEOUT,
|
||||
)
|
||||
from .logs import setup_logging
|
||||
from .task_runner import TaskRunner, TaskRunnerOpts
|
||||
from src.logs import setup_logging
|
||||
from src.task_runner import TaskRunner, TaskRunnerOpts
|
||||
|
||||
|
||||
async def main():
|
||||
|
||||
@@ -2,8 +2,8 @@ import json
|
||||
from dataclasses import asdict
|
||||
from typing import cast
|
||||
|
||||
from .message_types.broker import NodeMode, TaskSettings
|
||||
from .constants import (
|
||||
from src.message_types.broker import NodeMode, TaskSettings
|
||||
from src.constants import (
|
||||
BROKER_INFO_REQUEST,
|
||||
BROKER_RUNNER_REGISTERED,
|
||||
BROKER_TASK_CANCEL,
|
||||
@@ -11,7 +11,7 @@ from .constants import (
|
||||
BROKER_TASK_SETTINGS,
|
||||
BROKER_RPC_RESPONSE,
|
||||
)
|
||||
from .message_types import (
|
||||
from src.message_types import (
|
||||
BrokerMessage,
|
||||
RunnerMessage,
|
||||
BrokerInfoRequest,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal, Union, List, Dict, Any
|
||||
|
||||
from ..constants import (
|
||||
from src.constants import (
|
||||
BROKER_INFO_REQUEST,
|
||||
BROKER_RUNNER_REGISTERED,
|
||||
BROKER_TASK_CANCEL,
|
||||
|
||||
@@ -2,7 +2,7 @@ from dataclasses import dataclass
|
||||
from typing import List, Literal, Union, Any, Dict
|
||||
from ..constants import RUNNER_RPC_CALL
|
||||
|
||||
from ..constants import (
|
||||
from src.constants import (
|
||||
RUNNER_INFO,
|
||||
RUNNER_TASK_ACCEPTED,
|
||||
RUNNER_TASK_DONE,
|
||||
|
||||
@@ -4,7 +4,7 @@ import traceback
|
||||
import textwrap
|
||||
import json
|
||||
|
||||
from .errors import (
|
||||
from src.errors import (
|
||||
TaskResultMissingError,
|
||||
TaskRuntimeError,
|
||||
TaskTimeoutError,
|
||||
|
||||
@@ -7,9 +7,10 @@ from urllib.parse import urlparse
|
||||
import websockets
|
||||
import random
|
||||
|
||||
from .errors import WebsocketConnectionError, TaskMissingError
|
||||
from .message_types.broker import TaskSettings
|
||||
from .nanoid import nanoid
|
||||
|
||||
from src.errors import WebsocketConnectionError, TaskMissingError
|
||||
from src.message_types.broker import TaskSettings
|
||||
from src.nanoid_utils import nanoid
|
||||
|
||||
from .constants import (
|
||||
RUNNER_NAME,
|
||||
|
||||
Reference in New Issue
Block a user