Merged
141 changes: 133 additions & 8 deletions haystack/components/converters/csv.py
@@ -2,17 +2,20 @@
#
# SPDX-License-Identifier: Apache-2.0

import csv
import io
import os
from pathlib import Path
from typing import Any, Optional, Union
from typing import Any, Literal, Optional, Union

from haystack import Document, component, logging
from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
from haystack.dataclasses import ByteStream

logger = logging.getLogger(__name__)

_ROW_MODE_SIZE_WARN_BYTES = 5 * 1024 * 1024 # ~5MB; warn when parsing rows might be memory-heavy


@component
class CSVToDocument:
@@ -31,11 +34,19 @@ class CSVToDocument:
results = converter.run(sources=["sample.csv"], meta={"date_added": datetime.now().isoformat()})
documents = results["documents"]
print(documents[0].content)
# 'col1,col2\now1,row1\nrow2row2\n'
# 'col1,col2\\nrow1,row1\\nrow2,row2\\n'
```
"""

def __init__(self, encoding: str = "utf-8", store_full_path: bool = False):
def __init__(
self,
encoding: str = "utf-8",
store_full_path: bool = False,
*,
conversion_mode: Literal["file", "row"] = "file",
delimiter: str = ",",
quotechar: str = '"',
):
"""
Creates a CSVToDocument component.

@@ -46,21 +57,43 @@ def __init__(self, encoding: str = "utf-8", store_full_path: bool = False):
:param store_full_path:
If True, the full path of the file is stored in the metadata of the document.
If False, only the file name is stored.
:param conversion_mode:
- "file" (default): one Document per CSV file whose content is the raw CSV text.
- "row": convert each CSV row to its own Document (requires `content_column` in `run()`).
:param delimiter:
CSV delimiter used when parsing in row mode (passed to ``csv.DictReader``).
:param quotechar:
CSV quote character used when parsing in row mode (passed to ``csv.DictReader``).
"""
self.encoding = encoding
self.store_full_path = store_full_path
self.conversion_mode = conversion_mode
self.delimiter = delimiter
self.quotechar = quotechar

# Basic validation
if len(self.delimiter) != 1:
raise ValueError("CSVToDocument: delimiter must be a single character.")
if len(self.quotechar) != 1:
raise ValueError("CSVToDocument: quotechar must be a single character.")

@component.output_types(documents=list[Document])
def run(
self,
sources: list[Union[str, Path, ByteStream]],
*,
content_column: Optional[str] = None,
meta: Optional[Union[dict[str, Any], list[dict[str, Any]]]] = None,
):
"""
Converts a CSV file to a Document.
Converts each CSV file to a Document (file mode) or each CSV row to its own Document (row mode).

:param sources:
List of file paths or ByteStream objects.
:param content_column:
**Required when** ``conversion_mode="row"``.
The column name whose values become ``Document.content`` for each row.
The column must exist in the CSV header.
:param meta:
Optional metadata to attach to the documents.
This value can be either a list of dictionaries or a single dictionary.
@@ -72,7 +105,7 @@ def run(
A dictionary with the following keys:
- `documents`: Created documents
"""
documents = []
documents: list[Document] = []

meta_list = normalize_metadata(meta, sources_count=len(sources))

@@ -82,9 +115,11 @@
except Exception as e:
logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
continue

try:
encoding = bytestream.meta.get("encoding", self.encoding)
data = io.BytesIO(bytestream.data).getvalue().decode(encoding=encoding)
raw = io.BytesIO(bytestream.data).getvalue()
data = raw.decode(encoding=encoding)
except Exception as e:
logger.warning(
"Could not convert file {source}. Skipping it. Error message: {error}", source=source, error=e
@@ -98,7 +133,97 @@ def run(
if file_path: # Ensure the value is not None for pylint
merged_metadata["file_path"] = os.path.basename(file_path)

document = Document(content=data, meta=merged_metadata)
documents.append(document)
# Mode: file (backward-compatible default) -> one Document per file
if self.conversion_mode == "file":
documents.append(Document(content=data, meta=merged_metadata))
continue

# --- ROW MODE (strict) ---
# Require content_column in run(); no fallback
if not content_column:
raise ValueError(
"CSVToDocument(row): 'content_column' is required in run() when conversion_mode='row'."
)

# Warn for large CSVs in row mode (memory consideration)
try:
size_bytes = len(raw)
if size_bytes > _ROW_MODE_SIZE_WARN_BYTES:
logger.warning(
"CSVToDocument(row): parsing a large CSV (~{mb:.1f} MB). "
"Consider chunking/streaming if you hit memory issues.",
mb=size_bytes / (1024 * 1024),
)
except Exception:
pass

# Create DictReader; if this fails, raise (no fallback)
try:
reader = csv.DictReader(io.StringIO(data), delimiter=self.delimiter, quotechar=self.quotechar)
except Exception as e:
raise RuntimeError(f"CSVToDocument(row): could not parse CSV rows for {source}: {e}") from e

# Validate header contains content_column; strict error if missing
header = reader.fieldnames or []
if content_column not in header:
raise ValueError(
f"CSVToDocument(row): content_column='{content_column}' not found in header "
f"for {source}. Available columns: {header}"
)

# Build documents; if a row processing fails, raise immediately (no skip)
for i, row in enumerate(reader):
try:
doc = self._build_document_from_row(
row=row, base_meta=merged_metadata, row_index=i, content_column=content_column
)
except Exception as e:
raise RuntimeError(f"CSVToDocument(row): failed to process row {i} for {source}: {e}") from e
documents.append(doc)

return {"documents": documents}

# ----- helpers -----
def _safe_value(self, value: Any) -> str:
"""Normalize CSV cell values: None -> '', everything -> str."""
return "" if value is None else str(value)

def _build_document_from_row(
self, row: dict[str, Any], base_meta: dict[str, Any], row_index: int, content_column: str
) -> Document:
"""
Build a ``Document`` from one parsed CSV row.

:param row: Mapping of column name to cell value for the current row
(as produced by ``csv.DictReader``).
:param base_meta: File-level and user-provided metadata to start from
(for example: ``file_path``, ``encoding``).
:param row_index: Zero-based row index in the CSV; stored as
``row_number`` in the output document's metadata.
:param content_column: Column name to use for ``Document.content``.
:returns: A ``Document`` with chosen content and merged metadata.
Remaining row columns are added to ``meta`` with collision-safe
keys (prefixed with ``csv_`` if needed).
"""
row_meta = dict(base_meta)

# content (strict: content_column must exist; validated by caller)
content = self._safe_value(row.get(content_column))

# merge remaining columns into meta with collision handling
for k, v in row.items():
if k == content_column:
continue
key_to_use = k
if key_to_use in row_meta:
# Avoid clobbering existing meta like file_path/encoding; prefix and de-dupe
base_key = f"csv_{key_to_use}"
key_to_use = base_key
suffix = 1
while key_to_use in row_meta:
key_to_use = f"{base_key}_{suffix}"
suffix += 1
row_meta[key_to_use] = self._safe_value(v)

row_meta["row_number"] = row_index
return Document(content=content, meta=row_meta)
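To make the collision handling in `_build_document_from_row` concrete, here is a minimal standalone sketch of the same merge rule. The dictionaries are invented for illustration and are not part of the PR:

```python
# Illustrative sketch of the collision-safe meta merge (hypothetical data).
base_meta = {"file_path": "reviews.csv", "csv_author": "seeded"}
row = {"text": "Nice app", "file_path": "rowpath.csv", "author": "Ada"}
content_column = "text"

merged = dict(base_meta)
for key, value in row.items():
    if key == content_column:  # the content column never lands in meta
        continue
    key_to_use = key
    if key_to_use in merged:
        # Collision with existing meta: prefix with csv_ and de-dupe.
        base_key = f"csv_{key_to_use}"
        key_to_use = base_key
        suffix = 1
        while key_to_use in merged:
            key_to_use = f"{base_key}_{suffix}"
            suffix += 1
    merged[key_to_use] = "" if value is None else str(value)

# merged == {"file_path": "reviews.csv", "csv_author": "seeded",
#            "csv_file_path": "rowpath.csv", "author": "Ada"}
```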
3 changes: 3 additions & 0 deletions releasenotes/notes/csv-row-mode-20250908204536.yaml
@@ -0,0 +1,3 @@
---
features:
- "CSVToDocument: add `conversion_mode='row'` with optional `content_column`; each row becomes a `Document`; remaining columns stored in `meta`; default 'file' mode preserved."
125 changes: 121 additions & 4 deletions test/components/converters/test_csv_to_document.py
@@ -82,9 +82,9 @@ def test_encoding_override(self, test_files_path, caplog):
bytestream.meta["key"] = "value"

converter = CSVToDocument(encoding="utf-16-le")
output = converter.run(sources=[bytestream])
_ = converter.run(sources=[bytestream])
with caplog.at_level(logging.ERROR):
output = converter.run(sources=[bytestream])
_ = converter.run(sources=[bytestream])
assert "codec can't decode" in caplog.text

converter = CSVToDocument(encoding="utf-8")
@@ -99,6 +99,123 @@ def test_run_with_meta(self):
converter = CSVToDocument()
output = converter.run(sources=[bytestream], meta=[{"language": "it"}])
document = output["documents"][0]

# check that the metadata from the bytestream is merged with that from the meta parameter
assert document.meta == {"name": "test_name", "language": "it"}

# --- NEW TESTS for strict row mode ---

def test_row_mode_requires_content_column_param(self, tmp_path):
# Missing content_column must raise in row mode
f = tmp_path / "t.csv"
f.write_text("a,b\r\n1,2\r\n", encoding="utf-8")
conv = CSVToDocument(conversion_mode="row")
with pytest.raises(ValueError):
_ = conv.run(sources=[f]) # content_column missing

def test_row_mode_missing_header_raises(self, tmp_path):
# content_column must exist in header
f = tmp_path / "t.csv"
f.write_text("a,b\r\n1,2\r\n", encoding="utf-8")
conv = CSVToDocument(conversion_mode="row")
with pytest.raises(ValueError):
_ = conv.run(sources=[f], content_column="missing")

def test_row_mode_with_content_column(self, tmp_path):
csv_text = "text,author,stars\r\nNice app,Ada,5\r\nBuggy,Bob,2\r\n"
f = tmp_path / "fb.csv"
f.write_text(csv_text, encoding="utf-8")

bytestream = ByteStream.from_file_path(f)
bytestream.meta["file_path"] = str(f)

converter = CSVToDocument(conversion_mode="row")
output = converter.run(sources=[bytestream], content_column="text")
docs = output["documents"]

assert len(docs) == 2
assert [d.content for d in docs] == ["Nice app", "Buggy"]
assert docs[0].meta["author"] == "Ada"
assert docs[0].meta["stars"] == "5"
assert docs[0].meta["row_number"] == 0
assert os.path.basename(f) == docs[0].meta["file_path"]

def test_row_mode_meta_collision_prefixed(self, tmp_path):
# ByteStream meta has file_path and encoding; CSV also has those columns.
csv_text = "file_path,encoding,comment\r\nrowpath.csv,latin1,ok\r\n"
f = tmp_path / "collide.csv"
f.write_text(csv_text, encoding="utf-8")
bs = ByteStream.from_file_path(f)
bs.meta["file_path"] = str(f)
bs.meta["encoding"] = "utf-8"

conv = CSVToDocument(conversion_mode="row")
out = conv.run(sources=[bs], content_column="comment")
d = out["documents"][0]
# Original meta preserved
assert d.meta["file_path"] == os.path.basename(str(f))
assert d.meta["encoding"] == "utf-8"
# CSV columns stored with csv_ prefix (no clobber)
assert d.meta["csv_file_path"] == "rowpath.csv"
assert d.meta["csv_encoding"] == "latin1"
# content column isn't duplicated in meta
assert "comment" not in d.meta
assert d.meta["row_number"] == 0
assert d.content == "ok"

def test_row_mode_meta_collision_multiple_suffixes(self, tmp_path):
"""
If meta already has csv_file_path and csv_file_path_1, we should write the next as csv_file_path_2.
"""
csv_text = "file_path,comment\r\nrow.csv,ok\r\n"
f = tmp_path / "multi.csv"
f.write_text(csv_text, encoding="utf-8")

bs = ByteStream.from_file_path(f)
bs.meta["file_path"] = str(f)

# Pre-seed meta so we force two collisions.
extra_meta = {"csv_file_path": "existing0", "csv_file_path_1": "existing1"}

conv = CSVToDocument(conversion_mode="row")
out = conv.run(sources=[bs], meta=[extra_meta], content_column="comment")
d = out["documents"][0]

assert d.meta["csv_file_path"] == "existing0"
assert d.meta["csv_file_path_1"] == "existing1"
assert d.meta["csv_file_path_2"] == "row.csv"
assert d.content == "ok"

def test_init_validates_delimiter_and_quotechar(self):
with pytest.raises(ValueError):
CSVToDocument(delimiter=";;")
with pytest.raises(ValueError):
CSVToDocument(quotechar='""')

def test_row_mode_large_file_warns(self, caplog, monkeypatch):
# Make the threshold tiny so the warning always triggers.
import haystack.components.converters.csv as csv_mod

monkeypatch.setattr(csv_mod, "_ROW_MODE_SIZE_WARN_BYTES", 1, raising=False)

bs = ByteStream(data=b"text,author\nhi,Ada\n", meta={"file_path": "big.csv"})
conv = CSVToDocument(conversion_mode="row")
with caplog.at_level(logging.WARNING, logger="haystack.components.converters.csv"):
_ = conv.run(sources=[bs], content_column="text")
assert "parsing a large CSV" in caplog.text

def test_row_mode_reader_failure_raises_runtimeerror(self, monkeypatch, tmp_path):
# Simulate DictReader failing -> we should raise RuntimeError (no fallback).
import haystack.components.converters.csv as csv_mod

f = tmp_path / "bad.csv"
f.write_text("a,b\n1,2\n", encoding="utf-8")
conv = CSVToDocument(conversion_mode="row")

class Boom(Exception):
pass

def broken_reader(*_args, **_kwargs): # noqa: D401
raise Boom("broken")

monkeypatch.setattr(csv_mod.csv, "DictReader", broken_reader, raising=True)
with pytest.raises(RuntimeError):
_ = conv.run(sources=[f], content_column="a")
2 changes: 1 addition & 1 deletion test/core/pipeline/features/test_run.py
@@ -5169,7 +5169,7 @@ def pipeline_that_converts_files(pipeline_class):
expected_outputs={"a_joiner": {"documents": expected_csv_docs + expected_splits_docs}},
expected_component_calls={
("router", 1): {"sources": sources, "meta": None},
("csv_converter", 1): {"sources": [sources[0]], "meta": None},
("csv_converter", 1): {"sources": [sources[0]], "meta": None, "content_column": None},
("txt_converter", 1): {"sources": [sources[1]], "meta": None},
("json_converter", 1): {"sources": [sources[2]], "meta": None},
("b_joiner", 1): {