Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
aa59f2c
update fern
hassiebp Nov 7, 2024
e1536b3
add media upload
hassiebp Nov 12, 2024
266d334
add mediawrapper
hassiebp Nov 12, 2024
5b33316
add tests
hassiebp Nov 12, 2024
ea186e4
add decorator test
hassiebp Nov 12, 2024
71c717b
add langchain test
hassiebp Nov 12, 2024
d03e275
rename to LangfuseMedia
hassiebp Nov 12, 2024
5338ebb
fix test client
hassiebp Nov 12, 2024
a7dcb63
add upload time ms
hassiebp Nov 13, 2024
6be8f66
Merge branch 'main' into add-multimodal-support
hassiebp Nov 13, 2024
a810062
update post endpoint
hassiebp Nov 14, 2024
a590e15
keep openai format on audio
hassiebp Nov 14, 2024
5f24726
check for mediacontenttype
hassiebp Nov 14, 2024
634f7e4
add tests
hassiebp Nov 14, 2024
db93586
add retries
hassiebp Nov 14, 2024
f40972e
Merge branch 'main' into add-multimodal-support
hassiebp Nov 14, 2024
61b87c6
fix decorator serialization
hassiebp Nov 14, 2024
ca74a28
fix test
hassiebp Nov 14, 2024
9a817db
fix operand
hassiebp Nov 14, 2024
f05dbcb
pause upload consumer after ingestion
hassiebp Nov 14, 2024
8bf9b0d
move paramspec to typing extensions
hassiebp Nov 14, 2024
023658d
fix type access for python <3.10
hassiebp Nov 15, 2024
d55e56f
search for media 10 levels deep only
hassiebp Nov 15, 2024
4b269a0
handle ingestion consumer errors gracefully
hassiebp Nov 15, 2024
f11776e
revert decorator io media
hassiebp Nov 15, 2024
fcb71fd
make data uri parsing more robust
hassiebp Nov 15, 2024
aaef73c
use threads for media upload threads
hassiebp Nov 15, 2024
ca7bcc1
fix test
hassiebp Nov 15, 2024
e909c6c
fix test
hassiebp Nov 15, 2024
7f966ff
fix logging
hassiebp Nov 15, 2024
1a141e4
fix openai test
hassiebp Nov 15, 2024
b655790
address comments
hassiebp Nov 15, 2024
9706d63
fix ci script
hassiebp Nov 15, 2024
a93d5c8
push
hassiebp Nov 15, 2024
bc37741
remove queue type for 3.8 support
hassiebp Nov 15, 2024
281b50c
fix test
hassiebp Nov 15, 2024
c83e6c7
fix openai test
hassiebp Nov 15, 2024
1b2bbe1
fix langchain test
hassiebp Nov 15, 2024
22dc7a8
fix
hassiebp Nov 15, 2024
765850d
push
hassiebp Nov 15, 2024
359e995
push
hassiebp Nov 15, 2024
1f2df0b
fix
hassiebp Nov 15, 2024
10d8977
fix string stripping
hassiebp Nov 15, 2024
d19f4f1
increase ci timeout
hassiebp Nov 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

242 changes: 242 additions & 0 deletions langfuse/_task_manager/media_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
import logging
from queue import Empty
from typing import Any, Callable, Optional, TypeVar, ParamSpec

import time
import requests
import backoff

from langfuse.api import GetMediaUploadUrlRequest, PatchMediaBody
from langfuse.api.core import ApiError
from langfuse.api.client import FernLangfuse
from langfuse.media import LangfuseMedia
from langfuse.utils import _get_timestamp

from .media_upload_queue import MediaUploadQueue, UploadMediaJob


T = TypeVar("T")
P = ParamSpec("P")


