44from copy import copy
55from datetime import datetime
66from logging import DEBUG , ERROR , FATAL , INFO , WARN , WARNING # noqa: F401
7+ from threading import Lock , Thread
78from typing import TYPE_CHECKING , Dict , Iterable , List , Optional , Union
89
910from grpclib .exceptions import StreamTerminatedError
1920_MODULE_PARENT : Optional ["RobotClient" ] = None
2021
2122
23+ class _SingletonEventLoopThread :
24+ _instance = None
25+ _lock = Lock ()
26+ _ready_event = asyncio .Event ()
27+ _thread = None
28+
29+ def __new__ (cls ):
30+ # Ensure singleton precondition
31+ if cls ._instance is None :
32+ with cls ._lock :
33+ if cls ._instance is None :
34+ cls ._instance = super (_SingletonEventLoopThread , cls ).__new__ (cls )
35+ cls ._instance ._loop = None
36+ cls ._instance ._thread = Thread (target = cls ._instance ._run )
37+ cls ._instance ._thread .start ()
38+ return cls ._instance
39+
40+ def _run (self ):
41+ self ._loop = asyncio .new_event_loop ()
42+ asyncio .set_event_loop (self ._loop )
43+ self ._ready_event .set ()
44+ self ._loop .run_forever ()
45+
46+ def stop (self ):
47+ if self ._loop is not None :
48+ self ._loop .call_soon_threadsafe (self ._loop .stop )
49+ self ._loop .close ()
50+
51+ def get_loop (self ):
52+ if self ._loop is None :
53+ raise RuntimeError ("Event loop is None. Did you call .start() and .wait_until_ready()?" )
54+ return self ._loop
55+
56+ async def wait_until_ready (self ):
57+ await self ._ready_event .wait ()
58+
59+
2260class _ModuleHandler (logging .Handler ):
2361 _parent : "RobotClient"
2462 _logger : logging .Logger
63+ _worker : _SingletonEventLoopThread
2564
2665 def __init__ (self , parent : "RobotClient" ):
66+ super ().__init__ ()
2767 self ._parent = parent
2868 self ._logger = logging .getLogger ("ModuleLogger" )
2969 addHandlers (self ._logger , True )
30- super ().__init__ ()
3170 self ._logger .setLevel (self .level )
71+ self ._worker = _SingletonEventLoopThread ()
3272
3373 def setLevel (self , level : Union [int , str ]) -> None :
3474 self ._logger .setLevel (level )
3575 return super ().setLevel (level )
3676
37- def handle_task_result (self , task : asyncio .Task ):
77+ async def handle_task_result (self , task : asyncio .Task ):
3878 try :
3979 _ = task .result ()
4080 except (asyncio .CancelledError , asyncio .InvalidStateError , StreamTerminatedError ):
@@ -48,24 +88,28 @@ def emit(self, record: logging.LogRecord):
4888 time = datetime .fromtimestamp (record .created )
4989
5090 try :
51- assert self ._parent is not None
52- try :
53- loop = asyncio .get_event_loop ()
54- loop .create_task (
55- self ._parent .log (name , record .levelname , time , message , stack ), name = f"{ viam ._TASK_PREFIX } -LOG-{ record .created } "
56- ).add_done_callback (self .handle_task_result )
57- except RuntimeError :
58- # If the log is coming from a thread that doesn't have an event loop, create and set a new one.
59- loop = asyncio .new_event_loop ()
60- asyncio .set_event_loop (loop )
61- loop .create_task (
62- self ._parent .log (name , record .levelname , time , message , stack ), name = f"{ viam ._TASK_PREFIX } -LOG-{ record .created } "
63- ).add_done_callback (self .handle_task_result )
91+ loop = self ._worker .get_loop ()
92+ asyncio .run_coroutine_threadsafe (
93+ self ._asynchronously_emit (record , name , message , stack , time ),
94+ loop ,
95+ )
6496 except Exception as err :
6597 # If the module log fails, log using stdout/stderr handlers
6698 self ._logger .error (f"ModuleLogger failed for { record .name } - { err } " )
6799 self ._logger .log (record .levelno , message )
68100
101+ async def _asynchronously_emit (self , record : logging .LogRecord , name : str , message : str , stack : str , time : datetime ):
102+ await self ._worker .wait_until_ready ()
103+ task = self ._worker .get_loop ().create_task (
104+ self ._parent .log (name , record .levelname , time , message , stack ),
105+ name = f"{ viam ._TASK_PREFIX } -LOG-{ record .created } " ,
106+ )
107+ task .add_done_callback (lambda t : asyncio .run_coroutine_threadsafe (self .handle_task_result (t ), self ._worker .get_loop ()))
108+
109+ def close (self ):
110+ self ._worker .stop ()
111+ super ().close ()
112+
69113
70114class _ColorFormatter (logging .Formatter ):
71115 MAPPING = {
@@ -76,8 +120,8 @@ class _ColorFormatter(logging.Formatter):
76120 "CRITICAL" : 41 , # white on red bg
77121 }
78122
79- def __init__ (self , patern ):
80- logging .Formatter .__init__ (self , patern )
123+ def __init__ (self , pattern ):
124+ logging .Formatter .__init__ (self , pattern )
81125
82126 def format (self , record ):
83127 colored_record = copy (record )
0 commit comments