Skip to content

feat: add github client dedicated for ci #186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ sqlx = { version = "0.8.1", features = ["runtime-tokio", "tls-rustls", "postgres
chrono = "0.4"

itertools = "0.13.0"
derive_builder = "0.20.0"

[dev-dependencies]
insta = "1.26"
derive_builder = "0.20.0"
wiremock = "0.6.0"
base64 = "0.22.1"
tracing-test = "0.2.4"
Expand Down
54 changes: 42 additions & 12 deletions src/bin/bors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::time::Duration;

use anyhow::Context;
use bors::{
create_app, create_bors_process, create_github_client, load_repositories, BorsContext,
BorsGlobalEvent, CommandParser, PgDbClient, ServerState, TeamApiClient, WebhookSecret,
create_app, create_bors_process, create_github_client, create_github_client_from_access_token,
load_repositories, BorsContextBuilder, BorsGlobalEvent, CommandParser, PgDbClient, ServerState,
TeamApiClient, WebhookSecret,
};
use clap::Parser;
use sqlx::postgres::PgConnectOptions;
Expand All @@ -18,7 +19,9 @@ use tracing_subscriber::filter::EnvFilter;
/// How often should the bot check DB state, e.g. for handling timeouts.
const PERIODIC_REFRESH: Duration = Duration::from_secs(120);

#[derive(clap::Parser)]
const GITHUB_API_URL: &str = "https://api.github.com";

