Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logs): rsync client logs from new mnt location #314

Merged
merged 1 commit into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/ansible/provisioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,11 @@ impl AnsibleProvisioner {
.get_inventory(AnsibleInventoryType::FullConeNatGateway, false)
}

pub fn get_uploader_inventory(&self) -> Result<Vec<VirtualMachine>> {
self.ansible_runner
.get_inventory(AnsibleInventoryType::Uploaders, false)
}

pub fn get_node_registries(
&self,
inventory_type: &AnsibleInventoryType,
Expand Down
97 changes: 28 additions & 69 deletions src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,30 @@ impl TestnetDeployer {
// take root_dir at the top as `get_all_node_inventory` changes the working dir.
let root_dir = std::env::current_dir()?;

let mut uploader_inventory = vec![];
let mut all_inventory = vec![];
if !disable_client_logs {
let unfiltered_uploader_vms = self.ansible_provisioner.get_current_uploader_count()?;
let uploader_vm = if let Some(filter) = &vm_filter {
unfiltered_uploader_vms
.into_iter()
.filter(|(vm, _)| vm.name.contains(filter))
.collect()
} else {
unfiltered_uploader_vms
};
uploader_inventory.extend(uploader_vm);
all_inventory.extend(self.get_uploader_inventory(name)?);
}

let unfiltered_node_vms = self.get_all_node_inventory(name)?;
let all_node_inventory = if let Some(filter) = vm_filter {
unfiltered_node_vms
all_inventory.extend(self.get_all_node_inventory(name)?);

let all_inventory = if let Some(filter) = vm_filter {
all_inventory
.into_iter()
.filter(|vm| vm.name.contains(&filter))
.collect()
} else {
unfiltered_node_vms
all_inventory
};

create_initial_log_dir_setup_client(&root_dir, name, &uploader_inventory)?;
let log_base_dir = create_initial_log_dir_setup_node(&root_dir, name, &all_node_inventory)?;
let log_base_dir = create_initial_log_dir_setup(&root_dir, name, &all_inventory)?;

// We might use the script, so goto the resource dir.
std::env::set_current_dir(self.working_directory_path.clone())?;
println!("Starting to rsync the log files");
let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
let progress_bar = get_progress_bar(all_inventory.len() as u64)?;

let mut rsync_args = all_node_inventory
let rsync_args = all_inventory
.iter()
.map(|vm| {
let args = if vm.name.contains("symmetric") {
Expand All @@ -89,6 +80,11 @@ impl TestnetDeployer {
debug!("Using symmetric rsync args for {:?}", vm.name);
debug!("Args for {}: {:?}", vm.name, args);
args
} else if vm.name.contains("uploader") {
let args = self.construct_uploader_args(vm, &log_base_dir);
debug!("Using uploader rsync args for {:?} ", vm.name);
debug!("Args for {}: {:?}", vm.name, args);
args
} else {
let args = self.construct_public_node_args(vm, &log_base_dir);
debug!("Using public rsync args for {:?}", vm.name);
Expand All @@ -100,21 +96,6 @@ impl TestnetDeployer {
})
.collect::<Result<Vec<_>>>()?;

rsync_args.extend(uploader_inventory.iter().flat_map(|(vm, uploader_count)| {
let mut all_user_args = vec![];
for ant_user in 1..=*uploader_count {
let vm = vm.clone();
let args = self.construct_uploader_args(&vm, ant_user, &log_base_dir);
debug!(
"Using uploader rsync args for {:?} and user {ant_user}",
vm.name
);
debug!("Args for {} and user {ant_user}: {:?}", vm.name, args);
all_user_args.push((vm, args));
}
all_user_args
}));

let failed_inventory = rsync_args
.par_iter()
.filter_map(|(vm, args)| {
Expand Down Expand Up @@ -155,13 +136,8 @@ impl TestnetDeployer {
Ok(())
}

fn construct_uploader_args(
&self,
vm: &VirtualMachine,
ant_user: usize,
log_base_dir: &Path,
) -> Vec<String> {
let vm_path = log_base_dir.join(format!("{}_ant{ant_user}", vm.name));
fn construct_uploader_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
let vm_path = log_base_dir.join(&vm.name);
let mut rsync_args = DEFAULT_RSYNC_ARGS
.iter()
.map(|str| str.to_string())
Expand All @@ -178,10 +154,7 @@ impl TestnetDeployer {
.to_string_lossy()
.as_ref()
),
format!(
"root@{}:/home/ant{ant_user}/.local/share/autonomi/client/logs/",
vm.public_ip_addr
),
format!("root@{}:/mnt/client-logs/log/", vm.public_ip_addr),
vm_path.to_string_lossy().to_string(),
]);

Expand Down Expand Up @@ -319,7 +292,7 @@ impl TestnetDeployer {
// take root_dir at the top as `get_all_node_inventory` changes the working dir.
let root_dir = std::env::current_dir()?;
let all_node_inventory = self.get_all_node_inventory(name)?;
let log_abs_dest = create_initial_log_dir_setup_node(&root_dir, name, &all_node_inventory)?;
let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;

let rg_cmd = format!("rg {rg_args} /mnt/antnode-storage/log//");
println!("Running ripgrep with command: {rg_cmd}");
Expand Down Expand Up @@ -450,6 +423,14 @@ impl TestnetDeployer {
}
self.ansible_provisioner.get_all_node_inventory()
}

fn get_uploader_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
let environments = self.terraform_runner.workspace_list()?;
if !environments.contains(&name.to_string()) {
return Err(Error::EnvironmentDoesNotExist(name.to_string()));
}
self.ansible_provisioner.get_uploader_inventory()
}
}

pub async fn get_logs(name: &str) -> Result<()> {
Expand Down Expand Up @@ -563,7 +544,7 @@ fn visit_dirs(
}

// Create the log dirs for all the machines. Returns the absolute path to the `logs/name`
fn create_initial_log_dir_setup_node(
fn create_initial_log_dir_setup(
root_dir: &Path,
name: &str,
all_node_inventory: &[VirtualMachine],
Expand All @@ -581,25 +562,3 @@ fn create_initial_log_dir_setup_node(
});
Ok(log_abs_dest)
}

// Create the log dirs for all the machines.
fn create_initial_log_dir_setup_client(
root_dir: &Path,
name: &str,
all_node_inventory: &[(VirtualMachine, usize)],
) -> Result<()> {
let log_dest = root_dir.join("logs").join(name);
if !log_dest.exists() {
std::fs::create_dir_all(&log_dest)?;
}
// Get the absolute path here. We might be changing the current_dir and we don't want to run into problems.
let log_abs_dest = std::fs::canonicalize(log_dest)?;
// Create a log dir per VM
all_node_inventory.par_iter().for_each(|(vm, users)| {
for ant_user in 1..=*users {
let vm_path = log_abs_dest.join(format!("{}_ant{ant_user}", vm.name));
let _ = std::fs::create_dir_all(vm_path);
}
});
Ok(())
}
Loading