Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap tokio channels & Information Package Def #75

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 51 additions & 36 deletions .github/workflows/base.yml
Original file line number Diff line number Diff line change
@@ -1,44 +1,59 @@
# Based on https://github.com/actions-rs/meta/blob/master/recipes/quickstart.md
#
#
#
#

# on: [ push, pull_request ]

# name: Base GitHub Action for Check, Test and Lints

# jobs:
# check:
# name: Check
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v2
# - uses: actions-rs/toolchain@v1
# with:
# profile: minimal
# toolchain: stable
# override: true
# - uses: actions-rs/cargo@v1
# with:
# command: check
# - uses: actions/setup-node@v3
# with:
# node-version: 16

# test:
# name: Test Suite
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v2
# - uses: actions-rs/toolchain@v1
# with:
# profile: minimal
# toolchain: stable
# override: true
# - uses: actions-rs/cargo@v1
# with:
# command: test

on: [ push, pull_request ]

name: Base GitHub Action for Check, Test and Lints

jobs:
check:
name: Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
with:
command: check
- uses: actions/setup-node@v3
with:
node-version: 16

test:
name: Fmt Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
with:
command: fmt
args: --check

# test:
# name: Test Suite
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v2
# - uses: actions-rs/toolchain@v1
# with:
# profile: minimal
# toolchain: stable
# override: true
# - uses: actions-rs/cargo@v1
# with:
# command: test

# clippy:
# name: Clippy
Expand Down
53 changes: 2 additions & 51 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,10 @@ repository = "https://github.com/open-rust-initiative/dagrs"
keywords = ["DAG", "task", "async", "parallel", "concurrent"]

[workspace]
members = ["derive", "."]
members = ["."]

[dependencies]
yaml-rust = { version = "0.4.5", optional = true }
bimap = "0.6.1"
clap = { version = "4.2.2", features = ["derive"] }
tokio = { version = "1.28", features = ["rt", "sync", "rt-multi-thread"] }
derive = { path = "derive", version = "0.3.0", optional = true }
thiserror = "1.0.50"
log = "0.4"
env_logger = "0.10.1"

Expand All @@ -27,50 +22,6 @@ simplelog = "0.12"
criterion = { version = "0.5.1", features = ["html_reports"] }

[target.'cfg(unix)'.dev-dependencies]
pprof = { version = "0.13.0" }

[features]
yaml = ["dep:yaml-rust"]
derive = ["derive/derive"]
bench-prost-codec = [
"pprof/criterion",
"pprof/prost-codec",
"pprof/_protobuf",
"pprof/protobuf-codec",
]

[[bin]]
name = "dagrs"
required-features = ["yaml"]

[[example]]
name = "custom_log"
required-features = ["yaml"]

[[example]]
name = "engine"
required-features = ["yaml"]

[[example]]
name = "derive_task"
required-features = ["derive"]

[[example]]
name = "dependencies"
required-features = ["derive"]

[[example]]
name = "yaml_dag"
required-features = ["yaml"]

[[test]]
name = "dag_job_test"
required-features = ["yaml"]

[[test]]
name = "yaml_parser_test"
required-features = ["yaml"]

[[bench]]
name = "compute_dag_bench"
harness = false
[features]
119 changes: 119 additions & 0 deletions src/connection/in_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use std::{collections::HashMap, sync::Arc};

use tokio::sync::{broadcast, mpsc, Mutex};

use crate::graph::node::NodeId;

use super::information_packet::Content;

/// # Input Channels
/// A hash-table mapping `NodeId` to `InChannel`. In **Dagrs**, each `Node` stores input
/// channels in this map, enabling `Node` to receive information packets from other `Node`s.
#[derive(Default)]
pub struct InChannels(pub HashMap<NodeId, Arc<Mutex<InChannel>>>);

Check warning on line 13 in src/connection/in_channel.rs

View workflow job for this annotation

GitHub Actions / Check

field `0` is never read

impl InChannels {
/// Perform a blocking receive on the incoming channel from `NodeId`.
pub fn blocking_recv_from(&mut self, id: &NodeId) -> Result<Content, RecvErr> {

Check warning on line 17 in src/connection/in_channel.rs

View workflow job for this annotation

GitHub Actions / Check

methods `blocking_recv_from`, `recv_from`, `close`, and `get` are never used
match self.get(id) {
Some(channel) => channel.blocking_lock().blocking_recv(),
None => Err(RecvErr::NoSuchChannel),
}
}
/// Perform a asynchronous receive on the incoming channel from `NodeId`.
pub async fn recv_from(&mut self, id: &NodeId) -> Result<Content, RecvErr> {
match self.get(id) {
Some(channel) => channel.lock().await.recv().await,
None => Err(RecvErr::NoSuchChannel),
}
}

/// Close the channel by the given `NodeId`, and remove the channel in this map.
pub fn close(&mut self, id: &NodeId) {
if let Some(c) = self.get(id) {
c.blocking_lock().close();
self.0.remove(id);
}
}

fn get(&self, id: &NodeId) -> Option<Arc<Mutex<InChannel>>> {
match self.0.get(id) {
Some(c) => Some(c.clone()),
None => None,
}
}
}

