Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a timeout for peers who lost their connection during the game. #71

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ice-adapter/src/main/java/com/faforever/iceadapter/IceAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class IceAdapter implements Callable<Integer>, AutoCloseable, FafRpcCallb
private RPCService rpcService;

private final ExecutorService executor = ExecutorHolder.getExecutor();
private final ScheduledExecutorService scheduledExecutor = ExecutorHolder.getScheduledExecutor();
private static final Lock lockGameSession = new ReentrantLock();

public static void main(String[] args) {
Expand All @@ -52,6 +53,7 @@ public Integer call() {
public void start() {
determineVersion();
log.info("Version: {}", VERSION);
log.info("Options: {}", iceOptions);

Debug.DELAY_UI_MS = iceOptions.getDelayUi();
Debug.ENABLE_DEBUG_WINDOW = iceOptions.isDebugWindow();
Expand Down Expand Up @@ -156,6 +158,7 @@ public static void close(int status) {
TrayIcon.close();

INSTANCE.executor.shutdown();
INSTANCE.scheduledExecutor.shutdown();
CompletableFuture.runAsync(
INSTANCE.executor::shutdownNow, CompletableFuture.delayedExecutor(250, TimeUnit.MILLISECONDS))
.thenRunAsync(() -> System.exit(status), CompletableFuture.delayedExecutor(250, TimeUnit.MILLISECONDS));
Expand Down Expand Up @@ -194,10 +197,22 @@ public static double getAcceptableLatency() {
return INSTANCE.iceOptions.getAcceptableLatency();
}

public static IceOptions getOptions() {
return INSTANCE.iceOptions;
}

public static Executor getExecutor() {
return INSTANCE.executor;
}

public static ScheduledExecutorService getScheduledExecutor() {
return INSTANCE.scheduledExecutor;
}

public static GPGNetServer getGpgNetServer() {
return INSTANCE.gpgNetServer;
}

public static GameSession getGameSession() {
return GAME_SESSION;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import picocli.CommandLine.Option;

@Getter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class IceOptions {
@Option(names = "--id", required = true, description = "set the ID of the local player")
private int id;
Expand Down Expand Up @@ -62,4 +64,10 @@ public class IceOptions {
defaultValue = "wss://ice-telemetry.faforever.com",
description = "Telemetry server to connect to")
private String telemetryServer;

@Option(
names = "--timeout-seconds-in-game",
defaultValue = "190",
description = "Timeout connection in game peers (in seconds).")
private int timeoutSecondsInGame;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* Periodically sends echo requests via the ICE data channel and initiates a reconnect after timeout
* ONLY THE OFFERING ADAPTER of a connection will send echos and reoffer.
*/
@Slf4j
public class PeerConnectivityCheckerModule {
@RequiredArgsConstructor
public class PeerConnectivityCheckerModule implements AutoCloseable {

private static final int ECHO_INTERVAL = 1000;

Expand All @@ -26,6 +28,10 @@ public class PeerConnectivityCheckerModule {
private volatile boolean running = false;
private volatile Thread checkerThread;

private final PeerConnectivityTimeoutModule timeoutModule = new PeerConnectivityTimeoutModule(ice, IceAdapter.getOptions(),
IceAdapter.getScheduledExecutor(),
IceAdapter.getGpgNetServer());

@Getter
private float averageRTT = 0.0f;

Expand All @@ -38,10 +44,6 @@ public class PeerConnectivityCheckerModule {
@Getter
private long invalidEchosReceived = 0;

public PeerConnectivityCheckerModule(PeerIceModule ice) {
this.ice = ice;
}

void start() {
LockUtil.executeWithLock(lockIce, () -> {
if (running) {
Expand Down Expand Up @@ -137,11 +139,19 @@ private void checkerThread() {
log.warn(
"Didn't receive any answer to echo requests for the past 10 seconds from {}, aborting connection",
peer.getRemoteLogin());
timeoutModule.start();
CompletableFuture.runAsync(ice::onConnectionLost, IceAdapter.getExecutor());
return;
}
timeoutModule.stopIfExist();
}

log.info("{} stopped gracefully", Thread.currentThread().getName());
}

@Override
public void close() {
stop();
timeoutModule.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.faforever.iceadapter.ice;

import com.faforever.iceadapter.IceOptions;
import com.faforever.iceadapter.gpgnet.GPGNetServer;
import com.faforever.iceadapter.gpgnet.GameState;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Creates a scheduler that can terminate the connection when the time expires.
*/
@Slf4j
@RequiredArgsConstructor
public class PeerConnectivityTimeoutModule implements AutoCloseable {
private final PeerIceModule ice;
private final IceOptions iceOptions;
private final ScheduledExecutorService scheduledExecutor;
private final GPGNetServer gpgNetServer;

private ScheduledFuture<?> scheduledClosed;

public void start() {
if (scheduledClosed == null) {
gpgNetServer.getGameState().ifPresent(state -> {
if (state == GameState.LAUNCHING) {
log.info("Start timeout when {}, timeout seconds: {}", state, iceOptions.getTimeoutSecondsInGame());
scheduledClosed = scheduledExecutor.schedule(this::closeConnectionByTimeout, iceOptions.getTimeoutSecondsInGame(), TimeUnit.SECONDS);
}
});
}
}

public void stopIfExist() {
if (scheduledClosed != null) {
scheduledClosed.cancel(true);
scheduledClosed = null;
}
}

private void closeConnectionByTimeout() {
Peer peer = ice.getPeer();
log.info("Close {} by timeout", peer.getPeerIdentifier());
peer.close();
}

@Override
public void close() {
stopIfExist();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ void close() {
if (agent != null) {
agent.free();
}
connectivityChecker.stop();
connectivityChecker.close();
}

public long getConnectivityAttempsInThePast(final long millis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import lombok.experimental.UtilityClass;

@UtilityClass
Expand All @@ -10,4 +12,8 @@ public ExecutorService getExecutor() {
int numberOfCores = Runtime.getRuntime().availableProcessors();
return Executors.newFixedThreadPool(numberOfCores);
}

public ScheduledExecutorService getScheduledExecutor() {
return Executors.newScheduledThreadPool(100, Thread.ofVirtual().factory());
}
}