Skip to content

Commit 3ab2ba0

Browse files
authored
Merge pull request #9327 from youngsofun/fix_csv
fix(csv): fix align_flush with header only.
2 parents 5b59f0d + 601b3d6 commit 3ab2ba0

File tree

4 files changed

+66
-2
lines changed

4 files changed

+66
-2
lines changed

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs

+42
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,48 @@ impl InputFormatTextBase for InputFormatCSV {
299299

300300
fn align_flush(state: &mut AligningState<Self>) -> Result<Vec<RowBatch>> {
301301
let mut res = vec![];
302+
let num_fields = state.num_fields;
303+
let reader = state.csv_reader.as_mut().expect("must success");
304+
let field_ends = &mut reader.field_ends[..];
305+
let in_tmp = Vec::new();
306+
let mut out_tmp = vec![0u8; 1];
307+
let mut endlen = reader.n_end;
308+
309+
if state.rows_to_skip > 0 {
310+
let (result, _, _, n_end) =
311+
reader
312+
.reader
313+
.read_record(&in_tmp, &mut out_tmp, &mut field_ends[endlen..]);
314+
endlen += n_end;
315+
316+
return match result {
317+
ReadRecordResult::InputEmpty => {
318+
reader.n_end = endlen;
319+
Ok(vec![])
320+
}
321+
ReadRecordResult::OutputFull => Err(output_full_error(&state.path, state.rows)),
322+
ReadRecordResult::OutputEndsFull => Err(output_ends_full_error(
323+
num_fields,
324+
field_ends.len(),
325+
&state.path,
326+
state.rows,
327+
)),
328+
ReadRecordResult::Record => {
329+
Self::check_num_field(num_fields, endlen, field_ends, &state.path, state.rows)?;
330+
331+
state.rows_to_skip -= 1;
332+
tracing::debug!(
333+
"csv aligner: skip a header row, remain {}",
334+
state.rows_to_skip
335+
);
336+
Ok(vec![])
337+
}
338+
ReadRecordResult::End => {
339+
Err(csv_error("unexpect eof in header", &state.path, state.rows))
340+
}
341+
};
342+
}
343+
302344
let num_fields = state.num_fields;
303345
let reader = state.csv_reader.as_mut().expect("must success");
304346
let field_ends = &mut reader.field_ends[..];

src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,10 @@ impl<I: InputFormatPipe> Processor for Aligner<I> {
123123
let eof = read_batch.is_none();
124124
let row_batches = state.align(read_batch)?;
125125
for b in row_batches.into_iter() {
126-
process_values.rows += b.rows();
127-
self.row_batches.push_back(b);
126+
if b.size() > 0 {
127+
process_values.rows += b.rows();
128+
self.row_batches.push_back(b);
129+
}
128130
}
129131
if eof {
130132
assert!(self.is_flushing_split);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#!/usr/bin/env bash
2+
3+
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
4+
. "$CURDIR"/../../../../shell_env.sh
5+
6+
echo "drop table if exists test_csv_header_only" | $MYSQL_CLIENT_CONNECT
7+
8+
cat << EOF > /tmp/test_csv_header_only.csv
9+
c1,c2
10+
EOF
11+
12+
echo "CREATE TABLE test_csv_header_only
13+
(
14+
a Int,
15+
b Int
16+
);" | $MYSQL_CLIENT_CONNECT
17+
18+
curl -sH "insert_sql:insert into test_csv_header_only file_format = (type = 'CSV' skip_header = 1 record_delimiter = '-')" -F "upload=@/tmp/test_csv_header_only.csv" \
19+
-u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c '\"rows\":0'

0 commit comments

Comments
 (0)