Skip to content

Commit

Permalink
add less command
Browse files Browse the repository at this point in the history
  • Loading branch information
noppaz committed Jan 21, 2024
1 parent c38353c commit e991828
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 38 deletions.
23 changes: 23 additions & 0 deletions cmd/less.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package cmd

import (
"github.com/noppaz/collie/internal/commands"
"github.com/spf13/cobra"
)

// init wires the "less" subcommand into rootCmd. The subcommand takes exactly
// one positional argument (the file to read) and a --rows/-n flag.
func init() {
	// rowCount is bound to the --rows/-n flag; it is only read once the
	// command actually runs, after flag parsing has filled it in.
	var rowCount int

	less := &cobra.Command{
		Use:   "less",
		Short: "Scroll through N values from the file",
		Args:  cobra.ExactArgs(1),
		RunE: func(_ *cobra.Command, args []string) error {
			return commands.LessCommand(args[0], rowCount)
		},
	}
	less.Flags().IntVarP(&rowCount, "rows", "n", 1000, "Amount of rows to load to the buffer")

	rootCmd.AddCommand(less)
}
43 changes: 14 additions & 29 deletions internal/commands/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"fmt"
"strings"

"github.com/apache/arrow/go/parquet/file"
"github.com/noppaz/collie/internal/parse"
Expand Down Expand Up @@ -44,42 +45,26 @@ func RowGroupsCommand(filename string, perPage int) error {
}

func HeadCommand(filename string, amount int) error {
reader, err := file.OpenParquetFile(filename, true)
if err != nil {
return fmt.Errorf("error opening parquet file: %w", err)
}
defer reader.Close()

const rowGroupIndex = 0
rowGroup := reader.RowGroup(rowGroupIndex)

rowGroupStats, err := parse.GetRowGroupStats(rowGroupIndex, rowGroup)
headers, rows, err := parse.ReadRows(filename, amount)
if err != nil {
return err
}
for _, rowGroupColumn := range rowGroupStats.ChunkStats {
if rowGroupColumn.HasUnsupportedCompressions() {
return fmt.Errorf(
"Row group %v, column '%s' has unsupported column compression: %s",
rowGroupIndex,
rowGroupColumn.GetColumnName(),
rowGroupColumn.GetColumnCompression(),
)
}
}

rows, err := parse.ReadRowGroupValues(rowGroup, amount)
fmt.Printf("Rows: %v\n", amount)
fmt.Printf("Columns: %v\n", len(headers))
fmt.Println(visualize.LipglossTable(headers, rows))
return nil
}

func LessCommand(filename string, amount int) error {
headers, rows, err := parse.ReadRows(filename, amount)
if err != nil {
return err
}

headers := make([]string, rowGroup.NumColumns())
for i := 0; i < rowGroup.NumColumns(); i++ {
headers[i] = rowGroup.Column(i).Descriptor().Name()
}
tableString := visualize.LipglossTable(headers, rows).String()
header := strings.Join(strings.Split(tableString, "\n")[:3], "\n")
content := strings.Join(strings.Split(tableString, "\n")[3:], "\n")

fmt.Printf("Rows: %v\n", amount)
fmt.Printf("Columns: %v\n", len(headers))
fmt.Println(visualize.LipglossTable(headers, rows))
return nil
return visualize.ViewportCreator(header, content)
}
67 changes: 59 additions & 8 deletions internal/parse/rowgroup_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,80 @@ import (
"github.com/apache/arrow/go/parquet/file"
)

