Skip to content

Commit

Permalink
Merge pull request #47 from wobcom/revert-43-feature/vickyctl/resolve…
Browse files Browse the repository at this point in the history
…-locks

Revert "vickyctl: handle poisoned locks"
  • Loading branch information
Kek5chen authored Jun 10, 2024
2 parents 09510d1 + 6720d3f commit 43789dc
Show file tree
Hide file tree
Showing 17 changed files with 164 additions and 990 deletions.
284 changes: 5 additions & 279 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions fairy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ pub struct FlakeRef {
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "result")]
pub enum TaskResult {
Success,
Error,
SUCCESS,
ERROR,
}

#[derive(Debug, Deserialize)]
#[serde(tag = "state")]
pub enum TaskStatus {
New,
Running,
Finished(TaskResult),
NEW,
RUNNING,
FINISHED(TaskResult),
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -173,9 +173,9 @@ async fn run_task(cfg: Arc<AppConfig>, task: Task) {
let result = match try_run_task(cfg.clone(), &task).await {
Err(e) => {
log::info!("task failed: {} {} {:?}", task.id, task.display_name, e);
TaskResult::Error
TaskResult::ERROR
}
Ok(_) => TaskResult::Success,
Ok(_) => TaskResult::SUCCESS,
};
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let _ = api::<_, ()>(
Expand Down
38 changes: 1 addition & 37 deletions vicky/src/bin/vicky/locks.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use diesel::PgConnection;
use rocket::{get, patch};
use rocket::get;
use rocket::serde::json::Json;
use uuid::Uuid;
use vickylib::database::entities::{Database, Lock};
use vickylib::database::entities::lock::db_impl::LockDatabase;
use vickylib::database::entities::lock::PoisonedLock;
use crate::auth::{Machine, User};
use crate::errors::AppError;

Expand All @@ -13,11 +11,6 @@ async fn locks_get_poisoned(db: &Database) -> Result<Json<Vec<Lock>>, AppError>
Ok(Json(poisoned_locks))
}

async fn locks_get_detailed_poisoned(db: &Database) -> Result<Json<Vec<PoisonedLock>>, AppError> {
let poisoned_locks: Vec<PoisonedLock> = db.run(PgConnection::get_poisoned_locks_with_tasks).await?;
Ok(Json(poisoned_locks))
}

#[get("/poisoned")]
pub async fn locks_get_poisoned_user(
db: Database,
Expand All @@ -34,23 +27,6 @@ pub async fn locks_get_poisoned_machine(
locks_get_poisoned(&db).await
}


#[get("/poisoned_detailed")]
pub async fn locks_get_detailed_poisoned_user(
db: Database,
_user: User,
) -> Result<Json<Vec<PoisonedLock>>, AppError> {
locks_get_detailed_poisoned(&db).await
}

#[get("/poisoned_detailed", rank = 2)]
pub async fn locks_get_detailed_poisoned_machine(
db: Database,
_machine: Machine,
) -> Result<Json<Vec<PoisonedLock>>, AppError> {
locks_get_detailed_poisoned(&db).await
}

async fn locks_get_active(db: &Database) -> Result<Json<Vec<Lock>>, AppError> {
let locks: Vec<Lock> = db.run(PgConnection::get_active_locks).await?;
Ok(Json(locks))
Expand All @@ -71,15 +47,3 @@ pub async fn locks_get_active_machine(
) -> Result<Json<Vec<Lock>>, AppError> {
locks_get_active(&db).await
}

#[patch("/unlock/<lock_id>")]
pub async fn locks_unlock(
db: Database,
_user: Machine, // TODO: Should actually be user-only, but we don't have that yet
lock_id: String,
) -> Result<(), AppError> {
let lock_uuid = Uuid::try_parse(&lock_id)?;

db.run(move |conn| conn.unlock_lock(&lock_uuid)).await?;
Ok(())
}
34 changes: 10 additions & 24 deletions vicky/src/bin/vicky/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ use vickylib::logs::LogDrain;
use vickylib::s3::client::S3Client;

use crate::events::{get_global_events, GlobalEvent};
use crate::locks::{
locks_get_active_machine, locks_get_active_user, locks_get_detailed_poisoned_machine,
locks_get_detailed_poisoned_user, locks_get_poisoned_machine, locks_get_poisoned_user,
locks_unlock,
};
use crate::locks::{locks_get_active_machine, locks_get_active_user, locks_get_poisoned_machine, locks_get_poisoned_user};
use crate::tasks::{
tasks_add, tasks_claim, tasks_finish, tasks_get_logs, tasks_get_machine, tasks_get_user,
tasks_put_logs, tasks_specific_get_machine, tasks_specific_get_user,
Expand Down Expand Up @@ -81,19 +77,19 @@ fn run_migrations(connection: &mut impl MigrationHarness<diesel::pg::Pg>) -> Res
Ok(_) => {
log::info!("Migrations successfully completed");
Ok(())
}
},
Err(e) => {
log::error!("Error running migrations {e}");
Err(AppError::MigrationError(e.to_string()))
}
}
}

async fn run_rocket_migrations(rocket: Rocket<Build>) -> Result<Rocket<Build>, Rocket<Build>> {
async fn run_rocket_migrations(rocket: Rocket<Build>) -> Result<Rocket<Build>,Rocket<Build>> {
let db: Database = Database::get_one(&rocket).await.unwrap();
match db.run(run_migrations).await {
Ok(_) => Ok(rocket),
Err(_) => Err(rocket),
Err(_) => Err(rocket)
}
}

Expand Down Expand Up @@ -121,16 +117,12 @@ async fn main() -> anyhow::Result<()> {

let app_config = build_rocket.figment().extract::<Config>()?;

let oidc_config_resolved: OIDCConfigResolved =
reqwest::get(app_config.oidc_config.well_known_uri)
.await?
.json()
.await?;
let oidc_config_resolved: OIDCConfigResolved = reqwest::get(app_config.oidc_config.well_known_uri)
.await?
.json()
.await?;

log::info!(
"Fetched OIDC configuration, found jwks_uri={}",
oidc_config_resolved.jwks_uri
);
log::info!("Fetched OIDC configuration, found jwks_uri={}", oidc_config_resolved.jwks_uri);

let jwks_verifier = RemoteJwksVerifier::new(
oidc_config_resolved.jwks_uri.clone(),
Expand Down Expand Up @@ -170,10 +162,7 @@ async fn main() -> anyhow::Result<()> {
.manage(oidc_config_resolved)
.attach(Database::fairing())
.attach(AdHoc::config::<Config>())
.attach(AdHoc::try_on_ignite(
"run migrations",
run_rocket_migrations,
))
.attach(AdHoc::try_on_ignite("run migrations", run_rocket_migrations))
.mount("/api/v1/web-config", routes![get_web_config])
.mount("/api/v1/user", routes![get_user])
.mount("/api/v1/events", routes![get_global_events])
Expand All @@ -196,11 +185,8 @@ async fn main() -> anyhow::Result<()> {
routes![
locks_get_poisoned_user,
locks_get_poisoned_machine,
locks_get_detailed_poisoned_user,
locks_get_detailed_poisoned_machine,
locks_get_active_user,
locks_get_active_machine,
locks_unlock
],
)
.launch()
Expand Down
22 changes: 11 additions & 11 deletions vicky/src/bin/vicky/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ pub async fn tasks_get_logs<'a>(
EventStream! {

match task.status {
TaskStatus::New => {},
TaskStatus::Running => {
TaskStatus::NEW => {},
TaskStatus::RUNNING => {
let mut recv = log_drain.send_handle.subscribe();
let existing_log_messages = log_drain.get_logs(task_uuid.to_string()).await.unwrap();

Expand Down Expand Up @@ -137,7 +137,7 @@ pub async fn tasks_get_logs<'a>(
}
}
},
TaskStatus::Finished(_) => {
TaskStatus::FINISHED(_) => {
let logs = s3.get_logs(&id).await.unwrap();
for element in logs {
yield Event::data(element)
Expand Down Expand Up @@ -166,7 +166,7 @@ pub async fn tasks_put_logs(
.ok_or(AppError::HttpError(Status::NotFound))?;

match task.status {
TaskStatus::Running => {
TaskStatus::RUNNING => {
log_drain.push_logs(id, logs.lines.clone())?;
Ok(Json(()))
}
Expand All @@ -193,7 +193,7 @@ pub async fn tasks_claim(
.run(move |conn| conn.get_task(next_task.id))
.await?
.ok_or(AppError::HttpError(Status::NotFound))?;
task.status = TaskStatus::Running;
task.status = TaskStatus::RUNNING;
let task2 = task.clone();
db.run(move |conn| conn.update_task(&task2)).await?;
global_events.send(GlobalEvent::TaskUpdate { uuid: task.id })?;
Expand All @@ -220,9 +220,9 @@ pub async fn tasks_finish(

log_drain.finish_logs(&id).await?;

task.status = TaskStatus::Finished(finish.result.clone());
task.status = TaskStatus::FINISHED(finish.result.clone());

if finish.result == TaskResult::Error {
if finish.result == TaskResult::ERROR {
task.locks.iter_mut().for_each(|lock| lock.poison(&task.id));
}

Expand Down Expand Up @@ -253,7 +253,7 @@ pub async fn tasks_add(
) -> Result<Json<RoTask>, AppError> {
let task_uuid = Uuid::new_v4();

let task = Task::builder()
let task_manifest = Task::builder()
.with_id(task_uuid)
.with_display_name(&task.display_name)
.with_flake(&task.flake_ref.flake)
Expand All @@ -262,16 +262,16 @@ pub async fn tasks_add(
.requires_features(task.features.clone())
.build();

if check_lock_conflict(&task) {
if check_lock_conflict(&task_manifest) {
return Err(AppError::HttpError(Status::Conflict));
}

db.run(move |conn| conn.put_task(task)).await?;
db.run(move |conn| conn.put_task(&task_manifest)).await?;
global_events.send(GlobalEvent::TaskAdd)?;

let ro_task = RoTask {
id: task_uuid,
status: TaskStatus::New,
status: TaskStatus::NEW,
};

Ok(Json(ro_task))
Expand Down
Loading

0 comments on commit 43789dc

Please sign in to comment.