#[derive(Parser)]
struct Opts {
/// Github App ID.
#[arg(long, env = "APP_ID")]
Expand All @@ -39,6 +42,10 @@ struct Opts {
/// Prefix used for bot commands in PR comments.
#[arg(long, env = "CMD_PREFIX", default_value = "@bors")]
cmd_prefix: String,

/// Prefix used for bot commands in PR comments.
#[arg(long, env = "CI_ACCESS_TOKEN")]
ci_access_token: Option<String>,
}

/// Starts a server that receives GitHub webhooks and generates events into a queue
Expand Down Expand Up @@ -81,15 +88,30 @@ fn try_main(opts: Opts) -> anyhow::Result<()> {
let db = runtime
.block_on(initialize_db(&opts.db))
.context("Cannot initialize database")?;
let team_api = TeamApiClient::default();
let (client, loaded_repos) = runtime.block_on(async {
let client = create_github_client(
let team_api_client = TeamApiClient::default();
let gh_app_client = runtime.block_on(async {
create_github_client(
opts.app_id.into(),
"https://api.github.com".to_string(),
GITHUB_API_URL.to_string(),
opts.private_key.into(),
)?;
let repos = load_repositories(&client, &team_api).await?;
Ok::<_, anyhow::Error>((client, repos))
)
})?;
let ci_client = match opts.ci_access_token {
Some(access_token) => {
let client = runtime.block_on(async {
tracing::warn!("creating client ci");
create_github_client_from_access_token(
GITHUB_API_URL.to_string(),
access_token.into(),
)
})?;
Some(client)
}
None => None,
};
let loaded_repos = runtime.block_on(async {
let repos = load_repositories(&gh_app_client, &team_api_client).await?;
Ok::<_, anyhow::Error>(repos)
})?;

let mut repos = HashMap::default();
Expand All @@ -108,8 +130,16 @@ fn try_main(opts: Opts) -> anyhow::Result<()> {
repos.insert(name, Arc::new(repo));
}

let ctx = BorsContext::new(CommandParser::new(opts.cmd_prefix), Arc::new(db), repos);
let (repository_tx, global_tx, bors_process) = create_bors_process(ctx, client, team_api);
let ctx = BorsContextBuilder::default()
.parser(CommandParser::new(opts.cmd_prefix))
.db(Arc::new(db))
.repositories(repos)
.gh_app_client(gh_app_client)
.ci_client(ci_client)
.team_api_client(team_api_client)
.build()
.unwrap();
let (repository_tx, global_tx, bors_process) = create_bors_process(ctx);

let refresh_tx = global_tx.clone();
let refresh_process = async move {
Expand Down
1 change: 1 addition & 0 deletions src/bors/command/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum CommandPart<'a> {
KeyValue { key: &'a str, value: &'a str },
}

#[derive(Clone)]
pub struct CommandParser {
prefix: String,
}
Expand Down
29 changes: 13 additions & 16 deletions src/bors/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,24 @@ use std::{
sync::{Arc, RwLock},
};

use crate::{bors::command::CommandParser, github::GithubRepoName, PgDbClient};
use derive_builder::Builder;
use octocrab::Octocrab;

use crate::{bors::command::CommandParser, github::GithubRepoName, PgDbClient, TeamApiClient};

use super::RepositoryState;

#[derive(Builder)]
pub struct BorsContext {
pub parser: CommandParser,
pub db: Arc<PgDbClient>,
#[builder(field(
ty = "HashMap<GithubRepoName, Arc<RepositoryState>>",
build = "RwLock::new(self.repositories.clone())"
))]
pub repositories: RwLock<HashMap<GithubRepoName, Arc<RepositoryState>>>,
}

impl BorsContext {
pub fn new(
parser: CommandParser,
db: Arc<PgDbClient>,
repositories: HashMap<GithubRepoName, Arc<RepositoryState>>,
) -> Self {
let repositories = RwLock::new(repositories);
Self {
parser,
db,
repositories,
}
}
pub gh_app_client: Octocrab,
#[builder(default)]
pub ci_client: Option<Octocrab>,
pub team_api_client: TeamApiClient,
}
19 changes: 5 additions & 14 deletions src/bors/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use anyhow::Context;
use octocrab::Octocrab;
use tracing::Instrument;

use crate::bors::command::{BorsCommand, CommandParseError};
Expand All @@ -17,7 +16,7 @@ use crate::bors::handlers::workflow::{
handle_check_suite_completed, handle_workflow_completed, handle_workflow_started,
};
use crate::bors::{BorsContext, Comment, RepositoryState};
use crate::{load_repositories, PgDbClient, TeamApiClient};
use crate::{load_repositories, PgDbClient};

#[cfg(test)]
use crate::tests::util::TestSyncMarker;
Expand Down Expand Up @@ -142,16 +141,12 @@ pub static WAIT_FOR_REFRESH: TestSyncMarker = TestSyncMarker::new();
pub async fn handle_bors_global_event(
event: BorsGlobalEvent,
ctx: Arc<BorsContext>,
gh_client: &Octocrab,
team_api_client: &TeamApiClient,
) -> anyhow::Result<()> {
let db = Arc::clone(&ctx.db);
match event {
BorsGlobalEvent::InstallationsChanged => {
let span = tracing::info_span!("Installations changed");
reload_repos(ctx, gh_client, team_api_client)
.instrument(span)
.await?;
reload_repos(ctx).instrument(span).await?;
}
BorsGlobalEvent::Refresh => {
let span = tracing::info_span!("Refresh");
Expand All @@ -161,7 +156,7 @@ pub async fn handle_bors_global_event(
let repo = Arc::clone(&repo);
async {
let subspan = tracing::info_span!("Repo", repo = repo.repository().to_string());
refresh_repository(repo, Arc::clone(&db), team_api_client)
refresh_repository(repo, Arc::clone(&db), &ctx.team_api_client)
.instrument(subspan)
.await
}
Expand Down Expand Up @@ -274,12 +269,8 @@ async fn handle_comment(
Ok(())
}

async fn reload_repos(
ctx: Arc<BorsContext>,
gh_client: &Octocrab,
team_api_client: &TeamApiClient,
) -> anyhow::Result<()> {
let reloaded_repos = load_repositories(gh_client, team_api_client).await?;
async fn reload_repos(ctx: Arc<BorsContext>) -> anyhow::Result<()> {
let reloaded_repos = load_repositories(&ctx.gh_app_client, &ctx.team_api_client).await?;
let mut repositories = ctx.repositories.write().unwrap();
for repo in repositories.values() {
if !reloaded_repos.contains_key(repo.repository()) {
Expand Down
1 change: 1 addition & 0 deletions src/bors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use arc_swap::ArcSwap;
pub use command::CommandParser;
pub use comment::Comment;
pub use context::BorsContext;
pub use context::BorsContextBuilder;
#[cfg(test)]
pub use handlers::WAIT_FOR_REFRESH;
pub use handlers::{handle_bors_global_event, handle_bors_repository_event};
Expand Down
11 changes: 11 additions & 0 deletions src/github/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ pub fn create_github_client(
.context("Could not create octocrab builder")
}

pub fn create_github_client_from_access_token(
github_url: String,
access_token: SecretString,
) -> anyhow::Result<Octocrab> {
Octocrab::builder()
.base_uri(github_url)?
.user_access_token(access_token)
.build()
.context("Could not create octocrab builder")
}

/// Loads repositories that are connected to the given GitHub App client.
/// The anyhow::Result<RepositoryState> is intended, because we wanted to have
/// a hard error when the repos fail to load when the bot starts, but only log
Expand Down
13 changes: 4 additions & 9 deletions src/github/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ use crate::bors::event::BorsEvent;
use crate::bors::{handle_bors_global_event, handle_bors_repository_event, BorsContext};
use crate::github::webhook::GitHubWebhook;
use crate::github::webhook::WebhookSecret;
use crate::{BorsGlobalEvent, BorsRepositoryEvent, TeamApiClient};
use crate::{BorsGlobalEvent, BorsRepositoryEvent};

use anyhow::Error;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::Router;
use octocrab::Octocrab;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -83,8 +82,6 @@ pub async fn github_webhook_handler(
/// them.
pub fn create_bors_process(
ctx: BorsContext,
gh_client: Octocrab,
team_api: TeamApiClient,
) -> (
mpsc::Sender<BorsRepositoryEvent>,
mpsc::Sender<BorsGlobalEvent>,
Expand All @@ -104,7 +101,7 @@ pub fn create_bors_process(
{
tokio::join!(
consume_repository_events(ctx.clone(), repository_rx),
consume_global_events(ctx.clone(), global_rx, gh_client, team_api)
consume_global_events(ctx.clone(), global_rx)
);
}
// In real execution, the bot runs forever. If there is something that finishes
Expand All @@ -116,7 +113,7 @@ pub fn create_bors_process(
_ = consume_repository_events(ctx.clone(), repository_rx) => {
tracing::error!("Repository event handling process has ended");
}
_ = consume_global_events(ctx.clone(), global_rx, gh_client, team_api) => {
_ = consume_global_events(ctx.clone(), global_rx) => {
tracing::error!("Global event handling process has ended");
}
}
Expand Down Expand Up @@ -146,15 +143,13 @@ async fn consume_repository_events(
async fn consume_global_events(
ctx: Arc<BorsContext>,
mut global_rx: mpsc::Receiver<BorsGlobalEvent>,
gh_client: Octocrab,
team_api: TeamApiClient,
) {
while let Some(event) = global_rx.recv().await {
let ctx = ctx.clone();

let span = tracing::info_span!("GlobalEvent");
tracing::debug!("Received global event: {event:#?}");
if let Err(error) = handle_bors_global_event(event, ctx, &gh_client, &team_api)
if let Err(error) = handle_bors_global_event(event, ctx)
.instrument(span.clone())
.await
{
Expand Down
6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ mod github;
mod permissions;
mod utils;

pub use bors::{event::BorsGlobalEvent, event::BorsRepositoryEvent, BorsContext, CommandParser};
pub use bors::{
event::BorsGlobalEvent, event::BorsRepositoryEvent, BorsContext, BorsContextBuilder,
CommandParser,
};
pub use database::PgDbClient;
pub use github::{
api::create_github_client,
api::create_github_client_from_access_token,
api::load_repositories,
server::{create_app, create_bors_process, ServerState},
WebhookSecret,
Expand Down
1 change: 1 addition & 0 deletions src/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub(crate) struct UserPermissionsResponse {
github_ids: HashSet<UserId>,
}

#[derive(Clone)]
pub struct TeamApiClient {
base_url: String,
}
Expand Down
16 changes: 11 additions & 5 deletions src/tests/mocks/bors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::tests::mocks::{
};
use crate::tests::webhook::{create_webhook_request, TEST_WEBHOOK_SECRET};
use crate::{
create_app, create_bors_process, BorsContext, BorsGlobalEvent, CommandParser, PgDbClient,
ServerState, WebhookSecret,
create_app, create_bors_process, BorsContextBuilder, BorsGlobalEvent, CommandParser,
PgDbClient, ServerState, WebhookSecret,
};

use super::pull_request::{GitHubPullRequestEventPayload, PullRequestChangeEvent};
Expand Down Expand Up @@ -109,10 +109,16 @@ impl BorsTester {
repos.insert(name, Arc::new(repo));
}

let ctx = BorsContext::new(CommandParser::new("@bors".to_string()), db.clone(), repos);
let ctx = BorsContextBuilder::default()
.parser(CommandParser::new("@bors".to_string()))
.db(db.clone())
.repositories(repos)
.gh_app_client(mock.github_client())
.team_api_client(mock.team_api_client())
.build()
.unwrap();

let (repository_tx, global_tx, bors_process) =
create_bors_process(ctx, mock.github_client(), mock.team_api_client());
let (repository_tx, global_tx, bors_process) = create_bors_process(ctx);

let state = ServerState::new(
repository_tx,
Expand Down
Loading