Skip to content

Commit

Permalink
feat: Report pre and post compressed content length to API (#1359)
Browse files Browse the repository at this point in the history
### Purpose
Report both the full (pre-compression) and compressed content lengths to the API, enabling monitoring of the `zstd` compression ratio in real-world tracing.
  • Loading branch information
angus-langchain authored Jan 3, 2025
1 parent a949edf commit 76b4123
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 30 deletions.
36 changes: 25 additions & 11 deletions python/langsmith/_internal/_background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
TYPE_CHECKING,
List,
Optional,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -98,12 +99,14 @@ def _tracing_thread_drain_queue(

def _tracing_thread_drain_compressed_buffer(
client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520
) -> Optional[io.BytesIO]:
) -> Tuple[Optional[io.BytesIO], Optional[Tuple[int, int]]]:
assert client.compressed_runs is not None
with client.compressed_runs.lock:
client.compressed_runs.compressor_writer.flush()
current_size = client.compressed_runs.buffer.tell()

pre_compressed_size = client.compressed_runs.uncompressed_size

if size_limit is not None and size_limit <= 0:
raise ValueError(f"size_limit must be positive; got {size_limit}")
if size_limit_bytes is not None and size_limit_bytes < 0:
Expand All @@ -114,18 +117,20 @@ def _tracing_thread_drain_compressed_buffer(
if (size_limit_bytes is None or current_size < size_limit_bytes) and (
size_limit is None or client.compressed_runs.run_count < size_limit
):
return None
return None, None

# Write final boundary and close compression stream
client.compressed_runs.compressor_writer.write(f"--{_BOUNDARY}--\r\n".encode())
client.compressed_runs.compressor_writer.close()

filled_buffer = client.compressed_runs.buffer

compressed_runs_info = (pre_compressed_size, current_size)

client.compressed_runs.reset()

filled_buffer.seek(0)
return filled_buffer
return (filled_buffer, compressed_runs_info)


def _tracing_thread_handle_batch(
Expand Down Expand Up @@ -303,35 +308,44 @@ def keep_thread_active() -> bool:
continue
client._data_available_event.clear()

data_stream = _tracing_thread_drain_compressed_buffer(
data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)

if data_stream is not None:
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req, data_stream
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(data_stream)
client._send_compressed_multipart_req(data_stream, compressed_runs_info)

# Drain the buffer on exit
try:
final_data_stream = _tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
) # Force final drain
final_data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
) # Force final drain
)
if final_data_stream is not None:
try:
cf.wait(
[
HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req, final_data_stream
client._send_compressed_multipart_req,
final_data_stream,
compressed_runs_info,
)
]
)
except RuntimeError:
client._send_compressed_multipart_req(final_data_stream)
client._send_compressed_multipart_req(
final_data_stream,
compressed_runs_info,
)

except Exception:
logger.error("Error in final cleanup", exc_info=True)
Expand Down
4 changes: 4 additions & 0 deletions python/langsmith/_internal/_compressed_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def __init__(self):
self.buffer = io.BytesIO()
self.run_count = 0
self.lock = threading.Lock()
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
Expand All @@ -30,6 +32,8 @@ def __init__(self):
def reset(self):
self.buffer = io.BytesIO()
self.run_count = 0
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
Expand Down
22 changes: 9 additions & 13 deletions python/langsmith/_internal/_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,9 @@
import uuid
from typing import Literal, Optional, Union, cast

try:
from zstandard import ZstdCompressionWriter # type: ignore[import]
except ImportError:

class ZstdCompressionWriter: # type: ignore[no-redef]
"""only used for typing checks."""


from langsmith import schemas as ls_schemas
from langsmith._internal import _orjson
from langsmith._internal._compressed_runs import CompressedRuns
from langsmith._internal._multipart import MultipartPart, MultipartPartsAndContext
from langsmith._internal._serde import dumps_json as _dumps_json

Expand Down Expand Up @@ -283,7 +276,7 @@ def serialized_run_operation_to_multipart_parts_and_context(

def compress_multipart_parts_and_context(
parts_and_context: MultipartPartsAndContext,
compressor_writer: ZstdCompressionWriter,
compressed_runs: CompressedRuns,
boundary: str,
) -> None:
for part_name, (filename, data, content_type, headers) in parts_and_context.parts:
Expand All @@ -303,12 +296,15 @@ def compress_multipart_parts_and_context(
]
)

compressor_writer.write("".join(header_parts).encode())
compressed_runs.compressor_writer.write("".join(header_parts).encode())

if isinstance(data, (bytes, bytearray)):
compressor_writer.write(data)
compressed_runs.uncompressed_size += len(data)
compressed_runs.compressor_writer.write(data)
else:
compressor_writer.write(str(data).encode())
encoded_data = str(data).encode()
compressed_runs.uncompressed_size += len(encoded_data)
compressed_runs.compressor_writer.write(encoded_data)

# Write part terminator
compressor_writer.write(b"\r\n")
compressed_runs.compressor_writer.write(b"\r\n")
27 changes: 21 additions & 6 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ def create_run(
with self.compressed_runs.lock:
compress_multipart_parts_and_context(
multipart_form,
self.compressed_runs.compressor_writer,
self.compressed_runs,
_BOUNDARY,
)
self.compressed_runs.run_count += 1
Expand Down Expand Up @@ -1816,7 +1816,13 @@ def _send_multipart_req(self, acc: MultipartPartsAndContext, *, attempts: int =
# do not retry by default
return

def _send_compressed_multipart_req(self, data_stream, *, attempts: int = 3):
def _send_compressed_multipart_req(
self,
data_stream: io.BytesIO,
compressed_runs_info: Optional[Tuple[int, int]],
*,
attempts: int = 3,
):
"""Send a zstd-compressed multipart form data stream to the backend."""
_context: str = ""

Expand All @@ -1830,6 +1836,12 @@ def _send_compressed_multipart_req(self, data_stream, *, attempts: int = 3):
"X-API-KEY": api_key,
"Content-Type": f"multipart/form-data; boundary={_BOUNDARY}",
"Content-Encoding": "zstd",
"X-Pre-Compressed-Size": (
str(compressed_runs_info[0]) if compressed_runs_info else ""
),
"X-Post-Compressed-Size": (
str(compressed_runs_info[1]) if compressed_runs_info else ""
),
}

self.request_with_retries(
Expand Down Expand Up @@ -1985,7 +1997,7 @@ def update_run(
with self.compressed_runs.lock:
compress_multipart_parts_and_context(
multipart_form,
self.compressed_runs.compressor_writer,
self.compressed_runs,
_BOUNDARY,
)
self.compressed_runs.run_count += 1
Expand Down Expand Up @@ -2024,8 +2036,10 @@ def flush_compressed_runs(self, attempts: int = 3) -> None:
_tracing_thread_drain_compressed_buffer,
)

final_data_stream = _tracing_thread_drain_compressed_buffer(
self, size_limit=1, size_limit_bytes=1
final_data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
self, size_limit=1, size_limit_bytes=1
)
)

if final_data_stream is not None:
Expand All @@ -2035,13 +2049,14 @@ def flush_compressed_runs(self, attempts: int = 3) -> None:
future = HTTP_REQUEST_THREAD_POOL.submit(
self._send_compressed_multipart_req,
final_data_stream,
compressed_runs_info,
attempts=attempts,
)
self._futures.add(future)
except RuntimeError:
# In case the ThreadPoolExecutor is already shutdown
self._send_compressed_multipart_req(
final_data_stream, attempts=attempts
final_data_stream, compressed_runs_info, attempts=attempts
)

# If we got a future, wait for it to complete
Expand Down

0 comments on commit 76b4123

Please sign in to comment.