Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2208cdb
Support hash field expiration
jjz921024 Jul 7, 2024
e743f04
revert changes to GetMetadata
ltagliamonte-dd Jan 8, 2025
a1afdc5
fix CommandHStrlen
ltagliamonte-dd Jan 8, 2025
88ef4ca
Merge branch 'apache:unstable' into hfe
ltagliamonte-dd Jan 9, 2025
201c226
fix typo
ltagliamonte-dd Jan 9, 2025
f6d0cb7
Merge branch 'unstable' into hfe
ltagliamonte-dd Jan 9, 2025
d3a2b74
fix unit
ltagliamonte-dd Jan 10, 2025
45a864a
Merge branch 'unstable' into hfe
ltagliamonte-dd Jan 10, 2025
fccc2fe
Merge branch 'unstable' into hfe
ltagliamonte-dd Jan 13, 2025
9de4b52
add GetValidFieldCount method
jjz921024 Jan 11, 2025
4b87842
make the error msg consistent
jjz921024 Jan 12, 2025
b9aeaba
fix msg and use new func
ltagliamonte-dd Jan 13, 2025
a6a07bb
empty commit to re trigger CI
ltagliamonte-dd Jan 13, 2025
4240628
fix dereference
ltagliamonte-dd Jan 14, 2025
dbd9d13
remove unused var
ltagliamonte-dd Jan 14, 2025
67f41c3
remove empty line
ltagliamonte-dd Jan 14, 2025
cf0ff18
Merge branch 'unstable' into hfe
git-hulk Jan 17, 2025
35ce911
Merge branch 'unstable' into hfe
git-hulk Jan 19, 2025
3eefb37
Merge branch 'unstable' into hfe
git-hulk Jan 19, 2025
f0938a1
Merge branch 'unstable' into hfe
ltagliamonte-dd Jan 22, 2025
c5ec9f0
Update kvrocks.conf
ltagliamonte-dd Jan 25, 2025
3a726e1
address PragmaTwice feedbacks
ltagliamonte-dd Jan 25, 2025
b10d710
use string_view
ltagliamonte-dd Jan 25, 2025
f937ec8
Merge branch 'unstable' into hfe
ltagliamonte-dd Jan 25, 2025
9bbc8d9
address last comments
ltagliamonte-dd Jan 25, 2025
009fca3
Merge branch 'unstable' into hfe
ltagliamonte-dd Jan 26, 2025
6ebc2c8
Merge branch 'unstable' into hfe
ltagliamonte-dd Jan 28, 2025
1a45388
Merge remote-tracking branch 'origin/unstable' into hfe
ltagliamonte-dd Feb 10, 2025
581c1f4
Merge branch 'unstable' into hfe
aleksraiden Mar 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ json-storage-format json
# Default: no
txn-context-enabled no

# Whether to enable the hash field expiration feature.
# NOTE: This option will only affect newly-created hash objects.
# Default: no
hash-field-expiration no

# Define the histogram bucket values.
#
# If enabled, those values will be used to store the command execution latency values
Expand Down
246 changes: 243 additions & 3 deletions src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ class CommandHStrlen : public Commander {
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::Integer(static_cast<int>(value.size()));
*output = s.IsNotFound() ? redis::Integer(0) : redis::Integer(static_cast<int>(value.size()));
return Status::OK();
}
};
Expand Down Expand Up @@ -475,6 +474,238 @@ class CommandHRandField : public Commander {
bool no_parameters_ = true;
};

