Skip to content

Commit 2186f82

Browse files
Fix insertion rate limit
1 parent 072cedb commit 2186f82

File tree

3 files changed

+28
-6
lines changed

3 files changed

+28
-6
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
2.0.1
2+
3+
* Fixed a bug introduced in 2.0 where flush time was incorrectly updated
4+
* Improved thread management in context class
5+
6+
---
7+
8+
2.0.0
9+
10+
* Add thread management context class
11+
12+
---
13+
114
1.0.2
215

316
* Better CMakeLists.txt for finding Boost

include/nudb/impl/basic_store.ipp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,10 @@ insert(
344344
auto const sleep =
345345
s_->rate && rate > s_->rate;
346346
m.unlock();
347+
348+
// The caller of insert must be blocked when the rate of insertion
349+
// (measured in approximate bytes per second) exceeds the maximum rate
350+
// that can be flushed. The precise sleep duration is not important.
347351
if(sleep)
348352
std::this_thread::sleep_for(milliseconds{25});
349353
}
@@ -750,6 +754,7 @@ flush()
750754
#endif
751755
{
752756
unique_lock_type m{m_};
757+
s_->when = clock_type::now();
753758
if(! s_->p1.empty())
754759
{
755760
std::size_t work;
@@ -774,7 +779,6 @@ flush()
774779
#endif
775780
}
776781
s_->p1.periodic_activity();
777-
s_->when = clock_type::now();
778782
}
779783
if(ec_)
780784
ecb_.store(true);

include/nudb/impl/context.ipp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,10 @@ run()
7373
break;
7474
if(first)
7575
{
76-
auto const status = cv_f_.wait_until(
77-
lock, when + std::chrono::seconds{1});
76+
cv_f_.wait_until(lock, when + std::chrono::seconds{1});
7877
if(stop_)
7978
break;
80-
if(status == std::cv_status::timeout && ! waiting_.empty())
79+
if(! waiting_.empty())
8180
{
8281
// Move everything in waiting_ to flushing_
8382
for(auto store = waiting_.head_; store;
@@ -89,7 +88,13 @@ run()
8988
when = clock_type::now();
9089
if(flushing_.empty())
9190
continue;
92-
cv_f_.notify_all();
91+
92+
if (num_threads_ > 1)
93+
{
94+
// Let other threads flush while this one splices
95+
cv_f_.notify_all();
96+
continue;
97+
}
9398
}
9499
else
95100
{
@@ -103,7 +108,7 @@ run()
103108
}
104109
}
105110

106-
// process everything in flushing_
111+
// Process everything in flushing_
107112
for(;;)
108113
if(! flush_one())
109114
break;

0 commit comments

Comments
 (0)