type ColumnValueResult struct {
// ReadRows opens the parquet file at filename and returns its column names
// together with at most amount rows, each row rendered as one string per
// column. Rows are collected row group by row group, in file order, until
// either amount rows have been gathered or the file is exhausted.
//
// An error is returned when amount is negative, when the file cannot be
// opened, when row group statistics cannot be obtained, when a row group that
// has to be read contains a column with an unsupported compression, or when
// reading the values fails. Row groups that are never reached (because enough
// rows were already collected) are not inspected.
func ReadRows(filename string, amount int) ([]string, [][]string, error) {
	// A negative amount would otherwise end up as a negative slice capacity
	// below and panic.
	if amount < 0 {
		return nil, nil, fmt.Errorf("amount of rows must not be negative, got %d", amount)
	}

	reader, err := file.OpenParquetFile(filename, true)
	if err != nil {
		return nil, nil, fmt.Errorf("error opening parquet file: %w", err)
	}
	defer reader.Close()

	// Headers come from the file schema, so they are available even when the
	// file holds no row groups at all.
	fileSchema := reader.MetaData().Schema
	headers := make([]string, fileSchema.NumColumns())
	for i := range headers {
		headers[i] = fileSchema.Column(i).Name()
	}

	amountWithCeiling := min(int64(amount), reader.NumRows())
	rows := make([][]string, 0, amountWithCeiling)
	for i := range reader.MetaData().RowGroups {
		remaining := amountWithCeiling - int64(len(rows))
		if remaining <= 0 {
			break
		}

		rowGroup := reader.RowGroup(i)

		// Validate every row group that is about to be read, not only the
		// first one: each of them is decoded below.
		rowGroupStats, err := GetRowGroupStats(i, rowGroup)
		if err != nil {
			return nil, nil, err
		}
		for _, rowGroupColumn := range rowGroupStats.ChunkStats {
			if rowGroupColumn.HasUnsupportedCompressions() {
				return nil, nil, fmt.Errorf(
					"Row group %v, column '%s' has unsupported column compression: %s",
					i,
					rowGroupColumn.GetColumnName(),
					rowGroupColumn.GetColumnCompression(),
				)
			}
		}

		readRows, err := readRowGroupValues(rowGroup, min(rowGroup.NumRows(), remaining))
		if err != nil {
			return nil, nil, err
		}
		rows = append(rows, readRows...)
	}

	return headers, rows, nil
}

// columnValueResult carries the outcome of reading a single column of a row
// group; one value is sent over the result channel per column.
type columnValueResult struct {
// index is the position of the column within the row group.
index int
// values holds the column's values rendered as strings.
values []string
// err is the error returned while reading the column, if any.
err error
}

func ReadRowGroupValues(rowGroup *file.RowGroupReader, amount int) ([][]string, error) {
resultChan := make(chan ColumnValueResult, rowGroup.NumColumns())
func readRowGroupValues(rowGroup *file.RowGroupReader, amount int64) ([][]string, error) {
resultChan := make(chan columnValueResult, rowGroup.NumColumns())
var wg sync.WaitGroup
for i := 0; i < rowGroup.NumColumns(); i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
column := rowGroup.Column(idx)
columnValues, err := readColumnValues(column, amount)
resultChan <- ColumnValueResult{idx, columnValues, err}
resultChan <- columnValueResult{idx, columnValues, err}
}(i)
}
wg.Wait()
close(resultChan)

valuesPerRow := make([][]string, amount)
for i := 0; i < amount; i++ {
for i := int64(0); i < amount; i++ {
valuesPerRow[i] = make([]string, rowGroup.NumColumns())
}
for res := range resultChan {
Expand All @@ -46,8 +97,8 @@ func ReadRowGroupValues(rowGroup *file.RowGroupReader, amount int) ([][]string,
return valuesPerRow, nil
}

func readColumnValues(columnReader file.ColumnChunkReader, amount int) ([]string, error) {
var batchSize int64 = int64(amount)
func readColumnValues(columnReader file.ColumnChunkReader, amount int64) ([]string, error) {
batchSize := amount
stringVals := make([]string, 0, batchSize)

for {
Expand Down Expand Up @@ -129,7 +180,7 @@ func readColumnValues(columnReader file.ColumnChunkReader, amount int) ([]string
return nil, fmt.Errorf("unsupported column type: %v", rdr.Type())
}

if !columnReader.HasNext() || len(stringVals) >= amount {
if !columnReader.HasNext() || int64(len(stringVals)) >= amount {
break
}
}
Expand Down
58 changes: 58 additions & 0 deletions internal/parse/rowgroup_values_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package parse

import (
"slices"
"testing"
)

// Test_ReadRows checks that ReadRows returns the expected headers and the
// expected number of rows for amounts that fit in one row group, span several
// row groups, and exceed the number of rows in the sample file.
func Test_ReadRows(t *testing.T) {
	type Input struct {
		filename string
		amount   int
	}

	type Expected struct {
		rowLength int
		headers   []string
	}

	// Every case reads the same fixture, so path and headers are shared.
	const sampleFile = "../../testdata/sample.parquet"
	sampleHeaders := []string{"col_int64", "col_double", "col_string", "col_timestamp"}

	testCases := []struct {
		name     string
		input    Input
		expected Expected
	}{
		{
			"amount less than one row group",
			Input{sampleFile, 33},
			Expected{33, sampleHeaders},
		},
		{
			"amount more than several row groups",
			Input{sampleFile, 646},
			Expected{646, sampleHeaders},
		},
		{
			"amount more than file has",
			Input{sampleFile, 1400},
			Expected{1000, sampleHeaders},
		},
	}

	for _, tt := range testCases {
		t.Run(tt.name, func(t *testing.T) {
			headers, rows, err := ReadRows(tt.input.filename, tt.input.amount)
			if err != nil {
				// Nothing below is meaningful without a successful read, so
				// stop this subtest instead of piling on follow-up failures.
				t.Fatalf("ReadRows returned err: %v", err)
			}
			if !slices.Equal(headers, tt.expected.headers) {
				t.Errorf("ReadRows error: header mismatch, got=%v, expected=%v", headers, tt.expected.headers)
			}
			if len(rows) != tt.expected.rowLength {
				t.Errorf(
					"ReadRows error: read row length mismatch, got=%v, expected=%v",
					len(rows),
					tt.expected.rowLength)
			}
		})
	}
}
2 changes: 1 addition & 1 deletion internal/visualize/paginator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func PaginatorCreator(items []string, perPage int) error {

paginate := tea.NewProgram(model)
if _, err := paginate.Run(); err != nil {
return err
return fmt.Errorf("running paginator app: %w", err)
}
return nil
}
Expand Down
108 changes: 108 additions & 0 deletions internal/visualize/viewport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package visualize

import (
"fmt"
"strings"

"github.com/charmbracelet/bubbles/viewport"
tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/lipgloss"
)

// useHighPerformanceRenderer toggles the viewport's high performance
// rendering mode and the matching sync command in Update.
const useHighPerformanceRenderer = true

var (
	// titleStyle is a padded, rounded-border style whose right border edge is
	// replaced by "├".
	titleStyle = func() lipgloss.Style {
		border := lipgloss.RoundedBorder()
		border.Right = "├"
		style := lipgloss.NewStyle().BorderStyle(border).Padding(0, 1)
		return style
	}()

	// infoStyle is a copy of titleStyle with a rounded border whose left edge
	// is replaced by "┤"; it is used for the scroll indicator in the footer.
	infoStyle = func() lipgloss.Style {
		border := lipgloss.RoundedBorder()
		border.Left = "┤"
		base := titleStyle.Copy()
		return base.BorderStyle(border)
	}()
)

func ViewportCreator(header, content string) error {
p := tea.NewProgram(
ViewportModel{header: header, content: content},
tea.WithAltScreen(),
tea.WithMouseCellMotion(),
)

if _, err := p.Run(); err != nil {
return fmt.Errorf("running viewport app: %w", err)
}
return nil
}

// ViewportModel is the Bubble Tea model behind ViewportCreator: a fixed header
// rendered above a scrollable viewport and a footer.
type ViewportModel struct {
// header is rendered as-is above the viewport.
header string
// content is the text loaded into the viewport when it is first created.
content string
// ready becomes true once the first window size message has been handled
// and the viewport exists; until then View shows a placeholder.
ready bool
// viewport is the scrollable area holding content.
viewport viewport.Model
}

// Init returns nil: the model issues no command at start-up. The viewport is
// set up later, when Update receives the first window size message.
func (m ViewportModel) Init() tea.Cmd {
return nil
}

// Update handles a Bubble Tea message and returns the updated model together
// with the commands to run next.
//
// Pressing ctrl+c, q or esc quits. A window size message creates the viewport
// on first receipt and resizes it afterwards, leaving room for the header and
// footer. Every message that does not quit is also forwarded to the viewport.
func (m ViewportModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
	var pending []tea.Cmd

	switch typed := msg.(type) {
	case tea.KeyMsg:
		switch typed.String() {
		case "ctrl+c", "q", "esc":
			return m, tea.Quit
		}

	case tea.WindowSizeMsg:
		top := lipgloss.Height(m.headerView())
		bottom := lipgloss.Height(m.footerView())
		margin := top + bottom

		if m.ready {
			// Viewport already exists: only adapt it to the new window size.
			m.viewport.Width = typed.Width
			m.viewport.Height = typed.Height - margin
		} else {
			// First size message: the dimensions are known now, so the
			// viewport can be built and filled.
			m.viewport = viewport.New(typed.Width, typed.Height-margin)
			m.viewport.YPosition = top
			m.viewport.HighPerformanceRendering = useHighPerformanceRenderer
			m.viewport.SetContent(m.content)
			m.ready = true

			// Final position: one line below the header (replaces the value
			// assigned above).
			m.viewport.YPosition = top + 1
		}

		if useHighPerformanceRenderer {
			pending = append(pending, viewport.Sync(m.viewport))
		}
	}

	// Let the viewport react to the message as well (scrolling and the like).
	var forwarded tea.Cmd
	m.viewport, forwarded = m.viewport.Update(msg)
	pending = append(pending, forwarded)

	return m, tea.Batch(pending...)
}

// View renders the header, the viewport and the footer, one below the other.
// Before the viewport has been created it returns a placeholder instead.
func (m ViewportModel) View() string {
	if m.ready {
		sections := []string{m.headerView(), m.viewport.View(), m.footerView()}
		return strings.Join(sections, "\n")
	}
	return "\n Initializing..."
}

// headerView returns the header text exactly as it was given to the model.
func (m ViewportModel) headerView() string {
return m.header
}

// footerView renders the footer: a horizontal rule followed by a box showing
// the viewport's scroll position as a percentage. The rule fills whatever
// width the box leaves over, and is empty if the box is wider than the
// viewport.
func (m ViewportModel) footerView() string {
	percent := fmt.Sprintf("%3.f%%", m.viewport.ScrollPercent()*100)
	badge := infoStyle.Render(percent)
	fill := max(0, m.viewport.Width-lipgloss.Width(badge))
	rule := strings.Repeat("─", fill)
	return lipgloss.JoinHorizontal(lipgloss.Center, rule, badge)
}

0 comments on commit e991828

Please sign in to comment.