Skip to content

Commit 325d19d

Browse files
authored
Improve Everest logging with open telemetry traces
1 parent 657656b commit 325d19d

File tree

6 files changed

+154
-68
lines changed

6 files changed

+154
-68
lines changed

src/everest/bin/main.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from everest.bin.monitor_script import monitor_entry
1717
from everest.bin.visualization_script import visualization_entry
1818
from everest.plugins.everest_plugin_manager import EverestPluginManager
19+
from everest.trace import tracer, tracer_provider
1920

2021

2122
def _build_args_parser() -> argparse.ArgumentParser:
@@ -48,7 +49,9 @@ def __init__(self, args: list[str]) -> None:
4849
parser.error("Unrecognized command")
4950

5051
# Setup logging from plugins:
51-
EverestPluginManager().add_log_handle_to_root()
52+
plugin_manager = EverestPluginManager()
53+
plugin_manager.add_log_handle_to_root()
54+
plugin_manager.add_span_processor_to_trace_provider(tracer_provider)
5255
logger = logging.getLogger(__name__)
5356
logger.info(f"Started everest with {parsed_args}")
5457
# Use dispatch pattern to invoke method with same name
@@ -103,6 +106,7 @@ def results(self, args: list[str]) -> None:
103106
visualization_entry(args)
104107

105108

109+
@tracer.start_as_current_span("everest.application.start")
106110
def start_everest(args: list[str] | None = None) -> None:
107111
"""Main entry point for the everest application"""
108112
args = args or sys.argv

src/everest/detached/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
START_EXPERIMENT_ENDPOINT,
3333
STOP_ENDPOINT,
3434
)
35+
from everest.trace import get_traceparent
3536

3637
# Specifies how many times to try a http request within the specified timeout.
3738
_HTTP_REQUEST_RETRY = 10
@@ -60,6 +61,8 @@ async def start_server(
6061
str(config.output_dir),
6162
"--logging-level",
6263
str(logging_level),
64+
"--traceparent",
65+
str(get_traceparent()),
6366
]
6467
poll_task = asyncio.create_task(driver.poll(), name="poll_task")
6568
await driver.submit(0, "everserver", *args, name=Path(config.config_file).stem)

src/everest/detached/jobs/everserver.py

+105-67
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
HTTPBasic,
4646
HTTPBasicCredentials,
4747
)
48+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
4849
from pydantic import BaseModel
4950

5051
from ert.config import QueueSystem
@@ -77,6 +78,7 @@
7778
START_EXPERIMENT_ENDPOINT,
7879
STOP_ENDPOINT,
7980
)
81+
from everest.trace import tracer, tracer_provider
8082
from everest.util import makedirs_if_needed, version_info
8183

8284

@@ -424,13 +426,40 @@ def make_handler_config(
424426
}
425427

426428
logging.config.dictConfig(logging_config)
427-
EverestPluginManager().add_log_handle_to_root()
429+
plugin_manager = EverestPluginManager()
430+
plugin_manager.add_log_handle_to_root()
431+
plugin_manager.add_span_processor_to_trace_provider(tracer_provider)
432+
433+
434+
def get_trace_context():
435+
arg_parser = argparse.ArgumentParser()
436+
arg_parser.add_argument(
437+
"--traceparent",
438+
type=str,
439+
help="Trace parent id to be used by the storage root span",
440+
default=None,
441+
)
442+
options = arg_parser.parse_args()
443+
ctx = (
444+
TraceContextTextMapPropagator().extract(
445+
carrier={"traceparent": options.traceparent}
446+
)
447+
if options.traceparent
448+
else None
449+
)
450+
return ctx
428451

429452

430453
def main() -> None:
431454
arg_parser = argparse.ArgumentParser()
432455
arg_parser.add_argument("--output-dir", "-o", type=str)
433456
arg_parser.add_argument("--logging-level", "-l", type=int, default=logging.INFO)
457+
arg_parser.add_argument(
458+
"--traceparent",
459+
type=str,
460+
help="Trace parent id to be used by the storage root span",
461+
default=None,
462+
)
434463
options = arg_parser.parse_args()
435464

