85 changes: 83 additions & 2 deletions langfuse/_client/datasets.py
@@ -90,6 +90,7 @@ def __init__(self, dataset_item: DatasetItem, langfuse: "Langfuse"):
self.dataset_name = dataset_item.dataset_name
self.created_at = dataset_item.created_at
self.updated_at = dataset_item.updated_at
self.dataset_item = dataset_item

self.langfuse = langfuse

@@ -179,7 +180,12 @@ class DatasetClient:
updated_at: dt.datetime
items: List[DatasetItemClient]

def __init__(self, dataset: Dataset, items: List[DatasetItemClient]):
def __init__(
self,
dataset: Dataset,
items: List[DatasetItemClient],
langfuse: Optional["Langfuse"] = None,
):
"""Initialize the DatasetClient."""
self.id = dataset.id
self.name = dataset.name
@@ -189,14 +195,89 @@ def __init__(self, dataset: Dataset, items: List[DatasetItemClient]):
self.created_at = dataset.created_at
self.updated_at = dataset.updated_at
self.items = items
self._langfuse: Optional["Langfuse"] = None
self._langfuse = langfuse

def _get_langfuse_client(self) -> Optional["Langfuse"]:
"""Get the Langfuse client from the first item."""
if self._langfuse is None and self.items:
self._langfuse = self.items[0].langfuse
return self._langfuse

def items_to_dict(self) -> List[Dict[str, Any]]:
"""Convert all dataset items to a list of dictionaries.

Returns:
List of dictionaries, where each dictionary contains the serialized
representation of a dataset item including its id, status, input,
expected_output, metadata, and other attributes.

Example:
Serialize dataset items and process them in separate worker functions.
This pattern is useful for distributed processing where each worker
runs independently (e.g., AWS Step Functions, Azure Durable Functions, or separate processes):

```python
import json
from langfuse import get_client
from langfuse.model import DatasetItem
from langfuse._client.datasets import DatasetItemClient

# Worker function (runs in separate process/container)
def process_item(item_json: str, run_name: str) -> dict:
    client = get_client()
    item_dict = json.loads(item_json)
    dataset_item = DatasetItem(**item_dict)
    item_client = DatasetItemClient(dataset_item, client)

    with item_client.run(run_name=run_name) as span:
        output = my_llm_app(item_client.input)
        span.score_trace(name="accuracy", value=evaluate(output))

    client.flush()
    return {"item_id": item_client.id, "output": output}

# Orchestrator: serialize items for distribution
dataset = get_client().get_dataset("my-dataset")
items_as_dicts = dataset.items_to_dict()

# Pass serialized items to workers (e.g., via queue, API, or file)
for item_dict in items_as_dicts:
    item_json = json.dumps(item_dict, default=str)
    # Send item_json to worker via your orchestration system
```

Using Prefect for parallel task execution:

```python
from prefect import flow, task
from prefect.futures import wait
from langfuse import get_client
from langfuse.model import DatasetItem
from langfuse._client.datasets import DatasetItemClient

@task
def process_item(item_dict: dict, run_name: str) -> dict:
    client = get_client()
    dataset_item = DatasetItem(**item_dict)
    item_client = DatasetItemClient(dataset_item, client)

    with item_client.run(run_name=run_name) as span:
        output = my_llm_app(item_client.input)
        span.score_trace(name="accuracy", value=evaluate(output))

    client.flush()
    return {"item_id": item_client.id, "output": output}

@flow
def run_evaluation():
    dataset = get_client().get_dataset("my-dataset")
    items_as_dicts = dataset.items_to_dict()

    futures = process_item.map(items_as_dicts, run_name="prefect-run")
    wait(futures)
```
"""
return [item.dataset_item.dict() for item in self.items]

def run_experiment(
self,
*,
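A note on the serialization boundary used in the docstring examples above: `items_to_dict()` returns plain dicts in which `createdAt` and `updatedAt` are still `datetime` objects, which is why the examples serialize with `json.dumps(..., default=str)`. Below is a minimal round-trip sketch, not part of this PR, assuming a configured Langfuse client, an existing dataset named "my-dataset" (a placeholder), and that `DatasetItem` (a Pydantic model) coerces the stringified timestamps back into datetimes on reconstruction:

```python
import json

from langfuse import get_client
from langfuse.model import DatasetItem
from langfuse._client.datasets import DatasetItemClient

client = get_client()

# "my-dataset" is a placeholder; use any existing dataset name.
dataset = client.get_dataset("my-dataset")

for item_dict in dataset.items_to_dict():
    # createdAt / updatedAt are datetime objects here, so default=str
    # is needed to make the payload JSON-serializable.
    payload = json.dumps(item_dict, default=str)

    # ... ship `payload` to a worker; on the worker side:
    restored = DatasetItem(**json.loads(payload))  # Pydantic coerces the date strings back
    item_client = DatasetItemClient(restored, client)
    print(item_client.id, item_client.input)
```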
132 changes: 132 additions & 0 deletions tests/test_datasets.py
@@ -418,3 +418,135 @@ def execute_dataset_item(item, run_name):
assert "args" in trace.input
assert trace.input["args"][0] == expected_input
assert trace.output == expected_input


def test_items_to_dict_basic():
"""Test that items_to_dict returns a list of dictionaries with correct structure."""
langfuse = Langfuse(debug=False)
dataset_name = create_uuid()
langfuse.create_dataset(name=dataset_name)

input_data = {"input": "Hello World"}
expected_output = {"output": "Expected response"}
metadata = {"key": "value"}

langfuse.create_dataset_item(
dataset_name=dataset_name,
input=input_data,
expected_output=expected_output,
metadata=metadata,
)

dataset = langfuse.get_dataset(dataset_name)
items_dicts = dataset.items_to_dict()

assert len(items_dicts) == 1
item_dict = items_dicts[0]

assert "id" in item_dict
assert "status" in item_dict
assert item_dict["input"] == input_data
assert item_dict["expectedOutput"] == expected_output
assert item_dict["metadata"] == metadata
assert "datasetId" in item_dict
assert "datasetName" in item_dict
assert item_dict["datasetName"] == dataset_name
assert "createdAt" in item_dict
assert "updatedAt" in item_dict


def test_items_to_dict_multiple_items():
"""Test that items_to_dict handles multiple items correctly."""
langfuse = Langfuse(debug=False)
dataset_name = create_uuid()
langfuse.create_dataset(name=dataset_name)

num_items = 5
for i in range(num_items):
langfuse.create_dataset_item(
dataset_name=dataset_name,
input={"index": i},
expected_output={"result": i * 2},
)

dataset = langfuse.get_dataset(dataset_name)
items_dicts = dataset.items_to_dict()

assert len(items_dicts) == num_items

# Check all items have unique IDs
ids = [item["id"] for item in items_dicts]
assert len(set(ids)) == num_items


def test_items_to_dict_reconstruct_item():
"""Test that items can be reconstructed from dict and used with DatasetItemClient."""
from langfuse._client.datasets import DatasetItemClient
from langfuse.model import DatasetItem

langfuse = Langfuse(debug=False)
dataset_name = create_uuid()
langfuse.create_dataset(name=dataset_name)

input_data = {"input": "Test reconstruction"}
expected_output = {"output": "Expected"}
metadata = {"meta_key": "meta_value"}

langfuse.create_dataset_item(
dataset_name=dataset_name,
input=input_data,
expected_output=expected_output,
metadata=metadata,
)

dataset = langfuse.get_dataset(dataset_name)
items_dicts = dataset.items_to_dict()

item_dict = items_dicts[0]
reconstructed_item = DatasetItem(**item_dict)
item_client = DatasetItemClient(reconstructed_item, langfuse)

assert item_client.input == input_data
assert item_client.expected_output == expected_output
assert item_client.metadata == metadata
assert item_client.dataset_name == dataset_name
assert item_client.id == dataset.items[0].id


def test_items_to_dict_with_run():
"""Test that reconstructed items can be used with the run() context manager."""
from langfuse._client.datasets import DatasetItemClient
from langfuse.model import DatasetItem

langfuse = Langfuse(debug=False)
dataset_name = create_uuid()
langfuse.create_dataset(name=dataset_name)

input_data = {"input": "Test with run"}
langfuse.create_dataset_item(dataset_name=dataset_name, input=input_data)

dataset = langfuse.get_dataset(dataset_name)
items_dicts = dataset.items_to_dict()

run_name = create_uuid()
trace_id = None

# Simulate distributed processing by reconstructing item from dict
item_dict = items_dicts[0]
reconstructed_item = DatasetItem(**item_dict)
item_client = DatasetItemClient(reconstructed_item, langfuse)

with item_client.run(run_name=run_name) as span:
trace_id = span.trace_id
span.update_trace(name="reconstructed_item_run", output="test_output")

langfuse.flush()
time.sleep(1)

# Verify the trace was created
assert trace_id is not None
trace = langfuse.api.trace.get(trace_id)

assert trace is not None
assert trace.name == "reconstructed_item_run"
assert trace.output == "test_output"
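The tests above exercise the reconstruction path in-process. For illustration only, here is a hedged sketch of the same flow fanned out across local worker processes with the standard library's `ProcessPoolExecutor`; it assumes Langfuse credentials are configured via environment variables, that a dataset named "my-dataset" exists, and uses placeholder `my_llm_app` and `evaluate` functions:

```python
import json
from concurrent.futures import ProcessPoolExecutor
from functools import partial

from langfuse import get_client
from langfuse.model import DatasetItem
from langfuse._client.datasets import DatasetItemClient


def my_llm_app(question):  # placeholder for the application under test
    return {"answer": f"echo: {question}"}


def evaluate(output):  # placeholder scorer
    return 1.0


def process_item(item_json: str, run_name: str) -> str:
    # Each worker process builds its own client and reconstructs the item.
    client = get_client()
    item = DatasetItem(**json.loads(item_json))
    item_client = DatasetItemClient(item, client)

    with item_client.run(run_name=run_name) as span:
        output = my_llm_app(item_client.input)
        span.score_trace(name="accuracy", value=evaluate(output))

    client.flush()
    return item_client.id


if __name__ == "__main__":
    dataset = get_client().get_dataset("my-dataset")
    payloads = [json.dumps(d, default=str) for d in dataset.items_to_dict()]

    with ProcessPoolExecutor(max_workers=4) as pool:
        worker = partial(process_item, run_name="local-pool-run")
        for item_id in pool.map(worker, payloads):
            print("processed", item_id)
```

Each worker builds its own client via `get_client()` and calls `flush()` before returning, mirroring the pattern in the docstring examples.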