Skip to content

Commit 769f4a1

Browse files
RSDK-6184: better module logs over gRPC (#557)
1 parent e0395a4 commit 769f4a1

File tree

5 files changed

+141
-35
lines changed

5 files changed

+141
-35
lines changed

src/viam/logging.py

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,66 @@
1+
import asyncio
12
import logging
23
import sys
34
from copy import copy
5+
from datetime import datetime
46
from logging import DEBUG, ERROR, FATAL, INFO, WARN, WARNING # noqa: F401
5-
from typing import Dict
7+
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Union
68

7-
LOG_LEVEL = INFO
8-
LOGGERS: Dict[str, logging.Logger] = {}
9+
from grpclib.exceptions import StreamTerminatedError
10+
11+
import viam
912

13+
if TYPE_CHECKING:
14+
from .robot.client import RobotClient
1015

11-
class ColorFormatter(logging.Formatter):
16+
17+
LOG_LEVEL = INFO
18+
LOGGERS: Dict[str, logging.Logger] = {}
19+
_MODULE_PARENT: Optional["RobotClient"] = None
20+
21+
22+
class _ModuleHandler(logging.Handler):
23+
_parent: "RobotClient"
24+
_logger: logging.Logger
25+
26+
def __init__(self, parent: "RobotClient"):
27+
self._parent = parent
28+
self._logger = logging.getLogger("ModuleLogger")
29+
addHandlers(self._logger, True)
30+
super().__init__()
31+
self._logger.setLevel(self.level)
32+
33+
def setLevel(self, level: Union[int, str]) -> None:
34+
self._logger.setLevel(level)
35+
return super().setLevel(level)
36+
37+
def handle_task_result(self, task: asyncio.Task):
38+
try:
39+
_ = task.result()
40+
except (asyncio.CancelledError, asyncio.InvalidStateError, StreamTerminatedError):
41+
pass
42+
except Exception:
43+
self._logger.exception("Exception raised by task = %r", task)
44+
45+
def emit(self, record: logging.LogRecord):
46+
assert isinstance(record, logging.LogRecord)
47+
name = record.name.split(".")[-1]
48+
message = f"{record.filename}:{record.lineno}\t{record.msg}"
49+
stack = f"exc_info: {record.exc_info}, exc_text: {record.exc_text}, stack_info: {record.stack_info}"
50+
time = datetime.fromtimestamp(record.created)
51+
52+
try:
53+
assert self._parent is not None
54+
asyncio.create_task(
55+
self._parent.log(name, record.levelname, time, message, stack), name=f"{viam._TASK_PREFIX}-LOG-{record.created}"
56+
).add_done_callback(self.handle_task_result)
57+
except Exception as err:
58+
# If the module log fails, log using stdout/stderr handlers
59+
self._logger.error(f"ModuleLogger failed for {record.name} - {err}")
60+
self._logger.log(record.levelno, message)
61+
62+
63+
class _ColorFormatter(logging.Formatter):
1264
MAPPING = {
1365
"DEBUG": 37, # white
1466
"INFO": 36, # cyan
@@ -43,31 +95,56 @@ def getLogger(name: str) -> logging.Logger:
4395
return logger
4496

4597

46-
def addHandlers(logger: logging.Logger):
47-
logger.handlers.clear()
98+
def addHandlers(logger: logging.Logger, use_default_handlers=False):
99+
_addHandlers([logger], use_default_handlers)
48100

49-
format = ColorFormatter("%(asctime)s\t\t" + "%(levelname)s\t" + "%(name)s (%(filename)s:%(lineno)d)\t" + "%(message)s\t")
50101

51-
handler = logging.StreamHandler(stream=sys.stdout)
52-
handler.setFormatter(format)
102+
def _addHandlers(loggers: Iterable[logging.Logger], use_default_handlers=False):
103+
format = _ColorFormatter("%(asctime)s\t\t" + "%(levelname)s\t" + "%(name)s (%(filename)s:%(lineno)d)\t" + "%(message)s\t")
104+
105+
handlers: List[logging.Handler] = []
106+
107+
std_handler = logging.StreamHandler(stream=sys.stdout)
108+
std_handler.setFormatter(format)
53109
# filter out logs at error level or above
54-
handler.setLevel(LOG_LEVEL)
55-
handler.addFilter(filter=lambda record: (record.levelno < ERROR))
56-
logger.addHandler(handler)
110+
std_handler.setLevel(LOG_LEVEL)
111+
std_handler.addFilter(filter=lambda record: (record.levelno < ERROR))
57112

58113
err_handler = logging.StreamHandler(stream=sys.stderr)
59114
err_handler.setFormatter(format)
60115
# filter out logs below error level
61116
err_handler.setLevel(max(ERROR, LOG_LEVEL))
62-
logger.addHandler(err_handler)
117+
118+
if _MODULE_PARENT is not None and not use_default_handlers:
119+
mod_handler = _ModuleHandler(_MODULE_PARENT)
120+
mod_handler.setFormatter(format)
121+
mod_handler.setLevel(LOG_LEVEL)
122+
handlers = [mod_handler]
123+
else:
124+
handlers = [std_handler, err_handler]
125+
126+
for logger in loggers:
127+
logger.handlers.clear()
128+
if "viam.sessions_client" in LOGGERS and LOGGERS["viam.sessions_client"] == logger:
129+
logger.addHandler(std_handler)
130+
logger.addHandler(err_handler)
131+
else:
132+
for h in handlers:
133+
logger.addHandler(h)
134+
135+
136+
def setParent(parent: "RobotClient"):
137+
global _MODULE_PARENT
138+
_MODULE_PARENT = parent
139+
_addHandlers(LOGGERS.values())
63140

64141

65142
def setLevel(level: int):
66143
global LOG_LEVEL
67144
LOG_LEVEL = level
68145
for logger in LOGGERS.values():
69146
logger.setLevel(LOG_LEVEL)
70-
addHandlers(logger)
147+
_addHandlers(LOGGERS.values())
71148

72149

73150
def silence():

src/viam/module/module.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ async def _connect_to_parent(self):
8181
log_level=self._log_level,
8282
),
8383
)
84+
LOGGER.debug("Starting module logging")
85+
logging.setParent(self.parent)
8486

