diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp index 1451da24ca2..304cfb36bda 100644 --- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp @@ -301,6 +301,11 @@ class RocksDBEventListener : public rocksdb::EventListener { return recentFlushedCfs; } + void resetFlushStats() { + std::unique_lock lock(mutex); + recentFlushedCfs.clear(); + } + private: UID logId; @@ -744,9 +749,11 @@ std::string getShardMappingKey(KeyRef key, StringRef prefix) { return prefix.toString() + key.toString(); } -void logRocksDBError(const rocksdb::Status& status, const std::string& method) { - auto level = status.IsTimedOut() ? SevWarn : SevError; - TraceEvent e(level, "ShardedRocksDBError"); +void logRocksDBError(const rocksdb::Status& status, const std::string& method, Severity sev = SevError) { + if (status.IsTimedOut()) { + sev = SevWarn; + } + TraceEvent e(sev, "ShardedRocksDBError"); e.setMaxFieldLength(10000) .detail("Error", status.ToString()) .detail("Method", method) @@ -1116,7 +1123,7 @@ struct PhysicalShard { flushOptions.allow_write_stall = false; auto status = db->Flush(flushOptions, cf); if (!status.ok()) { - logRocksDBError(status, "Flush"); + logRocksDBError(status, "Flush", SevWarn); } return status; } @@ -2311,14 +2318,16 @@ ACTOR Future flushShardWorker(std::shared_ptr rState, shardManager->getAllShards(); state rocksdb::DB* db = shardManager->getDb(); TraceEvent("FlushWorkerStarted", rState->logId).detail("FlushPeriod", SERVER_KNOBS->SHARDED_ROCKSDB_FLUSH_PERIOD); + state double flushPeriod = SERVER_KNOBS->SHARDED_ROCKSDB_FLUSH_PERIOD; try { loop { - wait(delay(SERVER_KNOBS->SHARDED_ROCKSDB_FLUSH_PERIOD)); + wait(delay(flushPeriod)); if (rState->closing) { break; } int count = 0; double start = now(); + bool finished = true; // If a flush happened recently, skip this iteration. if (start - eventListener->getLastWalFullFlushTime() > SERVER_KNOBS->SHARDED_ROCKSDB_FLUSH_PERIOD) { // Try to flush oldest CFs. @@ -2333,15 +2342,30 @@ ACTOR Future flushShardWorker(std::shared_ptr rState, uint64_t memtableSize = 0; db->GetIntProperty(physicalShard->cf, rocksdb::DB::Properties::kCurSizeAllMemTables, &memtableSize); if (memtableSize > 0) { - physicalShard->flush(); + auto status = physicalShard->flush(); + if (!status.ok()) { + // Flush could trigger write stalls. Skip flushing if error is seen. + finished = false; + flushPeriod = 60.0; + TraceEvent("PausedFlush") + .detail("ErrorMsg", status.ToString()) + .detail("FinishedFlushs", count) + .detail("FlushPeriod", flushPeriod); + break; + } ++count; } } + if (finished) { + eventListener->resetFlushStats(); + flushPeriod = SERVER_KNOBS->SHARDED_ROCKSDB_FLUSH_PERIOD; + } TraceEvent("ShardedRocksDBFlushWorker", rState->logId) .detail("FlushedCfs", count) .detail("RecentFlushedCfs", recentFlushedCfs.size()) .detail("LastWalFullFlushTime", eventListener->getLastWalFullFlushTime()) - .detail("FlushActorDuration", now() - start); + .detail("FlushActorDuration", now() - start) + .detail("Finished", finished); } } } catch (Error& e) {