38 changes: 32 additions & 6 deletions cpp/src/arrow/util/compression_zstd.cc
@@ -40,12 +40,27 @@ Status ZSTDError(size_t ret, const char* prefix_msg) {
return Status::IOError(prefix_msg, ZSTD_getErrorName(ret));
}

// ----------------------------------------------------------------------
// ZSTD context deleter implementation
struct ZSTDContextDeleter {
void operator()(ZSTD_DCtx* r) {
if (r) {
ZSTD_freeDCtx(r);
}
}
void operator()(ZSTD_CCtx* r) {
if (r) {
ZSTD_freeCCtx(r);
}
}
};

// ----------------------------------------------------------------------
// ZSTD decompressor implementation

class ZSTDDecompressor : public Decompressor {
public:
ZSTDDecompressor() : stream_(ZSTD_createDStream()) {}
ZSTDDecompressor() : stream_(ZSTD_createDStream()), finished_(false) {}

~ZSTDDecompressor() override { ZSTD_freeDStream(stream_); }

@@ -187,9 +202,14 @@ class ZSTDCodec : public Codec {
DCHECK_EQ(output_buffer_len, 0);
output_buffer = &empty_buffer;
}

size_t ret = ZSTD_decompress(output_buffer, static_cast<size_t>(output_buffer_len),
input, static_cast<size_t>(input_len));
// Decompression context for ZSTD contains several large heap allocations.
Member
How large is "large" here? This is proposing to keep those per-thread heap allocations alive until the threads themselves are joined (which typically happens at process exit for a thread pool).

Author
IIRC 5-10MB in total. Enough to hurt performance with small blocks (e.g. Parquet with 8kB row groups), both due to memory management and cache thrashing, but not enough to hurt in terms of total memory footprint.

I would have liked to tie those allocations to the Arrow default memory pool for proper tracing, but that feature is exclusive to static linkage of ZSTD.

I did deliberately avoid managing a pool per instance, assuming that there may be many instances of this class, more than threads in the thread pools.

Member
So that means that reuse of the contexts should be governed at a higher level, for example the Parquet reader. Perhaps do what the Rust implementation did and expose some kind of "decompression context" API?

Author (@Ext3h, Nov 25, 2025)
Unsure about that - the problematic free-threaded case is the use of thread pools within feather/ipc. They'd need thread_local-like patterns in any case, which means that instead of one central thread_local there would simply be one at 3+ code locations.

Exposing the context for use with Parquet would require exposing it all the way out to parquet::WriterProperties::Builder - and then you'd possibly still end up with multiple writer instances wrongly sharing a context, suddenly rendering threading of those writers impossible. If anything you'd need to export a threading-aware "context pool" rather than a context, but that would amount to reinventing thread_local, except worse in terms of cache locality and undesirable extra synchronization primitives.

The Rust implementation did not encounter those issues, as no sharing of the context is permitted there in the first place due to language constraints - and correspondingly there is also no aggressive threading over (potentially) shared state.

Ultimately, having exactly one cached context per thread for the single shot compression/decompression API is the recommended usage pattern from the ZSTD maintainers, and aligns best with the available API:

/*= Decompression context
 *  When decompressing many times,
 *  it is recommended to allocate a context only once,
 *  and reuse it for each successive compression operation.
 *  This will make workload friendlier for system's memory.
 *  Use one context per thread for parallel execution. */
typedef struct ZSTD_DCtx_s ZSTD_DCtx;
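The per-thread lifetime this recommends can be illustrated without linking ZSTD at all. `FakeContext` below is a hypothetical stand-in for `ZSTD_DCtx` that merely counts live instances; the sketch shows that each thread allocates its context exactly once, reuses it across calls, and frees it at thread exit:

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <thread>

// Hypothetical stand-in for ZSTD_DCtx, counting live contexts.
struct FakeContext {
  static std::atomic<int> live_count;
  FakeContext() { ++live_count; }
  ~FakeContext() { --live_count; }
};
std::atomic<int> FakeContext::live_count{0};

// One context per thread: created lazily on first use, reused for every
// subsequent call on that thread, destroyed when the thread exits.
int SimulatedSingleShotDecompress() {
  thread_local std::unique_ptr<FakeContext> ctx = std::make_unique<FakeContext>();
  // ... a real implementation would call ZSTD_decompressDCtx(ctx.get(), ...) here ...
  return FakeContext::live_count.load();
}
```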

Author
... after checking the Rust implementation, it really should have used a thread_local!-scoped context as well. As it stands, it creates that ZSTD context even if LZ4 is selected, it creates one distinct context per usage location, and it still creates a new context for a lot of potentially short-lived objects. It also missed that not only the CompressionContext but also the DecompressionContext is needed, specifically for the IPC library, which uses compression in both directions...

Author (@Ext3h, Nov 27, 2025)
You mean like:

#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <set>

class LocalContext {
};

// Container, only updated in the rare case. Doubles as factory for `LocalContext`.
class ContextFactory {
public:
    // To be called by `LocalContextReference`.
    // The returned context is to be explicitly released if the external reference expires first.
    std::shared_ptr<LocalContext> Create() {
        auto instance = std::make_shared<LocalContext>();
        std::scoped_lock lock(door_);
        instances_.emplace(instance);
        return instance;
    }

    void Release(const std::shared_ptr<LocalContext>& instance) {
        assert(instance);
        // The erased LocalContext expires outside of the mutex (the caller still
        // holds a reference), avoiding unnecessary contention.
        std::scoped_lock lock(door_);
        instances_.erase(instance);
    }

private:
    // Strong references are held here, but expiry of TLS can clear them out.
    std::set<std::shared_ptr<LocalContext>> instances_;
    std::mutex door_;
};

// Thread-local cache at the usage location.
class LocalContextReference {
public:
    ~LocalContextReference() {
        // Inform all `ContextFactory` instances still alive that they may release
        // the strong reference to the `LocalContext` associated with this thread.
        for (const auto& instance : instances_) {
            // The factory can expire before this reference does.
            if (auto factory = instance.first.lock()) {
                factory->Release(instance.second.lock());
            }
        }
    }

    std::shared_ptr<LocalContext> Get(const std::shared_ptr<ContextFactory>& factory) {
        assert(factory);
        // Keys are ordered by control block via std::owner_less; `factory`
        // converts implicitly to the weak_ptr key type for the lookup.
        auto it = instances_.find(factory);
        if (it == instances_.end()) {
            it = instances_.emplace(factory, factory->Create()).first;
        }
        return it->second.lock();
    }

private:
    std::map<std::weak_ptr<ContextFactory>, std::weak_ptr<LocalContext>,
             std::owner_less<std::weak_ptr<ContextFactory>>> instances_;
};

// At the usage location.
thread_local LocalContextReference reference;

At minimum this requires an owner-based comparator for the map: std::owner_less<std::weak_ptr<ContextFactory>> (available since C++11) makes the line instances_.find(factory) work with older C++ standards, since shared_ptr converts implicitly to the weak_ptr key type. We can not use std::unordered_map with std::weak_ptr, because the required std::weak_ptr<T>::owner_hash (C++26) - unlike owner_equal - is not possible to backport, as it requires private interfaces of std::weak_ptr.

Apart from that it should behave acceptably. A std::map in uncontested cache lines / TLS is surprisingly fast, and any creation and destruction of instances is kept strictly out of contested code paths.

Author (@Ext3h, Nov 27, 2025)
Can't really say I like the idea though: bad use of that interface (i.e. not explicitly sharing the factory as the user of the outermost interface!) would still inevitably result in carrying multiple instances in the thread-local scope.

It makes the interface harder to use, and potentially even backfires in terms of peak memory consumption. Even worse, the map in the TLS accumulates expired weak_ptrs - which, due to the extensive use of std::make_shared, usually keep fused control-block allocations alive.

I would rather take the risk of leaking a few MB of memory per thread, assuming that threads are usually a resource developers are good at tracking and tearing down when no longer intended for re-use.

Member
> Can't really say I like the idea though, that bad use of that interface (i.e. not explicitly sharing the factory as the user of the outermost interface!) would still inevitably result in carrying multiple instances of the thread local scope instead.

Two non-exclusive answers: 1) documentation 2) make caching optional in the Codec constructor

> Makes it harder to use, and potentially even backfires in terms of peak memory consumption. Even worst, that map in the TLS is accumulating a memory leak of expired shared ptrs - which due to extensive use of std::make_shared are usually actually fused allocations.

Right, but the underlying compression context will be destroyed anyway, which is what matters.

(and a mitigation for this is to scrub expired weak_ptrs depending on heuristics)

> assuming that threads are usually a resource developers are good at tracking and tearing down when no longer intended for re-use

Arrow uses its internal thread pool extensively, so that doesn't apply here.
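The scrubbing mitigation mentioned above could be sketched as follows (`ScrubExpired` is an illustrative helper, not part of the proposal; it would e.g. be invoked every N lookups from `LocalContextReference::Get()`):

```cpp
#include <cassert>
#include <map>
#include <memory>

// Drop map entries whose weak_ptr key has expired. Erasing such a weak_ptr
// releases its weak reference, letting the (possibly fused) control-block
// allocation go away.
template <typename Map>
void ScrubExpired(Map& instances) {
  for (auto it = instances.begin(); it != instances.end();) {
    if (it->first.expired()) {
      it = instances.erase(it);
    } else {
      ++it;
    }
  }
}
```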

Member
By the way, a simpler alternative to this non-static ThreadLocal class is to manage a bounded-size freelist of contexts at the Codec level. This would probably cut down on implementation complexity, though it would also limit context reuse if the number of decompressing threads goes above the limit.
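Such a bounded freelist might be sketched as follows (`ContextPool` is hypothetical; neither this PR nor Arrow implements it). A thread that finds the freelist empty simply allocates a fresh context rather than blocking:

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical bounded freelist of (de)compression contexts at the Codec level.
template <typename Context>
class ContextPool {
 public:
  explicit ContextPool(size_t capacity) : capacity_(capacity) {}

  std::unique_ptr<Context> Acquire() {
    std::scoped_lock lock(mutex_);
    if (!free_.empty()) {
      auto ctx = std::move(free_.back());
      free_.pop_back();
      return ctx;  // reuse a cached context
    }
    return std::make_unique<Context>();  // pool empty: allocate a fresh one
  }

  void Release(std::unique_ptr<Context> ctx) {
    std::scoped_lock lock(mutex_);
    if (free_.size() < capacity_) {
      free_.push_back(std::move(ctx));  // keep for reuse
    }
    // else: let `ctx` go out of scope, destroying the context
  }

 private:
  size_t capacity_;
  std::vector<std::unique_ptr<Context>> free_;
  std::mutex mutex_;
};
```

Note the global mutex on every Acquire/Release, which is exactly the synchronization cost discussed in the replies below.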

Author
> Arrow uses its internal thread pool extensively, so that doesn't apply here.

One global thread pool, and the size of that pool is bounded by the number of CPU cores. From my perspective, this means that there is already a sane limit on the number of threads.

Meanwhile, for any application I can think of, it should be more or less safe to assume that any thread that required a thread_local-scoped context once will either need it again during the lifetime of the process, or will have a limited lifetime itself. It's not like someone is going to spawn thousands of threads for single-shot compression and then somehow keep all of them around indefinitely without ever using them for the same task again. (While also not having enough RAM to pay for that allocated context, yet still having enough RAM to pay for the remaining overhead of an OS thread...)

> By the way, a simpler alternative to this non-static ThreadLocal class is to manage a bounded-size freelist of contexts at the Codec level. This would probably cut down on implementation complexity, though it would also limit context reuse if the number of decompressing threads goes above the limit.

That would imply a global synchronization primitive on the freelist, and also completely messes up cache locality when the cached context jumps cores, due to the non-local properties of any naively implemented freelist. You'd at least have to implement an instance-stealing pattern to preserve locality on the good path. Also, "bounded-size" - assuming you wanted to block until an instance becomes free - means there's now a source of priority inversion, and the only way to avoid it is to "coincidentally" set the upper bound exactly to the number of threads.

The only potential benefit would be in the case of short-lived threads - but the overhead of creating a fresh thread dominates the cost of the TLS-bound ZSTD context allocation by orders of magnitude, so that is certainly not worth optimizing for.

By all means, thread_local is appropriate here for holding the context.

// This method is additionally used in a free-threaded context so caching in a class
// member is not possible.
thread_local std::unique_ptr<ZSTD_DCtx, ZSTDContextDeleter> decompression_context{
ZSTD_createDCtx(), ZSTDContextDeleter{}};
size_t ret = ZSTD_decompressDCtx(decompression_context.get(), output_buffer,
static_cast<size_t>(output_buffer_len), input,
static_cast<size_t>(input_len));
if (ZSTD_isError(ret)) {
return ZSTDError(ret, "ZSTD decompression failed: ");
}
@@ -207,8 +227,14 @@ class ZSTDCodec : public Codec {

Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override {
size_t ret = ZSTD_compress(output_buffer, static_cast<size_t>(output_buffer_len),
input, static_cast<size_t>(input_len), compression_level_);
// Compression context for ZSTD contains several large heap allocations.
// This method is additionally used in a free-threaded context so caching in a class
// member is not possible.
thread_local std::unique_ptr<ZSTD_CCtx, ZSTDContextDeleter> compression_context{
ZSTD_createCCtx(), ZSTDContextDeleter{}};
size_t ret = ZSTD_compressCCtx(compression_context.get(), output_buffer,
static_cast<size_t>(output_buffer_len), input,
static_cast<size_t>(input_len), compression_level_);
if (ZSTD_isError(ret)) {
return ZSTDError(ret, "ZSTD compression failed: ");
}