diff --git a/impactor/build.gradle.kts b/impactor/build.gradle.kts index 49bc9cb3..ae3911b7 100644 --- a/impactor/build.gradle.kts +++ b/impactor/build.gradle.kts @@ -46,6 +46,7 @@ dependencies { // Networking implementation("redis.clients:jedis:5.1.3") + implementation(files("libs/jedis-tool-kit-1.1.0-SNAPSHOT.jar")) testImplementation("net.kyori:adventure-text-serializer-ansi:4.14.0") testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.2") @@ -87,4 +88,4 @@ publishing { version = writeVersion(true) } } -} \ No newline at end of file +} diff --git a/impactor/src/main/java/net/impactdev/impactor/core/economy/accounts/AccountManager.java b/impactor/src/main/java/net/impactdev/impactor/core/economy/accounts/AccountManager.java index f64847be..74161675 100644 --- a/impactor/src/main/java/net/impactdev/impactor/core/economy/accounts/AccountManager.java +++ b/impactor/src/main/java/net/impactdev/impactor/core/economy/accounts/AccountManager.java @@ -27,9 +27,12 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import net.impactdev.impactor.api.economy.EconomyService; import net.impactdev.impactor.api.economy.accounts.Account; import net.impactdev.impactor.api.economy.currency.Currency; import net.impactdev.impactor.api.economy.transactions.details.EconomyTransactionType; +import net.impactdev.impactor.core.economy.ImpactorEconomyService; import net.impactdev.impactor.core.economy.storage.EconomyStorage; import org.jetbrains.annotations.Nullable; @@ -45,7 +48,12 @@ public final class AccountManager { public AccountManager(EconomyStorage storage) { this.accounts = Caffeine.newBuilder() - .expireAfterAccess(10, TimeUnit.MINUTES) + .expireAfterAccess(10, TimeUnit.SECONDS) + .evictionListener((AccountKey key, Account account, RemovalCause cause) -> { + if (key != null && EconomyService.instance() instanceof ImpactorEconomyService impactor) { + impactor.networking().releaseAccountLock(key.uuid); + } + }) .buildAsync((key, executor) -> storage.account(key.currency, key.uuid, builder -> builder)); } diff --git a/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/EconomyNetworkingService.java b/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/EconomyNetworkingService.java index d667b3ff..28e21b1e 100644 --- a/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/EconomyNetworkingService.java +++ b/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/EconomyNetworkingService.java @@ -25,14 +25,15 @@ package net.impactdev.impactor.core.economy.networking; +import com.google.common.collect.Maps; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.rockbb.jedis.toolkit.JedisLock; import net.impactdev.impactor.api.Impactor; import net.impactdev.impactor.api.economy.events.EconomyTransactionEvent; import net.impactdev.impactor.api.economy.events.EconomyTransferTransactionEvent; import net.impactdev.impactor.api.economy.transactions.EconomyTransaction; import net.impactdev.impactor.api.economy.transactions.EconomyTransferTransaction; -import net.impactdev.impactor.api.economy.transactions.details.EconomyTransactionType; import net.impactdev.impactor.api.logging.PluginLogger; import net.impactdev.impactor.core.economy.accounts.AccountManager; import net.impactdev.impactor.core.economy.accounts.ImpactorAccount; @@ -48,6 +49,7 @@ import net.kyori.adventure.key.Key; import org.jetbrains.annotations.NotNull; +import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -61,16 +63,35 @@ public final class EconomyNetworkingService implements MessageConsumer { private final ExpiringSet received = new ExpiringSet<>(5, TimeUnit.MINUTES); + // Holds Jedis Locks for transactions in order to release them as soon as possible + private final Map accountLocks = Maps.newHashMap(); + public EconomyNetworkingService(final BaseImpactorPlugin plugin, final AccountManager manager, final Messenger.Provider provider) { this.logger = plugin.logger(); this.manager = manager; this.messenger = provider.obtain(this); + // Lock Requests + Impactor.instance().events().subscribe(EconomyTransactionEvent.Pre.class, event -> { + if (!acquireAccountLock(event.account().owner())) event.cancelled(true); + }); + + Impactor.instance().events().subscribe(EconomyTransferTransactionEvent.Pre.class, event -> { + if (!acquireAccountLock(event.from().owner())) event.cancelled(true); + if (!acquireAccountLock(event.to().owner())) event.cancelled(true); + }); + + // Transaction Messaging Impactor.instance().events().subscribe(EconomyTransactionEvent.Post.class, event -> { + releaseAccountLock(event.account().owner()); + this.publishTransaction(event.transaction()); }); Impactor.instance().events().subscribe(EconomyTransferTransactionEvent.Post.class, event -> { + releaseAccountLock(event.from().owner()); + releaseAccountLock(event.to().owner()); + this.publishTransaction(event.transaction()); }); } @@ -156,15 +177,34 @@ private void processIncomingMessage(final @NotNull Message message) { TransactionContext context = transaction.context(); this.manager.accountIfPresent(context.account(), context.currency()) .map(x -> (ImpactorAccount) x) - .ifPresent(x -> this.manager.update(x, context.amount(), context.type())); + .ifPresent(x -> this.manager.invalidate(x.owner(), x.currency())); } else if(message instanceof TransferTransactionMessage transaction) { TransferTransactionContext context = transaction.context(); this.manager.accountIfPresent(context.from(), context.currency()) .map(x -> (ImpactorAccount) x) - .ifPresent(x -> this.manager.update(x, context.amount(), EconomyTransactionType.WITHDRAW)); + .ifPresent(x -> this.manager.invalidate(x.owner(), x.currency())); this.manager.accountIfPresent(context.to(), context.currency()) .map(x -> (ImpactorAccount) x) - .ifPresent(x -> this.manager.update(x, context.amount(), EconomyTransactionType.DEPOSIT)); + .ifPresent(x -> this.manager.invalidate(x.owner(), x.currency())); + } + } + + public boolean acquireAccountLock(UUID uuid) { + JedisLock lock = messenger.obtainLock(uuid); + + if (lock.acquire()) { + accountLocks.put(uuid, lock); + return true; + } else { + logger.severe("Failed to acquire lock for account: " + uuid); + return false; + } + } + + public void releaseAccountLock(UUID uuid) { + JedisLock lock = accountLocks.remove(uuid); + if (lock != null) { + lock.release(); } } } diff --git a/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/messenger/Messenger.java b/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/messenger/Messenger.java index d3526b88..603b95ab 100644 --- a/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/messenger/Messenger.java +++ b/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/messenger/Messenger.java @@ -25,14 +25,19 @@ package net.impactdev.impactor.core.economy.networking.messenger; +import com.rockbb.jedis.toolkit.JedisLock; import net.impactdev.impactor.core.economy.networking.consumption.MessageConsumer; import net.impactdev.impactor.core.economy.networking.messages.Message; import org.jetbrains.annotations.NotNull; +import java.util.UUID; + public interface Messenger { void publish(final @NotNull Message message); + JedisLock obtainLock(UUID uuid); + default void shutdown() {} interface Provider { diff --git a/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/messenger/redis/RedisMessenger.java b/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/messenger/redis/RedisMessenger.java index 5f602522..60020757 100644 --- a/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/messenger/redis/RedisMessenger.java +++ b/impactor/src/main/java/net/impactdev/impactor/core/economy/networking/messenger/redis/RedisMessenger.java @@ -28,7 +28,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; -import net.impactdev.impactor.api.Impactor; +import com.rockbb.jedis.toolkit.JedisLock; import net.impactdev.impactor.api.logging.PluginLogger; import net.impactdev.impactor.api.scheduler.v2.Scheduler; import net.impactdev.impactor.api.scheduler.v2.Schedulers; @@ -36,21 +36,13 @@ import net.impactdev.impactor.core.economy.networking.messages.Message; import net.impactdev.impactor.core.economy.networking.messenger.Messenger; import net.kyori.adventure.key.Key; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.jetbrains.annotations.NotNull; -import redis.clients.jedis.DefaultJedisClientConfig; -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.JedisClientConfig; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.JedisPooled; -import redis.clients.jedis.JedisPubSub; -import redis.clients.jedis.Protocol; -import redis.clients.jedis.UnifiedJedis; +import redis.clients.jedis.*; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; public final class RedisMessenger implements Messenger { @@ -118,6 +110,15 @@ public void publish(@NotNull Message message) { this.jedis.publish(this.channel.asString(), GSON.toJson(message.serialized())); } + @Override + public JedisLock obtainLock(UUID uuid) { + return switch (jedis) { + case JedisPooled pooled -> new JedisLock(pooled, uuid.toString(), 1000, 5000); + case JedisCluster cluster -> new JedisLock(cluster, uuid.toString(), 1000, 5000); + default -> throw new IllegalStateException("Unsupported Jedis type: " + jedis.getClass().getName()); + }; + } + @Override public void shutdown() { this.closing = true;