diff --git a/core/src/queue.rs b/core/src/queue.rs index 3fd8b771..3580de50 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -46,7 +46,9 @@ impl Queue { max_concurrent_requests: usize, ) -> Self { // Create channels - let (queue_sender, queue_receiver) = mpsc::channel(max_concurrent_requests); + // In rare cases, the queue fails to send the QueueCommand because the channel buffer is full. + // So, naively increase the buffer size to twice `max_concurrent_requests` to prevent the failure temporarily + let (queue_sender, queue_receiver) = mpsc::channel(2 * max_concurrent_requests); // Launch background queue task std::thread::spawn(move || { diff --git a/router/src/http/server.rs b/router/src/http/server.rs index a22af962..10bdef4c 100644 --- a/router/src/http/server.rs +++ b/router/src/http/server.rs @@ -121,7 +121,7 @@ async fn predict( info: Info, permit: Option| async move { let permit = match permit { - None => infer.acquire_permit().await, + None => infer.try_acquire_permit().map_err(ErrorResponse::from)?, Some(permit) => permit, }; @@ -347,7 +347,7 @@ async fn rerank( // Closure for rerank let rerank_inner = move |query: String, text: String, truncate: bool, infer: Infer| async move { - let permit = infer.acquire_permit().await; + let permit = infer.try_acquire_permit().map_err(ErrorResponse::from)?; let response = infer .predict( @@ -672,8 +672,12 @@ async fn embed( let local_infer = infer.clone(); let prompt_name = req.prompt_name.clone(); + + let permit = local_infer + .try_acquire_permit() + .map_err(ErrorResponse::from)?; + futures.push(async move { - let permit = local_infer.acquire_permit().await; local_infer .embed_pooled( input, @@ -855,8 +859,12 @@ async fn embed_sparse( let local_infer = infer.clone(); let prompt_name = req.prompt_name.clone(); + + let permit = local_infer + .try_acquire_permit() + .map_err(ErrorResponse::from)?; + futures.push(async move { - let permit = local_infer.acquire_permit().await; let response = local_infer
.embed_sparse( input, @@ -1029,8 +1037,12 @@ async fn embed_all( let local_infer = infer.clone(); let prompt_name = req.prompt_name.clone(); + + let permit = local_infer + .try_acquire_permit() + .map_err(ErrorResponse::from)?; + futures.push(async move { - let permit = local_infer.acquire_permit().await; local_infer .embed_all( input, @@ -1223,8 +1235,12 @@ async fn openai_embed( compute_chars += input.count_chars(); let local_infer = infer.clone(); + + let permit = local_infer + .try_acquire_permit() + .map_err(ErrorResponse::from)?; + futures.push(async move { - let permit = local_infer.acquire_permit().await; local_infer .embed_pooled( input,