diff --git a/server/src/main.rs b/server/src/main.rs index 1925a5d5..625ef6a7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -170,7 +170,7 @@ impl State { async fn graceful_shutdown(&self) -> anyhow::Result<()> { let duration = std::time::Duration::from_secs(5); let fut = self.task_manager.graceful_shutdown(); - tokio::time::timeout(duration, fut).await??; + tokio::time::timeout(duration, fut).await?; Ok(()) } } diff --git a/server/src/speaker_phone.rs b/server/src/speaker_phone.rs index 72160b99..5e03d7f7 100644 --- a/server/src/speaker_phone.rs +++ b/server/src/speaker_phone.rs @@ -43,6 +43,9 @@ pub struct CreateSpeakerPhoneParams { pub position: crate::world::Coordinate, } +#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] +pub struct LoadAllSpeakerPhonesParams {} + #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] pub struct GetAvailableChannelsParams {} @@ -64,11 +67,20 @@ pub trait SpeakerPhoneService: Send + Sync + 'static { ctx: &'a Context, params: GetSpeakerPhonesInAreaParams, ) -> BoxFuture<'a, Result, Self::Error>>; + /// DBに入れる + spawn subscribing event + /// spawnはTaskManager, subscribeはEventService参照 fn create_speaker_phone<'a>( &'a self, ctx: &'a Context, params: CreateSpeakerPhoneParams, ) -> BoxFuture<'a, Result>; + /// アプリ起動時の処理 + /// 既存のspeaker_phone全てでspawn subscribing eventする + fn load_all_speaker_phones<'a>( + &'a self, + ctx: &'a Context, + params: LoadAllSpeakerPhonesParams, + ) -> BoxFuture<'a, Result<(), Self::Error>>; fn get_available_channels<'a>( &'a self, ctx: &'a Context, @@ -130,6 +142,17 @@ pub trait ProvideSpeakerPhone: Send + Sync + 'static { self.speaker_phone_service() .create_speaker_phone(ctx, params) } + fn load_all_speaker_phones( + &self, + params: LoadAllSpeakerPhonesParams, + ) -> BoxFuture< + '_, + Result<(), >::Error>, + > { + let ctx = self.context(); + self.speaker_phone_service() + .load_all_speaker_phones(ctx, params) + } fn get_available_channels( &self, params: GetAvailableChannelsParams, diff --git a/server/src/task.rs b/server/src/task.rs index fdf30442..49afe251 100644 --- a/server/src/task.rs +++ b/server/src/task.rs @@ -1,10 +1,6 @@ use std::{future::Future, sync::Arc}; -use tokio::{ - sync::Mutex, - task::{JoinError, JoinSet}, -}; -use tokio_util::sync::CancellationToken; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; #[derive(Debug, Clone)] pub struct TaskManager(Arc); @@ -12,7 +8,7 @@ pub struct TaskManager(Arc); #[derive(Debug)] struct Inner { cancel: CancellationToken, - join_set: Mutex>, + task_tracker: TaskTracker, } impl Default for TaskManager { @@ -25,7 +21,7 @@ impl TaskManager { pub fn new() -> Self { let inner = Inner { cancel: CancellationToken::new(), - join_set: Mutex::new(JoinSet::new()), + task_tracker: TaskTracker::new(), }; Self(Arc::new(inner)) } @@ -37,24 +33,15 @@ impl TaskManager { { let child = self.0.cancel.child_token(); let fut = func(child.clone()); - self.0.join_set.lock().await.spawn(fut); + self.0.task_tracker.spawn(fut); child } #[tracing::instrument(skip_all)] - pub async fn graceful_shutdown(&self) -> Result<(), JoinError> { + pub async fn graceful_shutdown(&self) { self.0.cancel.cancel(); - let mut join_set = self.0.join_set.lock().await; - while let Some(res) = join_set.join_next().await { - match res { - Ok(()) => {} - Err(e) => { - tracing::error!(error = &e as &dyn std::error::Error, "join error"); - return Err(e); - } - } - } + self.0.task_tracker.close(); + self.0.task_tracker.wait().await; tracing::info!("Gracefully shut down"); - Ok(()) } } diff --git a/server/src/traq/bot.rs b/server/src/traq/bot.rs index a2b32bb8..1b8ecc80 100644 --- a/server/src/traq/bot.rs +++ b/server/src/traq/bot.rs @@ -4,8 +4,10 @@ use serde::{Deserialize, Serialize}; use crate::prelude::IntoStatus; +pub mod config; + #[derive(Debug, Clone)] -pub struct BuildparamsuestAsBotParams<'a> { +pub struct BuildRequestAsBotParams<'a> { pub method: http::Method, pub uri: &'a str, } @@ -30,13 +32,21 @@ pub struct OnMessageCreatedParams { pub message: super::message::TraqMessage, } +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct TraqBotConfig { + bot_id: String, + bot_user_id: String, + verification_token: String, + access_token: String, +} + pub trait TraqBotService: Send + Sync + 'static { type Error: IntoStatus; - fn build_paramsuest_as_bot<'a>( + fn build_request_as_bot<'a>( &'a self, ctx: &'a Context, - params: BuildparamsuestAsBotParams<'a>, + params: BuildRequestAsBotParams<'a>, ) -> BoxFuture<'a, Result>; fn subscribe_channel<'a>( &'a self, @@ -68,15 +78,15 @@ pub trait ProvideTraqBotService: Send + Sync + 'static { fn context(&self) -> &Self::Context; fn traq_bot_service(&self) -> &Self::TraqBotService; - fn build_paramsuest_as_bot<'a>( + fn build_request_as_bot<'a>( &'a self, - params: BuildparamsuestAsBotParams<'a>, + params: BuildRequestAsBotParams<'a>, ) -> BoxFuture< 'a, Result>::Error>, > { let ctx = self.context(); - self.traq_bot_service().build_paramsuest_as_bot(ctx, params) + self.traq_bot_service().build_request_as_bot(ctx, params) } fn subscribe_channel( &self, diff --git a/server/src/traq/bot/config.rs b/server/src/traq/bot/config.rs new file mode 100644 index 00000000..1d2129ff --- /dev/null +++ b/server/src/traq/bot/config.rs @@ -0,0 +1,123 @@ +use std::fmt; + +impl fmt::Debug for super::TraqBotConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TraqBotConfig") + .field("bot_id", &self.bot_id) + .field("bot_user_id", &self.bot_user_id) + .field("verification_token", &"REDACTED") + .field("access_token", &"REDACTED") + .finish() + } +} + +impl super::TraqBotConfig { + pub fn builder() -> Builder { + Builder { + bot_id: (), + bot_user_id: (), + verification_token: (), + access_token: (), + } + } +} + +pub struct Builder { + bot_id: BotId, + bot_user_id: BotUserId, + verification_token: VerificationToken, + access_token: AccessToken, +} + +impl + Builder +{ + pub fn bot_id( + self, + value: impl Into, + ) -> Builder { + let Self { + bot_id: _, + bot_user_id, + verification_token, + access_token, + } = self; + Builder { + bot_id: value.into(), + bot_user_id, + verification_token, + access_token, + } + } + + pub fn bot_user_id( + self, + value: impl Into, + ) -> Builder { + let Self { + bot_id, + bot_user_id: _, + verification_token, + access_token, + } = self; + Builder { + bot_id, + bot_user_id: value.into(), + verification_token, + access_token, + } + } + + pub fn verification_token( + self, + value: impl Into, + ) -> Builder { + let Self { + bot_id, + bot_user_id, + verification_token: _, + access_token, + } = self; + Builder { + bot_id, + bot_user_id, + verification_token: value.into(), + access_token, + } + } + + pub fn access_token( + self, + value: impl Into, + ) -> Builder { + let Self { + bot_id, + bot_user_id, + verification_token, + access_token: _, + } = self; + Builder { + bot_id, + bot_user_id, + verification_token, + access_token: value.into(), + } + } +} + +impl Builder { + pub fn build(self) -> super::TraqBotConfig { + let Self { + bot_id, + bot_user_id, + verification_token, + access_token, + } = self; + super::TraqBotConfig { + bot_id, + bot_user_id, + verification_token, + access_token, + } + } +}