Skip to content

Prototype: recording gen_ai content on attributes and uploading somewhere else #3397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# pylint: skip-file
import json
import os
import uuid

from openai import OpenAI

# NOTE: OpenTelemetry Python Logs and Events APIs are in beta
from opentelemetry import _events, _logs, trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
Expand All @@ -13,11 +14,14 @@
)
from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs import LoggerProvider, LogRecordProcessor, LogData
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from dotenv import load_dotenv
load_dotenv()

# configure tracing
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
Expand All @@ -31,21 +35,139 @@
)
_events.set_event_logger_provider(EventLoggerProvider())

# instrument OpenAI
OpenAIInstrumentor().instrument()

def prompt_uploader(span, event, prompt):
    """Example prompt hook: pretend to upload the prompt to blob storage.

    Records a reference to the uploaded copy on the span (when recording)
    and on the event (when one was created), blanking the inline prompt
    attribute so the sensitive content itself is not exported.
    """
    ref = "https://my-storage/bucket/" + str(uuid.uuid4())
    print(f"Uploading prompt to {ref}, prompt: {prompt}")

    if span.is_recording():
        # Point at the uploaded copy and clear the inline content.
        for key, value in (
            ("gen_ai.request.inputs_ref", ref),
            ("gen_ai.request.inputs", ""),
        ):
            span.set_attribute(key, value)

    if event:
        event.attributes["gen_ai.request.inputs_ref"] = ref
        event.body = f"prompt uploaded to {ref}"

def completion_uploader(span, event, completion):
    """Example completion hook: pretend to upload the completion to blob storage.

    Mirrors ``prompt_uploader`` for the response side: stores a reference
    to the uploaded copy and blanks the inline output attribute.
    """
    ref = "https://my-storage/bucket/" + str(uuid.uuid4())
    print(f"Uploading completion to {ref}, completion: {completion}")

    if span.is_recording():
        # Point at the uploaded copy and clear the inline content.
        for key, value in (
            ("gen_ai.response.outputs_ref", ref),
            ("gen_ai.response.outputs", ""),
        ):
            span.set_attribute(key, value)

    if event:
        event.attributes["gen_ai.response.outputs_ref"] = ref
        event.body = f"completion uploaded to {ref}"

# Instrument the OpenAI client with the prototype content-capture flags.
# NOTE(review): the table below says verbose_attributes defaults to False,
# but the instrumentor reads kwargs.get("capture_verbose_attributes", True)
# — the default is inconsistent; confirm which is intended.
OpenAIInstrumentor().instrument(
    # same behavior as the existing OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT env var
    capture_sensitive_content=True,
    # also capture opt-in, non-sensitive attributes (see table)
    capture_verbose_attributes=True,

    # | sensitive_content | verbose_attributes | Result |
    # |-------------------|--------------------|------------------------------------------------------------------------------------------------------------------------------- |
    # | False (default)   | False (default)    | Prompts/completions - not captured. Not-sensitive opt-in attributes - not captured |
    # | False (default)   | True               | Prompts/completions - not captured. Not-sensitive opt-in attributes - captured |
    # | True              | False (default)    | Prompts/completions - captured on events if DEBUG level is enabled. Not-sensitive opt-in attributes - not captured |
    # | True              | True               | Prompts/completions - captured on attributes and events if DEBUG level is enabled. Not-sensitive opt-in attributes - captured |
    # can probably merge two flags in one enum

    # optional hooks, independent from above flags:
    # prompt_hook=prompt_uploader,
    # completion_hook=completion_uploader
)

def _function_tool(name, description, parameters):
    """Wrap a function spec in OpenAI's tool envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": parameters,
        },
    }


# Tool the model can call to fetch the current weather for a location.
weather_tool = _function_tool(
    "get_weather",
    "Get current weather for a given location",
    {
        "type": "object",
        "properties": {
            "location": {"type": "string", "description": "City or location name"}
        },
        "required": ["location"],
    },
)

# Tool the model can call to look up location information (no arguments).
location_tool = _function_tool("get_location", "Get location information", {})

# JSON-schema structured-output format for the final forecast answer.
_forecast_schema = {
    "type": "object",
    "properties": {
        "temperature": {"type": "string"},
        "precipitation": {"type": "string"},
    },
    "required": ["temperature"],
    "additionalProperties": True,
}

response_format = {
    "type": "json_schema",
    "json_schema": {"name": "weather_forecast", "schema": _forecast_schema},
}

tracer = trace.get_tracer(__name__)


@tracer.start_as_current_span("main")
def main():
    """Run a tool-calling chat loop against the OpenAI chat completions API.

    Sends an initial weather question, then keeps answering the model's
    tool calls (get_weather / get_location) until it returns a final
    message, which is printed.

    NOTE: the diff residue (duplicated `model=`/`messages=[` lines from the
    removed version) was cleaned up here; this is the post-change code.
    """
    client = OpenAI()
    model = os.getenv("CHAT_MODEL", "gpt-4o-mini")

    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": "What is the weather like?",
        },
    ]

    chat_completion = client.chat.completions.create(
        model=model,
        messages=messages,
        tools=[location_tool, weather_tool],
        response_format=response_format,
    )

    # Keep satisfying tool calls until the model produces a final answer.
    while chat_completion.choices[0].finish_reason == "tool_calls":
        messages.append(chat_completion.choices[0].message)
        # Call the tool with the response from the model
        for call in chat_completion.choices[0].message.tool_calls:
            function_args = json.loads(call.function.arguments)
            if call.function.name == "get_weather":
                messages.append(
                    {
                        "tool_call_id": call.id,
                        "role": "tool",
                        "name": "get_weather",
                        "content": f"Weather in {function_args['location']} is sunny and 75 degrees.",
                    }
                )
            if call.function.name == "get_location":
                messages.append(
                    {
                        "tool_call_id": call.id,
                        "role": "tool",
                        "name": "get_location",
                        "content": "Seattle, WA",
                    }
                )
        chat_completion = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=[location_tool, weather_tool],
            response_format=response_format,
        )

    print(chat_completion.choices[0].message.content)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@
---
"""

