-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTCPStore.hpp
123 lines (86 loc) · 3.65 KB
/
TCPStore.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#pragma once
#include "Store.hpp"
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <vector>
namespace xoscar {
namespace detail {

// Forward declarations only. These types are defined outside this header and
// are referenced below solely through the smart pointers held by TCPStore.
class TCPCallbackClient;
class TCPClient;
class TCPServer;

// Plain aggregate pairing a host string with a 16-bit port number.
// A default-constructed instance has an empty host and port 0.
struct SocketAddress {
    std::string host;
    std::uint16_t port = 0;
};

} // namespace detail
struct TCPStoreOptions {
static constexpr std::uint16_t kDefaultPort = 29500;
std::uint16_t port = kDefaultPort;
bool isServer = false;
std::optional<std::size_t> numWorkers = std::nullopt;
bool waitWorkers = true;
std::chrono::milliseconds timeout = Store::kDefaultTimeout;
// A boolean value indicating whether multiple store instances can be
// initialized with the same host:port pair.
bool multiTenant = false;
};
class TCPStore : public Store {
public:
explicit TCPStore(std::string host, const TCPStoreOptions &opts = {});
[[deprecated("Use TCPStore(host, opts) instead.")]] explicit TCPStore(
const std::string &masterAddr,
std::uint16_t masterPort,
std::optional<int> numWorkers = std::nullopt,
bool isServer = false,
const std::chrono::milliseconds &timeout = kDefaultTimeout,
bool waitWorkers = true);
~TCPStore();
void set(const std::string &key,
const std::vector<uint8_t> &value) override;
std::vector<uint8_t>
compareSet(const std::string &key,
const std::vector<uint8_t> &expectedValue,
const std::vector<uint8_t> &desiredValue) override;
std::vector<uint8_t> get(const std::string &key) override;
int64_t add(const std::string &key, int64_t value) override;
bool deleteKey(const std::string &key) override;
// NOTE: calling other TCPStore APIs inside the callback is NOT threadsafe
// watchKey() is a blocking operation. It will register the socket on
// TCPStoreMasterDaemon and the callback on TCPStoreWorkerDaemon. It will
// return once it has verified the callback is registered on both background
// threads. Only one thread can call watchKey() at a time.
void watchKey(const std::string &key, WatchKeyCallback callback) override;
bool check(const std::vector<std::string> &keys) override;
int64_t getNumKeys() override;
void wait(const std::vector<std::string> &keys) override;
void wait(const std::vector<std::string> &keys,
const std::chrono::milliseconds &timeout) override;
void append(const std::string &key,
const std::vector<uint8_t> &value) override;
std::vector<std::vector<uint8_t>>
multiGet(const std::vector<std::string> &keys) override;
void multiSet(const std::vector<std::string> &keys,
const std::vector<std::vector<uint8_t>> &values) override;
bool hasExtendedApi() const override;
// Waits for all workers to join.
void waitForWorkers();
// Returns the hostname used by the TCPStore.
const std::string &getHost() const noexcept { return addr_.host; }
// Returns the port used by the TCPStore.
std::uint16_t getPort() const noexcept { return addr_.port; }
private:
int64_t incrementValueBy(const std::string &key, int64_t delta);
std::vector<uint8_t> doGet(const std::string &key);
void doWait(std::vector<std::string> keys,
std::chrono::milliseconds timeout);
detail::SocketAddress addr_;
std::shared_ptr<detail::TCPServer> server_;
std::unique_ptr<detail::TCPClient> client_;
std::unique_ptr<detail::TCPCallbackClient> callbackClient_;
std::optional<std::size_t> numWorkers_;
const std::string initKey_ = "init/";
const std::string keyPrefix_ = "/";
std::mutex activeOpLock_;
};
} // namespace xoscar