Skip to content

Commit 5b71111

Browse files
committed
feat(scheduler): add NodeRegistered structure
Signed-off-by: iverly <[email protected]>
1 parent 25b1e25 commit 5b71111

File tree

7 files changed

+331
-23
lines changed

7 files changed

+331
-23
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scheduler/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ serde_derive = "1.0.142"
1717
confy = "0.4.0"
1818
anyhow = "1.0.62"
1919
thiserror = "1.0.32"
20+
async-stream = "0.3.3"

scheduler/src/lib.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::net::IpAddr;
2+
13
use proto::scheduler::{
24
Instance, InstanceStatus, NodeRegisterRequest, NodeRegisterResponse, NodeStatus,
35
NodeUnregisterRequest, NodeUnregisterResponse,
@@ -9,7 +11,8 @@ use tonic::Response;
911
pub mod config;
1012
pub mod instance;
1113
pub mod manager;
12-
pub mod node_listener;
14+
pub mod node;
15+
pub mod parser;
1316
pub mod storage;
1417

1518
#[derive(Error, Debug)]
@@ -28,6 +31,14 @@ pub enum SchedulerError {
2831

2932
#[derive(Error, Debug)]
3033
pub enum ProxyError {
34+
#[error("an transport error occurred from tonic: {0}")]
35+
TonicTransportError(#[from] tonic::transport::Error),
36+
#[error("an status error occurred from tonic: {0}")]
37+
TonicStatusError(#[from] tonic::Status),
38+
#[error("the gRPC client was not found")]
39+
GrpcClientNotFound,
40+
#[error("the gRPC stream was not found")]
41+
GrpcStreamNotFound,
3142
#[error("an error occurred while sending a message to the channel")]
3243
ChannelSenderError,
3344
}
@@ -64,6 +75,7 @@ pub enum Event {
6475
// Node events
6576
NodeRegister(
6677
NodeRegisterRequest,
78+
IpAddr,
6779
oneshot::Sender<Result<Response<NodeRegisterResponse>, tonic::Status>>,
6880
),
6981
NodeUnregister(

scheduler/src/manager.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use tokio::{sync::oneshot, task::JoinHandle};
1111
use tonic::{transport::Server, Response};
1212

1313
use crate::instance::listener::InstanceListener;
14+
use crate::node::listener::NodeListener;
1415
use crate::SchedulerError;
15-
use crate::{config::Config, node_listener::NodeListener, storage::Storage, Event, Node};
16+
use crate::{config::Config, storage::Storage, Event, Node};
1617

1718
#[derive(Debug)]
1819
pub struct Manager {
@@ -139,7 +140,7 @@ impl Manager {
139140
info!("received instance destroy event : {:?}", id);
140141
tx.send(Ok(Response::new(()))).unwrap();
141142
}
142-
Event::NodeRegister(request, tx) => {
143+
Event::NodeRegister(request, _, tx) => {
143144
info!("received node register event : {:?}", request);
144145
tx.send(Ok(Response::new(NodeRegisterResponse::default())))
145146
.unwrap();
Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
use log::debug;
21
use proto::scheduler::{
32
node_service_server::NodeService, NodeRegisterRequest, NodeRegisterResponse, NodeStatus,
43
NodeUnregisterRequest, NodeUnregisterResponse,
54
};
65
use tokio::sync::mpsc;
7-
use tonic::{Request, Response, Status, Streaming};
6+
use tonic::{Request, Response, Streaming};
87

98
use crate::{manager::Manager, Event};
109

1110
#[derive(Debug)]
12-
#[allow(dead_code)]
1311
pub struct NodeListener {
1412
sender: mpsc::Sender<Event>,
1513
}
@@ -25,33 +23,34 @@ impl NodeService for NodeListener {
2523
async fn status(
2624
&self,
2725
request: Request<Streaming<NodeStatus>>,
28-
) -> Result<Response<()>, Status> {
26+
) -> Result<Response<()>, tonic::Status> {
27+
log::debug!("received gRPC request: {:?}", request);
28+
2929
let mut stream = request.into_inner();
30-
let (tx, mut rx) = Manager::create_mpsc_channel();
3130

31+
// send each status to the manager
3232
loop {
33+
let (tx, mut rx) = Manager::create_mpsc_channel();
3334
let message = stream.message().await?;
35+
3436
match message {
3537
Some(node_status) => {
36-
debug!("Node status: {:?}", node_status);
3738
self.sender
3839
.send(Event::NodeStatus(node_status, tx.clone()))
3940
.await
4041
.unwrap();
4142

43+
// wait for the manager to respond
4244
if let Some(res) = rx.recv().await {
4345
match res {
44-
Ok(()) => {
45-
debug!("Node status updated successfully");
46-
}
47-
Err(err) => {
48-
debug!("Error updating node status: {:?}", err);
49-
return Err(err);
50-
}
46+
Ok(_) => {}
47+
Err(err) => return Err(err),
5148
}
5249
}
5350
}
5451
None => {
52+
log::error!("Node status stream closed");
53+
// todo: emit node crash event (get the node id from the first status)
5554
return Ok(Response::new(()));
5655
}
5756
}
@@ -61,29 +60,33 @@ impl NodeService for NodeListener {
6160
async fn register(
6261
&self,
6362
request: Request<NodeRegisterRequest>,
64-
) -> Result<Response<NodeRegisterResponse>, Status> {
65-
debug!("{:?}", request);
63+
) -> Result<Response<NodeRegisterResponse>, tonic::Status> {
64+
log::debug!("received gRPC request: {:?}", request);
65+
6666
let (tx, rx) = Manager::create_oneshot_channel();
67+
let remote_addr = request.remote_addr().unwrap().ip();
68+
log::debug!("Registering a new node from: {:?}", remote_addr);
6769

6870
match self
6971
.sender
70-
.send(Event::NodeRegister(request.into_inner(), tx))
72+
.send(Event::NodeRegister(request.into_inner(), remote_addr, tx))
7173
.await
7274
{
7375
Ok(_) => {
7476
return rx.await.unwrap();
7577
}
7678
Err(_) => {
77-
return Err(Status::internal("could not send event to manager"));
79+
return Err(tonic::Status::internal("could not send event to manager"));
7880
}
7981
}
8082
}
8183

8284
async fn unregister(
8385
&self,
8486
request: Request<NodeUnregisterRequest>,
85-
) -> Result<Response<NodeUnregisterResponse>, Status> {
86-
debug!("{:?}", request);
87+
) -> Result<Response<NodeUnregisterResponse>, tonic::Status> {
88+
log::debug!("received gRPC request: {:?}", request);
89+
8790
let (tx, rx) = Manager::create_oneshot_channel();
8891

8992
match self
@@ -95,7 +98,7 @@ impl NodeService for NodeListener {
9598
return rx.await.unwrap();
9699
}
97100
Err(_) => {
98-
return Err(Status::internal("could not send event to manager"));
101+
return Err(tonic::Status::internal("could not send event to manager"));
99102
}
100103
}
101104
}

scheduler/src/node/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use proto::scheduler::{Resource, Status};
2+
3+
pub mod listener;
4+
pub mod registered;
5+
6+
#[derive(Debug, Clone)]
7+
pub struct Node {
8+
pub id: String,
9+
pub status: Status,
10+
pub resource: Option<Resource>,
11+
}

0 commit comments

Comments
 (0)