Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class Constants {

// Manager-related paths: every constant below is the ZMANAGERS prefix plus a fixed child name.
public static final String ZMANAGERS = "/managers";
public static final String ZMANAGER_LOCK = ZMANAGERS + "/lock";
// NOTE(review): named *_LOCK but the child node is "/assistants" — presumably the parent node
// under which assistant manager locks live; confirm against the code that creates the lock.
public static final String ZMANAGER_ASSISTANT_LOCK = ZMANAGERS + "/assistants";
public static final String ZMANAGER_GOAL_STATE = ZMANAGERS + "/goal_state";
public static final String ZMANAGER_TICK = ZMANAGERS + "/tick";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ private static Set<String> createPersistentWatcherPaths() {
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK,
Constants.ZRESOURCEGROUPS)) {
Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS)) {
pathsToWatch.add(path);
}
return pathsToWatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,21 @@ public FateTxStore<T> reserve(FateId fateId) {
EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);

@Override
public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsumer) {
public void runnable(Set<FatePartition> partitions, BooleanSupplier keepWaiting,
Consumer<FateIdStatus> idConsumer) {

if (partitions.isEmpty()) {
return;
}

AtomicLong seen = new AtomicLong(0);

while (keepWaiting.getAsBoolean() && seen.get() == 0) {
final long beforeCount = unreservedRunnableCount.getCount();
final boolean beforeDeferredOverflow = deferredOverflow.get();

try (Stream<FateIdStatus> inProgress = getTransactions(IN_PROGRESS_SET);
Stream<FateIdStatus> other = getTransactions(OTHER_RUNNABLE_SET)) {
try (Stream<FateIdStatus> inProgress = getTransactions(partitions, IN_PROGRESS_SET);
Stream<FateIdStatus> other = getTransactions(partitions, OTHER_RUNNABLE_SET)) {
// read the in progress transaction first and then everything else in order to process those
// first
var transactions = Stream.concat(inProgress, other);
Expand Down Expand Up @@ -200,6 +205,8 @@ public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsu
if (beforeCount == unreservedRunnableCount.getCount()) {
long waitTime = 5000;
synchronized (deferred) {
deferred.keySet().removeIf(
fateId -> partitions.stream().noneMatch(partition -> partition.contains(fateId)));
if (!deferred.isEmpty()) {
waitTime = deferred.values().stream()
.mapToLong(countDownTimer -> countDownTimer.timeLeft(TimeUnit.MILLISECONDS)).min()
Expand Down Expand Up @@ -240,9 +247,11 @@ public ReadOnlyFateTxStore<T> read(FateId fateId) {
}

@Override
public Map<FateId,FateReservation> getActiveReservations() {
return list().filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
public Map<FateId,FateReservation> getActiveReservations(Set<FatePartition> partitions) {
try (var stream = getTransactions(partitions, EnumSet.allOf(TStatus.class))) {
return stream.filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
}
}

protected boolean isRunnable(TStatus status) {
Expand Down Expand Up @@ -289,6 +298,9 @@ protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) {

// Produces the transactions whose status is in the given set.
// NOTE(review): semantics inferred from the signature; the implementation lives in subclasses.
protected abstract Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses);

// Overload that additionally takes a set of partitions. Callers in this class open the returned
// stream in try-with-resources, so it is closed once it has been consumed.
// NOTE(review): presumably only transactions whose id falls inside one of the partitions are
// returned — confirm against the subclass implementations.
protected abstract Stream<FateIdStatus> getTransactions(Set<FatePartition> partitions,
EnumSet<TStatus> statuses);

// NOTE(review): presumably the raw status lookup for a single transaction — confirm in subclasses.
protected abstract TStatus _getStatus(FateId fateId);

// NOTE(review): presumably empty when the transaction was not created with a FateKey — confirm.
protected abstract Optional<FateKey> getKey(FateId fateId);
Expand Down Expand Up @@ -418,7 +430,8 @@ public interface FateIdGenerator {
FateId newRandomId(FateInstanceType instanceType);
}

protected void seededTx() {
@Override
public void seeded() {
unreservedRunnableCount.increment();
}

Expand Down
48 changes: 45 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
Expand All @@ -51,6 +52,8 @@
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.gson.JsonParser;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand All @@ -76,6 +79,7 @@ public class Fate<T> extends FateClient<T> {
// Starts out true; DeadReservationCleaner.run() only does work while this is set.
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
// Visible for FlakyFate test object
protected final Set<FateExecutor<T>> fateExecutors = new HashSet<>();
// Partitions currently assigned to this Fate instance; starts empty. The reference is replaced
// wholesale (with an immutable Set.copyOf snapshot) rather than mutated, and getPartitions(),
// setPartitions() and seeded() all access it while holding the fateExecutors monitor.
private Set<FatePartition> currentPartitions = Set.of();

public enum TxInfo {
FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
Expand Down Expand Up @@ -208,8 +212,10 @@ public void run() {
fe -> fe.getFateOps().equals(fateOps) && fe.getName().equals(fateExecutorName))) {
log.debug("[{}] Adding FateExecutor for {} with {} threads", store.type(), fateOps,
poolSize);
fateExecutors.add(
new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName));
var fateExecutor =
new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName);
fateExecutors.add(fateExecutor);
fateExecutor.setPartitions(currentPartitions);
}
}
}
Expand All @@ -233,7 +239,11 @@ private class DeadReservationCleaner implements Runnable {
@Override
public void run() {
if (keepRunning.get()) {
store.deleteDeadReservations();
Set<FatePartition> partitions;
synchronized (fateExecutors) {
partitions = currentPartitions;
}
store.deleteDeadReservations(partitions);
}
}
}
Expand Down Expand Up @@ -369,6 +379,17 @@ public AtomicInteger getNeedMoreThreadsWarnCount() {
return needMoreThreadsWarnCount;
}

/**
 * Handles a notification that transactions were seeded in the given partitions. The store is only
 * told about it when at least one of those partitions is among the ones currently assigned to
 * this instance; otherwise the call is a no-op.
 *
 * @param partitions the partitions in which seeding took place
 */
public void seeded(Set<FatePartition> partitions) {
  final boolean overlapsAssigned;
  // Only the comparison needs the lock; logging and the store call happen outside of it.
  synchronized (fateExecutors) {
    overlapsAssigned = !Sets.intersection(currentPartitions, partitions).isEmpty();
  }

  if (overlapsAssigned) {
    log.trace("Notified of seeding for {}", partitions);
    store.seeded();
  }
}

/**
* Initiates shutdown of background threads that run fate operations and cleanup fate data and
* optionally waits on them. Leaves the fate object in a state where it can still update and read
Expand Down Expand Up @@ -432,6 +453,27 @@ public void close() {
store.close();
}

/**
 * Returns the set of partitions currently assigned to this instance.
 *
 * @return the current partition set, read while holding the {@code fateExecutors} monitor
 */
public Set<FatePartition> getPartitions() {
  final Set<FatePartition> snapshot;
  synchronized (fateExecutors) {
    snapshot = currentPartitions;
  }
  return snapshot;
}

/**
 * Replaces the partitions assigned to this instance and pushes the new assignment to every
 * registered fate executor.
 *
 * @param partitions the new assignment; the start and end of every partition must have the same
 *        instance type as this instance's store
 * @return the assignment that was in effect before this call
 * @throws NullPointerException if {@code partitions} is null
 * @throws IllegalArgumentException if any partition boundary has a different instance type
 */
public Set<FatePartition> setPartitions(Set<FatePartition> partitions) {
  Objects.requireNonNull(partitions);

  boolean typesMatch = true;
  for (FatePartition fp : partitions) {
    if (fp.start().getType() != store.type() || fp.end().getType() != store.type()) {
      typesMatch = false;
      break;
    }
  }
  Preconditions.checkArgument(typesMatch, "type mismatch type:%s partitions:%s", store.type(),
      partitions);

  // Swap the assignment and notify executors under one lock so they all see the same snapshot.
  synchronized (fateExecutors) {
    final Set<FatePartition> previous = currentPartitions;
    currentPartitions = Set.copyOf(partitions);
    for (var executor : fateExecutors) {
      executor.setPartitions(currentPartitions);
    }
    return previous;
  }
}

private boolean anyFateExecutorIsAlive() {
synchronized (fateExecutors) {
return fateExecutors.stream().anyMatch(FateExecutor::isAlive);
Expand Down
35 changes: 32 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/fate/FateClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.time.Duration;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

Expand All @@ -46,8 +49,11 @@ public class FateClient<T> {
private static final EnumSet<ReadOnlyFateStore.TStatus> FINISHED_STATES =
EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);

// Callback handed the id of each transaction seeded through this client. Defaults to a no-op and
// is swapped via setSeedingConsumer; the holder itself is never reassigned, hence final.
private final AtomicReference<Consumer<FateId>> seedingConsumer =
    new AtomicReference<>(fid -> {});

/**
 * Creates a client that operates on the given store, wrapped by {@code FateLogger.wrap}.
 *
 * @param store the fate store this client reads from and writes to
 * @param toLogStrFunc passed to {@code FateLogger.wrap} to render a repo as a log string
 */
public FateClient(FateStore<T> store, Function<Repo<T>,String> toLogStrFunc) {
  this.store = FateLogger.wrap(store, toLogStrFunc, false);
}

// get a transaction id back to the requester before doing any work
Expand All @@ -56,15 +62,33 @@ public FateId startTransaction() {
}

public FateStore.Seeder<T> beginSeeding() {
return store.beginSeeding();
var seeder = store.beginSeeding();
return new FateStore.Seeder<T>() {
@Override
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(Fate.FateOperation fateOp,
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
var cfuture = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
return cfuture.thenApply(optional -> {
optional.ifPresent(seedingConsumer.get());
return optional;
});
}

@Override
public void close() {
seeder.close();
}
};
}

public void seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
CompletableFuture<Optional<FateId>> cfuture;
try (var seeder = store.beginSeeding()) {
@SuppressWarnings("unused")
var unused = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
cfuture = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
}
var optional = cfuture.join();
optional.ifPresent(seedingConsumer.get());
}

// start work in the transaction.. it is safe to call this
Expand All @@ -73,6 +97,7 @@ public void seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> re
boolean autoCleanUp, String goalMessage) {
Fate.log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, goalMessage);
store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
seedingConsumer.get().accept(fateId);
}

// check on the transaction
Expand Down Expand Up @@ -176,4 +201,8 @@ public Exception getException(FateId fateId) {
/**
 * Lists fate keys of the given type by delegating to the underlying store.
 *
 * @param type the key type to list
 * @return the stream produced by the store for that key type
 */
public Stream<FateKey> list(FateKey.FateKeyType type) {
  final Stream<FateKey> keys = store.list(type);
  return keys;
}

/**
 * Installs the callback that is handed the id of every transaction seeded through this client.
 *
 * @param seedingConsumer the callback to invoke after a successful seed; must not be null
 * @throws NullPointerException if {@code seedingConsumer} is null
 */
public void setSeedingConsumer(Consumer<FateId> seedingConsumer) {
  // Reject null here: otherwise the failure would only surface later, when a seeding call passes
  // the stored value to Optional.ifPresent or invokes accept on it.
  this.seedingConsumer.set(java.util.Objects.requireNonNull(seedingConsumer, "seedingConsumer"));
}
}
18 changes: 16 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -43,6 +44,8 @@
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
Expand Down Expand Up @@ -81,6 +84,7 @@ public class FateExecutor<T> {
// Operations this executor works on; WorkFinder only accepts transactions whose operation is in
// this set (or that were cancelled while NEW).
private final Set<Fate.FateOperation> fateOps;
private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>();
private final FateExecutorMetrics<T> fateExecutorMetrics;
// Current partition assignment; starts empty, is replaced atomically by setPartitions and read
// by WorkFinder at the start of each search for work.
private final AtomicReference<Set<FatePartition>> partitions = new AtomicReference<>(Set.of());

public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps, int poolSize,
String name) {
Expand Down Expand Up @@ -298,6 +302,11 @@ protected ConcurrentLinkedQueue<Integer> getIdleCountHistory() {
return idleCountHistory;
}

/**
 * Replaces the partitions this executor looks for work in. An immutable copy of the argument is
 * stored, so later changes to the caller's set have no effect on this executor.
 *
 * @param partitions the new partition assignment
 * @throws NullPointerException if {@code partitions} is null
 */
public void setPartitions(Set<FatePartition> partitions) {
  Objects.requireNonNull(partitions);
  final Set<FatePartition> snapshot = Set.copyOf(partitions);
  this.partitions.set(snapshot);
}

/**
* A single thread that finds transactions to work on and queues them up. Do not want each worker
* thread going to the store and looking for work as it would place more load on the store.
Expand All @@ -308,7 +317,12 @@ private class WorkFinder implements Runnable {
public void run() {
while (fate.getKeepRunning().get() && !isShutdown()) {
try {
fate.getStore().runnable(() -> fate.getKeepRunning().get(), fateIdStatus -> {
var localPartitions = partitions.get();
// if the set of partitions changes, we should stop looking for work w/ the old set of
// partitions
BooleanSupplier keepRunning =
() -> fate.getKeepRunning().get() && localPartitions == partitions.get();
fate.getStore().runnable(localPartitions, keepRunning, fateIdStatus -> {
// The FateId with the fate operation 'fateOp' is workable by this FateExecutor if
// 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' is in 'fateOps')
// 2) The transaction was cancelled while NEW. This is an edge case that needs to be
Expand All @@ -319,7 +333,7 @@ public void run() {
var fateOp = fateIdStatus.getFateOperation().orElse(null);
if ((fateOp != null && fateOps.contains(fateOp))
|| txCancelledWhileNew(status, fateOp)) {
while (fate.getKeepRunning().get() && !isShutdown()) {
while (keepRunning.getAsBoolean() && !isShutdown()) {
try {
// The reason for calling transfer instead of queueing is avoid rescanning the
// storage layer and adding the same thing over and over. For example if all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.accumulo.core.metrics.Metric;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;

public class FateExecutorMetrics<T> implements MetricsProducer {
public class FateExecutorMetrics<T> {
private static final Logger log = LoggerFactory.getLogger(FateExecutorMetrics.class);
private final FateInstanceType type;
private final String poolName;
Expand All @@ -49,7 +48,6 @@ protected FateExecutorMetrics(FateInstanceType type, String poolName,
this.idleWorkerCount = idleWorkerCount;
}

@Override
public void registerMetrics(MeterRegistry registry) {
// noop if already registered or cleared
if (state == State.UNREGISTERED) {
Expand Down
Loading