Skip to content

Commit 29308e3

Browse files
authored
Merge pull request #83 from JuliaIO/tan/cursors
Added a batched columns iterator
2 parents 944d7ae + 1b1c5cf commit 29308e3

File tree

6 files changed

+138
-3
lines changed

6 files changed

+138
-3
lines changed

README.md

+26
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,32 @@ Schema:
6969
}
7070
```
7171

72+
Create a cursor to iterate over batches of column values. Each iteration returns a named tuple that maps each column name to a batch of values for that column. One batch corresponds to one row group of the Parquet file.
73+
74+
```julia
75+
julia> cc = Parquet.BatchedColumnsCursor(par)
76+
Batched Columns Cursor on customer.impala.parquet
77+
rows: 1:150000
78+
batches: 1
79+
cols: c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment
80+
81+
julia> batchvals, state = iterate(cc);
82+
83+
julia> propertynames(batchvals)
84+
(:c_custkey, :c_name, :c_address, :c_nationkey, :c_phone, :c_acctbal, :c_mktsegment, :c_comment)
85+
86+
julia> length(batchvals.c_name)
87+
150000
88+
89+
julia> batchvals.c_name[1:5]
90+
5-element Array{Union{Missing, String},1}:
91+
"Customer#000000001"
92+
"Customer#000000002"
93+
"Customer#000000003"
94+
"Customer#000000004"
95+
"Customer#000000005"
96+
```
97+
7298
Create a cursor to iterate over records. In parallel mode, multiple remote cursors can be created and iterated over in parallel.
7399

74100
```julia

src/Parquet.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import Thrift: isfilled
1313
export is_par_file, ParFile, show, nrows, ncols, rowgroups, columns, pages, bytes, values, colname, colnames
1414
export schema
1515
export logical_timestamp, logical_string
16-
export RecordCursor
16+
export RecordCursor, BatchedColumnsCursor
1717

1818
# package code goes here
1919
include("PAR2/PAR2.jl")

src/cursor.jl

+61
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,67 @@ function Base.iterate(cursor::ColCursor)
273273
return r
274274
end
275275

276+
##
277+
278+
"""
    BatchedColumnsCursor{T}

Cursor that walks a `ParFile` one batch of column values at a time. `T` is the
type of the value produced for each batch (it is what `eltype` reports).
"""
mutable struct BatchedColumnsCursor{T}
    par::ParFile                            # file being read
    colnames::Vector{Vector{String}}        # one name path per column
    colcursors::Vector{ColCursor}           # one column cursor per entry of `colnames`
    colstates::Vector{Tuple{Int64,Int64}}   # refreshed from `_start` when iteration begins
    rowgroupid::Int                         # row group the next batch is taken from
    row::Int64                              # row the column cursors are moved to next
end
286+
287+
"""
    BatchedColumnsCursor(par::ParFile)

Build a batched columns cursor over every column of `par`, with its row and row
group counters both starting at 1.

Raises an error when any schema element after the first has children, because
nested columns are not handled by this cursor yet.
"""
function BatchedColumnsCursor(par::ParFile)
    sch = schema(par)

    # only flat schemas are handled for now: nothing below the root may have children
    for schemaelem in sch.schema[2:end]
        if num_children(schemaelem) != 0
            error("nested schemas are not supported with BatchedColumnsCursor yet")
        end
    end

    colcursors = [ColCursor(par, name) for name in colnames(par)]
    rectype = ntcolstype(sch, sch.schema[1])
    colstates = Array{Tuple{Int64,Int64}}(undef, length(colcursors))
    return BatchedColumnsCursor{rectype}(par, colnames(par), colcursors, colstates, 1, 1)
end
299+
300+
# each iteration yields a value of the type carried in the cursor's type parameter
function eltype(::BatchedColumnsCursor{T}) where {T}
    return T
end
301+
# number of batches: one per row group of the underlying file
function length(cursor::BatchedColumnsCursor)
    return length(rowgroups(cursor.par))
end
302+
303+
"""
    colcursor_values(colcursor::ColCursor)

Return the values currently held by `colcursor`, run through its logical
converter. When some definition level differs from `Int32(1)`, the entries at
those positions are returned as `missing`.
"""
function colcursor_values(colcursor::ColCursor)
    levels = colcursor.defn_levels
    rawvals = colcursor.vals
    convert_fn = colcursor.logical_converter_fn

    # no definition levels, or every level equal to 1: nothing is missing
    if isempty(levels) || all(x -> x === Int32(1), levels)
        # skip the mapping pass entirely when there is nothing to convert
        return (convert_fn === identity) ? rawvals : map(convert_fn, rawvals)
    end

    # NOTE(review): `levels` is indexed with the index range of `rawvals`; this assumes
    # the two arrays are aligned position by position -- confirm against ColCursor
    return [(levels[idx] === Int32(1)) ? convert_fn(rawvals[idx]) : missing for idx in 1:length(rawvals)]
end
315+
316+
"""
    Base.iterate(cursor::BatchedColumnsCursor{T}, rowgroupid)

Produce the next batch as a `T` built from the values of every column cursor,
together with the next row group id. Returns `nothing` once `rowgroupid`
exceeds the number of batches.
"""
function Base.iterate(cursor::BatchedColumnsCursor{T}, rowgroupid) where {T}
    if rowgroupid > length(cursor)
        return nothing
    end

    # move every column cursor to the current row first, then read each one's values
    foreach(cc -> setrow(cc, cursor.row), cursor.colcursors)
    batch = [colcursor_values(cc) for cc in cursor.colcursors]

    # step the cursor's own counters past the row group that was just read
    rowgroup = rowgroups(cursor.par)[cursor.rowgroupid]
    cursor.row += rowgroup.num_rows
    cursor.rowgroupid += 1
    return T(batch), cursor.rowgroupid
