Skip to content

Commit

Permalink
[STATS] Add @statsdoc annotation for bookie server request stats
Browse files Browse the repository at this point in the history

Descriptions of the changes in this PR:

*Motivation*

As part of [BP-36](apache#1785), this PR is to document the request stats
for bookkeeper server.

*Changes*

- add `parent` and `happensAfter` in StatsDoc
- convert bookie request stats to use StatsDoc for documenting metrics

Master Issue: apache#1785 




Reviewers: Jia Zhai <None>

This closes apache#1839 from sijie/bp36_add_parent_and_happensafter
  • Loading branch information
sijie authored Nov 29, 2018
1 parent b0708b5 commit 8979e9e
Show file tree
Hide file tree
Showing 18 changed files with 470 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.bookkeeper.bookie.stats;

import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER;
Expand All @@ -28,6 +29,7 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;

import lombok.Getter;
Expand Down Expand Up @@ -56,11 +58,19 @@ public class BookieStats {
@StatsDoc(name = BOOKIE_FORCE_LEDGER, help = "total force operations occurred on a bookie")
private final Counter forceLedgerOps;
// Bookie Operation Latency Stats
@StatsDoc(name = BOOKIE_ADD_ENTRY, help = "operations stats of AddEntry on a bookie")
@StatsDoc(
name = BOOKIE_ADD_ENTRY,
help = "operations stats of AddEntry on a bookie",
parent = ADD_ENTRY
)
private final OpStatsLogger addEntryStats;
@StatsDoc(name = BOOKIE_RECOVERY_ADD_ENTRY, help = "operation stats of RecoveryAddEntry on a bookie")
private final OpStatsLogger recoveryAddEntryStats;
@StatsDoc(name = BOOKIE_READ_ENTRY, help = "operation stats of ReadEntry on a bookie")
@StatsDoc(
name = BOOKIE_READ_ENTRY,
help = "operation stats of ReadEntry on a bookie",
parent = READ_ENTRY
)
private final OpStatsLogger readEntryStats;
// Bookie Operation Bytes Stats
@StatsDoc(name = BOOKIE_ADD_ENTRY_BYTES, help = "bytes stats of AddEntry on a bookie")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,6 @@
package org.apache.bookkeeper.proto;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
import static org.apache.bookkeeper.proto.RequestUtils.hasFlag;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -68,7 +40,6 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import lombok.AccessLevel;
Expand All @@ -80,9 +51,6 @@
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
Expand Down Expand Up @@ -146,37 +114,8 @@ public class BookieRequestProcessor implements RequestProcessor {
// Expose Stats
private final BKStats bkStats = BKStats.getInstance();
private final boolean statsEnabled;
private final OpStatsLogger addRequestStats;
private final OpStatsLogger addEntryStats;
final OpStatsLogger readRequestStats;
final OpStatsLogger readEntryStats;
final OpStatsLogger forceLedgerStats;
final OpStatsLogger forceLedgerRequestStats;
final OpStatsLogger fenceReadRequestStats;
final OpStatsLogger fenceReadEntryStats;
final OpStatsLogger fenceReadWaitStats;
final OpStatsLogger readEntrySchedulingDelayStats;
final OpStatsLogger longPollPreWaitStats;
final OpStatsLogger longPollWaitStats;
final OpStatsLogger longPollReadStats;
final OpStatsLogger longPollReadRequestStats;
final Counter readLastEntryNoEntryErrorCounter;
final OpStatsLogger writeLacRequestStats;
final OpStatsLogger writeLacStats;
final OpStatsLogger readLacRequestStats;
final OpStatsLogger readLacStats;
final OpStatsLogger getBookieInfoRequestStats;
final OpStatsLogger getBookieInfoStats;
final OpStatsLogger channelWriteStats;
final OpStatsLogger addEntryBlockedStats;
final OpStatsLogger readEntryBlockedStats;

final AtomicInteger addsInProgress = new AtomicInteger(0);
final AtomicInteger maxAddsInProgress = new AtomicInteger(0);
final AtomicInteger addsBlocked = new AtomicInteger(0);
final AtomicInteger readsInProgress = new AtomicInteger(0);
final AtomicInteger readsBlocked = new AtomicInteger(0);
final AtomicInteger maxReadsInProgress = new AtomicInteger(0);

private final RequestStats requestStats;

final Semaphore addsSemaphore;
final Semaphore readsSemaphore;
Expand Down Expand Up @@ -248,86 +187,13 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,

// Expose Stats
this.statsEnabled = serverCfg.isStatisticsEnabled();
this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER);
this.forceLedgerRequestStats = statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST);
this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
this.fenceReadEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ);
this.fenceReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST);
this.fenceReadWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_WAIT);
this.readEntrySchedulingDelayStats = statsLogger.getOpStatsLogger(READ_ENTRY_SCHEDULING_DELAY);
this.longPollPreWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_PRE_WAIT);
this.longPollWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT);
this.longPollReadStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ);
this.longPollReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST);
this.readLastEntryNoEntryErrorCounter = statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR);
this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
this.writeLacRequestStats = statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST);
this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
this.readLacRequestStats = statsLogger.getOpStatsLogger(READ_LAC_REQUEST);
this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST);
this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE);

this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
this.requestStats = new RequestStats(statsLogger);

int maxAdds = serverCfg.getMaxAddsInProgressLimit();
addsSemaphore = maxAdds > 0 ? new Semaphore(maxAdds, true) : null;

int maxReads = serverCfg.getMaxReadsInProgressLimit();
readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null;

statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return addsInProgress;
}
});

statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return addsBlocked;
}
});

statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return readsInProgress;
}
});

statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return readsBlocked;
}
});

}

protected void onAddRequestStart(Channel channel) {
Expand All @@ -336,21 +202,19 @@ protected void onAddRequestStart(Channel channel) {
final long throttlingStartTimeNanos = MathUtils.nowInNano();
channel.config().setAutoRead(false);
LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel);
addsBlocked.incrementAndGet();
requestStats.blockAddRequest();
addsSemaphore.acquireUninterruptibly();
channel.config().setAutoRead(true);
final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos);
addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
addsBlocked.decrementAndGet();
requestStats.unblockAddRequest(delayNanos);
}
}
final int curr = addsInProgress.incrementAndGet();
maxAddsInProgress.accumulateAndGet(curr, Integer::max);
requestStats.trackAddRequest();
}

protected void onAddRequestFinish() {
addsInProgress.decrementAndGet();
requestStats.untrackAddRequest();
if (addsSemaphore != null) {
addsSemaphore.release();
}
Expand All @@ -362,34 +226,32 @@ protected void onReadRequestStart(Channel channel) {
final long throttlingStartTimeNanos = MathUtils.nowInNano();
channel.config().setAutoRead(false);
LOG.info("Too many read requests in progress, disabling autoread on channel {}", channel);
readsBlocked.incrementAndGet();
requestStats.blockReadRequest();
readsSemaphore.acquireUninterruptibly();
channel.config().setAutoRead(true);
final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
LOG.info("Re-enabled autoread on channel {} after ReadRequest delay of {} nanos", channel, delayNanos);
readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
readsBlocked.decrementAndGet();
requestStats.unblockReadRequest(delayNanos);
}
}
final int curr = readsInProgress.incrementAndGet();
maxReadsInProgress.accumulateAndGet(curr, Integer::max);
requestStats.trackReadRequest();
}

protected void onReadRequestFinish() {
readsInProgress.decrementAndGet();
requestStats.untrackReadRequest();
if (readsSemaphore != null) {
readsSemaphore.release();
}
}

@VisibleForTesting
int maxAddsInProgressCount() {
return maxAddsInProgress.get();
return requestStats.maxAddsInProgressCount();
}

@VisibleForTesting
int maxReadsInProgressCount() {
return maxReadsInProgress.get();
return requestStats.maxReadsInProgressCount();
}

@Override
Expand Down Expand Up @@ -576,7 +438,7 @@ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Chann
.setStatus(addResponse.getStatus())
.setAddResponse(addResponse);
BookkeeperProtocol.Response resp = response.build();
write.sendResponse(addResponse.getStatus(), resp, addRequestStats);
write.sendResponse(addResponse.getStatus(), resp, requestStats.getAddRequestStats());
}
}
}
Expand Down Expand Up @@ -610,7 +472,10 @@ private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r, fin
.setStatus(forceLedgerResponse.getStatus())
.setForceLedgerResponse(forceLedgerResponse);
BookkeeperProtocol.Response resp = response.build();
forceLedger.sendResponse(forceLedgerResponse.getStatus(), resp, forceLedgerRequestStats);
forceLedger.sendResponse(
forceLedgerResponse.getStatus(),
resp,
requestStats.getForceLedgerRequestStats());
}
}
}
Expand Down Expand Up @@ -660,7 +525,7 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Chan
.setStatus(readResponse.getStatus())
.setReadResponse(readResponse);
BookkeeperProtocol.Response resp = response.build();
read.sendResponse(readResponse.getStatus(), resp, readRequestStats);
read.sendResponse(readResponse.getStatus(), resp, requestStats.getReadRequestStats());
}
}
}
Expand Down Expand Up @@ -740,8 +605,10 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Ch
r.entryId);
}

write.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), addRequestStats);
write.sendResponse(
BookieProtocol.ETOOMANYREQUESTS,
ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r),
requestStats.getAddRequestStats());
}
}
}
Expand Down Expand Up @@ -770,8 +637,10 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Channe
r.entryId);
}

read.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), readRequestStats);
read.sendResponse(
BookieProtocol.ETOOMANYREQUESTS,
ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r),
requestStats.getReadRequestStats());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ private ForceLedgerResponse getForceLedgerResponse() {
"ledgerId must be {} but was {}", ledgerId, ledgerId1);

if (BookieProtocol.EOK == rc) {
requestProcessor.getForceLedgerStats()
requestProcessor.getRequestStats().getForceLedgerStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
} else {
requestProcessor.getForceLedgerStats()
requestProcessor.getRequestStats().getForceLedgerStats()
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
}
Expand All @@ -94,7 +94,7 @@ private ForceLedgerResponse getForceLedgerResponse() {
.setStatus(forceLedgerResponse.getStatus())
.setForceLedgerResponse(forceLedgerResponse);
Response resp = response.build();
sendResponse(status, resp, requestProcessor.getForceLedgerRequestStats());
sendResponse(status, resp, requestProcessor.getRequestStats().getForceLedgerRequestStats());
};
StatusCode status = null;
try {
Expand Down Expand Up @@ -124,7 +124,10 @@ public void safeRun() {
.setStatus(forceLedgerResponse.getStatus())
.setForceLedgerResponse(forceLedgerResponse);
Response resp = response.build();
sendResponse(forceLedgerResponse.getStatus(), resp, requestProcessor.getForceLedgerRequestStats());
sendResponse(
forceLedgerResponse.getStatus(),
resp,
requestProcessor.getRequestStats().getForceLedgerRequestStats());
}
}

Expand Down
Loading

0 comments on commit 8979e9e

Please sign in to comment.