Skip to content

Commit

Permalink
stream-encode: CSV writer differentiates 0-length vs. absent string
Browse files Browse the repository at this point in the history
A string field that is present in a document with a 0-length value — i.e. the
empty string "" — is different from a string field that is absent entirely.

The stdlib Go CSV writer does not allow for such a distinction to be made,
since all fields must be strings and there is no way to indicate an "absent"
string.

This commit implements a custom CSV writer that can tell the difference between
an absent and 0-length string. An absent string gets "skipped", with no value at
all placed between the commas of a row. A 0-length string gets quoted to just be
"". Other strings get quoted if they have special characters requiring quotes;
otherwise they are written as-is.

This new CSV writer is inspired by the Go stdlib CSV writer, with the required
additions for absent vs. empty string. Some of the extra configuration that we
generally don't need has been stripped out, such as specifying a special NULL
string and using a custom "comma" value — nobody has ever used these options in
our filesink materializations, and it seems reasonable not to support them
unless somebody requests them down the line. This custom CSV writer also does
not use an additional internal buffer, since that would be redundant with the
buffer(s) that are inevitably used by its outputs.
  • Loading branch information
williamhbaker committed Feb 4, 2025
1 parent 8bce00f commit aacc2a5
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 135 deletions.
14 changes: 1 addition & 13 deletions filesink/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,16 @@ func NewParquetStreamEncoder(cfg ParquetConfig, b *pf.MaterializationSpec_Bindin
}

type CsvConfig struct {
Delimiter string `json:"delimiter,omitempty" jsonschema:"title=Delimiter,description=Character to separate columns within a row. Defaults to a comma if blank. Must be a single character with a byte length of 1." jsonschema_extras:"order=0"`
NullString string `json:"nullString,omitempty" jsonschema:"title=Null String,description=String to use to represent NULL values. Defaults to an empty string if blank." jsonschema_extras:"order=1"`
SkipHeaders bool `json:"skipHeaders,omitempty" jsonschema:"title=Skip Headers,description=Do not write headers to files." jsonschema_extras:"order=2"`
SkipHeaders bool `json:"skipHeaders,omitempty" jsonschema:"title=Skip Headers,description=Do not write headers to files." jsonschema_extras:"order=2"`
}

func (c CsvConfig) Validate() error {
if r := []rune(c.Delimiter); len(r) > 1 {
return fmt.Errorf("delimiter %q must be a single rune (byte length of 1): got byte length of %d", c.Delimiter, len(r))
}

return nil
}

