Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 117 additions & 61 deletions backend/src/shengji_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ pub async fn entrypoint<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send>
backend_storage: S,
stats: Arc<Mutex<InMemoryStats>>,
) {
let _ = handle_user_connected(tx, rx, ws_id, logger, backend_storage, stats).await;
// Handle the result, logging error if connection setup fails
if let Err(e) =
handle_user_connected(tx, rx, ws_id, logger.clone(), backend_storage, stats).await
{
error!(logger, "User connection handler failed"; "ws_id" => ws_id, "error" => format!("{:?}", e));
}
info!(logger, "User connection handler finished"; "ws_id" => ws_id);
}

async fn send_to_user(
Expand Down Expand Up @@ -82,31 +88,39 @@ async fn handle_user_connected<S: Storage<VersionedGame, E>, E: std::fmt::Debug
}
};

// Subscribe to messages for the room. After this point, we should
// no longer use tx! It's owned by the backend storage.
let (subscribe_player_id_tx, subscribe_player_id_rx) = oneshot::channel::<PlayerID>();
// Spawn the subscription task *before* attempting registration.
// Use Option<PlayerID> in the channel.
let (subscribe_player_id_tx, subscribe_player_id_rx) = oneshot::channel::<Option<PlayerID>>();
tokio::task::spawn(player_subscribe_task(
logger.clone(),
name.clone(),
tx.clone(),
tx.clone(), // Clone tx for the task
subscribe_player_id_rx,
subscription,
));

