-
Notifications
You must be signed in to change notification settings - Fork 47
Feature: Add Max Players Game Setting and Fix Join Logic #478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 6 commits
a51eb5e
47c51e7
cadb2ba
c44f0dd
a690cd5
be71083
67ea685
6d87af3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,11 @@ pub async fn entrypoint<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send> | |
| backend_storage: S, | ||
| stats: Arc<Mutex<InMemoryStats>>, | ||
| ) { | ||
| let _ = handle_user_connected(tx, rx, ws_id, logger, backend_storage, stats).await; | ||
| // Handle the result, logging error if connection setup fails | ||
| if let Err(e) = handle_user_connected(tx, rx, ws_id, logger.clone(), backend_storage, stats).await { | ||
| error!(logger, "User connection handler failed"; "ws_id" => ws_id, "error" => format!("{:?}", e)); | ||
| } | ||
| info!(logger, "User connection handler finished"; "ws_id" => ws_id); | ||
| } | ||
|
|
||
| async fn send_to_user( | ||
|
|
@@ -82,31 +86,39 @@ async fn handle_user_connected<S: Storage<VersionedGame, E>, E: std::fmt::Debug | |
| } | ||
| }; | ||
|
|
||
| // Subscribe to messages for the room. After this point, we should | ||
| // no longer use tx! It's owned by the backend storage. | ||
| let (subscribe_player_id_tx, subscribe_player_id_rx) = oneshot::channel::<PlayerID>(); | ||
| // Spawn the subscription task *before* attempting registration. | ||
| // Use Option<PlayerID> in the channel. | ||
| let (subscribe_player_id_tx, subscribe_player_id_rx) = oneshot::channel::<Option<PlayerID>>(); | ||
| tokio::task::spawn(player_subscribe_task( | ||
| logger.clone(), | ||
| name.clone(), | ||
| tx.clone(), | ||
| tx.clone(), // Clone tx for the task | ||
| subscribe_player_id_rx, | ||
| subscription, | ||
| )); | ||
|
|
||
| let (player_id, join_span) = register_user( | ||
| let registration_result = register_user( | ||
| logger.clone(), | ||
| name.clone(), | ||
| ws_id, | ||
| room.clone(), | ||
| backend_storage.clone(), | ||
| stats.clone(), | ||
| ) | ||
| .await | ||
| .map_err(|_| anyhow::anyhow!("Failed to register user"))?; | ||
| .await; | ||
|
|
||
| let (player_id, join_span) = match registration_result { | ||
| Ok(result) => result, | ||
| Err(e) => { | ||
| error!(logger, "User registration failed (error sent to client)"; "error" => format!("{:?}", e)); | ||
| let _ = subscribe_player_id_tx.send(None); | ||
| return Err(e); | ||
| } | ||
| }; | ||
|
|
||
| let logger = logger.new(o!("player_id" => player_id.0)); | ||
| info!(logger, "Successfully registered user"); | ||
| let _ = subscribe_player_id_tx.send(player_id); | ||
| let _ = subscribe_player_id_tx.send(Some(player_id)); | ||
|
|
||
| run_game_for_player( | ||
| logger.clone(), | ||
|
|
@@ -129,13 +141,13 @@ async fn player_subscribe_task( | |
| logger_: Logger, | ||
| name_: String, | ||
| tx: mpsc::UnboundedSender<Vec<u8>>, | ||
| subscribe_player_id_rx: oneshot::Receiver<PlayerID>, | ||
| subscribe_player_id_rx: oneshot::Receiver<Option<PlayerID>>, | ||
| mut subscription: mpsc::UnboundedReceiver<GameMessage>, | ||
| ) { | ||
| debug!(logger_, "Subscribed to messages"); | ||
| if let Ok(player_id) = subscribe_player_id_rx.await { | ||
| let logger_ = logger_.new(o!("player_id" => player_id.0)); | ||
| debug!(logger_, "Received player ID"); | ||
| if let Ok(player_id_option) = subscribe_player_id_rx.await { | ||
| let logger_ = logger_.new(o!("player_id" => format!("{:?}", player_id_option.map(|p|p.0)))); | ||
| debug!(logger_, "Received player ID option"); | ||
| while let Some(v) = subscription.recv().await { | ||
| let should_send = match &v { | ||
| GameMessage::State { .. } | ||
|
|
@@ -146,25 +158,35 @@ async fn player_subscribe_task( | |
| GameMessage::Beep { target } | GameMessage::Kicked { target } => *target == name_, | ||
| GameMessage::ReadyCheck { from } => *from != name_, | ||
| }; | ||
|
|
||
| let v = if should_send { | ||
| if let GameMessage::State { state } = v { | ||
| let g = InteractiveGame::new_from_state(state); | ||
| g.dump_state_for_player(player_id) | ||
| .ok() | ||
| .map(|state| GameMessage::State { state }) | ||
| } else { | ||
| Some(v) | ||
| // Only filter state if we have a valid player ID | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should the player stay connected if they didn't get a player ID? that means that the room is at capacity, right? can we just close the connection and ask the player to reconnect in the frontend?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way I understand it we close the connection fairly quickly now when registration fails. The flow goes I think the player_subscribe_task might stay alive for a very short time after the error is sent by execute_operation until the oneshot channel signals failure (with None), but the main handler that would process further user messages (run_game_for_player) is never started, and the overall connection context is torn down due to the error in entrypoint. and so in doing so the "game full" error message is reliably sent before the connection closes completely, which helped me avoid an issue where the connection closure raced the error message delivery. This way, I think explicit frontend reconnect prompts should no longer be necessary - lmk your thoughts on this
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, got it. I understand what you're trying to do now |
||
| // Match on player_id_option first, then check if 'v' is GameMessage::State | ||
| match player_id_option { | ||
| Some(player_id) => { | ||
| if let GameMessage::State { state } = v { | ||
| let g = InteractiveGame::new_from_state(state); | ||
| g.dump_state_for_player(player_id) | ||
| .ok() | ||
| .map(|filtered_state| GameMessage::State { state: filtered_state }) | ||
| } else { | ||
| Some(v) | ||
| } | ||
| } | ||
| None => Some(v), | ||
| } | ||
| } else { | ||
| None | ||
| }; | ||
|
|
||
| if let Some(v) = v { | ||
| if send_to_user(&tx, &v).await.is_err() { | ||
| if let Some(v_to_send) = v { | ||
| if send_to_user(&tx, &v_to_send).await.is_err() { | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } else { | ||
| error!(logger_, "Failed to receive player ID option from oneshot channel"); | ||
| } | ||
| debug!(logger_, "Subscription task completed"); | ||
| } | ||
|
|
@@ -176,11 +198,11 @@ async fn register_user<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send>( | |
| room: String, | ||
| backend_storage: S, | ||
| stats: Arc<Mutex<InMemoryStats>>, | ||
| ) -> Result<(PlayerID, u64), ()> { | ||
| ) -> Result<(PlayerID, u64), anyhow::Error> { | ||
| let (player_id_tx, player_id_rx) = oneshot::channel(); | ||
| let logger_ = logger.clone(); | ||
| let name_ = name.clone(); | ||
| execute_operation( | ||
| let exec_result = execute_operation( | ||
| ws_id, | ||
| &room, | ||
| backend_storage.clone(), | ||
|
|
@@ -198,7 +220,7 @@ async fn register_user<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send>( | |
|
|
||
| player_id_tx | ||
| .send((assigned_player_id, version, clients_to_disconnect)) | ||
| .map_err(|_| anyhow::anyhow!("Couldn't send player ID back".to_owned()))?; | ||
| .map_err(|_| anyhow::anyhow!("Receiver dropped before player ID could be sent"))?; | ||
| Ok(register_msgs | ||
| .into_iter() | ||
| .map(|(data, message)| GameMessage::Broadcast { data, message }) | ||
|
|
@@ -208,6 +230,10 @@ async fn register_user<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send>( | |
| ) | ||
| .await; | ||
|
|
||
| if let Err(e) = exec_result { | ||
| return Err(e); | ||
| } | ||
|
|
||
| let header_messages = { | ||
| let stats = stats.lock().await; | ||
| stats.header_messages().to_vec() | ||
|
|
@@ -223,23 +249,26 @@ async fn register_user<S: Storage<VersionedGame, E>, E: std::fmt::Debug + Send>( | |
| ) | ||
| .await; | ||
|
|
||
| if let Ok((player_id, ws_id, websockets_to_disconnect)) = player_id_rx.await { | ||
| for id in websockets_to_disconnect { | ||
| info!(logger, "Disconnnecting existing client"; "kicked_ws_id" => id); | ||
| let _ = backend_storage | ||
| .clone() | ||
| .publish_to_single_subscriber( | ||
| room.as_bytes().to_vec(), | ||
| id, | ||
| GameMessage::Kicked { | ||
| target: name.clone(), | ||
| }, | ||
| ) | ||
| .await; | ||
| match player_id_rx.await { | ||
| Ok((player_id, version, websockets_to_disconnect)) => { | ||
| for id in websockets_to_disconnect { | ||
| info!(logger, "Disconnnecting existing client"; "kicked_ws_id" => id); | ||
| let _ = backend_storage | ||
| .clone() | ||
| .publish_to_single_subscriber( | ||
| room.as_bytes().to_vec(), | ||
| id, | ||
| GameMessage::Kicked { | ||
| target: name.clone(), | ||
| }, | ||
| ) | ||
| .await; | ||
| } | ||
| Ok((player_id, version)) | ||
| } | ||
| Err(_) => { | ||
| Err(anyhow::anyhow!("Failed to receive player ID after registration operation")) | ||
| } | ||
| Ok((player_id, ws_id)) | ||
| } else { | ||
| Err(()) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -294,7 +323,7 @@ async fn run_game_for_player<S: Storage<VersionedGame, E>, E: Send + std::fmt::D | |
| debug!(logger, "Exiting main game loop"); | ||
| } | ||
|
|
||
| async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>( | ||
| async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send + std::fmt::Debug>( | ||
| logger: Logger, | ||
| ws_id: usize, | ||
| caller: PlayerID, | ||
|
|
@@ -305,7 +334,7 @@ async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>( | |
| ) -> Result<(), E> { | ||
| match msg { | ||
| UserMessage::Beep => { | ||
| execute_immutable_operation( | ||
| if let Err(e) = execute_immutable_operation( | ||
| ws_id, | ||
| room_name, | ||
| backend_storage, | ||
|
|
@@ -324,7 +353,9 @@ async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>( | |
| }, | ||
| "send appropriate beep", | ||
| ) | ||
| .await; | ||
| .await { | ||
| error!(logger, "Beep operation failed"; "error" => format!("{:?}", e)); | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| } | ||
| UserMessage::Message(m) => { | ||
| backend_storage | ||
|
|
@@ -368,7 +399,7 @@ async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>( | |
| } | ||
| UserMessage::Kick(id) => { | ||
| info!(logger, "Kicking user"; "other" => id.0); | ||
| execute_operation( | ||
| if let Err(e) = execute_operation( | ||
| ws_id, | ||
| room_name, | ||
| backend_storage, | ||
|
|
@@ -379,56 +410,70 @@ async fn handle_user_action<S: Storage<VersionedGame, E>, E: Send>( | |
| target: kicked_player_name, | ||
| }]) | ||
| }, | ||
| "kick user", | ||
| "kick player", | ||
| ) | ||
| .await; | ||
| .await { | ||
| error!(logger, "Kick operation failed"; "target_id" => id.0, "error" => format!("{:?}", e)); | ||
| } | ||
| } | ||
| UserMessage::Action(action) => { | ||
| execute_operation( | ||
| let action_clone_for_log = action.clone(); | ||
|
||
| let logger_clone_for_log = logger.clone(); | ||
| if let Err(e) = execute_operation( | ||
| ws_id, | ||
| room_name, | ||
| backend_storage, | ||
| move |game, _, _| { | ||
| Ok(game | ||
| .interact(action, caller, &logger)? | ||
| .into_iter() | ||
| .map(|(data, message)| GameMessage::Broadcast { data, message }) | ||
| .collect()) | ||
| move |g, _version, _| { | ||
| g.interact(action, caller, &logger).map(|msgs| { | ||
| msgs.into_iter() | ||
| .map(|(data, message)| GameMessage::Broadcast { data, message }) | ||
| .collect() | ||
| }) | ||
| }, | ||
| "handle user action", | ||
| "perform action", | ||
| ) | ||
| .await; | ||
| .await { | ||
| error!(logger_clone_for_log, "Action execution failed"; "action" => format!("{:?}", action_clone_for_log), "error" => format!("{:?}", e)); | ||
| } | ||
| } | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| async fn user_disconnected<S: Storage<VersionedGame, E>, E: Send>( | ||
| async fn user_disconnected<S: Storage<VersionedGame, E>, E: Send + std::fmt::Debug>( | ||
| room: String, | ||
| ws_id: usize, | ||
| backend_storage: S, | ||
| logger: slog::Logger, | ||
| parent: u64, | ||
| ) { | ||
| execute_operation( | ||
| let room_name = room.as_bytes().to_vec(); | ||
| let room_name_str = String::from_utf8_lossy(&room_name); | ||
|
||
|
|
||
| info!(logger, "User disconnected, cleaning up websocket association"); | ||
|
|
||
| // Clean up websocket association | ||
| if let Err(e) = execute_operation( | ||
| ws_id, | ||
| &room, | ||
| &room_name_str, | ||
| backend_storage.clone(), | ||
| move |_, _, associated_websockets| { | ||
| for ws in associated_websockets.values_mut() { | ||
| ws.retain(|w| *w != ws_id); | ||
| move |_g, _, associated_websockets| { | ||
| for player_websockets in associated_websockets.values_mut() { | ||
| player_websockets.retain(|id| *id != ws_id); | ||
| } | ||
| associated_websockets.retain(|_, player_websockets| !player_websockets.is_empty()); | ||
| Ok(vec![]) | ||
| }, | ||
| "disconnect player", | ||
| "clean up disconnected websocket", | ||
| ) | ||
| .await; | ||
| backend_storage | ||
| .unsubscribe(room.as_bytes().to_vec(), ws_id) | ||
| .await | ||
| { | ||
| error!(logger, "Failed to clean up websocket association on disconnect"; "error" => format!("{:?}", e)); | ||
| } | ||
|
|
||
| let _ = backend_storage | ||
| .unsubscribe(room_name, ws_id) | ||
| .await; | ||
| info!(logger, "Websocket disconnected"; | ||
| "room" => room, | ||
| "parent_span" => format!("{room}:{parent}"), | ||
| "span" => format!("{room}:ws_{ws_id}") | ||
| ); | ||
|
|
||
| info!(logger, "Finished user disconnected cleanup"; "parent_id" => parent); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can just use dbg! here?