From 70bce010842819671471b2a37ae652612cd7605e Mon Sep 17 00:00:00 2001
From: Arya Tayshete
Date: Tue, 9 Sep 2025 09:40:22 +0530
Subject: [PATCH 1/7] =?UTF-8?q?feat(converters):=20CSVToDocument=20row-lev?=
 =?UTF-8?q?el=20conversion=20(content=5Fcolumn,=20columns=E2=86=92meta)=20?=
 =?UTF-8?q?+=20tests=20+=20releasenote?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Arya Tayshete
---
 haystack/components/converters/csv.py        | 71 +++++++++++++++++--
 .../notes/csv-row-mode-20250908204536.yaml   |  3 +
 .../converters/test_csv_to_document.py       | 64 +++++++++++++++++
 3 files changed, 133 insertions(+), 5 deletions(-)
 create mode 100644 releasenotes/notes/csv-row-mode-20250908204536.yaml

diff --git a/haystack/components/converters/csv.py b/haystack/components/converters/csv.py
index 8e995b6d5d..16a2ad4162 100644
--- a/haystack/components/converters/csv.py
+++ b/haystack/components/converters/csv.py
@@ -2,10 +2,11 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
+import csv
 import io
 import os
 from pathlib import Path
-from typing import Any, Optional, Union
+from typing import Any, Literal, Optional, Union
 
 from haystack import Document, component, logging
 from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
