Skip to content

Commit

Permalink
Refactor invoker
Browse files Browse the repository at this point in the history
Implement Live status updates
  • Loading branch information
MikailBag committed Nov 25, 2019
1 parent 5fe7b2f commit 45bad0a
Show file tree
Hide file tree
Showing 45 changed files with 1,313 additions and 658 deletions.
191 changes: 177 additions & 14 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ db:
dropdb jjs
createdb jjs
echo "running migrations"
cd db
cd src/db
diesel migration run
echo "re-running migrations"
diesel migration redo
Expand Down
4 changes: 3 additions & 1 deletion jtl-cpp/include/valuer.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ void set_data(void* data);

uint32_t get_problem_test_count();

void select_next_test(TestId next_test);
void select_next_test(TestId next_test, bool live);

void set_live_score(int live_score);

void finish(int score, bool treat_as_full, const JudgeLog& judge_log);

Expand Down
13 changes: 9 additions & 4 deletions jtl-cpp/src/builtin/val-icpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static Params read_config(ValuerSession* sess) {

void init(ValuerSession* const sess) {
auto const cfg = read_config(sess);
auto* const params = new Params;
auto* const params = new Params;
*params = cfg;
sess->set_data(params);
}
Expand All @@ -49,10 +49,11 @@ Params const& get_params(ValuerSession const* const sess) {

void begin(ValuerSession* const sess) {
assert(sess->get_problem_test_count() >= 1);
sess->select_next_test(1);
sess->select_next_test(1, true);
}

void on_test_end(ValuerSession* sess, JudgeLogTestEntry finished_test) {
bool next_test_is_sample = (finished_test.test_id + 1) <= get_params(sess).open_test_count;
if (finished_test.test_id <= get_params(sess).open_test_count) {
finished_test.components.expose_output();
finished_test.components.expose_test_data();
Expand All @@ -68,10 +69,14 @@ void on_test_end(ValuerSession* sess, JudgeLogTestEntry finished_test) {
sess->comment_public("ok, all tests passed");
} else {
sess->finish(0, false, judge_log);
sess->comment_public("solution failed on test %d: (status %s)", finished_test.test_id, finished_test.status_code.c_str());
sess->comment_public("solution failed on test %d: (status %s)", finished_test.test_id,
finished_test.status_code.c_str());
}
} else {
sess->select_next_test(finished_test.test_id + 1);
sess->select_next_test(finished_test.test_id + 1, true);
if (next_test_is_sample) {
sess->set_live_score(50);
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions jtl-cpp/src/valuer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

static bool should_run;

void valuer::ValuerSession::select_next_test(valuer::TestId next_test) {
void valuer::ValuerSession::select_next_test(valuer::TestId next_test, bool live) {
assert(1 <= next_test && next_test <= problem_test_count);
printf("RUN %u\n", next_test);
printf("RUN %u %u\n", next_test, live ? 1 : 0);
fflush(stdout);
}

Expand Down Expand Up @@ -98,6 +98,10 @@ uint32_t valuer::ValuerSession::get_problem_test_count() {
return problem_test_count;
}

void valuer::ValuerSession::set_live_score(int score) {
printf("LIVE-SCORE %d\n", score);
}

void valuer::JudgeLog::add_test_entry(valuer::JudgeLogTestEntry const& test) {
tests.push_back(test);
}
Expand Down
14 changes: 14 additions & 0 deletions src/cli/queries/ViewRun.graphql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
query ViewRun($runId: Int!) {
findRun(id: $runId) {
liveStatusUpdate {
finish,
currentTest,
liveScore
},
status {
kind,
code
},
score
}
}
1 change: 1 addition & 0 deletions src/cli/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ q!(ListToolchains);
q!(ListContests);
q!(Submit);
q!(ApiVersion);
q!(ViewRun);
51 changes: 48 additions & 3 deletions src/cli/src/submit.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use frontend_api::Client;
use graphql_client::GraphQLQuery;
use serde_json::{json, Value};
use serde_json::Value;
use std::process::exit;
use structopt::StructOpt;

Expand Down Expand Up @@ -84,6 +84,51 @@ pub fn exec(opt: Opt, params: &super::CommonParams) -> Value {
))
.expect("network error")
.into_result();
let resp = resp.expect("submit failed");
json!({ "id": resp })
let run_id = resp.expect("submit failed").submit_simple.id;
println!("submitted: id={}", run_id);

let mut current_score = 0;
let mut current_test = 0;
let final_results = loop {
let poll_lsu_vars = crate::queries::view_run::Variables { run_id };
let resp = params
.client
.query::<_, crate::queries::view_run::ResponseData>(
&crate::queries::ViewRun::build_query(poll_lsu_vars),
)
.expect("network error")
.into_result();
let resp = resp
.expect("poll LSU failed")
.find_run
.expect("run not found");
let lsu = &resp.live_status_update;
if let Some(ct) = &lsu.current_test {
current_test = *ct;
}
if let Some(ls) = &lsu.live_score {
current_score = *ls;
}
println!(
"score = {}, running on test {}",
current_score, current_test
);
if lsu.finish {
println!("judging finished");
break resp;
}
std::thread::sleep(std::time::Duration::from_secs(1));
};

println!(
"status: {}({}), score: {}",
final_results.status.kind,
final_results.status.code,
final_results
.score
.map(|x| x.to_string())
.unwrap_or_else(|| "<missing>".to_string())
);

serde_json::Value::Null
}
5 changes: 3 additions & 2 deletions src/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2018"
diesel = { version = "1.4.3", features = ["postgres", "extras", "uuid", "uuidv07"] }
serde = { version = "1.0.101", features = ["derive"] }
uuid = { version = "0.7.4", features = ["serde", "v4"] }
snafu = "0.5.0"
snafu-derive = "0.5.0"
r2d2 = "0.8.6"
invoker-api = {path = "../invoker-api"}
bincode = "1.2.0"
anyhow = "1.0.22"
14 changes: 7 additions & 7 deletions src/db/migrations/2019-02-13-163254_initial/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ CREATE TABLE runs
id unsigned_integer DEFAULT nextval('run_id_seq') PRIMARY KEY NOT NULL,
toolchain_id VARCHAR(100) NOT NULL,
status_code VARCHAR(100) NOT NULL,
status_kind VARCHAR(100) NOT NULL,
status_kind VARCHAR(100) NOT NULL,
problem_id VARCHAR(100) NOT NULL,
score INTEGER NOT NULL,
rejudge_id unsigned_integer NOT NULL,
user_id UUID REFERENCES users(id) NOT NULL
score INTEGER NOT NULL,
rejudge_id unsigned_integer NOT NULL,
user_id UUID REFERENCES users (id) NOT NULL
);

CREATE UNIQUE INDEX runs_id_unique_index ON runs (id);
Expand All @@ -47,7 +47,7 @@ CREATE SEQUENCE inv_req_id_seq START WITH 0 MINVALUE 0;

CREATE table invocation_requests
(
id unsigned_integer DEFAULT nextval('inv_req_id_seq') UNIQUE PRIMARY KEY NOT NULL,
run_id unsigned_integer REFERENCES runs (id) NOT NULL,
invoke_revision unsigned_integer NOT NULL
id unsigned_integer DEFAULT nextval('inv_req_id_seq') UNIQUE PRIMARY KEY NOT NULL,
-- This is serialized `InvokeTask`. See `invoker-api` for its definition
invoke_task bytea NOT NULL
);
16 changes: 8 additions & 8 deletions src/db/src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::{
repo::{DieselRepo, MemoryRepo, Repo},
Error,
};
use crate::repo::{DieselRepo, MemoryRepo, Repo};
use anyhow::{Context, Result};
use std::env;

pub struct ConnectOptions {
Expand All @@ -20,23 +18,25 @@ impl ConnectOptions {
}
}

pub fn connect(options: ConnectOptions) -> Result<Box<dyn Repo>, Error> {
pub fn connect(options: ConnectOptions) -> Result<Box<dyn Repo>> {
if let Some(pg_conn_str) = options.pg {
Ok(Box::new(DieselRepo::new(&pg_conn_str)?))
Ok(Box::new(
DieselRepo::new(&pg_conn_str).context("failed to connect to postgres")?,
))
} else {
Ok(Box::new(MemoryRepo::new()))
}
}

pub fn connect_env() -> Result<Box<dyn Repo>, Error> {
pub fn connect_env() -> Result<Box<dyn Repo>> {
let opts = ConnectOptions {
pg: env::var("DATABASE_URL").ok(),
};
opts.warn();
connect(opts)
}

pub fn connect_memory() -> Result<Box<dyn Repo>, Error> {
pub fn connect_memory() -> Result<Box<dyn Repo>> {
let opts = ConnectOptions { pg: None };
connect(opts)
}
51 changes: 1 addition & 50 deletions src/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,5 @@ pub mod schema;

pub use connect::connect_env;
pub use repo::Repo as DbConn;
use snafu_derive::Snafu;
use std::fmt::{self, Debug, Display, Formatter};

#[derive(Snafu, Debug)]
pub enum Error {
R2d2 {
source: r2d2::Error,
},
Diesel {
source: diesel::result::Error,
},
Other {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
}

impl From<r2d2::Error> for Error {
fn from(source: r2d2::Error) -> Error {
Error::R2d2 { source }
}
}

impl From<diesel::result::Error> for Error {
fn from(source: diesel::result::Error) -> Error {
Error::Diesel { source }
}
}

struct StringError(&'static str);

impl Display for StringError {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
Display::fmt(self.0, f)
}
}

impl Debug for StringError {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
Debug::fmt(self.0, f)
}
}

impl std::error::Error for StringError {}

impl Error {
fn string(s: &'static str) -> Self {
Error::Other {
source: Box::new(StringError(s)),
}
}
}
pub use anyhow::Error;
26 changes: 13 additions & 13 deletions src/db/src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,31 @@ mod memory;
pub use diesel_pg::DieselRepo;
pub use memory::MemoryRepo;

use crate::{schema::*, Error};
use crate::schema::*;
use anyhow::{bail, Result};

pub trait RunsRepo: std::fmt::Debug + Send + Sync {
fn run_new(&self, run_data: NewRun) -> Result<Run, Error>;
fn run_try_load(&self, run_id: RunId) -> Result<Option<Run>, Error>;
fn run_load(&self, run_id: RunId) -> Result<Run, Error> {
fn run_new(&self, run_data: NewRun) -> Result<Run>;
fn run_try_load(&self, run_id: RunId) -> Result<Option<Run>>;
fn run_load(&self, run_id: RunId) -> Result<Run> {
match self.run_try_load(run_id)? {
Some(run) => Ok(run),
None => Err(Error::string("run_load: unknown run_id")),
None => bail!("run_load: unknown run_id"),
}
}
fn run_update(&self, run_id: RunId, patch: RunPatch) -> Result<(), Error>;
fn run_delete(&self, run_id: RunId) -> Result<(), Error>;
fn run_select(&self, with_run_id: Option<RunId>, limit: Option<u32>)
-> Result<Vec<Run>, Error>;
fn run_update(&self, run_id: RunId, patch: RunPatch) -> Result<()>;
fn run_delete(&self, run_id: RunId) -> Result<()>;
fn run_select(&self, with_run_id: Option<RunId>, limit: Option<u32>) -> Result<Vec<Run>>;
}

pub trait InvocationRequestsRepo: Send + Sync {
fn inv_req_new(&self, inv_req_data: NewInvocationRequest) -> Result<InvocationRequest, Error>;
fn inv_req_pop(&self) -> Result<Option<InvocationRequest>, Error>;
fn inv_req_new(&self, inv_req_data: NewInvocationRequest) -> Result<InvocationRequest>;
fn inv_req_pop(&self) -> Result<Option<InvocationRequest>>;
}

pub trait UsersRepo: Send + Sync {
fn user_new(&self, user_data: NewUser) -> Result<User, Error>;
fn user_try_load_by_login(&self, login: &str) -> Result<Option<User>, Error>;
fn user_new(&self, user_data: NewUser) -> Result<User>;
fn user_try_load_by_login(&self, login: &str) -> Result<Option<User>>;
}

pub trait Repo: RunsRepo + InvocationRequestsRepo + UsersRepo {}
Loading

0 comments on commit 45bad0a

Please sign in to comment.