diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java index 03a6e12c..85260100 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java @@ -1,26 +1,19 @@ package tech.ydb.yoj.repository.db; import com.google.common.base.Preconditions; -import io.prometheus.client.Counter; -import io.prometheus.client.Histogram; -import io.prometheus.client.Histogram.Timer; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.With; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import tech.ydb.yoj.repository.db.cache.TransactionLog; import tech.ydb.yoj.repository.db.exception.QueryInterruptedException; import tech.ydb.yoj.repository.db.exception.RetryableException; -import tech.ydb.yoj.util.lang.Strings; import javax.annotation.Nullable; import java.time.Duration; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static java.lang.String.format; @@ -44,38 +37,7 @@ */ @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public final class StdTxManager implements TxManager, TxManagerState { - private static final Logger log = LoggerFactory.getLogger(StdTxManager.class); - private static final int DEFAULT_MAX_ATTEMPT_COUNT = 100; - private static final double[] TX_ATTEMPTS_BUCKETS = new double[] - {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 35, 40, 45, 50, 60, 70, 80, 90, 100}; - private static final double[] DURATION_BUCKETS = { - .001, .0025, .005, .0075, - .01, .025, .05, .075, - .1, .25, .5, .75, - 1, 2.5, 5, 7.5, - 10, 25, 50, 75, - 100 - }; - private static final Histogram totalDuration = Histogram.build("tx_total_duration_seconds", "Tx total duration (seconds)") - .labelNames("tx_name") - .buckets(DURATION_BUCKETS) - .register(); - private static final Histogram attemptDuration = Histogram.build("tx_attempt_duration_seconds", "Tx attempt duration (seconds)") - .labelNames("tx_name") - .buckets(DURATION_BUCKETS) - .register(); - private static final Histogram attempts = Histogram.build("tx_attempts", "Tx attempts spent to success") - .labelNames("tx_name") - .buckets(TX_ATTEMPTS_BUCKETS) - .register(); - private static final Counter results = Counter.build("tx_result", "Tx commits/rollbacks/fails") - .labelNames("tx_name", "result") - .register(); - private static final Counter retries = Counter.build("tx_retries", "Tx retry reasons") - .labelNames("tx_name", "reason") - .register(); - private static final AtomicLong txLogIdSeq = new AtomicLong(); @Getter private final Repository repository; @@ -90,8 +52,8 @@ public final class StdTxManager implements TxManager, TxManagerState { private final SeparatePolicy separatePolicy; @With private final TxNameGenerator txNameGenerator; - - private final long txLogId = txLogIdSeq.incrementAndGet(); + @With + private final TracerFactory tracerFactory; public StdTxManager(@NonNull Repository repository) { this( @@ -100,7 +62,8 @@ public StdTxManager(@NonNull Repository repository) { /* logContext */ null, /* options */ TxOptions.create(SERIALIZABLE_READ_WRITE), /* separatePolicy */ SeparatePolicy.LOG, - /* txNameGenerator */ new TxNameGenerator.Default() + /* txNameGenerator */ new TxNameGenerator.Default(), + /* tracerFactory */ StdTxManagerTracer.Default::new ); } @@ -191,62 +154,52 @@ public void tx(Runnable runnable) { @Override public T tx(Supplier supplier) { TxName txName = txNameGenerator.generate(); - String name = txName.name(); - - checkSeparatePolicy(separatePolicy, txName.logName()); - - RetryableException lastRetryableException = null; - TxImpl lastTx = null; - try (Timer ignored = totalDuration.labels(name).startTimer()) { - for (int attempt = 1; attempt <= maxAttemptCount; attempt++) { - try { - attempts.labels(name).observe(attempt); - T result; - try ( - var ignored1 = attemptDuration.labels(name).startTimer(); - var ignored2 = MDC.putCloseable("tx", formatTx(txName)); - var ignored3 = MDC.putCloseable("tx-id", formatTxId()); - var ignored4 = MDC.putCloseable("tx-name", txName.logName()) - ) { - RepositoryTransaction transaction = repository.startTransaction(options); - lastTx = new TxImpl(name, transaction, options); - result = lastTx.run(supplier); - } - if (options.isDryRun()) { - results.labels(name, "rollback").inc(); - results.labels(name, "dry_run").inc(); - } else { - results.labels(name, "commit").inc(); - } - return result; - } catch (RetryableException e) { - retries.labels(name, getExceptionNameForMetric(e)).inc(); - lastRetryableException = e; - if (attempt + 1 <= maxAttemptCount) { - try { - MILLISECONDS.sleep(e.getRetryPolicy().calcDuration(attempt).toMillis()); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new QueryInterruptedException("DB query interrupted", ex); + StdTxManagerTracer tracer = tracerFactory.create(options, txName); + + checkSeparatePolicy(separatePolicy, tracer); + + AtomicReference lastTxContainer = new AtomicReference<>(null); + try { + return tracer.wrapTx(() -> { + RetryableException lastRetryableException = null; + for (int attempt = 1; attempt <= maxAttemptCount; attempt++) { + try { + return tracer.wrapAttempt(logContext, attempt, () -> { + RepositoryTransaction transaction = repository.startTransaction(options); + var lastTx = new TxImpl(txName.name(), transaction, options); + lastTxContainer.set(lastTx); + return lastTx.run(supplier); + }); + } catch (RetryableException e) { + tracer.onRetry(e); + lastRetryableException = e; + if (attempt + 1 <= maxAttemptCount) { + try { + MILLISECONDS.sleep(e.getRetryPolicy().calcDuration(attempt).toMillis()); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new QueryInterruptedException("DB query interrupted", ex); + } } + } catch (Exception e) { + tracer.onException(); + throw e; } - } catch (Exception e) { - results.labels(name, "rollback").inc(); - throw e; } - } - results.labels(name, "fail").inc(); + tracer.onRetryExceeded(); - throw requireNonNull(lastRetryableException).rethrow(); + throw requireNonNull(lastRetryableException).rethrow(); + }); } finally { + TxImpl lastTx = lastTxContainer.get(); if (!options.isDryRun() && lastTx != null) { lastTx.runDeferredFinally(); } } } - private static void checkSeparatePolicy(SeparatePolicy separatePolicy, String txName) { + private static void checkSeparatePolicy(SeparatePolicy separatePolicy, StdTxManagerTracer tracer) { if (!Tx.Current.exists()) { return; } @@ -254,26 +207,11 @@ private static void checkSeparatePolicy(SeparatePolicy separatePolicy, String tx switch (separatePolicy) { case ALLOW -> { } - case STRICT -> - throw new IllegalStateException(format("Transaction %s was run when another transaction is active", txName)); - case LOG -> - log.warn("Transaction '{}' was run when another transaction is active. Perhaps unexpected behavior. " + - "Use TxManager.separate() to avoid this message", txName); + case STRICT -> throw new IllegalStateException("Transaction was run when another transaction is active"); + case LOG -> tracer.onLogSeparatePolicy(); } } - private String getExceptionNameForMetric(RetryableException e) { - return Strings.removeSuffix(e.getClass().getSimpleName(), "Exception"); - } - - private String formatTx(TxName txName) { - return formatTxId() + " {" + txName.logName() + (logContext != null ? "/" + logContext : "") + "}"; - } - - private String formatTxId() { - return Strings.leftPad(Long.toUnsignedString(txLogId, 36), 6, '0') + options.getIsolationLevel().getTxIdSuffix(); - } - @Override public TxManagerState getState() { return this; @@ -343,7 +281,8 @@ private class ReadonlyBuilderImpl implements ReadonlyBuilder { @Override public ReadonlyBuilder withStatementIsolationLevel(IsolationLevel isolationLevel) { Preconditions.checkArgument(isolationLevel.isReadOnly(), - "readOnly() can only be used with a read-only tx isolation level, but got: %s", isolationLevel); + "readOnly() can only be used with a read-only tx isolation level, but got: %s", isolationLevel + ); return withOptions(options.withIsolationLevel(isolationLevel)); } @@ -363,4 +302,9 @@ private enum SeparatePolicy { LOG, STRICT } + + @FunctionalInterface + public interface TracerFactory { + StdTxManagerTracer create(TxOptions options, TxName txName); + } } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManagerTracer.java b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManagerTracer.java new file mode 100644 index 00000000..adf25fea --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManagerTracer.java @@ -0,0 +1,198 @@ +package tech.ydb.yoj.repository.db; + +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; +import lombok.RequiredArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import tech.ydb.yoj.repository.db.exception.RetryableException; +import tech.ydb.yoj.util.lang.Strings; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public interface StdTxManagerTracer { + T wrapTx(Supplier supplier); + + T wrapAttempt(String logContext, int attempt, Supplier supplier); + + void onRetry(RetryableException e); + + void onException(); + + void onRetryExceeded(); + + void onLogSeparatePolicy(); + + @RequiredArgsConstructor + final class Default implements StdTxManagerTracer { + private static final Logger log = LoggerFactory.getLogger(StdTxManagerTracer.class); + + private static final AtomicLong txLogIdSeq = new AtomicLong(); + + private static final double[] TX_ATTEMPTS_BUCKETS = new double[]{ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 35, 40, 45, 50, 60, 70, 80, 90, 100 + }; + private static final double[] DURATION_BUCKETS = { + .001, .0025, .005, .0075, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10, 25, 50, 75, 100 + }; + private static final Histogram totalDuration = Histogram.build() + .name("tx_total_duration_seconds") + .help("Tx total duration (seconds)") + .labelNames("tx_name") + .buckets(DURATION_BUCKETS) + .register(); + private static final Histogram attemptDuration = Histogram.build() + .name("tx_attempt_duration_seconds") + .help("Tx attempt duration (seconds)") + .labelNames("tx_name") + .buckets(DURATION_BUCKETS) + .register(); + private static final Histogram attempts = Histogram.build() + .name("tx_attempts") + .help("Tx attempts spent to success") + .labelNames("tx_name") + .buckets(TX_ATTEMPTS_BUCKETS) + .register(); + private static final Counter results = Counter.build() + .name("tx_result") + .help("Tx commits/rollbacks/fails") + .labelNames("tx_name", "result") + .register(); + private static final Counter retries = Counter.build() + .name("tx_retries") + .help("Tx retry reasons") + .labelNames("tx_name", "reason") + .register(); + + private final long txLogId = txLogIdSeq.incrementAndGet(); + + private final TxOptions options; + private final TxName txName; + + @Override + public T wrapTx(Supplier supplier) { + try (Histogram.Timer ignored = totalDuration.labels(txName.name()).startTimer()) { + return supplier.get(); + } + } + + @Override + public T wrapAttempt(String logContext, int attempt, Supplier supplier) { + attempts.labels(txName.name()).observe(attempt); + + T result; + try ( + var ignored1 = attemptDuration.labels(txName.name()).startTimer(); + var ignored2 = MDC.putCloseable("tx", formatTx(txName, logContext)); + var ignored3 = MDC.putCloseable("tx-id", formatTxId()); + var ignored4 = MDC.putCloseable("tx-name", txName.logName() + )) { + result = supplier.get(); + } + + if (options.isDryRun()) { + results.labels(txName.name(), "rollback").inc(); + results.labels(txName.name(), "dry_run").inc(); + } else { + results.labels(txName.name(), "commit").inc(); + } + + return result; + } + + @Override + public void onRetry(RetryableException e) { + retries.labels(txName.name(), getExceptionNameForMetric(e)).inc(); + } + + @Override + public void onException() { + results.labels(txName.name(), "rollback").inc(); + } + + @Override + public void onRetryExceeded() { + results.labels(txName.name(), "fail").inc(); + } + + @Override + public void onLogSeparatePolicy() { + log.warn("Transaction '{}' was run when another transaction is active. Perhaps unexpected behavior. " + + "Use TxManager.separate() to avoid this message", txName.logName() + ); + } + + private static String getExceptionNameForMetric(RetryableException e) { + return Strings.removeSuffix(e.getClass().getSimpleName(), "Exception"); + } + + private String formatTx(TxName txName, String logContext) { + return formatTxId() + " {" + txName.logName() + (logContext != null ? "/" + logContext : "") + "}"; + } + + private String formatTxId() { + String leftPad = Strings.leftPad(Long.toUnsignedString(txLogId, 36), 6, '0'); + return leftPad + options.getIsolationLevel().getTxIdSuffix(); + } + } + + @RequiredArgsConstructor + final class Composite implements StdTxManagerTracer { + private final List tracers; + + public static Composite of(StdTxManagerTracer... tracers) { + return new Composite(Arrays.asList(tracers)); + } + + @Override + public T wrapTx(Supplier supplier) { + return wrapAll(supplier, StdTxManagerTracer::wrapTx); + } + + @Override + public T wrapAttempt(String logContext, int attempt, Supplier supplier) { + return wrapAll(supplier, (tracer, supp) -> tracer.wrapAttempt(logContext, attempt, supp)); + } + + @Override + public void onRetry(RetryableException e) { + runAll(t -> t.onRetry(e)); + } + + @Override + public void onException() { + runAll(StdTxManagerTracer::onException); + } + + @Override + public void onRetryExceeded() { + runAll(StdTxManagerTracer::onRetryExceeded); + } + + @Override + public void onLogSeparatePolicy() { + runAll(StdTxManagerTracer::onLogSeparatePolicy); + } + + private void runAll(Consumer action) { + for (StdTxManagerTracer tracer : tracers) { + action.accept(tracer); + } + } + + private T wrapAll(Supplier supplier, BiFunction, T> action) { + for (int i = tracers.size() - 1; i >= 0; i--) { + Supplier finalSupplier = supplier; + StdTxManagerTracer tracer = tracers.get(i); + supplier = () -> action.apply(tracer, finalSupplier); + } + return supplier.get(); + } + } +}