Skip to content

Commit

Permalink
code clean.
Browse files Browse the repository at this point in the history
  • Loading branch information
horizonzy committed Dec 1, 2023
1 parent 0e72b90 commit 5a3791f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void initiate() {
this.requestTimeNanos = MathUtils.nowInNano();
List<BookieId> ensemble = getLedgerMetadata().getEnsembleAt(startEntryId);
if (parallelRead) {
request = new ParallelReadRequest(ensemble, lh.ledgerId, startEntryId, maxCount, maxSize);
throw new UnsupportedOperationException("Batch read unsupported the parallelRead.");
} else {
request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId, maxCount, maxSize);
}
Expand Down Expand Up @@ -110,9 +110,6 @@ void sendReadTo(int bookieIndex, BookieId to, BatchedLedgerEntryRequest entry) t
if (lh.throttler != null) {
lh.throttler.acquire();
}

// todo, need to handle ensumble change

if (isRecoveryRead) {
int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING;
clientCtx.getBookieClient().readEntries(to, lh.ledgerId, entry.eId,
Expand Down Expand Up @@ -171,29 +168,13 @@ boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) {
return false;
}
}
}

class ParallelReadRequest extends BatchedLedgerEntryRequest {

ParallelReadRequest(List<BookieId> ensemble,
long lId,
long eId,
int maxCount,
long maxSize) {
super(ensemble, lId, eId, maxCount, maxSize);
}


@Override
void read() {

}

@Override
BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) {
return null;
public String toString() {
return String.format("L%d-E%d~%d s-%d", lh.getId(), eId, eId + maxCount, maxSize);
}
}

