Skip to content

Commit

Permalink
feat: switch to connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Mar 7, 2024
1 parent b30f35c commit bea12db
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 48 deletions.
46 changes: 45 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ buildit-utils = { path = "../buildit-utils" }
jsonwebtoken = "9.2.0"
size = "0.4.1"
dickens = { git = "https://github.com/AOSC-Dev/dickens.git", version = "0.1.0" }
deadpool-lapin = "0.11.0"
52 changes: 40 additions & 12 deletions server/src/bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::{
use buildit_utils::github::{get_archs, update_abbs, OpenPRError, OpenPRRequest};
use chrono::Local;
use common::{ensure_job_queue, JobSource};
use lapin::{Channel, ConnectionProperties};

use serde_json::Value;
use std::{borrow::Cow, sync::Arc};
use std::borrow::Cow;
use teloxide::{
prelude::*,
types::{ChatAction, ParseMode},
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn telegram_send_build_request(
bot: &Bot,
build_request: BuildRequest<'_>,
msg: &Message,
channel: &Channel,
pool: deadpool_lapin::Pool,
) -> ResponseResult<()> {
let BuildRequest {
branch,
Expand All @@ -73,14 +73,37 @@ async fn telegram_send_build_request(

let archs = handle_archs_args(archs.to_vec());

let conn = match pool.get().await {
Ok(conn) => conn,
Err(err) => {
bot.send_message(
msg.chat.id,
format!("Failed to connect to message queue: {}", err),
)
.await?;
return Ok(());
}
};
let channel = match conn.create_channel().await {
Ok(conn) => conn,
Err(err) => {
bot.send_message(
msg.chat.id,
format!("Failed to connect to create channel: {}", err),
)
.await?;
return Ok(());
}
};

match send_build_request(
branch,
packages,
&archs,
github_pr,
JobSource::Telegram(msg.chat.id.0),
sha,
channel,
&channel,
)
.await
{
Expand Down Expand Up @@ -114,9 +137,9 @@ fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> {
archs
}

async fn status() -> anyhow::Result<String> {
async fn status(pool: deadpool_lapin::Pool) -> anyhow::Result<String> {
let mut res = String::from("__*Queue Status*__\n\n");
let conn = lapin::Connection::connect(&ARGS.amqp_addr, ConnectionProperties::default()).await?;
let conn = pool.get().await?;
let channel = conn.create_channel().await?;

for arch in ALL_ARCH {
Expand Down Expand Up @@ -183,7 +206,7 @@ pub async fn answer(
bot: Bot,
msg: Message,
cmd: Command,
channel: Arc<Channel>,
pool: deadpool_lapin::Pool,
) -> ResponseResult<()> {
bot.send_chat_action(msg.chat.id, ChatAction::Typing)
.await?;
Expand Down Expand Up @@ -293,8 +316,13 @@ pub async fn answer(
sha,
};

telegram_send_build_request(&bot, build_request, &msg, &channel)
.await?;
telegram_send_build_request(
&bot,
build_request,
&msg,
pool.clone(),
)
.await?;
} else {
bot.send_message(msg.chat.id, "Please list packages to build in pr info starting with '#buildit'.".to_string())
.await?;
Expand Down Expand Up @@ -341,7 +369,7 @@ pub async fn answer(
github_pr: None,
sha: &sha,
};
telegram_send_build_request(&bot, build_request, &msg, &channel).await?;
telegram_send_build_request(&bot, build_request, &msg, pool.clone()).await?;
}
return Ok(());
}
Expand All @@ -355,7 +383,7 @@ pub async fn answer(
)
.await?;
}
Command::Status => match status().await {
Command::Status => match status(pool).await {
Ok(status) => {
bot.send_message(msg.chat.id, status)
.parse_mode(ParseMode::MarkdownV2)
Expand Down Expand Up @@ -579,7 +607,7 @@ pub async fn answer(
archs.extend(ALL_ARCH);
}

match get_ready_message(&ARGS.amqp_addr, &archs).await {
match get_ready_message(pool, &archs).await {
Ok(map) => {
let mut res = String::new();
for (k, v) in map {
Expand Down
10 changes: 5 additions & 5 deletions server/src/github_webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::StreamExt;
use lapin::{
options::{BasicConsumeOptions, QueueDeclareOptions},
types::FieldTable,
Channel, ConnectionProperties,
Channel,
};
use log::{error, info};
use octocrab::Octocrab;
Expand All @@ -36,18 +36,18 @@ struct User {
login: String,
}

pub async fn get_webhooks_message() {
pub async fn get_webhooks_message(pool: deadpool_lapin::Pool) {
info!("Starting github webhook worker");
loop {
if let Err(e) = get_webhooks_message_inner().await {
if let Err(e) = get_webhooks_message_inner(pool.clone()).await {
error!("Error getting webhooks message: {e}");
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

async fn get_webhooks_message_inner() -> anyhow::Result<()> {
let conn = lapin::Connection::connect(&ARGS.amqp_addr, ConnectionProperties::default()).await?;
async fn get_webhooks_message_inner(pool: deadpool_lapin::Pool) -> anyhow::Result<()> {
let conn = pool.get().await?;
let channel = conn.create_channel().await?;
let _queue = channel
.queue_declare(
Expand Down
10 changes: 4 additions & 6 deletions server/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ use futures::StreamExt;
use lapin::{
options::{BasicAckOptions, BasicConsumeOptions},
types::FieldTable,
ConnectionProperties,
};
use log::{error, info, warn};
use std::time::Duration;

pub async fn heartbeat_worker_inner(amqp_addr: String) -> anyhow::Result<()> {
let conn = lapin::Connection::connect(&amqp_addr, ConnectionProperties::default()).await?;

pub async fn heartbeat_worker_inner(pool: deadpool_lapin::Pool) -> anyhow::Result<()> {
let conn = pool.get().await?;
let channel = conn.create_channel().await?;
let queue_name = "worker-heartbeat";
ensure_job_queue(queue_name, &channel).await?;
Expand Down Expand Up @@ -78,10 +76,10 @@ pub async fn heartbeat_worker_inner(amqp_addr: String) -> anyhow::Result<()> {
Ok(())
}

pub async fn heartbeat_worker(amqp_addr: String) -> anyhow::Result<()> {
pub async fn heartbeat_worker(pool: deadpool_lapin::Pool) -> anyhow::Result<()> {
loop {
info!("Starting heartbeat worker ...");
if let Err(err) = heartbeat_worker_inner(amqp_addr.clone()).await {
if let Err(err) = heartbeat_worker_inner(pool.clone()).await {
error!("Got error while starting heartbeat worker: {}", err);
}
tokio::time::sleep(Duration::from_secs(5)).await;
Expand Down
18 changes: 10 additions & 8 deletions server/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use lapin::{
message::Delivery,
options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
types::FieldTable,
BasicProperties, Channel, ConnectionProperties,
BasicProperties, Channel,
};
use log::{error, info, warn};
use octocrab::params::checks::CheckRunConclusion;
Expand All @@ -26,9 +26,11 @@ use std::time::Duration;
use teloxide::{prelude::*, types::ParseMode};

/// Observe job completion messages
pub async fn job_completion_worker_inner(bot: Bot, amqp_addr: &str) -> anyhow::Result<()> {
let conn = lapin::Connection::connect(amqp_addr, ConnectionProperties::default()).await?;

pub async fn job_completion_worker_inner(
bot: Bot,
pool: deadpool_lapin::Pool,
) -> anyhow::Result<()> {
let conn = pool.get().await?;
let channel = conn.create_channel().await?;
let _queue = channel
.queue_declare(
Expand Down Expand Up @@ -361,11 +363,11 @@ async fn handle_success_message(
}

pub async fn get_ready_message(
amqp_addr: &str,
pool: deadpool_lapin::Pool,
archs: &[&str],
) -> anyhow::Result<Vec<(String, String)>> {
let mut res = vec![];
let conn = lapin::Connection::connect(amqp_addr, ConnectionProperties::default()).await?;
let conn = pool.get().await?;
let channel = conn.create_channel().await?;

for i in archs {
Expand Down Expand Up @@ -425,10 +427,10 @@ pub fn update_retry(retry: Option<u8>) -> HandleSuccessResult {
}
}

pub async fn job_completion_worker(bot: Bot, amqp_addr: String) -> anyhow::Result<()> {
pub async fn job_completion_worker(bot: Bot, pool: deadpool_lapin::Pool) -> anyhow::Result<()> {
loop {
info!("Starting job completion worker ...");
if let Err(err) = job_completion_worker_inner(bot.clone(), &amqp_addr).await {
if let Err(err) = job_completion_worker_inner(bot.clone(), pool.clone()).await {
error!("Got error while starting job completion worker: {}", err);
}
tokio::time::sleep(Duration::from_secs(5)).await;
Expand Down
29 changes: 13 additions & 16 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,43 @@
use std::sync::Arc;

use lapin::{Channel, ConnectionProperties};
use log::info;
use server::bot::Command;
use server::github_webhooks::get_webhooks_message;
use server::{bot::answer, heartbeat::heartbeat_worker, job::job_completion_worker, ARGS};
use teloxide::prelude::*;

#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
env_logger::init();

info!("Starting AOSC BuildIt! server with args {:?}", *ARGS);

let bot = Bot::from_env();

tokio::spawn(heartbeat_worker(ARGS.amqp_addr.clone()));
tokio::spawn(job_completion_worker(bot.clone(), ARGS.amqp_addr.clone()));

let send_request_conn =
lapin::Connection::connect(&ARGS.amqp_addr, ConnectionProperties::default())
.await
.unwrap();
// setup lapin connection pool
let mut cfg = deadpool_lapin::Config::default();
cfg.url = Some(ARGS.amqp_addr.clone());
let pool = cfg.create_pool(Some(deadpool_lapin::Runtime::Tokio1))?;

let channel = Arc::new(send_request_conn.create_channel().await.unwrap());
tokio::spawn(heartbeat_worker(pool.clone()));
tokio::spawn(job_completion_worker(bot.clone(), pool.clone()));

let handler =
Update::filter_message().branch(dptree::entry().filter_command::<Command>().endpoint(
|bot: Bot, channel: Arc<Channel>, msg: Message, cmd: Command| async move {
answer(bot, msg, cmd, channel).await
|bot: Bot, pool: deadpool_lapin::Pool, msg: Message, cmd: Command| async move {
answer(bot, msg, cmd, pool).await
},
));

let mut telegram = Dispatcher::builder(bot, handler)
// Pass the shared state to the handler as a dependency.
.dependencies(dptree::deps![channel.clone()])
.dependencies(dptree::deps![pool.clone()])
.enable_ctrlc_handler()
.build();

tokio::select! {
v = get_webhooks_message() => v,
v = get_webhooks_message(pool.clone()) => v,
v = telegram.dispatch() => v,
};

Ok(())
}

0 comments on commit bea12db

Please sign in to comment.