Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 144 additions & 24 deletions core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,10 +41,13 @@
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

Expand All @@ -53,12 +57,22 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {

// Default maximum size of 100,000 transactions before deferral is stopped and
// all existing transactions are processed immediately again.
// Declared exactly once; public is the wider of the two visibilities that appeared here.
public static final int DEFAULT_MAX_DEFERRED = 100_000;

/**
 * Default generator: hashes the serialized form of the {@link FateKey} with murmur3 (128 bit),
 * takes the hash as a long and clears the sign bit so the resulting transaction id is never
 * negative.
 */
public static final FateIdGenerator DEFAULT_FATE_ID_GENERATOR = (instanceType, fateKey) -> {
  final byte[] keyBytes = fateKey.getSerialized();
  final long hashed = Hashing.murmur3_128().hashBytes(keyBytes).asLong();
  // Long.MAX_VALUE == 0x7fffffffffffffffL, i.e. every bit set except the sign bit
  final long nonNegativeTid = hashed & Long.MAX_VALUE;
  return FateId.from(instanceType, nonNegativeTid);
};

// Ids currently held in this store's in-memory reservation set. Created as a HashSet by the
// constructor; createAndReserve removes entries from it while synchronized on this store.
protected final Set<FateId> reserved;
// Per-transaction deferral bookkeeping, created as a HashMap by the constructor.
// NOTE(review): the Long value is presumably a deferral timestamp/deadline - confirm.
protected final Map<FateId,Long> deferred;
// Upper bound on deferred transactions, supplied to the constructor
// (DEFAULT_MAX_DEFERRED when the no-arg constructor is used).
private final int maxDeferred;
// NOTE(review): presumably set once the deferred map exceeds maxDeferred - confirm at usage.
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
// Never null (checked in the constructor). Turns an instance type plus a FateKey into the
// FateId used by create(FateKey) and createAndReserve(FateKey).
private final FateIdGenerator fateIdGenerator;

// This is incremented each time a transaction was unreserved that was non new
protected final SignalCount unreservedNonNewCount = new SignalCount();
Expand All @@ -67,11 +81,12 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
protected final SignalCount unreservedRunnableCount = new SignalCount();

/**
 * Creates a store using {@link #DEFAULT_MAX_DEFERRED} and {@link #DEFAULT_FATE_ID_GENERATOR}.
 */
public AbstractFateStore() {
  // A constructor may contain only one explicit this(...) call; delegate to the two-argument
  // form so the id generator is always initialized.
  this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
}

/**
 * Creates a store with an explicit deferral limit and id generator.
 *
 * @param maxDeferred maximum number of deferred transactions, stored as given
 * @param fateIdGenerator maps an instance type and {@link FateKey} to a {@link FateId}
 * @throws NullPointerException if {@code fateIdGenerator} is null
 */
public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) {
  this.maxDeferred = maxDeferred;
  this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
  this.reserved = new HashSet<>();
  this.deferred = new HashMap<>();
}
Expand Down Expand Up @@ -239,12 +254,101 @@ public int getDeferredCount() {
}
}

/**
 * Creates the transaction whose id is derived from {@code fateKey}, tolerating an entry that
 * already exists for that id.
 *
 * @param fateKey key the transaction id is generated from
 * @return the generated id when the transaction was created, or already exists in status NEW
 *         with the same key; empty when it already exists in any other status
 * @throws IllegalStateException if an existing NEW entry has no key, or a different key (an
 *         id collision)
 */
private Optional<FateId> create(FateKey fateKey) {
  final FateId candidateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);

  try {
    create(candidateId, fateKey);
  } catch (IllegalStateException alreadyExists) {
    // Creation was rejected, so look at what is already stored under this id
    final Pair<TStatus,Optional<FateKey>> existing = getStatusAndKey(candidateId);
    final TStatus existingStatus = existing.getFirst();
    final Optional<FateKey> existingKey = existing.getSecond();

    if (existingStatus != TStatus.NEW) {
      // Any status other than NEW means the transaction is already in progress, so just
      // log and hand back an empty optional
      log.trace("Existing transaction {} already exists for key {} with status {}", candidateId,
          fateKey, existingStatus);
      return Optional.empty();
    }

    // Status NEW means unseeded: the caller may still reserve/seed it, provided the stored
    // key is the one we were asked for. A different key means two keys mapped to one id.
    Preconditions.checkState(existingKey.isPresent(), "Tx Key is missing from tid %s",
        candidateId.getTid());
    Preconditions.checkState(fateKey.equals(existingKey.orElseThrow()),
        "Collision detected for tid %s", candidateId.getTid());
  }

  return Optional.of(candidateId);
}

