-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathex018-local_cache.cpp
153 lines (120 loc) · 3.99 KB
/
ex018-local_cache.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <syncstream>
#include <utility>
#include "coke/mutex.h"
#include "coke/sleep.h"
#include "coke/wait.h"
/**
* This example shows how to use coke::Mutex to protect asynchronous operations.
*
* A common scenario is a DNS cache. When the locally cached entry for a domain
* name does not exist or has expired, only one request is needed to refresh it;
* the other requests wait for the refreshed result and use it directly.
*
* The update strategies for different requirements vary greatly. To avoid being
* verbose, only a simple scenario is shown here.
*/
/**
 * A cached string value paired with the point in time at which it expires.
 *
 * Expiry is measured on std::chrono::steady_clock.
 */
class CacheHandle {
    using clock_type = std::chrono::steady_clock;
    using time_point_type = clock_type::time_point;

    // Current time on the clock used for expiry checks.
    static auto now() { return clock_type::now(); }

public:
    /**
     * Store `value` and set its lifetime to `expire_ms` milliseconds from now.
     * A non-positive `expire_ms` sets the expiry to the maximum time point.
     */
    void set_value(const std::string &value, long expire_ms);

    /** Returns true once the current time is past the stored expiry. */
    bool expired() const {
        return expire_at < now();
    }

    /** Returns a reference to the stored value; no copy is made. */
    const std::string &get_value_ref() const {
        return value;
    }

private:
    std::string value;
    time_point_type expire_at;
};
/**
 * A single-slot cache whose value is produced by an asynchronous update
 * function.
 *
 * try_get() returns the stored handle if it has not expired. get_or_update()
 * additionally runs the update function when there is no usable handle, with
 * co_mtx ensuring that at most one coroutine runs the update at a time.
 */
class AsyncCache {
public:
    using handle_ptr_t = std::shared_ptr<CacheHandle>;
    using update_func_t = std::function<coke::Task<handle_ptr_t>()>;

    /**
     * @param func Coroutine factory invoked to produce a fresh handle.
     *             Taken by value and moved into the member so that the
     *             std::function is not copied a second time.
     */
    AsyncCache(update_func_t func) : func(std::move(func)) { }
    ~AsyncCache() = default;

    /**
     * Returns the stored handle, or an empty pointer when nothing is stored
     * or the stored handle has expired. Never suspends.
     */
    handle_ptr_t try_get();

    /**
     * Returns the stored handle if it is still usable; otherwise runs the
     * update function, stores its result and returns it.
     */
    coke::Task<handle_ptr_t> get_or_update();

private:
    // Held for the whole of get_or_update(), including the awaited update.
    coke::Mutex co_mtx;
    // Guards reads and writes of `ptr`; only held for short, non-suspending
    // sections.
    std::mutex mtx;
    // Currently stored handle; may be empty.
    handle_ptr_t ptr;
    // Produces a new handle when the stored one is missing or expired.
    update_func_t func;
};
// Returns a "[<seconds>s] " prefix holding the time elapsed since its first
// call. Declared here because it is used before its definition further down.
std::string current();
/**
 * Return a usable handle, running the update function if necessary.
 *
 * The coroutine mutex is held for the whole call, so at most one coroutine
 * runs the update at a time. Coroutines that were waiting on the mutex
 * re-check the cache once they acquire it and normally find the value that
 * the first one stored.
 *
 * The update function is assumed not to throw.
 */
coke::Task<AsyncCache::handle_ptr_t> AsyncCache::get_or_update() {
    // Serialize updaters; released when `guard` goes out of scope.
    coke::UniqueLock<coke::Mutex> guard(co_mtx);
    co_await guard.lock();

    // Another coroutine may have refreshed the cache while we were waiting.
    handle_ptr_t result = try_get();

    if (!result) {
        // We are the updater: fetch a new value first ...
        result = co_await func();

        // ... then publish it. The std::mutex is taken only after the
        // co_await, so it is never held across a suspension point.
        std::lock_guard<std::mutex> publish_guard(mtx);
        ptr = result;
    }

    // Returning releases co_mtx and lets the next waiter proceed.
    co_return result;
}
/**
 * Example update function: logs that an update started, waits one second to
 * imitate a slow asynchronous operation, then returns a handle holding the
 * next counter value with a lifetime of 1000 ms.
 */
