1414
1515logger = logging .getLogger (__name__ )
1616
17+ _ROW_MODE_SIZE_WARN_BYTES = 5 * 1024 * 1024 # ~5MB; warn when parsing rows might be memory-heavy
18+
1719
1820@component
1921class CSVToDocument :
@@ -75,6 +77,12 @@ def __init__(
7577 self .delimiter = delimiter
7678 self .quotechar = quotechar
7779
80+ # Basic validation (reviewer suggestion)
81+ if len (self .delimiter ) != 1 :
82+ raise ValueError ("CSVToDocument: delimiter must be a single character." )
83+ if len (self .quotechar ) != 1 :
84+ raise ValueError ("CSVToDocument: quotechar must be a single character." )
85+
7886 @component .output_types (documents = list [Document ])
7987 def run (
8088 self ,
@@ -109,7 +117,9 @@ def run(
109117 continue
110118 try :
111119 encoding = bytestream .meta .get ("encoding" , self .encoding )
112- data = io .BytesIO (bytestream .data ).getvalue ().decode (encoding = encoding )
120+ raw = io .BytesIO (bytestream .data ).getvalue ()
121+ data = raw .decode (encoding = encoding )
122+
113123 except Exception as e :
114124 logger .warning (
115125 "Could not convert file {source}. Skipping it. Error message: {error}" , source = source , error = e
@@ -128,6 +138,18 @@ def run(
128138 documents .append (Document (content = data , meta = merged_metadata ))
129139 continue
130140
141+ # Reviewer note: Warn for very large CSVs in row mode (memory consideration)
142+ try :
143+ size_bytes = len (raw )
144+ if size_bytes > _ROW_MODE_SIZE_WARN_BYTES :
145+ logger .warning (
146+ "CSVToDocument(row): parsing a large CSV (~{mb:.1f} MB). "
147+ "Consider chunking/streaming if you hit memory issues." ,
148+ mb = size_bytes / (1024 * 1024 ),
149+ )
150+ except Exception :
151+ pass
152+
131153 # Mode: row -> one Document per CSV row
132154 try :
133155 reader = csv .DictReader (io .StringIO (data ), delimiter = self .delimiter , quotechar = self .quotechar )
@@ -140,26 +162,68 @@ def run(
140162 documents .append (Document (content = data , meta = merged_metadata ))
141163 continue
142164
165+ # Validate content_column presence; fall back to listing if missing
166+ effective_content_col = self .content_column
167+ header = reader .fieldnames or []
168+ if effective_content_col and effective_content_col not in header :
169+ logger .warning (
170+ "CSVToDocument(row): content_column='{col}' not found in header for {source}; "
171+ "falling back to key: value listing." ,
172+ col = effective_content_col ,
173+ source = source ,
174+ )
175+ effective_content_col = None
176+
143177 for i , row in enumerate (reader ):
144- row_meta = dict (merged_metadata ) # start with file-level/meta param bytestream meta
145-
146- # Determine content from selected column or fallback to a friendly listing
147- if self .content_column :
148- content = row .get (self .content_column , "" )
149- if content is None :
150- content = ""
151- else :
152- # "key: value" per line for readability
153- content = "\n " .join (f"{ k } : { v if v is not None else '' } " for k , v in row .items ())
154-
155- # Add remaining columns into meta (don't override existing keys like file_path, encoding, etc.)
156- for k , v in row .items ():
157- if self .content_column and k == self .content_column :
158- continue
159- if k not in row_meta :
160- row_meta [k ] = "" if v is None else v
161-
162- row_meta ["row_number" ] = i
163- documents .append (Document (content = content , meta = row_meta ))
178+ # Protect against malformed rows (reviewer suggestion)
179+ try :
180+ doc = self ._build_document_from_row (
181+ row = row , base_meta = merged_metadata , row_index = i , content_column = effective_content_col
182+ )
183+ documents .append (doc )
184+ except Exception as e :
185+ logger .warning (
186+ "CSVToDocument(row): skipping malformed row {row_index} in {source}. Error: {error}" ,
187+ row_index = i ,
188+ source = source ,
189+ error = e ,
190+ )
164191
165192 return {"documents" : documents }
193+
194+ # ----- helpers -----
195+ def _safe_value (self , value : Any ) -> str :
196+ """Normalize CSV cell values: None -> '', everything -> str."""
197+ return "" if value is None else str (value )
198+
199+ def _build_document_from_row (
200+ self , row : dict [str , Any ], base_meta : dict [str , Any ], row_index : int , content_column : Optional [str ]
201+ ) -> Document :
202+ """
203+ Create a Document from a single CSV row. Does not catch exceptions; caller wraps.
204+ """
205+ row_meta = dict (base_meta )
206+
207+ # content
208+ if content_column :
209+ content = self ._safe_value (row .get (content_column ))
210+ else :
211+ content = "\n " .join (f"{ k } : { self ._safe_value (v )} " for k , v in row .items ())
212+
213+ # merge remaining columns into meta with collision handling
214+ for k , v in row .items ():
215+ if content_column and k == content_column :
216+ continue
217+ key_to_use = k
218+ if key_to_use in row_meta :
219+ # Avoid clobbering existing meta like file_path/encoding; prefix and de-dupe
220+ base_key = f"csv_{ key_to_use } "
221+ key_to_use = base_key
222+ suffix = 1
223+ while key_to_use in row_meta :
224+ key_to_use = f"{ base_key } _{ suffix } "
225+ suffix = 1
226+ row_meta [key_to_use ] = self ._safe_value (v )
227+
228+ row_meta ["row_number" ] = row_index
229+ return Document (content = content , meta = row_meta )
0 commit comments