@Override
public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
final Optional<FateTxStore<T>> txStore;

// First make sure we can reserve in memory the fateId, if not
// we can return an empty Optional as it is reserved and in progress
// This reverses the usual order of creation and then reservation but
// this prevents a race condition by ensuring we can reserve first.
// This will create the FateTxStore before creation but this object
// is not exposed until after creation is finished so there should not
// be any errors.
final Optional<FateTxStore<T>> reservedTxStore;
synchronized (this) {
reservedTxStore = tryReserve(fateId);
}

// If present we were able to reserve so try and create
if (reservedTxStore.isPresent()) {
try {
if (create(fateKey).isPresent()) {
txStore = reservedTxStore;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could modify the code to keep the return value of create and validate it.

Suggested change
txStore = reservedTxStore;
Precondition.checkState(fateId.equals(fateIdFromCreate));
txStore = reservedTxStore;

} else {
// We already exist in a non-new state then un-reserve and an empty
// Optional will be returned. This is expected to happen when the
// system is busy and operations are not running, and we keep seeding them
synchronized (this) {
reserved.remove(fateId);
}
txStore = Optional.empty();
}
} catch (Exception e) {
// Clean up the reservation if the creation failed
// And then throw error
synchronized (this) {
reserved.remove(fateId);
}
if (e instanceof IllegalStateException) {
throw e;
} else {
throw new IllegalStateException(e);
}
}
} else {
// Could not reserve so return empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created the following after seeing another log stmt in this PR. Thought it would be good to log something here like it does elsewhere. These trace stmts will be nice for tracking down bugs

Suggested change
// Could not reserve so return empty
// Could not reserve so return empty
log.trace("Another thread currently has transaction {} key {} reserved", fateId,
fateKey);

txStore = Optional.empty();
}

return txStore;
}

// Creates the transaction fateId for fateKey in the backing store. create(FateKey) in this
// class catches an IllegalStateException from this call and treats it as "an entry for
// fateId already exists".
protected abstract void create(FateId fateId, FateKey fateKey);

// Returns the status of fateId together with its key, if it has one. Used by create(FateKey)
// to inspect an already existing entry.
protected abstract Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId);

// NOTE(review): presumably streams every transaction known to the store - confirm.
protected abstract Stream<FateIdStatus> getTransactions();

// NOTE(review): presumably the raw status lookup behind the public status accessor - confirm.
protected abstract TStatus _getStatus(FateId fateId);

// Returns the key stored for fateId, if any. The transaction store's getKey() delegates here.
protected abstract Optional<FateKey> getKey(FateId fateId);

// NOTE(review): presumably builds the FateTxStore view for fateId; isReserved looks like it
// records whether the caller holds the reservation - confirm.
protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean isReserved);

// The instance type passed to the FateIdGenerator when deriving ids from a FateKey.
protected abstract FateInstanceType getInstanceType();

protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
protected final FateId fateId;
protected final boolean isReserved;
Expand Down Expand Up @@ -337,34 +441,50 @@ public TStatus getStatus() {
return status;
}

/**
 * Looks up the key of this transaction by delegating to the enclosing store.
 *
 * @return the key for this transaction's id, if one is stored
 */
@Override
public Optional<FateKey> getKey() {
  // NOTE(review): 'false' presumably marks this as a read-only access - confirm
  verifyReserved(false);
  final Optional<FateKey> storedKey = AbstractFateStore.this.getKey(fateId);
  return storedKey;
}

/**
 * Looks up the status and key of this transaction by delegating to the enclosing store.
 *
 * @return the status paired with the key, if one is stored
 */
@Override
public Pair<TStatus,Optional<FateKey>> getStatusAndKey() {
  // NOTE(review): 'false' presumably marks this as a read-only access - confirm
  verifyReserved(false);
  final Pair<TStatus,Optional<FateKey>> statusWithKey =
      AbstractFateStore.this.getStatusAndKey(fateId);
  return statusWithKey;
}

/**
 * @return the id of the transaction this store was created for
 */
@Override
public FateId getID() {
  return this.fateId;
}
}

protected byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
byte[] sera = serialize(so);
byte[] data = new byte[sera.length + 2];
System.arraycopy(sera, 0, data, 2, sera.length);
data[0] = 'O';
data[1] = ' ';
return data;
}
/**
 * Strategy for deriving the {@link FateId} of a keyed transaction.
 */
@FunctionalInterface
public interface FateIdGenerator {
  /**
   * @param instanceType instance type the id belongs to
   * @param fateKey key identifying the operation
   * @return the id to use for {@code fateKey} within {@code instanceType}
   */
  FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey);
}

/**
 * Encodes a transaction info value with a two byte type prefix: {@code "S "} followed by the
 * UTF-8 text for strings, {@code "O "} followed by the serialized object for everything else.
 *
 * @param so value to encode
 * @return the prefixed bytes
 */
protected byte[] serializeTxInfo(Serializable so) {
  if (!(so instanceof String)) {
    final byte[] payload = serialize(so);
    final byte[] framed = new byte[2 + payload.length];
    framed[0] = 'O';
    framed[1] = ' ';
    System.arraycopy(payload, 0, framed, 2, payload.length);
    return framed;
  }
  return ("S " + so).getBytes(UTF_8);
}

protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
return (Serializable) deserialize(sera);
} else if (data[0] == 'S') {
return new String(data, 2, data.length - 2, UTF_8);
} else {
throw new IllegalStateException("Bad node data " + txInfo);
}
/**
 * Decodes a value written by {@code serializeTxInfo}: a two byte type prefix followed by
 * either a serialized object ({@code 'O'}) or UTF-8 text ({@code 'S'}).
 *
 * @param txInfo which info field is being decoded; only used in the error message
 * @param data the prefixed bytes
 * @return the decoded value
 * @throws IllegalStateException if {@code data} is null, shorter than the two byte prefix, or
 *         starts with an unknown type marker
 */
protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
  // Without the full prefix the offsets below would go negative and fail with an unrelated
  // array/string index exception, so report it as bad data instead
  if (data == null || data.length < 2) {
    throw new IllegalStateException("Bad node data " + txInfo);
  }

  final int payloadLength = data.length - 2;
  if (data[0] == 'O') {
    byte[] sera = new byte[payloadLength];
    System.arraycopy(data, 2, sera, 0, payloadLength);
    return (Serializable) deserialize(sera);
  } else if (data[0] == 'S') {
    return new String(data, 2, payloadLength, UTF_8);
  } else {
    throw new IllegalStateException("Bad node data " + txInfo);
  }
}
}
162 changes: 162 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.fate;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.hadoop.io.DataInputBuffer;

/**
 * Key identifying the operation a keyed FATE transaction performs: either a split of a
 * {@link KeyExtent} or the commit of an external compaction.
 *
 * <p>
 * A key is defined by its serialized form, which is the type name written with
 * {@link DataOutputStream#writeUTF(String)} followed by the type specific payload.
 * {@link #equals(Object)} and {@link #hashCode()} are computed from those bytes only. The
 * bytes are copied on the way in and on the way out so that callers cannot alter a key after
 * it has been created.
 */
public class FateKey {

  private final FateKeyType type;
  private final Optional<KeyExtent> keyExtent;
  private final Optional<ExternalCompactionId> compactionId;
  private final byte[] serialized;

  private FateKey(FateKeyType type, KeyExtent keyExtent) {
    this.type = Objects.requireNonNull(type);
    this.keyExtent = Optional.of(keyExtent);
    this.compactionId = Optional.empty();
    this.serialized = serialize(type, keyExtent);
  }

  private FateKey(FateKeyType type, ExternalCompactionId compactionId) {
    this.type = Objects.requireNonNull(type);
    this.keyExtent = Optional.empty();
    this.compactionId = Optional.of(compactionId);
    this.serialized = serialize(type, compactionId);
  }