coke::Task<AsyncCache::handle_ptr_t> updater() {
    // Incremented once per completed update; the first value is "0".
    static unsigned counter = 0;

    std::osyncstream(std::cout) << current() << "Update value" << std::endl;

    auto handle = std::make_shared<CacheHandle>();

    // Stand-in for a time-consuming asynchronous update.
    co_await coke::sleep(1.0);

    const unsigned serial = counter++;
    handle->set_value(std::to_string(serial), 1000);

    co_return handle;
}
/**
 * Worker coroutine: five times in a row, obtain a cached value (updating it
 * if needed), print it, and sleep for 0.6 seconds.
 *
 * @param id    Number printed in each log line to identify the worker.
 * @param cache Cache shared with the other workers.
 */
coke::Task<> use_cache(int id, AsyncCache &cache) {
    // Synchronized stream configured to emit its buffer on every flush, so
    // each std::endl below writes out one complete line.
    std::osyncstream os(std::cout);
    os << std::emit_on_flush;

    for (int round = 0; round < 5; ++round) {
        // Fast path: an unexpired value is already stored.
        auto handle = cache.try_get();

        // Slow path: update the value, or wait for the coroutine that is
        // already updating it and take its result.
        if (!handle) {
            handle = co_await cache.get_or_update();
        }

        // Work with the value.
        os << current() << "Worker " << id << " use value "
           << handle->get_value_ref() << std::endl;

        co_await coke::sleep(0.6);
    }
}
/**
 * Entry point: runs two workers that share one cache and blocks until both
 * have finished.
 */
int main() {
    // One cache instance, refreshed through `updater`, shared by all workers.
    AsyncCache cache(updater);

    // Start workers 1 and 2 and wait for both to complete.
    coke::sync_wait(use_cache(1, cache), use_cache(2, cache));

    return 0;
}
/**
 * Store a new value and reset its expiry.
 *
 * @param value     String to store (copied).
 * @param expire_ms Lifetime in milliseconds counted from now. A value that is
 *                  zero or negative sets the expiry to the maximum time point.
 */
void CacheHandle::set_value(const std::string &value, long expire_ms) {
    const bool no_deadline = (expire_ms <= 0);

    if (no_deadline) {
        expire_at = time_point_type::max();
    } else {
        const std::chrono::milliseconds lifetime(expire_ms);
        expire_at = now() + lifetime;
    }

    // The parameter shadows the member, hence the explicit `this->`.
    this->value = value;
}
/**
 * Return the stored handle without suspending.
 *
 * An expired handle is dropped from the cache first, so the result is either
 * an unexpired handle or an empty pointer.
 */
AsyncCache::handle_ptr_t AsyncCache::try_get() {
    std::lock_guard<std::mutex> guard(mtx);

    const bool stale = ptr && ptr->expired();
    if (stale) {
        ptr.reset();
    }

    return ptr;
}
/**
 * Build a log prefix of the form "[<seconds>s] ", where <seconds> is the time
 * elapsed since the first call to this function, formatted by std::to_string.
 */
std::string current() {
    using clock = std::chrono::steady_clock;
    using fractional_seconds = std::chrono::duration<double>;

    // Captured once, on the first call; all later calls measure from here.
    static const clock::time_point origin = clock::now();

    const fractional_seconds elapsed = clock::now() - origin;

    std::string stamp("[");
    stamp += std::to_string(elapsed.count());
    stamp += "s] ";
    return stamp;
}