class CommandFieldExpireBase : public Commander {
protected:
Status commonParse(const std::vector<std::string> &args, int start_idx) {
CommandParser parser(args, start_idx);
std::string_view expire_flag, num_flag;
uint64_t fields_num = 0;
while (parser.Good()) {
if (parser.EatEqICaseFlag("FIELDS", num_flag)) {
fields_num = GET_OR_RET(parser.template TakeInt<uint64_t>());
break;
} else if (parser.EatEqICaseFlag("NX", expire_flag)) {
field_expire_type_ = HashFieldExpireType::NX;
} else if (parser.EatEqICaseFlag("XX", expire_flag)) {
field_expire_type_ = HashFieldExpireType::XX;
} else if (parser.EatEqICaseFlag("GT", expire_flag)) {
field_expire_type_ = HashFieldExpireType::GT;
} else if (parser.EatEqICaseFlag("LT", expire_flag)) {
field_expire_type_ = HashFieldExpireType::LT;
} else {
return parser.InvalidSyntax();
}
}

auto remains = parser.Remains();
auto size = args.size();
if (remains != fields_num) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

for (size_t i = size - remains; i < size; i++) {
fields_.emplace_back(args_[i]);
}

return Status::OK();
}

Status expireFieldExecute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) {
if (!srv->storage->GetConfig()->hash_field_expiration) {
return {Status::RedisExecErr, "field expiration feature is disabled"};
}

std::vector<int8_t> ret;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.ExpireFields(ctx, args_[1], expire_, fields_, field_expire_type_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiLen(ret.size());
for (const auto &i : ret) {
output->append(redis::Integer(i));
}

return Status::OK();
}

Status ttlExpireExecute(engine::Context &ctx, Server *srv, Connection *conn, std::vector<int64_t> &ret) {
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.TTLFields(ctx, args_[1], fields_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
return Status::OK();
}

uint64_t expire_ = 0;
HashFieldExpireType field_expire_type_ = HashFieldExpireType::None;
std::vector<Slice> fields_;
};

class CommandHExpire : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result * 1000 + util::GetTimeStampMS();
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHExpireAt : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result * 1000;
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHPExpire : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result + util::GetTimeStampMS();
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHPExpireAt : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result;
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHExpireTime : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
if (ttl > 0) {
output->append(redis::Integer((now + ttl) / 1000));
} else {
output->append(redis::Integer(ttl));
}
}
return Status::OK();
}
};

class CommandHPExpireTime : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
if (ttl > 0) {
output->append(redis::Integer(now + ttl));
} else {
output->append(redis::Integer(ttl));
}
}
return Status::OK();
}
};

class CommandHTTL : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
output->append(redis::Integer(ttl > 0 ? ttl / 1000 : ttl));
}
return Status::OK();
}
};

class CommandHPTTL : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
output->append(redis::Integer(ttl));
}
return Status::OK();
}
};

class CommandHPersist : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int8_t> ret;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.PersistFields(ctx, args_[1], fields_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiLen(ret.size());
for (const auto &i : ret) {
output->append(redis::Integer(i));
}
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, "write", 1, 1, 1),
Expand All @@ -492,6 +723,15 @@ REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
MakeCmdAttr<CommandHGetAll>("hgetall", 2, "read-only slow", 1, 1, 1),
MakeCmdAttr<CommandHScan>("hscan", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRangeByLex>("hrangebylex", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only slow", 1, 1, 1), )
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only slow", 1, 1, 1),
MakeCmdAttr<CommandHExpire>("hexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireAt>("hexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireTime>("hexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPExpire>("hpexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireAt>("hpexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireTime>("hpexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandHTTL>("httl", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPTTL>("hpttl", -5, "read-only", 1, 1, 1), )

} // namespace redis
1 change: 1 addition & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ Config::Config() {
new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
{"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)},
{"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)},
{"hash-field-expiration", false, new YesNoField(&hash_field_expiration, false)},
{"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")},

/* rocksdb options */
Expand Down
3 changes: 3 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ struct Config {

bool skip_block_cache_deallocation_on_close = false;

// whether to enable hash field expiration feature
bool hash_field_expiration = false;

std::vector<double> histogram_bucket_boundaries;

struct RocksDB {
Expand Down
5 changes: 4 additions & 1 deletion src/storage/compact_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "db_util.h"
#include "time_util.h"
#include "types/redis_bitmap.h"
#include "types/redis_hash.h"

namespace engine {

Expand Down Expand Up @@ -132,7 +133,9 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const Slice &key, const Sl
return false;
}

return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value));
return IsMetadataExpired(ikey, metadata) ||
(metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value)) ||
(metadata.Type() == kRedisHash && redis::Hash::IsFieldExpired(cached_metadata_, value));
}

} // namespace engine
Loading
Loading