diff --git a/packages/@n8n/task-runner-python/src/constants.py b/packages/@n8n/task-runner-python/src/constants.py index 49b4e1c70c..196a2cc85d 100644 --- a/packages/@n8n/task-runner-python/src/constants.py +++ b/packages/@n8n/task-runner-python/src/constants.py @@ -46,6 +46,12 @@ ENV_BUILTINS_DENY = "N8N_RUNNERS_BUILTINS_DENY" # Logging LOG_FORMAT = "%(asctime)s.%(msecs)03d\t%(levelname)s\t%(message)s" LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S" +LOG_TASK_COMPLETE = 'Completed task {task_id} in {duration} for node "{node_name}" ({node_id}) in workflow "{workflow_name}" ({workflow_id})' +LOG_TASK_CANCEL = 'Cancelled task {task_id} for node "{node_name}" ({node_id}) in workflow "{workflow_name}" ({workflow_id})' +LOG_TASK_CANCEL_UNKNOWN = ( + "Received cancel for unknown task: {task_id}. Discarding message." +) +LOG_TASK_CANCEL_WAITING = "Cancelled task {task_id} (waiting for settings)" # RPC RPC_BROWSER_CONSOLE_LOG_METHOD = "logNodeOutput" diff --git a/packages/@n8n/task-runner-python/src/message_serde.py b/packages/@n8n/task-runner-python/src/message_serde.py index bbcc715993..0bf8a41460 100644 --- a/packages/@n8n/task-runner-python/src/message_serde.py +++ b/packages/@n8n/task-runner-python/src/message_serde.py @@ -37,12 +37,19 @@ def _get_node_mode(node_mode_str: str) -> NodeMode: def _parse_task_settings(d: dict) -> BrokerTaskSettings: try: + # required task_id = d["taskId"] settings_dict = d["settings"] code = settings_dict["code"] node_mode = _get_node_mode(settings_dict["nodeMode"]) - continue_on_fail = settings_dict.get("continueOnFail", False) items = settings_dict["items"] + + # optional + continue_on_fail = settings_dict.get("continueOnFail", False) + workflow_name = settings_dict.get("workflowName", "Unknown") + workflow_id = settings_dict.get("workflowId", "Unknown") + node_name = settings_dict.get("nodeName", "Unknown") + node_id = settings_dict.get("nodeId", "Unknown") except KeyError as e: raise ValueError(f"Missing field in task settings message: {e}") @@ -53,6 +60,10 @@ def _parse_task_settings(d: dict) -> BrokerTaskSettings: node_mode=node_mode, continue_on_fail=continue_on_fail, items=items, + workflow_name=workflow_name, + workflow_id=workflow_id, + node_name=node_name, + node_id=node_id, ), ) diff --git a/packages/@n8n/task-runner-python/src/message_types/broker.py b/packages/@n8n/task-runner-python/src/message_types/broker.py index 619d292578..91cee692bc 100644 --- a/packages/@n8n/task-runner-python/src/message_types/broker.py +++ b/packages/@n8n/task-runner-python/src/message_types/broker.py @@ -39,6 +39,10 @@ class TaskSettings: node_mode: NodeMode continue_on_fail: bool items: Items + workflow_name: str + workflow_id: str + node_name: str + node_id: str @dataclass diff --git a/packages/@n8n/task-runner-python/src/task_runner.py b/packages/@n8n/task-runner-python/src/task_runner.py index d4915ab581..e60868731e 100644 --- a/packages/@n8n/task-runner-python/src/task_runner.py +++ b/packages/@n8n/task-runner-python/src/task_runner.py @@ -26,6 +26,10 @@ from src.constants import ( OFFER_VALIDITY_LATENCY_BUFFER, TASK_BROKER_WS_PATH, RPC_BROWSER_CONSOLE_LOG_METHOD, + LOG_TASK_COMPLETE, + LOG_TASK_CANCEL, + LOG_TASK_CANCEL_UNKNOWN, + LOG_TASK_CANCEL_WAITING, ) from src.message_types import ( BrokerMessage, @@ -204,11 +208,18 @@ class TaskRunner: ) return + task_state.workflow_name = message.settings.workflow_name + task_state.workflow_id = message.settings.workflow_id + task_state.node_name = message.settings.node_name + task_state.node_id = message.settings.node_id + task_state.status = TaskStatus.RUNNING asyncio.create_task(self._execute_task(message.task_id, message.settings)) self.logger.info(f"Received task {message.task_id}") async def _execute_task(self, task_id: str, task_settings: TaskSettings) -> None: + start_time = time.time() + try: task_state = self.running_tasks.get(task_id) @@ -242,7 +253,14 @@ class TaskRunner: response = RunnerTaskDone(task_id=task_id, data={"result": result}) await self._send_message(response) - self.logger.info(f"Completed task {task_id}") + + self.logger.info( + LOG_TASK_COMPLETE.format( + task_id=task_id, + duration=self._get_duration(start_time), + **task_state.context(), + ) + ) except Exception as e: response = RunnerTaskError(task_id=task_id, error={"message": str(e)}) @@ -252,16 +270,16 @@ class TaskRunner: self.running_tasks.pop(task_id, None) async def _handle_task_cancel(self, message: BrokerTaskCancel) -> None: - task_state = self.running_tasks.get(message.task_id) + task_id = message.task_id + task_state = self.running_tasks.get(task_id) if task_state is None: - self.logger.warning( - f"Received cancel for unknown task: {message.task_id}. Discarding message." - ) + self.logger.warning(LOG_TASK_CANCEL_UNKNOWN.format(task_id=task_id)) return if task_state.status == TaskStatus.WAITING_FOR_SETTINGS: - self.running_tasks.pop(message.task_id, None) + self.running_tasks.pop(task_id, None) + self.logger.info(LOG_TASK_CANCEL_WAITING.format(task_id=task_id)) await self._send_offers() return @@ -269,6 +287,10 @@ class TaskRunner: task_state.status = TaskStatus.ABORTING self.executor.stop_process(task_state.process) + self.logger.info( + LOG_TASK_CANCEL.format(task_id=task_id, **task_state.context()) + ) + async def _send_rpc_message(self, task_id: str, method_name: str, params: list): message = RunnerRpcCall( call_id=nanoid(), task_id=task_id, name=method_name, params=params @@ -283,6 +305,17 @@ class TaskRunner: serialized = self.serde.serialize_runner_message(message) await self.websocket_connection.send(serialized) + def _get_duration(self, start_time: float) -> str: + elapsed = time.time() - start_time + + if elapsed < 1: + return f"{int(elapsed * 1000)}ms" + + if elapsed < 60: + return f"{int(elapsed)}s" + + return f"{int(elapsed) // 60}m" + # ========== Offers ========== async def _send_offers_loop(self) -> None: diff --git a/packages/@n8n/task-runner-python/src/task_state.py b/packages/@n8n/task-runner-python/src/task_state.py index c545639826..cdb42865be 100644 --- a/packages/@n8n/task-runner-python/src/task_state.py +++ b/packages/@n8n/task-runner-python/src/task_state.py @@ -15,8 +15,24 @@ class TaskState: task_id: str status: TaskStatus process: Optional[SpawnProcess] = None + workflow_name: Optional[str] = None + workflow_id: Optional[str] = None + node_name: Optional[str] = None + node_id: Optional[str] = None def __init__(self, task_id: str): self.task_id = task_id self.status = TaskStatus.WAITING_FOR_SETTINGS self.process = None + self.workflow_name = None + self.workflow_id = None + self.node_name = None + self.node_id = None + + def context(self): + return { + "node_name": self.node_name, + "node_id": self.node_id, + "workflow_name": self.workflow_name, + "workflow_id": self.workflow_id, + } diff --git a/packages/nodes-base/nodes/Code/PythonTaskRunnerSandbox.ts b/packages/nodes-base/nodes/Code/PythonTaskRunnerSandbox.ts index 0499b52778..377c038fae 100644 --- a/packages/nodes-base/nodes/Code/PythonTaskRunnerSandbox.ts +++ b/packages/nodes-base/nodes/Code/PythonTaskRunnerSandbox.ts @@ -25,12 +25,19 @@ export class PythonTaskRunnerSandbox { async runUsingIncomingItems() { const itemIndex = 0; + const node = this.executeFunctions.getNode(); + const workflow = this.executeFunctions.getWorkflow(); + const taskSettings: Record = { code: this.pythonCode, nodeMode: this.nodeMode, workflowMode: this.workflowMode, continueOnFail: this.executeFunctions.continueOnFail(), items: this.executeFunctions.getInputData(), + nodeId: node.id, + nodeName: node.name, + workflowId: workflow.id, + workflowName: workflow.name, }; const executionResult = await this.executeFunctions.startJob(