Skip to content

Commit

Permalink
Merge pull request #71 from ChevronETC/foldmap
Browse files Browse the repository at this point in the history
add fast-path foldmap method for entire dataset
  • Loading branch information
samtkaplan authored Nov 12, 2024
2 parents 87ba2f5 + b46c57c commit 80143ad
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 10 deletions.
14 changes: 8 additions & 6 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ CvxCompress = "3e489a4b-a92a-5e1b-a7bf-ed666eae205e"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
TeaSeis = "db5f7d96-a200-5343-9fc6-a259b42289b2"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
Expand All @@ -17,21 +18,22 @@ Blosc = "a74b3585-a348-5f62-a45c-50e91977d574"
CvxCompress = "3e489a4b-a92a-5e1b-a7bf-ed666eae205e"
ZfpCompression = "43441a71-1662-41c6-b8ea-40ed1525242b"

[extras]
Blosc = "a74b3585-a348-5f62-a45c-50e91977d574"
CvxCompress = "3e489a4b-a92a-5e1b-a7bf-ed666eae205e"
ZfpCompression = "43441a71-1662-41c6-b8ea-40ed1525242b"

[extensions]
BloscExt = "Blosc"
CvxCompressExt = ["CvxCompress", "ZfpCompression"]
ZfpExt = "ZfpCompression"

[compat]
AbstractStorage = "^1.1"
AbstractStorage = "^1.3"
Blosc = "0.7"
CvxCompress = "1"
JSON = "0.21"
ProgressMeter = "1"
TeaSeis = "0.4"
ZfpCompression = "0.2"
julia = "1"

[extras]
Blosc = "a74b3585-a348-5f62-a45c-50e91977d574"
CvxCompress = "3e489a4b-a92a-5e1b-a7bf-ed666eae205e"
ZfpCompression = "43441a71-1662-41c6-b8ea-40ed1525242b"
29 changes: 28 additions & 1 deletion ext/BloscExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,33 @@ function CloudSeis.cache_from_file!(io::CSeis{T,N,BloscCompressor}, extentindex,
nothing
end

# Fill `fmap` with the fold map bytes of extent `extentindex` of a Blosc compressed dataset.
#
# The extent object is read in full and parsed as: an `Int` buffer count, that many `Int`
# compressed-buffer sizes, then the compressed buffers themselves.  Buffers are decompressed
# in order only until `sizeof(Int) * nframes` bytes are available (`nframes` being the number
# of frame indices in the extent); that prefix is copied into `fmap`.
#
# `serial` is forwarded to the storage `read!`.
#
# If the extent does not exist (`FileDoesNotExistError`), `fmap` is zero-filled and returned.
# On any other error `fmap` is zero-filled and the error is rethrown.  Returns `fmap`.
function CloudSeis.read_foldmap!(io::CSeis{T,N,BloscCompressor}, extentindex::Integer, fmap::Vector{UInt8}; serial=false) where {T,N}
    @warn "reading a foldmap from a Blosc compressed dataset is not efficient"
    try
        extent = io.extents[extentindex]
        cdata = read!(extent.container, extent.name, Vector{UInt8}(undef, filesize(extent.container, extent.name)); serial)
        io_cdata = IOBuffer(cdata; read=true, write=false)
        nbuffers = read(io_cdata, Int)
        buffersize = read!(io_cdata, Vector{Int}(undef, nbuffers))

        nframes_bytes = length(extent.frameindices) * sizeof(Int)
        data = UInt8[]
        sizehint!(data, nframes_bytes)
        for ibuffer in 1:nbuffers
            cbuffer = read!(io_cdata, Vector{UInt8}(undef, buffersize[ibuffer]))
            # grow in place; re-concatenating would copy the accumulated bytes on every buffer
            append!(data, decompress(UInt8, cbuffer))
            # stop as soon as the fold map prefix is complete
            length(data) >= nframes_bytes && break
        end
        # a view avoids an extra copy; still errors if `data` is short or the lengths differ
        fmap .= view(data, 1:nframes_bytes)
    catch e
        fmap .= 0
        # a missing extent is treated as an all-zero fold map; anything else propagates
        e isa FileDoesNotExistError || rethrow()
    end
    fmap
end

function CloudSeis.cache_foldmap!(io::CSeis{T,N,BloscCompressor}, extentindex::Integer, force=false) where {T,N}
if extentindex == io.cache.extentindex && io.cache.type (CACHE_ALL,CACHE_ALL_LEFT_JUSTIFY) && !force
Expand All @@ -41,7 +68,7 @@ function CloudSeis.cache_foldmap!(io::CSeis{T,N,BloscCompressor}, extentindex::I
extentindex
end

function Base.flush(io::CSeis{T,N,BloscCompressor}) where {T,N}
function Base.flush(io::CSeis{T,N,<:BloscCompressor}) where {T,N}
if io.cache.extentindex == 0
return nothing
end
Expand Down
15 changes: 15 additions & 0 deletions ext/ZfpExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ function CloudSeis.cache_from_file!(io::CSeis{T,N,<:ZfpCompressor}, extentindex,
nothing
end

# Fill `fmap` with the fold map of extent `extentindex` of a Zfp or Cvx compressed dataset.
#
# The extent object starts with one `Int` holding the byte length of the compressed fold map,
# followed (at offset `sizeof(Int)`) by the compressed fold map itself.  The compressed bytes
# are decompressed directly into `fmap`, viewed as a vector of `Int` with one entry per frame
# index of the extent.
#
# `serial` is forwarded to both storage reads.
#
# If the extent does not exist (`FileDoesNotExistError`), `fmap` is zero-filled and returned.
# On any other error `fmap` is zero-filled and the error is rethrown.  Returns `fmap`.
function CloudSeis.read_foldmap!(io::Union{CSeis{T,N,<:ZfpCompressor},CSeis{T,N,CloudSeisCvxCompressor}}, extentindex::Integer, fmap::Vector{UInt8}; serial=false) where {T,N}
    try
        extent = io.extents[extentindex]
        nframes = length(extent.frameindices)

        # the decompressor writes through a raw pointer, so make sure the destination is big enough
        length(fmap) >= nframes * sizeof(Int) ||
            throw(DimensionMismatch("fmap holds $(length(fmap)) bytes, but $(nframes * sizeof(Int)) bytes are required"))

        nfmap = read!(extent.container, extent.name, Vector{Int}(undef, 1); serial)[1]
        cfmap = read!(extent.container, extent.name, Vector{UInt8}(undef, nfmap); offset=sizeof(Int), serial)

        # keep `fmap` rooted while its memory is accessed through the unsafe alias
        GC.@preserve fmap begin
            _fmap = unsafe_wrap(Array, convert(Ptr{Int}, pointer(fmap)), nframes; own=false)
            zfp_decompress!(_fmap, cfmap)
        end
    catch e
        fmap .= 0
        # a missing extent is treated as an all-zero fold map; anything else propagates
        e isa FileDoesNotExistError || rethrow()
    end
    fmap
end

function CloudSeis.cache_foldmap!(io::Union{CSeis{T,N,<:ZfpCompressor},CSeis{T,N,CloudSeisCvxCompressor}}, extentindex::Integer, force=false) where {T,N}
io.mode == "r" || partialcache_error()

Expand Down
79 changes: 76 additions & 3 deletions src/CloudSeis.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module CloudSeis

using AbstractStorage, Distributed, JSON, Pkg, Random, TeaSeis, UUIDs
using AbstractStorage, Distributed, JSON, Pkg, ProgressMeter, Random, TeaSeis, UUIDs

struct TracePropertyDef{T}
label::String
Expand Down Expand Up @@ -1186,6 +1186,18 @@ cache(io::CSeis) = io.cache

partialcache_error() = error("partial caching is only allowed in read-only \"r\" mode")

# Fill `fmap` with the leading `length(fmap)` bytes of extent `extentindex` for datasets
# stored without compression or with left-justify compression.
#
# `serial` is forwarded to the storage `read!`.
#
# If the extent does not exist (`FileDoesNotExistError`), `fmap` is zero-filled and returned.
# On any other error `fmap` is zero-filled and the error is rethrown.  Returns `fmap`.
function read_foldmap!(io::Union{CSeis{T,N,NotACompressor}, CSeis{T,N,LeftJustifyCompressor}}, extentindex::Integer, fmap::Vector{UInt8}; serial=false) where {T,N}
    try
        extent = io.extents[extentindex]
        read!(extent.container, extent.name, fmap; serial)
    catch e
        fmap .= 0
        # a missing extent is treated as an all-zero fold map; anything else propagates
        e isa FileDoesNotExistError || rethrow()
    end
    fmap
end

function cache_foldmap!(io::Union{CSeis{T,N,NotACompressor}, CSeis{T,N,LeftJustifyCompressor}}, extentindex::Integer, force=false) where {T,N}
io.mode == "r" || partialcache_error()

Expand All @@ -1196,7 +1208,7 @@ function cache_foldmap!(io::Union{CSeis{T,N,NotACompressor}, CSeis{T,N,LeftJusti
if isfile(io.extents[extentindex].container, io.extents[extentindex].name)
@debug "reading foldmap for extent $extentindex from block-storage..."
t = @elapsed begin
io.cache.data = read!(io.extents[extentindex].container, io.extents[extentindex].name, Vector{UInt8}(undef, cachesize_foldmap(io, extentindex)))
io.cache.data = read_foldmap!(io, extentindex, Vector{UInt8}(undef, cachesize_foldmap(io, extentindex)))
end
mb = length(io.cache.data)/1_000_000
mbps = mb/t
Expand Down Expand Up @@ -1282,7 +1294,67 @@ function foldmap(io::CSeis, extentindex)
nbytes = sizeof(Int)*length(io.extents[extentindex].frameindices)
reinterpret(Int, view(io.cache.data, 1:nbytes))
end
foldmap(io::CSeis) = foldmap(io, io.cache.extentindex)

# Read the leading bytes of `extents[extentindex]` from storage into
# `fmap_extents[extentindex]` using a serial read.
#
# If the extent does not exist (`FileDoesNotExistError`), the buffer is zero-filled.
# On any other error the buffer is zero-filled and the error is rethrown.
# Returns `nothing`; the result is delivered through `fmap_extents`.
function foldmap_from_storage!(extents, extentindex, fmap_extents)
    extent = extents[extentindex]
    fmap = fmap_extents[extentindex]

    try
        read!(extent.container, extent.name, fmap; serial=true)
    catch e
        fmap .= 0
        # a missing extent is treated as an all-zero fold map; anything else propagates
        e isa FileDoesNotExistError || rethrow()
    end
    nothing
end

# Assemble the per-extent fold map byte buffers in `fmap_extents` into a single `Int` array
# whose dimensions are the sizes of `io` along dimensions 3 through `ndims(io)`.
#
# Buffers are laid end to end in linear-index order; buffer `k` fills as many entries as
# `io.extents[k]` has frame indices.
function foldmap_all_reshape(io, fmap_extents)
    dims = ntuple(i -> size(io, 2 + i), ndims(io) - 2)
    fmap = zeros(Int, dims)

    stop = 0
    for (iextent, bytes) in enumerate(fmap_extents)
        start = stop + 1
        stop += length(io.extents[iextent].frameindices)
        fmap[start:stop] .= reinterpret(Int, bytes)
    end
    fmap
end

# Load the fold map of every extent of `io` and return it as one `Int` array
# (see `foldmap_all_reshape` for the layout).
#
# Extents are read in consecutive batches of at most `nasync` extents.  Within a batch each
# extent is read in its own `@async` task, and the batch is awaited before the next one
# starts.  `showprogress` enables a progress bar that advances once per batch.
#
# Throws `ArgumentError` if `nasync` is not positive.
function foldmap_all(io, nasync, showprogress)
    nasync > 0 || throw(ArgumentError("nasync must be positive, got $nasync"))

    nextents = length(io.extents)

    # one zeroed byte buffer per extent, sized to hold one `Int` per frame in that extent
    fmap_extents = [zeros(UInt8, sizeof(Int)*length(extent.frameindices)) for extent in io.extents]

    r = 1:nasync:nextents
    p = Progress(length(r); desc="loading foldmap...", enabled=showprogress)
    for i in r
        # the batch is i, i+1, ..., i+nasync-1; the upper bound must not reach the
        # first extent of the next batch, otherwise that extent is read twice
        @sync for extentindex in i:min(i + nasync - 1, nextents)
            @async read_foldmap!(io, extentindex, fmap_extents[extentindex]; serial=true)
        end
        next!(p)
    end

    foldmap_all_reshape(io, fmap_extents)
end

"""
foldmap(io; all=false, nasync=2048, showprogress=false)
Return the foldmap for either the currently cached extent (if `all=false`)
or the entire dataset if `all=true`. If `all=true`, then the operation is
performed using an asynchronous map over the extents. The number of asynchronous
tasks in this map is controlled by `nasync`. Use `showprogress=true` to display
a progress bar while loading the foldmap.
"""
function foldmap(io::CSeis; all=false, nasync=2048, showprogress=false)
    # `all` chooses between the whole-dataset fold map (loaded in batches of `nasync`
    # asynchronous reads) and the fold map of the extent currently held in the cache.
    return all ? foldmap_all(io, nasync, showprogress) : foldmap(io, io.cache.extentindex)
end

# Return an `Int` array that aliases the start of `io.cache.data`, with one entry per frame
# index of extent `extentindex`.  The array is created with `own=false`, so it does not own
# the memory it points at.
function unsafe_foldmap(io::CSeis, extentindex)
    ptr = convert(Ptr{Int}, pointer(io.cache.data))
    nframes = length(io.extents[extentindex].frameindices)
    unsafe_wrap(Array, ptr, nframes; own=false)
end

# Same as above, for the extent currently recorded in the cache.
function unsafe_foldmap(io::CSeis)
    unsafe_foldmap(io, io.cache.extentindex)
end
Expand Down Expand Up @@ -2521,6 +2593,7 @@ description!,
domains,
fold,
fold!,
foldmap,
geometry,
getframe,
getframehdrs,
Expand Down
13 changes: 13 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,19 @@ const compressors = Sys.iswindows() ? ("none","blosc","leftjustify","zfp") : ("n
@test f == [11 for i=1:12]
end

# Write a fully populated 10x11x12 dataset with two frames per extent, reopen it, and check
# that the whole-dataset fold map reports 11 traces for each of the 12 frames.
# The testset name interpolates the loop variables so each iteration is labelled with the
# logical starts/increments it actually ran with.
@testset "full read for foldmap, lstarts=$lstrts, lincs=$lncs" for (lstrts,lncs) = (((1,1,1),(1,1,1)), ((2,3,4),(5,6,7)))
    r = uuid4()
    io = csopen_robust(mkcontainer(cloud, "test-$r-cs"), "w", axis_lengths=[10,11,12], axis_lstarts=lstrts, axis_lincs=lncs, compressor=compressor, compressor_options=compressor_options, frames_per_extent=2)

    x = rand(Float32,10,11,12)
    write(io, x, :, :, :)
    close(io)

    io = csopen(mkcontainer(cloud, "test-$r-cs"))
    f = foldmap(io; all=true)
    @test f == [11 for i=1:12]

    # a small `nasync` forces the extents to be loaded in several batches
    f = foldmap(io; all=true, nasync=2)
    @test f == [11 for i=1:12]
end

@testset "similarto" begin
r = uuid4()
io = csopen_robust(mkcontainer(cloud, "test-$r-cs"), "w",
Expand Down

0 comments on commit 80143ad

Please sign in to comment.