-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathfastapi_handler.py
More file actions
316 lines (265 loc) · 10.6 KB
/
fastapi_handler.py
File metadata and controls
316 lines (265 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
import asyncio
import copy
import inspect
import json
import os
import sys
from functools import wraps
from typing import Any
from loguru import logger
from backend.protocol_rpc.configuration import GlobalConfiguration
from backend.protocol_rpc.message_handler.types import (
EventScope,
EventType,
LogEvent,
)
from backend.protocol_rpc.broadcast import Broadcast
from backend.protocol_rpc.websocket import GLOBAL_CHANNEL
MAX_LOG_MESSAGE_LENGTH = 3000
from .base import IMessageHandler
class MessageHandler(IMessageHandler):
    """FastAPI-compatible MessageHandler backed by Starlette Broadcast.

    Routes log events to the console (via loguru) and to WebSocket
    subscribers via broadcast channels keyed by transaction hash,
    account address, client session id, or the global channel.
    """

    # Textual level -> EventType mapping used by `log`.
    _LEVEL_MAP = {
        "info": EventType.INFO,
        "error": EventType.ERROR,
        "warning": EventType.WARNING,
        "debug": EventType.DEBUG,
    }

    def __init__(self, broadcast: Broadcast, config: GlobalConfiguration):
        self.broadcast = broadcast
        self.config = config
        # Per-session fallback channel; populated via `with_client_session`.
        self.client_session_id = None
        # Strong references to in-flight publish tasks. The event loop keeps
        # only weak references to tasks, so without this set a publish task
        # could be garbage-collected before it completes (see asyncio docs
        # for `loop.create_task`).
        self._pending_tasks: set = set()

    def with_client_session(self, client_session_id: str):
        """Return a copy of this handler bound to a client session channel."""
        new_msg_handler = MessageHandler(self.broadcast, self.config)
        new_msg_handler.client_session_id = client_session_id
        return new_msg_handler

    def log_endpoint_info(self, func):
        """Decorator for logging endpoint information.

        Works for both coroutine and plain functions: logs the call,
        re-raises any exception after logging it.
        """
        if inspect.iscoroutinefunction(func):
            # Async wrapper for coroutine functions
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                logger.info(f"Endpoint called: {func.__name__}")
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    logger.error(f"Endpoint error in {func.__name__}: {e}")
                    raise

            return async_wrapper
        else:
            # Sync wrapper for regular functions
            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                logger.info(f"Endpoint called: {func.__name__}")
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    logger.error(f"Endpoint error in {func.__name__}: {e}")
                    raise

            return sync_wrapper

    def _publish(self, channel: str, payload: dict[str, Any]) -> None:
        """Queue a broadcast publish for the given channel.

        Silently no-ops when there is no broadcast backend or when no
        event loop is running (e.g. called from a synchronous context).
        """
        if self.broadcast is None:
            return
        message = json.dumps(payload)
        try:
            # `get_running_loop` raises RuntimeError when no loop is running,
            # so a separate `is_running()` check is unnecessary.
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return
        task = loop.create_task(
            self.broadcast.publish(channel=channel, message=message)
        )
        # Keep a strong reference until the task finishes, then drop it.
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    def _socket_emit(self, log_event: LogEvent) -> None:
        """Emit a log event via broadcast channels.
        Preference order:
        1) transaction_hash channel (most specific)
        2) account_address channel (per-account logs)
        3) client_session_id channel (per-session logs)
        4) GLOBAL channel (fallback only)
        """
        payload = {"event": log_event.name, "data": log_event.to_dict()}
        if log_event.transaction_hash:
            self._publish(log_event.transaction_hash, payload)
            return
        if getattr(log_event, "account_address", None):
            self._publish(log_event.account_address, payload)
            return
        if getattr(log_event, "client_session_id", None) or getattr(
            self, "client_session_id", None
        ):
            channel = log_event.client_session_id or self.client_session_id  # type: ignore[attr-defined]
            if channel:
                self._publish(channel, payload)
                return
        # Fallback
        self._publish(GLOBAL_CHANNEL, payload)

    def _log_event(self, log_event: LogEvent):
        """Log an event to the console and broadcast it over WebSocket."""
        # Console logging with optional data payload (rendered in gray).
        message = log_event.message
        gray = "\033[38;5;245m"
        reset = "\033[0m"
        if getattr(log_event, "data", None):
            try:
                data_to_log = self._apply_log_level_truncation(log_event.data)
                data_str = json.dumps(data_to_log, default=lambda o: o.__dict__)
                message = f"{message} {gray}{data_str}{reset}"
            except TypeError as e:
                # Non-serializable payload: fall back to str() so the log
                # line is still emitted instead of raising.
                message = f"{message} {gray}{str(log_event.data)} (serialization error: {e}){reset}"
        if log_event.type == EventType.ERROR:
            logger.error(message)
        elif log_event.type == EventType.WARNING:
            logger.warning(message)
        elif log_event.type == EventType.DEBUG:
            logger.debug(message)
        else:
            logger.info(message)
        # WebSocket emission
        self._socket_emit(log_event)

    def log(self, message: str, level: str = "info", **kwargs):
        """Generic logging method.

        ``level`` may be "info", "error", "warning" or "debug"; unknown
        values fall back to INFO. (Previously every non-"info" level was
        mapped to ERROR, mislabelling warnings and debug messages.)
        """
        log_event = LogEvent(
            name="log",
            type=self._LEVEL_MAP.get(level.lower(), EventType.INFO),
            message=message,
            scope=EventScope.RPC,
            **kwargs,
        )
        self._log_event(log_event)

    def error(self, message: str, **kwargs):
        """Log an error message."""
        log_event = LogEvent(
            name="error",
            type=EventType.ERROR,
            message=message,
            scope=EventScope.RPC,
            **kwargs,
        )
        self._log_event(log_event)

    def warning(self, message: str, **kwargs):
        """Log a warning message."""
        log_event = LogEvent(
            name="warning",
            type=EventType.WARNING,
            message=message,
            scope=EventScope.RPC,
            **kwargs,
        )
        self._log_event(log_event)

    def info(self, message: str, **kwargs):
        """Log an info message."""
        log_event = LogEvent(
            name="info",
            type=EventType.INFO,
            message=message,
            scope=EventScope.RPC,
            **kwargs,
        )
        self._log_event(log_event)

    def send_message(self, log_event: LogEvent, log_to_terminal: bool = True):
        """Send a message via WebSocket and optionally log to terminal."""
        if log_to_terminal:
            self._log_event(log_event)
        else:
            # Just emit via WebSocket without terminal logging
            self._socket_emit(log_event)

    # Transaction-specific logging methods
    def send_transaction_status_update(
        self, transaction_hash: str, status: str, **kwargs
    ):
        """Send transaction status update via WebSocket."""
        log_event = LogEvent(
            name="transaction_status_updated",  # Match frontend expectation
            type=EventType.INFO,
            message=f"Transaction {transaction_hash} status: {status}",
            transaction_hash=transaction_hash,
            data={"hash": transaction_hash, "status": status, **kwargs},
            scope=EventScope.TRANSACTION,
        )
        self._socket_emit(log_event)

    def send_transaction_event(self, transaction_hash: str, event_name: str, data: Any):
        """Send a custom transaction event."""
        log_event = LogEvent(
            name=event_name,
            type=EventType.INFO,
            message=f"Transaction event: {event_name}",
            transaction_hash=transaction_hash,
            data=data,
            scope=EventScope.TRANSACTION,
        )
        self._socket_emit(log_event)

    def _apply_log_level_truncation(self, data, max_length=100):
        """Apply LOG_LEVEL-based truncation to log data for better readability.

        Truncation is skipped entirely when LOG_LEVEL is DEBUG, or when
        ``data`` is not a dict. The input is deep-copied so the caller's
        payload is never mutated.
        """
        should_truncate = os.environ.get("LOG_LEVEL", "INFO").upper() != "DEBUG"
        if not should_truncate or not isinstance(data, dict):
            return data
        truncated_data = copy.deepcopy(data)
        self._truncate_dict(truncated_data, max_length)
        return truncated_data

    def _truncate_dict(self, data_dict, max_length):
        """Recursively truncate dictionary values based on key patterns.

        Mutates ``data_dict`` in place; callers should pass a copy (see
        `_apply_log_level_truncation`).
        """
        if not isinstance(data_dict, dict):
            return
        # Long string payloads: keep a prefix plus a length marker.
        for key in ["calldata", "contract_code", "result"]:
            if (
                key in data_dict
                and isinstance(data_dict[key], str)
                and len(data_dict[key]) > max_length
            ):
                data_dict[key] = (
                    f"{data_dict[key][:max_length]}... ({len(data_dict[key])} chars)"
                )
        # Contract state is replaced wholesale with a size summary.
        if "contract_state" in data_dict and data_dict["contract_state"]:
            value = data_dict["contract_state"]
            if len(str(value)) > max_length:
                if isinstance(value, dict):
                    data_dict["contract_state"] = f"<{len(value)} entries, truncated>"
                else:
                    data_dict["contract_state"] = (
                        f"<{len(str(value))} chars, truncated>"
                    )
        if "state" in data_dict:
            data_dict["state"] = "<truncated>"
        if "code" in data_dict and isinstance(data_dict["code"], str):
            data_dict["code"] = f"<{len(data_dict['code'])} chars>"
        # Recurse into nested dicts and lists of dicts.
        for key, value in data_dict.items():
            if isinstance(value, dict):
                self._truncate_dict(value, max_length)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        self._truncate_dict(item, max_length)
def setup_loguru_config():
    """Configure Loguru as the unified logging backend.

    Output is split by severity so GCP Cloud Logging assigns the right
    labels: everything below ERROR goes to stdout (INFO/default severity),
    ERROR and above go to stderr (ERROR severity). When the LOG_TO_FILE
    environment variable is set, logs are also mirrored to a rotating
    file under logs/.

    Returns:
        The configured loguru ``logger`` instance.
    """
    # Drop loguru's default handler before installing our own.
    logger.remove()

    level = os.environ.get("LOG_LEVEL", "INFO").upper()
    console_format = (
        "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
        "<level>{message}</level>"
    )

    # stdout sink: everything below ERROR (loguru ERROR level number is 40).
    logger.add(
        sys.stdout,
        format=console_format,
        level=level,
        filter=lambda record: record["level"].no < 40,
        colorize=True,
    )

    # stderr sink: ERROR and above.
    logger.add(sys.stderr, format=console_format, level="ERROR", colorize=True)

    # Optional rotating file sink.
    if os.environ.get("LOG_TO_FILE"):
        file_format = (
            "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | "
            "{name}:{function}:{line} - {message}"
        )
        logger.add(
            "logs/app.log",
            rotation="10 MB",
            retention="7 days",
            level=level,
            format=file_format,
        )

    return logger