Skip to content
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
03b69e1
feat: impl tdigest.revrank
donghao526 Aug 17, 2025
97df4a1
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
70a39d3
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
dde8410
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
0d3e9cc
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
bb172a8
test: add unit test for tdigest.revrank
donghao526 Aug 19, 2025
a64add4
test: add unit test for tdigest.revrank
donghao526 Aug 19, 2025
3954b1f
Merge branch 'unstable' into feature/tdigest-revrank
PragmaTwice Aug 19, 2025
05d1202
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
f688e14
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
8bcad0f
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
46ac984
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
495e072
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
2b6785d
add golang test cases for tdigest.revrank
invalid-email-address Aug 20, 2025
3af3b54
feat: impl tdigest.revrank
invalid-email-address Aug 20, 2025
f3d85d3
Merge branch 'feature/tmp' into feature/tdigest-revrank
invalid-email-address Aug 20, 2025
e68689d
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Aug 21, 2025
eb8674f
feat: impl tdigest.revrank
invalid-email-address Aug 27, 2025
c70f410
Merge branch 'feature/tmp' into feature/tdigest-revrank
invalid-email-address Aug 27, 2025
b991d0d
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Aug 27, 2025
4c9a41d
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
invalid-email-address Aug 28, 2025
543fda0
fix(replication): Fix Seg Fault On Signal When Replication is Enabled…
zhixinwen Aug 20, 2025
e0d39a7
chore(.asf.yaml): make 2.13 a protected branches (#3129)
PragmaTwice Aug 20, 2025
a4ed14c
feat(ts): Add support for data writing and `TS.CREATE`, `TS.ADD/MADD`…
yezhizi Aug 20, 2025
9d6c532
feat(ts): Add `TS.INFO` command (#3133)
yezhizi Aug 22, 2025
ff658f8
chore(.asf.yaml): enable auto merge and disable wiki (#3137)
PragmaTwice Aug 22, 2025
53e82f8
chore: remove unused `autoResizeBlockAndSST` method and config (#3136)
jonahgao Aug 22, 2025
6df3309
feat(scripting): support strict key-accessing mode for lua scripting …
PragmaTwice Aug 23, 2025
201afed
feat(Dockerfile): add a UID for the user in the container (#3138)
SpecLad Aug 24, 2025
0851c22
feat(ts): Add data query support and `TS.RANGE` command (#3140)
yezhizi Aug 25, 2025
c7ed36f
feat(ts): Add `TS.GET` command (#3142)
yezhizi Aug 26, 2025
3a898fe
chore(config): enable `level_compaction_dynamic_level_bytes` by defau…
jonahgao Aug 26, 2025
3711578
perf(storage): eliminate unnecessary `rocksdb::DB::ListColumnFamilies…
jonahgao Aug 27, 2025
4b4f684
fix(scan): pattern-based SCAN iterations may skip remaining keys (#3146)
sryanyuan Aug 27, 2025
bd268b4
style: add some comments on TDigestRank
donghao526 Aug 28, 2025
8e6a7f9
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Aug 28, 2025
6662240
refactor: remove commented code
donghao526 Aug 28, 2025
e7f06a2
style: format code
donghao526 Aug 28, 2025
367981c
Merge branch 'unstable' into feature/tdigest-revrank
LindaSummer Aug 29, 2025
4b8cd6a
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Sep 18, 2025
07836fd
feat: sort the input using map in revrank
donghao526 Sep 19, 2025
f44bc56
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Sep 19, 2025
2aded75
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 14, 2025
f4a9c53
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 25, 2025
5023de8
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 25, 2025
e3629d9
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 25, 2025
ae05623
fix: fix format
donghao526 Oct 25, 2025
0cf8c8a
fix: fix clang-tidy
donghao526 Oct 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,49 @@ class CommandTDigestAdd : public Commander {
std::vector<double> values_;
};

// TDIGEST.REVRANK key value [value ...]
//
// Replies with an array holding one integer per requested value, in the order the
// values were given. The integers are whatever TDigest::RevRank() writes into its
// output vector for each input.
class CommandTDigestRevRank : public Commander {
 public:
  // Stores the key name (args[1]) and parses every following argument as a float.
  // The command is registered with arity -3, so args always holds the command name,
  // the key and at least one value. Returns RedisParseErr on the first argument
  // that is not a valid float.
  Status Parse(const std::vector<std::string> &args) override {
    key_name_ = args[1];
    inputs_.reserve(args.size() - 2);
    for (size_t i = 2; i < args.size(); i++) {
      auto value = ParseFloat(args[i]);
      if (!value) {
        return {Status::RedisParseErr, errValueIsNotFloat};
      }
      inputs_.push_back(*value);
    }
    return Status::OK();
  }

  // Runs the reverse-rank query and serialises the answer as an array of integers.
  // A missing key is reported as errKeyNotFound; any other storage failure is
  // forwarded with its own message.
  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
    TDigest tdigest(srv->storage, conn->GetNamespace());
    std::vector<int> result;
    result.reserve(inputs_.size());
    if (const auto s = tdigest.RevRank(ctx, key_name_, inputs_, result); !s.ok()) {
      if (s.IsNotFound()) {
        return {Status::RedisExecErr, errKeyNotFound};
      }
      return {Status::RedisExecErr, s.ToString()};
    }

    // The reply is always an array. The previous `result.data()` null check could
    // never fail after the reserve() above, so its "nan" bulk-string fallback was
    // unreachable (and would not have matched the reply type of this command).
    std::vector<std::string> rev_ranks;
    rev_ranks.reserve(result.size());
    for (const auto v : result) {
      rev_ranks.push_back(redis::Integer(v));
    }
    *output = redis::Array(rev_ranks);
    return Status::OK();
  }

 private:
  std::string key_name_;        // digest key taken from args[1]
  std::vector<double> inputs_;  // values to look up, in request order
};

class CommandTDigestMinMax : public Commander {
public:
explicit CommandTDigestMinMax(bool is_min) : is_min_(is_min) {}
Expand Down Expand Up @@ -369,6 +412,7 @@ REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.crea
MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestRevRank>("tdigest.revrank", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange));
Expand Down
85 changes: 65 additions & 20 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class DummyCentroids {
return Valid();
}
bool Valid() const { return iter_ != centroids_.cend(); }
bool IsAtBegin() const { return iter_ == centroids_.cbegin(); }

StatusOr<Centroid> GetCentroid() const {
if (iter_ == centroids_.cend()) {
return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
Expand Down Expand Up @@ -186,8 +188,37 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const Slice& digest_name, con
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
// Persists the result of merging the currently buffered nodes of the digest stored
// under `ns_key`.
//
// Does nothing when `metadata->unmerged_nodes` is zero. Otherwise it builds one write
// batch containing the log data record, whatever mergeCurrentBuffer() adds, and the
// re-encoded metadata; writes that batch; and finally refreshes the context snapshot
// so later reads through `ctx` observe the write.
//
// `metadata` is passed on to mergeCurrentBuffer() and encoded afterwards.
// NOTE(review): Quantile() calls this inside its LockGuard scope; locking appears to
// be the caller's job - confirm for new call sites.
rocksdb::Status TDigest::mergeNodes(engine::Context& ctx, const std::string& ns_key, TDigestMetadata* metadata) {
  const bool nothing_buffered = (metadata->unmerged_nodes == 0);
  if (nothing_buffered) {
    return rocksdb::Status::OK();
  }

  auto batch = storage_->GetWriteBatchBase();

  WriteBatchLogData log_data(kRedisTDigest);
  auto log_status = batch->PutLogData(log_data.Encode());
  if (!log_status.ok()) {
    return log_status;
  }

  auto merge_status = mergeCurrentBuffer(ctx, ns_key, batch, metadata);
  if (!merge_status.ok()) {
    return merge_status;
  }

  // Encode the metadata only after the merge step has run on it.
  std::string encoded_metadata;
  metadata->Encode(&encoded_metadata);
  auto put_status = batch->Put(metadata_cf_handle_, ns_key, encoded_metadata);
  if (!put_status.ok()) {
    return put_status;
  }

  auto write_status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
  if (!write_status.ok()) {
    return write_status;
  }

  ctx.RefreshLatestSnapshot();
  return rocksdb::Status::OK();
}

rocksdb::Status TDigest::RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
{
Expand All @@ -198,31 +229,45 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name
}

if (metadata.total_observations == 0) {
result.resize(inputs.size(), -2);
return rocksdb::Status::OK();
}

if (metadata.unmerged_nodes > 0) {
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTDigest);
if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
return status;
}
if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}

if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, &metadata); !status.ok()) {
return status;
}
std::vector<Centroid> centroids;
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
return status;
}

std::string metadata_bytes;
metadata.Encode(&metadata_bytes);
if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
return status;
}
auto dump_centroids = DummyCentroids(metadata, centroids);
auto status = TDigestRank(dump_centroids, inputs, result);
if (!status) {
return rocksdb::Status::InvalidArgument(status.Msg());
}
return rocksdb::Status::OK();
}

if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); !status.ok()) {
return status;
}
rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
{
LockGuard guard(storage_->GetLockManager(), ns_key);

ctx.RefreshLatestSnapshot();
if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
return status;
}

if (metadata.total_observations == 0) {
return rocksdb::Status::OK();
}

if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class TDigest : public SubKeyScanner {

rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const std::vector<std::string>& source_digests,
const TDigestMergeOptions& options);

rocksdb::Status RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result);
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);

private:
Expand Down Expand Up @@ -117,6 +118,8 @@ class TDigest : public SubKeyScanner {
std::string internalSegmentGuardPrefixKey(const TDigestMetadata& metadata, const std::string& ns_key,
SegmentType seg) const;

rocksdb::Status mergeNodes(engine::Context& ctx, const std::string& ns_key, TDigestMetadata* metadata);

rocksdb::Status mergeCurrentBuffer(engine::Context& ctx, const std::string& ns_key,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata,
const std::vector<double>* additional_buffer = nullptr,
Expand Down
72 changes: 72 additions & 0 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <fmt/format.h>

#include <numeric>
#include <vector>

#include "common/status.h"
Expand Down Expand Up @@ -150,3 +151,74 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q) {
diff /= (lc.weight / 2 + rc.weight / 2);
return Lerp(lc.mean, rc.mean, diff);
}

// Computes, for every value in `inputs`, its estimated *reverse* rank inside the
// t-digest `td`: the total weight of the centroids whose mean is greater than the
// value, plus half of the weight of the centroids whose mean equals it. The sum is
// truncated to int.
//
// Special results:
//   * -1                when the value is greater than td.Max();
//   * td.TotalWeight()  when the value is smaller than every centroid mean.
//
// `result` is resized to inputs.size() and result[k] corresponds to inputs[k].
// Equal input values receive the same result. Returns a non-OK Status only when a
// centroid cannot be read from the iterator.
//
// Requirements on TD as used here: Max(), TotalWeight() and End(), where End() yields
// an iterator-like handle offering GetCentroid(), Prev(), Next() and IsAtBegin().
// NOTE(review): End() is assumed to be positioned on the last (largest) centroid and
// the digest is assumed to be non-empty; the RevRank() caller returns early for an
// empty digest - confirm for other callers.
//
// Review note from the pull request discussion: despite its name this function is
// specialised for RevRank rather than Rank. The suggestion was to implement Rank and
// build RevRank from the same logic by wrapping the iterator in a reversed version.
template <typename TD>
inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
  // Visit the inputs from largest to smallest without disturbing their original order.
  std::vector<size_t> indices(inputs.size());
  std::iota(indices.begin(), indices.end(), 0);
  std::sort(indices.begin(), indices.end(), [&inputs](size_t a, size_t b) { return inputs[a] < inputs[b]; });

  result.resize(inputs.size());
  size_t i = indices.size();
  double cumulative_weight = 0;  // weight of all centroids strictly greater than the current input

  // handle inputs larger than maximum
  while (i > 0 && inputs[indices[i - 1]] > td.Max()) {
    result[indices[i - 1]] = -1;
    i--;
  }

  // reverse iterate through centroids and calculate reverse rank for each input
  auto iter = td.End();
  while (i > 0) {
    auto centroid = GET_OR_RET(iter->GetCentroid());
    const double input = inputs[indices[i - 1]];

    if (centroid.mean < input) {
      // Everything accumulated so far is strictly greater than the input. The current
      // centroid has not been counted yet, so stay on it: the next (smaller) input
      // still has to be compared against it.
      result[indices[i - 1]] = static_cast<int>(cumulative_weight);
      i--;
      continue;
    }

    if (centroid.mean == input) {
      // Count half of the equal centroid for this input, then the whole centroid for
      // all smaller inputs. The half must be taken before the full weight is added.
      double rev_rank = cumulative_weight + centroid.weight / 2;
      cumulative_weight += centroid.weight;

      // handle all the previous centroids which have the same mean
      while (!iter->IsAtBegin() && iter->Prev()) {
        auto prev_centroid = GET_OR_RET(iter->GetCentroid());
        if (prev_centroid.mean != input) {
          // step back onto the last equal centroid; the shared Prev() below moves past it
          iter->Next();
          break;
        }
        rev_rank += prev_centroid.weight / 2;
        cumulative_weight += prev_centroid.weight;
      }

      result[indices[i - 1]] = static_cast<int>(rev_rank);
      i--;

      // handle the previous inputs which have the same value
      while ((i > 0) && (inputs[indices[i]] == inputs[indices[i - 1]])) {
        result[indices[i - 1]] = result[indices[i]];
        i--;
      }
    } else {
      // mean > input, accumulate weight and move to the previous centroid
      cumulative_weight += centroid.weight;
    }

    if (iter->IsAtBegin()) {
      break;
    }
    iter->Prev();
  }

  // Every centroid has been consumed: the remaining inputs are below all centroid means.
  while (i > 0) {
    result[indices[i - 1]] = static_cast<int>(td.TotalWeight());
    i--;
  }
  return Status::OK();
}
76 changes: 76 additions & 0 deletions tests/cppunit/types/tdigest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,79 @@ TEST_F(RedisTDigestTest, Quantile_returns_nan_on_empty_tdigest) {
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_FALSE(result.quantiles) << "should not have quantiles with empty tdigest";
}

// Six distinct observations; queries cover a value below the minimum, every stored
// value, and a value above the maximum.
TEST_F(RedisTDigestTest, RevRank_on_the_set_contains_different_elements) {
  std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
  bool exists = false;
  auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
  ASSERT_FALSE(exists);
  ASSERT_TRUE(status.ok());
  std::vector<double> input{10, 20, 30, 40, 50, 60};
  status = tdigest_->Add(*ctx_, test_digest_name, input);
  ASSERT_TRUE(status.ok()) << status.ToString();

  const std::vector<double> value = {0, 10, 20, 30, 40, 50, 60, 70};
  std::vector<int> result;
  status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
  // Check the status before looking at the output.
  ASSERT_TRUE(status.ok()) << status.ToString();

  // Comparing whole vectors also verifies the number of results, so an empty
  // result can no longer pass silently.
  const std::vector<int> expect_result{6, 5, 4, 3, 2, 1, 0, -1};
  EXPECT_EQ(result, expect_result);
}

// Repeated observations: values equal to the query contribute half of their count.
// The query is repeated after adding one more observation.
TEST_F(RedisTDigestTest, RevRank_on_the_set_contains_several_identical_elements) {
  std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
  bool exists = false;
  auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
  ASSERT_FALSE(exists);
  ASSERT_TRUE(status.ok());
  std::vector<double> input{10, 10, 10, 20, 20};
  status = tdigest_->Add(*ctx_, test_digest_name, input);
  ASSERT_TRUE(status.ok()) << status.ToString();

  const std::vector<double> value = {10, 20};
  std::vector<int> result;
  status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
  // Check the status before looking at the output.
  ASSERT_TRUE(status.ok()) << status.ToString();
  // Whole-vector comparison also pins the number of results.
  const std::vector<int> expect_result{3, 1};
  EXPECT_EQ(result, expect_result);

  status = tdigest_->Add(*ctx_, test_digest_name, std::vector<double>{10});
  ASSERT_TRUE(status.ok()) << status.ToString();

  result.clear();
  status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
  ASSERT_TRUE(status.ok()) << status.ToString();
  const std::vector<int> expect_result_new{4, 1};
  EXPECT_EQ(result, expect_result_new);
}

// A digest that was created but never fed any observation reports -2 for every query.
TEST_F(RedisTDigestTest, RevRank_on_empty_tdigest) {
  std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
  bool exists = false;
  auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
  ASSERT_FALSE(exists);
  ASSERT_TRUE(status.ok());

  const std::vector<double> value = {10, 20};
  std::vector<int> result;
  status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
  // Check the status before looking at the output.
  ASSERT_TRUE(status.ok()) << status.ToString();

  // Whole-vector comparison fails if no results are produced at all.
  const std::vector<int> expect_result{-2, -2};
  EXPECT_EQ(result, expect_result);
}
Loading
Loading