436465
output_dir = options.output_dir
@@ -441,77 +470,86 @@ def main() -> None:
441470
host_file = ServerConfig.get_hostfile_path(output_dir)
442471
msg_queue: SimpleQueue[EverestServerMsg] = SimpleQueue()
443472

444-
try:
445-
_configure_loggers(
446-
detached_dir=Path(ServerConfig.get_detached_node_dir(output_dir)),
447-
log_dir=Path(output_dir) / OPTIMIZATION_LOG_DIR,
448-
logging_level=logging_level,
473+
ctx = (
474+
TraceContextTextMapPropagator().extract(
475+
carrier={"traceparent": options.traceparent}
449476
)
477+
if options.traceparent
478+
else None
479+
)
450480

451-
update_everserver_status(status_path, ServerStatus.starting)
452-
logging.getLogger(EVEREST).info(version_info())
453-
logging.getLogger(EVEREST).info(f"Output directory: {output_dir}")
481+
with tracer.start_as_current_span("everest.server", context=ctx):
482+
try:
483+
_configure_loggers(
484+
detached_dir=Path(ServerConfig.get_detached_node_dir(output_dir)),
485+
log_dir=Path(output_dir) / OPTIMIZATION_LOG_DIR,
486+
logging_level=logging_level,
487+
)
454488

455-
authentication = _generate_authentication()
456-
cert_path, key_path, key_pw = _generate_certificate(
457-
ServerConfig.get_certificate_dir(output_dir)
458-
)
459-
host = _get_machine_name()
460-
port = _find_open_port(host, lower=5000, upper=5800)
461-
_write_hostfile(host_file, host, port, cert_path, authentication)
462-
463-
shared_data = {
464-
STOP_ENDPOINT: False,
465-
"started": False,
466-
"events": [],
467-
"subscribers": {},
468-
}
489+
update_everserver_status(status_path, ServerStatus.starting)
490+
logging.getLogger(EVEREST).info(version_info())
491+
logging.getLogger(EVEREST).info(f"Output directory: {output_dir}")
469492

470-
server_config = {
471-
"optimization_output_dir": optimization_output_dir,
472-
"port": port,
473-
"cert_path": cert_path,
474-
"key_path": key_path,
475-
"key_passwd": key_pw,
476-
"authentication": authentication,
477-
}
478-
# Starting the server
479-
everserver_instance = threading.Thread(
480-
target=_everserver_thread,
481-
args=(shared_data, server_config, msg_queue),
482-
)
483-
everserver_instance.daemon = True
484-
everserver_instance.start()
493+
authentication = _generate_authentication()
494+
cert_path, key_path, key_pw = _generate_certificate(
495+
ServerConfig.get_certificate_dir(output_dir)
496+
)
497+
host = _get_machine_name()
498+
port = _find_open_port(host, lower=5000, upper=5800)
499+
_write_hostfile(host_file, host, port, cert_path, authentication)
500+
501+
shared_data = {
502+
STOP_ENDPOINT: False,
503+
"started": False,
504+
"events": [],
505+
"subscribers": {},
506+
}
507+
508+
server_config = {
509+
"optimization_output_dir": optimization_output_dir,
510+
"port": port,
511+
"cert_path": cert_path,
512+
"key_path": key_path,
513+
"key_passwd": key_pw,
514+
"authentication": authentication,
515+
}
516+
# Starting the server
517+
everserver_instance = threading.Thread(
518+
target=_everserver_thread,
519+
args=(shared_data, server_config, msg_queue),
520+
)
521+
everserver_instance.daemon = True
522+
everserver_instance.start()
485523

486-
# Monitoring the server
487-
while True:
488-
try:
489-
item = msg_queue.get(timeout=1) # Wait for data
490-
match item:
491-
case ServerStarted():
492-
update_everserver_status(status_path, ServerStatus.running)
493-
case ServerStopped():
494-
update_everserver_status(status_path, ServerStatus.stopped)
495-
return
496-
case ExperimentFailed():
497-
update_everserver_status(
498-
status_path, ServerStatus.failed, item.msg
499-
)
500-
return
501-
case ExperimentComplete():
502-
status, message = _get_optimization_status(
503-
item.exit_code, item.data
504-
)
505-
update_everserver_status(status_path, status, message)
506-
return
507-
except Empty:
508-
continue
509-
except:
510-
update_everserver_status(
511-
status_path,
512-
ServerStatus.failed,
513-
message=traceback.format_exc(),
514-
)
524+
# Monitoring the server
525+
while True:
526+
try:
527+
item = msg_queue.get(timeout=1) # Wait for data
528+
match item:
529+
case ServerStarted():
530+
update_everserver_status(status_path, ServerStatus.running)
531+
case ServerStopped():
532+
update_everserver_status(status_path, ServerStatus.stopped)
533+
return
534+
case ExperimentFailed():
535+
update_everserver_status(
536+
status_path, ServerStatus.failed, item.msg
537+
)
538+
return
539+
case ExperimentComplete():
540+
status, message = _get_optimization_status(
541+
item.exit_code, item.data
542+
)
543+
update_everserver_status(status_path, status, message)
544+
return
545+
except Empty:
546+
continue
547+
except:
548+
update_everserver_status(
549+
status_path,
550+
ServerStatus.failed,
551+
message=traceback.format_exc(),
552+
)
515553

516554

517555
def _get_optimization_status(

src/everest/plugins/everest_plugin_manager.py

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Any
33

44
import pluggy
5+
from opentelemetry.sdk.trace import TracerProvider
56

67
from everest.plugins import hook_impl, hook_specs
78
from everest.strings import EVEREST
@@ -26,3 +27,10 @@ def add_log_handle_to_root(self) -> None:
2627
root_logger = logging.getLogger()
2728
for handler in self.hook.add_log_handle_to_root():
2829
root_logger.addHandler(handler)
30+
31+
def add_span_processor_to_trace_provider(
32+
self, trace_provider: TracerProvider
33+
) -> None:
34+
span_processors = self.hook.add_span_processor()
35+
for span_processor in span_processors:
36+
trace_provider.add_span_processor(span_processor)

src/everest/plugins/hook_specs.py

+11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from collections.abc import Sequence
33
from typing import Any
44

5+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
56
from pydantic import BaseModel
67

78
from everest.plugins import hookspec
@@ -102,6 +103,16 @@ def add_log_handle_to_root() -> logging.Handler: # type: ignore[empty-body]
102103
"""
103104

104105

106+
@hookspec
107+
def add_span_processor() -> BatchSpanProcessor: # type: ignore
108+
"""
109+
Create a BatchSpanProcessor which will be added to the trace provider
110+
in everest.
111+
112+
:return: A BatchSpanProcessor that will be added to the trace provider in everest
113+
"""
114+
115+
105116
@hookspec
106117
def get_forward_model_documentations() -> dict[str, Any]: # type: ignore[empty-body]
107118
""" """

src/everest/trace.py

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from opentelemetry import trace
2+
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
3+
from opentelemetry.sdk.trace import SpanLimits, TracerProvider
4+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
5+
6+
resource = Resource(attributes={SERVICE_NAME: "everest"})
7+
tracer_provider = TracerProvider(
8+
resource=resource, span_limits=SpanLimits(max_events=128 * 16)
9+
)
10+
11+
tracer = trace.get_tracer("everest.main", tracer_provider=tracer_provider)
12+
13+
14+
def get_trace_id() -> str:
15+
return trace.format_trace_id(trace.get_current_span().get_span_context().trace_id)
16+
17+
18+
def get_traceparent() -> str | None:
19+
carrier: dict[str, str] = {}
20+
# Write the current context into the carrier.
21+
TraceContextTextMapPropagator().inject(carrier)
22+
return carrier.get("traceparent")

0 commit comments

Comments
 (0)