Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,10 +41,13 @@
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

Expand All @@ -53,12 +57,22 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {

// Default maximum size of 100,000 transactions before deferral is stopped and
// all existing transactions are processed immediately again
protected static final int DEFAULT_MAX_DEFERRED = 100_000;
public static final int DEFAULT_MAX_DEFERRED = 100_000;

public static final FateIdGenerator DEFAULT_FATE_ID_GENERATOR = new FateIdGenerator() {
@Override
public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) {
HashCode hashCode = Hashing.murmur3_128().hashBytes(fateKey.getSerialized());
long tid = hashCode.asLong() & 0x7fffffffffffffffL;
return FateId.from(instanceType, tid);
}
};

protected final Set<FateId> reserved;
protected final Map<FateId,Long> deferred;
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
private final FateIdGenerator fateIdGenerator;

// This is incremented each time a transaction was unreserved that was non new
protected final SignalCount unreservedNonNewCount = new SignalCount();
Expand All @@ -67,11 +81,12 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
protected final SignalCount unreservedRunnableCount = new SignalCount();

public AbstractFateStore() {
this(DEFAULT_MAX_DEFERRED);
this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
}

public AbstractFateStore(int maxDeferred) {
public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) {
this.maxDeferred = maxDeferred;
this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
this.reserved = new HashSet<>();
this.deferred = new HashMap<>();
}
Expand Down Expand Up @@ -239,12 +254,46 @@ public int getDeferredCount() {
}
}

@Override
public FateId create(FateKey fateKey) {
FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
Pair<TStatus,Optional<FateKey>> statusAndKey = getStatusAndKey(fateId);
TStatus status = statusAndKey.getFirst();
Optional<FateKey> tFateKey = statusAndKey.getSecond();

// Case 1: Status of UNKNOWN means doesn't exist, so we can create
if (status == TStatus.UNKNOWN) {
create(fateId, fateKey);
// Case 2: Status is NEW so this is unseeded, we can return and allow the calling code
// to reserve/seed as long as the existing key is the same and not different as that would
// mean a collision
} else if (status == TStatus.NEW) {
Preconditions.checkState(tFateKey.isPresent(), "Tx key column is missing");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a test for a collision between a random fate id and hash derived fate id. Im 99.9% sure this code works, but was wondering if creating a test for that is worthwhile for detecting regressions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keith-turner - I just added a test for collision with key generated fate id and random id

The test works by creating the transaction using the key and then the test deletes the key outside of fate (either out of accumulo table or replaces the ZK node with an empty key). After removing the key the transaction appears to the store just like a normal random transaction so the test then verifies the failure when calling create again and also ensures the transaction can still be reserved and used since it was essentially "converted" to a random transaction.

Preconditions.checkState(fateKey.equals(tFateKey.orElseThrow()),
"Collision detected for tid %s", fateId.getTid());
// Case 3: Status is some other state which means already in progress
} else {
throw new IllegalStateException(
"Existing transaction already exists for: " + fateId.getTid());
}

return fateId;
}

protected abstract void create(FateId fateId, FateKey fateKey);

protected abstract Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId);

protected abstract Stream<FateIdStatus> getTransactions();

protected abstract TStatus _getStatus(FateId fateId);

protected abstract Optional<FateKey> getKey(FateId fateId);

protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean isReserved);

protected abstract FateInstanceType getInstanceType();

protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
protected final FateId fateId;
protected final boolean isReserved;
Expand Down Expand Up @@ -337,34 +386,50 @@ public TStatus getStatus() {
return status;
}

@Override
public Optional<FateKey> getKey() {
verifyReserved(false);
return AbstractFateStore.this.getKey(fateId);
}

@Override
public Pair<TStatus,Optional<FateKey>> getStatusAndKey() {
verifyReserved(false);
return AbstractFateStore.this.getStatusAndKey(fateId);
}

@Override
public FateId getID() {
return fateId;
}
}

protected byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
byte[] sera = serialize(so);
byte[] data = new byte[sera.length + 2];
System.arraycopy(sera, 0, data, 2, sera.length);
data[0] = 'O';
data[1] = ' ';
return data;
}
public interface FateIdGenerator {
FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey);
}

protected byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
byte[] sera = serialize(so);
byte[] data = new byte[sera.length + 2];
System.arraycopy(sera, 0, data, 2, sera.length);
data[0] = 'O';
data[1] = ' ';
return data;
}
}

protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
return (Serializable) deserialize(sera);
} else if (data[0] == 'S') {
return new String(data, 2, data.length - 2, UTF_8);
} else {
throw new IllegalStateException("Bad node data " + txInfo);
}
protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
return (Serializable) deserialize(sera);
} else if (data[0] == 'S') {
return new String(data, 2, data.length - 2, UTF_8);
} else {
throw new IllegalStateException("Bad node data " + txInfo);
}
}
}
162 changes: 162 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.fate;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.hadoop.io.DataInputBuffer;

public class FateKey {

private final FateKeyType type;
private final Optional<KeyExtent> keyExtent;
private final Optional<ExternalCompactionId> compactionId;
private final byte[] serialized;

private FateKey(FateKeyType type, KeyExtent keyExtent) {
this.type = Objects.requireNonNull(type);
this.keyExtent = Optional.of(keyExtent);
this.compactionId = Optional.empty();
this.serialized = serialize(type, keyExtent);
}

private FateKey(FateKeyType type, ExternalCompactionId compactionId) {
this.type = Objects.requireNonNull(type);
this.keyExtent = Optional.empty();
this.compactionId = Optional.of(compactionId);
this.serialized = serialize(type, compactionId);
}

private FateKey(byte[] serialized) {
try (DataInputBuffer buffer = new DataInputBuffer()) {
buffer.reset(serialized, serialized.length);
this.type = FateKeyType.valueOf(buffer.readUTF());
this.keyExtent = deserializeKeyExtent(type, buffer);
this.compactionId = deserializeCompactionId(type, buffer);
this.serialized = serialized;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public FateKeyType getType() {
return type;
}

public Optional<KeyExtent> getKeyExtent() {
return keyExtent;
}

public Optional<ExternalCompactionId> getCompactionId() {
return compactionId;
}

public byte[] getSerialized() {
return serialized;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FateKey fateKey = (FateKey) o;
return Arrays.equals(serialized, fateKey.serialized);
}

@Override
public int hashCode() {
return Arrays.hashCode(serialized);
}

public static FateKey deserialize(byte[] serialized) {
return new FateKey(serialized);
}

public static FateKey forSplit(KeyExtent extent) {
return new FateKey(FateKeyType.SPLIT, extent);
}

public static FateKey forCompactionCommit(ExternalCompactionId compactionId) {
return new FateKey(FateKeyType.COMPACTION_COMMIT, compactionId);
}

public enum FateKeyType {
SPLIT, COMPACTION_COMMIT
}

private static byte[] serialize(FateKeyType type, KeyExtent ke) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos)) {
dos.writeUTF(type.toString());
ke.writeTo(dos);
dos.close();
return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private static byte[] serialize(FateKeyType type, ExternalCompactionId compactionId) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos)) {
dos.writeUTF(type.toString());
dos.writeUTF(compactionId.canonical());
dos.close();
return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private static Optional<KeyExtent> deserializeKeyExtent(FateKeyType type, DataInputBuffer buffer)
throws IOException {
switch (type) {
case SPLIT:
return Optional.of(KeyExtent.readFrom(buffer));
case COMPACTION_COMMIT:
return Optional.empty();
default:
throw new IllegalStateException("Unexpected FateInstanceType found " + type);
}
}

private static Optional<ExternalCompactionId> deserializeCompactionId(FateKeyType type,
DataInputBuffer buffer) throws IOException {
switch (type) {
case SPLIT:
return Optional.empty();
case COMPACTION_COMMIT:
return Optional.of(ExternalCompactionId.of(buffer.readUTF()));
default:
throw new IllegalStateException("Unexpected FateInstanceType found " + type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
*/
FateId create();

FateId create(FateKey fateKey);

/**
* An interface that allows read/write access to the data related to a single fate operation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import java.io.Serializable;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.accumulo.core.util.Pair;

/**
* Read only access to a Transaction Store.
*
Expand Down Expand Up @@ -87,6 +90,10 @@ interface ReadOnlyFateTxStore<T> {
*/
TStatus getStatus();

Optional<FateKey> getKey();

Pair<TStatus,Optional<FateKey>> getStatusAndKey();

/**
* Wait for the status of a transaction to change
*
Expand Down
Loading