@@ -35,7 +36,16 @@ class CSVToDocument:
     ```
     """
 
-    def __init__(self, encoding: str = "utf-8", store_full_path: bool = False):
+    def __init__(
+        self,
+        encoding: str = "utf-8",
+        store_full_path: bool = False,
+        *,
+        conversion_mode: Literal["file", "row"] = "file",
+        content_column: Optional[str] = None,
+        delimiter: str = ",",
+        quotechar: str = '"',
+    ):
         """
         Creates a CSVToDocument component.
 
@@ -46,9 +56,24 @@ def __init__(self, encoding: str = "utf-8", store_full_path: bool = False):
         :param store_full_path:
             If True, the full path of the file is stored in the metadata of the document.
             If False, only the file name is stored.
+        :param conversion_mode:
+            - "file" (default): current behavior, one Document per CSV file whose content is the raw CSV text.
+            - "row": convert each CSV row to its own Document.
+        :param content_column:
+            When ``conversion_mode="row"``, the column to use as ``Document.content``.
+            If ``None``, the content will be a human-readable "key: value" listing of that row.
+        :param delimiter:
+            CSV delimiter used when parsing in row mode (passed to ``csv.DictReader``).
+        :param quotechar:
+            CSV quote character used when parsing in row mode (passed to ``csv.DictReader``).
""" + self.encoding = encoding self.store_full_path = store_full_path + self.conversion_mode = conversion_mode + self.content_column = content_column + self.delimiter = delimiter + self.quotechar = quotechar @component.output_types(documents=list[Document]) def run( @@ -72,7 +97,7 @@ def run( A dictionary with the following keys: - `documents`: Created documents """ - documents = [] + documents: list[Document] = [] meta_list = normalize_metadata(meta, sources_count=len(sources)) @@ -98,7 +123,43 @@ def run( if file_path: # Ensure the value is not None for pylint merged_metadata["file_path"] = os.path.basename(file_path) - document = Document(content=data, meta=merged_metadata) - documents.append(document) + # Mode: file (backward-compatible default) -> one Document per file + if self.conversion_mode == "file": + documents.append(Document(content=data, meta=merged_metadata)) + continue + + # Mode: row -> one Document per CSV row + try: + reader = csv.DictReader(io.StringIO(data), delimiter=self.delimiter, quotechar=self.quotechar) + except Exception as e: + logger.warning( + "Could not parse CSV rows for {source}. Falling back to file mode. Error: {error}", + source=source, + error=e, + ) + documents.append(Document(content=data, meta=merged_metadata)) + continue + + for i, row in enumerate(reader): + row_meta = dict(merged_metadata) # start with file-level/meta param bytestream meta + + # Determine content from selected column or fallback to a friendly listing + if self.content_column: + content = row.get(self.content_column, "") + if content is None: + content = "" + else: + # "key: value" per line for readability + content = "\n".join(f"{k}: {v if v is not None else ''}" for k, v in row.items()) + + # Add remaining columns into meta (don't override existing keys like file_path, encoding, etc.) + for k, v in row.items(): + if self.content_column and k == self.content_column: + continue + if k not in row_meta: + row_meta[k] = "" if v is None else v + + row_meta["row_number"] = i + documents.append(Document(content=content, meta=row_meta)) return {"documents": documents} diff --git a/releasenotes/notes/csv-row-mode-20250908204536.yaml b/releasenotes/notes/csv-row-mode-20250908204536.yaml new file mode 100644 index 0000000000..9a1ee33194 --- /dev/null +++ b/releasenotes/notes/csv-row-mode-20250908204536.yaml @@ -0,0 +1,3 @@ +--- +features: + - "CSVToDocument: add `conversion_mode='row'` with optional `content_column`; each row becomes a `Document`; remaining columns stored in `meta`; default 'file' mode preserved." diff --git a/test/components/converters/test_csv_to_document.py b/test/components/converters/test_csv_to_document.py index f036bbbe6c..9702864e70 100644 --- a/test/components/converters/test_csv_to_document.py +++ b/test/components/converters/test_csv_to_document.py @@ -102,3 +102,67 @@ def test_run_with_meta(self): # check that the metadata from the bytestream is merged with that from the meta parameter assert document.meta == {"name": "test_name", "language": "it"} + + def test_row_mode_with_content_column(self, tmp_path): + """ + Each row becomes a Document, with `content` from a chosen column and other columns in meta. 
+ """ + csv_text = "text,author,stars\r\nNice app,Ada,5\r\nBuggy,Bob,2\r\n" + f = tmp_path / "fb.csv" + f.write_text(csv_text, encoding="utf-8") + + bytestream = ByteStream.from_file_path(f) + bytestream.meta["file_path"] = str(f) + + converter = CSVToDocument(conversion_mode="row", content_column="text") + output = converter.run(sources=[bytestream]) + docs = output["documents"] + + assert len(docs) == 2 + assert [d.content for d in docs] == ["Nice app", "Buggy"] + # Remaining columns land in meta, plus file-level meta preserved + assert docs[0].meta["author"] == "Ada" + assert docs[0].meta["stars"] == "5" + assert docs[0].meta["row_number"] == 0 + # still respects store_full_path default=False trimming when present + assert os.path.basename(f) == docs[0].meta["file_path"] + + def test_row_mode_without_content_column(self, tmp_path): + """ + Without `content_column`, the content is a human-readable 'key: value' listing of the row. + """ + csv_text = "a,b\r\n1,2\r\n3,4\r\n" + f = tmp_path / "t.csv" + f.write_text(csv_text, encoding="utf-8") + + converter = CSVToDocument(conversion_mode="row") + output = converter.run(sources=[f]) + docs = output["documents"] + + assert len(docs) == 2 + assert "a: 1" in docs[0].content and "b: 2" in docs[0].content + assert docs[0].meta["a"] == "1" and docs[0].meta["b"] == "2" + assert docs[0].meta["row_number"] == 0 + + def test_row_mode_meta_merging(self, tmp_path): + """ + File-level meta and explicit `meta` arg are merged into each row's meta. + """ + csv_text = "q,user\r\nHello,u1\r\nHi,u2\r\n" + f = tmp_path / "m.csv" + f.write_text(csv_text, encoding="utf-8") + + bs = ByteStream.from_file_path(f) + bs.meta["dataset"] = "support_tickets" + + converter = CSVToDocument(conversion_mode="row", content_column="q") + out = converter.run(sources=[bs], meta=[{"lang": "en"}]) + docs = out["documents"] + + assert len(docs) == 2 + assert docs[0].content == "Hello" + # merged meta propagated to each row + assert docs[0].meta["dataset"] == "support_tickets" + assert docs[0].meta["lang"] == "en" + # remaining column captured + assert docs[0].meta["user"] == "u1" From cd5310362d4e44fa70e8838277ea2d3db9e7e97f Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 11 Sep 2025 11:46:25 +0530 Subject: [PATCH 2/7] feat(converters): CSVToDocument row-mode hardening + tests Signed-off-by: Arya Tayshete --- haystack/components/converters/csv.py | 106 ++++++++++++++---- .../converters/test_csv_to_document.py | 58 ++++++++++ 2 files changed, 143 insertions(+), 21 deletions(-) diff --git a/haystack/components/converters/csv.py b/haystack/components/converters/csv.py index 16a2ad4162..5bfb6f2cf7 100644 --- a/haystack/components/converters/csv.py +++ b/haystack/components/converters/csv.py @@ -14,6 +14,8 @@ logger = logging.getLogger(__name__) +_ROW_MODE_SIZE_WARN_BYTES = 5 * 1024 * 1024 # ~5MB; warn when parsing rows might be memory-heavy + @component class CSVToDocument: @@ -75,6 +77,12 @@ def __init__( self.delimiter = delimiter self.quotechar = quotechar + # Basic validation (reviewer suggestion) + if len(self.delimiter) != 1: + raise ValueError("CSVToDocument: delimiter must be a single character.") + if len(self.quotechar) != 1: + raise ValueError("CSVToDocument: quotechar must be a single character.") + @component.output_types(documents=list[Document]) def run( self, @@ -109,7 +117,9 @@ def run( continue try: encoding = bytestream.meta.get("encoding", self.encoding) - data = io.BytesIO(bytestream.data).getvalue().decode(encoding=encoding) + raw = 
+                data = raw.decode(encoding=encoding)
+
             except Exception as e:
                 logger.warning(
                     "Could not convert file {source}. Skipping it. Error message: {error}", source=source, error=e
@@ -128,6 +138,18 @@ def run(
                 documents.append(Document(content=data, meta=merged_metadata))
                 continue
 
+            # Reviewer note: Warn for very large CSVs in row mode (memory consideration)
+            try:
+                size_bytes = len(raw)
+                if size_bytes > _ROW_MODE_SIZE_WARN_BYTES:
+                    logger.warning(
+                        "CSVToDocument(row): parsing a large CSV (~{mb:.1f} MB). "
+                        "Consider chunking/streaming if you hit memory issues.",
+                        mb=size_bytes / (1024 * 1024),
+                    )
+            except Exception:
+                pass
+
             # Mode: row -> one Document per CSV row
             try:
                 reader = csv.DictReader(io.StringIO(data), delimiter=self.delimiter, quotechar=self.quotechar)
             except Exception as e:
@@ -140,26 +162,68 @@ def run(
                 documents.append(Document(content=data, meta=merged_metadata))
                 continue
 
+            # Validate content_column presence; fall back to listing if missing
+            effective_content_col = self.content_column
+            header = reader.fieldnames or []
+            if effective_content_col and effective_content_col not in header:
+                logger.warning(
+                    "CSVToDocument(row): content_column='{col}' not found in header for {source}; "
+                    "falling back to key: value listing.",
+                    col=effective_content_col,
+                    source=source,
+                )
+                effective_content_col = None
+
             for i, row in enumerate(reader):
-                row_meta = dict(merged_metadata)  # start with file-level/meta param bytestream meta
-
-                # Determine content from selected column or fallback to a friendly listing
-                if self.content_column:
-                    content = row.get(self.content_column, "")
-                    if content is None:
-                        content = ""
-                else:
-                    # "key: value" per line for readability
-                    content = "\n".join(f"{k}: {v if v is not None else ''}" for k, v in row.items())
-
-                # Add remaining columns into meta (don't override existing keys like file_path, encoding, etc.)
-                for k, v in row.items():
-                    if self.content_column and k == self.content_column:
-                        continue
-                    if k not in row_meta:
-                        row_meta[k] = "" if v is None else v
-
-                row_meta["row_number"] = i
-                documents.append(Document(content=content, meta=row_meta))
+                # Protect against malformed rows (reviewer suggestion)
+                try:
+                    doc = self._build_document_from_row(
+                        row=row, base_meta=merged_metadata, row_index=i, content_column=effective_content_col
+                    )
+                    documents.append(doc)
+                except Exception as e:
+                    logger.warning(
+                        "CSVToDocument(row): skipping malformed row {row_index} in {source}. Error: {error}",
+                        row_index=i,
+                        source=source,
+                        error=e,
+                    )
 
         return {"documents": documents}
+
+    # ----- helpers -----
+    def _safe_value(self, value: Any) -> str:
+        """Normalize CSV cell values: None -> '', everything -> str."""
+        return "" if value is None else str(value)
+
+    def _build_document_from_row(
+        self, row: dict[str, Any], base_meta: dict[str, Any], row_index: int, content_column: Optional[str]
+    ) -> Document:
+        """
+        Create a Document from a single CSV row. Does not catch exceptions; caller wraps.
+ """ + row_meta = dict(base_meta) + + # content + if content_column: + content = self._safe_value(row.get(content_column)) + else: + content = "\n".join(f"{k}: {self._safe_value(v)}" for k, v in row.items()) + + # merge remaining columns into meta with collision handling + for k, v in row.items(): + if content_column and k == content_column: + continue + key_to_use = k + if key_to_use in row_meta: + # Avoid clobbering existing meta like file_path/encoding; prefix and de-dupe + base_key = f"csv_{key_to_use}" + key_to_use = base_key + suffix = 1 + while key_to_use in row_meta: + key_to_use = f"{base_key}_{suffix}" + suffix = 1 + row_meta[key_to_use] = self._safe_value(v) + + row_meta["row_number"] = row_index + return Document(content=content, meta=row_meta) diff --git a/test/components/converters/test_csv_to_document.py b/test/components/converters/test_csv_to_document.py index 9702864e70..2fb5e4b0cb 100644 --- a/test/components/converters/test_csv_to_document.py +++ b/test/components/converters/test_csv_to_document.py @@ -103,6 +103,64 @@ def test_run_with_meta(self): # check that the metadata from the bytestream is merged with that from the meta parameter assert document.meta == {"name": "test_name", "language": "it"} + # --- NEW TESTS for row mode reviewer asks --- + + def test_row_mode_with_missing_content_column_warns_and_fallbacks(self, tmp_path, caplog): + csv_text = "a,b\r\n1,2\r\n3,4\r\n" + f = tmp_path / "miss.csv" + f.write_text(csv_text, encoding="utf-8") + bs = ByteStream.from_file_path(f) + bs.meta["file_path"] = str(f) + + conv = CSVToDocument(conversion_mode="row", content_column="missing") + with caplog.at_level(logging.WARNING): + out = conv.run(sources=[bs]) + assert "content_column='missing' not found" in caplog.text + docs = out["documents"] + assert len(docs) == 2 + # Fallback content is a readable listing + assert "a: 1" in docs[0].content and "b: 2" in docs[0].content + + def test_row_mode_meta_collision_prefixed(self, tmp_path): + # ByteStream meta has file_path and encoding; CSV also has those columns. + csv_text = "file_path,encoding,comment\r\nrowpath.csv,latin1,ok\r\n" + f = tmp_path / "collide.csv" + f.write_text(csv_text, encoding="utf-8") + bs = ByteStream.from_file_path(f) + bs.meta["file_path"] = str(f) + bs.meta["encoding"] = "utf-8" + + conv = CSVToDocument(conversion_mode="row") + out = conv.run(sources=[bs]) + d = out["documents"][0] + # Original meta preserved + assert d.meta["file_path"] == os.path.basename(str(f)) + assert d.meta["encoding"] == "utf-8" + # CSV columns stored with csv_ prefix (no clobber) + assert d.meta["csv_file_path"] == "rowpath.csv" + assert d.meta["csv_encoding"] == "latin1" + assert d.meta["comment"] == "ok" + + def test_init_validates_delimiter_and_quotechar(self): + with pytest.raises(ValueError): + CSVToDocument(delimiter=";;") + with pytest.raises(ValueError): + CSVToDocument(quotechar='""') + + def test_row_mode_large_file_warns(self, tmp_path, caplog): + # Build a ~1.2MB CSV to trigger the warning (threshold ~5MB in component; + # If you want to keep this super fast, you can comment this test out.) 
+        rows = 60_000
+        header = "text,author\n"
+        body = "".join("hello,Ada\n" for _ in range(rows))
+        data = (header + body).encode("utf-8")
+        bs = ByteStream(data=data, meta={"file_path": "big.csv"})
+        conv = CSVToDocument(conversion_mode="row")
+        with caplog.at_level(logging.WARNING):
+            _ = conv.run(sources=[bs])
+        # Not asserting exact MB value to avoid brittleness; look for the key phrase
+        assert "parsing a large CSV" in caplog.text
+
     def test_row_mode_with_content_column(self, tmp_path):
         """
         Each row becomes a Document, with `content` from a chosen column and other columns in meta.

From f477fae13a4468defec37af7edbc7621d791e554 Mon Sep 17 00:00:00 2001
From: Arya Tayshete
Date: Thu, 11 Sep 2025 12:30:15 +0530
Subject: [PATCH 3/7] test(converters): remove long commented line to satisfy
 ruff E501

Signed-off-by: Arya Tayshete
---
 .../converters/test_csv_to_document.py       | 40 +++++++++----------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/test/components/converters/test_csv_to_document.py b/test/components/converters/test_csv_to_document.py
index 2fb5e4b0cb..df02a8447e 100644
--- a/test/components/converters/test_csv_to_document.py
+++ b/test/components/converters/test_csv_to_document.py
@@ -4,6 +4,7 @@
 
 import logging
 import os
+from pathlib import Path
 
 import pytest
 
@@ -17,10 +18,10 @@ def csv_converter():
 
 
 class TestCSVToDocument:
-    def test_init(self, csv_converter):
+    def test_init(self, csv_converter: CSVToDocument):
         assert isinstance(csv_converter, CSVToDocument)
 
-    def test_run(self, test_files_path):
+    def test_run(self, test_files_path: Path):
         """
         Test if the component runs correctly.
         """
@@ -38,7 +39,7 @@ def test_run(self, test_files_path):
         assert docs[1].meta["file_path"] == os.path.basename(files[1])
         assert docs[2].meta["file_path"] == os.path.basename(files[2])
 
-    def test_run_with_store_full_path_false(self, test_files_path):
+    def test_run_with_store_full_path_false(self, test_files_path: Path):
         """
         Test if the component runs correctly with store_full_path=False
         """
@@ -57,7 +58,7 @@ def test_run_with_store_full_path_false(self, test_files_path):
         assert docs[1].meta["file_path"] == "sample_2.csv"
         assert docs[2].meta["file_path"] == "sample_3.csv"
 
-    def test_run_error_handling(self, test_files_path, caplog):
+    def test_run_error_handling(self, test_files_path: Path, caplog: pytest.LogCaptureFixture):
         """
         Test if the component correctly handles errors.
""" @@ -74,7 +75,7 @@ def test_run_error_handling(self, test_files_path, caplog): assert len(docs) == 2 assert docs[0].meta["file_path"] == os.path.basename(paths[0]) - def test_encoding_override(self, test_files_path, caplog): + def test_encoding_override(self, test_files_path: Path, caplog: pytest.LogCaptureFixture): """ Test if the encoding metadata field is used properly """ @@ -103,7 +104,7 @@ def test_run_with_meta(self): # check that the metadata from the bytestream is merged with that from the meta parameter assert document.meta == {"name": "test_name", "language": "it"} - # --- NEW TESTS for row mode reviewer asks --- + # --- NEW TESTS for row mode --- def test_row_mode_with_missing_content_column_warns_and_fallbacks(self, tmp_path, caplog): csv_text = "a,b\r\n1,2\r\n3,4\r\n" @@ -121,7 +122,7 @@ def test_row_mode_with_missing_content_column_warns_and_fallbacks(self, tmp_path # Fallback content is a readable listing assert "a: 1" in docs[0].content and "b: 2" in docs[0].content - def test_row_mode_meta_collision_prefixed(self, tmp_path): + def test_row_mode_meta_collision_prefixed(self, tmp_path: Path): # ByteStream meta has file_path and encoding; CSV also has those columns. csv_text = "file_path,encoding,comment\r\nrowpath.csv,latin1,ok\r\n" f = tmp_path / "collide.csv" @@ -147,21 +148,20 @@ def test_init_validates_delimiter_and_quotechar(self): with pytest.raises(ValueError): CSVToDocument(quotechar='""') - def test_row_mode_large_file_warns(self, tmp_path, caplog): - # Build a ~1.2MB CSV to trigger the warning (threshold ~5MB in component; - # If you want to keep this super fast, you can comment this test out.) - rows = 60_000 - header = "text,author\n" - body = "".join("hello,Ada\n" for _ in range(rows)) - data = (header + body).encode("utf-8") - bs = ByteStream(data=data, meta={"file_path": "big.csv"}) + def test_row_mode_large_file_warns(self, caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch): + # Make the threshold tiny so the warning always triggers, regardless of platform. + import haystack.components.converters.csv as csv_mod + + monkeypatch.setattr(csv_mod, "_ROW_MODE_SIZE_WARN_BYTES", 1, raising=False) + + bs = ByteStream(data=b"text,author\nhi,Ada\n", meta={"file_path": "big.csv"}) conv = CSVToDocument(conversion_mode="row") - with caplog.at_level(logging.WARNING): + # Capture the converter module's logger explicitly for reliability across CI runners. + with caplog.at_level(logging.WARNING, logger="haystack.components.converters.csv"): _ = conv.run(sources=[bs]) - # Not asserting exact MB value to avoid brittleness; look for the key phrase assert "parsing a large CSV" in caplog.text - def test_row_mode_with_content_column(self, tmp_path): + def test_row_mode_with_content_column(self, tmp_path: Path): """ Each row becomes a Document, with `content` from a chosen column and other columns in meta. """ @@ -185,7 +185,7 @@ def test_row_mode_with_content_column(self, tmp_path): # still respects store_full_path default=False trimming when present assert os.path.basename(f) == docs[0].meta["file_path"] - def test_row_mode_without_content_column(self, tmp_path): + def test_row_mode_without_content_column(self, tmp_path: Path): """ Without `content_column`, the content is a human-readable 'key: value' listing of the row. 
""" @@ -202,7 +202,7 @@ def test_row_mode_without_content_column(self, tmp_path): assert docs[0].meta["a"] == "1" and docs[0].meta["b"] == "2" assert docs[0].meta["row_number"] == 0 - def test_row_mode_meta_merging(self, tmp_path): + def test_row_mode_meta_merging(self, tmp_path: Path): """ File-level meta and explicit `meta` arg are merged into each row's meta. """ From 415d2ed650af69a0471219965fdfb6acb4b37fe2 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 11 Sep 2025 19:40:28 +0530 Subject: [PATCH 4/7] fix(converters): avoid infinite loop Signed-off-by: Arya Tayshete --- haystack/components/converters/csv.py | 2 +- .../converters/test_csv_to_document.py | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/haystack/components/converters/csv.py b/haystack/components/converters/csv.py index 5bfb6f2cf7..a9c226e8e4 100644 --- a/haystack/components/converters/csv.py +++ b/haystack/components/converters/csv.py @@ -222,7 +222,7 @@ def _build_document_from_row( suffix = 1 while key_to_use in row_meta: key_to_use = f"{base_key}_{suffix}" - suffix = 1 + suffix += 1 row_meta[key_to_use] = self._safe_value(v) row_meta["row_number"] = row_index diff --git a/test/components/converters/test_csv_to_document.py b/test/components/converters/test_csv_to_document.py index df02a8447e..dbe8857c29 100644 --- a/test/components/converters/test_csv_to_document.py +++ b/test/components/converters/test_csv_to_document.py @@ -142,6 +142,30 @@ def test_row_mode_meta_collision_prefixed(self, tmp_path: Path): assert d.meta["csv_encoding"] == "latin1" assert d.meta["comment"] == "ok" + def test_row_mode_meta_collision_multiple_suffixes(self, tmp_path): + """ + If meta already has csv_file_path and csv_file_path_1, we should write the next as csv_file_path_2 (not loop). + """ + csv_text = "file_path,comment\r\nrow.csv,ok\r\n" + f = tmp_path / "multi.csv" + f.write_text(csv_text, encoding="utf-8") + + bs = ByteStream.from_file_path(f) + bs.meta["file_path"] = str(f) + + # Pre-seed meta so we force two collisions. 
+        extra_meta = {"csv_file_path": "existing0", "csv_file_path_1": "existing1"}
+
+        conv = CSVToDocument(conversion_mode="row")
+        out = conv.run(sources=[bs], meta=[extra_meta])
+        d = out["documents"][0]
+
+        # Existing values preserved; new one goes to _2
+        assert d.meta["csv_file_path"] == "existing0"
+        assert d.meta["csv_file_path_1"] == "existing1"
+        assert d.meta["csv_file_path_2"] == "row.csv"
+        assert d.meta["comment"] == "ok"
+
     def test_init_validates_delimiter_and_quotechar(self):
         with pytest.raises(ValueError):
             CSVToDocument(delimiter=";;")

From 67cfb00dbc08c48a38b09c87b85a0fa1b062aa9e Mon Sep 17 00:00:00 2001
From: Arya Tayshete
Date: Mon, 22 Sep 2025 11:50:41 +0530
Subject: [PATCH 5/7] feat(converters): require content_column in run() for
 row mode; remove fallbacks; improve docstrings; update tests

Signed-off-by: Arya Tayshete
---
 haystack/components/converters/csv.py        |  82 ++++-----
 .../converters/test_csv_to_document.py       | 157 +++++++-----------
 2 files changed, 106 insertions(+), 133 deletions(-)

diff --git a/haystack/components/converters/csv.py b/haystack/components/converters/csv.py
index a9c226e8e4..3d9496c5a3 100644
--- a/haystack/components/converters/csv.py
+++ b/haystack/components/converters/csv.py
@@ -44,7 +44,7 @@ def __init__(
         store_full_path: bool = False,
         *,
         conversion_mode: Literal["file", "row"] = "file",
-        content_column: Optional[str] = None,
+        # content_column: Optional[str] = None,
         delimiter: str = ",",
         quotechar: str = '"',
     ):
@@ -61,9 +61,6 @@ def __init__(
         :param conversion_mode:
             - "file" (default): current behavior, one Document per CSV file whose content is the raw CSV text.
             - "row": convert each CSV row to its own Document.
-        :param content_column:
-            When ``conversion_mode="row"``, the column to use as ``Document.content``.
-            If ``None``, the content will be a human-readable "key: value" listing of that row.
         :param delimiter:
             CSV delimiter used when parsing in row mode (passed to ``csv.DictReader``).
         :param quotechar:
             CSV quote character used when parsing in row mode (passed to ``csv.DictReader``).
@@ -73,7 +70,7 @@ def __init__(
         self.encoding = encoding
         self.store_full_path = store_full_path
         self.conversion_mode = conversion_mode
-        self.content_column = content_column
+        # self.content_column = content_column
         self.delimiter = delimiter
         self.quotechar = quotechar
 
@@ -87,13 +84,18 @@ def run(
         self,
         sources: list[Union[str, Path, ByteStream]],
+        *,
+        content_column: Optional[str] = None,
         meta: Optional[Union[dict[str, Any], list[dict[str, Any]]]] = None,
     ):
         """
-        Converts a CSV file to a Document.
+        Converts CSV files to a Document.
 
         :param sources:
             List of file paths or ByteStream objects.
+        :param content_column:
+            Required when ``conversion_mode="row"``. The column name whose values become
+            ``Document.content`` for each row. The column must exist in the CSV header.
         :param meta:
             Optional metadata to attach to the documents.
             This value can be either a list of dictionaries or a single dictionary.
@@ -138,6 +140,13 @@ def run(
                 documents.append(Document(content=data, meta=merged_metadata))
                 continue
 
+            # --- ROW MODE (strict) ---
+            # Enforce required content_column in row mode
+            if not content_column:
+                raise ValueError(
+                    "CSVToDocument(row): 'content_column' is required in run() when conversion_mode='row'."
+                )
+
             # Reviewer note: Warn for very large CSVs in row mode (memory consideration)
             try:
                 size_bytes = len(raw)
                 if size_bytes > _ROW_MODE_SIZE_WARN_BYTES:
@@ -150,44 +159,29 @@ def run(
             except Exception:
                 pass
 
-            # Mode: row -> one Document per CSV row
+            # Create DictReader; if this fails, raise (no fallback)
             try:
                 reader = csv.DictReader(io.StringIO(data), delimiter=self.delimiter, quotechar=self.quotechar)
             except Exception as e:
-                logger.warning(
-                    "Could not parse CSV rows for {source}. Falling back to file mode. Error: {error}",
-                    source=source,
-                    error=e,
-                )
-                documents.append(Document(content=data, meta=merged_metadata))
-                continue
+                raise RuntimeError(f"CSVToDocument(row): could not parse CSV rows for {source}: {e}") from e
 
-            # Validate content_column presence; fall back to listing if missing
-            effective_content_col = self.content_column
+            # Validate header contains content_column; strict error if missing
             header = reader.fieldnames or []
-            if effective_content_col and effective_content_col not in header:
-                logger.warning(
-                    "CSVToDocument(row): content_column='{col}' not found in header for {source}; "
-                    "falling back to key: value listing.",
-                    col=effective_content_col,
-                    source=source,
-                )
-                effective_content_col = None
+            if content_column not in header:
+                raise ValueError(
+                    f"CSVToDocument(row): content_column='{content_column}' not found in header "
+                    f"for {source}. Available columns: {header}"
+                )
 
+            # Build documents; if a row processing fails, raise immediately (no skip)
             for i, row in enumerate(reader):
-                # Protect against malformed rows (reviewer suggestion)
                 try:
                     doc = self._build_document_from_row(
-                        row=row, base_meta=merged_metadata, row_index=i, content_column=effective_content_col
+                        row=row, base_meta=merged_metadata, row_index=i, content_column=content_column
                     )
-                    documents.append(doc)
                 except Exception as e:
-                    logger.warning(
-                        "CSVToDocument(row): skipping malformed row {row_index} in {source}. Error: {error}",
-                        row_index=i,
-                        source=source,
-                        error=e,
-                    )
+                    raise RuntimeError(f"CSVToDocument(row): failed to process row {i} for {source}: {e}") from e
+                documents.append(doc)
 
         return {"documents": documents}
 
@@ -197,22 +191,30 @@ def _safe_value(self, value: Any) -> str:
         return "" if value is None else str(value)
 
     def _build_document_from_row(
-        self, row: dict[str, Any], base_meta: dict[str, Any], row_index: int, content_column: Optional[str]
+        self, row: dict[str, Any], base_meta: dict[str, Any], row_index: int, content_column: str
    ) -> Document:
         """
-        Create a Document from a single CSV row. Does not catch exceptions; caller wraps.
+        Build a ``Document`` from one parsed CSV row.
+
+        :param row: Mapping of column name to cell value for the current row
+            (as produced by ``csv.DictReader``).
+        :param base_meta: File-level and user-provided metadata to start from
+            (for example: ``file_path``, ``encoding``).
+        :param row_index: Zero-based row index in the CSV; stored as
+            ``row_number`` in the output document's metadata.
+        :param content_column: Column name to use for ``Document.content``.
+        :returns: A ``Document`` with chosen content and merged metadata.
+            Remaining row columns are added to ``meta`` with collision-safe
+            keys (prefixed with ``csv_`` if needed).
""" row_meta = dict(base_meta) - # content - if content_column: - content = self._safe_value(row.get(content_column)) - else: - content = "\n".join(f"{k}: {self._safe_value(v)}" for k, v in row.items()) + # content (strict: content_column must exist; validated by caller) + content = self._safe_value(row.get(content_column)) # merge remaining columns into meta with collision handling for k, v in row.items(): - if content_column and k == content_column: + if k == content_column: continue key_to_use = k if key_to_use in row_meta: diff --git a/test/components/converters/test_csv_to_document.py b/test/components/converters/test_csv_to_document.py index dbe8857c29..a948b58afc 100644 --- a/test/components/converters/test_csv_to_document.py +++ b/test/components/converters/test_csv_to_document.py @@ -4,7 +4,6 @@ import logging import os -from pathlib import Path import pytest @@ -18,10 +17,10 @@ def csv_converter(): class TestCSVToDocument: - def test_init(self, csv_converter: CSVToDocument): + def test_init(self, csv_converter): assert isinstance(csv_converter, CSVToDocument) - def test_run(self, test_files_path: Path): + def test_run(self, test_files_path): """ Test if the component runs correctly. """ @@ -39,7 +38,7 @@ def test_run(self, test_files_path: Path): assert docs[1].meta["file_path"] == os.path.basename(files[1]) assert docs[2].meta["file_path"] == os.path.basename(files[2]) - def test_run_with_store_full_path_false(self, test_files_path: Path): + def test_run_with_store_full_path_false(self, test_files_path): """ Test if the component runs correctly with store_full_path=False """ @@ -58,7 +57,7 @@ def test_run_with_store_full_path_false(self, test_files_path: Path): assert docs[1].meta["file_path"] == "sample_2.csv" assert docs[2].meta["file_path"] == "sample_3.csv" - def test_run_error_handling(self, test_files_path: Path, caplog: pytest.LogCaptureFixture): + def test_run_error_handling(self, test_files_path, caplog): """ Test if the component correctly handles errors. 
""" @@ -75,7 +74,7 @@ def test_run_error_handling(self, test_files_path: Path, caplog: pytest.LogCaptu assert len(docs) == 2 assert docs[0].meta["file_path"] == os.path.basename(paths[0]) - def test_encoding_override(self, test_files_path: Path, caplog: pytest.LogCaptureFixture): + def test_encoding_override(self, test_files_path, caplog): """ Test if the encoding metadata field is used properly """ @@ -83,9 +82,9 @@ def test_encoding_override(self, test_files_path: Path, caplog: pytest.LogCaptur bytestream.meta["key"] = "value" converter = CSVToDocument(encoding="utf-16-le") - output = converter.run(sources=[bytestream]) + _ = converter.run(sources=[bytestream]) with caplog.at_level(logging.ERROR): - output = converter.run(sources=[bytestream]) + _ = converter.run(sources=[bytestream]) assert "codec can't decode" in caplog.text converter = CSVToDocument(encoding="utf-8") @@ -100,29 +99,46 @@ def test_run_with_meta(self): converter = CSVToDocument() output = converter.run(sources=[bytestream], meta=[{"language": "it"}]) document = output["documents"][0] - - # check that the metadata from the bytestream is merged with that from the meta parameter assert document.meta == {"name": "test_name", "language": "it"} - # --- NEW TESTS for row mode --- + # --- NEW TESTS for strict row mode --- + + def test_row_mode_requires_content_column_param(self, tmp_path): + # Missing content_column must raise in row mode + f = tmp_path / "t.csv" + f.write_text("a,b\r\n1,2\r\n", encoding="utf-8") + conv = CSVToDocument(conversion_mode="row") + with pytest.raises(ValueError): + _ = conv.run(sources=[f]) # content_column missing + + def test_row_mode_missing_header_raises(self, tmp_path): + # content_column must exist in header + f = tmp_path / "t.csv" + f.write_text("a,b\r\n1,2\r\n", encoding="utf-8") + conv = CSVToDocument(conversion_mode="row") + with pytest.raises(ValueError): + _ = conv.run(sources=[f], content_column="missing") - def test_row_mode_with_missing_content_column_warns_and_fallbacks(self, tmp_path, caplog): - csv_text = "a,b\r\n1,2\r\n3,4\r\n" - f = tmp_path / "miss.csv" + def test_row_mode_with_content_column(self, tmp_path): + csv_text = "text,author,stars\r\nNice app,Ada,5\r\nBuggy,Bob,2\r\n" + f = tmp_path / "fb.csv" f.write_text(csv_text, encoding="utf-8") - bs = ByteStream.from_file_path(f) - bs.meta["file_path"] = str(f) - conv = CSVToDocument(conversion_mode="row", content_column="missing") - with caplog.at_level(logging.WARNING): - out = conv.run(sources=[bs]) - assert "content_column='missing' not found" in caplog.text - docs = out["documents"] + bytestream = ByteStream.from_file_path(f) + bytestream.meta["file_path"] = str(f) + + converter = CSVToDocument(conversion_mode="row") + output = converter.run(sources=[bytestream], content_column="text") + docs = output["documents"] + assert len(docs) == 2 - # Fallback content is a readable listing - assert "a: 1" in docs[0].content and "b: 2" in docs[0].content + assert [d.content for d in docs] == ["Nice app", "Buggy"] + assert docs[0].meta["author"] == "Ada" + assert docs[0].meta["stars"] == "5" + assert docs[0].meta["row_number"] == 0 + assert os.path.basename(f) == docs[0].meta["file_path"] - def test_row_mode_meta_collision_prefixed(self, tmp_path: Path): + def test_row_mode_meta_collision_prefixed(self, tmp_path): # ByteStream meta has file_path and encoding; CSV also has those columns. 
         csv_text = "file_path,encoding,comment\r\nrowpath.csv,latin1,ok\r\n"
         f = tmp_path / "collide.csv"
@@ -132,7 +148,7 @@ def test_row_mode_meta_collision_prefixed(self, tmp_path):
         bs.meta["encoding"] = "utf-8"
 
         conv = CSVToDocument(conversion_mode="row")
-        out = conv.run(sources=[bs])
+        out = conv.run(sources=[bs], content_column="comment")
         d = out["documents"][0]
         # Original meta preserved
         assert d.meta["file_path"] == os.path.basename(str(f))
@@ -140,11 +156,14 @@ def test_row_mode_meta_collision_prefixed(self, tmp_path):
         # CSV columns stored with csv_ prefix (no clobber)
         assert d.meta["csv_file_path"] == "rowpath.csv"
         assert d.meta["csv_encoding"] == "latin1"
-        assert d.meta["comment"] == "ok"
+        # content column isn't duplicated in meta
+        assert "comment" not in d.meta
+        assert d.meta["row_number"] == 0
+        assert d.content == "ok"
 
     def test_row_mode_meta_collision_multiple_suffixes(self, tmp_path):
         """
-        If meta already has csv_file_path and csv_file_path_1, we should write the next as csv_file_path_2 (not loop).
+        If meta already has csv_file_path and csv_file_path_1, we should write the next as csv_file_path_2.
         """
         csv_text = "file_path,comment\r\nrow.csv,ok\r\n"
         f = tmp_path / "multi.csv"
@@ -157,14 +176,13 @@ def test_row_mode_meta_collision_multiple_suffixes(self, tmp_path):
         extra_meta = {"csv_file_path": "existing0", "csv_file_path_1": "existing1"}
 
         conv = CSVToDocument(conversion_mode="row")
-        out = conv.run(sources=[bs], meta=[extra_meta])
+        out = conv.run(sources=[bs], meta=[extra_meta], content_column="comment")
         d = out["documents"][0]
 
-        # Existing values preserved; new one goes to _2
         assert d.meta["csv_file_path"] == "existing0"
         assert d.meta["csv_file_path_1"] == "existing1"
         assert d.meta["csv_file_path_2"] == "row.csv"
-        assert d.meta["comment"] == "ok"
+        assert d.content == "ok"
 
     def test_init_validates_delimiter_and_quotechar(self):
         with pytest.raises(ValueError):
@@ -172,79 +190,32 @@ def test_init_validates_delimiter_and_quotechar(self):
             CSVToDocument(quotechar='""')
 
-    def test_row_mode_large_file_warns(self, caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch):
-        # Make the threshold tiny so the warning always triggers, regardless of platform.
+    def test_row_mode_large_file_warns(self, caplog, monkeypatch):
+        # Make the threshold tiny so the warning always triggers.
         import haystack.components.converters.csv as csv_mod
 
         monkeypatch.setattr(csv_mod, "_ROW_MODE_SIZE_WARN_BYTES", 1, raising=False)
 
         bs = ByteStream(data=b"text,author\nhi,Ada\n", meta={"file_path": "big.csv"})
         conv = CSVToDocument(conversion_mode="row")
-        # Capture the converter module's logger explicitly for reliability across CI runners.
         with caplog.at_level(logging.WARNING, logger="haystack.components.converters.csv"):
-            _ = conv.run(sources=[bs])
+            _ = conv.run(sources=[bs], content_column="text")
         assert "parsing a large CSV" in caplog.text
 
-    def test_row_mode_with_content_column(self, tmp_path: Path):
-        """
-        Each row becomes a Document, with `content` from a chosen column and other columns in meta.
- """ - csv_text = "text,author,stars\r\nNice app,Ada,5\r\nBuggy,Bob,2\r\n" - f = tmp_path / "fb.csv" - f.write_text(csv_text, encoding="utf-8") - - bytestream = ByteStream.from_file_path(f) - bytestream.meta["file_path"] = str(f) - - converter = CSVToDocument(conversion_mode="row", content_column="text") - output = converter.run(sources=[bytestream]) - docs = output["documents"] - - assert len(docs) == 2 - assert [d.content for d in docs] == ["Nice app", "Buggy"] - # Remaining columns land in meta, plus file-level meta preserved - assert docs[0].meta["author"] == "Ada" - assert docs[0].meta["stars"] == "5" - assert docs[0].meta["row_number"] == 0 - # still respects store_full_path default=False trimming when present - assert os.path.basename(f) == docs[0].meta["file_path"] - - def test_row_mode_without_content_column(self, tmp_path: Path): - """ - Without `content_column`, the content is a human-readable 'key: value' listing of the row. - """ - csv_text = "a,b\r\n1,2\r\n3,4\r\n" - f = tmp_path / "t.csv" - f.write_text(csv_text, encoding="utf-8") - - converter = CSVToDocument(conversion_mode="row") - output = converter.run(sources=[f]) - docs = output["documents"] - - assert len(docs) == 2 - assert "a: 1" in docs[0].content and "b: 2" in docs[0].content - assert docs[0].meta["a"] == "1" and docs[0].meta["b"] == "2" - assert docs[0].meta["row_number"] == 0 + def test_row_mode_reader_failure_raises_runtimeerror(self, monkeypatch, tmp_path): + # Simulate DictReader failing -> we should raise RuntimeError (no fallback). + import haystack.components.converters.csv as csv_mod - def test_row_mode_meta_merging(self, tmp_path: Path): - """ - File-level meta and explicit `meta` arg are merged into each row's meta. - """ - csv_text = "q,user\r\nHello,u1\r\nHi,u2\r\n" - f = tmp_path / "m.csv" - f.write_text(csv_text, encoding="utf-8") + f = tmp_path / "bad.csv" + f.write_text("a,b\n1,2\n", encoding="utf-8") + conv = CSVToDocument(conversion_mode="row") - bs = ByteStream.from_file_path(f) - bs.meta["dataset"] = "support_tickets" + class Boom(Exception): + pass - converter = CSVToDocument(conversion_mode="row", content_column="q") - out = converter.run(sources=[bs], meta=[{"lang": "en"}]) - docs = out["documents"] + def broken_reader(*_args, **_kwargs): # noqa: D401 + raise Boom("broken") - assert len(docs) == 2 - assert docs[0].content == "Hello" - # merged meta propagated to each row - assert docs[0].meta["dataset"] == "support_tickets" - assert docs[0].meta["lang"] == "en" - # remaining column captured - assert docs[0].meta["user"] == "u1" + monkeypatch.setattr(csv_mod.csv, "DictReader", broken_reader, raising=True) + with pytest.raises(RuntimeError): + _ = conv.run(sources=[f], content_column="a") From e4960ffd7982d7e4eceeba546d2f967c3a015966 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Tue, 23 Sep 2025 20:35:05 +0530 Subject: [PATCH 6/7] feat(converters): content_column required in run method instead of init Signed-off-by: Arya Tayshete --- haystack/components/converters/csv.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/haystack/components/converters/csv.py b/haystack/components/converters/csv.py index 3d9496c5a3..ac28b3dc36 100644 --- a/haystack/components/converters/csv.py +++ b/haystack/components/converters/csv.py @@ -34,7 +34,7 @@ class CSVToDocument: results = converter.run(sources=["sample.csv"], meta={"date_added": datetime.now().isoformat()}) documents = results["documents"] print(documents[0].content) - # 
+    # 'col1,col2\\nrow1,row1\\nrow2,row2\\n'
     ```
     """
 
@@ -44,7 +44,6 @@ def __init__(
         store_full_path: bool = False,
         *,
         conversion_mode: Literal["file", "row"] = "file",
-        # content_column: Optional[str] = None,
         delimiter: str = ",",
         quotechar: str = '"',
     ):
@@ -59,22 +58,20 @@ def __init__(
             If True, the full path of the file is stored in the metadata of the document.
             If False, only the file name is stored.
         :param conversion_mode:
-            - "file" (default): current behavior, one Document per CSV file whose content is the raw CSV text.
-            - "row": convert each CSV row to its own Document.
+            - "file" (default): one Document per CSV file whose content is the raw CSV text.
+            - "row": convert each CSV row to its own Document (requires `content_column` in `run()`).
         :param delimiter:
             CSV delimiter used when parsing in row mode (passed to ``csv.DictReader``).
         :param quotechar:
             CSV quote character used when parsing in row mode (passed to ``csv.DictReader``).
         """