let (player_id, join_span) = register_user(
let registration_result = register_user(
logger.clone(),
name.clone(),
ws_id,
room.clone(),
backend_storage.clone(),
stats.clone(),
)
.await
.map_err(|_| anyhow::anyhow!("Failed to register user"))?;
.await;

let (player_id, join_span) = match registration_result {
Ok(result) => result,
Err(e) => {
error!(logger, "User registration failed (error sent to client)"; "error" => format!("{:?}", e));
let _ = subscribe_player_id_tx.send(None);
return Err(e);
}
};

let logger = logger.new(o!("player_id" => player_id.0));
info!(logger, "Successfully registered user");
let _ = subscribe_player_id_tx.send(player_id);
let _ = subscribe_player_id_tx.send(Some(player_id));

run_game_for_player(
logger.clone(),
Expand All @@ -129,13 +143,13 @@ async fn player_subscribe_task(
logger_: Logger,
name_: String,
tx: mpsc::UnboundedSender<Vec<u8>>,
subscribe_player_id_rx: oneshot::Receiver<PlayerID>,
subscribe_player_id_rx: oneshot::Receiver<Option<PlayerID>>,
mut subscription: mpsc::UnboundedReceiver<GameMessage>,
) {
debug!(logger_, "Subscribed to messages");
if let Ok(player_id) = subscribe_player_id_rx.await {
let logger_ = logger_.new(o!("player_id" => player_id.0));
debug!(logger_, "Received player ID");
if let Ok(player_id_option) = subscribe_player_id_rx.await {
let logger_ = logger_.new(o!("player_id" => dbg!(player_id_option.map(|p|p.0))));
debug!(logger_, "Received player ID option");
while let Some(v) = subscription.recv().await {
let should_send = match &v {
GameMessage::State { .. }
Expand All @@ -146,25 +160,40 @@ async fn player_subscribe_task(
GameMessage::Beep { target } | GameMessage::Kicked { target } => *target == name_,
GameMessage::ReadyCheck { from } => *from != name_,
};

let v = if should_send {
if let GameMessage::State { state } = v {
let g = InteractiveGame::new_from_state(state);
g.dump_state_for_player(player_id)
.ok()
.map(|state| GameMessage::State { state })
} else {
Some(v)
// Only filter state if we have a valid player ID
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the player stay connected if they didn't get a player ID? that means that the room is at capacity, right? can we just close the connection and ask the player to reconnect in the frontend?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I understand it we close the connection fairly quickly now when registration fails. The flow goes
1. register_user returns an Err when the room is full
2. handle_user_connected catches this Err
3. Instead of returning Ok(()), we have handle_user_connected propagate the Err
4. The top-level entrypoint function receives this Err, logs it, and the task for that specific connection terminates

I think the player_subscribe_task might stay alive for a very short time after the error is sent by execute_operation until the oneshot channel signals failure (with None), but the main handler that would process further user messages (run_game_for_player) is never started, and the overall connection context is torn down due to the error in entrypoint.

and so in doing so the "game full" error message is reliably sent before the connection closes completely, which helped me avoid an issue where the connection closure raced the error message delivery. This way, I think explicit frontend reconnect prompts should no longer be necessary - lmk your thoughts on this

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, got it. I understand what you're trying to do now

// Match on player_id_option first, then check if 'v' is GameMessage::State
match player_id_option {
Some(player_id) => {
if let GameMessage::State { state } = v {
let g = InteractiveGame::new_from_state(state);
g.dump_state_for_player(player_id)
.ok()
.map(|filtered_state| GameMessage::State {
state: filtered_state,
})
} else {
Some(v)
}
}
None => Some(v),
}
} else {
None
};

if let Some(v) = v {
if send_to_user(&tx, &v).await.is_err() {
if let Some(v_to_send) = v {
if send_to_user(&tx, &v_to_send).await.is_err() {
break;
}
}
}
} else {
error!(
logger_,
"Failed to receive player ID option from oneshot channel"
);
}
debug!(logger_, "Subscription task completed");
}
Expand All @@ -176,11 +205,11 @@ async fn register_user<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send>(
room: String,
backend_storage: S,
stats: Arc<Mutex<InMemoryStats>>,
) -> Result<(PlayerID, u64), ()> {
) -> Result<(PlayerID, u64), anyhow::Error> {
let (player_id_tx, player_id_rx) = oneshot::channel();
let logger_ = logger.clone();
let name_ = name.clone();
execute_operation(
let exec_result = execute_operation(
ws_id,
&room,
backend_storage.clone(),
Expand All @@ -198,7 +227,7 @@ async fn register_user<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send>(

player_id_tx
.send((assigned_player_id, version, clients_to_disconnect))
.map_err(|_| anyhow::anyhow!("Couldn't send player ID back".to_owned()))?;
.map_err(|_| anyhow::anyhow!("Receiver dropped before player ID could be sent"))?;
Ok(register_msgs
.into_iter()
.map(|(data, message)| GameMessage::Broadcast { data, message })
Expand All @@ -208,6 +237,10 @@ async fn register_user<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send>(
)
.await;

if let Err(e) = exec_result {
return Err(e);
}

let header_messages = {
let stats = stats.lock().await;
stats.header_messages().to_vec()
Expand All @@ -223,23 +256,26 @@ async fn register_user<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send>(
)
.await;

if let Ok((player_id, ws_id, websockets_to_disconnect)) = player_id_rx.await {
for id in websockets_to_disconnect {
info!(logger, "Disconnnecting existing client"; "kicked_ws_id" => id);
let _ = backend_storage
.clone()
.publish_to_single_subscriber(
room.as_bytes().to_vec(),
id,
GameMessage::Kicked {
target: name.clone(),
},
)
.await;
match player_id_rx.await {
Ok((player_id, version, websockets_to_disconnect)) => {
for id in websockets_to_disconnect {
info!(logger, "Disconnnecting existing client"; "kicked_ws_id" => id);
let _ = backend_storage
.clone()
.publish_to_single_subscriber(
room.as_bytes().to_vec(),
id,
GameMessage::Kicked {
target: name.clone(),
},
)
.await;
}
Ok((player_id, version))
}
Ok((player_id, ws_id))
} else {
Err(())
Err(_) => Err(anyhow::anyhow!(
"Failed to receive player ID after registration operation"
)),
}
}

Expand Down Expand Up @@ -294,7 +330,7 @@ async fn run_game_for_player<S: Storage<VersionedGame, E>, E: Send + std::fmt::D
debug!(logger, "Exiting main game loop");
}

async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>(
async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send + std::fmt::Debug>(
logger: Logger,
ws_id: usize,
caller: PlayerID,
Expand All @@ -305,7 +341,7 @@ async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>(
) -> Result<(), E> {
match msg {
UserMessage::Beep => {
execute_immutable_operation(
if let Err(e) = execute_immutable_operation(
ws_id,
room_name,
backend_storage,
Expand All @@ -324,7 +360,10 @@ async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>(
},
"send appropriate beep",
)
.await;
.await
{
error!(logger, "Beep operation failed"; "error" => format!("{:?}", e));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cargo clippy will probably ask you to dbg this one too

}
}
UserMessage::Message(m) => {
backend_storage
Expand Down Expand Up @@ -368,7 +407,7 @@ async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>(
}
UserMessage::Kick(id) => {
info!(logger, "Kicking user"; "other" => id.0);
execute_operation(
if let Err(e) = execute_operation(
ws_id,
room_name,
backend_storage,
Expand All @@ -379,56 +418,73 @@ async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>(
target: kicked_player_name,
}])
},
"kick user",
"kick player",
)
.await;
.await
{
error!(logger, "Kick operation failed"; "target_id" => id.0, "error" => format!("{:?}", e));
}
}
UserMessage::Action(action) => {
execute_operation(
let action_ = action.clone();
let logger_ = logger.clone();
if let Err(e) = execute_operation(
ws_id,
room_name,
backend_storage,
move |game, _, _| {
Ok(game
.interact(action, caller, &logger)?
.into_iter()
.map(|(data, message)| GameMessage::Broadcast { data, message })
.collect())
move |g, _version, _| {
g.interact(action, caller, &logger).map(|msgs| {
msgs.into_iter()
.map(|(data, message)| GameMessage::Broadcast { data, message })
.collect()
})
},
"handle user action",
"perform action",
)
.await;
.await
{
error!(logger_, "Action execution failed"; "action" => format!("{:?}", action_), "error" => format!("{:?}", e));
}
}
}
Ok(())
}

async fn user_disconnected<S: Storage<VersionedGame, E>, E: Send>(
async fn user_disconnected<S: Storage<VersionedGame, E>, E: Send + std::fmt::Debug>(
room: String,
ws_id: usize,
backend_storage: S,
logger: slog::Logger,
parent: u64,
) {
execute_operation(
info!(
logger,
"User disconnected, cleaning up websocket association"
);

// Clean up websocket association
let _ = execute_operation(
ws_id,
&room,
backend_storage.clone(),
move |_, _, associated_websockets| {
for ws in associated_websockets.values_mut() {
ws.retain(|w| *w != ws_id);
}
associated_websockets.retain(|_, player_websockets| !player_websockets.is_empty());
Ok(vec![])
},
"disconnect player",
)
.await;
backend_storage

let _ = backend_storage
.unsubscribe(room.as_bytes().to_vec(), ws_id)
.await;

info!(logger, "Websocket disconnected";
"room" => room,
"parent_span" => format!("{room}:{parent}"),
"span" => format!("{room}:ws_{ws_id}")
"parent_span" => format!("{}:{}", room, parent),
"span" => format!("{}:ws_{}", room, ws_id)
);
}
2 changes: 2 additions & 0 deletions backend/src/state_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl InMemoryStats {
pub struct PublicGameInfo {
name: String,
num_players: usize,
max_players: Option<usize>,
}

pub async fn load_dump_file<S: Storage<VersionedGame, E>, E: Send + std::fmt::Debug>(
Expand Down Expand Up @@ -202,6 +203,7 @@ pub async fn public_games(
public_games.push(PublicGameInfo {
name,
num_players: versioned_game.game.players().len(),
max_players: versioned_game.game.max_players(),
});
}
}
Expand Down
Loading
Loading