end
329+
330+
"""
    Base.iterate(cursor::BatchedColumnsCursor{T})

Start (or restart) iteration from the first row group and return the first
batch with the follow-up state, or `nothing` when there are no batches.
"""
function Base.iterate(cursor::BatchedColumnsCursor{T}) where {T}
    # Rewind both counters. Resetting only `row` would leave `rowgroupid` at the
    # value reached by a previous pass, so iterating the same cursor a second
    # time would stop immediately without yielding any batch.
    cursor.row = 1
    cursor.rowgroupid = 1
    cursor.colstates = [_start(colcursor) for colcursor in cursor.colcursors]
    return iterate(cursor, cursor.rowgroupid)
end
335+
336+
276337
##
277338

278339
mutable struct RecordCursor{T}

src/schema.jl

+14
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,20 @@ function elemtype(schelem::SchemaElement)
129129
jtype
130130
end
131131

132+
# Resolve `schname` to its schema element and build the column-batch type from it.
# Deliberately not memoised in `sch.nttype_lookup`: `ntelemtype` stores its types in
# that same dictionary under the same name keys, so sharing the cache would let
# whichever of the two functions ran first decide the result returned by both.
ntcolstype(sch::Schema, schname::T) where {T <: AbstractVector{String}} =
    ntcolstype(sch, sch.name_lookup[schname])
135+
# Build the NamedTuple type for a batch of columns under `schelem`: one field per
# child element, each typed as a `Vector` of the child's element type, with
# `Missing` allowed in the element type when the child is optional.
function ntcolstype(sch::Schema, schelem::SchemaElement)
    @assert num_children(schelem) > 0
    idx = findfirst(x->x===schelem, sch.schema)
    # the children are taken as the `num_children` entries right after `schelem`
    children = sch.schema[(idx+1):(idx+schelem.num_children)]

    names = map(child -> Symbol(child.name), children)
    coltypes = map(children) do child
        path = path_in_schema(sch, child)
        basetype = (num_children(child) > 0) ? ntelemtype(sch, path) : elemtype(sch, path)
        Vector{isoptional(child) ? Union{basetype,Missing} : basetype}
    end
    return NamedTuple{(names...,),Tuple{coltypes...}}
end
145+
132146
# Look up the type for `schname` in `sch.nttype_lookup`, computing it from the
# matching schema element and storing it on first use.
function ntelemtype(sch::Schema, schname::T) where {T <: AbstractVector{String}}
    return get!(() -> ntelemtype(sch, sch.name_lookup[schname]), sch.nttype_lookup, schname)
end

src/show.jl

+11
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,17 @@ function show(io::IO, cursor::RecordCursor)
1414
println(io, " cols: $(join(colpaths, ", "))")
1515
end
1616

17+
# Print a four-line summary of the cursor: the file path, the row range taken from
# the first column cursor, the number of batches, and the dotted column paths.
function show(io::IO, cursor::BatchedColumnsCursor)
    par = cursor.par
    rows = cursor.colcursors[1].row.rows
    colpaths = map(colname -> join(colname, '.'), cursor.colnames)

    println(io, "Batched Columns Cursor on $(par.path)")
    println(io, " rows: $rows")
    println(io, " batches: $(length(cursor))")
    println(io, " cols: $(join(colpaths, ", "))")
end
27+
1728
function show(io::IO, schema::SchemaElement, indent::AbstractString="", nchildren::Vector{Int}=Int[])
1829
print(io, indent)
1930
lchildren = length(nchildren)

test/test_cursors.jl

+25-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ function test_row_cursor(file::String)
3737
@info("loaded", file, count=nr, last_record=rec, time_to_read=time()-t1)
3838
end
3939

40+
# Open `file`, walk every batch of a BatchedColumnsCursor over it, and log the row
# count, the number of columns in the last batch, and the elapsed time.
function test_batchedcols_cursor(file::String)
    p = ParFile(file)

    started = time()
    total_rows = nrows(p)
    cnames = colnames(p)
    cc = BatchedColumnsCursor(p)

    # drive the iteration protocol by hand, keeping only the most recent batch
    batch = nothing
    step = iterate(cc)
    while step !== nothing
        batch, state = step
        step = iterate(cc, state)
    end

    @info("loaded", file, count=total_rows, ncols=length(propertynames(batch)), time_to_read=time()-started)
end
53+
4054
function test_col_cursor_all_files()
4155
for encformat in ("SNAPPY", "GZIP", "NONE")
4256
for fname in ("nation", "customer")
@@ -45,13 +59,22 @@ function test_col_cursor_all_files()
4559
end
4660
end
4761

48-
function test_juliabuilder_row_cursor_all_files()
62+
# Run the row cursor test on both impala test files for every compression variant.
function test_row_cursor_all_files()
    for encformat in ("SNAPPY", "GZIP", "NONE"), fname in ("nation", "customer")
        test_row_cursor(joinpath(@__DIR__, "parquet-compatibility", "parquet-testdata", "impala", "1.1.1-$encformat/$fname.impala.parquet"))
    end
end
5569

70+
# Run the batched columns cursor test on both impala test files for every
# compression variant.
function test_batchedcols_cursor_all_files()
    for encformat in ("SNAPPY", "GZIP", "NONE"), fname in ("nation", "customer")
        test_batchedcols_cursor(joinpath(@__DIR__, "parquet-compatibility", "parquet-testdata", "impala", "1.1.1-$encformat/$fname.impala.parquet"))
    end
end
77+
5678
#test_col_cursor_all_files()
57-
test_juliabuilder_row_cursor_all_files()
79+
test_row_cursor_all_files()
80+
test_batchedcols_cursor_all_files()

0 commit comments

Comments
 (0)