Skip to content

Commit

Permalink
Merge pull request #227 from stanford-futuredata/movers
Browse files Browse the repository at this point in the history
Adding predicate (movers) query support
  • Loading branch information
sahaana authored Dec 8, 2017
2 parents 42c75e8 + aac1217 commit 8c02554
Show file tree
Hide file tree
Showing 17 changed files with 512 additions and 79 deletions.
17 changes: 17 additions & 0 deletions core/demo/cube_predicate.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"pipeline": "CubePipeline",
"inputURI": "csv://core/demo/sample_cubed.csv",
"classifier": "predicate",
"countColumn": "count",

"metric": "release",
"predicate": ">=",
"cutoff": "12-11-17",

"attributes": [
"location",
"version"
],
"minSupport": 0.2,
"minRatioMetric": 2.0
}
15 changes: 15 additions & 0 deletions core/demo/double_predicate.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"pipeline": "BasicBatchPipeline",
"inputURI": "csv://core/demo/tiny_predicate.csv",
"classifier": "predicate",
"metric": "power",
"predicate": ">=",
"cutoff": 80.0,
"summarizer": "apriori",
"attributes": [
"location",
"version"
],
"minRatioMetric": 2.0,
"minSupport": 0.2
}
20 changes: 10 additions & 10 deletions core/demo/sample_cubed.csv
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
location,version,count,mean,std
AUS,v3,150,35.7415333333,4.61047330283
AUS,v4,50,34.0068,4.93254640534
CAN,v1,50,34.0772,5.24978796073
CAN,v2,150,34.9576666667,4.62355719076
CAN,v3,20,8.26286,5.42913454695
RUS,v4,200,35.69215,5.16468641899
UK,v2,100,35.6926,4.55136401355
UK,v3,100,34.4426,5.62178062665
USA,v1,200,34.49175,5.11885885892
release,location,version,count,mean,std
12-10-17,AUS,v3,150,35.7415333333,4.61047330283
12-20-17,AUS,v4,50,34.0068,4.93254640534
12-10-17,CAN,v1,50,34.0772,5.24978796073
12-10-17,CAN,v2,150,34.9576666667,4.62355719076
12-10-17,CAN,v3,20,8.26286,5.42913454695
12-20-17,RUS,v4,200,35.69215,5.16468641899
12-10-17,UK,v2,100,35.6926,4.55136401355
12-10-17,UK,v3,100,34.4426,5.62178062665
12-10-17,USA,v1,200,34.49175,5.11885885892
15 changes: 15 additions & 0 deletions core/demo/string_predicate.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"pipeline": "BasicBatchPipeline",
"inputURI": "csv://core/demo/tiny_predicate.csv",
"classifier": "predicate",
"metric": "release",
"predicate": ">=",
"cutoff": "10-19-17",
"summarizer": "apriori",
"attributes": [
"location",
"version"
],
"minRatioMetric": 2.0,
"minSupport": 0.2
}
12 changes: 12 additions & 0 deletions core/demo/tiny_predicate.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
power,version,location,release
99,8,CAN,10-29-17
32,7,USA,10-12-17
42,6,USA,10-04-17
13,7,USA,10-09-17
89,8,CAN,10-15-17
43,6,CAN,10-21-17
54,7,USA,10-02-17
34,6,USA,10-05-17
98,8,CAN,10-19-17
65,6,USA,10-10-17

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import edu.stanford.futuredata.macrobase.analysis.classify.Classifier;
import edu.stanford.futuredata.macrobase.analysis.classify.PercentileClassifier;
import edu.stanford.futuredata.macrobase.analysis.classify.PredicateClassifier;
import edu.stanford.futuredata.macrobase.analysis.summary.Explanation;
import edu.stanford.futuredata.macrobase.analysis.summary.apriori.APrioriSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.BatchSummarizer;
Expand Down Expand Up @@ -29,8 +30,11 @@ public class BasicBatchPipeline implements Pipeline {
private String classifierType;
private String metric;
private double cutoff;
private String strCutoff;
private boolean isStrPredicate;
private boolean pctileHigh;
private boolean pctileLow;
private String predicateStr;

private String summarizerType;
private List<String> attributes;
Expand All @@ -44,9 +48,23 @@ public BasicBatchPipeline (PipelineConfig conf) {

classifierType = conf.get("classifier", "percentile");
metric = conf.get("metric");
cutoff = conf.get("cutoff", 1.0);
pctileHigh = conf.get("includeHi", true);

if (classifierType.equals("predicate")) {
Object rawCutoff = conf.get("cutoff");
isStrPredicate = rawCutoff instanceof String;
if (isStrPredicate) {
strCutoff = (String) rawCutoff;
} else {
cutoff = (double) rawCutoff;
}
} else {
isStrPredicate = false;
cutoff = conf.get("cutoff", 1.0);
}

pctileHigh = conf.get("includeHi",true);
pctileLow = conf.get("includeLo", true);
predicateStr = conf.get("predicate", "==").trim();

summarizerType = conf.get("summarizer", "apriori");
attributes = conf.get("attributes");
Expand All @@ -64,6 +82,14 @@ public Classifier getClassifier() throws MacrobaseException {
classifier.setIncludeLow(pctileLow);
return classifier;
}
case "predicate": {
if (isStrPredicate){
PredicateClassifier classifier = new PredicateClassifier(metric, predicateStr, strCutoff);
return classifier;
}
PredicateClassifier classifier = new PredicateClassifier(metric, predicateStr, cutoff);
return classifier;
}
default : {
throw new MacrobaseException("Bad Classifier Type");
}
Expand Down Expand Up @@ -112,7 +138,12 @@ public BatchSummarizer getSummarizer(String outlierColumnName) throws MacrobaseE

/**
 * Loads the input data frame, forcing the metric column to the type the
 * configured predicate needs: STRING when the cutoff was given as a string,
 * DOUBLE otherwise.
 *
 * @return the ingested DataFrame with the metric column typed appropriately
 * @throws Exception if the underlying ingest (e.g. CSV read) fails
 */
public DataFrame loadData() throws Exception {
    Map<String, Schema.ColType> colTypes = new HashMap<>();
    // String-valued cutoffs (e.g. dates like "12-11-17") require lexicographic
    // comparison, so the metric must be ingested as a string.
    Schema.ColType metricType =
            isStrPredicate ? Schema.ColType.STRING : Schema.ColType.DOUBLE;
    colTypes.put(metric, metricType);
    return PipelineUtils.loadDataFrame(inputURI, colTypes);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package edu.stanford.futuredata.macrobase.pipeline;

import edu.stanford.futuredata.macrobase.analysis.classify.ArithmeticClassifier;
import edu.stanford.futuredata.macrobase.analysis.classify.CubeClassifier;
import edu.stanford.futuredata.macrobase.analysis.classify.QuantileClassifier;
import edu.stanford.futuredata.macrobase.analysis.classify.RawClassifier;
import edu.stanford.futuredata.macrobase.analysis.summary.Explanation;
import edu.stanford.futuredata.macrobase.analysis.classify.*;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLExplanation;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLMeanSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLOutlierSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.apriori.APrioriSummarizer;
import edu.stanford.futuredata.macrobase.datamodel.DataFrame;
import edu.stanford.futuredata.macrobase.datamodel.Schema;
import edu.stanford.futuredata.macrobase.ingest.CSVDataFrameWriter;
Expand All @@ -35,15 +30,23 @@ public class CubePipeline implements Pipeline {
private Map<String, Object> jsonBody;
private boolean usePost;

// Classifiers
private String classifierType;
private String countColumn;
private String meanColumn;
private String stdColumn;
private LinkedHashMap<String, Double> quantileColumns;
private double cutoff;
private String strCutoff;
private boolean isStrPredicate;

private String predicateStr;
private String metric;

private boolean includeHi;
private boolean includeLo;
private String meanColumn;
private String stdColumn;
private LinkedHashMap<String, Double> quantileColumns;

// Explanation
private List<String> attributes;
private double minSupport;
private double minRatioMetric;
Expand All @@ -58,12 +61,28 @@ public CubePipeline(PipelineConfig conf) {

classifierType = conf.get("classifier", "arithmetic");
countColumn = conf.get("countColumn", "count");

if (classifierType.equals("predicate")) {
Object rawCutoff = conf.get("cutoff");
isStrPredicate = rawCutoff instanceof String;
if (isStrPredicate) {
strCutoff = (String) rawCutoff;
} else {
cutoff = (double) rawCutoff;
}
} else {
isStrPredicate = false;
cutoff = conf.get("cutoff", 1.0);
}

predicateStr = conf.get("predicate", "==").trim();
metric = conf.get("metric", null);

includeHi = conf.get("includeHi", true);
includeLo = conf.get("includeLo", true);
meanColumn = conf.get("meanColumn", "mean");
stdColumn = conf.get("stdColumn", "std");
quantileColumns = conf.get("quantileColumns", new LinkedHashMap<String, Double>());
cutoff = conf.get("cutoff", 1.0);
includeHi = conf.get("includeHi", true);
includeLo = conf.get("includeLo", true);

attributes = conf.get("attributes");
minSupport = conf.get("minSupport", 3.0);
Expand Down Expand Up @@ -92,10 +111,6 @@ public APLExplanation results() throws Exception {
classifier.process(df);
elapsed = System.currentTimeMillis() - startTime;
log.info("Classification time: {}", elapsed);
log.info("Outlier cutoffs: {} {}",
classifier.getLowCutoff(),
classifier.getHighCutoff()
);
df = classifier.getResults();
if (debugDump) {
CSVDataFrameWriter writer = new CSVDataFrameWriter();
Expand All @@ -115,23 +130,29 @@ public APLExplanation results() throws Exception {

private Map<String, Schema.ColType> getColTypes() throws MacrobaseException {
Map<String, Schema.ColType> colTypes = new HashMap<>();
colTypes.put(countColumn, Schema.ColType.DOUBLE);
switch (classifierType) {
case "meanshift":
case "arithmetic": {
colTypes.put(countColumn, Schema.ColType.DOUBLE);
colTypes.put(meanColumn, Schema.ColType.DOUBLE);
colTypes.put(stdColumn, Schema.ColType.DOUBLE);
return colTypes;
}
case "quantile": {
colTypes.put(countColumn, Schema.ColType.DOUBLE);
for (String col : quantileColumns.keySet()) {
colTypes.put(col, Schema.ColType.DOUBLE);
}
return colTypes;
}
case "predicate": {
if (isStrPredicate) {
colTypes.put(metric, Schema.ColType.STRING);
} else {
colTypes.put(metric, Schema.ColType.DOUBLE);
}
return colTypes;
}
case "raw": {
colTypes.put(countColumn, Schema.ColType.DOUBLE);
colTypes.put(meanColumn, Schema.ColType.DOUBLE);
}
default:
Expand All @@ -157,6 +178,15 @@ private CubeClassifier getClassifier() throws MacrobaseException {
classifier.setIncludeLow(includeLo);
return classifier;
}
case "predicate": {
if (isStrPredicate){
PredicateCubeClassifier classifier = new PredicateCubeClassifier(countColumn, metric, predicateStr, strCutoff);
return classifier;
}
PredicateCubeClassifier classifier = new PredicateCubeClassifier(countColumn, metric, predicateStr, cutoff);
return classifier;
}

case "meanshift":
case "raw": {
return new RawClassifier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@ public void testDemoQuery() throws Exception {
Explanation e = p.results();
assertEquals(3.0, e.numTotal(), 1e-10);
}

}
}
15 changes: 15 additions & 0 deletions core/src/test/resources/tiny_predicate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
pipeline: "BasicBatchPipeline"

inputURI: "csv://src/test/resources/tiny.csv"

classifier: "predicate"
metric: "usage"
predicate: "=="
cutoff: 2.0

summarizer: "apriori"
attributes:
- "location"
- "version"
minRatioMetric: 10.0
minSupport: 0.2
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* Returns a new dataframe with a column representation of the estimated number of
* outliers in each group.
*/
public abstract class CubeClassifier implements Transformer, ThresholdClassifier {
public abstract class CubeClassifier implements Transformer {
protected String countColumnName = "count";
protected String outputColumnName = "_OUTLIER";

Expand Down
Loading

0 comments on commit 8c02554

Please sign in to comment.