Skip to content

Commit

Permalink
add scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
hails committed Mar 20, 2020
1 parent 71fd9c1 commit f3909bb
Show file tree
Hide file tree
Showing 13 changed files with 519 additions and 48 deletions.
Empty file added migrations/.gitkeep
Empty file.
6 changes: 6 additions & 0 deletions migrations/00000000000000_diesel_initial_setup/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.

DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
DROP FUNCTION IF EXISTS diesel_set_updated_at();
36 changes: 36 additions & 0 deletions migrations/00000000000000_diesel_initial_setup/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.




-- Sets up a trigger for the given table to automatically set a column called
-- `updated_at` whenever the row is modified (unless `updated_at` was included
-- in the modified columns)
--
-- # Example
--
-- ```sql
-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
--
-- SELECT diesel_manage_updated_at('users');
-- ```
CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
BEGIN
EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
BEGIN
IF (
NEW IS DISTINCT FROM OLD AND
NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
) THEN
NEW.updated_at := current_timestamp;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
2 changes: 2 additions & 0 deletions migrations/2020-03-20-151829_create_processes/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE processes
13 changes: 13 additions & 0 deletions migrations/2020-03-20-151829_create_processes/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Your SQL goes here

CREATE TABLE processes (
id serial PRIMARY KEY,
code text NOT NULL,
telegram_user_id TEXT NOT NULL,
status text NOT NULL,
created_at timestamp NOT NULL DEFAULT NOW(),
updated_at timestamp NOT NULL DEFAULT NOW(),

UNIQUE(telegram_user_Id, code)
);

53 changes: 6 additions & 47 deletions src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use regex::Regex;
use reqwest;
use select::document::Document;
use select::predicate::{Class, Name, Predicate};
use telegram_bot::*;

use crate::process;

