From 8979e9e0dc7f1c7e6bae2636c508ca529593e17a Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Wed, 28 Nov 2018 18:41:34 -0800 Subject: [PATCH] [STATS] Add @StatsDoc annotation for bookie server request stats Descriptions of the changes in this PR: *Motivation* As part of [BP-36](https://github.com/apache/bookkeeper/issues/1785), this PR is to document the request stats for bookkeeper server. *Changes* - add `parent` and `happensAfter` in StatsDoc - convert bookie request stats to use StatsDoc for documenting metrics Master Issue: #1785 Reviewers: Jia Zhai This closes #1839 from sijie/bp36_add_parent_and_happensafter --- .../bookkeeper/bookie/stats/BookieStats.java | 14 +- .../proto/BookieRequestProcessor.java | 185 ++-------- .../proto/ForceLedgerProcessorV3.java | 11 +- .../proto/GetBookieInfoProcessorV3.java | 10 +- .../proto/LongPollReadEntryProcessorV3.java | 6 +- .../bookkeeper/proto/PacketProcessorBase.java | 2 +- .../proto/PacketProcessorBaseV3.java | 6 +- .../bookkeeper/proto/ReadEntryProcessor.java | 13 +- .../proto/ReadEntryProcessorV3.java | 20 +- .../bookkeeper/proto/ReadLacProcessorV3.java | 10 +- .../apache/bookkeeper/proto/RequestStats.java | 342 ++++++++++++++++++ .../bookkeeper/proto/WriteEntryProcessor.java | 18 +- .../proto/WriteEntryProcessorV3.java | 12 +- .../bookkeeper/proto/WriteLacProcessorV3.java | 19 +- .../proto/ForceLedgerProcessorV3Test.java | 5 +- .../proto/WriteEntryProcessorTest.java | 5 +- .../proto/WriteEntryProcessorV3Test.java | 7 +- .../stats/annotations/StatsDoc.java | 20 + 18 files changed, 470 insertions(+), 235 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java index 72921d72971..5e033e9e9f3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.bookie.stats; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER; @@ -28,6 +29,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES; import lombok.Getter; @@ -56,11 +58,19 @@ public class BookieStats { @StatsDoc(name = BOOKIE_FORCE_LEDGER, help = "total force operations occurred on a bookie") private final Counter forceLedgerOps; // Bookie Operation Latency Stats - @StatsDoc(name = BOOKIE_ADD_ENTRY, help = "operations stats of AddEntry on a bookie") + @StatsDoc( + name = BOOKIE_ADD_ENTRY, + help = "operations stats of AddEntry on a bookie", + parent = ADD_ENTRY + ) private final OpStatsLogger addEntryStats; @StatsDoc(name = BOOKIE_RECOVERY_ADD_ENTRY, help = "operation stats of RecoveryAddEntry on a bookie") private final OpStatsLogger recoveryAddEntryStats; - @StatsDoc(name = BOOKIE_READ_ENTRY, help = "operation stats of ReadEntry on a bookie") + @StatsDoc( + name = BOOKIE_READ_ENTRY, + help = "operation stats of ReadEntry on a bookie", + parent = READ_ENTRY + ) private final OpStatsLogger readEntryStats; // Bookie Operation Bytes Stats @StatsDoc(name = BOOKIE_ADD_ENTRY_BYTES, help = "bytes stats of AddEntry on a bookie") diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 78b35ec6cd2..b883f74a21f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -21,34 +21,6 @@ package org.apache.bookkeeper.proto; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST; import static org.apache.bookkeeper.proto.RequestUtils.hasFlag; import com.google.common.annotations.VisibleForTesting; @@ -68,7 +40,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import lombok.AccessLevel; @@ -80,9 +51,6 @@ import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.tls.SecurityException; import org.apache.bookkeeper.tls.SecurityHandlerFactory; @@ -146,37 +114,8 @@ public class BookieRequestProcessor implements RequestProcessor { // Expose Stats private final BKStats bkStats = BKStats.getInstance(); private final boolean statsEnabled; - private final OpStatsLogger addRequestStats; - private final OpStatsLogger addEntryStats; - final OpStatsLogger readRequestStats; - final OpStatsLogger readEntryStats; - final OpStatsLogger forceLedgerStats; - final OpStatsLogger forceLedgerRequestStats; - final OpStatsLogger fenceReadRequestStats; - final OpStatsLogger fenceReadEntryStats; - final OpStatsLogger fenceReadWaitStats; - final OpStatsLogger readEntrySchedulingDelayStats; - final OpStatsLogger longPollPreWaitStats; - final OpStatsLogger longPollWaitStats; - final OpStatsLogger longPollReadStats; - final OpStatsLogger longPollReadRequestStats; - final Counter readLastEntryNoEntryErrorCounter; - final OpStatsLogger writeLacRequestStats; - final OpStatsLogger writeLacStats; - final OpStatsLogger readLacRequestStats; - final OpStatsLogger readLacStats; - final OpStatsLogger getBookieInfoRequestStats; - final OpStatsLogger getBookieInfoStats; - final OpStatsLogger channelWriteStats; - final OpStatsLogger addEntryBlockedStats; - final OpStatsLogger readEntryBlockedStats; - - final AtomicInteger addsInProgress = new AtomicInteger(0); - final AtomicInteger maxAddsInProgress = new AtomicInteger(0); - final AtomicInteger addsBlocked = new AtomicInteger(0); - final AtomicInteger readsInProgress = new AtomicInteger(0); - final AtomicInteger readsBlocked = new AtomicInteger(0); - final AtomicInteger maxReadsInProgress = new AtomicInteger(0); + + private final RequestStats requestStats; final Semaphore addsSemaphore; final Semaphore readsSemaphore; @@ -248,86 +187,13 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, // Expose Stats this.statsEnabled = serverCfg.isStatisticsEnabled(); - this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY); - this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST); - this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY); - this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER); - this.forceLedgerRequestStats = statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST); - this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST); - this.fenceReadEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ); - this.fenceReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST); - this.fenceReadWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_WAIT); - this.readEntrySchedulingDelayStats = statsLogger.getOpStatsLogger(READ_ENTRY_SCHEDULING_DELAY); - this.longPollPreWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_PRE_WAIT); - this.longPollWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT); - this.longPollReadStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ); - this.longPollReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST); - this.readLastEntryNoEntryErrorCounter = statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR); - this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC); - this.writeLacRequestStats = statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST); - this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC); - this.readLacRequestStats = statsLogger.getOpStatsLogger(READ_LAC_REQUEST); - this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO); - this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST); - this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE); - - this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT); - this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT); + this.requestStats = new RequestStats(statsLogger); int maxAdds = serverCfg.getMaxAddsInProgressLimit(); addsSemaphore = maxAdds > 0 ? new Semaphore(maxAdds, true) : null; int maxReads = serverCfg.getMaxReadsInProgressLimit(); readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null; - - statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return addsInProgress; - } - }); - - statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return addsBlocked; - } - }); - - statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return readsInProgress; - } - }); - - statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return readsBlocked; - } - }); - } protected void onAddRequestStart(Channel channel) { @@ -336,21 +202,19 @@ protected void onAddRequestStart(Channel channel) { final long throttlingStartTimeNanos = MathUtils.nowInNano(); channel.config().setAutoRead(false); LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel); - addsBlocked.incrementAndGet(); + requestStats.blockAddRequest(); addsSemaphore.acquireUninterruptibly(); channel.config().setAutoRead(true); final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos); LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos); - addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS); - addsBlocked.decrementAndGet(); + requestStats.unblockAddRequest(delayNanos); } } - final int curr = addsInProgress.incrementAndGet(); - maxAddsInProgress.accumulateAndGet(curr, Integer::max); + requestStats.trackAddRequest(); } protected void onAddRequestFinish() { - addsInProgress.decrementAndGet(); + requestStats.untrackAddRequest(); if (addsSemaphore != null) { addsSemaphore.release(); } @@ -362,21 +226,19 @@ protected void onReadRequestStart(Channel channel) { final long throttlingStartTimeNanos = MathUtils.nowInNano(); channel.config().setAutoRead(false); LOG.info("Too many read requests in progress, disabling autoread on channel {}", channel); - readsBlocked.incrementAndGet(); + requestStats.blockReadRequest(); readsSemaphore.acquireUninterruptibly(); channel.config().setAutoRead(true); final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos); LOG.info("Re-enabled autoread on channel {} after ReadRequest delay of {} nanos", channel, delayNanos); - readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS); - readsBlocked.decrementAndGet(); + requestStats.unblockReadRequest(delayNanos); } } - final int curr = readsInProgress.incrementAndGet(); - maxReadsInProgress.accumulateAndGet(curr, Integer::max); + requestStats.trackReadRequest(); } protected void onReadRequestFinish() { - readsInProgress.decrementAndGet(); + requestStats.untrackReadRequest(); if (readsSemaphore != null) { readsSemaphore.release(); } @@ -384,12 +246,12 @@ protected void onReadRequestFinish() { @VisibleForTesting int maxAddsInProgressCount() { - return maxAddsInProgress.get(); + return requestStats.maxAddsInProgressCount(); } @VisibleForTesting int maxReadsInProgressCount() { - return maxReadsInProgress.get(); + return requestStats.maxReadsInProgressCount(); } @Override @@ -576,7 +438,7 @@ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Chann .setStatus(addResponse.getStatus()) .setAddResponse(addResponse); BookkeeperProtocol.Response resp = response.build(); - write.sendResponse(addResponse.getStatus(), resp, addRequestStats); + write.sendResponse(addResponse.getStatus(), resp, requestStats.getAddRequestStats()); } } } @@ -610,7 +472,10 @@ private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r, fin .setStatus(forceLedgerResponse.getStatus()) .setForceLedgerResponse(forceLedgerResponse); BookkeeperProtocol.Response resp = response.build(); - forceLedger.sendResponse(forceLedgerResponse.getStatus(), resp, forceLedgerRequestStats); + forceLedger.sendResponse( + forceLedgerResponse.getStatus(), + resp, + requestStats.getForceLedgerRequestStats()); } } } @@ -660,7 +525,7 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Chan .setStatus(readResponse.getStatus()) .setReadResponse(readResponse); BookkeeperProtocol.Response resp = response.build(); - read.sendResponse(readResponse.getStatus(), resp, readRequestStats); + read.sendResponse(readResponse.getStatus(), resp, requestStats.getReadRequestStats()); } } } @@ -740,8 +605,10 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Ch r.entryId); } - write.sendResponse(BookieProtocol.ETOOMANYREQUESTS, - ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), addRequestStats); + write.sendResponse( + BookieProtocol.ETOOMANYREQUESTS, + ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), + requestStats.getAddRequestStats()); } } } @@ -770,8 +637,10 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Channe r.entryId); } - read.sendResponse(BookieProtocol.ETOOMANYREQUESTS, - ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), readRequestStats); + read.sendResponse( + BookieProtocol.ETOOMANYREQUESTS, + ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), + requestStats.getReadRequestStats()); } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java index 0c8ef01fa87..f8891722a9e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java @@ -67,11 +67,11 @@ private ForceLedgerResponse getForceLedgerResponse() { "ledgerId must be {} but was {}", ledgerId, ledgerId1); if (BookieProtocol.EOK == rc) { - requestProcessor.getForceLedgerStats() + requestProcessor.getRequestStats().getForceLedgerStats() .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } else { - requestProcessor.getForceLedgerStats() + requestProcessor.getRequestStats().getForceLedgerStats() .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } @@ -94,7 +94,7 @@ private ForceLedgerResponse getForceLedgerResponse() { .setStatus(forceLedgerResponse.getStatus()) .setForceLedgerResponse(forceLedgerResponse); Response resp = response.build(); - sendResponse(status, resp, requestProcessor.getForceLedgerRequestStats()); + sendResponse(status, resp, requestProcessor.getRequestStats().getForceLedgerRequestStats()); }; StatusCode status = null; try { @@ -124,7 +124,10 @@ public void safeRun() { .setStatus(forceLedgerResponse.getStatus()) .setForceLedgerResponse(forceLedgerResponse); Response resp = response.build(); - sendResponse(forceLedgerResponse.getStatus(), resp, requestProcessor.getForceLedgerRequestStats()); + sendResponse( + forceLedgerResponse.getStatus(), + resp, + requestProcessor.getRequestStats().getForceLedgerRequestStats()); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java index d964957488b..fe315f62cae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java @@ -54,8 +54,8 @@ private GetBookieInfoResponse getGetBookieInfoResponse() { if (!isVersionCompatible()) { getBookieInfoResponse.setStatus(StatusCode.EBADVERSION); - requestProcessor.getGetBookieInfoStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getGetBookieInfoStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); return getBookieInfoResponse.build(); } @@ -80,8 +80,8 @@ private GetBookieInfoResponse getGetBookieInfoResponse() { } getBookieInfoResponse.setStatus(status); - requestProcessor.getGetBookieInfoStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getGetBookieInfoStats() + .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); return getBookieInfoResponse.build(); } @@ -98,6 +98,6 @@ private void sendResponse(GetBookieInfoResponse getBookieInfoResponse) { .setGetBookieInfoResponse(getBookieInfoResponse); sendResponse(response.getStatus(), response.build(), - requestProcessor.getGetBookieInfoRequestStats()); + requestProcessor.getRequestStats().getGetBookieInfoRequestStats()); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java index fdbbd353043..6f25d684b6c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java @@ -101,7 +101,7 @@ protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder, try { return super.readEntry(readResponseBuilder, entryId, true, startTimeSw); } catch (Bookie.NoEntryException e) { - requestProcessor.readLastEntryNoEntryErrorCounter.inc(); + requestProcessor.getRequestStats().getReadLastEntryNoEntryErrorCounter().inc(); logger.info( "No entry found while piggyback reading entry {} from ledger {} : previous lac = {}", entryId, ledgerId, previousLAC); @@ -153,7 +153,7 @@ private ReadResponse getLongPollReadResponse() { return buildErrorResponse(StatusCode.EIO, startTimeSw); } - registerSuccessfulEvent(requestProcessor.longPollPreWaitStats, startTimeSw); + registerSuccessfulEvent(requestProcessor.getRequestStats().getLongPollPreWaitStats(), startTimeSw); lastPhaseStartTime.reset().start(); if (watched) { @@ -213,7 +213,7 @@ private synchronized void scheduleDeferredRead(boolean timeout) { expirationTimerTask.cancel(); } - registerEvent(timeout, requestProcessor.longPollWaitStats, lastPhaseStartTime); + registerEvent(timeout, requestProcessor.getRequestStats().getLongPollWaitStats(), lastPhaseStartTime); lastPhaseStartTime.reset().start(); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index b7dee2d4a8d..54368a0c07b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -79,7 +79,7 @@ public void safeRun() { if (!isVersionCompatible()) { sendResponse(BookieProtocol.EBADVERSION, ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request), - requestProcessor.readRequestStats); + requestProcessor.getRequestStats().getReadRequestStats()); return; } processPacket(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java index 7dc29a38bf1..15765a252b2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java @@ -79,7 +79,7 @@ protected void sendResponse(StatusCode code, Object response, OpStatsLogger stat if (!channel.isWritable()) { LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel, StringUtils.requestToString(request)); - requestProcessor.getChannelWriteStats() + requestProcessor.getRequestStats().getChannelWriteStats() .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS); statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); return; @@ -93,10 +93,10 @@ protected void sendResponse(StatusCode code, Object response, OpStatsLogger stat public void operationComplete(ChannelFuture future) throws Exception { long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos); if (!future.isSuccess()) { - requestProcessor.getChannelWriteStats() + requestProcessor.getRequestStats().getChannelWriteStats() .registerFailedEvent(writeElapsedNanos, TimeUnit.NANOSECONDS); } else { - requestProcessor.getChannelWriteStats() + requestProcessor.getRequestStats().getChannelWriteStats() .registerSuccessfulEvent(writeElapsedNanos, TimeUnit.NANOSECONDS); } if (StatusCode.EOK == code) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index edeb8a674f7..6566c7b4643 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -20,7 +20,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import java.io.IOException; @@ -129,17 +128,17 @@ protected void processPacket() { LOG.trace("Read entry rc = {} for {}", errorCode, request); } if (errorCode == BookieProtocol.EOK) { - requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getReadEntryStats() + .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, request), - requestProcessor.readRequestStats); + requestProcessor.getRequestStats().getReadRequestStats()); } else { ReferenceCountUtil.release(data); - requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getReadEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); sendResponse(errorCode, ResponseBuilder.buildErrorResponse(errorCode, request), - requestProcessor.readRequestStats); + requestProcessor.getRequestStats().getReadRequestStats()); } recycle(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java index e7e56533317..88b76627071 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java @@ -71,14 +71,14 @@ public ReadEntryProcessorV3(Request request, this.ledgerId = readRequest.getLedgerId(); this.entryId = readRequest.getEntryId(); if (RequestUtils.isFenceRequest(this.readRequest)) { - this.readStats = requestProcessor.fenceReadEntryStats; - this.reqStats = requestProcessor.fenceReadRequestStats; + this.readStats = requestProcessor.getRequestStats().getFenceReadEntryStats(); + this.reqStats = requestProcessor.getRequestStats().getFenceReadRequestStats(); } else if (readRequest.hasPreviousLAC()) { - this.readStats = requestProcessor.longPollReadStats; - this.reqStats = requestProcessor.longPollReadRequestStats; + this.readStats = requestProcessor.getRequestStats().getLongPollReadStats(); + this.reqStats = requestProcessor.getRequestStats().getLongPollReadRequestStats(); } else { - this.readStats = requestProcessor.readEntryStats; - this.reqStats = requestProcessor.readRequestStats; + this.readStats = requestProcessor.getRequestStats().getReadEntryStats(); + this.reqStats = requestProcessor.getRequestStats().getReadRequestStats(); } this.fenceThreadPool = fenceThreadPool; @@ -246,7 +246,7 @@ protected ReadResponse getReadResponse() { @Override public void safeRun() { - requestProcessor.readEntrySchedulingDelayStats.registerSuccessfulEvent( + requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent( MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); if (!isVersionCompatible()) { @@ -275,11 +275,11 @@ private void getFenceResponse(ReadResponse.Builder readResponse, StatusCode status; if (!fenceResult) { status = StatusCode.EIO; - registerFailedEvent(requestProcessor.fenceReadWaitStats, lastPhaseStartTime); + registerFailedEvent(requestProcessor.getRequestStats().getFenceReadWaitStats(), lastPhaseStartTime); } else { status = StatusCode.EOK; readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer())); - registerSuccessfulEvent(requestProcessor.fenceReadWaitStats, lastPhaseStartTime); + registerSuccessfulEvent(requestProcessor.getRequestStats().getFenceReadWaitStats(), lastPhaseStartTime); } if (null != entryBody) { @@ -296,7 +296,7 @@ private void sendFenceResponse(ReadResponse.Builder readResponse, // build the fence read response getFenceResponse(readResponse, entryBody, fenceResult); // register fence read stat - registerEvent(!fenceResult, requestProcessor.fenceReadEntryStats, startTimeSw); + registerEvent(!fenceResult, requestProcessor.getRequestStats().getFenceReadEntryStats(), startTimeSw); // send the fence read response sendResponse(readResponse.build()); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java index 898ddb0413b..a3bc31118c1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java @@ -102,11 +102,11 @@ private ReadLacResponse getReadLacResponse() { } if (status == StatusCode.EOK) { - requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getReadLacStats() + .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } else { - requestProcessor.readLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getReadLacStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } // Finally set the status and return readLacResponse.setStatus(status); @@ -126,6 +126,6 @@ private void sendResponse(ReadLacResponse readLacResponse) { .setReadLacResponse(readLacResponse); sendResponse(response.getStatus(), response.build(), - requestProcessor.readLacRequestStats); + requestProcessor.getRequestStats().getReadLacRequestStats()); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java new file mode 100644 index 00000000000..1799e660d77 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.proto; + +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.annotations.StatsDoc; + +/** + * A umbrella class for request related stats. + */ +@StatsDoc( + name = SERVER_SCOPE, + category = CATEGORY_SERVER, + help = "Bookie request stats" +) +@Getter +public class RequestStats { + + final AtomicInteger addsInProgress = new AtomicInteger(0); + final AtomicInteger maxAddsInProgress = new AtomicInteger(0); + final AtomicInteger addsBlocked = new AtomicInteger(0); + final AtomicInteger readsInProgress = new AtomicInteger(0); + final AtomicInteger readsBlocked = new AtomicInteger(0); + final AtomicInteger maxReadsInProgress = new AtomicInteger(0); + + @StatsDoc( + name = ADD_ENTRY_REQUEST, + help = "request stats of AddEntry on a bookie" + ) + private final OpStatsLogger addRequestStats; + @StatsDoc( + name = ADD_ENTRY, + help = "operation stats of AddEntry on a bookie", + parent = ADD_ENTRY_REQUEST + ) + private final OpStatsLogger addEntryStats; + @StatsDoc( + name = READ_ENTRY_REQUEST, + help = "request stats of ReadEntry on a bookie" + ) + final OpStatsLogger readRequestStats; + @StatsDoc( + name = READ_ENTRY, + help = "operation stats of ReadEntry on a bookie", + parent = READ_ENTRY_REQUEST + ) + final OpStatsLogger readEntryStats; + @StatsDoc( + name = FORCE_LEDGER, + help = "operation stats of ForceLedger on a bookie", + parent = FORCE_LEDGER_REQUEST + ) + final OpStatsLogger forceLedgerStats; + @StatsDoc( + name = FORCE_LEDGER_REQUEST, + help = "request stats of ForceLedger on a bookie" + ) + final OpStatsLogger forceLedgerRequestStats; + @StatsDoc( + name = READ_ENTRY_FENCE_REQUEST, + help = "request stats of FenceRead on a bookie" + ) + final OpStatsLogger fenceReadRequestStats; + @StatsDoc( + name = READ_ENTRY_FENCE_READ, + help = "operation stats of FenceRead on a bookie", + parent = READ_ENTRY_FENCE_REQUEST, + happensAfter = READ_ENTRY_FENCE_WAIT + ) + final OpStatsLogger fenceReadEntryStats; + @StatsDoc( + name = READ_ENTRY_FENCE_WAIT, + help = "operation stats of FenceReadWait on a bookie", + parent = READ_ENTRY_FENCE_REQUEST + ) + final OpStatsLogger fenceReadWaitStats; + @StatsDoc( + name = READ_ENTRY_SCHEDULING_DELAY, + help = "operation stats of ReadEntry scheduling delays on a bookie" + ) + final OpStatsLogger readEntrySchedulingDelayStats; + @StatsDoc( + name = READ_ENTRY_LONG_POLL_PRE_WAIT, + help = "operation stats of LongPoll Reads pre wait time on a bookie", + parent = READ_ENTRY_LONG_POLL_REQUEST + ) + final OpStatsLogger longPollPreWaitStats; + @StatsDoc( + name = READ_ENTRY_LONG_POLL_WAIT, + help = "operation stats of LongPoll Reads wait time on a bookie", + happensAfter = READ_ENTRY_LONG_POLL_PRE_WAIT, + parent = READ_ENTRY_LONG_POLL_REQUEST + ) + final OpStatsLogger longPollWaitStats; + @StatsDoc( + name = READ_ENTRY_LONG_POLL_READ, + help = "operation stats of LongPoll Reads on a bookie", + happensAfter = READ_ENTRY_LONG_POLL_WAIT, + parent = READ_ENTRY_LONG_POLL_REQUEST + ) + final OpStatsLogger longPollReadStats; + @StatsDoc( + name = READ_ENTRY_LONG_POLL_REQUEST, + help = "request stats of LongPoll Reads on a bookie" + ) + final OpStatsLogger longPollReadRequestStats; + @StatsDoc( + name = READ_LAST_ENTRY_NOENTRY_ERROR, + help = "total NOENTRY errors of reading last entry on a bookie" + ) + final Counter readLastEntryNoEntryErrorCounter; + @StatsDoc( + name = WRITE_LAC_REQUEST, + help = "request stats of WriteLac on a bookie" + ) + final OpStatsLogger writeLacRequestStats; + @StatsDoc( + name = WRITE_LAC, + help = "operation stats of WriteLac on a bookie", + parent = WRITE_LAC_REQUEST + ) + final OpStatsLogger writeLacStats; + @StatsDoc( + name = READ_LAC_REQUEST, + help = "request stats of ReadLac on a bookie" + ) + final OpStatsLogger readLacRequestStats; + @StatsDoc( + name = READ_LAC, + help = "operation stats of ReadLac on a bookie", + parent = READ_LAC_REQUEST + ) + final OpStatsLogger readLacStats; + @StatsDoc( + name = GET_BOOKIE_INFO_REQUEST, + help = "request stats of GetBookieInfo on a bookie" + ) + final OpStatsLogger getBookieInfoRequestStats; + @StatsDoc( + name = GET_BOOKIE_INFO, + help = "operation stats of GetBookieInfo on a bookie" + ) + final OpStatsLogger getBookieInfoStats; + @StatsDoc( + name = CHANNEL_WRITE, + help = "channel write stats on a bookie" + ) + final OpStatsLogger channelWriteStats; + @StatsDoc( + name = ADD_ENTRY_BLOCKED, + help = "operation stats of AddEntry blocked on a bookie" + ) + final OpStatsLogger addEntryBlockedStats; + @StatsDoc( + name = READ_ENTRY_BLOCKED, + help = "operation stats of ReadEntry blocked on a bookie" + ) + final OpStatsLogger readEntryBlockedStats; + + public RequestStats(StatsLogger statsLogger) { + this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY); + this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST); + this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY); + this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER); + this.forceLedgerRequestStats = statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST); + this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST); + this.fenceReadEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ); + this.fenceReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST); + this.fenceReadWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_WAIT); + this.readEntrySchedulingDelayStats = statsLogger.getOpStatsLogger(READ_ENTRY_SCHEDULING_DELAY); + this.longPollPreWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_PRE_WAIT); + this.longPollWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT); + this.longPollReadStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ); + this.longPollReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST); + this.readLastEntryNoEntryErrorCounter = statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR); + this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC); + this.writeLacRequestStats = statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST); + this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC); + this.readLacRequestStats = statsLogger.getOpStatsLogger(READ_LAC_REQUEST); + this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO); + this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST); + this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE); + + this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT); + this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT); + + statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return addsInProgress; + } + }); + + statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return addsBlocked; + } + }); + + statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return readsInProgress; + } + }); + + statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return readsBlocked; + } + }); + } + + // + // Add requests + // + + void blockAddRequest() { + addsBlocked.incrementAndGet(); + } + + void unblockAddRequest(long delayNanos) { + addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS); + addsBlocked.decrementAndGet(); + } + + void trackAddRequest() { + final int curr = addsInProgress.incrementAndGet(); + maxAddsInProgress.accumulateAndGet(curr, Integer::max); + } + + void untrackAddRequest() { + addsInProgress.decrementAndGet(); + } + + int maxAddsInProgressCount() { + return maxAddsInProgress.get(); + } + + // + // Read requests + // + + void blockReadRequest() { + readsBlocked.incrementAndGet(); + } + + void unblockReadRequest(long delayNanos) { + readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS); + readsBlocked.decrementAndGet(); + } + + void trackReadRequest() { + final int curr = readsInProgress.incrementAndGet(); + maxReadsInProgress.accumulateAndGet(curr, Integer::max); + } + + void untrackReadRequest() { + readsInProgress.decrementAndGet(); + } + + int maxReadsInProgressCount() { + return maxReadsInProgress.get(); + } + +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index f5af75ac4a6..70db7ce4919 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -63,7 +63,7 @@ protected void processPacket() { + " so rejecting the request from the client!"); sendResponse(BookieProtocol.EREADONLY, ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request), - requestProcessor.getAddRequestStats()); + requestProcessor.getRequestStats().getAddRequestStats()); request.release(); request.recycle(); return; @@ -104,11 +104,11 @@ protected void processPacket() { } if (rc != BookieProtocol.EOK) { - requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); sendResponse(rc, ResponseBuilder.buildErrorResponse(rc, request), - requestProcessor.getAddRequestStats()); + requestProcessor.getRequestStats().getAddRequestStats()); request.recycle(); } } @@ -117,15 +117,15 @@ protected void processPacket() { public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { if (BookieProtocol.EOK == rc) { - requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getAddEntryStats() + .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } else { - requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } sendResponse(rc, ResponseBuilder.buildAddResponse(request), - requestProcessor.getAddRequestStats()); + requestProcessor.getRequestStats().getAddRequestStats()); request.recycle(); recycle(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index 7747e5c0e1a..c8ea0672104 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -80,11 +80,11 @@ private AddResponse getAddResponse() { public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { if (BookieProtocol.EOK == rc) { - requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getAddEntryStats() + .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } else { - requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } StatusCode status; @@ -105,7 +105,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, .setStatus(addResponse.getStatus()) .setAddResponse(addResponse); Response resp = response.build(); - sendResponse(status, resp, requestProcessor.getAddRequestStats()); + sendResponse(status, resp, requestProcessor.getRequestStats().getAddRequestStats()); } }; final EnumSet writeFlags; @@ -171,7 +171,7 @@ public void safeRun() { .setAddResponse(addResponse); Response resp = response.build(); sendResponse(addResponse.getStatus(), resp, - requestProcessor.getAddRequestStats()); + requestProcessor.getRequestStats().getAddRequestStats()); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java index 7e42a7320af..691102bae12 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java @@ -71,11 +71,11 @@ private WriteLacResponse getWriteLacResponse() { @Override public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { if (BookieProtocol.EOK == rc) { - requestProcessor.writeLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getWriteLacStats() + .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } else { - requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getWriteLacStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } StatusCode status; @@ -96,7 +96,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre .setStatus(writeLacResponse.getStatus()) .setWriteLacResponse(writeLacResponse); Response resp = response.build(); - sendResponse(status, resp, requestProcessor.writeLacRequestStats); + sendResponse(status, resp, requestProcessor.getRequestStats().getWriteLacRequestStats()); } }; @@ -130,8 +130,8 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre // If everything is okay, we return null so that the calling function // dosn't return a response back to the caller. if (!status.equals(StatusCode.EOK)) { - requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), - TimeUnit.NANOSECONDS); + requestProcessor.getRequestStats().getWriteLacStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); writeLacResponse.setStatus(status); return writeLacResponse.build(); } @@ -147,7 +147,10 @@ public void safeRun() { .setStatus(writeLacResponse.getStatus()) .setWriteLacResponse(writeLacResponse); Response resp = response.build(); - sendResponse(writeLacResponse.getStatus(), resp, requestProcessor.writeLacRequestStats); + sendResponse( + writeLacResponse.getStatus(), + resp, + requestProcessor.getRequestStats().getWriteLacRequestStats()); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java index 37d4647343d..bab83fb3267 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java @@ -75,10 +75,7 @@ public void setup() { requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L); - when(requestProcessor.getForceLedgerStats()) - .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger")); - when(requestProcessor.getForceLedgerRequestStats()) - .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger_request")); + when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE)); processor = new ForceLedgerProcessorV3( request, channel, diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index 5901c2f5823..bbcffea08cd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -67,10 +67,7 @@ public void setup() { bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); - when(requestProcessor.getAddEntryStats()) - .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry")); - when(requestProcessor.getAddRequestStats()) - .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests")); + when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE)); processor = WriteEntryProcessor.create( request, channel, diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java index df7b1532b63..292dc519ca9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java @@ -80,12 +80,7 @@ public void setup() { requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L); - when(requestProcessor.getAddEntryStats()) - .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry")); - when(requestProcessor.getAddRequestStats()) - .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests")); - when(requestProcessor.getChannelWriteStats()) - .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("CHANNEL_WRITE")); + when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE)); processor = new WriteEntryProcessorV3( request, channel, diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java index 97f487a69a1..d2ca8c60bed 100644 --- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/annotations/StatsDoc.java @@ -58,5 +58,25 @@ */ String help(); + /** + * The parent metric name. + * + *

It can used for analyzing the relationships + * between the metrics, especially for the latency metrics. + * + * @return the parent metric name + */ + String parent() default ""; + + /** + * The metric name of an operation that happens + * after the operation of this metric. + * + *

similar as {@link #parent()}, it can be used for analyzing + * the dependencies between metrics. + * + * @return the metric name of an operation that happens after the operation of this metric. + */ + String happensAfter() default ""; }