Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,8 @@ src/Backend/test_data/json

# Allow s3_source directory
!src/Backend/test_data/s3_source/
!src/Backend/test_data/s3_source/**
!src/Backend/test_data/s3_source/**

# Allow a specific CSV dataset that we want tracked despite the general csv ignores
!src/Backend/test_data/csv/
!src/Backend/test_data/csv/Mental_Health_and_Social_Media_Balance_Dataset.csv
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ We use a Makefile to simplify common development tasks. All commands should be r
```bash
make go-test-coverage
```
- Run tests with an HTML coverage report
```bash
go test ./... -coverprofile=coverage.out
go tool cover -html=coverage.out
```


### Rust Tests
- Run all tests
Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ go-test-coverage:
@echo "Running Go tests with coverage..."
cd src/Backend/opti-sql-go && go test -v -coverprofile=coverage.out ./...
cd src/Backend/opti-sql-go && go tool cover -func=coverage.out

go-run:
@echo "Running Go application..."
cd src/Backend/opti-sql-go && go run main.go
Expand Down
12 changes: 12 additions & 0 deletions src/Backend/opti-sql-go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type batchConfig struct {
EnableParallelRead bool `yaml:"enable_parallel_read"`
MaxMemoryBeforeSpill uint64 `yaml:"max_memory_before_spill"`
MaxFileSizeMB int `yaml:"max_file_size_mb"` // max size of a single file
ShouldDowndload bool `yaml:"should_download"`
Comment thread
Rich-T-kid marked this conversation as resolved.
Outdated
MaxDownloadSizeMB int `yaml:"max_download_size_mb"` // max size to download from external sources like S3
}
type queryConfig struct {
// should results be cached, server side? if so how long
Expand Down Expand Up @@ -64,6 +66,10 @@ var configInstance *Config = &Config{
EnableParallelRead: true,
MaxMemoryBeforeSpill: uint64(gigaByte) * 2, // 2GB
MaxFileSizeMB: 500, // 500MB
// Whether to download files from external sources such as S3 and, if so,
// the maximum size to download; files larger than that are not downloaded locally.
ShouldDowndload: true,
Comment thread
Rich-T-kid marked this conversation as resolved.
Outdated
MaxDownloadSizeMB: 10, // 10MB
},
Query: queryConfig{
EnableCache: true,
Expand Down Expand Up @@ -138,6 +144,12 @@ func mergeConfig(dst *Config, src map[string]interface{}) {
if v, ok := batch["max_file_size_mb"].(int); ok {
dst.Batch.MaxFileSizeMB = v
}
if v, ok := batch["should_download"].(bool); ok {
dst.Batch.ShouldDowndload = v
Comment thread
Rich-T-kid marked this conversation as resolved.
Outdated
}
if v, ok := batch["max_download_size_mb"].(int); ok {
dst.Batch.MaxDownloadSizeMB = v
}
}

// =============================
Expand Down
8 changes: 8 additions & 0 deletions src/Backend/opti-sql-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@ module opti-sql-go
go 1.24.0

require (
github.com/apache/arrow/go/v15 v15.0.2
github.com/apache/arrow/go/v17 v17.0.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/apache/thrift v0.20.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/mod v0.18.0 // indirect
Expand Down
14 changes: 14 additions & 0 deletions src/Backend/opti-sql-go/go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE=
github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54=
github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc=
github.com/apache/thrift v0.20.0 h1:631+KvYbsBZxmuJjYwhezVsrfc/TbqtZV4QcxOX1fOI=
github.com/apache/thrift v0.20.0/go.mod h1:hOk1BQqcp2OLzGsyVXdfMk7YFlMxK3aoEVhjD06QhB8=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI=
github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
37 changes: 37 additions & 0 deletions src/Backend/opti-sql-go/operators/project/projectExec.go
Original file line number Diff line number Diff line change
@@ -1 +1,38 @@
package project

import (
"errors"

"github.com/apache/arrow/go/v17/arrow"
)

// handle keeping only the request columsn but make sure the schema and columns are also aligned
Comment thread
Rich-T-kid marked this conversation as resolved.
Outdated
// returns error if a column doesnt exist
func ProjectSchemaFilterDown(schema *arrow.Schema, cols []arrow.Array, keepCols ...string) (*arrow.Schema, []arrow.Array, error) {
if len(keepCols) == 0 {
return arrow.NewSchema([]arrow.Field{}, nil), nil, errors.New("no columns passed in")
}

// Build map: columnName -> original index
fieldIndex := make(map[string]int)
for i, f := range schema.Fields() {
fieldIndex[f.Name] = i
}

newFields := make([]arrow.Field, 0, len(keepCols))
newCols := make([]arrow.Array, 0, len(keepCols))

// Preserve order from keepCols, not schema order
for _, name := range keepCols {
idx, exists := fieldIndex[name]
if !exists {
return arrow.NewSchema([]arrow.Field{}, nil), []arrow.Array{}, errors.New("invalid column passed in to be pruned")
}

newFields = append(newFields, schema.Field(idx))
newCols = append(newCols, cols[idx])
}

newSchema := arrow.NewSchema(newFields, nil)
return newSchema, newCols, nil
}
212 changes: 212 additions & 0 deletions src/Backend/opti-sql-go/operators/project/source/csv.go
Original file line number Diff line number Diff line change
@@ -1 +1,213 @@
package source

import (
"encoding/csv"
"fmt"
"io"
"opti-sql-go/operators"
"strconv"
"strings"

"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
)

// ProjectCSVLeaf reads records from a CSV reader and converts them into
// Arrow columns that follow schema.
type ProjectCSVLeaf struct {
	// r is the CSV reader that records are pulled from.
	r *csv.Reader

	// schema lists the columns to project and the Arrow type each one is
	// cast to.
	schema *arrow.Schema

	// colPosition maps a column name to its index within a CSV record.
	colPosition map[string]int

	// firstDataRow is the first data record, read ahead by parseHeader and
	// kept here until Next consumes it.
	firstDataRow []string

	// done reports that the input is exhausted; once set, Next returns io.EOF.
	done bool
}

// NewProjectCSVLeaf wraps source in a CSV reader and builds the leaf's schema
// through parseHeader, which reads from source immediately.
//
// On failure it returns a nil leaf together with the error, so callers never
// receive a half-initialised value.
//
// For now everything is assumed to be on disk.
func NewProjectCSVLeaf(source io.Reader) (*ProjectCSVLeaf, error) {
	proj := &ProjectCSVLeaf{
		r:           csv.NewReader(source),
		colPosition: make(map[string]int),
	}

	// Construct the schema from the header.
	schema, err := proj.parseHeader()
	if err != nil {
		return nil, err
	}
	proj.schema = schema
	return proj, nil
}

// Next reads up to n rows and returns them as a RecordBatch whose columns
// follow pcsv.schema.
//
// The row buffered in pcsv.firstDataRow is emitted first, then rows are
// streamed from the CSV reader. A batch may hold fewer than n rows when the
// input ends part-way through; with n == 0 an empty batch is returned and no
// row is consumed. Once no rows are left, Next returns (nil, io.EOF) and keeps
// doing so on later calls.
//
// Any read or conversion error is returned as-is and the partially filled
// builders are released.
func (pcsv *ProjectCSVLeaf) Next(n uint64) (*operators.RecordBatch, error) {
	if pcsv.done {
		return nil, io.EOF
	}

	// 1. Create builders.
	builders := pcsv.initBuilders()

	// finalizeBuilders releases the builders on the success path; every early
	// return below has to release them itself.
	discard := func() {
		for _, b := range builders {
			b.Release()
		}
	}

	var rowsRead uint64

	// 2. Process the stored first row (from parseHeader).
	if pcsv.firstDataRow != nil && rowsRead < n {
		if err := pcsv.processRow(pcsv.firstDataRow, builders); err != nil {
			discard()
			return nil, err
		}
		pcsv.firstDataRow = nil // consume it once
		rowsRead++
	}

	// 3. Stream remaining rows from the CSV reader.
	for rowsRead < n {
		row, err := pcsv.r.Read()
		if err == io.EOF {
			// The input is exhausted either way; remember it so the next call
			// does not allocate builders just to hit EOF again.
			pcsv.done = true
			if rowsRead == 0 {
				discard()
				return nil, io.EOF
			}
			break
		}
		if err != nil {
			discard()
			return nil, err
		}

		if err := pcsv.processRow(row, builders); err != nil {
			discard()
			return nil, err
		}
		rowsRead++
	}

	// 4. Freeze into Arrow arrays.
	columns := pcsv.finalizeBuilders(builders)

	return &operators.RecordBatch{
		Schema:  pcsv.schema,
		Columns: columns,
	}, nil
}

// initBuilders returns one Arrow array builder per schema field, in schema
// order, each created for that field's data type.
//
// NOTE(review): memory.DefaultAllocator is imported from the arrow v15 module
// while the builders come from v17 — consider importing memory from v17 too.
func (pcsv *ProjectCSVLeaf) initBuilders() []array.Builder {
	schemaFields := pcsv.schema.Fields()

	out := make([]array.Builder, 0, len(schemaFields))
	for _, field := range schemaFields {
		out = append(out, array.NewBuilder(memory.DefaultAllocator, field.Type))
	}
	return out
}
// processRow appends one CSV record to the per-column builders.
//
// builders must be parallel to pcsv.schema.Fields(). For every schema field
// the cell is located through pcsv.colPosition and converted according to the
// builder's concrete type (Int64, Float64, String or Boolean).
//
// A cell that is blank or spells NULL is appended as null. Surrounding
// whitespace and letter case are ignored for that check, matching how
// parseDataType recognises nulls during inference. Numeric and boolean cells
// are parsed from the trimmed text; string cells are stored untrimmed.
//
// An error is returned when builders and schema differ in length, a field has
// no recorded column position, the record is too short, a cell cannot be
// parsed as the column's type, or the builder type is unsupported. On error
// the builders of earlier columns have already received a value for this row,
// so the caller should discard them.
func (pcsv *ProjectCSVLeaf) processRow(
	content []string,
	builders []array.Builder,
) error {
	fields := pcsv.schema.Fields()
	if len(builders) != len(fields) {
		return fmt.Errorf("got %d builders for %d schema fields", len(builders), len(fields))
	}

	for i, f := range fields {
		// A plain map read would yield 0 for an unknown name and silently
		// pull data from the first column.
		colIdx, ok := pcsv.colPosition[f.Name]
		if !ok {
			return fmt.Errorf("column %q has no recorded position in the CSV header", f.Name)
		}
		if colIdx < 0 || colIdx >= len(content) {
			return fmt.Errorf("column %q: record has %d fields, need index %d", f.Name, len(content), colIdx)
		}

		cell := content[colIdx]
		trimmed := strings.TrimSpace(cell)
		isNull := trimmed == "" || strings.EqualFold(trimmed, "NULL")

		switch b := builders[i].(type) {
		case *array.Int64Builder:
			if isNull {
				b.AppendNull()
				continue
			}
			v, err := strconv.ParseInt(trimmed, 10, 64)
			if err != nil {
				// Appending 0 here would silently corrupt the column.
				return fmt.Errorf("column %q: parsing %q as int64: %w", f.Name, cell, err)
			}
			b.Append(v)

		case *array.Float64Builder:
			if isNull {
				b.AppendNull()
				continue
			}
			v, err := strconv.ParseFloat(trimmed, 64)
			if err != nil {
				return fmt.Errorf("column %q: parsing %q as float64: %w", f.Name, cell, err)
			}
			b.Append(v)

		case *array.StringBuilder:
			if isNull {
				b.AppendNull()
				continue
			}
			b.Append(cell)

		case *array.BooleanBuilder:
			if isNull {
				b.AppendNull()
				continue
			}
			v, err := strconv.ParseBool(trimmed)
			if err != nil {
				// Previously anything other than "true" became false.
				return fmt.Errorf("column %q: parsing %q as bool: %w", f.Name, cell, err)
			}
			b.Append(v)

		default:
			return fmt.Errorf("unsupported Arrow type: %s", f.Type)
		}
	}

	return nil
}
// finalizeBuilders turns every builder into an Arrow array, preserving order,
// and releases each builder right after its array has been taken.
func (pcsv *ProjectCSVLeaf) finalizeBuilders(builders []array.Builder) []arrow.Array {
	out := make([]arrow.Array, 0, len(builders))
	for _, bldr := range builders {
		out = append(out, bldr.NewArray())
		bldr.Release()
	}
	return out
}

// parseHeader performs the first reads on the CSV reader: the header record
// and the first data record.
//
// Each header name becomes a nullable schema field whose Arrow type is
// inferred by parseDataType from the matching cell of the first data record,
// and the name's position is stored in colPosition. The data record itself is
// kept in firstDataRow so that it is not lost to the caller.
//
// When the input holds a header but no data rows, every column is inferred
// from an empty sample and the leaf is marked done, so the schema is still
// available. Any other read error, including EOF on the header itself, is
// returned unchanged.
func (pcsv *ProjectCSVLeaf) parseHeader() (*arrow.Schema, error) {
	header, err := pcsv.r.Read()
	if err != nil {
		return nil, err
	}

	firstDataRow, err := pcsv.r.Read()
	switch {
	case err == io.EOF:
		// Header-only input: there is nothing to sample and nothing to emit.
		firstDataRow = nil
		pcsv.done = true
	case err != nil:
		return nil, err
	}
	pcsv.firstDataRow = firstDataRow

	newFields := make([]arrow.Field, 0, len(header))
	for i, colName := range header {
		// An absent sample is treated like an empty cell.
		sampleValue := ""
		if i < len(firstDataRow) {
			sampleValue = firstDataRow[i]
		}
		newFields = append(newFields, arrow.Field{
			Name:     colName,
			Type:     parseDataType(sampleValue),
			Nullable: true,
		})
		pcsv.colPosition[colName] = i
	}
	return arrow.NewSchema(newFields, nil), nil
}
// parseDataType infers an Arrow data type from a single sample cell.
//
// The sample is trimmed of surrounding whitespace and then checked in order:
//   - empty or NULL (any letter case): String
//   - exactly "true" or "false": Boolean
//   - parses as a base-10 64-bit integer: Int64
//   - parses as a 64-bit float: Float64
//   - anything else: String
//
// The integer check uses a fixed 64-bit width so the result does not depend on
// the platform's int size and matches the Int64 column it selects.
func parseDataType(sample string) arrow.DataType {
	sample = strings.TrimSpace(sample)

	switch {
	case sample == "" || strings.EqualFold(sample, "NULL"):
		// Nulls or empty fields: treat as nullable string in inference.
		return arrow.BinaryTypes.String
	case sample == "true" || sample == "false":
		return arrow.FixedWidthTypes.Boolean
	}

	// Try int.
	if _, err := strconv.ParseInt(sample, 10, 64); err == nil {
		return arrow.PrimitiveTypes.Int64
	}

	// Try float.
	if _, err := strconv.ParseFloat(sample, 64); err == nil {
		return arrow.PrimitiveTypes.Float64
	}

	// Fallback to string.
	return arrow.BinaryTypes.String
}

/*
Integers (int8, int16, int32, int64) - whole numbers like 42, -100
Floating point (float32, float64) - decimal numbers like 3.14, -0.5
Booleans - true/false values (often represented as "true"/"false", "1"/"0", or "yes"/"no")
Strings (text) - any text like "hello", "John Doe"
Nulls
*/
Loading
Loading