diff --git a/Cargo.lock b/Cargo.lock index 9283fa4997..f82ad81fd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,6 +456,7 @@ name = "alvr_sockets" version = "21.0.0-dev01" dependencies = [ "alvr_common", + "alvr_packets", "alvr_session", "bincode", "profiling", diff --git a/alvr/packets/src/lib.rs b/alvr/packets/src/lib.rs index e03bd49606..f175cb8fa0 100644 --- a/alvr/packets/src/lib.rs +++ b/alvr/packets/src/lib.rs @@ -113,7 +113,7 @@ pub enum ClientConnectionResult { ConnectionAccepted { client_protocol_id: u64, display_name: String, - server_ip: IpAddr, + server_ip: IpAddr, // must be unused for now streaming_capabilities: Option, // todo: use String }, ClientStandby, diff --git a/alvr/server_core/src/c_api.rs b/alvr/server_core/src/c_api.rs index a9f7b59417..9c156f84e8 100644 --- a/alvr/server_core/src/c_api.rs +++ b/alvr/server_core/src/c_api.rs @@ -551,13 +551,6 @@ pub unsafe extern "C" fn alvr_duration_until_next_vsync(out_ns: *mut u64) -> boo } } -#[no_mangle] -pub unsafe extern "C" fn alvr_restart() { - if let Some(context) = SERVER_CORE_CONTEXT.write().take() { - context.restart(); - } -} - #[no_mangle] pub unsafe extern "C" fn alvr_shutdown() { SERVER_CORE_CONTEXT.write().take(); diff --git a/alvr/server_core/src/connection.rs b/alvr/server_core/src/connection.rs index a957f178b9..48f0b4672a 100644 --- a/alvr/server_core/src/connection.rs +++ b/alvr/server_core/src/connection.rs @@ -1,6 +1,5 @@ use crate::{ bitrate::BitrateManager, - hand_gestures::HandGestureManager, input_mapping::ButtonMappingManager, sockets::WelcomeSocket, statistics::StatisticsManager, @@ -21,14 +20,15 @@ use alvr_events::{AdbEvent, ButtonEvent, EventType}; use alvr_packets::{ BatteryInfo, ClientConnectionResult, ClientControlPacket, ClientListAction, ClientStatistics, NegotiatedStreamingConfig, RealTimeConfig, ReservedClientControlPacket, ServerControlPacket, - Tracking, VideoPacketHeader, AUDIO, HAPTICS, STATISTICS, TRACKING, VIDEO, + Tracking, VideoPacketHeader, VideoStreamingCapabilities, AUDIO, HAPTICS, STATISTICS, TRACKING, + VIDEO, }; use alvr_session::{ BodyTrackingSinkConfig, CodecType, ControllersEmulationMode, FrameSize, H264Profile, - OpenvrConfig, SessionConfig, SocketProtocol, + OpenvrConfig, SessionConfig, Settings, SocketProtocol, }; use alvr_sockets::{ - PeerType, ProtoControlSocket, StreamSocketBuilder, CONTROL_PORT, KEEPALIVE_INTERVAL, + ProtoControlSocket, SocketConnection, StreamSocketConfig, CONTROL_PORT, KEEPALIVE_INTERVAL, KEEPALIVE_TIMEOUT, WIRED_CLIENT_HOSTNAME, }; use std::{ @@ -65,7 +65,151 @@ fn is_streaming(client_hostname: &str) -> bool { .unwrap_or(false) } -pub fn contruct_openvr_config(session: &SessionConfig) -> OpenvrConfig { +fn get_view_res(config: FrameSize, default_res: UVec2) -> UVec2 { + let res = match config { + FrameSize::Scale(scale) => default_res.as_vec2() * scale, + FrameSize::Absolute { width, height } => { + let width = width as f32; + Vec2::new( + width, + height.map(|h| h as f32).unwrap_or_else(|| { + let default_res = default_res.as_vec2(); + width * default_res.y / default_res.x + }), + ) + } + }; + + UVec2::new(align32(res.x), align32(res.y)) +} + +pub fn construct_negotiated_streaming_config( + settings: &Settings, + streaming_caps: &VideoStreamingCapabilities, + client_ip: IpAddr, +) -> ConResult { + let stream_view_resolution = get_view_res( + settings.video.transcoding_view_resolution.clone(), + streaming_caps.default_view_resolution, + ); + + let fps = { + let mut best_match = 0_f32; + let mut min_diff = f32::MAX; + for rate in &streaming_caps.supported_refresh_rates { + let diff = (*rate - settings.video.preferred_fps).abs(); + if diff < min_diff { + best_match = *rate; + min_diff = diff; + } + } + best_match + }; + + if !streaming_caps + .supported_refresh_rates + .contains(&settings.video.preferred_fps) + { + warn!("Chosen refresh rate not supported. Using {fps}Hz"); + } + + let enable_foveated_encoding = + if let Switch::Enabled(config) = &settings.video.foveated_encoding { + let enable = streaming_caps.supports_foveated_encoding || config.force_enable; + + if !enable { + warn!("Foveated encoding is not supported by the client."); + } + + enable + } else { + false + }; + + let use_full_range = if settings + .video + .encoder_config + .server_overrides_use_full_range + { + settings.video.encoder_config.use_full_range + } else { + streaming_caps.prefer_full_range + }; + + let enable_hdr = if settings.video.encoder_config.server_overrides_enable_hdr { + settings.video.encoder_config.enable_hdr + } else { + streaming_caps.prefer_hdr + }; + + let encoding_gamma = if settings + .video + .encoder_config + .server_overrides_encoding_gamma + { + settings.video.encoder_config.encoding_gamma + } else { + streaming_caps.preferred_encoding_gamma + }; + + #[cfg_attr(target_os = "linux", expect(unused_variables))] + let game_audio_sample_rate = if let Switch::Enabled(game_audio_config) = + &settings.audio.game_audio + { + #[cfg(not(target_os = "linux"))] + { + let game_audio_device = + alvr_audio::AudioDevice::new_output(game_audio_config.device.as_ref()).to_con()?; + if let Switch::Enabled(microphone_config) = &settings.audio.microphone { + let (sink, source) = alvr_audio::AudioDevice::new_virtual_microphone_pair( + microphone_config.devices.clone(), + ) + .to_con()?; + if matches!( + microphone_config.devices, + alvr_session::MicrophoneDevicesConfig::VBCable + ) { + // VoiceMeeter and Custom devices may have arbitrary internal routing. + // Therefore, we cannot detect the loopback issue without knowing the routing. + if alvr_audio::is_same_device(&game_audio_device, &sink) + || alvr_audio::is_same_device(&game_audio_device, &source) + { + con_bail!("Game audio and microphone cannot point to the same device!"); + } + } + // else: + // Stream played via VA-CABLE-X will be directly routed to VA-CABLE-X's virtual microphone. + // Game audio will loop back to the game microphone if they are set to the same VA-CABLE-X device. + } + + game_audio_device.input_sample_rate().to_con()? + } + #[cfg(target_os = "linux")] + 44100 + } else { + 0 + }; + + let wired = client_ip.is_loopback(); + + Ok(NegotiatedStreamingConfig { + view_resolution: stream_view_resolution, + refresh_rate_hint: fps, + game_audio_sample_rate, + enable_foveated_encoding, + use_multimodal_protocol: streaming_caps.multimodal_protocol, + use_full_range, + encoding_gamma, + enable_hdr, + wired, + }) +} + +pub fn construct_openvr_config( + session: &SessionConfig, + streaming_caps: Option<&VideoStreamingCapabilities>, + stream_config: Option<&NegotiatedStreamingConfig>, +) -> OpenvrConfig { let old_config = session.openvr_config.clone(); let settings = session.to_settings(); @@ -154,7 +298,7 @@ pub fn contruct_openvr_config(session: &SessionConfig) -> OpenvrConfig { let nvenc_overrides = settings.video.encoder_config.nvenc; let amf_controls = settings.video.encoder_config.amf; - OpenvrConfig { + let mut config = OpenvrConfig { tracking_ref_only: settings.headset.tracking_ref_only, enable_vive_tracker_proxy: settings.headset.enable_vive_tracker_proxy, minimum_idr_interval_ms: settings.connection.minimum_idr_interval_ms, @@ -231,7 +375,73 @@ pub fn contruct_openvr_config(session: &SessionConfig) -> OpenvrConfig { _encoder_debug: settings.extra.logging.debug_groups.encoder, _decoder_debug: settings.extra.logging.debug_groups.decoder, ..old_config + }; + + if let (Some(streaming_caps), Some(stream_config)) = (streaming_caps, stream_config) { + let target_view_resolution = get_view_res( + settings.video.emulated_headset_view_resolution.clone(), + streaming_caps.default_view_resolution, + ); + + let encoder_profile = if settings.video.encoder_config.h264_profile == H264Profile::High { + let profile = if streaming_caps.encoder_high_profile { + H264Profile::High + } else { + H264Profile::Main + }; + + if profile != H264Profile::High { + warn!("High profile encoding is not supported by the client."); + } + + profile + } else { + settings.video.encoder_config.h264_profile + }; + + let mut enable_10_bits_encoding = + if settings.video.encoder_config.server_overrides_use_10bit { + settings.video.encoder_config.use_10bit + } else { + streaming_caps.prefer_10bit + }; + + if enable_10_bits_encoding && !streaming_caps.encoder_10_bits { + warn!("10 bits encoding is not supported by the client."); + enable_10_bits_encoding = false + } + + let codec = if settings.video.preferred_codec == CodecType::AV1 { + let codec = if streaming_caps.encoder_av1 { + CodecType::AV1 + } else { + CodecType::Hevc + }; + + if codec != CodecType::AV1 { + warn!("AV1 encoding is not supported by the client."); + } + + codec + } else { + settings.video.preferred_codec + }; + + config.eye_resolution_width = stream_config.view_resolution.x; + config.eye_resolution_height = stream_config.view_resolution.y; + config.target_eye_resolution_width = target_view_resolution.x; + config.target_eye_resolution_height = target_view_resolution.y; + config.refresh_rate = stream_config.refresh_rate_hint as _; + config.enable_foveated_encoding = enable_foveated_encoding; + config.h264_profile = encoder_profile as _; + config.use_10bit_encoder = enable_10_bits_encoding; + config.use_full_range_encoding = stream_config.use_full_range; + config.enable_hdr = stream_config.enable_hdr; + config.encoding_gamma = stream_config.encoding_gamma; + config.codec = codec as _; } + + config } // Alternate connection trials with manual IPs and clients discovered on the local network @@ -458,9 +668,9 @@ fn try_connect( ) -> ConResult { dbg_connection!("try_connect: Finding client and creating control socket"); - let (proto_socket, client_ip) = ProtoControlSocket::connect_to( + let (socket, client_ip, connection_result) = alvr_sockets::connect_to_client( + client_ips.keys().cloned().collect(), Duration::from_secs(1), - PeerType::AnyClient(client_ips.keys().cloned().collect()), )?; let Some(client_hostname) = client_ips.remove(&client_ip) else { @@ -475,7 +685,8 @@ fn try_connect( if let Err(e) = connection_pipeline( Arc::clone(&ctx), lifecycle_state, - proto_socket, + socket, + connection_result, client_hostname.clone(), client_ip, ) { @@ -503,7 +714,8 @@ fn try_connect( fn connection_pipeline( ctx: Arc, lifecycle_state: Arc>, - mut proto_socket: ProtoControlSocket, + socket: ProtoControlSocket, + connection_result: ClientConnectionResult, client_hostname: String, client_ip: IpAddr, ) -> ConResult { @@ -513,29 +725,7 @@ fn connection_pipeline( // to thos client, no other client can connect until handshake is finished. It will then be // temporarily relocked while shutting down the threads. let mut session_manager_lock = SESSION_MANAGER.write(); - - dbg_connection!("connection_pipeline: Setting client state in session"); - session_manager_lock.update_client_list( - client_hostname.clone(), - ClientListAction::SetConnectionState(ConnectionState::Connecting), - ); - session_manager_lock.update_client_list( - client_hostname.clone(), - ClientListAction::UpdateCurrentIp(Some(client_ip)), - ); - - let disconnect_notif = Arc::new(Condvar::new()); - - dbg_connection!("connection_pipeline: Getting client status packet"); - let connection_result = match proto_socket.recv(HANDSHAKE_ACTION_TIMEOUT) { - Ok(r) => r, - Err(ConnectionError::TryAgain(e)) => { - debug!("Failed to recive client connection packet. This is normal for USB connection.\n{e}"); - - return Ok(()); - } - Err(e) => return Err(e), - }; + let initial_settings = session_manager_lock.settings().clone(); let maybe_streaming_caps = if let ClientConnectionResult::ConnectionAccepted { client_protocol_id, @@ -565,6 +755,16 @@ fn connection_pipeline( return Ok(()); }; + dbg_connection!("connection_pipeline: Setting client state in session"); + session_manager_lock.update_client_list( + client_hostname.clone(), + ClientListAction::SetConnectionState(ConnectionState::Connecting), + ); + session_manager_lock.update_client_list( + client_hostname.clone(), + ClientListAction::UpdateCurrentIp(Some(client_ip)), + ); + let streaming_caps = if let Some(streaming_caps) = maybe_streaming_caps { alvr_packets::decode_video_streaming_capabilities(&streaming_caps).to_con()? } else { @@ -572,285 +772,89 @@ fn connection_pipeline( }; dbg_connection!("connection_pipeline: setting up negotiated streaming config"); + let negotiated_streaming_config = + construct_negotiated_streaming_config(&initial_settings, &streaming_caps, client_ip)?; - let initial_settings = session_manager_lock.settings().clone(); - - fn get_view_res(config: FrameSize, default_res: UVec2) -> UVec2 { - let res = match config { - FrameSize::Scale(scale) => default_res.as_vec2() * scale, - FrameSize::Absolute { width, height } => { - let width = width as f32; - Vec2::new( - width, - height.map(|h| h as f32).unwrap_or_else(|| { - let default_res = default_res.as_vec2(); - width * default_res.y / default_res.x - }), - ) - } - }; - - UVec2::new(align32(res.x), align32(res.y)) - } - - let stream_view_resolution = get_view_res( - initial_settings.video.transcoding_view_resolution.clone(), - streaming_caps.default_view_resolution, - ); + let stream_config_packet = alvr_packets::encode_stream_config( + session_manager_lock.session(), + &negotiated_streaming_config, + ) + .to_con()?; - let target_view_resolution = get_view_res( - initial_settings - .video - .emulated_headset_view_resolution - .clone(), - streaming_caps.default_view_resolution, + dbg_connection!("connection_pipeline: checking openvr config"); + let new_openvr_config = construct_openvr_config( + session_manager_lock.session(), + Some(&streaming_caps), + Some(&negotiated_streaming_config), ); - let fps = { - let mut best_match = 0_f32; - let mut min_diff = f32::MAX; - for rate in &streaming_caps.supported_refresh_rates { - let diff = (*rate - initial_settings.video.preferred_fps).abs(); - if diff < min_diff { - best_match = *rate; - min_diff = diff; - } - } - best_match - }; - - if !streaming_caps - .supported_refresh_rates - .contains(&initial_settings.video.preferred_fps) - { - warn!("Chosen refresh rate not supported. Using {fps}Hz"); - } - - let enable_foveated_encoding = - if let Switch::Enabled(config) = &initial_settings.video.foveated_encoding { - let enable = streaming_caps.supports_foveated_encoding || config.force_enable; - - if !enable { - warn!("Foveated encoding is not supported by the client."); - } - - enable - } else { - false - }; + if session_manager_lock.session().openvr_config != new_openvr_config { + dbg_connection!("connection_pipeline: sending restart signal"); - let encoder_profile = if initial_settings.video.encoder_config.h264_profile == H264Profile::High - { - let profile = if streaming_caps.encoder_high_profile { - H264Profile::High - } else { - H264Profile::Main - }; + session_manager_lock.session_mut().openvr_config = new_openvr_config; - if profile != H264Profile::High { - warn!("High profile encoding is not supported by the client."); - } + alvr_sockets::send_restart_signal(socket, stream_config_packet)?; - profile - } else { - initial_settings.video.encoder_config.h264_profile - }; + crate::notify_restart_driver(); - let mut enable_10_bits_encoding = if initial_settings - .video - .encoder_config - .server_overrides_use_10bit - { - initial_settings.video.encoder_config.use_10bit - } else { - streaming_caps.prefer_10bit - }; + *lifecycle_state.write() = LifecycleState::ShuttingDown; - if enable_10_bits_encoding && !streaming_caps.encoder_10_bits { - warn!("10 bits encoding is not supported by the client."); - enable_10_bits_encoding = false + return Ok(()); } - let use_full_range = if initial_settings - .video - .encoder_config - .server_overrides_use_full_range - { - initial_settings.video.encoder_config.use_full_range - } else { - streaming_caps.prefer_full_range - }; - - let enable_hdr = if initial_settings - .video - .encoder_config - .server_overrides_enable_hdr - { - initial_settings.video.encoder_config.enable_hdr - } else { - streaming_caps.prefer_hdr - }; - - let encoding_gamma = if initial_settings - .video - .encoder_config - .server_overrides_encoding_gamma - { - initial_settings.video.encoder_config.encoding_gamma - } else { - streaming_caps.preferred_encoding_gamma - }; - - let codec = if initial_settings.video.preferred_codec == CodecType::AV1 { - let codec = if streaming_caps.encoder_av1 { - CodecType::AV1 - } else { - CodecType::Hevc - }; - - if codec != CodecType::AV1 { - warn!("AV1 encoding is not supported by the client."); - } - - codec - } else { - initial_settings.video.preferred_codec - }; - - #[cfg_attr(target_os = "linux", allow(unused_variables))] - let game_audio_sample_rate = if let Switch::Enabled(game_audio_config) = - &initial_settings.audio.game_audio - { - #[cfg(not(target_os = "linux"))] - { - let game_audio_device = - alvr_audio::AudioDevice::new_output(game_audio_config.device.as_ref()).to_con()?; - if let Switch::Enabled(microphone_config) = &initial_settings.audio.microphone { - let (sink, source) = alvr_audio::AudioDevice::new_virtual_microphone_pair( - microphone_config.devices.clone(), - ) - .to_con()?; - if matches!( - microphone_config.devices, - alvr_session::MicrophoneDevicesConfig::VBCable - ) { - // VoiceMeeter and Custom devices may have arbitrary internal routing. - // Therefore, we cannot detect the loopback issue without knowing the routing. - if alvr_audio::is_same_device(&game_audio_device, &sink) - || alvr_audio::is_same_device(&game_audio_device, &source) - { - con_bail!("Game audio and microphone cannot point to the same device!"); - } - } - // else: - // Stream played via VA-CABLE-X will be directly routed to VA-CABLE-X's virtual microphone. - // Game audio will loop back to the game microphone if they are set to the same VA-CABLE-X device. - } - - game_audio_device.input_sample_rate().to_con()? - } - #[cfg(target_os = "linux")] - 44100 + let stream_protocol = if negotiated_streaming_config.wired { + SocketProtocol::Tcp } else { - 0 + initial_settings.connection.stream_protocol }; - let wired = client_ip.is_loopback(); - - dbg_connection!("connection_pipeline: send streaming config"); - let stream_config_packet = alvr_packets::encode_stream_config( - session_manager_lock.session(), - &NegotiatedStreamingConfig { - view_resolution: stream_view_resolution, - refresh_rate_hint: fps, - game_audio_sample_rate, - enable_foveated_encoding, - use_multimodal_protocol: streaming_caps.multimodal_protocol, - use_full_range, - encoding_gamma, - enable_hdr, - wired, + dbg_connection!("connection_pipeline: Finishing handshake"); + let mut socket = SocketConnection::from_client_connection( + socket, + HANDSHAKE_ACTION_TIMEOUT, + stream_config_packet, + StreamSocketConfig { + protocol: stream_protocol, + port: initial_settings.connection.stream_port, + send_buffer_bytes: initial_settings.connection.server_send_buffer_bytes, + recv_buffer_bytes: initial_settings.connection.server_recv_buffer_bytes, + max_packet_size: initial_settings.connection.packet_size as _, + dscp: initial_settings.connection.dscp, }, - ) - .to_con()?; - proto_socket.send(&stream_config_packet).to_con()?; - - let (mut control_sender, mut control_receiver) = - proto_socket.split(STREAMING_RECV_TIMEOUT).to_con()?; - - let mut new_openvr_config = contruct_openvr_config(session_manager_lock.session()); - new_openvr_config.eye_resolution_width = stream_view_resolution.x; - new_openvr_config.eye_resolution_height = stream_view_resolution.y; - new_openvr_config.target_eye_resolution_width = target_view_resolution.x; - new_openvr_config.target_eye_resolution_height = target_view_resolution.y; - new_openvr_config.refresh_rate = fps as _; - new_openvr_config.enable_foveated_encoding = enable_foveated_encoding; - new_openvr_config.h264_profile = encoder_profile as _; - new_openvr_config.use_10bit_encoder = enable_10_bits_encoding; - new_openvr_config.use_full_range_encoding = use_full_range; - new_openvr_config.enable_hdr = enable_hdr; - new_openvr_config.encoding_gamma = encoding_gamma; - new_openvr_config.codec = codec as _; - - if session_manager_lock.session().openvr_config != new_openvr_config { - session_manager_lock.session_mut().openvr_config = new_openvr_config; - - control_sender.send(&ServerControlPacket::Restarting).ok(); - - crate::notify_restart_driver(); - } + )?; - dbg_connection!("connection_pipeline: Send StartStream packet"); - control_sender - .send(&ServerControlPacket::StartStream) - .to_con()?; + dbg_connection!("connection_pipeline: Handshake successful, spawning threads"); - let signal = control_receiver.recv(HANDSHAKE_ACTION_TIMEOUT)?; - if !matches!(signal, ClientControlPacket::StreamReady) { - con_bail!("Got unexpected packet waiting for stream ack"); - } - dbg_connection!("connection_pipeline: Got StreamReady packet"); + let disconnect_notif = Arc::new(Condvar::new()); *ctx.statistics_manager.write() = Some(StatisticsManager::new( initial_settings.connection.statistics_history_size, - Duration::from_secs_f32(1.0 / fps), + Duration::from_secs_f32(1.0 / negotiated_streaming_config.refresh_rate_hint), if let Switch::Enabled(config) = &initial_settings.headset.controllers { config.steamvr_pipeline_frames } else { 0.0 }, )); + *ctx.bitrate_manager.lock() = BitrateManager::new( + initial_settings.video.bitrate.history_size, + negotiated_streaming_config.refresh_rate_hint, + ); + *ctx.tracking_manager.write() = + TrackingManager::new(initial_settings.connection.statistics_history_size); - *ctx.bitrate_manager.lock() = - BitrateManager::new(initial_settings.video.bitrate.history_size, fps); - - let stream_protocol = if wired { - SocketProtocol::Tcp - } else { - initial_settings.connection.stream_protocol - }; - - dbg_connection!("connection_pipeline: StreamSocket connect_to_client"); - let mut stream_socket = StreamSocketBuilder::connect_to_client( - HANDSHAKE_ACTION_TIMEOUT, - client_ip, - initial_settings.connection.stream_port, - stream_protocol, - initial_settings.connection.dscp, - initial_settings.connection.server_send_buffer_bytes, - initial_settings.connection.server_recv_buffer_bytes, - initial_settings.connection.packet_size as _, - )?; + let control_sender = Arc::new(Mutex::new(socket.request_reliable_stream()?)); + let mut video_sender = socket.request_unreliable_stream(VIDEO); + let game_audio_sender: alvr_sockets::StreamSender<()> = socket.request_unreliable_stream(AUDIO); + let haptics_sender = socket.request_unreliable_stream(HAPTICS); - let mut video_sender = stream_socket.request_stream(VIDEO); - let game_audio_sender: alvr_sockets::StreamSender<()> = stream_socket.request_stream(AUDIO); + let mut control_receiver = socket.subscribe_to_reliable_stream()?; let mut microphone_receiver: alvr_sockets::StreamReceiver<()> = - stream_socket.subscribe_to_stream(AUDIO, MAX_UNREAD_PACKETS); + socket.subscribe_to_unreliable_stream(AUDIO, MAX_UNREAD_PACKETS); let tracking_receiver = - stream_socket.subscribe_to_stream::(TRACKING, MAX_UNREAD_PACKETS); - let haptics_sender = stream_socket.request_stream(HAPTICS); + socket.subscribe_to_unreliable_stream::(TRACKING, MAX_UNREAD_PACKETS); let mut statics_receiver = - stream_socket.subscribe_to_stream::(STATISTICS, MAX_UNREAD_PACKETS); + socket.subscribe_to_unreliable_stream::(STATISTICS, MAX_UNREAD_PACKETS); let (video_channel_sender, video_channel_receiver) = std::sync::mpsc::sync_channel(initial_settings.connection.max_queued_server_video_frames); @@ -878,7 +882,7 @@ fn connection_pipeline( } }); - #[cfg_attr(target_os = "linux", allow(unused_variables))] + #[cfg_attr(target_os = "linux", expect(unused_variables))] let game_audio_thread = if let Switch::Enabled(config) = initial_settings.audio.game_audio.clone() { @@ -896,7 +900,7 @@ fn connection_pipeline( }), game_audio_sender.clone(), 2, - game_audio_sample_rate, + negotiated_streaming_config.game_audio_sample_rate, ) { error!("Audio record error: {e:?}"); } @@ -1013,13 +1017,8 @@ fn connection_pipeline( thread::spawn(|| ()) }; - *ctx.tracking_manager.write() = - TrackingManager::new(initial_settings.connection.statistics_history_size); - let hand_gesture_manager = Arc::new(Mutex::new(HandGestureManager::new())); - let tracking_receive_thread = thread::spawn({ let ctx = Arc::clone(&ctx); - let hand_gesture_manager = Arc::clone(&hand_gesture_manager); let initial_settings = initial_settings.clone(); let client_hostname = client_hostname.clone(); move || { @@ -1027,7 +1026,6 @@ fn connection_pipeline( &ctx, initial_settings, streaming_caps.multimodal_protocol, - hand_gesture_manager, tracking_receiver, || is_streaming(&client_hostname), ); @@ -1069,8 +1067,6 @@ fn connection_pipeline( } }); - let control_sender = Arc::new(Mutex::new(control_sender)); - let real_time_update_thread = thread::spawn({ let control_sender = Arc::clone(&control_sender); let client_hostname = client_hostname.clone(); @@ -1351,7 +1347,7 @@ fn connection_pipeline( let client_hostname = client_hostname.clone(); move || { while is_streaming(&client_hostname) { - match stream_socket.recv() { + match socket.recv_poll() { Ok(()) => (), Err(ConnectionError::TryAgain(_)) => continue, Err(e) => { @@ -1385,22 +1381,19 @@ fn connection_pipeline( } }); - { - if initial_settings.connection.enable_on_connect_script { - let on_connect_script = FILESYSTEM_LAYOUT.get().map(|l| l.connect_script()).unwrap(); - info!( - "Running on connect script (connect): {}", - on_connect_script.display() - ); - if let Err(e) = Command::new(&on_connect_script) - .env("ACTION", "connect") - .spawn() - { - warn!("Failed to run connect script: {e}"); - } + if initial_settings.connection.enable_on_connect_script { + let on_connect_script = FILESYSTEM_LAYOUT.get().map(|l| l.connect_script()).unwrap(); + info!( + "Running on connect script (connect): {}", + on_connect_script.display() + ); + if let Err(e) = Command::new(&on_connect_script) + .env("ACTION", "connect") + .spawn() + { + warn!("Failed to run connect script: {e}"); } } - if initial_settings.extra.capture.startup_video_recording { info!("Creating recording file"); crate::create_recording_file(&ctx, session_manager_lock.settings()); @@ -1415,7 +1408,7 @@ fn connection_pipeline( .send(ServerCoreEvent::ClientConnected) .ok(); - dbg_connection!("connection_pipeline: handshake finished; unlocking streams"); + dbg_connection!("connection_pipeline: Threads initialized; unlocking streams"); alvr_common::wait_rwlock(&disconnect_notif, &mut session_manager_lock); dbg_connection!("connection_pipeline: Begin connection shutdown"); diff --git a/alvr/server_core/src/lib.rs b/alvr/server_core/src/lib.rs index 16af3be03c..684b965be1 100644 --- a/alvr/server_core/src/lib.rs +++ b/alvr/server_core/src/lib.rs @@ -21,8 +21,7 @@ use alvr_common::{ once_cell::sync::Lazy, parking_lot::{Mutex, RwLock}, settings_schema::Switch, - warn, ConnectionState, DeviceMotion, Fov, LifecycleState, Pose, RelaxedAtomic, - DEVICE_ID_TO_PATH, + warn, ConnectionState, DeviceMotion, Fov, LifecycleState, Pose, DEVICE_ID_TO_PATH, }; use alvr_events::{EventType, HapticsEvent}; use alvr_filesystem as afs; @@ -175,7 +174,6 @@ pub fn registered_button_set() -> HashSet { pub struct ServerCoreContext { lifecycle_state: Arc>, - is_restarting: RelaxedAtomic, connection_context: Arc, connection_thread: Arc>>>, webserver_runtime: Option, @@ -236,9 +234,7 @@ impl ServerCoreContext { ( Self { lifecycle_state: Arc::new(RwLock::new(LifecycleState::StartingUp)), - is_restarting: RelaxedAtomic::new(false), connection_context, - connection_thread: Arc::new(RwLock::new(None)), webserver_runtime: Some(webserver_runtime), }, @@ -508,14 +504,6 @@ impl ServerCoreContext { .as_mut() .map(|stats| stats.duration_until_next_vsync()) } - - pub fn restart(self) { - dbg_server_core!("restart"); - - self.is_restarting.set(true); - - // drop is called here for self - } } impl Drop for ServerCoreContext { @@ -559,7 +547,7 @@ impl Drop for ServerCoreContext { { let mut session_manager_lock = SESSION_MANAGER.write(); session_manager_lock.session_mut().openvr_config = - connection::contruct_openvr_config(session_manager_lock.session()); + connection::construct_openvr_config(session_manager_lock.session(), None, None); } dbg_server_core!("Restore drivers registration backup"); diff --git a/alvr/server_core/src/tracking/mod.rs b/alvr/server_core/src/tracking/mod.rs index a70cfc5774..50dee82bb7 100644 --- a/alvr/server_core/src/tracking/mod.rs +++ b/alvr/server_core/src/tracking/mod.rs @@ -14,7 +14,6 @@ use crate::{ }; use alvr_common::{ glam::{EulerRot, Quat, Vec3}, - parking_lot::Mutex, ConnectionError, DeviceMotion, Pose, BODY_CHEST_ID, BODY_HIPS_ID, BODY_LEFT_ELBOW_ID, BODY_LEFT_FOOT_ID, BODY_LEFT_KNEE_ID, BODY_RIGHT_ELBOW_ID, BODY_RIGHT_FOOT_ID, BODY_RIGHT_KNEE_ID, DEVICE_ID_TO_PATH, HAND_LEFT_ID, HAND_RIGHT_ID, HEAD_ID, @@ -30,7 +29,6 @@ use std::{ cmp::Ordering, collections::{HashMap, VecDeque}, f32::consts::PI, - sync::Arc, time::Duration, }; @@ -314,10 +312,11 @@ pub fn tracking_loop( ctx: &ConnectionContext, initial_settings: Settings, multimodal_protocol: bool, - hand_gesture_manager: Arc>, mut tracking_receiver: StreamReceiver, is_streaming: impl Fn() -> bool, ) { + let mut hand_gesture_manager = HandGestureManager::new(); + let mut gestures_button_mapping_manager = initial_settings .headset @@ -457,8 +456,6 @@ pub fn tracking_loop( .and_then(|c| c.hand_tracking_interaction.as_option()), &mut gestures_button_mapping_manager, ) { - let mut hand_gesture_manager_lock = hand_gesture_manager.lock(); - if !device_motion_keys.contains(&*HAND_LEFT_ID) { if let Some(hand_skeleton) = tracking.hand_skeletons[0] { ctx.events_sender @@ -466,7 +463,7 @@ pub fn tracking_loop( hand_gestures::trigger_hand_gesture_actions( gestures_button_mapping_manager, *HAND_LEFT_ID, - &hand_gesture_manager_lock.get_active_gestures( + &hand_gesture_manager.get_active_gestures( hand_skeleton, gestures_config, *HAND_LEFT_ID, @@ -484,7 +481,7 @@ pub fn tracking_loop( hand_gestures::trigger_hand_gesture_actions( gestures_button_mapping_manager, *HAND_RIGHT_ID, - &hand_gesture_manager_lock.get_active_gestures( + &hand_gesture_manager.get_active_gestures( hand_skeleton, gestures_config, *HAND_RIGHT_ID, diff --git a/alvr/server_openvr/src/lib.rs b/alvr/server_openvr/src/lib.rs index 4089d4ab63..a8678ad6f5 100644 --- a/alvr/server_openvr/src/lib.rs +++ b/alvr/server_openvr/src/lib.rs @@ -271,16 +271,11 @@ extern "C" fn driver_ready_idle(set_default_chap: bool) { } } } - ServerCoreEvent::ShutdownPending => { + ServerCoreEvent::ShutdownPending | ServerCoreEvent::RestartPending => { + // Dropping the context SERVER_CORE_CONTEXT.write().take(); - unsafe { ShutdownSteamvr() }; - } - ServerCoreEvent::RestartPending => { - if let Some(context) = SERVER_CORE_CONTEXT.write().take() { - context.restart(); - } - + // todo: send different HUD message for shutdown or restart unsafe { ShutdownSteamvr() }; } } diff --git a/alvr/sockets/Cargo.toml b/alvr/sockets/Cargo.toml index c19ce8045b..0ae1ca9364 100644 --- a/alvr/sockets/Cargo.toml +++ b/alvr/sockets/Cargo.toml @@ -11,6 +11,7 @@ trace-performance = ["profiling/profile-with-tracy"] [dependencies] alvr_common.workspace = true +alvr_packets.workspace = true alvr_session.workspace = true bincode = "1" diff --git a/alvr/sockets/src/control_socket.rs b/alvr/sockets/src/control_socket.rs index d5f524045e..d71d009961 100644 --- a/alvr/sockets/src/control_socket.rs +++ b/alvr/sockets/src/control_socket.rs @@ -14,7 +14,7 @@ use std::{ // This corresponds to the length of the payload const FRAMED_PREFIX_LENGTH: usize = mem::size_of::(); -struct RecvState { +pub(crate) struct RecvState { packet_length: usize, // contains length prefix packet_cursor: usize, // counts also the length prefix bytes } @@ -96,9 +96,9 @@ fn framed_recv( } pub struct ControlSocketSender { - inner: TcpStream, - buffer: Vec, - _phantom: PhantomData, + pub(crate) inner: TcpStream, + pub(crate) buffer: Vec, + pub(crate) _phantom: PhantomData, } impl ControlSocketSender { @@ -108,10 +108,10 @@ impl ControlSocketSender { } pub struct ControlSocketReceiver { - inner: TcpStream, - buffer: Vec, - recv_state: Option, - _phantom: PhantomData, + pub(crate) inner: TcpStream, + pub(crate) buffer: Vec, + pub(crate) recv_state: Option, + pub(crate) _phantom: PhantomData, } impl ControlSocketReceiver { @@ -140,7 +140,7 @@ pub fn get_server_listener(timeout: Duration) -> Result { // Proto-control-socket that can send and receive any packet. After the split, only the packets of // the specified types can be exchanged pub struct ProtoControlSocket { - inner: TcpStream, + pub(crate) inner: TcpStream, } pub enum PeerType<'a> { diff --git a/alvr/sockets/src/lib.rs b/alvr/sockets/src/lib.rs index ed9ffb2b02..35e29caf64 100644 --- a/alvr/sockets/src/lib.rs +++ b/alvr/sockets/src/lib.rs @@ -2,11 +2,14 @@ mod backend; mod control_socket; mod stream_socket; -use alvr_common::{anyhow::Result, info}; -use alvr_session::{DscpTos, SocketBufferSize}; +use alvr_common::{anyhow::Result, con_bail, info, AnyhowToCon, ConResult, ToCon}; +use alvr_packets::{ClientControlPacket, ServerControlPacket}; +use alvr_session::{DscpTos, SocketBufferSize, SocketProtocol}; +use serde::{de::DeserializeOwned, Serialize}; use socket2::Socket; use std::{ - net::{IpAddr, Ipv4Addr}, + marker::PhantomData, + net::{IpAddr, Ipv4Addr, TcpListener}, time::Duration, }; @@ -93,3 +96,179 @@ fn set_dscp(socket: &Socket, dscp: Option) { socket.set_tos((tos << 2) as u32).ok(); } } + +// connect_to_client should be used on the server side. +// At the moment, the TcpListener is implemened on the client side so the API for this function +// is non-standard +// todo: convert to class when storing a TcpListener +pub fn connect_to_client( + client_ips: Vec, + timeout: Duration, +) -> ConResult<(ProtoControlSocket, IpAddr, T)> { + let (mut control_socket, client_ip) = + ProtoControlSocket::connect_to(timeout, PeerType::AnyClient(client_ips))?; + + let res = control_socket.recv(timeout)?; + + Ok((control_socket, client_ip, res)) +} + +pub fn listen_to_server( + listener_socket: &TcpListener, + timeout: Duration, + client_info: &impl Serialize, +) -> ConResult<(ProtoControlSocket, T)> { + let (mut control_socket, _) = + ProtoControlSocket::connect_to(timeout, PeerType::Server(listener_socket))?; + + control_socket.send(client_info).to_con()?; + + let config_packet = control_socket.recv(timeout)?; + + Ok((control_socket, config_packet)) +} + +pub fn send_restart_signal( + mut control_socket: ProtoControlSocket, + stream_config_packet: impl Serialize, +) -> ConResult<()> { + // We must send the config packet before, which will be unused + control_socket.send(&stream_config_packet).to_con()?; + + control_socket + .send(&ServerControlPacket::Restarting) + .to_con() +} + +pub struct StreamSocketConfig { + pub protocol: SocketProtocol, + pub port: u16, + pub send_buffer_bytes: SocketBufferSize, + pub recv_buffer_bytes: SocketBufferSize, + pub max_packet_size: usize, + pub dscp: Option, +} + +pub enum ServerConnectionResult { + Connected(SocketConnection), + Restarting, +} + +pub struct SocketConnection { + control_socket: ProtoControlSocket, + stream_socket: StreamSocket, +} + +impl SocketConnection { + // Note: the timeout resets after each internal operation + pub fn from_client_connection( + mut control_socket: ProtoControlSocket, + timeout: Duration, + stream_config_packet: impl Serialize, + socket_config: StreamSocketConfig, + ) -> ConResult { + let client_ip = control_socket.inner.peer_addr().to_con()?.ip(); + + control_socket.send(&stream_config_packet).to_con()?; + + control_socket + .send(&ServerControlPacket::StartStream) + .to_con()?; + + let signal = control_socket.recv(timeout)?; + if !matches!(signal, ClientControlPacket::StreamReady) { + con_bail!("Got unexpected packet waiting for stream ack"); + } + + let stream_socket = StreamSocketBuilder::connect_to_client( + timeout, + client_ip, + socket_config.port, + socket_config.protocol, + socket_config.dscp, + socket_config.send_buffer_bytes, + socket_config.recv_buffer_bytes, + socket_config.max_packet_size, + )?; + + Ok(Self { + control_socket, + stream_socket, + }) + } + + // Note: the timeout resets after each internal operation + pub fn from_server_connection( + mut control_socket: ProtoControlSocket, + timeout: Duration, + socket_config: StreamSocketConfig, + ) -> ConResult { + let server_ip = control_socket.inner.peer_addr().to_con()?.ip(); + + match control_socket.recv(timeout)? { + ServerControlPacket::StartStream => (), + ServerControlPacket::Restarting => return Ok(ServerConnectionResult::Restarting), + _ => con_bail!("Got unexpected packet waiting for stream start"), + } + + let stream_socket_builder = StreamSocketBuilder::listen_for_server( + timeout, + socket_config.port, + socket_config.protocol, + socket_config.dscp, + socket_config.send_buffer_bytes, + socket_config.recv_buffer_bytes, + ) + .to_con()?; + + control_socket + .send(&ClientControlPacket::StreamReady) + .to_con()?; + + let stream_socket = stream_socket_builder.accept_from_server( + server_ip, + socket_config.port, + socket_config.max_packet_size, + timeout, + )?; + + Ok(ServerConnectionResult::Connected(Self { + control_socket, + stream_socket, + })) + } + + pub fn request_reliable_stream(&self) -> ConResult> { + Ok(ControlSocketSender { + inner: self.control_socket.inner.try_clone().to_con()?, + buffer: vec![], + _phantom: PhantomData, + }) + } + + pub fn subscribe_to_reliable_stream(&self) -> ConResult> { + Ok(ControlSocketReceiver { + inner: self.control_socket.inner.try_clone().to_con()?, + buffer: vec![], + recv_state: None, + _phantom: PhantomData, + }) + } + + pub fn request_unreliable_stream(&self, stream_id: u16) -> StreamSender { + self.stream_socket.request_stream(stream_id) + } + + pub fn subscribe_to_unreliable_stream( + &mut self, + stream_id: u16, + max_concurrent_buffers: usize, + ) -> StreamReceiver { + self.stream_socket + .subscribe_to_stream(stream_id, max_concurrent_buffers) + } + + pub fn recv_poll(&mut self) -> ConResult<()> { + self.stream_socket.recv() + } +}