class SequenceReadRequest extends BatchedLedgerEntryRequest {

static final int NOT_FOUND = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,8 @@ private boolean failBackToSingleRead() {
if (clientCtx.getConf().batchReadFailBackToSingleRead) {
return true;
}
// TODO: 2023/12/1
// version compatibility
LedgerMetadata ledgerMetadata = getLedgerMetadata();
return ledgerMetadata.getEnsembleSize() > ledgerMetadata.getWriteQuorumSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) {
void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteBufList toSend, WriteLacCallback cb,
Object ctx) {
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId,
final CompletionKey completionKey = new TxnCompletionKey(txnId,
OperationType.WRITE_LAC);
// writeLac is mostly like addEntry hence uses addEntryTimeout
completionObjects.put(completionKey,
Expand Down Expand Up @@ -730,7 +730,7 @@ void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
return;
}
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId,
final CompletionKey completionKey = new TxnCompletionKey(txnId,
OperationType.FORCE_LEDGER);
// force is mostly like addEntry hence uses addEntryTimeout
completionObjects.put(completionKey,
Expand Down Expand Up @@ -792,7 +792,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
}
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
completionKey = new TxnCompletionKey(txnId, OperationType.ADD_ENTRY);

// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
Expand Down Expand Up @@ -862,7 +862,7 @@ public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.READ_LAC);
completionKey = new TxnCompletionKey(txnId, OperationType.READ_LAC);

// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
Expand All @@ -884,7 +884,7 @@ public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {

public void getListOfEntriesOfLedger(final long ledgerId, GetListOfEntriesOfLedgerCallback cb) {
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER);
final CompletionKey completionKey = new TxnCompletionKey(txnId, OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER);
completionObjects.put(completionKey, new GetListOfEntriesOfLedgerCompletion(completionKey, cb, ledgerId));

// Build the request.
Expand Down Expand Up @@ -946,7 +946,7 @@ private void readEntryInternal(final long ledgerId,
completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY);
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
completionKey = new TxnCompletionKey(txnId, OperationType.READ_ENTRY);

// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
Expand Down Expand Up @@ -1040,7 +1040,7 @@ private void readEntriesInternal(final long ledgerId,
if (useV2WireProtocol) {
request = BookieProtocol.BatchedReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, startEntryId, (short) flags, masterKey, txnId, maxCount, maxSize);
completionKey = new V3CompletionKey(txnId, OperationType.BATCH_READ_ENTRY);
completionKey = new TxnCompletionKey(txnId, OperationType.BATCH_READ_ENTRY);
} else {
throw new UnsupportedOperationException("Unsupported batch read entry operation for v3 protocol.");
}
Expand All @@ -1053,7 +1053,7 @@ private void readEntriesInternal(final long ledgerId,

public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.GET_BOOKIE_INFO);
final CompletionKey completionKey = new TxnCompletionKey(txnId, OperationType.GET_BOOKIE_INFO);
completionObjects.put(completionKey,
new GetBookieInfoCompletion(
completionKey, cb, ctx));
Expand Down Expand Up @@ -1401,7 +1401,7 @@ private void readV2Response(final BookieProtocol.Response response) {

CompletionKey key;
if (OperationType.BATCH_READ_ENTRY == operationType) {
key = new V3CompletionKey(((BookieProtocol.BatchedReadResponse) response).getRequestId(), operationType);
key = new TxnCompletionKey(((BookieProtocol.BatchedReadResponse) response).getRequestId(), operationType);
} else {
key = acquireV2Key(response.ledgerId, response.entryId, operationType);
}
Expand Down Expand Up @@ -2373,21 +2373,23 @@ private void handleResponse(long ledgerId, long entryId,

// visable for testing
CompletionKey newCompletionKey(long txnId, OperationType operationType) {
return new V3CompletionKey(txnId, operationType);
return new TxnCompletionKey(txnId, operationType);
}

class V3CompletionKey extends CompletionKey {

public V3CompletionKey(long txnId, OperationType operationType) {
super(txnId, operationType);
class TxnCompletionKey extends CompletionKey {
final long txnId;

public TxnCompletionKey(long txnId, OperationType operationType) {
super(operationType);
this.txnId = txnId;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof V3CompletionKey)) {
if (!(obj instanceof TxnCompletionKey)) {
return false;
}
V3CompletionKey that = (V3CompletionKey) obj;
TxnCompletionKey that = (TxnCompletionKey) obj;
return this.txnId == that.txnId && this.operationType == that.operationType;
}

Expand All @@ -2404,12 +2406,10 @@ public String toString() {
}

abstract class CompletionKey {
final long txnId;
OperationType operationType;

CompletionKey(long txnId,
OperationType operationType) {
this.txnId = txnId;
CompletionKey(
OperationType operationType) {
this.operationType = operationType;
}

Expand Down Expand Up @@ -2472,28 +2472,28 @@ private long getTxnId() {
return txnIdGenerator.incrementAndGet();
}

private final Recycler<V2CompletionKey> v2KeyRecycler = new Recycler<V2CompletionKey>() {
private final Recycler<EntryCompletionKey> v2KeyRecycler = new Recycler<EntryCompletionKey>() {
@Override
protected V2CompletionKey newObject(
Recycler.Handle<V2CompletionKey> handle) {
return new V2CompletionKey(handle);
protected EntryCompletionKey newObject(
Recycler.Handle<EntryCompletionKey> handle) {
return new EntryCompletionKey(handle);
}
};

V2CompletionKey acquireV2Key(long ledgerId, long entryId,
EntryCompletionKey acquireV2Key(long ledgerId, long entryId,
OperationType operationType) {
V2CompletionKey key = v2KeyRecycler.get();
EntryCompletionKey key = v2KeyRecycler.get();
key.reset(ledgerId, entryId, operationType);
return key;
}

private class V2CompletionKey extends CompletionKey {
private final Handle<V2CompletionKey> recyclerHandle;
private class EntryCompletionKey extends CompletionKey {
private final Handle<EntryCompletionKey> recyclerHandle;
long ledgerId;
long entryId;

private V2CompletionKey(Handle<V2CompletionKey> handle) {
super(-1, null);
private EntryCompletionKey(Handle<EntryCompletionKey> handle) {
super(null);
this.recyclerHandle = handle;
}

Expand All @@ -2505,10 +2505,10 @@ void reset(long ledgerId, long entryId, OperationType operationType) {

@Override
public boolean equals(Object object) {
if (!(object instanceof V2CompletionKey)) {
if (!(object instanceof EntryCompletionKey)) {
return false;
}
V2CompletionKey that = (V2CompletionKey) object;
EntryCompletionKey that = (EntryCompletionKey) object;
return this.entryId == that.entryId
&& this.ledgerId == that.ledgerId
&& this.operationType == that.operationType;
Expand Down Expand Up @@ -2696,7 +2696,7 @@ private void initiateTLS() {
LOG.info("Initializing TLS to {}", channel);
assert state == ConnectionState.CONNECTING;
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.START_TLS);
final CompletionKey completionKey = new TxnCompletionKey(txnId, OperationType.START_TLS);
completionObjects.put(completionKey,
new StartTLSCompletion(completionKey));
BookkeeperProtocol.Request.Builder h = withRequestContext(BookkeeperProtocol.Request.newBuilder());
Expand Down

0 comments on commit 5a3791f

Please sign in to comment.