From a4fb4d997cf60a22c5e79c431adc165507623dbc Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 24 May 2018 14:45:31 -0700 Subject: [PATCH] distributed FDs --- .../macrobase/pipeline/BasicBatchPipeline.java | 2 ++ .../summary/DistributedBatchSummarizer.java | 13 +++++++++++++ .../APLSummarizerDistributed.java | 4 +++- .../APrioriLinearDistributed.java | 17 ++++++++++++++++- 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/edu/stanford/futuredata/macrobase/pipeline/BasicBatchPipeline.java b/core/src/main/java/edu/stanford/futuredata/macrobase/pipeline/BasicBatchPipeline.java index 42ddf9d8e..df4afe5e4 100644 --- a/core/src/main/java/edu/stanford/futuredata/macrobase/pipeline/BasicBatchPipeline.java +++ b/core/src/main/java/edu/stanford/futuredata/macrobase/pipeline/BasicBatchPipeline.java @@ -196,6 +196,8 @@ private DistributedBatchSummarizer getDistributedSummarizer(String outlierColumn summarizer.setAttributes(attributes); summarizer.setMinSupport(minSupport); summarizer.setMinRatioMetric(minRiskRatio); + summarizer.setFDUsage(useFDs); + summarizer.setFDValues(functionalDependencies); summarizer.setNumPartitions(distributedNumPartitions); return summarizer; } diff --git a/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/DistributedBatchSummarizer.java b/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/DistributedBatchSummarizer.java index 7f5fe65eb..fd93bfbd1 100644 --- a/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/DistributedBatchSummarizer.java +++ b/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/DistributedBatchSummarizer.java @@ -24,6 +24,8 @@ public abstract class DistributedBatchSummarizer implements Operator attributes = new ArrayList<>(); protected int numThreads = Runtime.getRuntime().availableProcessors(); protected String ratioMetric = "global_ratio"; + protected boolean useFDs = false; + protected int[] functionalDependencies; /** * Adjust this to tune the significance (e.g. number of rows affected) of the results returned. @@ -64,4 +66,15 @@ public DistributedBatchSummarizer setRatioMetric(final String ratioMetric) { this.ratioMetric = ratioMetric; return this; } + + public DistributedBatchSummarizer setFDUsage(final boolean useFDs) { + this.useFDs = useFDs; + return this; + } + + public DistributedBatchSummarizer setFDValues(final int[] functionalDependencies) { + this.functionalDependencies = functionalDependencies; + return this; + } + } diff --git a/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/aplinearDistributed/APLSummarizerDistributed.java b/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/aplinearDistributed/APLSummarizerDistributed.java index 0c822a186..5e6a7f64f 100644 --- a/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/aplinearDistributed/APLSummarizerDistributed.java +++ b/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/aplinearDistributed/APLSummarizerDistributed.java @@ -169,7 +169,9 @@ private void processInternal(JavaPairRDD partitionedDataFram encoder.getOutlierList(), encoder.getColCardinalities(), qualityMetricList, - thresholds + thresholds, + useFDs, + functionalDependencies ); log.info("Number of results: {}", aplResults.size()); diff --git a/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/aplinearDistributed/APrioriLinearDistributed.java b/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/aplinearDistributed/APrioriLinearDistributed.java index 442e7dc80..3f83061ed 100644 --- a/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/aplinearDistributed/APrioriLinearDistributed.java +++ b/lib/src/main/java/edu/stanford/futuredata/macrobase/distributed/analysis/summary/aplinearDistributed/APrioriLinearDistributed.java @@ -33,7 +33,9 @@ public static List explain( ArrayList[] outlierList, int[] colCardinalities, List argQualityMetrics, - List argThresholds + List argThresholds, + boolean useFDs, + int[] functionalDependencies ) { Logger log = LoggerFactory.getLogger("APLSummarizerDistributed"); @@ -189,6 +191,10 @@ public static List explain( for (int colNumOne = 0; colNumOne < numColumns; colNumOne++) { int[] curColumnOneAttributes = attributesForThread[colNumOne]; for (int colNumTwo = colNumOne + 1; colNumTwo < numColumns; colNumTwo++) { + //if FDs are enabled, and these two attribute cols are FDs, skip + if (useFDs && ((functionalDependencies[colNumOne] & (1< explain( for (int colNumOne = 0; colNumOne < numColumns; colNumOne++) { int[] curColumnOneAttributes = attributesForThread[colNumOne % numColumns]; for (int colNumTwo = colNumOne + 1; colNumTwo < numColumns; colNumTwo++) { + //if FD on and attributes 1 and 2 are FDs, skip + if (useFDs && ((functionalDependencies[colNumOne] & (1<