Skip to content

Commit

Permalink
[casr-cluster] Add update cluster poc
Browse files Browse the repository at this point in the history
  • Loading branch information
hkctkuy committed Dec 7, 2023
1 parent 3536dd1 commit d55a4b4
Show file tree
Hide file tree
Showing 4 changed files with 599 additions and 60 deletions.
227 changes: 177 additions & 50 deletions casr/src/bin/casr-cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use libcasr::{init_ignored_frames, stacktrace::*};

use anyhow::{bail, Context, Result};
use clap::{builder::FalseyValueParser, Arg, ArgAction};
use rayon::iter::{IndexedParallelIterator, ParallelIterator};
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator};
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};

use std::collections::HashSet;
use std::fs;
Expand Down Expand Up @@ -39,6 +38,8 @@ fn stacktrace(path: &Path) -> Result<Stacktrace> {
///
/// * `dedup` - deduplicate casrep by crashline for each cluster, if true
///
/// * `offset` - cluster enumerate offset
///
/// # Return value
///
/// * Number of clusters
Expand All @@ -49,60 +50,18 @@ fn make_clusters(
outpath: Option<&Path>,
jobs: usize,
dedup: bool,
offset: usize,
) -> Result<(usize, usize, usize)> {
// if outpath is "None" we consider that outpath and inpath are the same
let outpath = outpath.unwrap_or(inpath);
let dir = fs::read_dir(inpath).with_context(|| format!("File: {}", inpath.display()))?;

let casreps: Vec<PathBuf> = dir
.map(|path| path.unwrap().path())
.filter(|s| s.extension().is_some() && s.extension().unwrap() == "casrep")
.collect();
let casreps = util::get_reports(inpath)?;
let len = casreps.len();
if len < 2 {
bail!("{} reports, nothing to cluster...", len);
}

// Start thread pool.
let custom_pool = rayon::ThreadPoolBuilder::new()
.num_threads(jobs.min(len))
.build()
.unwrap();

// Report info from casreps: (casrep, (trace, crashline))
let mut casrep_info: RwLock<Vec<(PathBuf, (Stacktrace, String))>> = RwLock::new(Vec::new());
// Casreps with stacktraces, that we cannot parse
let mut badreports: RwLock<Vec<PathBuf>> = RwLock::new(Vec::new());
custom_pool.install(|| {
(0..len).into_par_iter().for_each(|i| {
if let Ok(report) = util::report_from_file(casreps[i].as_path()) {
if let Ok(trace) = report.filtered_stacktrace() {
casrep_info
.write()
.unwrap()
.push((casreps[i].clone(), (trace, report.crashline)));
} else {
badreports.write().unwrap().push(casreps[i].clone());
}
} else {
badreports.write().unwrap().push(casreps[i].clone());
}
})
});
let casrep_info = casrep_info.get_mut().unwrap();
let badreports = badreports.get_mut().unwrap();

// Sort by casrep filename
casrep_info.sort_by(|a, b| {
a.0.file_name()
.unwrap()
.to_str()
.unwrap()
.cmp(b.0.file_name().unwrap().to_str().unwrap())
});

let (casreps, (stacktraces, crashlines)): (Vec<_>, (Vec<_>, Vec<_>)) =
casrep_info.iter().cloned().unzip();
// Get casreps with stacktraces and crashlines
let (casreps, stacktraces, crashlines, badreports) = util::reports_from_dirs(casreps, jobs);

if !badreports.is_empty() {
fs::create_dir_all(format!("{}/clerr", &outpath.display()))?;
Expand All @@ -128,7 +87,7 @@ fn make_clusters(
// Cluster formation
let cluster_cnt: usize = *clusters.iter().max().unwrap();
for i in 1..=cluster_cnt {
fs::create_dir_all(format!("{}/cl{}", &outpath.display(), i))?;
fs::create_dir_all(format!("{}/cl{}", &outpath.display(), i + offset))?;
}

// Init before and after dedup counters
Expand All @@ -150,7 +109,7 @@ fn make_clusters(
format!(
"{}/cl{}/{}",
&outpath.display(),
&clusters[i],
clusters[i] + offset,
&casreps[i].file_name().unwrap().to_str().unwrap()
),
)?;
Expand Down Expand Up @@ -343,6 +302,157 @@ fn merge_dirs(input: &Path, output: &Path) -> Result<u64> {
Ok(new)
}

/// Perform the clustering of casreps
///
/// # Arguments
///
/// * `newpath` - path to directory with new CASR reports
///
/// * `oldpath` - target directory for exiting clusters
///
/// * `jobs` - number of jobs for cluster updating process
///
/// * `dedup` - deduplicate casrep by crashline for each cluster, if true
///
fn update_clusters(newpath: &Path, oldpath: &Path, jobs: usize, dedup: bool) -> Result<()> {
// Get new casreps
let casreps = util::get_reports(newpath)?;
let (casreps, stacktraces, crashlines, _) = util::reports_from_dirs(casreps, jobs);
let casreps = casreps
.iter()
.zip(stacktraces.iter().zip(crashlines.iter()));

// Get casreps from existing clusters
let cluster_dirs: Vec<PathBuf> = fs::read_dir(oldpath)
.unwrap()
.map(|path| path.unwrap().path())
.filter(|path| {
path.clone()
.file_name()
.unwrap()
.to_str()
.unwrap()
.starts_with("cl")
})
.collect();
let len = cluster_dirs.len();
// Init clusters vector
let mut clusters: Vec<Cluster> = Vec::new();
// Init dedup crashline list for each cluster
let mut unique_crashlines: Vec<HashSet<String>> = vec![HashSet::new(); len];
// Get casreps from each existing cluster
for cluster in &cluster_dirs {
// Get cluster number
let i = cluster.clone().file_name().unwrap().to_str().unwrap()[2..]
.to_string()
.parse::<usize>()
.unwrap();
// Get casreps from cluster
let casreps = util::get_reports(cluster)?;
let (_, stacktraces, crashlines, _) = util::reports_from_dirs(casreps, jobs);
// Fill cluster info structures
let diam = diam(&stacktraces);
clusters.push(Cluster {
number: i,
stacktraces,
diam,
});
if dedup {
for crashline in crashlines {
// Note: Clusters enumerate from 1, not 0
unique_crashlines[i - 1].insert(crashline);
}
}
}

// Init list of casreps, which aren't suitable for any cluster
let mut deviants = Vec::<&PathBuf>::new();

// Try to insert each new casrep
for (casrep, (stacktrace, crashline)) in casreps {
// list of "inner" clusters for casrep
let mut inners: Vec<(usize, f64)> = Vec::new();
// list of "outer" clusters for casrep
let mut outers: Vec<(usize, f64)> = Vec::new();
// Checker if casrep is duplicate of someone else
let mut dup = false;
for cluster in &clusters {
// TODO: Add strategy options
let relation = relation(
stacktrace,
cluster,
AccumStrategy::Dist,
AccumStrategy::Dist,
);
match relation {
Relation::Dup => {
dup = true;
break;
}
Relation::Inner(measure) => {
inners.push((cluster.number, measure));
}
Relation::Outer(measure) => {
outers.push((cluster.number, measure));
}
Relation::Oot => {
continue;
}
}
}
if dup {
continue;
}
// Get cluster with min measure
let number = if !inners.is_empty() {
inners.iter().min_by(|a, b| a.1.total_cmp(&b.1)).unwrap().0
} else if !outers.is_empty() {
outers.iter().min_by(|a, b| a.1.total_cmp(&b.1)).unwrap().0
} else {
// Out of threshold
deviants.push(casrep);
continue;
};

// TODO: Check crashline
// Save casrep
fs::copy(
casrep,
format!(
"{}/{}",
&cluster_dirs[number - 1].display(),
&casrep.file_name().unwrap().to_str().unwrap()
),
)?;

// Update cluster
let i = clusters.iter().position(|a| a.number == number).unwrap();
clusters[i].stacktraces.push(stacktrace.to_vec());
clusters[i].diam = diam(&clusters[i].stacktraces);
}

// Handle deviant casreps
if !deviants.is_empty() {
// Copy casrep to tmp dir
let deviant_dir = format!("{}/deviant", &oldpath.display());
fs::create_dir_all(&deviant_dir)?;
for casrep in deviants {
fs::copy(
casrep,
format!(
"{}/{}",
&deviant_dir,
&casrep.file_name().unwrap().to_str().unwrap()
),
)?;
}
// Cluster deviant casreps
let (result, before, after) =
make_clusters(Path::new(&deviant_dir), Some(oldpath), jobs, dedup, len)?;
}
Ok(())
}

fn main() -> Result<()> {
let matches = clap::Command::new("casr-cluster")
.version(clap::crate_version!())
Expand Down Expand Up @@ -408,6 +518,18 @@ fn main() -> Result<()> {
INPUT_DIR will be added to OUTPUT_DIR.",
),
)
.arg(
Arg::new("update")
.short('u')
.long("update")
.action(ArgAction::Set)
.num_args(2)
.value_parser(clap::value_parser!(PathBuf))
.value_names(["NEW_DIR", "OLD_DIR"])
.help(
"Update clusters from OLD_DIR using CASR reports from NEW_DIR.",
),
)
.arg(
Arg::new("ignore")
.long("ignore")
Expand Down Expand Up @@ -458,6 +580,7 @@ fn main() -> Result<()> {
paths.get(1).map(|x| x.as_path()),
jobs,
dedup_crashlines,
0,
)?;
println!("Number of clusters: {result}");
// Print crashline dedup summary
Expand All @@ -481,6 +604,10 @@ fn main() -> Result<()> {
new,
paths[1].display()
);
} else if matches.contains_id("update") {
let paths: Vec<&PathBuf> = matches.get_many::<PathBuf>("update").unwrap().collect();

update_clusters(paths[0], paths[1], jobs, dedup_crashlines)?;
}

Ok(())
Expand Down
Loading

0 comments on commit d55a4b4

Please sign in to comment.