From de4c08b81cf6502579d2144f56ae492c2bd9362b Mon Sep 17 00:00:00 2001 From: danielnyari-seon Date: Fri, 12 Dec 2025 11:20:33 +0100 Subject: [PATCH 1/3] add dataset serialization --- langfuse/_client/datasets.py | 85 +++++++++++++++++++++- tests/test_datasets.py | 132 +++++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+), 2 deletions(-) diff --git a/langfuse/_client/datasets.py b/langfuse/_client/datasets.py index 0a9a0312c..67e3885ed 100644 --- a/langfuse/_client/datasets.py +++ b/langfuse/_client/datasets.py @@ -90,6 +90,7 @@ def __init__(self, dataset_item: DatasetItem, langfuse: "Langfuse"): self.dataset_name = dataset_item.dataset_name self.created_at = dataset_item.created_at self.updated_at = dataset_item.updated_at + self.dataset_item = dataset_item self.langfuse = langfuse @@ -179,7 +180,12 @@ class DatasetClient: updated_at: dt.datetime items: List[DatasetItemClient] - def __init__(self, dataset: Dataset, items: List[DatasetItemClient]): + def __init__( + self, + dataset: Dataset, + items: List[DatasetItemClient], + langfuse: Optional["Langfuse"] = None, + ): """Initialize the DatasetClient.""" self.id = dataset.id self.name = dataset.name @@ -189,7 +195,7 @@ def __init__(self, dataset: Dataset, items: List[DatasetItemClient]): self.created_at = dataset.created_at self.updated_at = dataset.updated_at self.items = items - self._langfuse: Optional["Langfuse"] = None + self._langfuse = langfuse def _get_langfuse_client(self) -> Optional["Langfuse"]: """Get the Langfuse client from the first item.""" @@ -197,6 +203,81 @@ def _get_langfuse_client(self) -> Optional["Langfuse"]: self._langfuse = self.items[0].langfuse return self._langfuse + def items_to_dict(self) -> List[Dict[str, Any]]: + """Convert all dataset items to a list of dictionaries. + + Returns: + List of dictionaries, where each dictionary contains the serialized + representation of a dataset item including its id, status, input, + expected_output, metadata, and other attributes. + + Example: + Serialize dataset items and process them in separate worker functions. + This pattern is useful for distributed processing where each worker + runs independently (e.g., AWS Step functions, Azure Durable Functions, or separate processes): + + ```python + import json + from langfuse import get_client + from langfuse.model import DatasetItem + from langfuse._client.datasets import DatasetItemClient + + # Worker function (runs in separate process/container) + def process_item(item_json: str, run_name: str) -> dict: + client = get_client() + dataset_item = DatasetItem(**item_dict) + item_client = DatasetItemClient(dataset_item, client) + + with item_client.run(run_name=run_name) as span: + output = my_llm_app(item_client.input) + span.score_trace(name="accuracy", value=evaluate(output)) + + client.flush() + return {"item_id": item_client.id, "output": output} + + # Orchestrator: serialize items for distribution + dataset = get_client().get_dataset("my-dataset") + items_as_dicts = dataset.items_to_dict() + + # Pass serialized items to workers (e.g., via queue, API, or file) + for item_dict in items_as_dicts: + item_json = json.dumps(item_dict, default=str) + # Send item_json to worker via your orchestration system + ``` + + Using Prefect for parallel task execution: + + ```python + from prefect import flow, task + from prefect.futures import wait + from langfuse import get_client + from langfuse.model import DatasetItem + from langfuse._client.datasets import DatasetItemClient + + @task + def process_item(item_dict: dict, run_name: str) -> dict: + client = get_client() + dataset_item = DatasetItem(**item_dict) + item_client = DatasetItemClient(dataset_item, client) + + with item_client.run(run_name=run_name) as span: + output = my_llm_app(item_client.input) + span.score_trace(name="accuracy", value=evaluate(output)) + + client.flush() + return {"item_id": item_client.id, "output": output} + + @flow + def run_evaluation(): + dataset = get_client().get_dataset("my-dataset") + items_as_dicts = dataset.items_to_dict() + + futures = process_item.map(items_as_dicts, run_name="prefect-run") + wait(futures) + ``` + """ + return [item.dataset_item.dict() for item in self.items] + def run_experiment( self, *, diff --git a/tests/test_datasets.py b/tests/test_datasets.py index c3ad7a318..2824a5371 100644 --- a/tests/test_datasets.py +++ b/tests/test_datasets.py @@ -418,3 +418,135 @@ def execute_dataset_item(item, run_name): assert "args" in trace.input assert trace.input["args"][0] == expected_input assert trace.output == expected_input + + +def test_items_to_dict_basic(): + """Test that items_to_dict returns a list of dictionaries with correct structure.""" + langfuse = Langfuse(debug=False) + dataset_name = create_uuid() + langfuse.create_dataset(name=dataset_name) + + input_data = {"input": "Hello World"} + expected_output = {"output": "Expected response"} + metadata = {"key": "value"} + + langfuse.create_dataset_item( + dataset_name=dataset_name, + input=input_data, + expected_output=expected_output, + metadata=metadata, + ) + + dataset = langfuse.get_dataset(dataset_name) + items_dicts = dataset.items_to_dict() + + assert len(items_dicts) == 1 + item_dict = items_dicts[0] + + assert "id" in item_dict + assert "status" in item_dict + assert item_dict["input"] == input_data + assert item_dict["expectedOutput"] == expected_output + assert item_dict["metadata"] == metadata + assert "datasetId" in item_dict + assert "datasetName" in item_dict + assert item_dict["datasetName"] == dataset_name + assert "createdAt" in item_dict + assert "updatedAt" in item_dict + + +def test_items_to_dict_multiple_items(): + """Test that items_to_dict handles multiple items correctly.""" + langfuse = Langfuse(debug=False) + dataset_name = create_uuid() + langfuse.create_dataset(name=dataset_name) + + num_items = 5 + for i in range(num_items): + langfuse.create_dataset_item( + dataset_name=dataset_name, + input={"index": i}, + expected_output={"result": i * 2}, + ) + + dataset = langfuse.get_dataset(dataset_name) + items_dicts = dataset.items_to_dict() + + assert len(items_dicts) == num_items + + # Check all items have unique IDs + ids = [item["id"] for item in items_dicts] + assert len(set(ids)) == num_items + + +def test_items_to_dict_reconstruct_item(): + """Test that items can be reconstructed from dict and used with DatasetItemClient.""" + from langfuse._client.datasets import DatasetItemClient + from langfuse.model import DatasetItem + + langfuse = Langfuse(debug=False) + dataset_name = create_uuid() + langfuse.create_dataset(name=dataset_name) + + input_data = {"input": "Test reconstruction"} + expected_output = {"output": "Expected"} + metadata = {"meta_key": "meta_value"} + + langfuse.create_dataset_item( + dataset_name=dataset_name, + input=input_data, + expected_output=expected_output, + metadata=metadata, + ) + + dataset = langfuse.get_dataset(dataset_name) + items_dicts = dataset.items_to_dict() + + item_dict = items_dicts[0] + reconstructed_item = DatasetItem(**item_dict) + item_client = DatasetItemClient(reconstructed_item, langfuse) + + assert item_client.input == input_data + assert item_client.expected_output == expected_output + assert item_client.metadata == metadata + assert item_client.dataset_name == dataset_name + assert item_client.id == dataset.items[0].id + + +def test_items_to_dict_with_run(): + """Test that reconstructed items can be used with the run() context manager.""" + from langfuse._client.datasets import DatasetItemClient + from langfuse.model import DatasetItem + + langfuse = Langfuse(debug=False) + dataset_name = create_uuid() + langfuse.create_dataset(name=dataset_name) + + input_data = {"input": "Test with run"} + langfuse.create_dataset_item(dataset_name=dataset_name, input=input_data) + + dataset = langfuse.get_dataset(dataset_name) + items_dicts = dataset.items_to_dict() + + run_name = create_uuid() + trace_id = None + + # Simulate distributed processing by reconstructing item from dict + item_dict = items_dicts[0] + reconstructed_item = DatasetItem(**item_dict) + item_client = DatasetItemClient(reconstructed_item, langfuse) + + with item_client.run(run_name=run_name) as span: + trace_id = span.trace_id + span.update_trace(name="reconstructed_item_run", output="test_output") + + langfuse.flush() + time.sleep(1) + + # Verify the trace was created + assert trace_id is not None + trace = langfuse.api.trace.get(trace_id) + + assert trace is not None + assert trace.name == "reconstructed_item_run" + assert trace.output == "test_output" From 86990a668163d03bd5b86e687817c2fb017090f6 Mon Sep 17 00:00:00 2001 From: danielnyari-seon Date: Fri, 12 Dec 2025 11:31:35 +0100 Subject: [PATCH 2/3] fix test imports --- tests/test_datasets.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_datasets.py b/tests/test_datasets.py index 2824a5371..f4d25b5cd 100644 --- a/tests/test_datasets.py +++ b/tests/test_datasets.py @@ -9,6 +9,8 @@ from langfuse import Langfuse, observe from langfuse.api.resources.commons.types.dataset_status import DatasetStatus from langfuse.api.resources.commons.types.observation import Observation +from langfuse._client.datasets import DatasetItemClient +from langfuse.model import DatasetItem from langfuse.langchain import CallbackHandler from tests.utils import create_uuid, get_api @@ -481,8 +483,6 @@ def test_items_to_dict_multiple_items(): def test_items_to_dict_reconstruct_item(): """Test that items can be reconstructed from dict and used with DatasetItemClient.""" - from langfuse._client.datasets import DatasetItemClient - from langfuse.model import DatasetItem langfuse = Langfuse(debug=False) dataset_name = create_uuid() @@ -515,8 +515,6 @@ def test_items_to_dict_reconstruct_item(): def test_items_to_dict_with_run(): """Test that reconstructed items can be used with the run() context manager.""" - from langfuse._client.datasets import DatasetItemClient - from langfuse.model import DatasetItem langfuse = Langfuse(debug=False) dataset_name = create_uuid() From 92cd9955ab1efd8805a2361bcc3daf43e6bc9240 Mon Sep 17 00:00:00 2001 From: danielnyari-seon Date: Fri, 12 Dec 2025 11:32:58 +0100 Subject: [PATCH 3/3] Update langfuse/_client/datasets.py Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> --- langfuse/_client/datasets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/langfuse/_client/datasets.py b/langfuse/_client/datasets.py index 67e3885ed..d20fcb23a 100644 --- a/langfuse/_client/datasets.py +++ b/langfuse/_client/datasets.py @@ -225,7 +225,7 @@ def items_to_dict(self) -> List[Dict[str, Any]]: # Worker function (runs in separate process/container) def process_item(item_json: str, run_name: str) -> dict: client = get_client() - dataset_item = DatasetItem(**item_dict) + dataset_item = DatasetItem(**json.loads(item_json)) item_client = DatasetItemClient(dataset_item, client) with item_client.run(run_name=run_name) as span: