Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -40,10 +41,13 @@
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

Expand Down Expand Up @@ -243,10 +247,43 @@ public int getDeferredCount() {
}
}

@Override
public long create(byte[] key) {
HashCode hashCode = Hashing.murmur3_128().hashBytes(key);
long tid = hashCode.asLong() & 0x7fffffffffffffffL;

Pair<TStatus,Optional<byte[]>> statusAndKey = getStatusAndKey(tid);
TStatus status = statusAndKey.getFirst();
Optional<byte[]> tKey = statusAndKey.getSecond();

// Case 1: Status of UNKNOWN means doesn't exist, so we can create
if (status == TStatus.UNKNOWN) {
create(tid, key);
// Case 2: Status is NEW so this is unseeded, we can return and allow the calling code
// to reserve/seed as long as the existing key is the same and not different as that would
// mean a collision
} else if (status == TStatus.NEW) {
Preconditions.checkState(tKey.isPresent(), "Tx key column is missing");
Preconditions.checkState(Arrays.equals(key, tKey.orElseThrow()),
"Collision detected for tid %s", tid);
// Case 3: Status is some other state which means already in progress
} else {
throw new IllegalStateException("Existing transaction already exists for: " + tid);
}

return tid;
}

protected abstract void create(long tid, byte[] key);

protected abstract Pair<TStatus,Optional<byte[]>> getStatusAndKey(long tid);

protected abstract Stream<FateIdStatus> getTransactions();

protected abstract TStatus _getStatus(long tid);

protected abstract Optional<byte[]> getKey(long tid);

protected abstract FateTxStore<T> newFateTxStore(long tid, boolean isReserved);

protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
Expand Down Expand Up @@ -343,34 +380,46 @@ public TStatus getStatus() {
return status;
}

@Override
public Optional<byte[]> getKey() {
verifyReserved(false);
return AbstractFateStore.this.getKey(tid);
}

@Override
public Pair<TStatus,Optional<byte[]>> getStatusAndKey() {
verifyReserved(false);
return AbstractFateStore.this.getStatusAndKey(tid);
}

@Override
public long getID() {
return tid;
}
}

protected byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
byte[] sera = serialize(so);
byte[] data = new byte[sera.length + 2];
System.arraycopy(sera, 0, data, 2, sera.length);
data[0] = 'O';
data[1] = ' ';
return data;
}
protected byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
byte[] sera = serialize(so);
byte[] data = new byte[sera.length + 2];
System.arraycopy(sera, 0, data, 2, sera.length);
data[0] = 'O';
data[1] = ' ';
return data;
}
}

protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
return (Serializable) deserialize(sera);
} else if (data[0] == 'S') {
return new String(data, 2, data.length - 2, UTF_8);
} else {
throw new IllegalStateException("Bad node data " + txInfo);
}
protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
return (Serializable) deserialize(sera);
} else if (data[0] == 'S') {
return new String(data, 2, data.length - 2, UTF_8);
} else {
throw new IllegalStateException("Bad node data " + txInfo);
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class Fate<T> {
private final Thread workFinder;

public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE, TX_KEY
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
*/
long create();

long create(byte[] key);

/**
* An interface that allows read/write access to the data related to a single fate operation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import java.io.Serializable;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import java.util.stream.Stream;

import org.apache.accumulo.core.util.Pair;

/**
* Read only access to a Transaction Store.
*
Expand Down Expand Up @@ -87,6 +90,10 @@ interface ReadOnlyFateTxStore<T> {
*/
TStatus getStatus();

Optional<byte[]> getKey();

Pair<TStatus,Optional<byte[]>> getStatusAndKey();

/**
* Wait for the status of a transaction to change
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import java.io.Serializable;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.util.Pair;

public class WrappedFateTxStore<T> implements FateStore.FateTxStore<T> {
protected final FateStore.FateTxStore<T> wrapped;

Expand Down Expand Up @@ -55,6 +59,16 @@ public FateStore.TStatus getStatus() {
return wrapped.getStatus();
}

@Override
public Optional<byte[]> getKey() {
return wrapped.getKey();
}

@Override
public Pair<TStatus,Optional<byte[]>> getStatusAndKey() {
return wrapped.getStatusAndKey();
}

@Override
public void setStatus(FateStore.TStatus status) {
wrapped.setStatus(status);
Expand Down
47 changes: 40 additions & 7 deletions core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.core.util.Pair;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
Expand Down Expand Up @@ -91,6 +94,27 @@ public long create() {
}
}

@Override
protected void create(long tid, byte[] key) {
// TODO: Should we somehow make this Atomic or clean up on failure to make sure
// that either both of these writes happen or none happen?
try {
zk.putPersistentData(getTXPath(tid), TStatus.NEW.name().getBytes(UTF_8),
NodeExistsPolicy.FAIL);
// The key was already used to generate the tid but we still need to store it
// separate to check later for collision detection
zk.putPersistentData(getTXPath(tid) + "/" + TxInfo.TX_KEY, serializeTxInfo(key),
NodeExistsPolicy.OVERWRITE);
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException(e);
}
}

@Override
protected Pair<TStatus,Optional<byte[]>> getStatusAndKey(long tid) {
return new Pair<>(_getStatus(tid), getKey(tid));
}

private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {

private FateTxStoreImpl(long tid, boolean isReserved) {
Expand Down Expand Up @@ -227,13 +251,7 @@ public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) {
public Serializable getTransactionInfo(Fate.TxInfo txInfo) {
verifyReserved(false);

try {
return deserializeTxInfo(txInfo, zk.getData(getTXPath(tid) + "/" + txInfo));
} catch (NoNodeException nne) {
return null;
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException(e);
}
return ZooStore.this.getTransactionInfo(txInfo, tid);
}

@Override
Expand Down Expand Up @@ -290,6 +308,16 @@ public List<ReadOnlyRepo<T>> getStack() {
}
}

private Serializable getTransactionInfo(TxInfo txInfo, long tid) {
try {
return deserializeTxInfo(txInfo, zk.getData(getTXPath(tid) + "/" + txInfo));
} catch (NoNodeException nne) {
return null;
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException(e);
}
}

@Override
protected TStatus _getStatus(long tid) {
try {
Expand All @@ -301,6 +329,11 @@ protected TStatus _getStatus(long tid) {
}
}

@Override
protected Optional<byte[]> getKey(long tid) {
return Optional.ofNullable((byte[]) getTransactionInfo(TxInfo.TX_KEY, tid));
}

@Override
protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) {
return new FateTxStoreImpl(tid, isReserved);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.Serializable;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
Expand All @@ -31,9 +32,12 @@
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.AbstractFateStore;
import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.ReadOnlyRepo;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.StackOverflowException;
Expand All @@ -44,6 +48,7 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -80,6 +85,13 @@ public long create() {
return tid;
}

@Override
protected void create(long tid, byte[] key) {
// TODO: conditional mutation should be used to verify tid is new
newMutator(tid).putStatus(TStatus.NEW).putKey(key).putCreateTime(System.currentTimeMillis())
.mutate();
}

@Override
protected Stream<FateIdStatus> getTransactions() {
try {
Expand Down Expand Up @@ -109,6 +121,44 @@ protected TStatus _getStatus(long tid) {
});
}

@Override
protected Optional<byte[]> getKey(long tid) {
return scanTx(scanner -> {
scanner.setRange(getRow(tid));
TxInfoColumnFamily.TX_KEY_COLUMN.fetch(scanner);
return scanner.stream().map(e -> e.getValue().get()).findFirst();
});
}

@Override
protected Pair<TStatus,Optional<byte[]>> getStatusAndKey(long tid) {
return scanTx(scanner -> {
scanner.setRange(getRow(tid));
TxColumnFamily.STATUS_COLUMN.fetch(scanner);
TxInfoColumnFamily.TX_KEY_COLUMN.fetch(scanner);

TStatus status = null;
byte[] key = null;

for (Entry<Key,Value> entry : scanner) {
final String qual = entry.getKey().getColumnQualifierData().toString();
switch (qual) {
case TxColumnFamily.STATUS:
status = TStatus.valueOf(entry.getValue().toString());
break;
case TxInfoColumnFamily.TX_KEY:
key = entry.getValue().get();
break;
default:
throw new IllegalStateException("Unexpected column qualifier: " + qual);
}
}

return new Pair<>(Optional.ofNullable(status).orElse(TStatus.UNKNOWN),
Optional.ofNullable(key));
});
}

@Override
protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) {
return new FateTxStoreImpl(tid, isReserved);
Expand Down Expand Up @@ -191,6 +241,9 @@ public Serializable getTransactionInfo(TxInfo txInfo) {
case TX_AGEOFF:
cq = TxInfoColumnFamily.TX_AGEOFF_COLUMN;
break;
case TX_KEY:
cq = TxInfoColumnFamily.TX_KEY_COLUMN;
break;
default:
throw new IllegalArgumentException("Unexpected TxInfo type " + txInfo);
}
Expand Down
Loading