feat: Add graceful shutdown and idle timeout to native Python runner (no-changelog) (#19125)

This commit is contained in:
Iván Ovejero
2025-09-03 12:47:45 +02:00
committed by GitHub
parent 06578f287c
commit e4fb6e5f31
7 changed files with 234 additions and 16 deletions

View File

@@ -1,7 +1,7 @@
import asyncio
import logging
import time
from typing import Dict, Optional, Any
from typing import Dict, Optional, Any, Callable, Awaitable
from urllib.parse import urlparse
import websockets
import random
@@ -9,8 +9,9 @@ import random
from src.config.task_runner_config import TaskRunnerConfig
from src.errors import (
WebsocketConnectionError,
NoIdleTimeoutHandlerError,
TaskMissingError,
WebsocketConnectionError,
)
from src.message_types.broker import TaskSettings
from src.nanoid import nanoid
@@ -85,6 +86,10 @@ class TaskRunner:
self.analyzer = TaskAnalyzer(config.stdlib_allow, config.external_allow)
self.logger = logging.getLogger(__name__)
self.idle_coroutine: Optional[asyncio.Task] = None
self.on_idle_timeout: Optional[Callable[[], Awaitable[None]]] = None
self.last_activity_time = time.time()
self.task_broker_uri = config.task_broker_uri
websocket_host = urlparse(config.task_broker_uri).netloc
self.websocket_url = (
@@ -92,6 +97,9 @@ class TaskRunner:
)
async def start(self) -> None:
if self.config.is_auto_shutdown_enabled and not self.on_idle_timeout:
raise NoIdleTimeoutHandlerError(self.config.auto_shutdown_timeout)
headers = {"Authorization": f"Bearer {self.config.grant_token}"}
try:
@@ -108,9 +116,27 @@ class TaskRunner:
except Exception:
raise WebsocketConnectionError(self.task_broker_uri)
# ========== Shutdown ==========
async def stop(self) -> None:
if self.offers_coroutine:
self.can_send_offers = False
if self.offers_coroutine and not self.offers_coroutine.done():
self.offers_coroutine.cancel()
try:
await self.offers_coroutine
except asyncio.CancelledError:
pass
if self.idle_coroutine and not self.idle_coroutine.done():
self.idle_coroutine.cancel()
try:
await self.idle_coroutine
except asyncio.CancelledError:
pass
await self._wait_for_tasks()
await self._terminate_tasks()
if self.websocket_connection:
await self.websocket_connection.close()
@@ -118,6 +144,41 @@ class TaskRunner:
self.logger.info("Runner stopped")
async def _wait_for_tasks(self):
if not self.running_tasks:
return
self.logger.debug("Waiting for tasks to complete...")
start_time = time.time()
while (
self.running_tasks
and (time.time() - start_time) < self.config.graceful_shutdown_timeout
):
await asyncio.sleep(0.5)
if self.running_tasks:
self.logger.warning("Timed out waiting for tasks to complete")
async def _terminate_tasks(self):
if not self.running_tasks:
return
self.logger.warning("Terminating tasks...")
tasks_to_terminate = [
asyncio.to_thread(self.executor.stop_process, task_state.process)
for task_state in self.running_tasks.values()
if task_state.process
]
if tasks_to_terminate:
await asyncio.gather(*tasks_to_terminate, return_exceptions=True)
self.running_tasks.clear()
self.logger.warning("Terminated tasks")
# ========== Messages ==========
async def _listen_for_messages(self) -> None:
@@ -156,6 +217,7 @@ class TaskRunner:
self.can_send_offers = True
self.offers_coroutine = asyncio.create_task(self._send_offers_loop())
self.logger.info("Registered with broker")
self._reset_idle_timer()
async def _handle_task_offer_accept(self, message: BrokerTaskOfferAccept) -> None:
offer = self.open_offers.get(message.offer_id)
@@ -184,6 +246,7 @@ class TaskRunner:
response = RunnerTaskAccepted(task_id=message.task_id)
await self._send_message(response)
self.logger.info(f"Accepted task {message.task_id}")
self._reset_idle_timer()
async def _handle_task_settings(self, message: BrokerTaskSettings) -> None:
task_state = self.running_tasks.get(message.task_id)
@@ -258,6 +321,7 @@ class TaskRunner:
finally:
self.running_tasks.pop(task_id, None)
self._reset_idle_timer()
async def _handle_task_cancel(self, message: BrokerTaskCancel) -> None:
task_id = message.task_id
@@ -351,3 +415,31 @@ class TaskRunner:
)
await self._send_message(message)
# ========== Inactivity ==========
def _reset_idle_timer(self):
"""Reset idle timer when key event occurs, namely runner registration, task acceptance, and task completion or failure."""
if not self.config.is_auto_shutdown_enabled:
return
self.last_activity_time = time.time()
if self.idle_coroutine and not self.idle_coroutine.done():
self.idle_coroutine.cancel()
self.idle_coroutine = asyncio.create_task(self._idle_timer_coroutine())
async def _idle_timer_coroutine(self):
try:
await asyncio.sleep(self.config.auto_shutdown_timeout)
if len(self.running_tasks) > 0:
return
assert self.on_idle_timeout is not None # validated at start()
await self.on_idle_timeout()
except asyncio.CancelledError:
pass