Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ name = "agent_lib"
path = "lib/src/lib.rs"

[dev-dependencies]
rand = "0.8.5"
tempfile = "3.8.1"
17 changes: 11 additions & 6 deletions agent/lib/src/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ use log::{error, info, trace};

use crate::api::grpc_definitions::{register_response::Response, RegisterRequest};

use super::grpc_definitions::{
lambdo_api_service_client::LambdoApiServiceClient, Code, StatusMessage,
use super::{
grpc_definitions::{lambdo_api_service_client::LambdoApiServiceClient, Code, StatusMessage},
ClientTrait, SelfCreatingClientTrait,
};

pub struct Client {
client: LambdoApiServiceClient<tonic::transport::Channel>,
}

impl Client {
pub async fn new(gprc_host: IpAddr, port: u16) -> Self {
#[tonic::async_trait]
impl SelfCreatingClientTrait for Client {
async fn new(gprc_host: IpAddr, port: u16) -> Self {
info!("Connecting to gRPC server at {}:{}", gprc_host, port);

let mut counter = 0;
Expand All @@ -34,8 +36,11 @@ impl Client {

panic!("Failed to connect to gRPC server");
}
}

pub async fn register(&mut self, port: u16) -> Result<String> {
#[tonic::async_trait]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have used tokio async_trait here

impl ClientTrait for Client {
async fn register(&mut self, port: u16) -> Result<String> {
info!("Registering to lambdo..");
let register_response = self
.client
Expand All @@ -49,7 +54,7 @@ impl Client {
}
}

pub async fn status(&mut self, id: String, code: Code) -> Result<()> {
async fn status(&mut self, id: String, code: Code) -> Result<()> {
self.client
.status(StatusMessage {
id,
Expand Down
20 changes: 20 additions & 0 deletions agent/lib/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,24 @@
use std::net::IpAddr;

use anyhow::Result;
use grpc_definitions::Code;

pub mod client;
#[rustfmt::skip]
pub mod grpc_definitions;
pub mod server;

/// Client trait
/// This trait is used to abstract the gRPC client
#[tonic::async_trait]
pub trait ClientTrait: Send + Sync {
async fn status(&mut self, id: String, code: Code) -> Result<()>;
async fn register(&mut self, port: u16) -> Result<String>;
}

/// SelfCreatingClientTrait trait
/// This trait is used to abstract the gRPC client creation
#[tonic::async_trait]
pub trait SelfCreatingClientTrait: ClientTrait {
async fn new(grpc_host: IpAddr, port: u16) -> Self;
}
122 changes: 114 additions & 8 deletions agent/lib/src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,34 @@ use log::{debug, error, info, trace};
use tokio::sync::Mutex;
use tonic::{Request, Response, Status};

use crate::{config::AgentConfig, runner_engine};
use crate::{api::ClientTrait, config::AgentConfig, runner_engine};

use super::{
client::Client,
grpc_definitions::{
lambdo_agent_service_server::LambdoAgentService, Empty, ExecuteRequest, ExecuteResponse,
StatusMessage,
},
SelfCreatingClientTrait,
};

pub struct LambdoAgentServer {
pub config: AgentConfig,
pub client: Arc<Mutex<Client>>,
pub client: Arc<Mutex<Box<dyn ClientTrait>>>,
pub id: String,
}

impl LambdoAgentServer {
pub async fn new(config: AgentConfig) -> Self {
pub async fn new<C: ClientTrait + SelfCreatingClientTrait + 'static>(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't like the idea of a SelfCreatingClientTrait.
For me, we should do that a bit differently:

  • LambdoAgentServer::new() should take a ClientTrait as a parameter
  • ClientTrait should have a config() function that would be called in LambdoAgentServer::new()

That would probably reduce the complexity by a lot IMO

config: AgentConfig,
) -> Self {
let grpc_remote_host = IpAddr::from_str(&config.grpc.remote_host).unwrap_or_else(|e| {
error!("Invalid IP address: {}", config.grpc.remote_host);
panic!("{}", e.to_string())
});
trace!("gRPC remote host: {}", grpc_remote_host);

trace!("Creating gRPC client..");
let mut client = Client::new(grpc_remote_host, config.grpc.remote_port).await;
let mut client = C::new(grpc_remote_host, config.grpc.remote_port).await;

trace!("Registering to gRPC server..");
let id = {
Expand All @@ -38,7 +40,7 @@ impl LambdoAgentServer {
match client.register(config.grpc.local_port).await {
Ok(id) => break id,
Err(e) => {
error!("Failed to register to gRPC server, {} try: {}", counter, e);
error!("Failed to rese provide us with your discord handle, after joining our servergister to gRPC server, {} try: {}", counter, e);
counter += 1;
if counter >= 10 {
panic!("Failed to register to gRPC server");
Expand All @@ -63,7 +65,7 @@ impl LambdoAgentServer {

Self {
config,
client: Arc::new(Mutex::new(client)),
client: Arc::new(Mutex::new(Box::new(client))),
id,
}
}
Expand All @@ -84,7 +86,8 @@ impl LambdoAgentService for LambdoAgentServer {
let request = request.into_inner();
debug!("Received request: {:?}", request);

let mut runner_engine = runner_engine::service::RunnerEngine::new(request);
let mut runner_engine =
runner_engine::service::RunnerEngine::new(request, &self.config.workspace_path);
let mut self_client = self.client.lock().await;

if let Err(e) = runner_engine.create_workspace() {
Expand Down Expand Up @@ -122,3 +125,106 @@ impl LambdoAgentService for LambdoAgentServer {
}
}
}

#[cfg(test)]
mod test {
use super::super::grpc_definitions::Code;
use crate::{
api::{
grpc_definitions::{
lambdo_agent_service_server::LambdoAgentService, Empty, ExecuteRequest,
ExecuteRequestStep,
},
server::LambdoAgentServer,
ClientTrait, SelfCreatingClientTrait,
},
config::{AgentConfig, GRPCConfig},
};
use anyhow::Result;
use tonic::Request;

struct MockClient;

#[tonic::async_trait]
impl ClientTrait for MockClient {
async fn register(&mut self, _local_port: u16) -> Result<String> {
Ok("test".to_string())
}

async fn status(&mut self, _id: String, _code: Code) -> Result<()> {
Ok(())
}
}

#[tonic::async_trait]
impl SelfCreatingClientTrait for MockClient {
async fn new(_grpc_host: std::net::IpAddr, _port: u16) -> Self {
MockClient
}
}

#[tokio::test]
async fn status_unimplemented() {
let config = AgentConfig {
apiVersion: "lambdo.io/v1alpha1".to_string(),
kind: "AgentConfig".to_string(),
grpc: GRPCConfig {
remote_port: 50051,
remote_host: "127.0.0.1".to_string(),
local_host: "127.0.0.1".to_string(),
local_port: 50051,
},
workspace_path: tempfile::tempdir()
.unwrap()
.into_path()
.to_str()
.unwrap()
.to_string(),
};

let server = LambdoAgentServer::new::<MockClient>(config).await;
let status = server.status(Request::new(Empty {})).await;

assert!(status.is_err());
}

#[tokio::test]
async fn execute_works() {
let config = AgentConfig {
apiVersion: "lambdo.io/v1alpha1".to_string(),
kind: "AgentConfig".to_string(),
grpc: GRPCConfig {
remote_port: 50051,
remote_host: "127.0.0.1".to_string(),
local_host: "127.0.0.1".to_string(),
local_port: 50051,
},
workspace_path: tempfile::tempdir()
.unwrap()
.into_path()
.to_str()
.unwrap()
.to_string(),
};

let server = LambdoAgentServer::new::<MockClient>(config).await;
let execute = server
.execute(Request::new(ExecuteRequest {
id: "test".to_string(),
files: vec![],
steps: vec![ExecuteRequestStep {
command: "echo -n 'This is stdout' && echo -n 'This is stderr' >&2 && exit 1"
.to_string(),
enable_output: true,
}],
}))
.await;

assert!(execute.is_ok());

let execution_recap = execute.unwrap().into_inner();

assert_eq!(execution_recap.clone().steps[0].stdout, "This is stdout");
assert_eq!(execution_recap.steps[0].stderr, "This is stderr");
}
}
7 changes: 7 additions & 0 deletions agent/lib/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const fn default_local_port() -> u16 {
0
}

fn default_workspace_path() -> String {
std::env::temp_dir().to_str().unwrap().to_string()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.to_str().unwrap().to_string() seems a bit ugly :/

}

#[derive(Error, Debug)]
pub enum AgentConfigError {
#[error("cannot load config file")]
Expand All @@ -37,6 +41,9 @@ pub struct AgentConfig {
/// The gRPC configuration
#[serde(default = "default_grpc")]
pub grpc: GRPCConfig,
/// The workspace where the agent will store the files of Requests and their resulting files
#[serde(default = "default_workspace_path")]
pub workspace_path: String,
}

#[derive(Serialize, Deserialize, PartialEq, Debug)]
Expand Down
Loading