diff --git a/Project.toml b/Project.toml index 9ebf6bc..5536eae 100644 --- a/Project.toml +++ b/Project.toml @@ -14,6 +14,8 @@ Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" LittleEndianBase128 = "1724a1d5-ab78-548d-94b3-135c294f96cf" MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94" Missings = "e1d29d7a-bbdc-5cf2-9ac0-f12de2c33e28" +NamedTupleTools = "d9ec5142-1e00-5aa0-9d6a-321866360f50" +ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca" Snappy = "59d4ed8c-697a-5b28-a4c7-fe95c22820f9" Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c" Thrift = "8d9c9c80-f77e-5080-9541-c6f69d204e22" diff --git a/README.md b/README.md index 9b5b9e1..82a210c 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,16 @@ ## Reader +### High level reader + +You can read a parquet file using `read_parquet` for example + +``` +df = read_parquet(parquet_file_path); +``` + +### Lower level reader + Load a [parquet file](https://en.wikipedia.org/wiki/Apache_Parquet). Only metadata is read initially, data is loaded in chunks on demand. (Note: [ParquetFiles.jl](https://github.com/queryverse/ParquetFiles.jl) also provides load support for Parquet files under the FileIO.jl package.) `ParFile` represents a Parquet file at `path` open for reading. Options to map logical types can be provided via `map_logical_types`. diff --git a/src/Parquet.jl b/src/Parquet.jl index ab38625..2ddc01a 100644 --- a/src/Parquet.jl +++ b/src/Parquet.jl @@ -21,6 +21,7 @@ export schema export logical_timestamp, logical_string export RecordCursor, BatchedColumnsCursor export write_parquet +export read_parquet # package code goes here include("PAR2/PAR2.jl") @@ -31,5 +32,9 @@ include("reader.jl") include("cursor.jl") include("show.jl") include("writer.jl") +include("encoding.jl") +include("metadata.jl") +include("column_reader.jl") +include("read_parquet.jl") end # module diff --git a/src/column_reader.jl b/src/column_reader.jl new file mode 100644 index 0000000..e0a2eec --- /dev/null +++ b/src/column_reader.jl @@ -0,0 +1,509 @@ +import Base: iterate, length, IteratorSize, IteratorEltype, eltype, @_gc_preserve_begin, @_gc_preserve_end + +const TYPES = (Bool, Int32, Int64, Int128, Float32, Float64, String, UInt8) + +struct BitPackedIterator + data::Vector{UInt8} + bitwidth::Int32 +end + + +iterate(bp::BitPackedIterator) = iterate(bp::BitPackedIterator, 1) + +length(bp::BitPackedIterator) = div(8*length(bp.data), bp.bitwidth) + +IteratorSize(::Type{BitPackedIterator}) = Base.HasLength() +IteratorEltype(::Type{BitPackedIterator}) = Base.HasEltype() +eltype(::Type{BitPackedIterator}) = UInt + +function iterate(bp::BitPackedIterator, state) + end_bit = state * bp.bitwidth + end_byte = ceil(Int, end_bit / 8) + + if end_byte > length(bp.data) + return nothing + end + + start_bit = (state - 1) * bp.bitwidth + 1 + + start_byte, bits_to_drop = divrem(start_bit-1, 8) + + start_byte += 1 + bits_to_drop = bits_to_drop + + # start bit shift the value + value = UInt(0) + + @inbounds for byte in @view bp.data[end_byte:-1:start_byte] + value = (value << 8) | byte + end + + value >>= bits_to_drop + + (value & UInt(2^bp.bitwidth-1), state + 1) +end + +function decompress_with_codec!(uncompressed_data::Vector{UInt8}, compressed_data::Vector{UInt8}, codec) + if codec == PAR2.CompressionCodec.SNAPPY + Snappy.snappy_uncompress(compressed_data, uncompressed_data) + else + error("codedc $codec unsupported atm") + end +end + +read_column(path, col_num) = begin + filemetadata = Parquet.metadata(path) + read_column(path, filemetadata, col_num) +end + +function read_column(path, 
filemetadata, col_num) + T = TYPES[filemetadata.schema[col_num+1]._type+1] + + par = ParFile(path) + # TODO detect if missing is necessary + if T == String + # the memory structure of String is different to other supported types + # so it's better to initialise it with missing + res = Vector{Union{Missing, String}}(missing, nrows(par)) + else + res = Vector{Union{Missing, T}}(undef, nrows(par)) + end + close(par) + + fileio = open(path) + + # I thnk there is a bug with Julia's multithreaded reads + # which can be fixed by doing the below + # DO NOT remove the code below or multithreading will fail + println("$(position(fileio))") + # if true + # not_used = open(tempname()*string(col_num), "w") + # write(not_used, position(fileio)) + # close(not_used) + # end + + # to reduce allocations we make a compressed_data array to store compressed data + compressed_data_buffer = Vector{UInt8}(undef, 100) + compressed_data = UInt8[] # initialise it + + from = 1 + last_from = from + + j = 1 + for row_group in filemetadata.row_groups + colchunk_meta = row_group.columns[col_num].meta_data + + if isfilled(colchunk_meta, :dictionary_page_offset) + seek(fileio, colchunk_meta.dictionary_page_offset) + dict_page_header = read_thrift(fileio, PAR2.PageHeader) + + # use the + readbytes!(fileio, compressed_data_buffer, dict_page_header.compressed_page_size) + GC.@preserve compressed_data_buffer begin + compressed_data = unsafe_wrap(Vector{UInt8}, pointer(compressed_data_buffer), dict_page_header.compressed_page_size) + end + # compressed_data = read(fileio, dict_page_header.compressed_page_size) + + uncompressed_data = Vector{UInt8}(undef, dict_page_header.uncompressed_page_size) + + decompress_with_codec!(uncompressed_data, compressed_data, colchunk_meta.codec) + @assert length(uncompressed_data) == dict_page_header.uncompressed_page_size + + if dict_page_header.dictionary_page_header.encoding == PAR2.Encoding.PLAIN_DICTIONARY + # see https://github.com/apache/parquet-format/blob/master/Encodings.md#dictionary-encoding-plain_dictionary--2-and-rle_dictionary--8 + # which is in effect the plain encoding see https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0 + if T == String + dict = Vector{String}(undef, dict_page_header.dictionary_page_header.num_values) + uncompressed_data_io = IOBuffer(uncompressed_data) + j = 1 + while !eof(uncompressed_data_io) + str_len = read(uncompressed_data_io, UInt32) + dict[j] = String(read(uncompressed_data_io, str_len)) + j += 1 + end + else + dict = reinterpret(T, uncompressed_data) + # nvals = dict_page_header.dictionary_page_header.num_values + # GC.@preserve uncompressed_data begin + # dict = unsafe_wrap(Vector{T}, Ptr{T}(pointer(uncompressed_data)), nvals) + # end + end + else + error("Only Plain Dictionary encoding is supported") + end + else + dict = nothing + end + + # seek to the first data page + seek(fileio, colchunk_meta.data_page_offset) + + # the buffer is resizable and is used to reduce the amount of allocations + uncompressed_data_buffer = Vector{UInt8}(undef, 1048584) + + # repeated read data page + while (from - last_from < row_group.num_rows) & (from <= length(res)) + from = read_data_page_vals!(res, uncompressed_data_buffer, fileio, dict, colchunk_meta.codec, T, from) + + if from isa Tuple + return from + else + from += 1 + end + end + last_from = from + + # (j == 2) && return res + j += 1 + + end + + res +end + +function read_data_page_vals!(res, uncompressed_data_buffer::Vector{UInt8}, fileio::IOStream, dict, codec, T, from::Integer 
= 1) + """ + Read one data page + """ + + # the result length is used latter on to prevent writing too much data + res_len = length(res) + + data_page_header = read_thrift(fileio, PAR2.PageHeader) + + # the number of values stored in this page + num_values = data_page_header.data_page_header.num_values + # read values + to = from + num_values - 1 + @assert to <= res_len + + #compressed_data = read(fileio, data_page_header.compressed_page_size) + compressed_data_buffer = Vector{UInt8}(undef, ceil(Int, data_page_header.compressed_page_size)) + + readbytes!(fileio, compressed_data_buffer, data_page_header.compressed_page_size) + + # resize the buffer if it's too small + if data_page_header.uncompressed_page_size > length(uncompressed_data_buffer) + uncompressed_data_buffer = Vector{UInt8}(undef, ceil(Int, data_page_header.uncompressed_page_size*1.1)) + end + + t1 = @_gc_preserve_begin uncompressed_data_buffer + + GC.@preserve compressed_data_buffer uncompressed_data_buffer begin + compressed_data = unsafe_wrap(Vector{UInt8}, pointer(compressed_data_buffer), data_page_header.compressed_page_size) + uncompressed_data = unsafe_wrap(Vector{UInt8}, pointer(uncompressed_data_buffer), data_page_header.uncompressed_page_size) + # uncompressed_data = Vector{UInt8}(undef, data_page_header.uncompressed_page_size) + # decompression seems to be quite slow and uses lots of RAM! + decompress_with_codec!(uncompressed_data, compressed_data, codec) + end + + @assert length(uncompressed_data) == data_page_header.uncompressed_page_size + + uncompressed_data_io = IOBuffer(uncompressed_data, read=true, write=false, append=false) + + # this is made up of these 3 things written back to back + # * repetition levels - can be ignored for unnested data + # * definition levels - + # * values + + # this will be set in future + has_missing = false + + # initialise it to something + missing_bytes = Vector{UInt8}(undef, num_values) + missing_bytes_io = IOBuffer(missing_bytes, write=true) + + # definition levels + if data_page_header.data_page_header.definition_level_encoding == PAR2.Encoding.RLE + # for unnested columns the highest possible value for definiton is 1 + # which can represented with just one bit so the bit width is always 1 + bitwidth = 1 + encoded_data_len = read(uncompressed_data_io, UInt32) + pos_before_encoded_data = position(uncompressed_data_io) + + from_defn = from + + pos_after_reading_encoded_data = pos_before_encoded_data + + while (pos_after_reading_encoded_data - pos_before_encoded_data) < encoded_data_len + encoded_data_header = Parquet._read_varint(uncompressed_data_io, UInt32) + + if iseven(encoded_data_header) + # RLE encoded + rle_len = Int(encoded_data_header >> 1) + rle_val = read(uncompressed_data_io, UInt8) + + pos_after_reading_encoded_data = position(uncompressed_data_io) + + if T == String + # strings memoery are stored differently so can't benefit from this + else + # fill the memory location with all missing + GC.@preserve res begin + # TODO there is a better way to locate the missing bytes + # find the location of missing + dest_ptr = Ptr{UInt8}(pointer(res, res_len+1)) + from_defn - 1 + tmparray = unsafe_wrap(Vector{UInt8}, dest_ptr, rle_len) + fill!(tmparray, rle_val) + end + end + + write(missing_bytes_io, fill(rle_val, rle_len)) + + from_defn += rle_len + @assert from_defn - from == position(missing_bytes_io) + @assert position(missing_bytes_io) <= num_values + else + # the only reaosn to use bitpacking is because there are missings + has_missing = true + + # bitpacked encoded + 
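# a bit-packed run: the header is odd, (header >> 1) is the number of groups of 8 values, so the run occupies bitwidth * bit_pack_len bytes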
bit_pack_len = Int(encoded_data_header >> 1) + + bytes_to_read = bitwidth*bit_pack_len + data = read(uncompressed_data_io, bytes_to_read) + + pos_after_reading_encoded_data = position(uncompressed_data_io) + + # the structure of Vector{Union{T, Missing}} is + # * the `values::T` first + # * the missing are stored with UInt8(0) for missing + # * and UInt8(1) otherwise + # see https://docs.julialang.org/en/v1/devdocs/isbitsunionarrays/ + + # TODO I suspect this is not the fastest way to unpack bitwidth = 1 + # data + @assert bitwidth == 1 + bp = BitPackedIterator(data, bitwidth) + + tmp_missing_bytes::Vector{UInt8} = BitPackedIterator(data, bitwidth) |> collect + + len_of_tmp_missing_bytes = length(tmp_missing_bytes) + @assert mod(len_of_tmp_missing_bytes, 8) == 0 + + # the tmp_missing_bytes is always in a multiple of 8 so need to + # be careful not to write too much + # compute the new from_defn + new_from_defn = min(from_defn + len_of_tmp_missing_bytes, from + num_values) + + len_to_write = new_from_defn - from_defn + + if len_to_write == len_of_tmp_missing_bytes + write(missing_bytes_io, tmp_missing_bytes) + elseif len_to_write < len_of_tmp_missing_bytes + tmp_missing_bytes_smaller = unsafe_wrap(Vector{UInt8}, pointer(tmp_missing_bytes), len_to_write) + write(missing_bytes_io, tmp_missing_bytes_smaller) + else + error("something is wrong") + end + + if T == String + # do nothing + else + if len_to_write == len_of_tmp_missing_bytes + GC.@preserve tmp_missing_bytes res begin + # if not too long then can straight copy + src_ptr = Ptr{UInt8}(pointer(tmp_missing_bytes)) + dest_ptr = Ptr{UInt8}(pointer(res, res_len+1)) + from_defn - 1 + # copy content over + unsafe_copyto!(dest_ptr, src_ptr, length(tmp_missing_bytes)) + end + elseif len_to_write < len_of_tmp_missing_bytes + GC.@preserve tmp_missing_bytes_smaller res begin + src_ptr = Ptr{UInt8}(pointer(tmp_missing_bytes_smaller)) + dest_ptr = Ptr{UInt8}(pointer(res, res_len+1)) + from_defn - 1 + # copy content over + unsafe_copyto!(dest_ptr, src_ptr, len_to_write) + end + else + error("something is wrong") + end + + end + from_defn = new_from_defn + end + end + else + error("no definition encoding not supported") + end + + # this line ensures that we have read all the encoded definition data + @assert pos_after_reading_encoded_data - pos_before_encoded_data == encoded_data_len + + if has_missing + @assert position(missing_bytes_io) == num_values + end + + + + if data_page_header.data_page_header.encoding == PAR2.Encoding.PLAIN + # just return the data as is + if T == Bool + if has_missing + upto = 1 + raw_data = Vector{Bool}(undef, 8) + for (i, missing_byte) in zip(from:to, missing_bytes) + if missing_byte == 1 + if upto == 1 + digits!(raw_data, read(uncompressed_data_io, UInt8), base=2) + end + res[i] = raw_data[upto] + upto += 1 + if upto == 9 + upto = 1 + end + end + end + else + # for boolean every bit is a value so the length is 8 times + i = from + while !eof(uncompressed_data_io) + udi = read(uncompressed_data_io, UInt8) + GC.@preserve res begin + raw_data = Base.unsafe_wrap(Vector{Bool}, pointer(res, i) |> Ptr{Bool}, (8,)) + end + digits!(raw_data, udi, base=2) + + if i + 8 - 1 <= res_len + digits!(raw_data, udi, base=2) + i += 8 + else + for rd in digits(Bool, udi, base=2, pad = 8) + if i <= res_len + res[i] = rd + end + i += 1 + end + end + end + end + elseif T == String + if has_missing + for (i, missing_byte) in zip(from:to, missing_bytes) + if missing_byte == 1 + # 1 means not missing + str_len = read(uncompressed_data_io, UInt32) 
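+                            # PLAIN BYTE_ARRAY values are length-prefixed: a 4-byte little-endian length, then that many bytes of string data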
+                            res[i] = String(read(uncompressed_data_io, str_len))
+                        end
+                    end
+                else
+                    i = from
+                    while !eof(uncompressed_data_io)
+                        str_len = read(uncompressed_data_io, UInt32)
+                        res[i] = String(read(uncompressed_data_io, str_len))
+                        i = i + 1
+                    end
+                end
+
+            else
+                if has_missing
+                    # raw_data = reinterpret(T, read(uncompressed_data_io))
+                    arr_pos = position(uncompressed_data_io) + 1
+                    # seek till the end
+                    seek(uncompressed_data_io, uncompressed_data_io.size + 1)
+                    # TODO remove this allocation too
+                    ok = uncompressed_data[arr_pos:end]
+                    raw_data = reinterpret(T, ok)
+                    j = 1
+                    for (i, missing_byte) in zip(from:to, missing_bytes)
+                        if missing_byte == 1
+                            # 1 means not missing
+                            res[i] = raw_data[j]
+                            j += 1
+                        end
+                    end
+                else
+                    # if there are no missings, we can copy the data straight into the
+                    # right memory location
+                    # the copying approach is a lot faster than the commented-out
+                    # assignment approach
+                    pos_for_pointer = position(uncompressed_data_io) + 1
+                    GC.@preserve uncompressed_data res begin
+                        src_ptr = Ptr{T}(pointer(uncompressed_data, pos_for_pointer))
+                        dest_ptr = Ptr{T}(pointer(res, from))
+                        # copy content over
+                        unsafe_copyto!(dest_ptr, src_ptr, num_values)
+                    end
+                end
+            end
+        elseif data_page_header.data_page_header.encoding == PAR2.Encoding.PLAIN_DICTIONARY
+            # the data is stored as integer indices into the dictionary
+            bitwidth = Int(read(uncompressed_data_io, UInt8))
+
+            # the documented max bitwidth is 32
+            @assert bitwidth <= 32
+
+            rle_cnt = 0
+            bp_cnt = 0
+            rle_size = 0
+            bp_size = 0
+            while !eof(uncompressed_data_io)
+                encoded_data_header = Parquet._read_varint(uncompressed_data_io, UInt32)
+
+                if iseven(encoded_data_header)
+                    rle_cnt += 1
+                    # RLE encoded
+                    rle_len = Int(encoded_data_header >> 1)
+                    rle_val_vec::Vector{UInt8} = read(uncompressed_data_io, ceil(Int, bitwidth/8))
+                    rle_val = UInt(0)
+
+                    for tmp in @view rle_val_vec[end:-1:1]
+                        rle_val = rle_val << 8
+                        rle_val = rle_val | tmp
+                    end
+
+                    if has_missing
+                        index = from:min(to, from + rle_len - 1)
+                        for (i, missing_byte) in zip(index, missing_bytes)
+                            if missing_byte == 1
+                                res[i] = dict[rle_val+1]
+                            end
+                        end
+                    else
+                        res[from:min(to, from + rle_len - 1)] .= dict[rle_val+1]
+                    end
+
+                    rle_size += rle_len
+                    from = from + rle_len
+                else
+                    bp_cnt += 1
+                    # bitpacked encoded
+                    bit_pack_len = Int(encoded_data_header >> 1)
+                    @assert (bit_pack_len >= 1) && (bit_pack_len <= 2^31 - 1)
+                    bytes_to_read = bitwidth*bit_pack_len
+                    data = read(uncompressed_data_io, bytes_to_read)
+                    # TODO remove the collect here
+                    bp = BitPackedIterator(data, bitwidth) |> collect
+                    # decode the packed values by reading `bitwidth` bits at a time
+                    l = length(bp)
+
+                    index = from:min(from + l - 1, to)
+
+                    if has_missing
+                        j = 1
+                        for (i, missing_byte) in zip(index, missing_bytes)
+                            if missing_byte == 1
+                                res[i] = dict[bp[j]+1]
+                                j += 1
+                            end
+                        end
+                    else
+                        for (i, v) in zip(index, bp)
+                            res[i] = dict[v+1]
+                        end
+                    end
+
+                    bp_size += l
+                    from = from + l
+                end
+            end
+            # println("rle_cnt $rle_cnt bp_cnt $bp_cnt rle_size $rle_size bp_size $bp_size")
+        else
+            error("encoding not supported")
+        end
+
+        @_gc_preserve_end t1
+
+        return to
+end
diff --git a/src/column_reader_dev.jl b/src/column_reader_dev.jl
new file mode 100644
index 0000000..eb5e623
--- /dev/null
+++ b/src/column_reader_dev.jl
@@ -0,0 +1,139 @@
+using Parquet
+
+path = "c:/data/Performance_2003Q3.txt.parquet"
+#Parquet.metadata(path)
+@time col = Parquet.read_column(path, 5);
+
+for i in 1:31
+    println(i)
+    @time Parquet.read_column(path, i);
+end
+
+
+@time 
read_parquet(path); + +path = "c:/git/parquet-data-collection/dsd50p.parquet" +@time adf = read_parquet(path); + +@time adf = read_parquet(path, multithreaded=false); + + + +using JDF: type_compress! + +@time adf = type_compress!(DataFrame(read_parquet(path, multithreaded=false), copycols=false)); + +using Random: randstring +tbl = ( + int32 = rand(Int32, 1000), + int64 = rand(Int64, 1000), + float32 = rand(Float32, 1000), + float64 = rand(Float64, 1000), + bool = rand(Bool, 1000), + string = [randstring(8) for i in 1:1000], + int32m = rand([missing, rand(Int32, 10)...], 1000), + int64m = rand([missing, rand(Int64, 10)...], 1000), + float32m = rand([missing, rand(Float32, 10)...], 1000), + float64m = rand([missing, rand(Float64, 10)...], 1000), + boolm = rand([missing, true, false], 1000), + stringm = rand([missing, "abc", "def", "ghi"], 1000) +); + +tmpfile = tempname()*".parquet" + +@time write_parquet(tmpfile, tbl); + +path = tmpfile +@time adf = read_parquet(path); + +all([all(c1 .=== c2) for (c1, c2) in zip(tbl, adf)]) + + + +using BenchmarkTools +@benchmark adf = read_parquet(path) + +col_num = 1 +@time col1 = Parquet.read_column(path, col_num); +col1 + +meta = Parquet.metadata(path); +par = ParFile(path); + +nrows(par) + +colnames(par) +close(par) + +#@time tbl = Parquet.read_column.(Ref(path), 1:length(colnames(par))); + + +col1 + +col1[19:20] + +last(col1) + +uncompressed_data_io = col1[1] + +encoded_data_header = Parquet._read_varint(uncompressed_data_io, UInt32) + +using Debugger + +filemetadata = Parquet.metadata(path); +Debugger.@enter Parquet.read_column(path, filemetadata, col_num); + +col1 +correct = getproperty(tbl, keys(tbl)[col_num]) +all(ismissing.(col1) .== ismissing.(correct)) +all(skipmissing(col1) .== skipmissing(correct)) + +using Test +using Base.Threads: @spawn + +checkcol(path, n; multithreaded=true) = begin + res = Vector{Any}(undef, n) + if multithreaded + for col_num in 1:n + res[col_num] = @spawn Parquet.read_column(path, col_num); + end + return fetch.(res) + else + for col_num in 1:n + println(col_num) + res[col_num] = Parquet.read_column(path, col_num); + end + return res + end +end + +@time checkcol(path, 31, multithreaded=true); +@time checkcol(path, 31, multithreaded=false); + +@time checkcol(path, 12, multithreaded=false); + + + +using Base.Threads: @spawn +read1(path, n) = begin + result = Vector{Any}(undef, length(n)) + for i in n + result[i] = @spawn Parquet.read_column(path, i) + end + fetch.(result) +end + +@time a = read1(path, 1:5) + +using DataFrames + +@time ba=DataFrame(a, copycols=false) +@time ba=DataFrame(a) + +b1 + + +import Base: add_int +@edit Base.add_int(100, 1) + +add_int diff --git a/src/encoding.jl b/src/encoding.jl new file mode 100644 index 0000000..8efd5b6 --- /dev/null +++ b/src/encoding.jl @@ -0,0 +1,14 @@ +# obtain the encoding of the page +using Thrift: isfilled + +function page_encoding(page::Page) + if isfilled(page.hdr, :data_page_header) + return page.hdr.data_page_header.encoding + elseif isfilled(page.hdr, :data_page_header_v2) + return page.hdr.data_page_header_v2.encoding + elseif isfilled(page.hdr, :dictionary_page_header) + return page.hdr.dictionary_page_header.encoding + else + error("not supported page") + end +end diff --git a/src/metadata.jl b/src/metadata.jl new file mode 100644 index 0000000..dad19d8 --- /dev/null +++ b/src/metadata.jl @@ -0,0 +1,17 @@ +using Thrift + +function metadata(path) + io = open(path) + sz = filesize(io) + seek(io, sz - SZ_PAR_MAGIC - SZ_FOOTER) + + # read footer size as little 
endian signed Int32 + meta_len = read(io, Int32) + datasize = sz - meta_len - 2SZ_PAR_MAGIC - SZ_FOOTER + seek(io, SZ_PAR_MAGIC + datasize) + filemetadata = read_thrift(io, PAR2.FileMetaData) + + close(io) + + filemetadata +end diff --git a/src/read_parquet.jl b/src/read_parquet.jl new file mode 100644 index 0000000..68488ec --- /dev/null +++ b/src/read_parquet.jl @@ -0,0 +1,52 @@ +using Base.Threads: @spawn +using Base.Iterators: drop +using ProgressMeter: @showprogress, Progress, next! +using NamedTupleTools: namedtuple + +read_parquet(path, cols::Vector{Symbol}; kwargs...) = read_parquet(path, String.(cols); kwargs...) + +read_parquet(path; kwargs...) = read_parquet(path, String[]; kwargs...) + +function read_parquet(path, cols::Vector{String}; multithreaded=true, verbose = false) + """function for reading parquet""" + + par = ParFile(path) + nc = ncols(par) + + colnames = [sch.name for sch in drop(par.schema.schema, 1)] + + close(par) + + if length(cols) == 0 + colnums = collect(1:nc) + else + colnums = [findfirst(==(c), colnames) for c in cols] + end + + results = Vector{Any}(undef, length(colnums)) + + filemetadata = metadata(path) + + symbol_col_names = collect(Symbol(col) for col in colnames[colnums]) + + p = Progress(length(colnums)) + if multithreaded + for (i, j) in enumerate(colnums) + results[i] = @spawn begin + # next!(p) + res = read_column(path, filemetadata, j) + res + end + end + results = fetch.(results) + else + + for (i, j) in enumerate(colnums) + results[i] = read_column(path, filemetadata, j) + next!(p) + end + end + + + return namedtuple(symbol_col_names, results) +end diff --git a/src/reader.jl b/src/reader.jl index 12e4067..2189c04 100644 --- a/src/reader.jl +++ b/src/reader.jl @@ -337,6 +337,11 @@ function read_levels_and_nmissing(io, defn_enc::Int32, repn_enc::Int32, num_valu end end + #@debug("before reading repn levels bytesavailable in page: $(bytesavailable(io))") + # read repetition levels. 
skipped if all columns are at 1st level + max_repn_level = max_repetition_level(par.schema, cname) + ((length(cname) > 1) && (max_repn_level > 0)) && read_levels(io, max_repn_level, repn_enc, num_values, repn_levels, repn_offset) + nmissing end @@ -445,7 +450,7 @@ function is_par_file(io) magic = Array{UInt8}(undef, 4) read!(io, magic) (String(magic) == PAR_MAGIC) || return false - + seek(io, sz - SZ_PAR_MAGIC) magic = Array{UInt8}(undef, 4) read!(io, magic) diff --git a/src/show.jl b/src/show.jl index 212d843..fcd17a9 100644 --- a/src/show.jl +++ b/src/show.jl @@ -1,214 +1,214 @@ -function print_indent(io, n) - for d in 1:n - print(io, " ") - end -end - -function show(io::IO, cursor::RecordCursor) - par = cursor.par - rows = cursor.colcursors[1].row.rows - println(io, "Record Cursor on $(par.path)") - println(io, " rows: $rows") - - colpaths = [join(colname, '.') for colname in cursor.colnames] - println(io, " cols: $(join(colpaths, ", "))") -end - -function show(io::IO, cursor::BatchedColumnsCursor) - par = cursor.par - rows = cursor.colcursors[1].row.rows - println(io, "Batched Columns Cursor on $(par.path)") - println(io, " rows: $rows") - println(io, " batches: $(length(cursor))") - - colpaths = [join(colname, '.') for colname in cursor.colnames] - println(io, " cols: $(join(colpaths, ", "))") -end - -function show(io::IO, schema::SchemaElement, indent::AbstractString="", nchildren::Vector{Int}=Int[]) - print(io, indent) - lchildren = length(nchildren) - print_indent(io, lchildren) - if isfilled(schema, :repetition_type) - r = schema.repetition_type - print(io, (r == FieldRepetitionType.REQUIRED) ? "required" : (r == FieldRepetitionType.OPTIONAL) ? "optional" : "repeated", " "); - end - isfilled(schema, :_type) && print(io, Thrift.enumstr(_Type, schema._type), " ") - - print(io, schema.name) - isfilled(schema, :field_id) && print(io, " (", schema.field_id, ")") - - if isfilled(schema, :converted_type) - print(io, "# (from ", Thrift.enumstr(ConvertedType, schema.converted_type)) - if schema.converted_type == ConvertedType.DECIMAL - print(io, "(", schema.scale, ".", schema.precision) - end - print(") ") - end - - if isfilled(schema, :num_children) - push!(nchildren, schema.num_children) - print(io, " {") - elseif lchildren > 0 - nchildren[lchildren] -= 1 - if nchildren[lchildren] == 0 - pop!(nchildren) - println(io, "") - print_indent(io, length(nchildren)) - print(io, indent, "}") - end - end - - println(io, "") -end - -function show(io::IO, schema::Vector{SchemaElement}, indent::AbstractString="") - println(io, indent, "Schema:") - nchildren=Int[] - for schemaelem in schema - show(io, schemaelem, indent * " ", nchildren) - end -end - -show(io::IO, schema::Schema, indent::AbstractString="") = show(io, schema.schema, indent) - -function show(io::IO, kvmeta::KeyValue, indent::AbstractString="") - println(io, indent, kvmeta.key, " => ", kvmeta.value) -end - -function show(io::IO, kvmetas::Vector{KeyValue}, indent::AbstractString="") - isempty(kvmetas) && return - println(io, indent, "Metadata:") - for kvmeta in kvmetas - show(io, kvmeta, indent * " ") - end -end - -function show_encodings(io::IO, encodings::Vector{Int32}, indent::AbstractString="") - isempty(encodings) && return - print(io, indent, "Encodings: ") - pfx = "" - for encoding in encodings - print(io, pfx, Thrift.enumstr(Encoding, encoding)) - pfx = ", " - end - println(io, "") -end - -show(io::IO, hdr::IndexPageHeader, indent::AbstractString="") = nothing -function show(io::IO, page::DictionaryPageHeader, 
indent::AbstractString="") - println(io, indent, page.num_values, " values") -end - -function show(io::IO, hdr::DataPageHeader, indent::AbstractString="") - println(io, indent, hdr.num_values, " values") - println(io, indent, "encodings: values as ", Thrift.enumstr(Encoding, hdr.encoding), ", definitions as ", Thrift.enumstr(Encoding, hdr.definition_level_encoding), ", repetitions as ", Thrift.enumstr(Encoding, hdr.repetition_level_encoding)) - Thrift.isfilled(hdr, :statistics) && show(io, hdr.statistics, indent) -end - -function show(io::IO, hdr::DataPageHeaderV2, indent::AbstractString="") - compressed = Thrift.isfilled(hdr, :is_compressed) ? hdr.is_compressed : true - println(io, indent, hdr.num_values, " values, ", hdr.num_nulls, " nulls, ", hdr.num_rows, " rows, compressed:", compressed) - println(io, indent, "encoding:", Thrift.enumstr(Encoding, hdr.encoding), ", definition:", Thrift.enumstr(Encoding, hdr.definition_level_encoding), ", repetition:", Thrift.enumstr(Encoding, hdr.repetition_level_encoding)) - Thrift.isfilled(hdr, :statistics) && show(io, hdr.statistics, indent) -end - -function show(io::IO, page::PageHeader, indent::AbstractString="") - println(io, indent, Thrift.enumstr(PageType, page._type), " compressed bytes:", page.compressed_page_size, " (", page.uncompressed_page_size, " uncompressed)") - Thrift.isfilled(page, :data_page_header) && show(io, page.data_page_header, indent * " ") - Thrift.isfilled(page, :data_page_header_v2) && show(io, page.data_page_header_v2, indent * " ") - Thrift.isfilled(page, :index_page_header) && show(io, page.index_page_header, indent * " ") - Thrift.isfilled(page, :dictionary_page_header) && show(io, page.dictionary_page_header, indent * " ") -end - -function show(io::IO, pages::Vector{PageHeader}, indent::AbstractString="") - println(io, indent, "Pages:") - for page in pages - show(io, page, indent * " ") - end -end - -show(io::IO, page::Page, indent::AbstractString="") = show(io, page.hdr, indent) -show(io::IO, pages::Vector{Page}, indent::AbstractString="") = show(io, [page.hdr for page in pages], indent) - -function show(io::IO, stat::Statistics, indent::AbstractString="") - println(io, indent, "Statistics:") - if Thrift.isfilled(stat, :min) && Thrift.isfilled(stat, :max) - println(io, indent, " range:", stat.min, ":", stat.max) - elseif Thrift.isfilled(stat, :min) - println(io, indent, " min:", stat.min) - elseif Thrift.isfilled(stat, :max) - println(io, indent, " max:", stat.max) - end - Thrift.isfilled(stat, :null_count) && println(io, indent, " null count:", stat.null_count) - Thrift.isfilled(stat, :distinct_count) && println(io, indent, " distinct count:", stat.distinct_count) -end - -function show(io::IO, page_enc::PageEncodingStats, indent::AbstractString="") - println(io, indent, page_enc.count, " ", Thrift.enumstr(Encoding, page_enc.encoding), " encoded ", Thrift.enumstr(PageType, page_enc.page_type), " pages") -end - -function show(io::IO, page_encs::Vector{PageEncodingStats}, indent::AbstractString="") - isempty(page_encs) && return - println(io, indent, "Page encoding statistics:") - for page_enc in page_encs - show(io, page_enc, indent * " ") - end -end - -function show(io::IO, colmeta::ColumnMetaData, indent::AbstractString="") - println(io, indent, Thrift.enumstr(_Type, coltype(colmeta)), " ", join(colname(colmeta), '.'), ", num values:", colmeta.num_values) - show_encodings(io, colmeta.encodings, indent) - if colmeta.codec != CompressionCodec.UNCOMPRESSED - println(io, indent, Thrift.enumstr(CompressionCodec, 
colmeta.codec), " compressed bytes:", colmeta.total_compressed_size, " (", colmeta.total_uncompressed_size, " uncompressed)") - else - println(io, indent, Thrift.enumstr(CompressionCodec, colmeta.codec), " bytes:", colmeta.total_compressed_size) - end - - print(io, indent, "offsets: data:", colmeta.data_page_offset) - Thrift.isfilled(colmeta, :index_page_offset) && print(io, ", index:", colmeta.index_page_offset) - Thrift.isfilled(colmeta, :dictionary_page_offset) && print(io, ", dictionary:", colmeta.dictionary_page_offset) - println(io, "") - Thrift.isfilled(colmeta, :statistics) && show(io, colmeta.statistics, indent) - Thrift.isfilled(colmeta, :encoding_stats) && show(io, colmeta.encoding_stats, indent) - Thrift.isfilled(colmeta, :key_value_metadata) && show(io, colmeta.key_value_metadata, indent) -end - -function show(io::IO, columns::Vector{ColumnChunk}, indent::AbstractString="") - for col in columns - path = isfilled(col, :file_path) ? col.file_path : "" - println(io, indent, "Column at offset: ", path, "#", col.file_offset) - show(io, col.meta_data, indent * " ") - end -end - -function show(io::IO, grp::RowGroup, indent::AbstractString="") - println(io, indent, "Row Group: ", grp.num_rows, " rows in ", grp.total_byte_size, " bytes") - show(io, grp.columns, indent * " ") -end - -function show(io::IO, row_groups::Vector{RowGroup}, indent::AbstractString="") - println(io, indent, "Row Groups:") - for grp in row_groups - show(io, grp, indent * " ") - end -end - -function show(io::IO, meta::FileMetaData, indent::AbstractString="") - println(io, indent, "version: ", meta.version) - println(io, indent, "nrows: ", meta.num_rows) - println(io, indent, "created by: ", meta.created_by) - - show(io, meta.schema, indent) - show(io, meta.row_groups, indent) - Thrift.isfilled(meta, :key_value_metadata) && show(io, meta.key_value_metadata, indent) -end - -function show(io::IO, par::ParFile) - println(io, "Parquet file: $(par.path)") - meta = par.meta - println(io, " version: $(meta.version)") - println(io, " nrows: $(meta.num_rows)") - println(io, " created by: $(meta.created_by)") - println(io, " cached: $(length(par.page_cache.refs)) column chunks") -end +# function print_indent(io, n) +# for d in 1:n +# print(io, " ") +# end +# end +# +# function show(io::IO, cursor::RecordCursor) +# par = cursor.par +# rows = cursor.colcursors[1].row.rows +# println(io, "Record Cursor on $(par.path)") +# println(io, " rows: $rows") +# +# colpaths = [join(colname, '.') for colname in cursor.colnames] +# println(io, " cols: $(join(colpaths, ", "))") +# end +# +# function show(io::IO, cursor::BatchedColumnsCursor) +# par = cursor.par +# rows = cursor.colcursors[1].row.rows +# println(io, "Batched Columns Cursor on $(par.path)") +# println(io, " rows: $rows") +# println(io, " batches: $(length(cursor))") +# +# colpaths = [join(colname, '.') for colname in cursor.colnames] +# println(io, " cols: $(join(colpaths, ", "))") +# end +# +# function show(io::IO, schema::SchemaElement, indent::AbstractString="", nchildren::Vector{Int}=Int[]) +# print(io, indent) +# lchildren = length(nchildren) +# print_indent(io, lchildren) +# if isfilled(schema, :repetition_type) +# r = schema.repetition_type +# print(io, (r == FieldRepetitionType.REQUIRED) ? "required" : (r == FieldRepetitionType.OPTIONAL) ? 
"optional" : "repeated", " "); +# end +# isfilled(schema, :_type) && print(io, Thrift.enumstr(_Type, schema._type), " ") +# +# print(io, schema.name) +# isfilled(schema, :field_id) && print(io, " (", schema.field_id, ")") +# +# if isfilled(schema, :converted_type) +# print(io, "# (from ", Thrift.enumstr(ConvertedType, schema.converted_type)) +# if schema.converted_type == ConvertedType.DECIMAL +# print(io, "(", schema.scale, ".", schema.precision) +# end +# print(") ") +# end +# +# if isfilled(schema, :num_children) +# push!(nchildren, schema.num_children) +# print(io, " {") +# elseif lchildren > 0 +# nchildren[lchildren] -= 1 +# if nchildren[lchildren] == 0 +# pop!(nchildren) +# println(io, "") +# print_indent(io, length(nchildren)) +# print(io, indent, "}") +# end +# end +# +# println(io, "") +# end +# +# function show(io::IO, schema::Vector{SchemaElement}, indent::AbstractString="") +# println(io, indent, "Schema:") +# nchildren=Int[] +# for schemaelem in schema +# show(io, schemaelem, indent * " ", nchildren) +# end +# end +# +# show(io::IO, schema::Schema, indent::AbstractString="") = show(io, schema.schema, indent) +# +# function show(io::IO, kvmeta::KeyValue, indent::AbstractString="") +# println(io, indent, kvmeta.key, " => ", kvmeta.value) +# end +# +# function show(io::IO, kvmetas::Vector{KeyValue}, indent::AbstractString="") +# isempty(kvmetas) && return +# println(io, indent, "Metadata:") +# for kvmeta in kvmetas +# show(io, kvmeta, indent * " ") +# end +# end +# +# function show_encodings(io::IO, encodings::Vector{Int32}, indent::AbstractString="") +# isempty(encodings) && return +# print(io, indent, "Encodings: ") +# pfx = "" +# for encoding in encodings +# print(io, pfx, Thrift.enumstr(Encoding, encoding)) +# pfx = ", " +# end +# println(io, "") +# end +# +# show(io::IO, hdr::IndexPageHeader, indent::AbstractString="") = nothing +# function show(io::IO, page::DictionaryPageHeader, indent::AbstractString="") +# println(io, indent, page.num_values, " values") +# end +# +# function show(io::IO, hdr::DataPageHeader, indent::AbstractString="") +# println(io, indent, hdr.num_values, " values") +# println(io, indent, "encodings: values as ", Thrift.enumstr(Encoding, hdr.encoding), ", definitions as ", Thrift.enumstr(Encoding, hdr.definition_level_encoding), ", repetitions as ", Thrift.enumstr(Encoding, hdr.repetition_level_encoding)) +# Thrift.isfilled(hdr, :statistics) && show(io, hdr.statistics, indent) +# end +# +# function show(io::IO, hdr::DataPageHeaderV2, indent::AbstractString="") +# compressed = Thrift.isfilled(hdr, :is_compressed) ? 
hdr.is_compressed : true +# println(io, indent, hdr.num_values, " values, ", hdr.num_nulls, " nulls, ", hdr.num_rows, " rows, compressed:", compressed) +# println(io, indent, "encoding:", Thrift.enumstr(Encoding, hdr.encoding), ", definition:", Thrift.enumstr(Encoding, hdr.definition_level_encoding), ", repetition:", Thrift.enumstr(Encoding, hdr.repetition_level_encoding)) +# Thrift.isfilled(hdr, :statistics) && show(io, hdr.statistics, indent) +# end +# +# function show(io::IO, page::PageHeader, indent::AbstractString="") +# println(io, indent, Thrift.enumstr(PageType, page._type), " compressed bytes:", page.compressed_page_size, " (", page.uncompressed_page_size, " uncompressed)") +# Thrift.isfilled(page, :data_page_header) && show(io, page.data_page_header, indent * " ") +# Thrift.isfilled(page, :data_page_header_v2) && show(io, page.data_page_header_v2, indent * " ") +# Thrift.isfilled(page, :index_page_header) && show(io, page.index_page_header, indent * " ") +# Thrift.isfilled(page, :dictionary_page_header) && show(io, page.dictionary_page_header, indent * " ") +# end +# +# function show(io::IO, pages::Vector{PageHeader}, indent::AbstractString="") +# println(io, indent, "Pages:") +# for page in pages +# show(io, page, indent * " ") +# end +# end +# +# show(io::IO, page::Page, indent::AbstractString="") = show(io, page.hdr, indent) +# show(io::IO, pages::Vector{Page}, indent::AbstractString="") = show(io, [page.hdr for page in pages], indent) +# +# function show(io::IO, stat::Statistics, indent::AbstractString="") +# println(io, indent, "Statistics:") +# if Thrift.isfilled(stat, :min) && Thrift.isfilled(stat, :max) +# println(io, indent, " range:", stat.min, ":", stat.max) +# elseif Thrift.isfilled(stat, :min) +# println(io, indent, " min:", stat.min) +# elseif Thrift.isfilled(stat, :max) +# println(io, indent, " max:", stat.max) +# end +# Thrift.isfilled(stat, :null_count) && println(io, indent, " null count:", stat.null_count) +# Thrift.isfilled(stat, :distinct_count) && println(io, indent, " distinct count:", stat.distinct_count) +# end +# +# function show(io::IO, page_enc::PageEncodingStats, indent::AbstractString="") +# println(io, indent, page_enc.count, " ", Thrift.enumstr(Encoding, page_enc.encoding), " encoded ", Thrift.enumstr(PageType, page_enc.page_type), " pages") +# end +# +# function show(io::IO, page_encs::Vector{PageEncodingStats}, indent::AbstractString="") +# isempty(page_encs) && return +# println(io, indent, "Page encoding statistics:") +# for page_enc in page_encs +# show(io, page_enc, indent * " ") +# end +# end +# +# function show(io::IO, colmeta::ColumnMetaData, indent::AbstractString="") +# println(io, indent, Thrift.enumstr(_Type, coltype(colmeta)), " ", join(colname(colmeta), '.'), ", num values:", colmeta.num_values) +# show_encodings(io, colmeta.encodings, indent) +# if colmeta.codec != CompressionCodec.UNCOMPRESSED +# println(io, indent, Thrift.enumstr(CompressionCodec, colmeta.codec), " compressed bytes:", colmeta.total_compressed_size, " (", colmeta.total_uncompressed_size, " uncompressed)") +# else +# println(io, indent, Thrift.enumstr(CompressionCodec, colmeta.codec), " bytes:", colmeta.total_compressed_size) +# end +# +# print(io, indent, "offsets: data:", colmeta.data_page_offset) +# Thrift.isfilled(colmeta, :index_page_offset) && print(io, ", index:", colmeta.index_page_offset) +# Thrift.isfilled(colmeta, :dictionary_page_offset) && print(io, ", dictionary:", colmeta.dictionary_page_offset) +# println(io, "") +# Thrift.isfilled(colmeta, 
:statistics) && show(io, colmeta.statistics, indent) +# Thrift.isfilled(colmeta, :encoding_stats) && show(io, colmeta.encoding_stats, indent) +# Thrift.isfilled(colmeta, :key_value_metadata) && show(io, colmeta.key_value_metadata, indent) +# end +# +# function show(io::IO, columns::Vector{ColumnChunk}, indent::AbstractString="") +# for col in columns +# path = isfilled(col, :file_path) ? col.file_path : "" +# println(io, indent, "Column at offset: ", path, "#", col.file_offset) +# show(io, col.meta_data, indent * " ") +# end +# end +# +# function show(io::IO, grp::RowGroup, indent::AbstractString="") +# println(io, indent, "Row Group: ", grp.num_rows, " rows in ", grp.total_byte_size, " bytes") +# show(io, grp.columns, indent * " ") +# end +# +# function show(io::IO, row_groups::Vector{RowGroup}, indent::AbstractString="") +# println(io, indent, "Row Groups:") +# for grp in row_groups +# show(io, grp, indent * " ") +# end +# end +# +# function show(io::IO, meta::FileMetaData, indent::AbstractString="") +# println(io, indent, "version: ", meta.version) +# println(io, indent, "nrows: ", meta.num_rows) +# println(io, indent, "created by: ", meta.created_by) +# +# show(io, meta.schema, indent) +# show(io, meta.row_groups, indent) +# Thrift.isfilled(meta, :key_value_metadata) && show(io, meta.key_value_metadata, indent) +# end +# +# function show(io::IO, par::ParFile) +# println(io, "Parquet file: $(par.path)") +# meta = par.meta +# println(io, " version: $(meta.version)") +# println(io, " nrows: $(meta.num_rows)") +# println(io, " created by: $(meta.created_by)") +# println(io, " cached: $(length(par.page_cache.refs)) column chunks") +# end
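As a quick end-to-end check of the new high-level reader added above, the round trip below mirrors what `src/column_reader_dev.jl` does; it is a minimal sketch that assumes the file is written with the package's existing `write_parquet` and that `read_parquet` returns a `NamedTuple` of columns, as in `src/read_parquet.jl`:

```
using Parquet
using Random: randstring

# a small Tables.jl-style table: a NamedTuple of columns, one with missings
tbl = (
    int64   = rand(Int64, 1000),
    float64 = rand(Float64, 1000),
    string  = [randstring(8) for i in 1:1000],
    stringm = rand([missing, "abc", "def", "ghi"], 1000),
);

tmpfile = tempname() * ".parquet"
write_parquet(tmpfile, tbl);       # existing writer
adf = read_parquet(tmpfile);       # new high-level reader

# `===` treats two missings as equal, so this verifies the round trip column by column
all(all(c1 .=== c2) for (c1, c2) in zip(tbl, adf))
```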