/// # Input Channel
/// Wrapper of receivers of `tokio::sync::mpsc` and `tokio::sync::broadcast`. **Dagrs** will
/// decide the inner type of channel when building the graph.
/// Learn more about [Tokio Channels](https://tokio.rs/tokio/tutorial/channels).
pub enum InChannel {
/// Receiver of a `tokio::sync::mpsc` channel.
Mpsc(mpsc::Receiver<Content>),

Check warning on line 53 in src/connection/in_channel.rs

View workflow job for this annotation

GitHub Actions / Check

variants `Mpsc` and `Bcst` are never constructed
/// Receiver of a `tokio::sync::broadcast` channel.
Bcst(broadcast::Receiver<Content>),
}

impl InChannel {
/// Perform a blocking receive on this channel.
fn blocking_recv(&mut self) -> Result<Content, RecvErr> {

Check warning on line 60 in src/connection/in_channel.rs

View workflow job for this annotation

GitHub Actions / Check

methods `blocking_recv`, `recv`, and `close` are never used
match self {
InChannel::Mpsc(receiver) => {
if let Some(content) = receiver.blocking_recv() {
Ok(content)
} else {
Err(RecvErr::Closed)
}
}
InChannel::Bcst(receiver) => match receiver.blocking_recv() {
Ok(v) => Ok(v),
Err(e) => match e {
broadcast::error::RecvError::Closed => Err(RecvErr::Closed),
broadcast::error::RecvError::Lagged(x) => Err(RecvErr::Lagged(x)),
},
},
}
}

/// Perform a asynchronous receive on this channel.
async fn recv(&mut self) -> Result<Content, RecvErr> {
match self {
InChannel::Mpsc(receiver) => {
if let Some(content) = receiver.recv().await {
Ok(content)
} else {
Err(RecvErr::Closed)
}
}
InChannel::Bcst(receiver) => match receiver.recv().await {
Ok(v) => Ok(v),
Err(e) => match e {
broadcast::error::RecvError::Closed => Err(RecvErr::Closed),
broadcast::error::RecvError::Lagged(x) => Err(RecvErr::Lagged(x)),
},
},
}
}

/// Close the channel and drop the messages inside.
fn close(&mut self) {
match self {
InChannel::Mpsc(receiver) => receiver.close(),
// Broadcast channel will be closed after `self` is dropped.
InChannel::Bcst(_) => (),
}
}
}

/// # Input Channel Error Types
/// - NoSuchChannel: try to get a channel with an invalid `NodeId`.
/// - Closed: the channel to receive messages from is closed and empty already.
/// - Lagged(x): the channel encounters a cache overflow and `x` information
/// pakages are dropped on this receiver's side.
#[derive(Debug)]
pub enum RecvErr {

Check warning on line 115 in src/connection/in_channel.rs

View workflow job for this annotation

GitHub Actions / Check

enum `RecvErr` is never used
NoSuchChannel,
Closed,
Lagged(u64),
}
28 changes: 28 additions & 0 deletions src/connection/information_packet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use std::{any::Any, sync::Arc};

/// Container type to store task output.
#[derive(Debug, Clone)]
pub struct Content {
inner: Arc<dyn Any + Send + Sync>,

Check warning on line 6 in src/connection/information_packet.rs

View workflow job for this annotation

GitHub Actions / Check

field `inner` is never read
}

impl Content {
/// Construct a new [`Content`].
pub fn new<H: Send + Sync + 'static>(val: H) -> Self {

Check warning on line 11 in src/connection/information_packet.rs

View workflow job for this annotation

GitHub Actions / Check

associated items `new`, `from_arc`, `get`, and `into_inner` are never used
Self {
inner: Arc::new(val),
}
}

pub fn from_arc<H: Send + Sync + 'static>(val: Arc<H>) -> Self {
Self { inner: val }
}

pub fn get<H: 'static>(&self) -> Option<&H> {
self.inner.downcast_ref::<H>()
}

pub fn into_inner<H: Send + Sync + 'static>(self) -> Option<Arc<H>> {
self.inner.downcast::<H>().ok()
}
}
2 changes: 2 additions & 0 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod in_channel;
pub mod information_packet;
Loading
Loading