class MediaManager:
    """Discovers media in event payloads and coordinates its upload.

    Scans event bodies for `LangfuseMedia` objects and base64 data URIs,
    registers each media item with the Langfuse API to obtain a pre-signed
    upload URL, and enqueues the raw bytes on the `MediaUploadQueue` for a
    consumer thread to upload.
    """

    _log = logging.getLogger(__name__)

    def __init__(
        self,
        *,
        api_client: FernLangfuse,
        media_upload_queue: MediaUploadQueue,
        max_retries: Optional[int] = 3,
    ):
        self._api_client = api_client
        self._queue = media_upload_queue
        # Max attempts per API call / upload request (passed to backoff).
        self._max_retries = max_retries

    def process_next_media_upload(self):
        """Take a single job off the upload queue and execute it.

        Blocks up to one second waiting for a job and returns silently if the
        queue stays empty. Errors are logged, never raised, so a consumer
        thread calling this in a loop keeps running.
        """
        try:
            upload_job = self._queue.get(block=True, timeout=1)
            self._log.debug(f"Processing upload for {upload_job['media_id']}")
            self._process_upload_media_job(data=upload_job)

            self._queue.task_done()
        except Empty:
            self._log.debug("Media upload queue is empty")
        except Exception as e:
            self._log.error(f"Error uploading media: {e}")
            # Mark the failed job done anyway so queue.join() cannot hang.
            self._queue.task_done()

    def process_media_in_event(self, event: dict):
        """Find and register media in an ingestion event's body, in place.

        Inspects the "input", "output" and "metadata" fields of the event
        body and replaces any media found there with processed values (see
        `_find_and_process_media`). Errors are logged and swallowed so event
        processing is never interrupted by media handling.
        """
        try:
            if "body" not in event:
                return

            body = event["body"]
            # Trace events carry the trace id in "id"; all other event types
            # must reference their trace via "traceId".
            trace_id = body.get("traceId", None) or (
                body.get("id", None)
                if "type" in event and "trace" in event["type"]
                else None
            )

            if trace_id is None:
                raise ValueError("trace_id is required for media upload")

            # Only generation/span events contribute an observation id.
            observation_id = (
                body.get("id", None)
                if "type" in event
                and ("generation" in event["type"] or "span" in event["type"])
                else None
            )

            multimodal_fields = ["input", "output", "metadata"]

            for field in multimodal_fields:
                if field in body:
                    processed_data = self._find_and_process_media(
                        data=body[field],
                        trace_id=trace_id,
                        observation_id=observation_id,
                        field=field,
                    )

                    body[field] = processed_data

        except Exception as e:
            self._log.error(f"Error processing multimodal event: {e}")

    def _find_and_process_media(
        self,
        *,
        data: Any,
        trace_id: str,
        observation_id: Optional[str],
        field: str,
    ):
        """Recursively walk `data`, registering any media found.

        `LangfuseMedia` instances are registered as-is; strings that look like
        base64 data URIs ("data:...") are wrapped in a `LangfuseMedia` and the
        wrapper is returned in place of the string. Lists and dicts are
        rebuilt with processed children; everything else passes through
        unchanged.
        """
        # Guard against cyclic structures: track object ids already visited.
        seen = set()

        def _process_data_recursively(data: Any):
            if id(data) in seen:
                return data

            seen.add(id(data))

            if isinstance(data, LangfuseMedia):
                self._process_media(
                    media=data,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                return data

            if isinstance(data, str) and data.startswith("data:"):
                media = LangfuseMedia(
                    obj=data,
                    base64_data_uri=data,
                )

                self._process_media(
                    media=media,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                return media

            if isinstance(data, list):
                return [_process_data_recursively(item) for item in data]

            if isinstance(data, dict):
                return {
                    key: _process_data_recursively(value) for key, value in data.items()
                }

            return data

        return _process_data_recursively(data)

    def _process_media(
        self,
        *,
        media: LangfuseMedia,
        trace_id: str,
        observation_id: Optional[str],
        field: str,
    ):
        """Register one media item with the API and queue its upload.

        Requests a pre-signed upload URL for the media's content. The API
        returns no URL when the content is already uploaded, in which case
        only the media id is recorded.
        """
        # Media parsed from an invalid/unsupported source has no content —
        # nothing to register or upload.
        if (
            media._content_length is None
            or media._content_type is None
            or media._content_sha256_hash is None
            or media._content_bytes is None
        ):
            return

        upload_url_response = self._request_with_backoff(
            self._api_client.media.get_upload_url,
            request=GetMediaUploadUrlRequest(
                contentLength=media._content_length,
                contentType=media._content_type,
                sha256Hash=media._content_sha256_hash,
                field=field,
                traceId=trace_id,
                observationId=observation_id,
            ),
        )

        upload_url = upload_url_response.upload_url
        media._media_id = upload_url_response.media_id  # Important as this will be used in the media reference string in serializer

        if upload_url is not None:
            self._log.debug(f"Scheduling upload for {media._media_id}")
            self._queue.put(
                item={
                    "upload_url": upload_url,
                    "media_id": media._media_id,
                    "content_bytes": media._content_bytes,
                    "content_type": media._content_type,
                    "content_sha256_hash": media._content_sha256_hash,
                },
                block=True,
                timeout=1,
            )

        else:
            self._log.debug(f"Media {media._media_id} already uploaded")

    def _process_upload_media_job(
        self,
        *,
        data: UploadMediaJob,
    ):
        """Upload one media item and report the outcome to the API.

        PUTs the content bytes to the pre-signed URL, then patches the media
        record with the upload status, any error text, and the elapsed time.
        """
        upload_start_time = time.time()
        upload_response = self._request_with_backoff(
            requests.put,
            data["upload_url"],
            headers={
                "Content-Type": data["content_type"],
                # S3-compatible integrity check of the uploaded bytes.
                "x-amz-checksum-sha256": data["content_sha256_hash"],
            },
            data=data["content_bytes"],
        )
        upload_time_ms = int((time.time() - upload_start_time) * 1000)

        self._request_with_backoff(
            self._api_client.media.patch,
            media_id=data["media_id"],
            request=PatchMediaBody(
                uploadedAt=_get_timestamp(),
                uploadHttpStatus=upload_response.status_code,
                uploadHttpError=upload_response.text,
                uploadTimeMs=upload_time_ms,
            ),
        )

        self._log.debug(
            f"Media upload completed for {data['media_id']} in {upload_time_ms}ms"
        )

    def _request_with_backoff(
        self, func: Callable[P, T], *args: P.args, **kwargs: P.kwargs
    ) -> T:
        """Call `func` with exponential backoff, up to `self._max_retries` tries.

        Client errors (4xx) other than 429 are not retried: they indicate a
        request that cannot succeed by repeating it. All other exceptions are
        retried; after the final attempt the original exception propagates.
        """

        def _give_up(e: Exception) -> bool:
            # Fail fast on non-retryable client errors (4xx except 429).
            return (
                isinstance(e, ApiError)
                and e.status_code is not None
                and 400 <= e.status_code < 500
                and e.status_code != 429
            )

        @backoff.on_exception(
            backoff.expo,
            Exception,
            max_tries=self._max_retries,
            giveup=_give_up,
            logger=None,
        )
        def execute_task_with_backoff() -> T:
            return func(*args, **kwargs)

        return execute_task_with_backoff()
39 changes: 39 additions & 0 deletions langfuse/_task_manager/media_upload_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging
import threading

from .media_manager import MediaManager


class MediaUploadConsumer(threading.Thread):
    """Daemon thread that drains the media upload queue.

    Repeatedly asks its `MediaManager` to process the next queued media
    upload until `pause()` is called. Exceptions are caught and logged so a
    single failed upload cannot kill the thread.
    """

    _log = logging.getLogger(__name__)
    _identifier: int
    _media_manager: "MediaManager"

    def __init__(
        self,
        *,
        identifier: int,
        media_manager: "MediaManager",
    ):
        """Create a consumer thread."""
        super().__init__()
        # Make consumer a daemon thread so that it doesn't block program exit
        self.daemon = True
        # It's important to set running in the constructor: if we are asked to
        # pause immediately after construction, we might set running to True in
        # run() *after* we set it to False in pause... and keep running
        # forever.
        self.running = True
        self._identifier = identifier
        self._media_manager = media_manager

    def run(self):
        """Run the media upload consumer until pause() is called."""
        self._log.debug("consumer is running...")
        while self.running:
            try:
                self._media_manager.process_next_media_upload()
            except Exception as e:
                # Never let a single failure terminate the consumer thread.
                self._log.error(f"Error processing media upload: {e}")

    def pause(self):
        """Pause the media upload consumer."""
        self.running = False
14 changes: 14 additions & 0 deletions langfuse/_task_manager/media_upload_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from queue import Queue
from typing import TypedDict


class UploadMediaJob(TypedDict):
    """A single queued media upload.

    Carries the pre-signed URL the content bytes are PUT to, the media id
    used to report the upload result back to the API, and the content type
    and sha256 hash sent as request headers alongside the upload.
    """

    upload_url: str
    media_id: str
    content_type: str
    content_bytes: bytes
    content_sha256_hash: str


class MediaUploadQueue(Queue):
    """FIFO queue of `UploadMediaJob` items awaiting upload.

    NOTE: semantically this is ``Queue[UploadMediaJob]``, but the subscripted
    base class is avoided because ``queue.Queue`` is not subscriptable at
    runtime before Python 3.9, which would break Python 3.8 support.
    """

    pass
Loading