8587
async def _get_resource(self, name: ResourceName) -> ResourceBase:
8688
await self._connect_to_parent()
@@ -165,6 +167,7 @@ async def remove_resource(self, request: RemoveResourceRequest):
165167

166168
async def ready(self, request: ReadyRequest) -> ReadyResponse:
167169
self._parent_address = request.parent_address
170+
await self._connect_to_parent()
168171

169172
svcname_to_models: Mapping[Tuple[str, Subtype], List[Model]] = {}
170173
for subtype_model_str in Registry.REGISTERED_RESOURCE_CREATORS().keys():

src/viam/proto/robot/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
@generated by Viam.
33
Do not edit manually!
44
"""
5+
from ...gen.common.v1.common_pb2 import LogEntry
56
from ...gen.robot.v1.robot_grpc import RobotServiceBase, RobotServiceStub
67
from ...gen.robot.v1.robot_pb2 import (
78
BlockForOperationRequest,
@@ -72,6 +73,7 @@
7273
"GetSessionsResponse",
7374
"GetStatusRequest",
7475
"GetStatusResponse",
76+
"LogEntry",
7577
"LogRequest",
7678
"LogResponse",
7779
"Operation",

src/viam/robot/client.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
from dataclasses import dataclass
3+
from datetime import datetime
34
from threading import RLock
45
from typing import Any, Dict, List, Optional, Union
56

@@ -29,6 +30,8 @@
2930
GetOperationsResponse,
3031
GetStatusRequest,
3132
GetStatusResponse,
33+
LogEntry,
34+
LogRequest,
3235
Operation,
3336
ResourceNamesRequest,
3437
ResourceNamesResponse,
@@ -46,7 +49,7 @@
4649
from viam.rpc.dial import DialOptions, ViamChannel, dial
4750
from viam.services.service_base import ServiceBase
4851
from viam.sessions_client import SessionsClient
49-
from viam.utils import dict_to_struct
52+
from viam.utils import datetime_to_timestamp, dict_to_struct
5053

5154
LOGGER = logging.getLogger(__name__)
5255

@@ -753,6 +756,26 @@ async def stop_all(self, extra: Dict[ResourceName, Dict[str, Any]] = {}):
753756
request = StopAllRequest(extra=ep)
754757
await self._client.StopAll(request)
755758

759+
#######
760+
# LOG #
761+
#######
762+
763+
async def log(self, name: str, level: str, time: datetime, log: str, stack: str):
764+
"""Send log from Python module over gRPC.
765+
766+
Create a LogEntry object from the log to send to RDK.
767+
768+
Args:
769+
name (str): The logger's name.
770+
level (str): The level of the log.
771+
time (str): The log creation time.
772+
log (str): The log message.
773+
stack (str): The stack information of the log.
774+
"""
775+
entry = LogEntry(level=level, time=datetime_to_timestamp(time), logger_name=name, message=log, stack=stack)
776+
request = LogRequest(logs=[entry])
777+
await self._client.Log(request)
778+
756779
######################
757780
# Get Cloud Metadata #
758781
######################

tests/test_module.py

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -156,26 +156,27 @@ async def test_remove_resource(self, module: Module):
156156

157157
@pytest.mark.asyncio
158158
async def test_ready(self, module: Module):
159-
p_addr = "SOME_FAKE_ADDRESS"
160-
assert module._parent_address != p_addr
161-
req = ReadyRequest(parent_address=p_addr)
162-
resp = await module.ready(req)
163-
assert module._parent_address == p_addr
164-
assert len(resp.handlermap.handlers) == 2
165-
166-
handler = resp.handlermap.handlers[0]
167-
rn = Gizmo.get_resource_name("")
168-
assert handler.subtype == ResourceRPCSubtype(subtype=rn, proto_service="acme.component.gizmo.v1.GizmoService")
169-
assert len(handler.models) == 1
170-
model = handler.models[0]
171-
assert model == "acme:demo:mygizmo"
172-
173-
handler = resp.handlermap.handlers[1]
174-
rn = SummationService.get_resource_name("")
175-
assert handler.subtype == ResourceRPCSubtype(subtype=rn, proto_service="acme.service.summation.v1.SummationService")
176-
assert len(handler.models) == 1
177-
model = handler.models[0]
178-
assert model == "acme:demo:mysum"
159+
with mock.patch("viam.module.Module._connect_to_parent"):
160+
p_addr = "SOME_FAKE_ADDRESS"
161+
assert module._parent_address != p_addr
162+
req = ReadyRequest(parent_address=p_addr)
163+
resp = await module.ready(req)
164+
assert module._parent_address == p_addr
165+
assert len(resp.handlermap.handlers) == 2
166+
167+
handler = resp.handlermap.handlers[0]
168+
rn = Gizmo.get_resource_name("")
169+
assert handler.subtype == ResourceRPCSubtype(subtype=rn, proto_service="acme.component.gizmo.v1.GizmoService")
170+
assert len(handler.models) == 1
171+
model = handler.models[0]
172+
assert model == "acme:demo:mygizmo"
173+
174+
handler = resp.handlermap.handlers[1]
175+
rn = SummationService.get_resource_name("")
176+
assert handler.subtype == ResourceRPCSubtype(subtype=rn, proto_service="acme.service.summation.v1.SummationService")
177+
assert len(handler.models) == 1
178+
model = handler.models[0]
179+
assert model == "acme:demo:mysum"
179180

180181
def test_add_model_from_registry(self):
181182
mod = Module("fake")

0 commit comments

Comments
 (0)