Skip to content

Commit dee9540

Browse files
committed
Add SpanProcessor for OpenTelemetry
1 parent 5f6a0c9 commit dee9540

File tree

11 files changed

+949
-1
lines changed

11 files changed

+949
-1
lines changed

config/config.exs

+5
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@ if config_env() == :test do
1616
end
1717

1818
config :phoenix, :json_library, if(Code.ensure_loaded?(JSON), do: JSON, else: Jason)
19+
20+
config :opentelemetry, span_processor: {Sentry.OpenTelemetry.SpanProcessor, []}
21+
22+
config :opentelemetry,
23+
sampler: {Sentry.OpenTelemetry.Sampler, [drop: ["Elixir.Oban.Stager process"]]}

lib/sentry/application.ex

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ defmodule Sentry.Application do
3232
Sentry.Sources,
3333
Sentry.Dedupe,
3434
Sentry.ClientReport.Sender,
35+
Sentry.OpenTelemetry.SpanStorage,
3536
{Sentry.Integrations.CheckInIDMappings,
3637
[
3738
max_expected_check_in_time:

lib/sentry/opentelemetry/sampler.ex

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
defmodule Sentry.OpenTelemetry.Sampler do
2+
@moduledoc false
3+
4+
def setup(config) do
5+
config
6+
end
7+
8+
def description(_) do
9+
"SentrySampler"
10+
end
11+
12+
def should_sample(
13+
_ctx,
14+
_trace_id,
15+
_links,
16+
span_name,
17+
_span_kind,
18+
_attributes,
19+
config
20+
) do
21+
if span_name in config[:drop] do
22+
{:drop, [], []}
23+
else
24+
{:record_and_sample, [], []}
25+
end
26+
end
27+
end
+194
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
defmodule Sentry.OpenTelemetry.SpanProcessor do
2+
@moduledoc false
3+
4+
require OpenTelemetry.SemConv.ClientAttributes, as: ClientAttributes
5+
require OpenTelemetry.SemConv.Incubating.DBAttributes, as: DBAttributes
6+
require OpenTelemetry.SemConv.Incubating.HTTPAttributes, as: HTTPAttributes
7+
require OpenTelemetry.SemConv.Incubating.URLAttributes, as: URLAttributes
8+
require OpenTelemetry.SemConv.Incubating.MessagingAttributes, as: MessagingAttributes
9+
@behaviour :otel_span_processor
10+
11+
require Logger
12+
13+
alias Sentry.{Transaction, OpenTelemetry.SpanStorage, OpenTelemetry.SpanRecord}
14+
alias Sentry.Interfaces.Span
15+
16+
@impl true
17+
def on_start(_ctx, otel_span, _config) do
18+
span_record = SpanRecord.new(otel_span)
19+
20+
SpanStorage.store_span(span_record)
21+
22+
otel_span
23+
end
24+
25+
@impl true
26+
def on_end(otel_span, _config) do
27+
span_record = SpanRecord.new(otel_span)
28+
29+
SpanStorage.update_span(span_record)
30+
31+
if span_record.parent_span_id == nil do
32+
root_span_record = SpanStorage.get_root_span(span_record.span_id)
33+
child_span_records = SpanStorage.get_child_spans(span_record.span_id)
34+
transaction = build_transaction(root_span_record, child_span_records)
35+
36+
result =
37+
case Sentry.send_transaction(transaction) do
38+
{:ok, _id} ->
39+
true
40+
41+
:ignored ->
42+
true
43+
44+
{:error, error} ->
45+
Logger.error("Failed to send transaction to Sentry: #{inspect(error)}")
46+
{:error, :invalid_span}
47+
end
48+
49+
:ok = SpanStorage.remove_span(span_record.span_id)
50+
51+
result
52+
else
53+
true
54+
end
55+
end
56+
57+
@impl true
58+
def force_flush(_config) do
59+
:ok
60+
end
61+
62+
defp build_transaction(root_span_record, child_span_records) do
63+
root_span = build_span(root_span_record)
64+
child_spans = Enum.map(child_span_records, &build_span(&1))
65+
66+
Transaction.new(%{
67+
span_id: root_span.span_id,
68+
transaction: transaction_name(root_span_record),
69+
transaction_info: %{source: :custom},
70+
start_timestamp: root_span_record.start_time,
71+
timestamp: root_span_record.end_time,
72+
contexts: %{
73+
trace: build_trace_context(root_span_record),
74+
otel: build_otel_context(root_span_record)
75+
},
76+
spans: child_spans
77+
})
78+
end
79+
80+
defp transaction_name(
81+
%{attributes: %{unquote(to_string(MessagingAttributes.messaging_system())) => :oban}} =
82+
span_record
83+
) do
84+
span_record.attributes["oban.job.worker"]
85+
end
86+
87+
defp transaction_name(span_record), do: span_record.name
88+
89+
defp build_trace_context(span_record) do
90+
{op, description} = get_op_description(span_record)
91+
92+
%{
93+
trace_id: span_record.trace_id,
94+
span_id: span_record.span_id,
95+
parent_span_id: span_record.parent_span_id,
96+
op: op,
97+
description: description,
98+
origin: span_record.origin,
99+
data: span_record.attributes
100+
}
101+
end
102+
103+
defp build_otel_context(span_record), do: span_record.attributes
104+
105+
defp get_op_description(
106+
%{
107+
attributes: %{
108+
unquote(to_string(HTTPAttributes.http_request_method())) => http_request_method
109+
}
110+
} = span_record
111+
) do
112+
op = "http.#{span_record.kind}"
113+
client_address = Map.get(span_record.attributes, to_string(ClientAttributes.client_address()))
114+
url_path = Map.get(span_record.attributes, to_string(URLAttributes.url_path()))
115+
116+
description =
117+
to_string(http_request_method) <>
118+
((client_address && " from #{client_address}") || "") <>
119+
((url_path && " #{url_path}") || "")
120+
121+
{op, description}
122+
end
123+
124+
defp get_op_description(
125+
%{attributes: %{unquote(to_string(DBAttributes.db_system())) => _db_system}} =
126+
span_record
127+
) do
128+
db_query_text = Map.get(span_record.attributes, "db.statement")
129+
130+
{"db", db_query_text}
131+
end
132+
133+
defp get_op_description(%{
134+
attributes:
135+
%{unquote(to_string(MessagingAttributes.messaging_system())) => :oban} = attributes
136+
}) do
137+
{"queue.process", attributes["oban.job.worker"]}
138+
end
139+
140+
defp get_op_description(span_record) do
141+
{span_record.name, span_record.name}
142+
end
143+
144+
defp build_span(span_record) do
145+
{op, description} = get_op_description(span_record)
146+
147+
%Span{
148+
op: op,
149+
description: description,
150+
start_timestamp: span_record.start_time,
151+
timestamp: span_record.end_time,
152+
trace_id: span_record.trace_id,
153+
span_id: span_record.span_id,
154+
parent_span_id: span_record.parent_span_id,
155+
origin: span_record.origin,
156+
data: Map.put(span_record.attributes, "otel.kind", span_record.kind),
157+
status: span_status(span_record)
158+
}
159+
end
160+
161+
defp span_status(%{
162+
attributes: %{
163+
unquote(to_string(HTTPAttributes.http_response_status_code())) =>
164+
http_response_status_code
165+
}
166+
}) do
167+
to_status(http_response_status_code)
168+
end
169+
170+
defp span_status(_span_record), do: nil
171+
172+
# WebSocket upgrade spans doesn't have a HTTP status
173+
defp to_status(nil), do: nil
174+
175+
defp to_status(status) when status in 200..299, do: "ok"
176+
177+
for {status, string} <- %{
178+
400 => "invalid_argument",
179+
401 => "unauthenticated",
180+
403 => "permission_denied",
181+
404 => "not_found",
182+
409 => "already_exists",
183+
429 => "resource_exhausted",
184+
499 => "cancelled",
185+
500 => "internal_error",
186+
501 => "unimplemented",
187+
503 => "unavailable",
188+
504 => "deadline_exceeded"
189+
} do
190+
defp to_status(unquote(status)), do: unquote(string)
191+
end
192+
193+
defp to_status(_any), do: "unknown_error"
194+
end
+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
defmodule Sentry.OpenTelemetry.SpanRecord do
2+
require Record
3+
require OpenTelemetry
4+
5+
@fields Record.extract(:span, from_lib: "opentelemetry/include/otel_span.hrl")
6+
Record.defrecordp(:span, @fields)
7+
8+
defstruct @fields ++ [:origin]
9+
10+
def new(otel_span) do
11+
otel_attrs = span(otel_span)
12+
13+
{:attributes, _, _, _, attributes} = otel_attrs[:attributes]
14+
15+
origin =
16+
case otel_attrs[:instrumentation_scope] do
17+
{:instrumentation_scope, origin, _version, _} ->
18+
origin
19+
20+
_ ->
21+
:undefined
22+
end
23+
24+
attrs =
25+
otel_attrs
26+
|> Keyword.delete(:attributes)
27+
|> Keyword.merge(
28+
trace_id: cast_trace_id(otel_attrs[:trace_id]),
29+
span_id: cast_span_id(otel_attrs[:span_id]),
30+
parent_span_id: cast_span_id(otel_attrs[:parent_span_id]),
31+
origin: origin,
32+
start_time: cast_timestamp(otel_attrs[:start_time]),
33+
end_time: cast_timestamp(otel_attrs[:end_time]),
34+
attributes: normalize_attributes(attributes)
35+
)
36+
|> Map.new()
37+
38+
struct(__MODULE__, attrs)
39+
end
40+
41+
defp normalize_attributes(attributes) do
42+
Enum.map(attributes, fn {key, value} ->
43+
{to_string(key), value}
44+
end)
45+
|> Map.new()
46+
end
47+
48+
defp cast_span_id(nil), do: nil
49+
defp cast_span_id(:undefined), do: nil
50+
defp cast_span_id(span_id), do: bytes_to_hex(span_id, 16)
51+
52+
defp cast_trace_id(trace_id), do: bytes_to_hex(trace_id, 32)
53+
54+
defp cast_timestamp(:undefined), do: nil
55+
defp cast_timestamp(nil), do: nil
56+
57+
defp cast_timestamp(timestamp) do
58+
nano_timestamp = OpenTelemetry.timestamp_to_nano(timestamp)
59+
{:ok, datetime} = DateTime.from_unix(div(nano_timestamp, 1_000_000), :millisecond)
60+
61+
DateTime.to_iso8601(datetime)
62+
end
63+
64+
defp bytes_to_hex(bytes, length) do
65+
case(:otel_utils.format_binary_string("~#{length}.16.0b", [bytes])) do
66+
{:ok, result} -> result
67+
{:error, _} -> raise "Failed to convert bytes to hex: #{inspect(bytes)}"
68+
end
69+
end
70+
end

0 commit comments

Comments
 (0)