  private FateKey(byte[] serialized) {
    // Copy first so later changes to the caller's array cannot change this key
    final byte[] copy = serialized.clone();
    try (DataInputBuffer buffer = new DataInputBuffer()) {
      buffer.reset(copy, copy.length);
      this.type = FateKeyType.valueOf(buffer.readUTF());
      this.keyExtent = deserializeKeyExtent(type, buffer);
      this.compactionId = deserializeCompactionId(type, buffer);
      this.serialized = copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /**
   * @return the kind of operation this key identifies
   */
  public FateKeyType getType() {
    return type;
  }

  /**
   * @return the extent for {@link FateKeyType#SPLIT} keys, otherwise empty
   */
  public Optional<KeyExtent> getKeyExtent() {
    return keyExtent;
  }

  /**
   * @return the compaction id for {@link FateKeyType#COMPACTION_COMMIT} keys, otherwise empty
   */
  public Optional<ExternalCompactionId> getCompactionId() {
    return compactionId;
  }

  /**
   * @return a copy of the serialized form of this key; modifying it does not affect the key
   */
  public byte[] getSerialized() {
    return serialized.clone();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    FateKey fateKey = (FateKey) o;
    return Arrays.equals(serialized, fateKey.serialized);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(serialized);
  }

  /**
   * Readable form for log messages: the type plus whichever of extent / compaction id is set.
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder("FateKey[type=").append(type);
    keyExtent.ifPresent(ke -> sb.append(", keyExtent=").append(ke));
    compactionId.ifPresent(id -> sb.append(", compactionId=").append(id));
    return sb.append(']').toString();
  }

  /**
   * Rebuilds a key from bytes previously obtained from {@link #getSerialized()}.
   *
   * @param serialized serialized key; copied, not retained
   * @return the decoded key
   * @throws UncheckedIOException if the bytes cannot be read
   */
  public static FateKey deserialize(byte[] serialized) {
    return new FateKey(serialized);
  }

  /**
   * @param extent extent being split
   * @return a {@link FateKeyType#SPLIT} key for {@code extent}
   */
  public static FateKey forSplit(KeyExtent extent) {
    return new FateKey(FateKeyType.SPLIT, extent);
  }

  /**
   * @param compactionId external compaction being committed
   * @return a {@link FateKeyType#COMPACTION_COMMIT} key for {@code compactionId}
   */
  public static FateKey forCompactionCommit(ExternalCompactionId compactionId) {
    return new FateKey(FateKeyType.COMPACTION_COMMIT, compactionId);
  }

  public enum FateKeyType {
    SPLIT, COMPACTION_COMMIT
  }

  // Writes the type name followed by the extent
  private static byte[] serialize(FateKeyType type, KeyExtent ke) {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos)) {
      dos.writeUTF(type.toString());
      ke.writeTo(dos);
      // try-with-resources closes the streams; flush so every byte is in baos before copying
      dos.flush();
      return baos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Writes the type name followed by the canonical form of the compaction id
  private static byte[] serialize(FateKeyType type, ExternalCompactionId compactionId) {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos)) {
      dos.writeUTF(type.toString());
      dos.writeUTF(compactionId.canonical());
      // try-with-resources closes the streams; flush so every byte is in baos before copying
      dos.flush();
      return baos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads the extent payload for SPLIT keys; other types carry no extent
  private static Optional<KeyExtent> deserializeKeyExtent(FateKeyType type, DataInputBuffer buffer)
      throws IOException {
    switch (type) {
      case SPLIT:
        return Optional.of(KeyExtent.readFrom(buffer));
      case COMPACTION_COMMIT:
        return Optional.empty();
      default:
        throw new IllegalStateException("Unexpected FateKeyType found " + type);
    }
  }

  // Reads the compaction id payload for COMPACTION_COMMIT keys; other types carry no id
  private static Optional<ExternalCompactionId> deserializeCompactionId(FateKeyType type,
      DataInputBuffer buffer) throws IOException {
    switch (type) {
      case SPLIT:
        return Optional.empty();
      case COMPACTION_COMMIT:
        return Optional.of(ExternalCompactionId.of(buffer.readUTF()));
      default:
        throw new IllegalStateException("Unexpected FateKeyType found " + type);
    }
  }
}
Loading