diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/IceAdapter.java b/ice-adapter/src/main/java/com/faforever/iceadapter/IceAdapter.java index 307bb66..cc93bbb 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/IceAdapter.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/IceAdapter.java @@ -35,6 +35,7 @@ public class IceAdapter implements Callable, AutoCloseable, FafRpcCallb private RPCService rpcService; private final ExecutorService executor = ExecutorHolder.getExecutor(); + private final ScheduledExecutorService scheduledExecutor = ExecutorHolder.getScheduledExecutor(); private static final Lock lockGameSession = new ReentrantLock(); public static void main(String[] args) { @@ -52,6 +53,7 @@ public Integer call() { public void start() { determineVersion(); log.info("Version: {}", VERSION); + log.info("Options: {}", iceOptions); Debug.DELAY_UI_MS = iceOptions.getDelayUi(); Debug.ENABLE_DEBUG_WINDOW = iceOptions.isDebugWindow(); @@ -156,6 +158,7 @@ public static void close(int status) { TrayIcon.close(); INSTANCE.executor.shutdown(); + INSTANCE.scheduledExecutor.shutdown(); CompletableFuture.runAsync( INSTANCE.executor::shutdownNow, CompletableFuture.delayedExecutor(250, TimeUnit.MILLISECONDS)) .thenRunAsync(() -> System.exit(status), CompletableFuture.delayedExecutor(250, TimeUnit.MILLISECONDS)); @@ -194,10 +197,22 @@ public static double getAcceptableLatency() { return INSTANCE.iceOptions.getAcceptableLatency(); } + public static IceOptions getOptions() { + return INSTANCE.iceOptions; + } + public static Executor getExecutor() { return INSTANCE.executor; } + public static ScheduledExecutorService getScheduledExecutor() { + return INSTANCE.scheduledExecutor; + } + + public static GPGNetServer getGpgNetServer() { + return INSTANCE.gpgNetServer; + } + public static GameSession getGameSession() { return GAME_SESSION; } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/IceOptions.java b/ice-adapter/src/main/java/com/faforever/iceadapter/IceOptions.java index 834232f..187dbc0 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/IceOptions.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/IceOptions.java @@ -3,11 +3,13 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.ToString; import picocli.CommandLine.Option; @Getter @NoArgsConstructor @AllArgsConstructor +@ToString public class IceOptions { @Option(names = "--id", required = true, description = "set the ID of the local player") private int id; @@ -62,4 +64,10 @@ public class IceOptions { defaultValue = "wss://ice-telemetry.faforever.com", description = "Telemetry server to connect to") private String telemetryServer; + + @Option( + names = "--timeout-seconds-in-game", + defaultValue = "190", + description = "Timeout connection in game peers (in seconds).") + private int timeoutSecondsInGame; } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java index bd67289..2863d6f 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java @@ -10,6 +10,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; /** @@ -17,7 +18,8 @@ * ONLY THE OFFERING ADAPTER of a connection will send echos and reoffer. */ @Slf4j -public class PeerConnectivityCheckerModule { +@RequiredArgsConstructor +public class PeerConnectivityCheckerModule implements AutoCloseable { private static final int ECHO_INTERVAL = 1000; @@ -26,6 +28,10 @@ public class PeerConnectivityCheckerModule { private volatile boolean running = false; private volatile Thread checkerThread; + private final PeerConnectivityTimeoutModule timeoutModule = new PeerConnectivityTimeoutModule(ice, IceAdapter.getOptions(), + IceAdapter.getScheduledExecutor(), + IceAdapter.getGpgNetServer()); + @Getter private float averageRTT = 0.0f; @@ -38,10 +44,6 @@ public class PeerConnectivityCheckerModule { @Getter private long invalidEchosReceived = 0; - public PeerConnectivityCheckerModule(PeerIceModule ice) { - this.ice = ice; - } - void start() { LockUtil.executeWithLock(lockIce, () -> { if (running) { @@ -137,11 +139,19 @@ private void checkerThread() { log.warn( "Didn't receive any answer to echo requests for the past 10 seconds from {}, aborting connection", peer.getRemoteLogin()); + timeoutModule.start(); CompletableFuture.runAsync(ice::onConnectionLost, IceAdapter.getExecutor()); return; } + timeoutModule.stopIfExist(); } log.info("{} stopped gracefully", Thread.currentThread().getName()); } + + @Override + public void close() { + stop(); + timeoutModule.close(); + } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityTimeoutModule.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityTimeoutModule.java new file mode 100644 index 0000000..eb576e0 --- /dev/null +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityTimeoutModule.java @@ -0,0 +1,54 @@ +package com.faforever.iceadapter.ice; + +import com.faforever.iceadapter.IceOptions; +import com.faforever.iceadapter.gpgnet.GPGNetServer; +import com.faforever.iceadapter.gpgnet.GameState; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Creates a scheduler that can terminate the connection when the time expires. + */ +@Slf4j +@RequiredArgsConstructor +public class PeerConnectivityTimeoutModule implements AutoCloseable { + private final PeerIceModule ice; + private final IceOptions iceOptions; + private final ScheduledExecutorService scheduledExecutor; + private final GPGNetServer gpgNetServer; + + private ScheduledFuture scheduledClosed; + + public void start() { + if (scheduledClosed == null) { + gpgNetServer.getGameState().ifPresent(state -> { + if (state == GameState.LAUNCHING) { + log.info("Start timeout when {}, timeout seconds: {}", state, iceOptions.getTimeoutSecondsInGame()); + scheduledClosed = scheduledExecutor.schedule(this::closeConnectionByTimeout, iceOptions.getTimeoutSecondsInGame(), TimeUnit.SECONDS); + } + }); + } + } + + public void stopIfExist() { + if (scheduledClosed != null) { + scheduledClosed.cancel(true); + scheduledClosed = null; + } + } + + private void closeConnectionByTimeout() { + Peer peer = ice.getPeer(); + log.info("Close {} by timeout", peer.getPeerIdentifier()); + peer.close(); + } + + @Override + public void close() { + stopIfExist(); + } +} diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerIceModule.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerIceModule.java index 4a0e33f..4d4b440 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerIceModule.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerIceModule.java @@ -559,7 +559,7 @@ void close() { if (agent != null) { agent.free(); } - connectivityChecker.stop(); + connectivityChecker.close(); } public long getConnectivityAttempsInThePast(final long millis) { diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/util/ExecutorHolder.java b/ice-adapter/src/main/java/com/faforever/iceadapter/util/ExecutorHolder.java index 270959f..0546470 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/util/ExecutorHolder.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/util/ExecutorHolder.java @@ -2,6 +2,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + import lombok.experimental.UtilityClass; @UtilityClass @@ -10,4 +12,8 @@ public ExecutorService getExecutor() { int numberOfCores = Runtime.getRuntime().availableProcessors(); return Executors.newFixedThreadPool(numberOfCores); } + + public ScheduledExecutorService getScheduledExecutor() { + return Executors.newScheduledThreadPool(100, Thread.ofVirtual().factory()); + } }