pub async fn start(api: &Api, message: &Message) -> Result<(), Error> {
api.send(message.text_reply(format!(
"Olá, {}! Esse bot irá lhe ajudar a acompanhar o estado do seu processo.\n\
Expand All @@ -25,13 +23,15 @@ pub async fn invalid(api: &Api, message: &Message) -> Result<(), Error> {
Ok(())
}

pub async fn code(api: &Api, message: &Message, data: &String) -> Result<(), Error> {
pub async fn code(api: &Api, message: &Message, data: &str) -> Result<(), Error> {
let code: String = data.split_whitespace().skip(1).take(1).collect();

let reply = if code.is_empty() {
String::from("Você me precisa me enviar algum código")
} else {
let process = fetch_citizenship_status(&code).await.unwrap();
let process = process::start(message.from.id.to_string(), code)
.await
.unwrap();
format!(
"Status: {}\n\
Mensagem: {}",
Expand All @@ -43,44 +43,3 @@ pub async fn code(api: &Api, message: &Message, data: &String) -> Result<(), Err

Ok(())
}

struct Process {
status: String,
info: String,
}

async fn fetch_citizenship_status(code: &String) -> Result<Process, reqwest::Error> {
let res = reqwest::Client::new()
.post("https://nacionalidade.justica.gov.pt/Home/GetEstadoProcessoAjax")
.form(&[("SenhaAcesso", code)])
.send()
.await?;
let body = res.text().await?;

let document = Document::from(body.as_str());

let mut process = Process {
status: String::from("Unknown"),
info: String::from(""),
};

if let Some(st) = document.find(Class("active1").descendant(Name("p"))).last() {
process.status = st.text();
}

if let Some(st) = document.find(Class("active2").descendant(Name("p"))).last() {
process.status = st.text();
}

if let Some(st) = document.find(Class("active3").descendant(Name("p"))).last() {
process.status = st.text();
}

if let Some(st) = document.find(Class("container")).last() {
let re = Regex::new(r"\s+").unwrap();

process.info = re.replace_all(&st.text(), " ").trim().to_string();
}

Ok(process)
}
13 changes: 13 additions & 0 deletions src/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
extern crate dotenv;

use diesel::pg::PgConnection;
use diesel::prelude::*;
use dotenv::dotenv;
use std::env;

pub fn establish_connection() -> PgConnection {
dotenv().ok();

let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
PgConnection::establish(&database_url).expect(&format!("Error connecting to {}", database_url))
}
13 changes: 12 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
#[macro_use]
extern crate diesel;

use std::env;

use futures::StreamExt;
use telegram_bot::*;

mod commands;
mod database;
mod models;
mod process;
mod scheduler;
mod schema;

#[tokio::main]
async fn main() -> Result<(), Error> {
let token = env::var("TELEGRAM_BOT_TOKEN").expect("TELEGRAM_BOT_TOKEN not set");
let api = Api::new(token);
let api = Api::new(&token);

scheduler::start(token.clone());

// Fetch new updates via long poll method
let mut stream = api.stream();
Expand Down
11 changes: 11 additions & 0 deletions src/models.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use chrono::NaiveDateTime;

#[derive(Queryable, Debug)]
pub struct Process {
pub id: i32,
pub code: String,
pub telegram_user_id: String,
pub status: String,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
}
105 changes: 105 additions & 0 deletions src/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use diesel::prelude::*;
use regex::Regex;
use reqwest;
use select::document::Document;
use select::predicate::{Class, Name, Predicate};
use std::error;

use crate::database;
use crate::models::*;
use crate::schema::processes::dsl::*;

#[derive(Debug)]
pub struct ProcessResponse {
pub status: String,
pub info: String,
}

pub async fn start(
telegram_id: String,
process_code: String,
) -> Result<ProcessResponse, Box<dyn error::Error>> {
let p = fetch_for_one(&process_code).await?;
save(&telegram_id, &process_code, &p.status).await;

Ok(p)
}

async fn fetch_for_one(process_code: &str) -> Result<ProcessResponse, Box<dyn error::Error>> {
Ok(fetch_citizenship_status(&process_code).await?)
}

async fn save(telegram_id: &str, process_code: &str, process_status: &str) {
let connection = database::establish_connection();
diesel::insert_into(processes)
.values((
telegram_user_id.eq(telegram_id),
code.eq(process_code),
status.eq(process_status.to_lowercase()),
))
.on_conflict((telegram_user_id, code))
.do_update()
.set(status.eq(process_status.to_lowercase()))
.execute(&connection)
.expect("error while inserting process");
}

pub async fn fetch_for_all() -> Result<Vec<(String, ProcessResponse)>, Box<dyn error::Error>> {
let connection = database::establish_connection();
let user_processes = processes
.filter(status.ne("finished"))
.load::<Process>(&connection)
.expect("Error loading processes");

let mut updated_processes = Vec::new();
for process in user_processes {
let process_response = fetch_for_one(&process.code).await?;
if process_response.status.to_lowercase() != process.status {
save(
&process.telegram_user_id,
&process.code,
&process_response.status,
)
.await;
updated_processes.push((process.telegram_user_id, process_response));
}
}

Ok(updated_processes)
}

async fn fetch_citizenship_status(process_code: &str) -> Result<ProcessResponse, reqwest::Error> {
let res = reqwest::Client::new()
.post("https://nacionalidade.justica.gov.pt/Home/GetEstadoProcessoAjax")
.form(&[("SenhaAcesso", process_code)])
.send()
.await?;
let body = res.text().await?;

let document = Document::from(body.as_str());

let mut p = ProcessResponse {
status: String::from("unknown"),
info: String::from(""),
};

if let Some(st) = document.find(Class("active1").descendant(Name("p"))).last() {
p.status = st.text();
}

if let Some(st) = document.find(Class("active2").descendant(Name("p"))).last() {
p.status = st.text();
}

if let Some(st) = document.find(Class("active3").descendant(Name("p"))).last() {
p.status = st.text();
}

if let Some(st) = document.find(Class("container")).last() {
let re = Regex::new(r"\s+").unwrap();

p.info = re.replace_all(&st.text(), " ").trim().to_string();
}

Ok(p)
}
Loading

0 comments on commit f3909bb

Please sign in to comment.