func NewCsvStreamEncoder(cfg CsvConfig, b *pf.MaterializationSpec_Binding, w io.WriteCloser) StreamEncoder {
var opts []enc.CsvOption

if cfg.Delimiter != "" {
opts = append(opts, enc.WithCsvDelimiter([]rune(cfg.Delimiter)[0])) // already validated to be 1 byte in length
}
if cfg.NullString != "" {
opts = append(opts, enc.WithCsvNullString(cfg.NullString))
}
if cfg.SkipHeaders {
opts = append(opts, enc.WithCsvSkipHeaders())
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

155 changes: 108 additions & 47 deletions materialize-boilerplate/stream-encode/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package stream_encode

import (
"compress/flate"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"unicode"
"unicode/utf8"

"github.com/klauspost/compress/gzip"
)
Expand All @@ -15,14 +17,12 @@ const csvCompressionlevel = flate.BestSpeed

type csvConfig struct {
skipHeaders bool
nullStr string
delimiter rune
}

type CsvEncoder struct {
cfg csvConfig
fields []string
csv *csv.Writer
csv *csvWriter
cwc *countingWriteCloser
gz *gzip.Writer
}
Expand All @@ -35,18 +35,6 @@ func WithCsvSkipHeaders() CsvOption {
}
}

func WithCsvNullString(str string) CsvOption {
return func(cfg *csvConfig) {
cfg.nullStr = str
}
}

func WithCsvDelimiter(r rune) CsvOption {
return func(cfg *csvConfig) {
cfg.delimiter = r
}
}

func NewCsvEncoder(w io.WriteCloser, fields []string, opts ...CsvOption) *CsvEncoder {
var cfg csvConfig
for _, o := range opts {
Expand All @@ -60,14 +48,9 @@ func NewCsvEncoder(w io.WriteCloser, fields []string, opts ...CsvOption) *CsvEnc
panic("invalid compression level for gzip.NewWriterLevel")
}

csvw := csv.NewWriter(gz)
if cfg.delimiter != 0 {
csvw.Comma = cfg.delimiter
}

return &CsvEncoder{
cfg: cfg,
csv: csvw,
csv: newCsvWriter(gz),
cwc: cwc,
gz: gz,
fields: fields,
Expand All @@ -76,56 +59,134 @@ func NewCsvEncoder(w io.WriteCloser, fields []string, opts ...CsvOption) *CsvEnc

func (e *CsvEncoder) Encode(row []any) error {
if !e.cfg.skipHeaders {
if err := e.csv.Write(e.fields); err != nil {
headerRow := make([]any, len(e.fields))
for i, f := range e.fields {
headerRow[i] = f
}
if err := e.csv.writeRow(headerRow); err != nil {
return fmt.Errorf("writing header: %w", err)
}
e.cfg.skipHeaders = true
}

record := make([]string, 0, len(row))
return e.csv.writeRow(row)
}

// Written reports the total number of bytes written to the underlying
// writer so far, as counted by the counting writer (i.e. post-compression).
func (e *CsvEncoder) Written() int {
	return e.cwc.written
}

// Close finalizes the encoder: it closes the gzip stream (flushing any
// buffered compressed data) and then the underlying counting writer. It
// must be called for the output to be a complete, valid gzip file.
func (e *CsvEncoder) Close() error {
	if err := e.gz.Close(); err != nil {
		return fmt.Errorf("closing gzip writer: %w", err)
	}
	if err := e.cwc.Close(); err != nil {
		return fmt.Errorf("closing counting writer: %w", err)
	}
	return nil
}

type csvWriter struct {
w io.Writer
}

func newCsvWriter(w io.Writer) *csvWriter {
return &csvWriter{w: w}
}

for _, v := range row {
func (w *csvWriter) writeRow(row []any) error {
for n, v := range row {
if n > 0 {
if _, err := w.w.Write([]byte(",")); err != nil {
return err
}
}

var field string
switch value := v.(type) {
case json.RawMessage:
record = append(record, string(value))
field = string(value)
case []byte:
record = append(record, string(value))
field = string(value)
case string:
record = append(record, value)
field = value
case bool:
record = append(record, strconv.FormatBool(value))
field = strconv.FormatBool(value)
case int64:
record = append(record, strconv.Itoa(int(value)))
field = strconv.Itoa(int(value))
case int:
record = append(record, strconv.Itoa(value))
field = strconv.Itoa(value)
case float64:
record = append(record, strconv.FormatFloat(value, 'f', -1, 64))
field = strconv.FormatFloat(value, 'f', -1, 64)
case float32:
record = append(record, strconv.FormatFloat(float64(value), 'f', -1, 64))
field = strconv.FormatFloat(float64(value), 'f', -1, 64)
case nil:
record = append(record, e.cfg.nullStr)
continue
default:
record = append(record, fmt.Sprintf("%v", value))
field = fmt.Sprintf("%v", value)
}

if err := w.writeField(field); err != nil {
return err
}
}

return e.csv.Write(record)
if _, err := w.w.Write([]byte("\n")); err != nil {
return err
}

return nil
}

func (e *CsvEncoder) Written() int {
return e.cwc.written
func (w *csvWriter) writeField(field string) error {
if !w.fieldNeedsQuotes(field) {
if _, err := w.w.Write([]byte(field)); err != nil {
return err
}
} else {
if _, err := w.w.Write([]byte(`"`)); err != nil {
return err
}
for len(field) > 0 {
// Escape quote characters present in the string by replacing them
// with double quotes.
i := strings.Index(field, `"`)
if i < 0 {
i = len(field)
}

if _, err := w.w.Write([]byte(field[:i])); err != nil {
return err
}

field = field[i:]
if len(field) > 0 {
if _, err := w.w.Write([]byte(`""`)); err != nil {
return err
}
field = field[1:]
}
}
if _, err := w.w.Write([]byte(`"`)); err != nil {
return err
}
}

return nil
}

func (e *CsvEncoder) Close() error {
e.csv.Flush()
func (w *csvWriter) fieldNeedsQuotes(field string) bool {
if field == "" {
return true
}

if err := e.csv.Error(); err != nil {
return fmt.Errorf("flushing csv writer: %w", err)
} else if err := e.gz.Close(); err != nil {
return fmt.Errorf("closing gzip writer: %w", err)
} else if err := e.cwc.Close(); err != nil {
return fmt.Errorf("closing counting writer: %w", err)
for i := 0; i < len(field); i++ {
c := field[i]
if c == '\n' || c == '\r' || c == '"' || c == ',' {
return true
}
}

return nil
r1, _ := utf8.DecodeRuneInString(field)
return unicode.IsSpace(r1)
}
52 changes: 37 additions & 15 deletions materialize-boilerplate/stream-encode/csv_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream_encode

import (
"bytes"
"os"
"testing"

Expand All @@ -24,21 +25,6 @@ func TestCsvEncoder(t *testing.T) {
nulls: false,
opts: []CsvOption{WithCsvSkipHeaders()},
},
{
name: "with default null",
nulls: true,
opts: nil,
},
{
name: "with custom null",
nulls: true,
opts: []CsvOption{WithCsvNullString("MyNullString")},
},
{
name: "with custom delimiter",
nulls: true,
opts: []CsvOption{WithCsvDelimiter([]rune("|")[0])},
},
}

for _, tt := range tests {
Expand All @@ -61,3 +47,39 @@ func TestCsvEncoder(t *testing.T) {
})
}
}

func TestCsvWriter(t *testing.T) {
for _, tt := range []struct {
name string
row []any
want string
}{
{
name: "empty",
row: nil,
want: "\n",
},
{
name: "basic",
row: []any{"first", "second", "third"},
want: "first,second,third\n",
},
{
name: "empty string and null",
row: []any{"first", "", nil},
want: "first,\"\",\n",
},
{
name: "special characters",
row: []any{"has\nnewline", " startsWithSpace", "\tstartsWithTab", "has\"quote", "has,comma", "has\rreturn"},
want: "\"has\nnewline\",\" startsWithSpace\",\"\tstartsWithTab\",\"has\"\"quote\",\"has,comma\",\"has\rreturn\"\n",
},
} {
t.Run(tt.name, func(t *testing.T) {
var buf bytes.Buffer
csvw := newCsvWriter(&buf)
require.NoError(t, csvw.writeRow(tt.row))
require.Equal(t, tt.want, buf.String())
})
}
}
12 changes: 0 additions & 12 deletions materialize-gcs-csv/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,6 @@
},
"csvConfig": {
"properties": {
"delimiter": {
"type": "string",
"title": "Delimiter",
"description": "Character to separate columns within a row. Defaults to a comma if blank. Must be a single character with a byte length of 1.",
"order": 0
},
"nullString": {
"type": "string",
"title": "Null String",
"description": "String to use to represent NULL values. Defaults to an empty string if blank.",
"order": 1
},
"skipHeaders": {
"type": "boolean",
"title": "Skip Headers",
Expand Down
Loading

0 comments on commit aacc2a5

Please sign in to comment.