-
         self.encoding = encoding
         self.store_full_path = store_full_path
         self.conversion_mode = conversion_mode
-        # self.content_column = content_column
         self.delimiter = delimiter
         self.quotechar = quotechar
 
-        # Basic validation (reviewer suggestion)
+        # Basic validation
         if len(self.delimiter) != 1:
             raise ValueError("CSVToDocument: delimiter must be a single character.")
         if len(self.quotechar) != 1:
             raise ValueError("CSVToDocument: quotechar must be a single character.")
@@ -89,13 +86,14 @@ def run(
         self,
         sources: list[Union[str, Path, ByteStream]],
         *,
         content_column: Optional[str] = None,
         meta: Optional[Union[dict[str, Any], list[dict[str, Any]]]] = None,
     ):
         """
-        Converts CSV files to a Document.
+        Converts CSV files to a Document (file mode) or to one Document per row (row mode).
 
         :param sources:
             List of file paths or ByteStream objects.
         :param content_column:
-            Required when ``conversion_mode="row"``. The column name whose values become
-            ``Document.content`` for each row. The column must exist in the CSV header.
+            **Required when** ``conversion_mode="row"``.
+            The column name whose values become ``Document.content`` for each row.
+            The column must exist in the CSV header.
         :param meta:
             Optional metadata to attach to the documents.
             This value can be either a list of dictionaries or a single dictionary.
@@ -115,11 +113,11 @@ def run(
             except Exception as e:
                 logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
                 continue
+
             try:
                 encoding = bytestream.meta.get("encoding", self.encoding)
                 raw = io.BytesIO(bytestream.data).getvalue()
                 data = raw.decode(encoding=encoding)
-
             except Exception as e:
                 logger.warning(
                     "Could not convert file {source}. Skipping it. Error message: {error}", source=source, error=e
@@ -141,13 +139,13 @@ def run(
                 documents.append(Document(content=data, meta=merged_metadata))
                 continue
 
             # --- ROW MODE (strict) ---
-            # Enforce required content_column in row mode
+            # Require content_column in run(); no fallback
             if not content_column:
                 raise ValueError(
                     "CSVToDocument(row): 'content_column' is required in run() when conversion_mode='row'."
) - # Reviewer note: Warn for very large CSVs in row mode (memory consideration) + # Warn for large CSVs in row mode (memory consideration) try: size_bytes = len(raw) if size_bytes > _ROW_MODE_SIZE_WARN_BYTES: From 9da567d6096f2af35258a705d08c45af7e7ef415 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Sun, 5 Oct 2025 11:47:49 +0530 Subject: [PATCH 7/7] feat(csv): row-mode with required run() arg ; update BDD pipeline tests --- test/core/pipeline/features/test_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index 1524932306..38c19e1973 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -5169,7 +5169,7 @@ def pipeline_that_converts_files(pipeline_class): expected_outputs={"a_joiner": {"documents": expected_csv_docs + expected_splits_docs}}, expected_component_calls={ ("router", 1): {"sources": sources, "meta": None}, - ("csv_converter", 1): {"sources": [sources[0]], "meta": None}, + ("csv_converter", 1): {"sources": [sources[0]], "meta": None, "content_column": None}, ("txt_converter", 1): {"sources": [sources[1]], "meta": None}, ("json_converter", 1): {"sources": [sources[2]], "meta": None}, ("b_joiner", 1): {