import json
from typing import Collection

from wrapt import wrap_function_wrapper

from opentelemetry._events import get_event_logger
from opentelemetry._events import get_event_logger, Event
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.openai_v2.package import _instruments
from opentelemetry.instrumentation.openai_v2.utils import is_content_enabled
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter
from opentelemetry.semconv.schemas import Schemas
from opentelemetry.trace import get_tracer
from opentelemetry._logs.severity import SeverityNumber

from .instruments import Instruments
from .patch import async_chat_completions_create, chat_completions_create
Expand All @@ -64,6 +66,69 @@ def __init__(self):
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def record_prompt_content(self, span, prompt_body):
    """Default prompt hook: record prompt content on an event and/or span.

    When sensitive-content capture is enabled, builds a DEBUG-severity
    event carrying the prompt body; when verbose attributes are enabled
    and the span is recording, mirrors the JSON-encoded body onto a span
    attribute. The optional user hook runs before the event is emitted so
    it can rewrite both (e.g. replace content with a storage reference).
    """
    prompt_event = None

    if self.content_enabled:
        # TODO: perf - check if DEBUG is enabled
        prompt_event = Event(
            "gen_ai.request.inputs",
            # note, semconvs are switching to event attributes instead
            # of body, so carrying the content in the body is temporary
            body=prompt_body,
            attributes={
                "gen_ai.system": "openai",
            },
            severity_number=SeverityNumber.DEBUG,
        )

    if self.capture_verbose_attributes and span.is_recording():
        span.set_attribute(
            "gen_ai.request.inputs",
            json.dumps(prompt_body, ensure_ascii=False),
        )

    if self.custom_prompt_hook:
        try:
            self.custom_prompt_hook(span, prompt_event, prompt_body)
        except Exception as e:
            # A failing user hook is disabled instead of breaking telemetry.
            # TODO - proper internal logging
            print(f"Error in prompt hook, turning it off: {e}")
            self.custom_prompt_hook = None

    # prompt hook can modify the event, so we need to emit it after the hook is called
    if prompt_event:
        self.event_logger.emit(prompt_event)

def record_completion_content(self, span, completion_body):
    """Default completion hook: record completion content on an event and/or span.

    Mirrors ``record_prompt_content`` for the response side: emits a
    DEBUG-severity event when sensitive-content capture is enabled, sets a
    span attribute when verbose attributes are enabled, and lets the
    optional user hook rewrite both before the event is emitted.
    """
    completion_event = None

    if self.content_enabled:
        # TODO: perf - check if DEBUG is enabled
        completion_event = Event(
            "gen_ai.response.outputs",
            # note, semconvs are switching to event attributes instead
            # of body, so carrying the content in the body is temporary
            body=completion_body,
            attributes={
                "gen_ai.system": "openai",
            },
            severity_number=SeverityNumber.DEBUG,
        )

    if self.capture_verbose_attributes and span.is_recording():
        span.set_attribute(
            "gen_ai.response.outputs",
            json.dumps(completion_body, ensure_ascii=False),
        )

    if self.custom_completion_hook:
        try:
            self.custom_completion_hook(span, completion_event, completion_body)
        except Exception as e:
            # A failing user hook is disabled instead of breaking telemetry.
            # TODO - proper internal logging
            print(f"Error in completion hook, turning it off: {e}")
            self.custom_completion_hook = None

    # completion hook can modify the event, so we need to emit it after the hook is called
    if completion_event:
        self.event_logger.emit(completion_event)

def _instrument(self, **kwargs):
"""Enable OpenAI instrumentation."""
tracer_provider = kwargs.get("tracer_provider")
Expand All @@ -74,7 +139,7 @@ def _instrument(self, **kwargs):
schema_url=Schemas.V1_28_0.value,
)
event_logger_provider = kwargs.get("event_logger_provider")
event_logger = get_event_logger(
self.event_logger = get_event_logger(
__name__,
"",
schema_url=Schemas.V1_28_0.value,
Expand All @@ -90,19 +155,33 @@ def _instrument(self, **kwargs):

instruments = Instruments(self._meter)

self.capture_verbose_attributes = kwargs.get("capture_verbose_attributes", True)

self.content_enabled = is_content_enabled() or kwargs.get("capture_sensitive_content", False)
self.custom_prompt_hook = kwargs.get("prompt_hook")
self.custom_completion_hook = kwargs.get("completion_hook")

prompt_hook = None
if self.content_enabled or self.custom_prompt_hook:
prompt_hook = self.record_prompt_content

completion_hook = None
if self.content_enabled or self.custom_completion_hook:
completion_hook = self.record_completion_content

wrap_function_wrapper(
module="openai.resources.chat.completions",
name="Completions.create",
wrapper=chat_completions_create(
tracer, event_logger, instruments, is_content_enabled()
tracer, self.event_logger, instruments, prompt_hook, completion_hook, self.capture_verbose_attributes
),
)

wrap_function_wrapper(
module="openai.resources.chat.completions",
name="AsyncCompletions.create",
wrapper=async_chat_completions_create(
tracer, event_logger, instruments, is_content_enabled()
tracer, self.event_logger, instruments, prompt_hook, completion_hook, self.capture_verbose_attributes
),
)

Expand Down
Loading
Loading