diff --git a/src/commands/cmd_key.cc b/src/commands/cmd_key.cc index 093e0cc2bfe..2b7adb6e1f2 100644 --- a/src/commands/cmd_key.cc +++ b/src/commands/cmd_key.cc @@ -23,6 +23,7 @@ #include "commander.h" #include "commands/ttl_util.h" #include "error_constants.h" +#include "rocksdb/slice.h" #include "server/redis_reply.h" #include "server/server.h" #include "storage/redis_db.h" @@ -356,6 +357,33 @@ class CommandDel : public Commander { } }; +class CommandDeleteRange : public Commander { + Status Parse(const std::vector &args) override { + if (args.size() != 3) { + if (args.size() != 2) { + return {Status::RedisParseErr, errInvalidSyntax}; + } + if (args[1].find('*') != args[1].size() - 1) { + return {Status::RedisParseErr, errInvalidSyntax}; + } + args_[1] = args[1].substr(0, args[1].size() - 1); + } + return Status::OK(); + } + + public: + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + rocksdb::Slice key_begin, key_end; + key_begin = args_[1]; + key_end = args_.size() == 3 ? args_[2] : ""; + redis::Database redis(srv->storage, conn->GetNamespace()); + auto s = redis.DeleteRange(ctx, key_begin, key_end); + if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; + *output = redis::RESP_OK; + return Status::OK(); + } +}; + class CommandRename : public Commander { public: Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { @@ -595,6 +623,7 @@ REDIS_REGISTER_COMMANDS(Key, MakeCmdAttr("ttl", 2, "read-only", 1, 1 MakeCmdAttr("pexpiretime", 2, "read-only", 1, 1, 1), MakeCmdAttr("del", -2, "write no-dbsize-check", 1, -1, 1), MakeCmdAttr("unlink", -2, "write no-dbsize-check", 1, -1, 1), + MakeCmdAttr("deleterange", -2, "write exclusive", 1, -1, 1), MakeCmdAttr("rename", 3, "write", 1, 2, 1), MakeCmdAttr("renamenx", 3, "write", 1, 2, 1), MakeCmdAttr("copy", -3, "write", 1, 2, 1), diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 1ff52c8b008..d9c6de91506 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -207,6 +207,17 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector &k return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } +rocksdb::Status Database::DeleteRange(engine::Context &ctx, const Slice &start, const Slice &end) { + std::string ns_start = AppendNamespacePrefix(start); + std::string ns_end; + if (!end.empty()) { + ns_end = AppendNamespacePrefix(end); + } else { + ns_end = util::StringNext(ns_start); + } + return storage_->DeleteRange(ctx, ns_start, ns_end); +} + rocksdb::Status Database::Exists(engine::Context &ctx, const std::vector &keys, int *ret) { std::vector ns_keys; ns_keys.reserve(keys.size()); diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h index 9563b1e1d6a..109408269b5 100644 --- a/src/storage/redis_db.h +++ b/src/storage/redis_db.h @@ -111,6 +111,7 @@ class Database { [[nodiscard]] rocksdb::Status Expire(engine::Context &ctx, const Slice &user_key, uint64_t timestamp); [[nodiscard]] rocksdb::Status Del(engine::Context &ctx, const Slice &user_key); [[nodiscard]] rocksdb::Status MDel(engine::Context &ctx, const std::vector &keys, uint64_t *deleted_cnt); + [[nodiscard]] rocksdb::Status DeleteRange(engine::Context &ctx, const Slice &start, const Slice &end); [[nodiscard]] rocksdb::Status Exists(engine::Context &ctx, const std::vector &keys, int *ret); [[nodiscard]] rocksdb::Status TTL(engine::Context &ctx, const Slice &user_key, int64_t *ttl); [[nodiscard]] rocksdb::Status GetExpireTime(engine::Context &ctx, const Slice &user_key, uint64_t *timestamp); diff --git a/tests/gocase/unit/deleterange/deleterange_test.go b/tests/gocase/unit/deleterange/deleterange_test.go new file mode 100644 index 00000000000..642ac8d61d8 --- /dev/null +++ b/tests/gocase/unit/deleterange/deleterange_test.go @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package deleterange + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" +) + +func TestDeleteRange(t *testing.T) { + srv := util.StartServer(t, map[string]string{}) + defer srv.Close() + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + DeleteRangeTest(t, rdb, ctx) +} + +func DeleteRangeTest(t *testing.T, rdb *redis.Client, ctx context.Context) { + t.Run("DELETERANGE ALL", func(t *testing.T) { + require.NoError(t, rdb.FlushDB(ctx).Err()) + util.Populate(t, rdb, "key:", 1000, 10) + require.NoError(t, rdb.Do(ctx, "deleterange", "*").Err()) + keys := scanAll(t, rdb) + require.Len(t, keys, 0) + }) + + t.Run("DELETERANGE BY PERFERIX", func(t *testing.T) { + require.NoError(t, rdb.FlushDB(ctx).Err()) + + for _, key := range []string{"aa", "aab", "aabb", "ab", "abb", "ba", "cc", "cd", "dd"} { + require.NoError(t, rdb.Set(ctx, key, "hello", 0).Err()) + } + deleterange(t, rdb, "aa*") + keys := scanAll(t, rdb) + require.Equal(t, []string{"ab", "abb", "ba", "cc", "cd", "dd"}, keys) + + deleterange(t, rdb, "c*") + keys = scanAll(t, rdb) + require.Equal(t, []string{"ab", "abb", "ba", "dd"}, keys) + + deleterange(t, rdb, "d*") + keys = scanAll(t, rdb) + require.Equal(t, []string{"ab", "abb", "ba"}, keys) + + deleterange(t, rdb, "a*") + keys = scanAll(t, rdb) + require.Equal(t, []string{"ba"}, keys) + + deleterange(t, rdb, "*") + keys = scanAll(t, rdb) + require.Equal(t, []string(nil), keys) + }) + + t.Run("DELETERANGE with multi namespace", func(t *testing.T) { + require.NoError(t, rdb.FlushDB(ctx).Err()) + require.NoError(t, rdb.ConfigSet(ctx, "requirepass", "foobared").Err()) + + tokens := []string{"test_ns_token1", "test_ns_token2"} + keyPrefixes := []string{"key1*", "key2*"} + namespaces := []string{"test_ns1", "test_ns2"} + + for i := 0; i < 2; i++ { + require.NoError(t, rdb.Do(ctx, "AUTH", "foobared").Err()) + require.NoError(t, rdb.Do(ctx, "NAMESPACE", "ADD", namespaces[i], tokens[i]).Err()) + require.NoError(t, rdb.Do(ctx, "AUTH", tokens[i]).Err()) + + for k := 0; k < 1000; k++ { + require.NoError(t, rdb.Set(ctx, fmt.Sprintf("%s:%d", keyPrefixes[i], k), "hello", 0).Err()) + } + for k := 0; k < 100; k++ { + require.NoError(t, rdb.Set(ctx, strconv.Itoa(k), "hello", 0).Err()) + } + } + + for i := 0; i < 2; i++ { + require.NoError(t, rdb.Do(ctx, "AUTH", tokens[i]).Err()) + require.NoError(t, rdb.Do(ctx, "deleterange", keyPrefixes[i]).Err()) + + keys := scanAll(t, rdb, "match", keyPrefixes[i]) + require.Len(t, keys, 0) + + keys = scanAll(t, rdb) + require.Len(t, keys, 100) + } + }) + + t.Run("Deleterange reject invalid input", func(t *testing.T) { + util.ErrorRegexp(t, rdb.Do(ctx, "DELETERANGE", "hello").Err(), ".*syntax error.*") + util.ErrorRegexp(t, rdb.Do(ctx, "DELETERANGE", "hel*o").Err(), ".*syntax error.*") + util.ErrorRegexp(t, rdb.Do(ctx, "DELETERANGE", "*hello").Err(), ".*syntax error.*") + util.ErrorRegexp(t, rdb.Do(ctx, "DELETERANGE", "[").Err(), ".*syntax error.*") + util.ErrorRegexp(t, rdb.Do(ctx, "DELETERANGE", "\\").Err(), ".*syntax error.*") + util.ErrorRegexp(t, rdb.Do(ctx, "DELETERANGE", "[a").Err(), ".*syntax error.*") + util.ErrorRegexp(t, rdb.Do(ctx, "DELETERANGE", "[a-]").Err(), ".*syntax error.*") + }) +} + +func scan(t testing.TB, rdb *redis.Client, c string, args ...interface{}) (cursor string, keys []string) { + args = append([]interface{}{"SCAN", c}, args...) + r := rdb.Do(context.Background(), args...) + require.NoError(t, r.Err()) + require.Len(t, r.Val(), 2) + + rs := r.Val().([]interface{}) + cursor = rs[0].(string) + + for _, key := range rs[1].([]interface{}) { + keys = append(keys, key.(string)) + } + + return +} + +func deleterange(t testing.TB, rdb *redis.Client, c string, args ...interface{}) { + args = append([]interface{}{"DELETERANGE", c}, args...) + r := rdb.Do(context.Background(), args...) + require.NoError(t, r.Err()) +} + +func scanAll(t testing.TB, rdb *redis.Client, args ...interface{}) (keys []string) { + c := "0" + for { + cursor, keyList := scan(t, rdb, c, args...) + + c = cursor + keys = append(keys, keyList...) + + if c == "0" { + slices.Sort(keys) + keys = slices.Compact(keys) + return + } + } +}