From ae2171d0a5e9676b32866b7611df5e276b2961e8 Mon Sep 17 00:00:00 2001 From: ras0q Date: Sat, 25 Jan 2025 10:12:04 +0900 Subject: [PATCH 1/4] feat: impl SpeakerService --- server/src/event.rs | 2 +- server/src/speaker_phone.rs | 26 +++- server/src/speaker_phone/error.rs | 27 ++++ server/src/speaker_phone/grpc.rs | 143 +++++++++++++++++++ server/src/speaker_phone/impl.rs | 230 ++++++++++++++++++++++++++++++ 5 files changed, 424 insertions(+), 4 deletions(-) create mode 100644 server/src/speaker_phone/error.rs create mode 100644 server/src/speaker_phone/grpc.rs create mode 100644 server/src/speaker_phone/impl.rs diff --git a/server/src/event.rs b/server/src/event.rs index 4a5c3f1a..c6df0f80 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -13,7 +13,7 @@ pub use error::Error; #[serde(tag = "event", rename_all = "snake_case")] pub enum Event { Explorer(crate::explore::ExplorerAction), - SpkeakerPhone(crate::speaker_phone::SpeakerPhone), + SpeakerPhone(crate::speaker_phone::SpeakerPhone), Message(Message), Reaction(crate::reaction::Reaction), } diff --git a/server/src/speaker_phone.rs b/server/src/speaker_phone.rs index 8486490a..2e76bd79 100644 --- a/server/src/speaker_phone.rs +++ b/server/src/speaker_phone.rs @@ -1,10 +1,18 @@ //! `speaker_phone.proto` +pub mod error; +pub mod grpc; +mod r#impl; + +use std::sync::Arc; + use futures::future::BoxFuture; use serde::{Deserialize, Serialize}; use crate::prelude::{IntoStatus, Timestamp}; +pub use error::Error; + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)] #[serde(transparent)] pub struct SpeakerPhoneId(pub uuid::Uuid); @@ -94,7 +102,7 @@ pub trait SpeakerPhoneService: Send + Sync + 'static { } #[allow(clippy::type_complexity)] -pub trait ProvideSpeakerPhone: Send + Sync + 'static { +pub trait ProvideSpeakerPhoneService: Send + Sync + 'static { type Context; type SpeakerPhoneService: SpeakerPhoneService; @@ -180,8 +188,20 @@ pub trait ProvideSpeakerPhone: Send + Sync + 'static { let ctx = self.context(); self.speaker_phone_service().search_channels(ctx, params) } +} - // TODO: build_server(this: Arc) -> SpeakerPhoneServiceServer<...> +pub fn build_server(this: Arc) -> SpeakerPhoneServiceServer +where + State: ProvideSpeakerPhoneService + crate::session::ProvideSessionService, +{ + let service = grpc::ServiceImpl::new(this); + SpeakerPhoneServiceServer::new(service) } -pub use schema::speaker_phone::speaker_phone_service_server::SERVICE_NAME; +#[derive(Debug, Clone, Copy, Default)] +pub struct SpeakerPhoneServiceImpl; + +pub type SpeakerPhoneServiceServer = + schema::speaker_phone::speaker_phone_service_server::SpeakerPhoneServiceServer< + grpc::ServiceImpl, + >; diff --git a/server/src/speaker_phone/error.rs b/server/src/speaker_phone/error.rs new file mode 100644 index 00000000..613693ed --- /dev/null +++ b/server/src/speaker_phone/error.rs @@ -0,0 +1,27 @@ +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Not found")] + NotFound, + #[error("Database error")] + Sqlx(#[from] sqlx::Error), + #[error(transparent)] + Status(#[from] tonic::Status), +} + +impl From for tonic::Status { + fn from(value: Error) -> Self { + match value { + Error::NotFound => tonic::Status::not_found("Not found"), + Error::Sqlx(e) => { + tracing::error!(error = &e as &dyn std::error::Error); + tonic::Status::internal("Database error") + } + Error::Status(e) => { + tracing::error!(error = &e as &dyn std::error::Error); + tonic::Status::internal("Status error") + } + } + } +} + +pub type Result = std::result::Result; diff --git a/server/src/speaker_phone/grpc.rs b/server/src/speaker_phone/grpc.rs new file mode 100644 index 00000000..3f7cf884 --- /dev/null +++ b/server/src/speaker_phone/grpc.rs @@ -0,0 +1,143 @@ +use std::sync::Arc; + +use schema::speaker_phone as schema; + +use crate::prelude::IntoStatus; + +// MARK: type conversions + +impl From for schema::SpeakerPhone { + fn from(value: super::SpeakerPhone) -> Self { + let super::SpeakerPhone { + id, + position, + receive_range, + name, + created_at, + updated_at, + } = value; + Self { + id: id.0.to_string(), + position: Some(position.into()), + receive_range, + name: name.0, + created_at: Some(created_at.into()), + updated_at: Some(updated_at.into()), + } + } +} + +// MARK: ServiceImpl + +pub struct ServiceImpl { + state: Arc, +} + +impl Clone for ServiceImpl +where + State: super::ProvideSpeakerPhoneService, +{ + fn clone(&self) -> Self { + Self { + state: Arc::clone(&self.state), + } + } +} + +impl ServiceImpl +where + State: super::ProvideSpeakerPhoneService + crate::session::ProvideSessionService, +{ + pub(super) fn new(state: Arc) -> Self { + Self { state } + } +} + +#[async_trait::async_trait] +impl schema::speaker_phone_service_server::SpeakerPhoneService for ServiceImpl +where + State: super::ProvideSpeakerPhoneService + crate::session::ProvideSessionService, +{ + async fn get_speaker_phone( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let (_, _, schema::GetSpeakerPhoneRequest { id }) = request.into_parts(); + let params = super::GetSpeakerPhoneParams { + id: super::SpeakerPhoneId( + uuid::Uuid::parse_str(&id) + .map_err(|_| tonic::Status::invalid_argument("Invalid UUID"))?, + ), + }; + let sperker_phone = self + .state + .get_speaker_phone(params) + .await + .map_err(IntoStatus::into_status)? + .into(); + let res = schema::GetSpeakerPhoneResponse { + speaker_phone: Some(sperker_phone), + }; + Ok(tonic::Response::new(res)) + } + + async fn create_speaker_phone( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let (_, _, schema::CreateSpeakerPhoneRequest { position, name }) = request.into_parts(); + let Some(position) = position else { + return Err(tonic::Status::invalid_argument("Position is required")); + }; + + let params = super::CreateSpeakerPhoneParams { + name, + position: position.into(), + }; + let speaker_phone = self + .state + .create_speaker_phone(params) + .await + .map_err(IntoStatus::into_status)? + .into(); + let res = schema::CreateSpeakerPhoneResponse { + speaker_phone: Some(speaker_phone), + }; + Ok(tonic::Response::new(res)) + } + + async fn get_available_channels( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + let params = super::GetAvailableChannelsParams {}; + let channels = self + .state + .get_available_channels(params) + .await + .map_err(IntoStatus::into_status)? + .into_iter() + .map(|channel| channel.0) + .collect(); + let res = schema::GetAvailableChannelsResponse { channels }; + Ok(tonic::Response::new(res)) + } + + async fn search_channels( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let (_, _, schema::SearchChannelsRequest { name }) = request.into_parts(); + let params = super::SearchChannelsParams { name }; + let hits = self + .state + .search_channels(params) + .await + .map_err(IntoStatus::into_status)? + .into_iter() + .map(|channel| channel.0) + .collect(); + let res = schema::SearchChannelsResponse { hits }; + Ok(tonic::Response::new(res)) + } +} diff --git a/server/src/speaker_phone/impl.rs b/server/src/speaker_phone/impl.rs new file mode 100644 index 00000000..944fcbe9 --- /dev/null +++ b/server/src/speaker_phone/impl.rs @@ -0,0 +1,230 @@ +use futures::FutureExt; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, MySqlPool}; + +use crate::prelude::IntoStatus; + +const RECEIVE_RANGE: u32 = 100; + +impl super::SpeakerPhoneService for super::SpeakerPhoneServiceImpl +where + Context: AsRef + + AsRef + + crate::event::ProvideEventService + + crate::traq::channel::ProvideTraqChannelService, +{ + type Error = super::Error; + + fn get_speaker_phone<'a>( + &'a self, + ctx: &'a Context, + params: super::GetSpeakerPhoneParams, + ) -> futures::future::BoxFuture<'a, Result> { + get_speaker_phone(ctx.as_ref(), params).boxed() + } + + fn get_speaker_phones_in_area<'a>( + &'a self, + ctx: &'a Context, + params: super::GetSpeakerPhonesInAreaParams, + ) -> futures::future::BoxFuture<'a, Result, Self::Error>> { + get_speaker_phones_in_area(ctx.as_ref(), params).boxed() + } + + fn create_speaker_phone<'a>( + &'a self, + ctx: &'a Context, + params: super::CreateSpeakerPhoneParams, + ) -> futures::future::BoxFuture<'a, Result> { + create_speaker_phone(ctx, ctx.as_ref(), params).boxed() + } + + fn load_all_speaker_phones<'a>( + &'a self, + ctx: &'a Context, + params: super::LoadAllSpeakerPhonesParams, + ) -> futures::future::BoxFuture<'a, Result<(), Self::Error>> { + load_all_speaker_phones(ctx.as_ref(), ctx.as_ref(), params).boxed() + } + + fn get_available_channels<'a>( + &'a self, + ctx: &'a Context, + params: super::GetAvailableChannelsParams, + ) -> futures::future::BoxFuture<'a, Result, Self::Error>> { + get_available_channels(ctx, params).boxed() + } + + fn search_channels<'a>( + &'a self, + ctx: &'a Context, + params: super::SearchChannelsParams, + ) -> futures::future::BoxFuture<'a, Result, Self::Error>> { + search_channels(ctx, params).boxed() + } +} + +// MARK: DB operations + +#[derive(Debug, Clone, Hash, Deserialize, Serialize, FromRow)] +struct SpeakerPhoneRow { + pub id: uuid::Uuid, + pub position_x: u32, + pub position_y: u32, + pub receive_range: u32, + pub name: String, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +impl From for super::SpeakerPhone { + fn from(value: SpeakerPhoneRow) -> Self { + Self { + id: super::SpeakerPhoneId(value.id), + position: crate::world::Coordinate { + x: value.position_x, + y: value.position_y, + }, + receive_range: value.receive_range, + name: super::Channel(value.name), + created_at: super::Timestamp(value.created_at), + updated_at: super::Timestamp(value.updated_at), + } + } +} + +async fn get_speaker_phone( + pool: &MySqlPool, + params: super::GetSpeakerPhoneParams, +) -> Result { + let super::GetSpeakerPhoneParams { + id: super::SpeakerPhoneId(id), + } = params; + let speaker_phone: Option = + sqlx::query_as(r#"SELECT * FROM `speaker_phones` WHERE `id` = ?"#) + .bind(id) + .fetch_optional(pool) + .await?; + speaker_phone + .map(|row| row.into()) + .ok_or(super::Error::NotFound) +} + +async fn get_speaker_phones_in_area( + pool: &MySqlPool, + params: super::GetSpeakerPhonesInAreaParams, +) -> Result, super::Error> { + let super::GetSpeakerPhonesInAreaParams { center, size } = params; + // TODO: SpeakerPhoneの中央はAreaにないが範囲が被っているやつも含めたいね + let speaker_phones: Vec = sqlx::query_as( + r#" + SELECT * FROM `speaker_phones` + WHERE + `position_x` BETWEEN ? AND ? + AND `position_y` BETWEEN ? AND ? + "#, + ) + .bind(center.x - size.width / 2) + .bind(center.x + size.width / 2) + .bind(center.y - size.height / 2) + .bind(center.y + size.height / 2) + .fetch_all(pool) + .await?; + Ok(speaker_phones.into_iter().map(Into::into).collect()) +} + +async fn create_speaker_phone( + event_service: &impl crate::event::ProvideEventService, + pool: &MySqlPool, + params: super::CreateSpeakerPhoneParams, +) -> Result { + let super::CreateSpeakerPhoneParams { position, name } = params; + let speaker_phone = SpeakerPhoneRow { + id: uuid::Uuid::now_v7(), + position_x: position.x, + position_y: position.y, + receive_range: RECEIVE_RANGE, + name, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + sqlx::query( + r#" + INSERT INTO `speaker_phones` (`id`, `position_x`, `position_y`, `receive_range`, `name`, `created_at`, `updated_at`) + VALUES (?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(speaker_phone.id) + .bind(speaker_phone.position_x) + .bind(speaker_phone.position_y) + .bind(speaker_phone.receive_range) + .bind(speaker_phone.name) + .bind(speaker_phone.created_at) + .bind(speaker_phone.updated_at) + .execute(pool) + .await?; + tracing::info!(id = %speaker_phone.id, "Created a speaker phone"); + let speaker_phone = get_speaker_phone( + pool, + super::GetSpeakerPhoneParams { + id: super::SpeakerPhoneId(speaker_phone.id), + }, + ) + .await?; + + event_service + .publish_event(crate::event::Event::SpeakerPhone(speaker_phone.clone())) + .await + .map_err(crate::prelude::IntoStatus::into_status)?; + + Ok(speaker_phone) +} + +async fn load_all_speaker_phones( + pool: &MySqlPool, + task: &crate::task::TaskManager, + _params: super::LoadAllSpeakerPhonesParams, +) -> Result<(), super::Error> { + let _speaker_phones: Vec = sqlx::query_as(r#"SELECT * FROM `speaker_phones`"#) + .fetch_all(pool) + .await?; + task.spawn(|_cancellation_token| async { + todo!("messagesをsubscribeしてtraQに投げる"); + }) + .await; + Ok(()) +} + +async fn get_available_channels( + traq_channel_service: &impl crate::traq::channel::ProvideTraqChannelService, + _params: super::GetAvailableChannelsParams, +) -> Result, super::Error> { + let params = crate::traq::channel::GetAllChannelsParams {}; + let channels = traq_channel_service + .get_all_channels(params) + .await + .map_err(IntoStatus::into_status)? + .into_iter() + .map(|channel| super::Channel(channel.path)) + .collect(); + Ok(channels) +} + +async fn search_channels( + traq_channel_service: &impl crate::traq::channel::ProvideTraqChannelService, + params: super::SearchChannelsParams, +) -> Result, super::Error> { + let super::SearchChannelsParams { name } = params; + let params = crate::traq::channel::GetAllChannelsParams {}; + let channels = traq_channel_service + .get_all_channels(params) + .await + .map_err(IntoStatus::into_status)? + .into_iter() + .map(|channel| super::Channel(channel.path)); + let hits = channels + .into_iter() + .filter(|channel| channel.0.contains(&name)) + .collect(); + Ok(hits) +} From ab19838ff00efe97fe2f67e1078f1c335a1016e2 Mon Sep 17 00:00:00 2001 From: ras0q Date: Sat, 25 Jan 2025 12:28:22 +0900 Subject: [PATCH 2/4] use saturating_sub/add --- server/src/speaker_phone/impl.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/speaker_phone/impl.rs b/server/src/speaker_phone/impl.rs index 944fcbe9..a99a9a57 100644 --- a/server/src/speaker_phone/impl.rs +++ b/server/src/speaker_phone/impl.rs @@ -124,10 +124,10 @@ async fn get_speaker_phones_in_area( AND `position_y` BETWEEN ? AND ? "#, ) - .bind(center.x - size.width / 2) - .bind(center.x + size.width / 2) - .bind(center.y - size.height / 2) - .bind(center.y + size.height / 2) + .bind(center.x.saturating_sub(size.width / 2)) + .bind(center.x.saturating_add(size.width / 2)) + .bind(center.y.saturating_sub(size.height / 2)) + .bind(center.y.saturating_add(size.height / 2)) .fetch_all(pool) .await?; Ok(speaker_phones.into_iter().map(Into::into).collect()) From c99141f032443ba1606c690f272b97336237ecfa Mon Sep 17 00:00:00 2001 From: ras0q Date: Sat, 25 Jan 2025 12:44:56 +0900 Subject: [PATCH 3/4] write schema --- server/migrations/1_init.sql | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/migrations/1_init.sql b/server/migrations/1_init.sql index e17ee91d..1b609283 100644 --- a/server/migrations/1_init.sql +++ b/server/migrations/1_init.sql @@ -31,3 +31,14 @@ CREATE TABLE IF NOT EXISTS `reactions` ( PRIMARY KEY (`id`), FOREIGN KEY (`user_id`) REFERENCES `users` (`id`) ); + +CREATE TABLE IF NOT EXISTS `speaker_phones` ( + `id` BINARY(16) NOT NULL, + `position_x` INT UNSIGNED NOT NULL, + `position_y` INT UNSIGNED NOT NULL, + `receive_range` INT UNSIGNED NOT NULL, + `name` VARCHAR(255) NOT NULL, + `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) +); From dea023b3d15f78ca56fb0f38d23dab0a04012269 Mon Sep 17 00:00:00 2001 From: ras0q Date: Sat, 25 Jan 2025 12:46:23 +0900 Subject: [PATCH 4/4] =?UTF-8?q?pub=20use=20=E6=88=BB=E3=81=97=E3=81=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/speaker_phone.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/speaker_phone.rs b/server/src/speaker_phone.rs index 2e76bd79..152dae1d 100644 --- a/server/src/speaker_phone.rs +++ b/server/src/speaker_phone.rs @@ -205,3 +205,5 @@ pub type SpeakerPhoneServiceServer = schema::speaker_phone::speaker_phone_service_server::SpeakerPhoneServiceServer< grpc::ServiceImpl, >; + +pub use schema::speaker_phone::speaker_phone_service_server::SERVICE_NAME;