From c4a9812b52a8c8be6615184d8a7c3b13c3f14e87 Mon Sep 17 00:00:00 2001 From: QIUZHILEI <2925212608@qq.com> Date: Fri, 17 Nov 2023 10:25:56 +0800 Subject: [PATCH 1/3] Improve code documentation and comments. Signed-off-by: QIUZHILEI <2925212608@qq.com> --- .vscode/settings.json | 3 - Cargo.toml | 31 +-- dagrs_core/Cargo.toml | 25 --- dagrs_core/src/engine/error.rs | 38 ---- dagrs_core/src/lib.rs | 18 -- dagrs_core/src/parser/error.rs | 55 ----- dagrs_core/src/parser/mod.rs | 78 ------- dagrs_core/src/task/cmd.rs | 62 ------ dagrs_core/src/task/default_task.rs | 152 ------------- dagrs_core/src/task/error.rs | 58 ----- dagrs_core/src/task/mod.rs | 183 ---------------- dagrs_core/src/utils/gen_macro.rs | 42 ---- {dagrs_derive => derive}/Cargo.toml | 3 +- {dagrs_derive => derive}/src/lib.rs | 18 +- .../dependencies.rs => derive/src/relay.rs | 41 ++-- {dagrs_derive => derive}/src/task.rs | 50 +---- examples/compute_dag.rs | 72 +++++-- ...om_parser.rs => custom_parser_and_task.rs} | 37 ++-- examples/custom_task.rs | 92 -------- examples/dependencies.rs | 82 +++++--- examples/derive_task.rs | 41 ++-- examples/engine.rs | 28 +-- examples/impl_action.rs | 46 ---- examples/use_macro.rs | 52 ----- examples/yaml_dag.rs | 3 +- {dagrs_core/src => src}/bin/dagrs.rs | 2 +- {dagrs_core/src => src}/engine/dag.rs | 199 ++++++++++-------- {dagrs_core/src => src}/engine/graph.rs | 46 ++-- {dagrs_core/src => src}/engine/mod.rs | 43 +++- src/lib.rs | 25 ++- src/task/action.rs | 86 ++++++++ src/task/cmd.rs | 51 +++++ src/task/default_task.rs | 180 ++++++++++++++++ src/task/mod.rs | 96 +++++++++ {dagrs_core/src => src}/task/state.rs | 116 ++++++++-- .../src => src}/utils/default_logger.rs | 0 {dagrs_core/src => src}/utils/env.rs | 21 +- {dagrs_core/src => src}/utils/log.rs | 12 +- {dagrs_core/src => src}/utils/mod.rs | 9 +- src/utils/parser.rs | 45 ++++ src/yaml/mod.rs | 119 +++++++++++ .../src/parser => src/yaml}/yaml_parser.rs | 28 +-- .../src/task => src/yaml}/yaml_task.rs | 24 +-- tests/config/script_run_failed.yaml | 4 +- tests/dag_job_test.rs | 78 ++++--- tests/yaml_parser_test.rs | 73 +++---- 46 files changed, 1195 insertions(+), 1372 deletions(-) delete mode 100644 .vscode/settings.json delete mode 100644 dagrs_core/Cargo.toml delete mode 100644 dagrs_core/src/engine/error.rs delete mode 100644 dagrs_core/src/lib.rs delete mode 100644 dagrs_core/src/parser/error.rs delete mode 100644 dagrs_core/src/parser/mod.rs delete mode 100644 dagrs_core/src/task/cmd.rs delete mode 100644 dagrs_core/src/task/default_task.rs delete mode 100644 dagrs_core/src/task/error.rs delete mode 100644 dagrs_core/src/task/mod.rs delete mode 100644 dagrs_core/src/utils/gen_macro.rs rename {dagrs_derive => derive}/Cargo.toml (85%) rename {dagrs_derive => derive}/src/lib.rs (51%) rename dagrs_derive/src/dependencies.rs => derive/src/relay.rs (87%) rename {dagrs_derive => derive}/src/task.rs (75%) rename examples/{custom_parser.rs => custom_parser_and_task.rs} (78%) delete mode 100644 examples/custom_task.rs delete mode 100644 examples/impl_action.rs delete mode 100644 examples/use_macro.rs rename {dagrs_core/src => src}/bin/dagrs.rs (97%) rename {dagrs_core/src => src}/engine/dag.rs (71%) rename {dagrs_core/src => src}/engine/graph.rs (81%) rename {dagrs_core/src => src}/engine/mod.rs (77%) create mode 100644 src/task/action.rs create mode 100644 src/task/cmd.rs create mode 100644 src/task/default_task.rs create mode 100644 src/task/mod.rs rename {dagrs_core/src => src}/task/state.rs (52%) rename {dagrs_core/src => src}/utils/default_logger.rs (100%) rename {dagrs_core/src => src}/utils/env.rs (73%) rename {dagrs_core/src => src}/utils/log.rs (95%) rename {dagrs_core/src => src}/utils/mod.rs (82%) create mode 100644 src/utils/parser.rs create mode 100644 src/yaml/mod.rs rename {dagrs_core/src/parser => src/yaml}/yaml_parser.rs (84%) rename {dagrs_core/src/task => src/yaml}/yaml_task.rs (76%) diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 39a5ca1..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "rust-analyzer.cargo.features": "all" -} diff --git a/Cargo.toml b/Cargo.toml index c206fe8..58311a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,30 +9,33 @@ readme = "README.md" repository = "https://github.com/open-rust-initiative/dagrs" keywords = ["DAG", "task", "async", "parallel", "concurrent"] +[workspace] +members = ["derive","."] + +[dependencies] +yaml-rust = "0.4.5" +bimap = "0.6.1" +clap = { version = "4.2.2", features = ["derive"] } +anymap2 = "0.13.0" +tokio = { version = "1.28", features = ["rt", "sync","rt-multi-thread"] } +derive ={ path = "derive", version = "0.3.0"} [dev-dependencies] log = "0.4" simplelog = "0.12" -[workspace] -members = ["dagrs_derive","dagrs_core"] - - -[dependencies] -dagrs_core = {path = "dagrs_core" , version = "0.3.0"} -dagrs_derive ={ path = "dagrs_derive", version = "0.3.0"} - [features] -default = ["dagrs_core/logger"] -yaml = ["dagrs_core/yaml"] -derive = ["dagrs_derive/derive"] +default = ["logger"] +logger = [] +yaml = [] +derive = ["derive/derive"] -[[example]] -name = "custom_log" +[[bin]] +name = "dagrs" required-features = ["yaml"] [[example]] -name = "custom_parser" +name = "custom_log" required-features = ["yaml"] [[example]] diff --git a/dagrs_core/Cargo.toml b/dagrs_core/Cargo.toml deleted file mode 100644 index 65be3be..0000000 --- a/dagrs_core/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "dagrs_core" -version = "0.3.0" -edition = "2021" -license = "MIT OR Apache-2.0" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -yaml-rust = "0.4.5" -bimap = "0.6.1" -clap = { version = "4.2.2", features = ["derive"] } -anymap2 = "0.13.0" -thiserror = "1.0.30" -tokio = { version = "1.28", features = ["rt", "sync","rt-multi-thread"] } - -[features] -default = ["logger"] -logger = [] -yaml = [] - - -[[bin]] -name = "dagrs" -required-features = ["yaml"] diff --git a/dagrs_core/src/engine/error.rs b/dagrs_core/src/engine/error.rs deleted file mode 100644 index 6c6805b..0000000 --- a/dagrs_core/src/engine/error.rs +++ /dev/null @@ -1,38 +0,0 @@ -//! Errors that may be raised by building and running dag jobs. - -use thiserror::Error; - -use crate::parser::ParserError; -use crate::task::RunningError; - -#[derive(Debug, Error)] -/// A synthesis of all possible errors. -pub enum DagError { - /// Error that occurs when running action. - #[error("{0}")] - RunningError(RunningError), - /// Yaml file parsing error. - #[error("{0}")] - YamlParserError(ParserError), - /// Task dependency error. - #[error("Task[{0}] dependency task not exist.")] - RelyTaskIllegal(String), - /// There are loops in task dependencies. - #[error("Illegal directed a cyclic graph, loop Detect!")] - LoopGraph, - /// There are no tasks in the job. - #[error("There are no tasks in the job.")] - EmptyJob, -} - -impl From for DagError { - fn from(value: ParserError) -> Self { - Self::YamlParserError(value) - } -} - -impl From for DagError { - fn from(value: RunningError) -> Self { - Self::RunningError(value) - } -} diff --git a/dagrs_core/src/lib.rs b/dagrs_core/src/lib.rs deleted file mode 100644 index 7fdcb03..0000000 --- a/dagrs_core/src/lib.rs +++ /dev/null @@ -1,18 +0,0 @@ -extern crate anymap2; -extern crate bimap; -extern crate clap; -extern crate proc_macro; -extern crate tokio; -#[cfg(feature = "yaml")] -extern crate yaml_rust; - -pub use engine::{Dag, DagError, Engine}; -pub use parser::*; -pub use task::{alloc_id, Action, DefaultTask, Input, NopAction, Output, RunningError, Task}; -#[cfg(feature = "yaml")] -pub use task::{CommandAction, YamlTask}; -pub use utils::{gen_macro, log, EnvVar, LogLevel, Logger}; -mod engine; -mod parser; -mod task; -mod utils; diff --git a/dagrs_core/src/parser/error.rs b/dagrs_core/src/parser/error.rs deleted file mode 100644 index 862eb92..0000000 --- a/dagrs_core/src/parser/error.rs +++ /dev/null @@ -1,55 +0,0 @@ -//! Errors that may occur during configuration file parsing. - -use thiserror::Error; - -/// Errors that may occur while parsing task configuration files. -#[derive(Debug, Error)] -pub enum ParserError { - /// Configuration file not found. - #[error("File not found. [{0}]")] - FileNotFound(#[from] std::io::Error), - #[error("{0}")] - YamlTaskError(YamlTaskError), - #[error("{0}")] - FileContentError(FileContentError), -} - -/// Error about file information. -#[derive(Debug, Error, PartialEq, Eq)] -pub enum FileContentError { - /// The format of the yaml configuration file is not standardized. - #[error("{0}")] - IllegalYamlContent(#[from] yaml_rust::ScanError), - /// Config file has no content. - #[error("File is empty! [{0}]")] - Empty(String), -} - -/// Errors about task configuration items. -#[derive(Debug, Error, PartialEq, Eq)] -pub enum YamlTaskError { - /// The configuration file should start with `dagrs:`. - #[error("File content is not start with 'dagrs'.")] - StartWordError, - /// No task name configured. - #[error("Task has no name field. [{0}]")] - NoNameAttr(String), - /// The specified task predecessor was not found. - #[error("Task cannot find the specified predecessor. [{0}]")] - NotFoundPrecursor(String), - /// `script` is not defined. - #[error("The 'script' attribute is not defined. [{0}]")] - NoScriptAttr(String), -} - -impl From for ParserError { - fn from(value: FileContentError) -> Self { - ParserError::FileContentError(value) - } -} - -impl From for ParserError { - fn from(value: YamlTaskError) -> Self { - ParserError::YamlTaskError(value) - } -} diff --git a/dagrs_core/src/parser/mod.rs b/dagrs_core/src/parser/mod.rs deleted file mode 100644 index 6b9050b..0000000 --- a/dagrs_core/src/parser/mod.rs +++ /dev/null @@ -1,78 +0,0 @@ -//! Parsing configuration files. -//! -//! # Config file parser -//! -//! When users customize configuration files, the program needs to use the configuration -//! file parser defined by this module. The parser is responsible for parsing the content -//! defined in the configuration file into a series of tasks with dependencies. -//! -//! The program provides a default Yaml configuration file parser: [`YamlParser`]. However, -//! users are allowed to customize the parser, which requires the user to implement the [`Parser`] trait. -//! Currently, the program only supports configuration files in *.yaml format, and may support -//! configuration files in *.json format in the future. -//! -//! # The basic format of the yaml configuration file is as follows: -//! ```yaml -//! dagrs: -//! a: -//! name: "Task 1" -//! after: [ b, c ] -//! cmd: echo a -//! b: -//! name: "Task 2" -//! after: [ c, f, g ] -//! cmd: echo b -//! c: -//! name: "Task 3" -//! after: [ e, g ] -//! cmd: echo c -//! d: -//! name: "Task 4" -//! after: [ c, e ] -//! cmd: echo d -//! e: -//! name: "Task 5" -//! after: [ h ] -//! cmd: echo e -//! f: -//! name: "Task 6" -//! after: [ g ] -//! cmd: python3 ./tests/config/test.py -//! g: -//! name: "Task 7" -//! after: [ h ] -//! cmd: node ./tests/config/test.js -//! h: -//! name: "Task 8" -//! cmd: echo h -//! ``` -//! -//! Users can execute arbitrary commands of the operating system. If users -//! want to run other types of script tasks, they need to implement the [`Action`] trait by themselves, -//! and before parsing the configuration file, they need to provide a specific type that implements -//! the [`Action`] trait in the form of key-value pairs: . - -use std::{collections::HashMap, sync::Arc}; - -pub use error::*; -#[cfg(feature = "yaml")] -pub use yaml_parser::YamlParser; -#[cfg(feature = "yaml")] -mod yaml_parser; - -use crate::{task::Task, Action}; - -mod error; - -/// Generic parser traits. If users want to customize the configuration file parser, they must implement this trait. -/// [`YamlParser`] is an example of [`Parser`] -pub trait Parser { - /// Parses the contents of a configuration file into a series of tasks with dependencies. - /// If the user customizes the script execution logic, it is necessary to provide specific - /// types that implement the [`Action`] trait for certain tasks in the form of key-value pairs. - fn parse_tasks( - &self, - file: &str, - specific_actions: HashMap>, - ) -> Result>, ParserError>; -} diff --git a/dagrs_core/src/task/cmd.rs b/dagrs_core/src/task/cmd.rs deleted file mode 100644 index 6add6cd..0000000 --- a/dagrs_core/src/task/cmd.rs +++ /dev/null @@ -1,62 +0,0 @@ -//! OS command `Action`. -//! -//! # `cmd` attribute command wrapper. -//! -//! Specify the command to be executed in the `cmd` attribute of the `yaml` configuration -//! file, and the `Yaml` parser will package the command as a `CommandAction`, which implements -//! the `Action` trait and defines the specific logic of executing the command. - -use std::{process::Command, sync::Arc}; - -use crate::{log, utils::EnvVar}; - -use super::{Action, CmdExecuteError, Input, Output, RunningError}; - -/// Can be used to run a command. -pub struct CommandAction { - command: String, -} - -impl CommandAction { - #[allow(unused)] - pub fn new(cmd: &str) -> Self { - Self { - command: cmd.to_owned(), - } - } -} - -impl Action for CommandAction { - fn run(&self, input: Input, _env: Arc) -> Result { - let mut args = Vec::new(); - let mut cmd = if cfg!(target_os = "windows") { - args.push("-Command"); - Command::new("powershell") - } else { - args.push("-c"); - Command::new("sh") - }; - args.push(&self.command); - input.get_iter().for_each(|input| { - if let Some(inp) = input.get::() { - args.push(inp) - } - }); - let out = match cmd.args(args).output() { - Ok(o) => o, - Err(e) => return Err(CmdExecuteError::new(e.to_string()).into()), - }; - let code = out.status.code().unwrap_or(-1); - if code == 0 { - let mut out = String::from_utf8(out.stdout).unwrap(); - if cfg!(target_os = "windows") { - out = out.replace("\r\n", " "); - } - Ok(Output::new(out)) - } else { - let err_msg = String::from_utf8(out.stderr).unwrap(); - log::error(err_msg.clone()); - Err(CmdExecuteError::new(err_msg).into()) - } - } -} diff --git a/dagrs_core/src/task/default_task.rs b/dagrs_core/src/task/default_task.rs deleted file mode 100644 index 1d2e507..0000000 --- a/dagrs_core/src/task/default_task.rs +++ /dev/null @@ -1,152 +0,0 @@ -use crate::NopAction; - -use super::{Action, Task, ID_ALLOCATOR}; -use std::sync::Arc; - -/// A default implementation of the Task trait. In general, use it to define the tasks of dagrs. -pub struct DefaultTask { - /// id is the unique identifier of each task, it will be assigned by the global [`IDAllocator`] - /// when creating a new task, you can find this task through this identifier. - id: usize, - /// The task's name. - name: String, - /// Id of the predecessor tasks. - precursors: Vec, - /// Perform specific actions. - action: Arc, -} - -impl DefaultTask { - /// Allocate a new [`DefaultTask`], the specific task behavior is a structure that implements [`Action`]. - /// - /// # Example - /// - /// ```rust - /// use dagrs::{DefaultTask, Output,Input, Action,EnvVar,RunningError}; - /// use std::sync::Arc; - /// struct SimpleAction(usize); - /// - /// impl Action for SimpleAction { - /// fn run(&self, input: Input, env: Arc) -> Result { - /// Ok(Output::new(self.0 + 10)) - /// } - /// } - /// - /// let action = SimpleAction(10); - /// let task = DefaultTask::new(action, "Increment action"); - /// ``` - /// - /// `SimpleAction` is a struct that impl [`Action`]. Since task will be - /// executed in separated threads, [`Send`] and [`Sync`] is needed. - /// - /// **Note:** This method will take the ownership of struct that impl [`Action`]. - pub fn new(action: impl Action + 'static + Send + Sync, name: &str) -> Self { - DefaultTask { - id: ID_ALLOCATOR.alloc(), - action: Arc::new(action), - name: name.to_owned(), - precursors: Vec::new(), - } - } - - /// Allocate a new [`DefaultTask`] from any action and name. - /// The specific task behavior is a structure that implements [`Action`]. - /// - /// # Example - /// - /// ```rust - /// use dagrs::{DefaultTask, Output,Input, Action,EnvVar,RunningError}; - /// use std::sync::Arc; - /// struct SimpleAction(usize); - /// - /// impl Action for SimpleAction { - /// fn run(&self, input: Input, env: Arc) -> Result { - /// Ok(Output::new(self.0 + 10)) - /// } - /// } - /// - /// let action = Arc::new(SimpleAction(10)); - /// let task = DefaultTask::from(action, String::from("Increment action")); - /// ``` - /// - /// `SimpleAction` is a struct that impl [`Action`]. Since task will be - /// executed in separated threads, [`Send`] and [`Sync`] is needed. - /// - /// **Note:** This method will take the ownership of struct that impl [`Action`]. - pub fn from(action: Arc, name: String) -> Self { - DefaultTask { - id: ID_ALLOCATOR.alloc(), - action, - name, - precursors: Vec::new(), - } - } - - /// Tasks that shall be executed before this one. - /// - /// # Example - /// ```rust - /// use dagrs::{Action,DefaultTask,Input,Output,RunningError,EnvVar}; - /// use std::sync::Arc; - /// struct SimpleAction {}; - /// impl Action for SimpleAction { - /// fn run(&self, input: Input, env: Arc) -> Result { - /// Ok(Output::empty()) - /// } - /// } - /// let mut t1 = DefaultTask::new(SimpleAction{}, "Task 1"); - /// let mut t2 = DefaultTask::new(SimpleAction{}, "Task 2"); - /// t2.set_predecessors(&[&t1]); - /// ``` - /// In above code, `t1` will be executed before `t2`. - pub fn set_predecessors<'a>( - &mut self, - predecessors: impl IntoIterator, - ) { - self.precursors - .extend(predecessors.into_iter().map(|t| t.id())) - } - - /// The same as `exec_after`, but input are tasks' ids - /// rather than reference to [`DefaultTask`]. - pub fn set_predecessors_by_id(&mut self, predecessors_id: impl IntoIterator) { - self.precursors.extend(predecessors_id) - } - - pub fn set_name(&mut self, name: &str) { - self.name = name.to_owned(); - } - - pub fn set_action(&mut self, action: impl Action + 'static + Send + Sync) { - self.action = Arc::new(action); - } -} - -impl Task for DefaultTask { - fn action(&self) -> Arc { - self.action.clone() - } - - fn precursors(&self) -> &[usize] { - &self.precursors - } - - fn id(&self) -> usize { - self.id - } - - fn name(&self) -> String { - self.name.clone() - } -} - -impl Default for DefaultTask { - fn default() -> Self { - Self { - id: ID_ALLOCATOR.alloc(), - name: "default".to_owned(), - precursors: Vec::new(), - action: Arc::new(NopAction), - } - } -} diff --git a/dagrs_core/src/task/error.rs b/dagrs_core/src/task/error.rs deleted file mode 100644 index 4da8bd6..0000000 --- a/dagrs_core/src/task/error.rs +++ /dev/null @@ -1,58 +0,0 @@ -//! There is an error about task runtime. - -use std::fmt::Display; - -use thiserror::Error; - -/// Errors that may be generated when the specific behavior of the task is run. -/// This is just a simple error handling. When running the tasks in the configuration file, -/// some errors can be found by the user, which is convenient for debugging. -/// It also allows users to return expected errors in custom task behavior. However, even -/// if this error is expected, it will cause the execution of the entire task to fail. -#[derive(Debug)] -pub struct RunningError { - msg: String, -} - -/// command produces incorrect behavior when run. -#[derive(Error, Debug)] -pub struct CmdExecuteError { - msg: String, -} - -impl RunningError { - pub fn new(msg: String) -> Self { - Self { msg } - } - pub fn from_err(err: T) -> Self { - Self { - msg: err.to_string(), - } - } -} - -impl Display for RunningError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.msg) - } -} - -impl CmdExecuteError { - pub fn new(msg: String) -> Self { - Self { msg } - } -} - -impl Display for CmdExecuteError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "cmd execution error: {}", self.msg) - } -} - -impl From for RunningError { - fn from(value: CmdExecuteError) -> Self { - RunningError { - msg: value.to_string(), - } - } -} diff --git a/dagrs_core/src/task/mod.rs b/dagrs_core/src/task/mod.rs deleted file mode 100644 index c451874..0000000 --- a/dagrs_core/src/task/mod.rs +++ /dev/null @@ -1,183 +0,0 @@ -//! Relevant definitions of tasks. -//! -//! # Task execution mode of the Dag engine -//! -//! Currently, the Dag execution engine has two execution modes: -//! The first mode is to execute tasks through user-written yaml configuration file, and then -//! hand them over to the dag engine for execution. The command to be executed can be specified in yaml. -//! -//!# The basic format of the yaml configuration file is as follows: -//! ```yaml -//! dagrs: -//! a: -//! name: "Task 1" -//! after: [ b, c ] -//! cmd: echo a -//! b: -//! name: "Task 2" -//! after: [ c, f, g ] -//! cmd: echo b -//! c: -//! name: "Task 3" -//! after: [ e, g ] -//! cmd: echo c -//! d: -//! name: "Task 4" -//! after: [ c, e ] -//! cmd: echo d -//! e: -//! name: "Task 5" -//! after: [ h ] -//! cmd: echo e -//! f: -//! name: "Task 6" -//! after: [ g ] -//! cmd: python3 ./tests/config/test.py -//! g: -//! name: "Task 7" -//! after: [ h ] -//! cmd: node ./tests/config/test.js -//! h: -//! name: "Task 8" -//! cmd: echo h -//! ``` -//! The necessary attributes for tasks in the yaml configuration file are: -//! id: unique identifier, such as 'a' -//! name: task name -//! after: Indicates which task to execute after, this attribute is optional -//! cmd: command to execute, such as 'ls ./' -//! -//! -//! The second mode is that the user program defines the task, which requires the -//! user to implement the [`Action`] trait of the task module and rewrite the -//! run function. -//! -//! # Example -//! -//! ```rust -//! use dagrs::{Action,EnvVar,Output,RunningError,Input}; -//! use std::sync::Arc; -//! struct SimpleAction{ -//! limit:u32 -//! } -//! -//! impl Action for SimpleAction{ -//! fn run(&self, input: Input,env:Arc) -> Result { -//! let mut sum=0; -//! for i in 0..self.limit{ -//! sum+=i; -//! } -//! Ok(Output::new(sum)) -//! } -//! } -//! -//! ``` -//! -//! # Task definition. -//! -//! Currently, two different execution modes correspond to two different task types, -//! namely [`DefaultTask`] and [`YamlTask`]. -//! When users program to implement task logic, the engine uses [`DefaultTask`]; -//! When the user provides the yaml configuration file, the internal engine uses [`YamlTask`]; -//! -//! These two task types both implement the [`Task`] trait, that is to say, users can also -//! customize tasks and assign more functions and attributes to tasks. However, a task must -//! have four fixed properties (the four standard properties contained in DefaultTask): -//! - id: use [`ID_ALLOCATOR`] to get a global task unique identifier, the type must be `usize` -//! - name: the task name specified by the user, the type must be `String` -//! - predecessor_tasks: the predecessor task of this task, the type must be `Vec` -//! - action: the specific behavior to be performed by the task, the type must be `Arc` -//! -//! If users want to customize Task, they can refer to the implementation of these two specific [`Task`]. - -use std::fmt::Debug; -use std::sync::atomic::AtomicUsize; -use std::sync::Arc; - -use crate::utils::EnvVar; - -#[cfg(feature = "yaml")] -pub use self::cmd::CommandAction; -pub use self::default_task::DefaultTask; -pub use self::error::{CmdExecuteError, RunningError}; -pub(crate) use self::state::ExecState; -pub use self::state::{Input, Output}; -#[cfg(feature = "yaml")] -pub use self::yaml_task::YamlTask; - -mod cmd; -mod default_task; -mod error; -mod state; -mod yaml_task; - -/// Action Trait. -/// [`Action`] represents the specific behavior to be executed. -pub trait Action { - /// The specific behavior to be performed by the task. - fn run(&self, input: Input, env: Arc) -> Result; -} - -/// Tasks can have many attributes, among which `id`, `name`, `predecessor_tasks`, and -/// `runnable` attributes are required, and users can also customize some other attributes. -/// [`DefaultTask`] in this module is a [ `Task`], the DAG engine uses it as the basic -/// task by default. -/// -/// A task must provide methods to obtain precursors and required attributes, just as -/// the methods defined below, users who want to customize tasks must implement these methods. -pub trait Task: Send + Sync { - /// Get a reference to an executable action. - fn action(&self) -> Arc; - /// Get the id of all predecessor tasks of this task. - fn precursors(&self) -> &[usize]; - /// Get the id of this task. - fn id(&self) -> usize; - /// Get the name of this task. - fn name(&self) -> String; -} - -/// IDAllocator for DefaultTask -struct IDAllocator { - id: AtomicUsize, -} - -pub struct NopAction; - -impl Debug for dyn Task { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!( - f, - "{},\t{},\t{:?}", - self.id(), - self.name(), - self.precursors() - ) - } -} - -impl IDAllocator { - fn alloc(&self) -> usize { - let origin = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - if origin > self.id.load(std::sync::atomic::Ordering::Relaxed) { - panic!("Too many tasks.") - } else { - origin - } - } -} - -impl Action for NopAction { - fn run(&self, _input: Input, _env: Arc) -> Result { - Ok(Output::empty()) - } -} - -/// The global task uniquely identifies an instance of the allocator. -static ID_ALLOCATOR: IDAllocator = IDAllocator { - id: AtomicUsize::new(1), -}; - -/// public function to assign task's id. -pub fn alloc_id() -> usize { - ID_ALLOCATOR.alloc() -} diff --git a/dagrs_core/src/utils/gen_macro.rs b/dagrs_core/src/utils/gen_macro.rs deleted file mode 100644 index 97552d3..0000000 --- a/dagrs_core/src/utils/gen_macro.rs +++ /dev/null @@ -1,42 +0,0 @@ -/// Macros for generating simple tasks. - -/// # Example -/// -/// ```rust -/// use dagrs::{Dag, Action, Input, EnvVar, Output, RunningError, DefaultTask, gen_task,Task}; -/// use std::sync::Arc; -/// let task = gen_task!("task A", |input, env| { -/// Ok(Output::empty()) -/// }); -/// assert_eq!(task.id(),1); -/// assert_eq!(task.name(),"task A"); -/// ``` -#[macro_export] -macro_rules! gen_task { - ($task_name:literal,$action:expr) => {{ - use std::sync::Arc; - use $crate::{Action, EnvVar, Input, Output, RunningError}; - pub struct SimpleAction; - impl Action for SimpleAction { - fn run(&self, input: Input, env: Arc) -> Result { - Ok($action(input, env)) - } - } - DefaultTask::new(SimpleAction, $task_name) - }}; -} - -#[macro_export] -macro_rules! gen_action { - ($action:expr) => {{ - use std::sync::Arc; - use $crate::{Action, EnvVar, Input, Output, RunningError}; - pub struct SimpleAction; - impl Action for SimpleAction { - fn run(&self, input: Input, env: Arc) -> Result { - Ok($action(input, env)) - } - } - SimpleAction - }}; -} diff --git a/dagrs_derive/Cargo.toml b/derive/Cargo.toml similarity index 85% rename from dagrs_derive/Cargo.toml rename to derive/Cargo.toml index 82b3ae2..4e5c02a 100644 --- a/dagrs_derive/Cargo.toml +++ b/derive/Cargo.toml @@ -1,8 +1,7 @@ [package] -name = "dagrs_derive" +name = "derive" version = "0.3.0" edition = "2021" -license = "MIT OR Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dagrs_derive/src/lib.rs b/derive/src/lib.rs similarity index 51% rename from dagrs_derive/src/lib.rs rename to derive/src/lib.rs index 95d4acb..bab0adb 100644 --- a/dagrs_derive/src/lib.rs +++ b/derive/src/lib.rs @@ -1,30 +1,34 @@ use proc_macro::TokenStream; +extern crate proc_macro2; extern crate quote; extern crate syn; #[cfg(feature = "derive")] -mod dependencies; +mod relay; #[cfg(feature = "derive")] mod task; +/// [`CustomTask`] is a derived macro that may be used when customizing tasks. It can only be +/// marked on the structure, and the user needs to specify four attributes of the custom task +/// type, which are task(attr="id"), task(attr = "name"), task(attr = "precursors ") and +/// task(attr = "action"), which are used in the `derive_task` example. #[cfg(feature = "derive")] -#[proc_macro_derive(Task, attributes(task))] +#[proc_macro_derive(CustomTask, attributes(task))] pub fn derive_task(input: TokenStream) -> TokenStream { use crate::task::parse_task; use syn::{parse_macro_input, DeriveInput}; - let input = parse_macro_input!(input as DeriveInput); parse_task(&input).into() } +/// The [`dependencies!`] macro allows users to specify all task dependencies in an easy-to-understand +/// way. It will return to the user a series of `DefaultTask` in the order of tasks given by the user. #[cfg(feature = "derive")] #[proc_macro] pub fn dependencies(input: TokenStream) -> TokenStream { - use dependencies::Tasks; - - use crate::dependencies::generate_task; - + use crate::relay::generate_task; + use relay::Tasks; let tasks = syn::parse_macro_input!(input as Tasks); let relies = tasks.resolve_dependencies(); if let Err(err) = relies { diff --git a/dagrs_derive/src/dependencies.rs b/derive/src/relay.rs similarity index 87% rename from dagrs_derive/src/dependencies.rs rename to derive/src/relay.rs index 982e0d3..c526fd4 100644 --- a/dagrs_derive/src/dependencies.rs +++ b/derive/src/relay.rs @@ -2,17 +2,17 @@ use proc_macro2::Ident; use std::collections::{HashMap, HashSet}; use syn::{parse::Parse, Token}; -pub struct Relay { - pub task: Ident, - pub successors: Vec, +pub(crate) struct Relay { + pub(crate) task: Ident, + pub(crate) successors: Vec, } -pub struct Task { - pub task: Ident, - pub precursors: Vec, +pub(crate) struct Task { + pub(crate) task: Ident, + pub(crate) precursors: Vec, } -pub struct Tasks(pub Vec); +pub(crate) struct Tasks(pub(crate) Vec); impl Parse for Tasks { fn parse(input: syn::parse::ParseStream) -> syn::Result { @@ -50,7 +50,7 @@ impl Tasks { Ok(()) } - pub fn resolve_dependencies(self) -> syn::Result> { + pub(crate) fn resolve_dependencies(self) -> syn::Result> { self.check_duplicate()?; let mut seq = Vec::new(); let tasks: HashMap> = self @@ -80,25 +80,13 @@ impl Tasks { } } -pub fn generate_task(tasks: Vec) -> proc_macro2::TokenStream { - let tasks_defined_token: proc_macro2::TokenStream = init_tasks(&tasks); - let init_pres_token: proc_macro2::TokenStream = init_precursors(&tasks); - let tasks_ident: Vec = tasks.into_iter().map(|item| item.task).collect(); - quote::quote!({ - #tasks_defined_token - #init_pres_token - vec![#(#tasks_ident,)*] - }) -} - fn init_tasks(tasks: &[Task]) -> proc_macro2::TokenStream { let mut token = proc_macro2::TokenStream::new(); for task in tasks.iter() { let ident = &task.task; let name = ident.to_string(); token.extend(quote::quote!( - let mut #ident=dagrs::DefaultTask::default(); - #ident.set_name(#name); + let mut #ident=dagrs::DefaultTask::new(#name); )); } token @@ -120,3 +108,14 @@ fn init_precursors(tasks: &[Task]) -> proc_macro2::TokenStream { } token } + +pub(crate) fn generate_task(tasks: Vec) -> proc_macro2::TokenStream { + let tasks_defined_token: proc_macro2::TokenStream = init_tasks(&tasks); + let init_pres_token: proc_macro2::TokenStream = init_precursors(&tasks); + let tasks_ident: Vec = tasks.into_iter().map(|item| item.task).collect(); + quote::quote!({ + #tasks_defined_token + #init_pres_token + vec![#(#tasks_ident,)*] + }) +} diff --git a/dagrs_derive/src/task.rs b/derive/src/task.rs similarity index 75% rename from dagrs_derive/src/task.rs rename to derive/src/task.rs index 00e4e23..2f90d71 100644 --- a/dagrs_derive/src/task.rs +++ b/derive/src/task.rs @@ -1,7 +1,7 @@ use proc_macro2::TokenStream; use syn::{ Data, DeriveInput, Expr, ExprLit, Field, Fields, GenericArgument, Ident, Lit, MetaNameValue, - PathArguments, Type, TypeParamBound, + PathArguments, Type, }; const ID: &str = "id"; @@ -10,7 +10,7 @@ const PRECURSORS: &str = "precursors"; const ACTION: &str = "action"; #[allow(unused)] -pub fn parse_task(input: &DeriveInput) -> TokenStream { +pub(crate) fn parse_task(input: &DeriveInput) -> TokenStream { let struct_ident = &input.ident; let fields = match input.data { Data::Struct(ref str) => &str.fields, @@ -97,8 +97,8 @@ fn validate_name(field: &Field) -> syn::Result { let ty = str.path.segments.last().unwrap(); if ty.ident.eq("String") { Ok(quote::quote!( - fn name(&self) -> String { - self.#ident.clone() + fn name(&self) -> &str { + &self.#ident } )) } else { @@ -149,44 +149,16 @@ fn validate_action(field: &Field) -> syn::Result { let ident = &field.ident; let err = Err(syn::Error::new_spanned( &field.ty, - "The type of `id` should be `Arc`", + "The type of `id` should be `Action`", )); if let Type::Path(ref str) = field.ty { let ty = str.path.segments.last().unwrap(); - if ty.ident.eq("Arc") { - match ty.arguments { - PathArguments::AngleBracketed(ref inner) => { - if let GenericArgument::Type(Type::TraitObject(tto)) = - inner.args.last().unwrap() - { - let bounds_ident = tto - .bounds - .iter() - .map(|bound| { - if let TypeParamBound::Trait(ref t) = bound { - Some(t) - } else { - None - } - }) - .filter(|bound| !bound.is_none()) - .map(|bound| bound.unwrap().path.get_ident().unwrap().to_string()) - .collect::>(); - if bounds_ident.iter().any(|bound| bound.eq("Action")) { - Ok(quote::quote!( - fn action(&self) -> Arc { - self.#ident.clone() - } - )) - } else { - err - } - } else { - err - } + if ty.ident.eq("Action") { + Ok(quote::quote!( + fn action(&self) -> Action { + self.#ident.clone() } - _ => err, - } + )) } else { err } @@ -197,7 +169,7 @@ fn validate_action(field: &Field) -> syn::Result { fn generate_impl(struct_ident: &Ident, fields_function: proc_macro2::TokenStream) -> TokenStream { quote::quote!( - impl Task for #struct_ident{ + impl dagrs::Task for #struct_ident{ #fields_function } unsafe impl Send for #struct_ident{} diff --git a/examples/compute_dag.rs b/examples/compute_dag.rs index 51cbd94..c53370f 100644 --- a/examples/compute_dag.rs +++ b/examples/compute_dag.rs @@ -13,36 +13,62 @@ extern crate dagrs; use std::sync::Arc; -use dagrs::{log, Action, Dag, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError}; +use dagrs::{log, Complex, Dag, DefaultTask, EnvVar, Input, LogLevel, Output}; -macro_rules! generate_task { - ($action:ident($val:expr),$name:expr) => {{ - pub struct $action(usize); - impl Action for $action { - fn run(&self, input: Input, env: Arc) -> Result { - let base = env.get::("base").unwrap(); - let mut sum = self.0; - input - .get_iter() - .for_each(|i| sum += i.get::().unwrap() * base); - Ok(Output::new(sum)) - } - } - DefaultTask::new($action($val), $name) - }}; +struct Compute(usize); + +impl Complex for Compute { + fn run(&self, input: Input, env: Arc) -> Output { + let base = env.get::("base").unwrap(); + let mut sum = self.0; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + } } fn main() { // initialization log. let _initialized = log::init_logger(LogLevel::Info, None); // generate some tasks. - let a = generate_task!(A(1), "Compute A"); - let mut b = generate_task!(B(2), "Compute B"); - let mut c = generate_task!(C(4), "Compute C"); - let mut d = generate_task!(D(8), "Compute D"); - let mut e = generate_task!(E(16), "Compute E"); - let mut f = generate_task!(F(32), "Compute F"); - let mut g = generate_task!(G(64), "Compute G"); + let a = DefaultTask::with_action("Compute A", Compute(1)); + + let mut b = DefaultTask::with_action("Compute B", Compute(2)); + + let mut c = DefaultTask::new("Compute C"); + c.set_action(Compute(4)); + + let mut d = DefaultTask::new("Compute D"); + d.set_action(Compute(8)); + + let mut e = DefaultTask::with_closure("Compute E", |input, env| { + let base = env.get::("base").unwrap(); + let mut sum = 16; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + }); + let mut f = DefaultTask::with_closure("Compute F", |input, env| { + let base = env.get::("base").unwrap(); + let mut sum = 32; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + }); + + let mut g = DefaultTask::new("Compute G"); + g.set_closure(|input, env| { + let base = env.get::("base").unwrap(); + let mut sum = 64; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + }); + // Set up task dependencies. b.set_predecessors(&[&a]); c.set_predecessors(&[&a]); diff --git a/examples/custom_parser.rs b/examples/custom_parser_and_task.rs similarity index 78% rename from examples/custom_parser.rs rename to examples/custom_parser_and_task.rs index 40b2591..ca58f35 100644 --- a/examples/custom_parser.rs +++ b/examples/custom_parser_and_task.rs @@ -1,3 +1,5 @@ +//! Implement the Task trait to customize task properties. +//! MyTask is basically the same as DefaultTask provided by dagrs. //! Implement the Parser interface to customize the task configuration file parser. //! The content of the configuration file is as follows: //! @@ -18,29 +20,24 @@ use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::{fs, sync::Arc}; -use dagrs::{log, Action, CommandAction, Dag, LogLevel, Parser, ParserError, Task}; +use dagrs::{log, Action, CommandAction, Dag, LogLevel, ParseError, Parser, Task}; struct MyTask { tid: (String, usize), name: String, precursors: Vec, precursors_id: Vec, - action: Arc, + action: Action, } impl MyTask { - pub fn new( - txt_id: &str, - precursors: Vec, - name: String, - action: Arc, - ) -> Self { + pub fn new(txt_id: &str, precursors: Vec, name: String, action: CommandAction) -> Self { Self { tid: (txt_id.to_owned(), dagrs::alloc_id()), name, precursors, precursors_id: Vec::new(), - action, + action: Action::Structure(Arc::new(action)), } } @@ -58,7 +55,7 @@ impl MyTask { } impl Task for MyTask { - fn action(&self) -> Arc { + fn action(&self) -> Action { self.action.clone() } fn precursors(&self) -> &[usize] { @@ -67,8 +64,8 @@ impl Task for MyTask { fn id(&self) -> usize { self.tid.1 } - fn name(&self) -> String { - self.name.clone() + fn name(&self) -> &str { + &self.name } } @@ -85,26 +82,26 @@ impl Display for MyTask { struct ConfigParser; impl ConfigParser { - fn load_file(&self, file: &str) -> Result, ParserError> { - let contents = fs::read_to_string(file)?; + fn load_file(&self, file: &str) -> Result, ParseError> { + let contents = fs::read_to_string(file).map_err(|e| e.to_string())?; let lines: Vec = contents.lines().map(|line| line.to_string()).collect(); Ok(lines) } fn parse_one(&self, item: String) -> MyTask { - let attr: Vec<&str> = item.split(',').collect(); + let attr: Vec<&str> = item.split(",").collect(); let pres_item = *attr.get(2).unwrap(); let pres = if pres_item.eq("") { Vec::new() } else { - pres_item.split(' ').map(|pre| pre.to_string()).collect() + pres_item.split(" ").map(|pre| pre.to_string()).collect() }; - let id = *attr.first().unwrap(); + let id = *attr.get(0).unwrap(); let name = attr.get(1).unwrap().to_string(); let cmd = *attr.get(3).unwrap(); - MyTask::new(id, pres, name, Arc::new(CommandAction::new(cmd))) + MyTask::new(id, pres, name, CommandAction::new(cmd)) } } @@ -112,8 +109,8 @@ impl Parser for ConfigParser { fn parse_tasks( &self, file: &str, - _specific_actions: HashMap>, - ) -> Result>, ParserError> { + _specific_actions: HashMap, + ) -> Result>, ParseError> { let content = self.load_file(file)?; let mut map = HashMap::new(); let mut tasks = Vec::new(); diff --git a/examples/custom_task.rs b/examples/custom_task.rs deleted file mode 100644 index a49627a..0000000 --- a/examples/custom_task.rs +++ /dev/null @@ -1,92 +0,0 @@ -//! Implement the Task trait to customize task properties. -//! MyTask is basically the same as DefaultTask provided by dagrs. - -use std::sync::Arc; - -use dagrs::{alloc_id, log, Action, Dag, EnvVar, Input, LogLevel, Output, RunningError, Task}; - -struct MyTask { - id: usize, - name: String, - predecessor_tasks: Vec, - action: Arc, -} - -impl MyTask { - pub fn new(action: Arc, name: &str) -> Self { - MyTask { - id: alloc_id(), - action, - name: name.to_owned(), - predecessor_tasks: Vec::new(), - } - } - - pub fn set_predecessors(&mut self, predecessors: &[&MyTask]) { - self.predecessor_tasks - .extend(predecessors.iter().map(|t| t.id())) - } -} - -impl Task for MyTask { - fn action(&self) -> Arc { - self.action.clone() - } - - fn precursors(&self) -> &[usize] { - &self.predecessor_tasks - } - - fn id(&self) -> usize { - self.id - } - - fn name(&self) -> String { - self.name.clone() - } -} - -macro_rules! generate_task { - ($action:ident($val:expr),$name:expr) => {{ - pub struct $action(usize); - impl Action for $action { - fn run(&self, input: Input, env: Arc) -> Result { - let base = env.get::("base").unwrap(); - let mut sum = self.0; - input - .get_iter() - .for_each(|i| sum += i.get::().unwrap() * base); - Ok(Output::new(sum)) - } - } - MyTask::new(Arc::new($action($val)), $name) - }}; -} - -fn main() { - let _initialized = log::init_logger(LogLevel::Info, None); - let a = generate_task!(A(1), "Compute A"); - let mut b = generate_task!(B(2), "Compute B"); - let mut c = generate_task!(C(4), "Compute C"); - let mut d = generate_task!(D(8), "Compute D"); - let mut e = generate_task!(E(16), "Compute E"); - let mut f = generate_task!(F(32), "Compute F"); - let mut g = generate_task!(G(64), "Compute G"); - - b.set_predecessors(&[&a]); - c.set_predecessors(&[&a]); - d.set_predecessors(&[&a]); - e.set_predecessors(&[&b, &c]); - f.set_predecessors(&[&c, &d]); - g.set_predecessors(&[&b, &e, &f]); - - let mut env = EnvVar::new(); - env.set("base", 2usize); - - let mut dag = Dag::with_tasks(vec![a, b, c, d, e, f, g]); - dag.set_env(env); - assert!(dag.start().unwrap()); - - let res = dag.get_result::().unwrap(); - println!("The result is {}.", res); -} diff --git a/examples/dependencies.rs b/examples/dependencies.rs index 0cb6585..39f682e 100644 --- a/examples/dependencies.rs +++ b/examples/dependencies.rs @@ -1,30 +1,50 @@ -use dagrs_core::{ - log, DefaultTask, EnvVar, LogLevel, -}; -use dagrs_derive::dependencies; +use dagrs::{dependencies, log, Complex, EnvVar, Input, LogLevel, Output}; +use std::sync::Arc; -macro_rules! action { - ($action:ident($val:expr)) => {{ - use dagrs::{Action, EnvVar, Input, Output, RunningError}; - use std::sync::Arc; - struct $action(usize); - impl Action for $action { - fn run(&self, input: Input, env: Arc) -> Result { - let base = env.get::("base").unwrap(); - let mut sum = self.0; - input - .get_iter() - .for_each(|i| sum += i.get::().unwrap() * base); - Ok(Output::new(sum)) - } - } - $action($val) - }}; +/// The `dependencies` macro allows users to specify all task dependencies in an easy-to-understand +/// way. It will return to the user a series of `DefaultTask` in the order of tasks given by the user. +/// +/// # Example +/// +/// ↱----------↴ +/// B -→ E --→ G +/// ↗ ↗ ↗ +/// A --→ C / +/// ↘ ↘ / +/// D -→ F +/// +/// If you want to define a task graph with such dependencies, the code is as follows: +/// +/// let mut tasks=dependencies!( +/// a -> b c d, +/// b -> e g, +/// c -> e f, +/// d -> f, +/// e -> g, +/// f -> g, +/// g -> +/// ); +/// +/// Note that although task g has no successor tasks, "g->" must also be written. The return +/// value type tasks is a Vec. The name of each task is the same as the given +/// identifier, which can be expressed as an array as [ "a","b","c","d","e","f","g"]. + +struct Compute(usize); + +impl Complex for Compute { + fn run(&self, input: Input, env: Arc) -> Output { + let base = env.get::("base").unwrap(); + let mut sum = self.0; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + } } fn main() { let _initialized = log::init_logger(LogLevel::Info, None); - let mut tasks: Vec = dependencies!( + let mut tasks = dependencies!( a -> b c d, b -> e g, c -> e f, @@ -34,10 +54,22 @@ fn main() { g -> ); let mut x = 1; - tasks.iter_mut().for_each(|task| { + for index in 0..4 { + tasks[index].set_action(Compute(x * 2)); x *= 2; - task.set_action(action!(Compute(x))); - }); + } + + for index in 4..7 { + tasks[index].set_closure(|input, env| { + let base = env.get::("base").unwrap(); + let mut sum = 0; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + }); + } + let mut dag = dagrs::Dag::with_tasks(tasks); let mut env = EnvVar::new(); env.set("base", 2usize); diff --git a/examples/derive_task.rs b/examples/derive_task.rs index 3307abb..bfda5a8 100644 --- a/examples/derive_task.rs +++ b/examples/derive_task.rs @@ -1,9 +1,27 @@ +use dagrs::{Action, CustomTask, Output, Task}; use std::sync::Arc; -use dagrs::{Action, Task}; -use dagrs_core::Output; - -#[derive(Task)] +/// `CustomTask` is a derived macro that may be used when customizing tasks. It can only be +/// marked on the structure, and the user needs to specify four attributes of the custom task +/// type, which are task(attr="id"), task(attr = "name"), task(attr = "precursors ") and +/// task(attr = "action"), which are used in the `derive_task` example. +/// +/// # Example +/// +/// ```rust +/// #[derive(CustomTask)] +/// struct MyTask { +/// #[task(attr = "id")] +/// id: usize, +/// #[task(attr = "name")] +/// name: String, +/// #[task(attr = "precursors")] +/// pre: Vec, +/// #[task(attr = "action")] +/// action: Action, +/// } +/// ``` +#[derive(CustomTask)] struct MyTask { #[task(attr = "id")] id: usize, @@ -12,22 +30,11 @@ struct MyTask { #[task(attr = "precursors")] pre: Vec, #[task(attr = "action")] - action: Arc, -} - -struct SimpleAction(i32); -impl Action for SimpleAction { - fn run( - &self, - _input: dagrs_core::Input, - _env: Arc, - ) -> Result { - Ok(Output::empty()) - } + action: Action, } fn main() { - let action = Arc::new(SimpleAction(10)); + let action = Action::Closure(Arc::new(|_, _| Output::empty())); let task = MyTask { id: 10, name: "mytask".to_owned(), diff --git a/examples/engine.rs b/examples/engine.rs index ef33463..044a1f9 100644 --- a/examples/engine.rs +++ b/examples/engine.rs @@ -2,11 +2,9 @@ extern crate dagrs; -use std::{collections::HashMap}; +use std::collections::HashMap; -use dagrs::{ - gen_task, log, Dag, DefaultTask, Engine, LogLevel, -}; +use dagrs::{log, Dag, DefaultTask, Engine, LogLevel, Output}; fn main() { // initialization log. let _initialized = log::init_logger(LogLevel::Info, None); @@ -14,18 +12,15 @@ fn main() { let mut engine = Engine::default(); // Create some task for dag1. - let t1_a = gen_task!( - "Compute A1", - |_input: Input, _env: Arc| Output::new(20usize) - ); - let mut t1_b = gen_task!("Compute B1", |input: Input, _env: Arc| { + let t1_a = DefaultTask::with_closure("Compute A1", |_, _| Output::new(20usize)); + let mut t1_b = DefaultTask::with_closure("Compute B1", |input, _| { let mut sum = 10; input.get_iter().for_each(|input| { sum += input.get::().unwrap(); }); Output::new(sum) }); - let mut t1_c = gen_task!("Compute C1", |input: Input, _env: Arc| { + let mut t1_c = DefaultTask::with_closure("Compute C1", |input, _| { let mut sum = 20; input.get_iter().for_each(|input| { sum += input.get::().unwrap(); @@ -33,7 +28,7 @@ fn main() { Output::new(sum) }); - let mut t1_d = gen_task!("Compute D1", |input: Input, _env: Arc| { + let mut t1_d = DefaultTask::with_closure("Compute D1", |input, _| { let mut sum = 30; input.get_iter().for_each(|input| { sum += input.get::().unwrap(); @@ -48,25 +43,22 @@ fn main() { engine.append_dag("graph1", dag1); // Create some task for dag2. - let t2_a = gen_task!( - "Compute A2", - |_input: Input, _env: Arc| Output::new(2usize) - ); - let mut t2_b = gen_task!("Compute B2", |input: Input, _env: Arc| { + let t2_a = DefaultTask::with_closure("Compute A2", |_, _| Output::new(2usize)); + let mut t2_b = DefaultTask::with_closure("Compute B2", |input, _| { let mut sum = 4; input.get_iter().for_each(|input| { sum *= input.get::().unwrap(); }); Output::new(sum) }); - let mut t2_c = gen_task!("Compute C2", |input: Input, _env: Arc| { + let mut t2_c = DefaultTask::with_closure("Compute C2", |input, _| { let mut sum = 8; input.get_iter().for_each(|input| { sum *= input.get::().unwrap(); }); Output::new(sum) }); - let mut t2_d = gen_task!("Compute D2", |input: Input, _env: Arc| { + let mut t2_d = DefaultTask::with_closure("Compute D2", |input, _| { let mut sum = 16; input.get_iter().for_each(|input| { sum *= input.get::().unwrap(); diff --git a/examples/impl_action.rs b/examples/impl_action.rs deleted file mode 100644 index bbb3357..0000000 --- a/examples/impl_action.rs +++ /dev/null @@ -1,46 +0,0 @@ -//! Implement the Action trait to define the task logic. - -extern crate dagrs; - -use std::sync::Arc; - -use dagrs::{log, Action, Dag, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError}; - -struct SimpleAction(usize); - -/// Implement the `Action` trait for `SimpleAction`, defining the logic of the `run` function. -/// The logic here is simply to get the output value (usize) of all predecessor tasks and then accumulate. -impl Action for SimpleAction { - fn run(&self, input: Input, env: Arc) -> Result { - let base = env.get::("base").unwrap(); - let mut sum = self.0; - input - .get_iter() - .for_each(|i| sum += i.get::().unwrap() * base); - Ok(Output::new(sum)) - } -} - -fn main() { - // Initialize the global logger - let _initialized = log::init_logger(LogLevel::Info, None); - // Generate four tasks. - let a = DefaultTask::new(SimpleAction(10), "Task a"); - let mut b = DefaultTask::new(SimpleAction(20), "Task b"); - let mut c = DefaultTask::new(SimpleAction(30), "Task c"); - let mut d = DefaultTask::new(SimpleAction(40), "Task d"); - // Set the precursor for each task. - b.set_predecessors(&[&a]); - c.set_predecessors(&[&a]); - d.set_predecessors(&[&b, &c]); - // Take these four tasks as a Dag. - let mut dag = Dag::with_tasks(vec![a, b, c, d]); - // Set a global environment variable for this dag. - let mut env = EnvVar::new(); - env.set("base", 2usize); - dag.set_env(env); - // Begin execution. - assert!(dag.start().unwrap()); - // Get execution result - assert_eq!(dag.get_result::().unwrap(), 220); -} diff --git a/examples/use_macro.rs b/examples/use_macro.rs deleted file mode 100644 index dc8e118..0000000 --- a/examples/use_macro.rs +++ /dev/null @@ -1,52 +0,0 @@ -//! Use the gen_task provided by dagrs! Macros define simple tasks. -//! Execute graph: -//! B -//! ↗ ↘ -//! A D -//! ↘ ↗ -//! C - -extern crate dagrs; - -use dagrs::{gen_task, log, Dag, DefaultTask, EnvVar, LogLevel}; -use dagrs_core::gen_action; - -fn main() { - let _initialized = log::init_logger(LogLevel::Info, None); - let a = gen_task!("Compute A", |_input, _env| Output::new(20usize)); - let mut b = gen_task!("Compute B", |input: Input, _env: Arc| { - let mut sum = 0; - input - .get_iter() - .for_each(|i| sum += i.get::().unwrap()); - Output::new(sum) - }); - - let mut c = gen_task!("Compute C", |input: Input, env: Arc| { - let mut sum = 0; - let base = env.get::("base").unwrap(); - input - .get_iter() - .for_each(|i| sum += i.get::().unwrap() * base); - Output::new(sum) - }); - let action = gen_action!(|input: Input, env: Arc| { - let mut sum = 0; - let base = env.get::("base").unwrap(); - input - .get_iter() - .for_each(|i| sum += i.get::().unwrap() - base); - Output::new(sum) - }); - let mut d = DefaultTask::new(action, "Compute D"); - - b.set_predecessors(&[&a]); - c.set_predecessors(&[&a]); - d.set_predecessors(&[&b, &c]); - let mut job = Dag::with_tasks(vec![a, b, c, d]); - let mut env = EnvVar::new(); - env.set("base", 2usize); - job.set_env(env); - assert!(job.start().unwrap()); - assert_eq!(job.get_result::().unwrap(), 56); -} diff --git a/examples/yaml_dag.rs b/examples/yaml_dag.rs index 473a595..8b68846 100644 --- a/examples/yaml_dag.rs +++ b/examples/yaml_dag.rs @@ -2,9 +2,8 @@ extern crate dagrs; -use std::collections::HashMap; - use dagrs::{log, Dag, LogLevel}; +use std::collections::HashMap; fn main() { let _initialized = log::init_logger(LogLevel::Info, None); diff --git a/dagrs_core/src/bin/dagrs.rs b/src/bin/dagrs.rs similarity index 97% rename from dagrs_core/src/bin/dagrs.rs rename to src/bin/dagrs.rs index cb5b062..d736191 100644 --- a/dagrs_core/src/bin/dagrs.rs +++ b/src/bin/dagrs.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use clap::Parser; -use dagrs_core::{log, Dag, LogLevel}; +use dagrs::{log, Dag, LogLevel}; #[derive(Parser, Debug)] #[command(name = "dagrs", version = "0.2.0")] diff --git a/dagrs_core/src/engine/dag.rs b/src/engine/dag.rs similarity index 71% rename from dagrs_core/src/engine/dag.rs rename to src/engine/dag.rs index d5c6149..5326aab 100644 --- a/dagrs_core/src/engine/dag.rs +++ b/src/engine/dag.rs @@ -1,57 +1,51 @@ -//! The Dag -//! -//! # [`Dag`] is dagrs's main body. -//! -//! [`Dag`] embodies the scheduling logic of tasks written by users or tasks in a given configuration file. -//! A Dag contains multiple tasks. This task can be added to a Dag as long as it implements -//! the [`Task`] trait, and the user needs to define specific execution logic for the task, that is, -//! implement the [`Action`] trait and override the `run` method. -//! -//! The execution process of Dag is roughly as follows: -//! - The user gives a list of tasks `tasks`. These tasks can be parsed from configuration files, or provided -//! by user programming implementations. -//! - Internally generate [`Graph`] based on task dependencies, and generate execution sequences based on `rely_graph`. -//! - The task is scheduled to start executing asynchronously. -//! - The task will wait to get the result `execute_states` generated by the execution of the predecessor task. -//! - If the result of the predecessor task can be obtained, check the continuation status `can_continue`, if it -//! is true, continue to execute the defined logic, if it is false, trigger `handle_error`, and cancel the -//! execution of the subsequent task. -//! - After all tasks are executed, set the continuation status to false, which means that the tasks of the dag -//! cannot be scheduled for execution again. -//! -//! # Example -//! ```rust -//! use dagrs::{log,LogLevel,Dag, DefaultTask, gen_task, Output,Input,EnvVar,RunningError,Action}; -//! use std::sync::Arc; -//! log::init_logger(LogLevel::Info,None); -//! let task=gen_task!("Simple Task",|input,_env|{ -//! Ok(Output::new(1)) -//! }); -//! let mut dag=Dag::with_tasks(vec![task]); -//! assert!(dag.start().unwrap()) -//! -//! ``` - +use super::{graph::Graph, DagError}; +use crate::{ + task::{ExecState, Input, Task}, + utils::{log, EnvVar}, + Action, Parser, +}; +use anymap2::any::CloneAnySendSync; use std::{ collections::HashMap, + panic::{self, AssertUnwindSafe}, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, }; - -use crate::{ - parser::Parser, - task::{ExecState, Input, Task}, - utils::{log, EnvVar}, - Action, -}; -use anymap2::any::CloneAnySendSync; use tokio::task::JoinHandle; -use super::{error::DagError, graph::Graph}; - -/// A Dag represents a set of tasks. Use it to build a multitasking Dag. +/// [`Dag`] is dagrs's main body. +/// +/// [`Dag`] embodies the scheduling logic of tasks written by users or tasks in a given configuration file. +/// A Dag contains multiple tasks. This task can be added to a Dag as long as it implements +/// the [`Task`] trait, and the user needs to define specific execution logic for the task, that is, +/// implement the [`Action`] trait and override the `run` method. +/// +/// The execution process of Dag is roughly as follows: +/// - The user gives a list of tasks `tasks`. These tasks can be parsed from configuration files, or provided +/// by user programming implementations. +/// - Internally generate `Graph` based on task dependencies, and generate execution sequences based on `rely_graph`. +/// - The task is scheduled to start executing asynchronously. +/// - The task will wait to get the result `execute_states` generated by the execution of the predecessor task. +/// - If the result of the predecessor task can be obtained, check the continuation status `can_continue`, if it +/// is true, continue to execute the defined logic, if it is false, trigger `handle_error`, and cancel the +/// execution of the subsequent task. +/// - After all tasks are executed, set the continuation status to false, which means that the tasks of the dag +/// cannot be scheduled for execution again. +/// +/// # Example +/// ```rust +/// use dagrs::{log,LogLevel,Dag, DefaultTask, Output,Input,EnvVar,Action}; +/// use std::sync::Arc; +/// log::init_logger(LogLevel::Info,None); +/// let task=DefaultTask::with_closure("Simple Task",|_input,_env|{ +/// Output::new(1) +/// }); +/// let mut dag=Dag::with_tasks(vec![task]); +/// assert!(dag.start().unwrap()) +/// +/// ``` #[derive(Debug)] pub struct Dag { /// Store all tasks' infos. @@ -90,7 +84,7 @@ impl Dag { } /// Create a dag by adding a series of tasks. - pub fn with_tasks(tasks: Vec) -> Self { + pub fn with_tasks(tasks: Vec) -> Dag { let mut dag = Dag::new(); tasks.into_iter().for_each(|task| { let task = Box::new(task) as Box; @@ -103,7 +97,7 @@ impl Dag { #[cfg(feature = "yaml")] pub fn with_yaml( file: &str, - specific_actions: HashMap>, + specific_actions: HashMap, ) -> Result { use crate::YamlParser; let parser = Box::new(YamlParser); @@ -114,7 +108,7 @@ impl Dag { pub fn with_config_file_and_parser( file: &str, parser: Box, - specific_actions: HashMap>, + specific_actions: HashMap, ) -> Result { Dag::read_tasks(file, parser, specific_actions) } @@ -124,7 +118,7 @@ impl Dag { fn read_tasks( file: &str, parser: Box, - specific_actions: HashMap>, + specific_actions: HashMap, ) -> Result { let mut dag = Dag::new(); let tasks = parser.parse_tasks(file, specific_actions)?; @@ -156,7 +150,7 @@ impl Dag { let rely_index = self .rely_graph .find_index_by_id(rely_task_id) - .ok_or(DagError::RelyTaskIllegal(task.name()))?; + .ok_or(DagError::RelyTaskIllegal(task.name().to_string()))?; self.rely_graph.add_edge(rely_index, index); } @@ -170,9 +164,11 @@ impl Dag { /// - Create a graph from task dependencies. /// - Generate task heart sequence according to topological sorting of graph. pub(crate) fn init(&mut self) -> Result<(), DagError> { - self.tasks.keys().for_each(|id| { - self.execute_states - .insert(*id, Arc::new(ExecState::new(*id))); + self.tasks.values().for_each(|task| { + self.execute_states.insert( + task.id(), + Arc::new(ExecState::new(task.id(), task.name().to_string())), + ); }); self.create_graph()?; @@ -222,36 +218,32 @@ impl Dag { }); // Wait for the status of each task to execute. If there is an error in the execution of a task, // the engine will fail to execute and give up executing tasks that have not yet been executed. - let mut exe_success = true; - for handle in handles { - let complete = handle.1.await.map_or_else( - |err| { + for (tid, handle) in handles { + match handle.await { + Ok(succeed) => { + if !succeed { + self.handle_error(tid).await; + } + } + Err(err) => { log::error(format!( "Task execution encountered an unexpected error! {}", err )); - false - }, - |state| state, - ); - if !complete { - log::error(format!( - "Task execution failed! [{}]", - self.tasks[&handle.0].name() - )); - self.handle_error(&handle.0).await; - exe_success = false; + self.handle_error(tid).await; + } } } - self.can_continue.store(false, Ordering::Release); - exe_success + self.can_continue + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() } /// Execute a given task asynchronously. fn execute_task(&self, task: Arc>) -> JoinHandle { let env = self.env.clone(); let task_id = task.id(); - let task_name = task.name(); + let task_name = task.name().to_string(); let execute_state = self.execute_states[&task_id].clone(); let task_out_degree = self.rely_graph.get_node_out_degree(&task_id); let wait_for_input: Vec> = task @@ -261,6 +253,7 @@ impl Dag { .collect(); let action = task.action(); let can_continue = self.can_continue.clone(); + tokio::spawn(async move { // Wait for the execution result of the predecessor task let mut inputs = Vec::new(); @@ -269,7 +262,7 @@ impl Dag { // When the task execution result of the predecessor can be obtained, judge whether // the continuation flag is set to false, if it is set to false, cancel the specific // execution logic of the task and return immediately. - if !can_continue.load(Ordering::Acquire) { + if !can_continue.load(Ordering::Acquire) || !wait_for.success() { return true; } if let Some(content) = wait_for.get_output() { @@ -278,21 +271,42 @@ impl Dag { } } } - log::info(format!("Executing Task[name: {}]", task_name)); + log::info(format!( + "Executing task [name: {}, id: {}]", + task_name, task_id + )); // Concrete logical behavior for performing tasks. - match action.run(Input::new(inputs), env) { - Ok(out) => { - // Store execution results - execute_state.set_output(out); - execute_state.semaphore().add_permits(task_out_degree); - log::info(format!("Task executed successfully. [name: {}]", task_name)); - true - } - Err(err) => { - log::error(format!("Task failed[name: {}]. {}", task_name, err)); - false - } - } + panic::catch_unwind(AssertUnwindSafe(|| action.run(Input::new(inputs), env))) + .map_or_else( + |_| { + log::error(format!( + "Execution failed [name: {}, id: {}]", + task_name, task_id + )); + false + }, + |out| { + // Store execution results + if out.is_err() { + log::error(format!( + "Execution failed [name: {}, id: {}]\nerr: {}", + task_name, + task_id, + out.get_err().unwrap() + )); + false + } else { + execute_state.set_output(out); + execute_state.exe_success(); + execute_state.semaphore().add_permits(task_out_degree); + log::info(format!( + "Execution succeed [name: {}, id: {}]", + task_name, task_id + )); + true + } + }, + ) }) } @@ -303,13 +317,19 @@ impl Dag { /// know that some tasks have errors and cannot continue to execute. /// After that, the follow-up task finds that the flag that can continue to execute is set /// to false, and the specific behavior of executing the task will be cancelled. - async fn handle_error(&self, error_task_id: &usize) { - self.can_continue.store(false, Ordering::Release); + async fn handle_error(&self, error_task_id: usize) { + if self + .can_continue + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed) + .is_err() + { + return; + } // Find the position of the faulty task in the execution sequence. let index = self .exe_sequence .iter() - .position(|tid| *tid == *error_task_id) + .position(|tid| *tid == error_task_id) .unwrap(); for i in index..self.exe_sequence.len() { @@ -336,6 +356,7 @@ impl Dag { } } + /// Get the output of all tasks. pub fn get_results(&self) -> HashMap> { let mut hm = HashMap::new(); for (id, state) in &self.execute_states { diff --git a/dagrs_core/src/engine/graph.rs b/src/engine/graph.rs similarity index 81% rename from dagrs_core/src/engine/graph.rs rename to src/engine/graph.rs index c0c4171..b76d8f5 100644 --- a/dagrs_core/src/engine/graph.rs +++ b/src/engine/graph.rs @@ -1,25 +1,27 @@ -//! Task Graph -//! -//! # Graph stores dependency relations. -//! -//! [`Graph`] represents a series of tasks with dependencies, and stored in an adjacency -//! list. It must be a directed acyclic graph, that is, the dependencies of the task -//! cannot form a loop, otherwise the engine will not be able to execute the task successfully. -//! It has some useful methods for building graphs, such as: adding edges, nodes, etc. -//! And the most important of which is the `topo_sort` function, which uses topological -//! sorting to generate the execution sequence of tasks. -//! -//! # An example of a directed acyclic graph -//! -//! task1 -→ task3 ---→ task6 ---- -//! | ↗ ↓ ↓ ↘ -//! | / task5 ---→ task7 ---→ task9 -//! ↓ / ↑ ↓ ↗ -//! task2 -→ task4 ---→ task8 ---- -//! -//! The task execution sequence can be as follows: -//! task1->task2->task3->task4->task5->task6->task7->task8->task9 -//! +/*! +Task Graph + +# Graph stores dependency relations. + +[`Graph`] represents a series of tasks with dependencies, and stored in an adjacency +list. It must be a directed acyclic graph, that is, the dependencies of the task +cannot form a loop, otherwise the engine will not be able to execute the task successfully. +It has some useful methods for building graphs, such as: adding edges, nodes, etc. +And the most important of which is the `topo_sort` function, which uses topological +sorting to generate the execution sequence of tasks. + +# An example of a directed acyclic graph + +task1 -→ task3 ---→ task6 ---- + | ↗ ↓ ↓ ↘ + | / task5 ---→ task7 ---→ task9 + ↓ / ↑ ↓ ↗ +task2 -→ task4 ---→ task8 ---- + +The task execution sequence can be as follows: +task1->task2->task3->task4->task5->task6->task7->task8->task9 + +*/ use bimap::BiMap; diff --git a/dagrs_core/src/engine/mod.rs b/src/engine/mod.rs similarity index 77% rename from dagrs_core/src/engine/mod.rs rename to src/engine/mod.rs index c91b4ed..6e87e9b 100644 --- a/dagrs_core/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -1,7 +1,5 @@ //! The Engine //! -//! # dagrs core logic. -//! //! [`Dag`] consists of a series of executable tasks with dependencies. A Dag can be executed //! alone as a job. We can get the execution result and execution status of dag. //! [`Engine`] can manage multiple [`Dag`]. An Engine can consist of multiple Dags of different @@ -12,15 +10,13 @@ //! the Dags are added to the Engine , executing each Dag in turn. pub use dag::Dag; -pub use error::DagError; mod dag; -mod error; mod graph; -use std::collections::HashMap; - +use crate::ParseError; use anymap2::any::CloneAnySendSync; +use std::collections::HashMap; use tokio::runtime::Runtime; use crate::log; @@ -36,6 +32,20 @@ pub struct Engine { runtime: Runtime, } +/// Errors that may be raised by building and running dag jobs. +#[derive(Debug)] +/// A synthesis of all possible errors. +pub enum DagError { + /// Yaml file parsing error. + ParserError(ParseError), + /// Task dependency error. + RelyTaskIllegal(String), + /// There are loops in task dependencies. + LoopGraph, + /// There are no tasks in the job. + EmptyJob, +} + impl Engine { /// Add a Dag to the Engine and assign a sequence number to the Dag. /// It should be noted that different Dags should specify different names. @@ -48,7 +58,7 @@ impl Engine { self.sequence.insert(len + 1, name.to_string()); } Err(err) => { - log::error(format!("Some error occur: {}", err)); + log::error(format!("Some error occur: {}", err.to_err_msg())); } } } @@ -96,3 +106,22 @@ impl Default for Engine { } } } + +impl DagError { + fn to_err_msg(&self) -> String { + match self { + Self::EmptyJob => "There are no tasks in the job.".to_string(), + Self::LoopGraph => "Illegal directed a cyclic graph, loop Detect!".to_string(), + Self::RelyTaskIllegal(ref name) => { + format!("Task[{}] dependency task not exist.", name) + } + Self::ParserError(ref msg) => msg.0.to_string(), + } + } +} + +impl From for DagError { + fn from(value: ParseError) -> Self { + Self::ParserError(value) + } +} diff --git a/src/lib.rs b/src/lib.rs index c06b688..4ac7621 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,24 @@ -pub use dagrs_core::{self, *}; +extern crate anymap2; +extern crate bimap; +extern crate clap; #[cfg(feature = "derive")] -pub use dagrs_derive::{self, *}; +extern crate derive; +extern crate tokio; +#[cfg(feature = "yaml")] +extern crate yaml_rust; + +#[cfg(feature = "derive")] +pub use derive::*; +pub use engine::{Dag, DagError, Engine}; +pub use task::{ + alloc_id, Action, CommandAction, Complex, DefaultTask, Input, Output, Simple, Task, +}; +pub use utils::{log, EnvVar, LogLevel, Logger, ParseError, Parser}; +#[cfg(feature = "yaml")] +pub use yaml::{FileContentError, FileNotFound, YamlParser, YamlTask, YamlTaskError}; + +pub mod engine; +pub mod task; +pub mod utils; +#[cfg(feature = "yaml")] +pub mod yaml; diff --git a/src/task/action.rs b/src/task/action.rs new file mode 100644 index 0000000..763d85e --- /dev/null +++ b/src/task/action.rs @@ -0,0 +1,86 @@ +use crate::{EnvVar, Input, Output}; +use std::sync::Arc; + +/// The type of closure that performs logic. +/// # [`Simple`] +/// +/// The specific type of [`Simple`] is `dyn Fn(Input, Arc) -> Output + Send + Sync + 'static`, +/// which represents a closure. +/// +/// # Example +/// +/// ```rust +/// use std::sync::Arc; +/// use dagrs::{Action,Input,EnvVar,Output}; +/// +/// let closure=|_input,_env|Output::new(10); +/// let action=Action::Closure(Arc::new(closure)); +/// ``` +pub type Simple = dyn Fn(Input, Arc) -> Output + Send + Sync + 'static; + +/// More complex types of execution logic. +/// # [`Complex`] +/// +/// The usage of closures is suitable for simple cases. If the user wants to store some private +/// properties when defining execution logic, the [`Complex`] trait can meet the needs. +/// +/// # Example +/// +/// ```rust +/// use std::sync::Arc; +/// use dagrs::{Action,Input,EnvVar,Output,Complex}; +/// +/// struct HelloAction{ +/// statement:String, +/// repeat:usize, +/// } +/// +/// impl Complex for HelloAction{ +/// fn run(&self, input: Input, env: Arc) -> Output{ +/// for i in 0..self.repeat{ +/// println!("{}",self.statement); +/// } +/// Output::empty() +/// } +/// } +/// +/// let hello=HelloAction{ +/// statement:"hello world!".to_string(), +/// repeat:10 +/// }; +/// let action=Action::Structure(Arc::new(hello)); +/// ``` +pub trait Complex { + fn run(&self, input: Input, env: Arc) -> Output; +} + +/// Task specific behavior +/// +/// [`Action`] stores the specific execution logic of a task. Action::Closure(Arc<[`Simple`]>) represents a +/// closure, and Action::Structure(Arc) represents a specific type that +/// implements the [`Complex`] trait. +/// Attributes that must exist in each task are used to store specific execution logic. Specific +/// execution logic can be given in two forms: given a closure or a specific type that implements +/// a Complex trait. +pub enum Action { + Closure(Arc), + Structure(Arc), +} + +impl Action { + pub fn run(&self, input: Input, env: Arc) -> Output { + match self { + Self::Closure(closure) => closure(input, env), + Self::Structure(structure) => structure.run(input, env), + } + } +} + +impl Clone for Action { + fn clone(&self) -> Self { + match self { + Self::Closure(closure) => Self::Closure(closure.clone()), + Self::Structure(structure) => Self::Structure(structure.clone()), + } + } +} diff --git a/src/task/cmd.rs b/src/task/cmd.rs new file mode 100644 index 0000000..0faa781 --- /dev/null +++ b/src/task/cmd.rs @@ -0,0 +1,51 @@ +use crate::{Complex, EnvVar, Input, Output}; +use std::process::Command; +use std::sync::Arc; + +/// [`CommandAction`] is a specific implementation of [`Complex`], used to execute operating system commands. +pub struct CommandAction { + command: String, +} + +impl CommandAction { + #[allow(unused)] + pub fn new(cmd: &str) -> Self { + Self { + command: cmd.to_owned(), + } + } +} + +impl Complex for CommandAction { + fn run(&self, input: Input, _env: Arc) -> Output { + let mut args = Vec::new(); + let mut cmd = if cfg!(target_os = "windows") { + args.push("-Command"); + Command::new("powershell") + } else { + args.push("-c"); + Command::new("sh") + }; + args.push(&self.command); + + input.get_iter().for_each(|input| { + if let Some(inp) = input.get::() { + args.push(inp) + } + }); + let out = match cmd.args(args).output() { + Ok(o) => o, + Err(e) => return Output::Err(e.to_string()), + }; + + if out.status.success() { + let mut out = String::from_utf8(out.stdout).unwrap(); + if cfg!(target_os = "windows") { + out = out.replace("\r\n", " ").replace('\n', " "); + } + Output::new(out) + } else { + Output::Err(String::from_utf8(out.stderr).unwrap()) + } + } +} diff --git a/src/task/default_task.rs b/src/task/default_task.rs new file mode 100644 index 0000000..af21541 --- /dev/null +++ b/src/task/default_task.rs @@ -0,0 +1,180 @@ +use super::{Action, Complex, Task, ID_ALLOCATOR}; +use crate::{EnvVar, Input, Output}; +use std::sync::Arc; + +/// Common task types +/// +/// [`DefaultTask`] is a default implementation of the [`Task`] trait. Users can use this task +/// type to build tasks to meet most needs. +/// +/// There are four ways to create a DefaultTask: +/// +/// #Example +/// +/// Using the `default` function to create a task is the simplest way. The name of the task defaults +/// to "Task $id", and a closure without output is created by default as the execution logic of +/// the task. Subsequently, users can specify the task name through the `set_name` function, and +/// use the `set_action` or `set_closure` function to specify execution logic for the task. +/// +/// ```rust +/// use dagrs::DefaultTask; +/// let mut task=DefaultTask::default(); +/// ``` +/// Use the `new` function to create a task. The only difference between this function and the `default` +/// function is that the task name is also given when creating the task. The subsequent work is the +/// same as mentioned in the `default` function. +/// +/// ```rust +/// use dagrs::DefaultTask; +/// let mut task=DefaultTask::new("task"); +/// ``` +/// To build task execution logic, please see `action` module. +/// +/// Use the `with_closure` function to create a task and give it a task name and execution logic. +/// The execution logic is given in the form of a closure. +/// +/// ```rust +/// use dagrs::{ DefaultTask, Output }; +/// let mut task = DefaultTask::with_closure("task",|_input,_env|Output::empty()); +/// ``` +/// Use the `with_action` function to create a task and give it a name and execution logic. +/// The execution logic is given in the form of a concrete type that implements the [`Complex`] trait. +/// For an explanation of the [`Complex`] feature, please see the `action` module. +/// +/// ```rust +/// use dagrs::{ DefaultTask, Complex, Output, Input, EnvVar }; +/// use std::sync::Arc; +/// +/// struct Act(u32); +/// +/// impl Complex for Act{ +/// fn run(&self, input: Input, env: Arc) -> Output{ +/// Output::new(self.0+10) +/// } +/// } +/// +/// let mut task = DefaultTask::with_action("task",Act(20)); +/// ``` +/// +/// A default implementation of the Task trait. In general, use it to define the tasks of dagrs. +pub struct DefaultTask { + /// id is the unique identifier of each task, it will be assigned by the global [`IDAllocator`] + /// when creating a new task, you can find this task through this identifier. + id: usize, + /// The task's name. + name: String, + /// Id of the predecessor tasks. + precursors: Vec, + /// Perform specific actions. + action: Action, +} + +impl DefaultTask { + /// Create a task and specify the task name. You may need to call the `set_action` or `set_closure` function later. + pub fn new(name: &str) -> Self { + let action = |_, _| Output::empty(); + DefaultTask { + id: ID_ALLOCATOR.alloc(), + action: Action::Closure(Arc::new(action)), + name: name.to_owned(), + precursors: Vec::new(), + } + } + /// Create a task, give the task name, and provide a specific type that implements the [`Complex`] trait as the specific + /// execution logic of the task. + pub fn with_action(name: &str, action: impl Complex + Send + Sync + 'static) -> Self { + DefaultTask { + id: ID_ALLOCATOR.alloc(), + action: Action::Structure(Arc::new(action)), + name: name.to_owned(), + precursors: Vec::new(), + } + } + + /// Create a task, give the task name, and provide a closure as the specific execution logic of the task. + pub fn with_closure( + name: &str, + action: impl Fn(Input, Arc) -> Output + Send + Sync + 'static, + ) -> Self { + DefaultTask { + id: ID_ALLOCATOR.alloc(), + action: Action::Closure(Arc::new(action)), + name: name.to_owned(), + precursors: Vec::new(), + } + } + + /// Give the task a name. + pub fn set_name(&mut self, name: &str) { + self.name = name.to_string(); + } + + /// Tasks that shall be executed before this one. + /// + /// # Example + /// ```rust + /// use dagrs::{DefaultTask,Output}; + /// let t1 = DefaultTask::with_closure("Task 1", |_input,_env|Output::empty()); + /// let mut t2 = DefaultTask::with_closure("Task 2",|_input,_env|Output::empty()); + /// t2.set_predecessors(&[&t1]); + /// ``` + /// In above code, `t1` will be executed before `t2`. + pub fn set_predecessors<'a>( + &mut self, + predecessors: impl IntoIterator, + ) { + self.precursors + .extend(predecessors.into_iter().map(|t| t.id())) + } + + /// The same as `exec_after`, but input are tasks' ids + /// rather than reference to [`DefaultTask`]. + pub fn set_predecessors_by_id(&mut self, predecessors_id: impl IntoIterator) { + self.precursors.extend(predecessors_id) + } + + /// Provide a closure to specify execution logic for the task. + pub fn set_closure( + &mut self, + action: impl Fn(Input, Arc) -> Output + Send + Sync + 'static, + ) { + self.action = Action::Closure(Arc::new(action)); + } + + /// Provide a concrete type that implements the [`Complex`] trait to specify execution logic for the task. + pub fn set_action(&mut self, action: impl Complex + Send + Sync + 'static) { + self.action = Action::Structure(Arc::new(action)) + } +} + +impl Task for DefaultTask { + fn action(&self) -> Action { + self.action.clone() + } + + fn precursors(&self) -> &[usize] { + &self.precursors + } + + fn id(&self) -> usize { + self.id + } + + fn name(&self) -> &str { + &self.name + } +} + +impl Default for DefaultTask { + fn default() -> Self { + let id = ID_ALLOCATOR.alloc(); + let name = format!("Task {}", id); + let action = |_, _| Output::empty(); + Self { + id, + name, + precursors: Vec::new(), + action: Action::Closure(Arc::new(action)), + } + } +} diff --git a/src/task/mod.rs b/src/task/mod.rs new file mode 100644 index 0000000..e140b51 --- /dev/null +++ b/src/task/mod.rs @@ -0,0 +1,96 @@ +//! Relevant definitions of tasks. +//! +//! # [`Task`]: the basic unit of scheduling +//! +//! A [`Task`] is the basic unit for scheduling execution of a dagrs. [`Task`] itself is a trait and users +//! should use its concrete implementation [`DefaultTask`]. Of course, users can also customize [`Task`], +//! but it should be noted that whether it is the default [`DefaultTask`] or a user-defined task type, they +//! all need to have the following fields: +//! - `id`: type is `usize`. When allocating tasks, there is a global task `id` allocator. +//! Users can call the `alloc_id()` function to assign ids to tasks, and the obtained `id` type is `usize`. +//! - `name`: type is `String`. This field represents the task name. +//! - `action`: type is [`Action`]. This field is used to store the specific execution logic of the task. +//! - `precursors`: type is `Vec`. This field is used to store the predecessor task `id` of this task. +//! +//! # [`Action`]: specific logical behavior +//! +//! Each task has an [`Action`] field inside, which stores the specific execution logic of the task. +//! [`Action`] is an enumeration type. For [`Simple`] execution logic, you only need to provide a closure for [`Action`]. +//! For slightly more complex execution logic, you can implement the [`Complex`] trait. For detailed analysis, +//! please see the `action` module. +//! +//! # [`Input`] and [`Output`] +//! +//! Each task may produce output and may require the output of its predecessor task as its input. +//! [`Output`] is used to construct and store the output obtained by task execution. [`Input`] is used as a tool +//! to provide users with the output of the predecessor task. +use std::fmt::Debug; +use std::sync::atomic::AtomicUsize; + +pub use self::action::{Action, Complex, Simple}; +pub use self::cmd::CommandAction; +pub use self::default_task::DefaultTask; +pub(crate) use self::state::ExecState; +pub use self::state::{Input, Output}; + +mod action; +mod cmd; +mod default_task; +mod state; +/// The Task trait +/// +/// Tasks can have many attributes, among which `id`, `name`, `predecessor_tasks`, and +/// `action` attributes are required, and users can also customize some other attributes. +/// [`DefaultTask`] in this module is a [`Task`], the DAG engine uses it as the basic +/// task by default. +/// +/// A task must provide methods to obtain precursors and required attributes, just as +/// the methods defined below, users who want to customize tasks must implement these methods. +pub trait Task: Send + Sync { + /// Get a reference to an executable action. + fn action(&self) -> Action; + /// Get the id of all predecessor tasks of this task. + fn precursors(&self) -> &[usize]; + /// Get the id of this task. + fn id(&self) -> usize; + /// Get the name of this task. + fn name(&self) -> &str; +} + +/// IDAllocator for DefaultTask +struct IDAllocator { + id: AtomicUsize, +} + +impl Debug for dyn Task { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "{},\t{},\t{:?}", + self.id(), + self.name(), + self.precursors() + ) + } +} + +impl IDAllocator { + fn alloc(&self) -> usize { + let origin = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if origin > self.id.load(std::sync::atomic::Ordering::Relaxed) { + panic!("Too many tasks.") + } else { + origin + } + } +} + +/// The global task uniquely identifies an instance of the allocator. +static ID_ALLOCATOR: IDAllocator = IDAllocator { + id: AtomicUsize::new(1), +}; + +/// public function to assign task's id. +pub fn alloc_id() -> usize { + ID_ALLOCATOR.alloc() +} diff --git a/dagrs_core/src/task/state.rs b/src/task/state.rs similarity index 52% rename from dagrs_core/src/task/state.rs rename to src/task/state.rs index 1579ea4..7c28f9a 100644 --- a/dagrs_core/src/task/state.rs +++ b/src/task/state.rs @@ -1,20 +1,38 @@ -//! Task state +//! Task input and output //! -//! # Input, output, and state of tasks. +//! [`Output`] and [`Input`] represent the output and input of the task respectively. //! -//! [`Output`] represents the output generated when the task completes successfully. -//! The user can use the `new` function to construct an [`Output`] representing the -//! generated output, and use the `empty` function to construct an empty [`Output`] -//! if the task does not generate output. +//! # [`Output`] //! -//! [`Input`] represents the input required by the task, and the input comes from the -//! output produced by multiple predecessor tasks of this task. Users can read and -//! write the content of [`Input`] ([`Input`] is actually constructed by cloning multiple -//! [`Output`]), so as to realize the logic of the program. +//! Users should consider the output results of the task when defining the specific +//! behavior of the task. The input results may be: normal output, no output, or task +//! execution error message. +//! It should be noted that the content stored in [`Output`] must implement the [`Clone`] trait. //! -//! [`ExeState`] internally stores [`]Output`], which represents whether the execution of -//! the task is successful, and its internal semaphore is used to synchronously obtain -//! the output of the predecessor task as the input of this task. +//! # Example +//! In general, a task may produce output or no output: +//! ```rust +//! use dagrs::Output; +//! let out=Output::new(10); +//! let non_out=Output::empty(); +//! ``` +//! In some special cases, when a predictable error occurs in the execution of a task's +//! specific behavior, the user can choose to return the error message as the output of +//! the task. Of course, this will cause subsequent tasks to abandon execution. +//! +//! ```rust +//! use dagrs::Output; +//! let err_out = Output::Err("some error messages!".to_string()); +//! ``` +//! +//! # [`Input`] +//! +//! [`Input`] represents the input required by the task. The input comes from the output +//! generated by multiple predecessor tasks of the task. If a predecessor task does not produce +//! output, the output will not be stored in [`Input`]. +//! [`Input`] will be used directly by the user without user construction. [`Input`] is actually +//! constructed by cloning multiple [`Output`]. Users can obtain the content stored in [`Input`] +//! to implement the logic of the program. use std::{ slice::Iter, @@ -24,9 +42,12 @@ use std::{ use anymap2::{any::CloneAnySendSync, Map}; use tokio::sync::Semaphore; +/// Container type to store task output. type Content = Map; -/// Describe task's running result. +/// [`ExeState`] internally stores [`Output`], which represents whether the execution of +/// the task is successful, and its internal semaphore is used to synchronously obtain +/// the output of the predecessor task as the input of this task. #[derive(Debug)] pub(crate) struct ExecState { /// The execution succeed or not. @@ -35,6 +56,7 @@ pub(crate) struct ExecState { output: AtomicPtr, /// Task output identified by id. tid: usize, + task_name: String, /// The semaphore is used to control the synchronous blocking of subsequent tasks to obtain the /// execution results of this task. /// When a task is successfully executed, the permits inside the semaphore will be increased to @@ -47,7 +69,10 @@ pub(crate) struct ExecState { /// Output produced by a task. #[derive(Debug)] -pub struct Output(Option); +pub enum Output { + Out(Option), + Err(String), +} /// Task's input value. pub struct Input(Vec); @@ -55,11 +80,12 @@ pub struct Input(Vec); #[allow(dead_code)] impl ExecState { /// Construct a new [`ExeState`]. - pub(crate) fn new(task_id: usize) -> Self { + pub(crate) fn new(task_id: usize, task_name: String) -> Self { Self { success: AtomicBool::new(false), output: AtomicPtr::new(std::ptr::null_mut()), tid: task_id, + task_name, semaphore: Semaphore::new(0), } } @@ -74,9 +100,11 @@ impl ExecState { /// [`Output`] for fetching internal storage. /// This function is generally not called directly, but first uses the semaphore for synchronization control. pub(crate) fn get_output(&self) -> Option { - unsafe { self.output.load(Ordering::Relaxed).as_ref().unwrap() } - .0 - .clone() + if let Some(out) = unsafe { self.output.load(Ordering::Relaxed).as_ref() } { + out.get_out() + } else { + None + } } /// The task execution succeed or not. @@ -85,10 +113,27 @@ impl ExecState { self.success.load(Ordering::Relaxed) } + pub(crate) fn exe_success(&self) { + self.success.store(true, Ordering::Relaxed) + } + + pub(crate) fn get_err(&self) -> Option { + if let Some(out) = unsafe { self.output.load(Ordering::Relaxed).as_ref() } { + out.get_err() + } else { + None + } + } + /// Use id to indicate the output of which task. pub(crate) fn tid(&self) -> usize { self.tid } + + pub(crate) fn task_name(&self) -> &str { + &self.task_name + } + /// The semaphore is used to control the synchronous acquisition of task output results. /// Under normal circumstances, first use the semaphore to obtain a permit, and then call /// the `get_output` function to obtain the output. If the current task is not completed @@ -107,12 +152,41 @@ impl Output { pub fn new(val: H) -> Self { let mut map = Content::new(); assert!(map.insert(val).is_none(), "[Error] map insert fails."); - Self(Some(map)) + Self::Out(Some(map)) } /// Construct an empty [`Output`]. pub fn empty() -> Self { - Self(None) + Self::Out(None) + } + + /// Construct an [`Output`]` with an error message. + pub fn error(msg: String) -> Self { + Self::Err(msg) + } + + /// Determine whether [`Output`] stores error information. + pub(crate) fn is_err(&self) -> bool { + match self { + Self::Err(_) => true, + Self::Out(_) => false, + } + } + + /// Get the contents of [`Output`]. + pub(crate) fn get_out(&self) -> Option { + match self { + Self::Out(ref out) => out.clone(), + Self::Err(_) => None, + } + } + + /// Get error information stored in [`Output`]. + pub(crate) fn get_err(&self) -> Option { + match self { + Self::Out(_) => None, + Self::Err(err) => Some(err.to_string()), + } } } diff --git a/dagrs_core/src/utils/default_logger.rs b/src/utils/default_logger.rs similarity index 100% rename from dagrs_core/src/utils/default_logger.rs rename to src/utils/default_logger.rs diff --git a/dagrs_core/src/utils/env.rs b/src/utils/env.rs similarity index 73% rename from dagrs_core/src/utils/env.rs rename to src/utils/env.rs index 2865662..ba23647 100644 --- a/dagrs_core/src/utils/env.rs +++ b/src/utils/env.rs @@ -1,20 +1,15 @@ -//! Global environment variables. -//! -//! # Environment variable -//! -//! When multiple tasks are running, they may need to share the same data or read -//! the same configuration information. Environment variables can meet this requirement. -//! Before all tasks run, the user builds a [`EnvVar`] and sets all the environment -//! variables. One [`EnvVar`] corresponds to one dag. All tasks in a job can -//! be shared and immutable at runtime. environment variables. - -use std::collections::HashMap; - use anymap2::{any::CloneAnySendSync, Map}; +use std::collections::HashMap; pub type Variable = Map; -/// environment variable. +/// # Environment variable. +/// +/// When multiple tasks are running, they may need to share the same data or read +/// the same configuration information. Environment variables can meet this requirement. +/// Before all tasks run, the user builds a [`EnvVar`] and sets all the environment +/// variables. One [`EnvVar`] corresponds to one dag. All tasks in a job can +/// be shared and immutable at runtime. environment variables. #[derive(Debug, Default)] pub struct EnvVar { variables: HashMap, diff --git a/dagrs_core/src/utils/log.rs b/src/utils/log.rs similarity index 95% rename from dagrs_core/src/utils/log.rs rename to src/utils/log.rs index d5a4825..78574e5 100644 --- a/dagrs_core/src/utils/log.rs +++ b/src/utils/log.rs @@ -14,12 +14,11 @@ //! to a file, which needs to be specified by the user. use std::{ + fmt::{Debug, Display}, fs::File, sync::{Arc, OnceLock}, }; -use thiserror::Error; - #[cfg(feature = "logger")] use super::default_logger::{ default_debug, default_error, default_info, default_warn, init_default_logger, @@ -72,12 +71,17 @@ pub trait Logger { fn error(&self, msg: String); } -#[derive(Debug, Error)] +#[derive(Debug)] pub enum LoggerError { - #[error("Logger has been already initialized!")] AlreadyInitialized, } +impl Display for LoggerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Logger has been already initialized!") + } +} + /// Logger instance. pub(crate) static LOG: OnceLock> = OnceLock::new(); diff --git a/dagrs_core/src/utils/mod.rs b/src/utils/mod.rs similarity index 82% rename from dagrs_core/src/utils/mod.rs rename to src/utils/mod.rs index 4673331..4c5ae82 100644 --- a/dagrs_core/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,17 +1,14 @@ //! general tool. //! -//! # dagrs tool module. -//! //! This module contains common tools for the program, such as: loggers, environment //! variables, task generation macros. -#[macro_use] -pub mod gen_macro; -mod env; - #[cfg(feature = "logger")] mod default_logger; +mod env; pub mod log; +mod parser; pub use self::env::EnvVar; pub use self::log::{LogLevel, Logger}; +pub use self::parser::{ParseError, Parser}; diff --git a/src/utils/parser.rs b/src/utils/parser.rs new file mode 100644 index 0000000..eabc160 --- /dev/null +++ b/src/utils/parser.rs @@ -0,0 +1,45 @@ +//! Task configuration file parser interface +use crate::{task::Task, Action}; +use std::{collections::HashMap, error::Error, fmt::Display}; + +/// Generic parser traits. If users want to customize the configuration file parser, they must implement this trait. +/// The yaml module's `YamlParser` is an example. +pub trait Parser { + /// Parses the contents of a configuration file into a series of tasks with dependencies. + /// Parameter Description: + /// - file: path information of the configuration file + /// - specific_actions: When parsing the configuration file, the specific execution logic + /// of some tasks does not need to be specified in the configuration file, but is given + /// through this map. In the map's key-value pair, the key represents the unique identifier + /// of the task in the task's configuration file, and the value represents the execution + /// logic given by the user. + /// + /// Return value description: + /// If an error is encountered during the parsing process, the return result is ParserError. + /// Instead, return a series of concrete types that implement the [`Task`] trait. + /// This may involve user-defined [`Task`], you can refer to `YamlTask` under the yaml module. + fn parse_tasks( + &self, + file: &str, + specific_actions: HashMap, + ) -> Result>, ParseError>; +} + +/// Errors that may occur during configuration file parsing. +/// This structure stores error information. Users need to customize error types and implement conversion +/// from custom error types to [`ParseError`]. +/// By default, a conversion from `String` type to [`ParseError`] is provided. +#[derive(Debug)] +pub struct ParseError(pub Box); + +impl Display for ParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0.to_string()) + } +} + +impl From for ParseError { + fn from(value: String) -> Self { + ParseError(value.into()) + } +} diff --git a/src/yaml/mod.rs b/src/yaml/mod.rs new file mode 100644 index 0000000..a9af571 --- /dev/null +++ b/src/yaml/mod.rs @@ -0,0 +1,119 @@ +//! yaml configuration file type parser +//! +//! # Config file parser +//! +//! Use yaml configuration files to define a series of tasks, which eliminates the need for users to write code. +//! [`YamlParser`] is responsible for parsing the yaml configuration file into a series of [`YamlTask`]. +//! The program specifies the properties of the yaml task configuration file. The basic format of the yaml +//! configuration file is as follows: +//! +//! ```yaml +//! dagrs: +//! a: +//! name: "Task 1" +//! after: [ b, c ] +//! cmd: echo a +//! b: +//! name: "Task 2" +//! after: [ c, f, g ] +//! cmd: echo b +//! c: +//! name: "Task 3" +//! after: [ e, g ] +//! cmd: echo c +//! d: +//! name: "Task 4" +//! after: [ c, e ] +//! cmd: echo d +//! e: +//! name: "Task 5" +//! after: [ h ] +//! cmd: echo e +//! f: +//! name: "Task 6" +//! after: [ g ] +//! cmd: python3 ./tests/config/test.py +//! g: +//! name: "Task 7" +//! after: [ h ] +//! cmd: node ./tests/config/test.js +//! h: +//! name: "Task 8" +//! cmd: echo h +//! ``` +//! +//! Users can read the yaml configuration file programmatically or by using the compiled `dagrs` +//! command line tool. Either way, you need to enable the `yaml` feature. +//! +//! # Example +//! +//! ```rust +//! use dagrs::Dag; +//! let dag = Dag::with_yaml("some_path",std::collections::HashMap::new()); +//! ``` + +mod yaml_parser; +mod yaml_task; + +pub use self::yaml_parser::YamlParser; +pub use self::yaml_task::YamlTask; + +use crate::ParseError; + +/// Errors about task configuration items. +#[derive(Debug)] +pub enum YamlTaskError { + /// The configuration file should start with `dagrs:`. + StartWordError, + /// No task name configured. + NoNameAttr(String), + /// The specified task predecessor was not found. + NotFoundPrecursor(String), + /// `script` is not defined. + NoScriptAttr(String), +} + +/// Error about file information. +#[derive(Debug)] +pub enum FileContentError { + /// The format of the yaml configuration file is not standardized. + IllegalYamlContent(yaml_rust::ScanError), + Empty(String), +} + +/// Configuration file not found. +pub struct FileNotFound(pub std::io::Error); + +impl From for ParseError { + fn from(value: YamlTaskError) -> Self { + match value { + YamlTaskError::StartWordError => { + "File content is not start with 'dagrs'.".to_string().into() + } + YamlTaskError::NoNameAttr(ref msg) => { + format!("Task has no name field. [{}]", msg).into() + } + YamlTaskError::NotFoundPrecursor(ref msg) => { + format!("Task cannot find the specified predecessor. [{}]", msg).into() + } + YamlTaskError::NoScriptAttr(ref msg) => { + format!("The 'script' attribute is not defined. [{}]", msg).into() + } + } + } +} + +impl From for ParseError { + fn from(value: FileContentError) -> Self { + match value { + FileContentError::IllegalYamlContent(ref err) => err.to_string().into(), + FileContentError::Empty(ref file) => format!("File is empty! [{}]", file).into(), + } + } +} + +impl From for ParseError { + fn from(value: FileNotFound) -> Self { + format!("File not found. [{}]", value.0).into() + } +} diff --git a/dagrs_core/src/parser/yaml_parser.rs b/src/yaml/yaml_parser.rs similarity index 84% rename from dagrs_core/src/parser/yaml_parser.rs rename to src/yaml/yaml_parser.rs index 2358dcf..6eeb5e7 100644 --- a/dagrs_core/src/parser/yaml_parser.rs +++ b/src/yaml/yaml_parser.rs @@ -1,27 +1,18 @@ //! Default yaml configuration file parser. +use super::{FileContentError, FileNotFound, YamlTask, YamlTaskError}; +use crate::{utils::ParseError, Action, CommandAction, Parser, Task}; use std::{collections::HashMap, fs::File, io::Read, sync::Arc}; - use yaml_rust::{Yaml, YamlLoader}; -use crate::{ - task::{CommandAction, Task, YamlTask}, - Action, -}; - -use super::{ - error::{FileContentError, ParserError, YamlTaskError}, - Parser, -}; - /// An implementation of [`Parser`]. It is the default yaml configuration file parser. pub struct YamlParser; impl YamlParser { /// Given file path, and load configuration file. - fn load_file(&self, file: &str) -> Result { + fn load_file(&self, file: &str) -> Result { let mut content = String::new(); - let mut yaml = File::open(file)?; + let mut yaml = File::open(file).map_err(FileNotFound)?; yaml.read_to_string(&mut content).unwrap(); Ok(content) } @@ -37,7 +28,7 @@ impl YamlParser { &self, id: &str, item: &Yaml, - specific_action: Option>, + specific_action: Option, ) -> Result { // Get name first let name = item["name"] @@ -63,7 +54,7 @@ impl YamlParser { id, precursors, name, - Arc::new(CommandAction::new(cmd)) as Arc, + Action::Structure(Arc::new(CommandAction::new(cmd))), )) } } @@ -73,8 +64,8 @@ impl Parser for YamlParser { fn parse_tasks( &self, file: &str, - mut specific_actions: HashMap>, - ) -> Result>, ParserError> { + mut specific_actions: HashMap, + ) -> Result>, ParseError> { let content = self.load_file(file)?; // Parse Yaml let yaml_tasks = @@ -86,6 +77,7 @@ impl Parser for YamlParser { let yaml_tasks = yaml_tasks[0]["dagrs"] .as_hash() .ok_or(YamlTaskError::StartWordError)?; + let mut tasks = Vec::new(); let mut map = HashMap::new(); // Read tasks @@ -107,7 +99,7 @@ impl Parser for YamlParser { if map.contains_key(&pre[..]) { pres.push(map[&pre[..]]); } else { - return Err(YamlTaskError::NotFoundPrecursor(task.name()).into()); + return Err(YamlTaskError::NotFoundPrecursor(task.name().to_string()).into()); } } task.init_precursors(pres); diff --git a/dagrs_core/src/task/yaml_task.rs b/src/yaml/yaml_task.rs similarity index 76% rename from dagrs_core/src/task/yaml_task.rs rename to src/yaml/yaml_task.rs index c709cd5..43b83f6 100644 --- a/dagrs_core/src/task/yaml_task.rs +++ b/src/yaml/yaml_task.rs @@ -4,12 +4,10 @@ //! //! [`YamlTask`] implements the [`Task`] trait, which represents the tasks in the yaml //! configuration file, and a yaml configuration file will be parsed into a series of [`YamlTask`]. -//! It is different from [`DefaultTask`], in addition to the four mandatory attributes of the +//! It is different from `DefaultTask`, in addition to the four mandatory attributes of the //! task type, he has several additional attributes. -use std::sync::Arc; - -use super::{Action, Task, ID_ALLOCATOR}; +use crate::{alloc_id, Action, Task}; /// Task struct for yaml file. pub struct YamlTask { @@ -18,23 +16,17 @@ pub struct YamlTask { id: usize, name: String, /// Precursor identifier defined in yaml. - #[allow(unused)] precursors: Vec, precursors_id: Vec, - action: Arc, + action: Action, } impl YamlTask { #[allow(unused)] - pub fn new( - yaml_id: &str, - precursors: Vec, - name: String, - action: Arc, - ) -> Self { + pub fn new(yaml_id: &str, precursors: Vec, name: String, action: Action) -> Self { Self { yid: yaml_id.to_owned(), - id: ID_ALLOCATOR.alloc(), + id: alloc_id(), name, precursors, precursors_id: Vec::new(), @@ -62,7 +54,7 @@ impl YamlTask { } impl Task for YamlTask { - fn action(&self) -> Arc { + fn action(&self) -> Action { self.action.clone() } fn precursors(&self) -> &[usize] { @@ -71,7 +63,7 @@ impl Task for YamlTask { fn id(&self) -> usize { self.id } - fn name(&self) -> String { - self.name.clone() + fn name(&self) -> &str { + &self.name } } diff --git a/tests/config/script_run_failed.yaml b/tests/config/script_run_failed.yaml index cb5a5f9..9ec74ea 100644 --- a/tests/config/script_run_failed.yaml +++ b/tests/config/script_run_failed.yaml @@ -22,11 +22,11 @@ dagrs: f: name: "Task 6" after: [ g ] - cmd: Deno.core.print("f\n") + cmd: err_cmd g: name: "Task 7" after: [ h ] - cmd: Deno.core.print("g\n") + cmd: err_cmd h: name: "Task 8" cmd: echo h \ No newline at end of file diff --git a/tests/dag_job_test.rs b/tests/dag_job_test.rs index afbd1cf..6549ba3 100644 --- a/tests/dag_job_test.rs +++ b/tests/dag_job_test.rs @@ -1,21 +1,18 @@ -//! Some tests of the dag engine. - use std::{collections::HashMap, sync::Arc}; -use dagrs::{ - log, Action, Dag, DagError, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError, -}; +///! Some tests of the dag engine. +use dagrs::{log, Complex, Dag, DagError, DefaultTask, EnvVar, Input, LogLevel, Output}; #[test] fn yaml_task_correct_execute() { - let _initialized = log::init_logger(LogLevel::Info, None); + let _initialized = log::init_logger(LogLevel::Off, None); let mut job = Dag::with_yaml("tests/config/correct.yaml", HashMap::new()).unwrap(); assert!(job.start().unwrap()); } #[test] fn yaml_task_loop_graph() { - let _initialized = log::init_logger(LogLevel::Info, None); + let _initialized = log::init_logger(LogLevel::Off, None); let res = Dag::with_yaml("tests/config/loop_error.yaml", HashMap::new()) .unwrap() .start(); @@ -24,7 +21,7 @@ fn yaml_task_loop_graph() { #[test] fn yaml_task_self_loop_graph() { - let _initialized = log::init_logger(LogLevel::Info, None); + let _initialized = log::init_logger(LogLevel::Off, None); let res = Dag::with_yaml("tests/config/self_loop_error.yaml", HashMap::new()) .unwrap() .start(); @@ -33,36 +30,19 @@ fn yaml_task_self_loop_graph() { #[test] fn yaml_task_failed_execute() { - let _initialized = log::init_logger(LogLevel::Info, None); + let _initialized = log::init_logger(LogLevel::Off, None); let res = Dag::with_yaml("tests/config/script_run_failed.yaml", HashMap::new()) .unwrap() .start(); assert!(!res.unwrap()); } -macro_rules! generate_task { - ($task:ident($val:expr),$name:expr) => {{ - pub struct $task(usize); - impl Action for $task { - fn run(&self, input: Input, env: Arc) -> Result { - let base = env.get::("base").unwrap(); - let mut sum = self.0; - input - .get_iter() - .for_each(|i| sum += i.get::().unwrap() * base); - Ok(Output::new(sum)) - } - } - DefaultTask::new($task($val), $name) - }}; -} - #[test] fn task_loop_graph() { - let _initialized = log::init_logger(LogLevel::Info, None); - let mut a = generate_task!(A(1), "Compute A"); - let mut b = generate_task!(B(2), "Compute B"); - let mut c = generate_task!(C(4), "Compute C"); + let _initialized = log::init_logger(LogLevel::Off, None); + let mut a = DefaultTask::with_closure("a", |_, _| Output::empty()); + let mut b = DefaultTask::with_closure("b", |_, _| Output::empty()); + let mut c = DefaultTask::with_closure("c", |_, _| Output::empty()); a.set_predecessors(&[&b]); b.set_predecessors(&[&c]); c.set_predecessors(&[&a]); @@ -78,37 +58,53 @@ fn task_loop_graph() { #[test] fn non_job() { - let _initialized = log::init_logger(LogLevel::Info, None); + let _initialized = log::init_logger(LogLevel::Off, None); let tasks: Vec = Vec::new(); let res = Dag::with_tasks(tasks).start(); assert!(res.is_err()); - println!("{}", res.unwrap_err()); } struct FailedActionC(usize); -impl Action for FailedActionC { - fn run(&self, _input: Input, env: Arc) -> Result { +impl Complex for FailedActionC { + fn run(&self, _input: Input, env: Arc) -> Output { let base = env.get::("base").unwrap(); - Ok(Output::new(base / self.0)) + Output::new(base / self.0) } } struct FailedActionD(usize); -impl Action for FailedActionD { - fn run(&self, _input: Input, _env: Arc) -> Result { - Err(RunningError::new("error".to_string())) +impl Complex for FailedActionD { + fn run(&self, _input: Input, _env: Arc) -> Output { + Output::Err("error".to_string()) } } +macro_rules! generate_task { + ($task:ident($val:expr),$name:literal) => {{ + pub struct $task(usize); + impl Complex for $task { + fn run(&self, input: Input, env: Arc) -> Output { + let base = env.get::("base").unwrap(); + let mut sum = self.0; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + } + } + DefaultTask::with_action($name, $task($val)) + }}; +} + #[test] fn task_failed_execute() { - let _initialized = log::init_logger(LogLevel::Info, None); + let _initialized = log::init_logger(LogLevel::Off, None); let a = generate_task!(A(1), "Compute A"); let mut b = generate_task!(B(2), "Compute B"); - let mut c = DefaultTask::new(FailedActionC(0), "Compute C"); - let mut d = DefaultTask::new(FailedActionD(1), "Compute D"); + let mut c = DefaultTask::with_action("Compute C", FailedActionC(0)); + let mut d = DefaultTask::with_action("Compute D", FailedActionD(1)); let mut e = generate_task!(E(16), "Compute E"); let mut f = generate_task!(F(32), "Compute F"); let mut g = generate_task!(G(64), "Compute G"); diff --git a/tests/yaml_parser_test.rs b/tests/yaml_parser_test.rs index da92c27..95f46af 100644 --- a/tests/yaml_parser_test.rs +++ b/tests/yaml_parser_test.rs @@ -1,76 +1,73 @@ use std::collections::HashMap; -use dagrs::{FileContentError, Parser, ParserError, YamlParser, YamlTaskError}; +use dagrs::{ParseError, Parser, Task, YamlParser}; #[test] fn file_not_found_test() { - let no_such_file = YamlParser.parse_tasks("./no_such_file.yaml", HashMap::new()); - assert!(matches!(no_such_file, Err(ParserError::FileNotFound(_)))); + let no_such_file: Result>, ParseError> = + YamlParser.parse_tasks("./no_such_file.yaml", HashMap::new()); + // let err = no_such_file.unwrap_err().to_string(); + // println!("{err}"); + assert!(no_such_file.is_err()) } #[test] fn illegal_yaml_content() { - let illegal_content = + let illegal_content: Result>, ParseError> = YamlParser.parse_tasks("tests/config/illegal_content.yaml", HashMap::new()); - assert!(matches!( - illegal_content, - Err(ParserError::FileContentError( - FileContentError::IllegalYamlContent(_) - )) - )); + // let err = illegal_content.unwrap_err().to_string(); + // println!("{err}"); + assert!(illegal_content.is_err()) } #[test] fn empty_content() { - let empty_content = YamlParser.parse_tasks("tests/config/empty_file.yaml", HashMap::new()); - assert!(matches!( - empty_content, - Err(ParserError::FileContentError(FileContentError::Empty(_))) - )) + let empty_content: Result>, ParseError> = + YamlParser.parse_tasks("tests/config/empty_file.yaml", HashMap::new()); + // let err = empty_content.unwrap_err().to_string(); + // println!("{err}"); + assert!(empty_content.is_err()) } #[test] fn yaml_no_start_with_dagrs() { - let forget_dagrs = + let forget_dagrs: Result>, ParseError> = YamlParser.parse_tasks("tests/config/no_start_with_dagrs.yaml", HashMap::new()); - assert!(matches!( - forget_dagrs, - Err(ParserError::YamlTaskError(YamlTaskError::StartWordError)) - )); + // let err = forget_dagrs.unwrap_err().to_string(); + // println!("{err}"); + assert!(forget_dagrs.is_err()) } #[test] fn yaml_task_no_name() { - let no_task_name = YamlParser.parse_tasks("tests/config/no_task_name.yaml", HashMap::new()); - assert!(matches!( - no_task_name, - Err(ParserError::YamlTaskError(YamlTaskError::NoNameAttr(_))) - )); + let no_task_name: Result>, ParseError> = + YamlParser.parse_tasks("tests/config/no_task_name.yaml", HashMap::new()); + // let err = no_task_name.unwrap_err().to_string(); + // println!("{err}"); + assert!(no_task_name.is_err()) } #[test] fn yaml_task_not_found_precursor() { - let not_found_pre = + let not_found_pre: Result>, ParseError> = YamlParser.parse_tasks("tests/config/precursor_not_found.yaml", HashMap::new()); - assert!(matches!( - not_found_pre, - Err(ParserError::YamlTaskError( - YamlTaskError::NotFoundPrecursor(_) - )) - )); + // let err = not_found_pre.unwrap_err().to_string(); + // println!("{err}"); + assert!(not_found_pre.is_err()) } #[test] fn yaml_task_no_script_config() { - let script = YamlParser.parse_tasks("tests/config/no_script.yaml", HashMap::new()); - assert!(matches!( - script, - Err(ParserError::YamlTaskError(YamlTaskError::NoScriptAttr(_))) - )); + let script: Result>, ParseError> = + YamlParser.parse_tasks("tests/config/no_script.yaml", HashMap::new()); + // let err = script.unwrap_err().to_string(); + // println!("{err}"); + assert!(script.is_err()) } #[test] fn correct_parse() { - let tasks = YamlParser.parse_tasks("tests/config/correct.yaml", HashMap::new()); + let tasks: Result>, ParseError> = + YamlParser.parse_tasks("tests/config/correct.yaml", HashMap::new()); assert!(tasks.is_ok()); } From b13bd64deb108c338ca7431de9449306133da462 Mon Sep 17 00:00:00 2001 From: QIUZHILEI <2925212608@qq.com> Date: Fri, 17 Nov 2023 10:31:22 +0800 Subject: [PATCH 2/3] Improve code documentation and comments. Signed-off-by: QIUZHILEI <2925212608@qq.com> --- src/task/action.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/task/action.rs b/src/task/action.rs index 763d85e..a0fca7e 100644 --- a/src/task/action.rs +++ b/src/task/action.rs @@ -31,24 +31,24 @@ pub type Simple = dyn Fn(Input, Arc) -> Output + Send + Sync + 'static; /// use dagrs::{Action,Input,EnvVar,Output,Complex}; /// /// struct HelloAction{ -/// statement:String, -/// repeat:usize, +/// statement: String, +/// repeat: usize, /// } /// /// impl Complex for HelloAction{ /// fn run(&self, input: Input, env: Arc) -> Output{ -/// for i in 0..self.repeat{ -/// println!("{}",self.statement); -/// } -/// Output::empty() -/// } +/// for i in 0..self.repeat { +/// println!("{}",self.statement); +/// } +/// Output::empty() +/// } /// } /// -/// let hello=HelloAction{ -/// statement:"hello world!".to_string(), -/// repeat:10 +/// let hello=HelloAction { +/// statement: "hello world!".to_string(), +/// repeat: 10 /// }; -/// let action=Action::Structure(Arc::new(hello)); +/// let action = Action::Structure(Arc::new(hello)); /// ``` pub trait Complex { fn run(&self, input: Input, env: Arc) -> Output; From a1598d2b40d536948763348cd55e145c622c2bff Mon Sep 17 00:00:00 2001 From: QIUZHILEI <2925212608@qq.com> Date: Fri, 17 Nov 2023 11:33:57 +0800 Subject: [PATCH 3/3] Improve the README.md document. Signed-off-by: QIUZHILEI <2925212608@qq.com> --- README.md | 198 ++++++++++++++++++++++++++++------------ examples/actions.rs | 19 ++++ examples/compute_dag.rs | 1 - 3 files changed, 157 insertions(+), 61 deletions(-) create mode 100644 examples/actions.rs diff --git a/README.md b/README.md index 0e206b8..92a4dbd 100644 --- a/README.md +++ b/README.md @@ -48,97 +48,166 @@ Among them, each task may produce output, and may also require the output of som ### Programmatically implement task definition -Users need to program to implement the `Action` trait to define the specific logic of the task, and then build a series of `DefaultTask`. The example: `examples/compute_dag.rs`. `DefaultTask` is the default implementation of the Task trait, and it has several mandatory attributes: +Users need to program to implement the `Action` trait to define the specific logic of the task, and then build a series of `DefaultTask`. -- `id`: uniquely identifies the task assigned by the global ID assigner -- `name`: the name of the task -- `predecessor_tasks`: the predecessor tasks of this task -- `action`: is a dynamic type that implements the Action trait in user programming, and it is the specific logic to be executed by the task +First, users need to define some specific task logic. There are two ways to define task logic: + +- Create a closure whose type is `Simple`, which is suitable for simple scenarios. +- Create a type and implement the `Complex` trait, which is suitable for more complex situations. For example, if the logic of the task is to execute a system command, the command string needs to be recorded in some way. You can create a `Commad` structure with a string attribute inside to store the command string. + +You can refer to examples:`examples/actions.rs`. + +In the second step, you need to use the defined task logic to create specific tasks. Here you may need to use the `DefaultTask` type, which provides users with several ways to create `Task`. `DefaultTask` allows you to specify specific task logic for the task and give the task a name. Please refer to the documentation for specific function functions. + +In the third step, you need to specify dependencies for the defined series of tasks. Here you need to use the `set_predecessors` function of `DefaultTask`. This function requires you to specify a series of predecessor tasks for the current task. + +The fourth step is to create a `Dag` and put all the defined tasks into the `Dag` scheduler. + +Optional step: You can specify an environment variable for `Dag`. This environment variable is available in all tasks. In some specific tasks, this behavior can be useful. + +Finally, don’t forget to initialize the logger, and then you can call the `start` function of `Dag` to start executing all tasks. + +You can refer to an example for the above complete steps: `examples/compute_dag.rs` Here is the `examples/impl_action.rs` example: ```rust -//! Implement the Action trait to define the task logic. +//! Only use Dag, execute a job. The graph is as follows: +//! +//! ↱----------↴ +//! B -→ E --→ G +//! ↗ ↗ ↗ +//! A --→ C / +//! ↘ ↘ / +//! D -→ F +//! +//! The final execution result is 272. + +extern crate dagrs; use std::sync::Arc; -use dagrs::{log, Action, Dag, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError}; +use dagrs::{log, Complex, Dag, DefaultTask, EnvVar, Input, LogLevel, Output}; -struct SimpleAction(usize); +struct Compute(usize); -/// Implement the `Action` trait for `SimpleAction`, defining the logic of the `run` function. -/// The logic here is simply to get the output value (usize) of all predecessor tasks and then accumulate. -impl Action for SimpleAction { - fn run(&self, input: Input, env: Arc) -> Result { +impl Complex for Compute { + fn run(&self, input: Input, env: Arc) -> Output { let base = env.get::("base").unwrap(); let mut sum = self.0; input .get_iter() .for_each(|i| sum += i.get::().unwrap() * base); - Ok(Output::new(sum)) + Output::new(sum) } } fn main() { - // Initialize the global logger + // initialization log. let _initialized = log::init_logger(LogLevel::Info, None); - // Generate four tasks. - let a = DefaultTask::new(SimpleAction(10), "Task a"); - let mut b = DefaultTask::new(SimpleAction(20), "Task b"); - let mut c = DefaultTask::new(SimpleAction(30), "Task c"); - let mut d = DefaultTask::new(SimpleAction(40), "Task d"); - // Set the precursor for each task. + // generate some tasks. + let a = DefaultTask::with_action("Compute A", Compute(1)); + + let mut b = DefaultTask::with_action("Compute B", Compute(2)); + + let mut c = DefaultTask::new("Compute C"); + c.set_action(Compute(4)); + + let mut d = DefaultTask::new("Compute D"); + d.set_action(Compute(8)); + + let mut e = DefaultTask::with_closure("Compute E", |input, env| { + let base = env.get::("base").unwrap(); + let mut sum = 16; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + }); + let mut f = DefaultTask::with_closure("Compute F", |input, env| { + let base = env.get::("base").unwrap(); + let mut sum = 32; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + }); + + let mut g = DefaultTask::new("Compute G"); + g.set_closure(|input, env| { + let base = env.get::("base").unwrap(); + let mut sum = 64; + input + .get_iter() + .for_each(|i| sum += i.get::().unwrap() * base); + Output::new(sum) + }); + + // Set up task dependencies. b.set_predecessors(&[&a]); c.set_predecessors(&[&a]); - d.set_predecessors(&[&b, &c]); - // Take these four tasks as a Dag. - let mut dag = Dag::with_tasks(vec![a, b, c, d]); + d.set_predecessors(&[&a]); + e.set_predecessors(&[&b, &c]); + f.set_predecessors(&[&c, &d]); + g.set_predecessors(&[&b, &e, &f]); + // Create a new Dag. + let mut dag = Dag::with_tasks(vec![a, b, c, d, e, f, g]); // Set a global environment variable for this dag. let mut env = EnvVar::new(); env.set("base", 2usize); dag.set_env(env); - // Begin execution. + // Start executing this dag assert!(dag.start().unwrap()); - // Get execution result - assert_eq!(dag.get_result::().unwrap(), 220); + // Get execution result. + let res = dag.get_result::().unwrap(); + println!("The result is {}.", res); } ``` **explain:** -First, we define `SimpleAction` and implement the `Action` trait for this structure. In the rewritten run function, we simply get the output value of the predecessor task and multiply it by the environment variable `base`. Then accumulate the multiplied result to itself self.0. - -After defining the specific task logic, start creating the prerequisites for executing `Dag`: -Initialize the global logger first. Here we set the log level to Info, and do not give the log output file, let the log output to the console by default. +First, we initialize the logger, declare the `Compute` type, and implement the `Complex` trait for it. In the rewritten run function, we simply get the output value of the predecessor task and multiply it by the environment variable `base`. Then accumulate the multiplied result to itself self.0. -Create a `DefaultTask` with `SimpleAction` and give the task a name. Then set the dependencies between tasks. +Next, we define 6 tasks and show the usage of some functions in the `DefaultTask` type. Set predecessor tasks for each task. -Then create a Dag and assign it a global environment variable. +Then, create a `Dag`, set a base environment variable for it, and use the start method to start executing all tasks. Finally we call the `start` function of `Dag` to execute all tasks. After the task is executed, call the `get_result` function to obtain the final execution result of the task. The graph formed by the task is shown below: ```mermaid -flowchart LR; - A((Task a))-->B; A-->C; B((Task b))-->D; C((Task c))-->D((Task d)); +flowchart LR + A-->B + A-->C + B-->D + B-->F + C-->D + C-->E + D-->F + E-->F ``` The execution order is a->c->b->d. ```bash -$cargo run -[Start] -> Task a -> Task c -> Task b -> Task d -> [End] -Executing Task[name: Task a] -Task executed successfully. [name: Task a] -Executing Task[name: Task b] -Executing Task[name: Task c] -Task executed successfully. [name: Task b] -Task executed successfully. [name: Task c] -Executing Task[name: Task d] -Task executed successfully. [name: Task d] - -Process finished with exit code 0 +$ cargo run --example compute_dag +[Start] -> Compute A -> Compute B -> Compute D -> Compute C -> Compute F -> Compute E -> Compute G -> [End] +Executing task [name: Compute A, id: 1] +Execution succeed [name: Compute A, id: 1] +Executing task [name: Compute C, id: 3] +Executing task [name: Compute B, id: 2] +Executing task [name: Compute D, id: 4] +Execution succeed [name: Compute C, id: 3] +Execution succeed [name: Compute B, id: 2] +Execution succeed [name: Compute D, id: 4] +Executing task [name: Compute F, id: 6] +Executing task [name: Compute E, id: 5] +Execution succeed [name: Compute F, id: 6] +Execution succeed [name: Compute E, id: 5] +Executing task [name: Compute G, id: 7] +Execution succeed [name: Compute G, id: 7] +The result is 272. ``` ### `Yaml` configuration file @@ -192,8 +261,8 @@ These yaml-defined task items form a complex dependency graph. In the yaml confi To parse the yaml configured file, you need to compile this project, requiring rust version >= 1.70: ```bash -$cargo build --release -$ .\target\release\dagrs.exe --help +$ cargo build --release --features=yaml +$ ./target/release/dagrs.exe --help Usage: dagrs.exe [OPTIONS] --yaml Options: @@ -213,7 +282,7 @@ Options: We can try an already defined file at `tests/config/correct.yaml` ```bash -$./target/release/dagrs --yaml=./tests/config/correct.yaml --log-path=./dagrs.log --log-level=info +$ ./target/release/dagrs --yaml=./tests/config/correct.yaml --log-path=./dagrs.log --log-level=info [Start] -> Task 8 -> Task 5 -> Task 7 -> Task 6 -> Task 3 -> Task 2 -> Task 1 -> Task 4 -> [End] Executing Task[name: Task 8] Executing Task[name: Task 5] @@ -227,10 +296,19 @@ Executing Task[name: Task 1] You can see an example: `examples/yaml_dag.rs`. In fact, you can also programmatically read the yaml configuration file generation task, which is very simple, just use the `with_yaml` function provided by `Dag` to parse the configuration file. +-------------------------------------- + **In addition to these two methods, `dagrs` also supports advanced task custom configuration.** -- `DefaultTask` is a default implementation of the `Task` trait. Users can also customize tasks and add more functions and attributes to tasks, but they still need to have the four necessary attributes in `DefaultTask`. `YamlTask` is another example of `Task` concrete implementation, its source code is available for reference, or refer to `example/custom_task.rs`. -- In addition to yaml-type configuration files, users can also provide other types of configuration files, but in order to allow other types of configuration files to be parsed as tasks, users need to implement the `Parser` trait. `YamlParser` source code is available for reference, or refer to `examples/custom_parser.rs` +- `DefaultTask` is a default implementation of the `Task` trait. Users can also customize tasks and add more functions and attributes to tasks, but they still need to have the four necessary attributes in `DefaultTask`. `YamlTask` is another example of `Task` concrete implementation, its source code is available for reference. No matter how you customize the task type, the customized task type must have the following attributes: + - `id`: uniquely identifies the task assigned by the global ID assigner + - `name`: the name of the task + - `predecessor_tasks`: the predecessor tasks of this task + - `action`: is a dynamic type that implements the Action trait in user programming, and it is the specific logic to be executed by the task + +- In addition to yaml-type configuration files, users can also provide other types of configuration files, but in order to allow other types of configuration files to be parsed as tasks, users need to implement the `Parser` trait. `YamlParser` source code is available for reference. + +`examples/custom_parser_and_task.rs` is an example of a custom task type and a custom configuration file parser ## Analyze the logic of task execution @@ -323,27 +401,27 @@ gantt ### Basic function usage -`examples/compute_dag.rs`: Use a custom macro to generate multiple simple tasks. +`examples/compute_dag.rs`: A complete usage example of dagrs. -`examples/impl_action.rs`: Define a simple Action to build multiple tasks with the same logic. +`examples/action.rs`: Two ways to define the specific logic of a task. -`examples/yaml_dag.rs`: Spawn multiple tasks with a given yaml configuration file。 - -`examples/use_macro.rs`: Use the `gen_task` macro provided by `dagrs` to generate multiple simple tasks。 +`examples/yaml_dag.rs`: Example of reading yaml configuration file (needs to enable `yaml` features). `examples/engine.rs`: Using `Engine` to manage multiple dags with different task types. ### Advanced Features -`examples/custom_task.rs`: Implement the `Task` trait and define your own task type. - -`examples/custom_parser.rs`: Implement the `Parser` trait to define your own task configuration file parser。 +`examples/custom_parser_and_task.rs`: Custom task types and configuration file parsers. `examples/custom_log.rs`: Implement the `Logger` trait to define your own global logger. +`examples/derive_task.rs`:Use `CustomTask` derived macros to help customize task types. + +`examples/dependencies.rs`:Use the `dependencies!` macro to specify dependencies in an intuitive way and define a series of tasks. + ## Contribution -The dagrs project relies on community contributions and aims to simplify getting started. To develop `dagrs`, clone the repository, then install all dependencies, run the test suite and try it out locally. Pick an issue, make changes, and submit a pull request for community review. +The `dagrs` project relies on community contributions and aims to simplify getting started. To develop `dagrs`, clone the repository, then install all dependencies, run the test suite and try it out locally. Pick an issue, make changes, and submit a pull request for community review. ### What's the contribution diff --git a/examples/actions.rs b/examples/actions.rs new file mode 100644 index 0000000..594e2ac --- /dev/null +++ b/examples/actions.rs @@ -0,0 +1,19 @@ +//! Construct two different actions. +//! - Create a closure directly +//! - Implementing Complex, the type can have some additional information. +use dagrs::{Complex, DefaultTask, Output}; + +struct Act(usize); + +impl Complex for Act { + fn run(&self, _input: dagrs::Input, _env: std::sync::Arc) -> Output { + Output::new(self.0 + 10) + } +} +fn main() { + let simple = |_input, _env| Output::new("simple"); + let _simple_task = DefaultTask::with_closure("simple task", simple); + + let complex = Act(20); + let _complex_task = DefaultTask::with_action("complex action", complex); +} diff --git a/examples/compute_dag.rs b/examples/compute_dag.rs index c53370f..1188518 100644 --- a/examples/compute_dag.rs +++ b/examples/compute_dag.rs @@ -12,7 +12,6 @@ extern crate dagrs; use std::sync::Arc; - use dagrs::{log, Complex, Dag, DefaultTask, EnvVar, Input, LogLevel, Output}; struct Compute(usize);