Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/rdkafka_zstd.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ rd_kafka_resp_err_t rd_kafka_zstd_decompress(rd_kafka_broker_t *rkb,

switch (out_bufsize) {
case ZSTD_CONTENTSIZE_UNKNOWN:
/* Decompressed size cannot be determined, make a guess */
out_bufsize = inlen * 2;
/* Decompressed size cannot be determined, make a guess
* and only use inlen doubled if it is below max size */
out_bufsize = RD_MIN(inlen * 2,
(unsigned long long)rkb->rkb_rk->rk_conf.recv_max_msg_size);
break;
case ZSTD_CONTENTSIZE_ERROR:
/* Error calculating frame content size */
Expand Down Expand Up @@ -91,8 +93,17 @@ rd_kafka_resp_err_t rd_kafka_zstd_decompress(rd_kafka_broker_t *rkb,
/* Check if the destination size is too small */
if (ZSTD_getErrorCode(ret) == ZSTD_error_dstSize_tooSmall) {

/* If we are already at max size we can't grow further */
if (out_bufsize >=
(unsigned long long)rkb->rkb_rk->rk_conf.recv_max_msg_size)
break;

/* Grow quadratically */
out_bufsize += RD_MAX(out_bufsize * 2, 4000);
/* If growth would be more than the max size, clamp
* the value to max */
if (out_bufsize > (unsigned long long)rkb->rkb_rk->rk_conf.recv_max_msg_size)
out_bufsize = (unsigned long long)rkb->rkb_rk->rk_conf.recv_max_msg_size;

rd_atomic64_add(&rkb->rkb_c.zbuf_grow, 1);

Expand Down