Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
fabuzaid21 committed Jan 24, 2018
1 parent 57c461d commit 5370e94
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ protected void applyFunction(final double[] inputCol, final double[] outputCol)
}
}


/**
* An MBFunction that finds the percentile for each individual value in a given column. For example,
* for a column with values [0.1, 0.3, 0.2, 0.5, 0.4], applying the PercentileFunction would
Expand All @@ -148,7 +147,7 @@ public PercentileFucntion(final String arg) {
protected void applyFunction(final double[] inputCol, final double[] outputCol) {
// sort the column, and, for each value in the column, store the *min* position in the sorted array
final double[] sortedInputCol = Arrays.stream(inputCol).sorted().toArray();
Map<Double, Integer> map = new HashMap<>();
final Map<Double, Integer> map = new HashMap<>();
for (int i = sortedInputCol.length - 1; i >= 0; --i) {
// increment by one so that the max value has 100th percentile
map.put(sortedInputCol[i], i + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ public void prettyPrint(final PrintStream out, final int maxNumToPrint) {
for (Row r : getRows(0, numToPrint)) {
r.prettyPrint(out, maxColNameLength);
}
out.println();
out.println("...");
out.println();
for (Row r : getRows(numRows - numToPrint, numRows)) {
r.prettyPrint(out, maxColNameLength);
}
Expand All @@ -176,17 +178,17 @@ public void prettyPrint(final PrintStream out, final int maxNumToPrint) {

/**
* {@link #prettyPrint(PrintStream, int)} with default <tt>out</tt> set to <tt>System.out</tt>
* and <tt>maxNumToPrint</tt> set to 25
* and <tt>maxNumToPrint</tt> set to 15
*/
public void prettyPrint() {
prettyPrint(System.out, 25);
prettyPrint(System.out, 15);
}

/**
* {@link #prettyPrint(PrintStream, int)} with default <tt>maxNumToPrint</tt> set to 25
* {@link #prettyPrint(PrintStream, int)} with default <tt>maxNumToPrint</tt> set to 15
*/
public void prettyPrint(final PrintStream out) {
prettyPrint(out, 25);
prettyPrint(out, 15);
}

/**
Expand Down
11 changes: 11 additions & 0 deletions sql/load.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
IMPORT FROM CSV FILE 'core/demo/sample.csv' INTO sample(usage double, latency
double, location string, version string);

IMPORT FROM CSV FILE 'core/demo/mobile_data.csv' INTO mobile_data(record_id
string, user_id string, state string, hw_make string, hw_model string,
firmware_version string, app_version string, avg_temp double, battery_drain
double, trip_time double);

IMPORT FROM CSV FILE 'wikiticker.csv' INTO wiki(time string, user string, page
string, channel string, namespace string, comment string, metroCode string,
cityName string, regionName string, regionIsoCode string, countryName string,
countryIsoCode string, isAnonymous string, isMinor string, isNew string,
isRobot string, isUnpatrolled string, delta double, added double, deleted
double);
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import edu.stanford.futuredata.macrobase.sql.tree.FunctionCall;
import edu.stanford.futuredata.macrobase.sql.tree.Identifier;
import edu.stanford.futuredata.macrobase.sql.tree.ImportCsv;
import edu.stanford.futuredata.macrobase.sql.tree.IsNotNullPredicate;
import edu.stanford.futuredata.macrobase.sql.tree.Literal;
import edu.stanford.futuredata.macrobase.sql.tree.LogicalBinaryExpression;
import edu.stanford.futuredata.macrobase.sql.tree.LogicalBinaryExpression.Type;
Expand All @@ -32,7 +31,6 @@
import edu.stanford.futuredata.macrobase.sql.tree.QueryBody;
import edu.stanford.futuredata.macrobase.sql.tree.QuerySpecification;
import edu.stanford.futuredata.macrobase.sql.tree.Relation;
import edu.stanford.futuredata.macrobase.sql.tree.Select;
import edu.stanford.futuredata.macrobase.sql.tree.SelectItem;
import edu.stanford.futuredata.macrobase.sql.tree.SingleColumn;
import edu.stanford.futuredata.macrobase.sql.tree.SortItem;
Expand Down Expand Up @@ -234,7 +232,7 @@ private List<SingleColumn> removeUDFsInSelect(List<SelectItem> selectItems) {
final SingleColumn col = (SingleColumn) item;
if (col.getExpression() instanceof FunctionCall) {
functionCalls.add(col);
it.remove();
it.remove();
}
}
}
Expand Down Expand Up @@ -280,8 +278,21 @@ private DataFrame evaluateSQLClauses(final QueryBody query, final DataFrame df)
// clauses, and then a second with just the original projections. That should be correct
// and give us better performance.

DataFrame resultDf = evaluateSelectClause(df, query.getSelect());
resultDf = evaluateWhereClause(resultDf, query.getWhere());
final List<SelectItem> selectWithoutUdfs = Lists
.newArrayList(query.getSelect().getSelectItems());
final List<SingleColumn> udfCols = removeUDFsInSelect(selectWithoutUdfs);
// selectWithoutUdfs has now been modified so that it no longer has UDFs

// create shallow copy, so modifications don't persist on the original DataFrame
DataFrame resultDf = df.copy();
final Map<String, double[]> newColumns = evaluateUDFs(resultDf, udfCols);

resultDf = evaluateWhereClause(df, query.getWhere());
resultDf = evaluateSelectClause(resultDf, selectWithoutUdfs);
for (Map.Entry newColumn : newColumns.entrySet()) {
// add UDF columns to result
resultDf.addColumn((String) newColumn.getKey(), (double[]) newColumn.getValue());
}
resultDf = evaluateOrderByClause(resultDf, query.getOrderBy());
return evaluateLimitClause(resultDf, query.getLimit());
}
Expand Down Expand Up @@ -330,52 +341,40 @@ private DataFrame getTable(String tableName) throws MacrobaseSQLException {
}

/**
* Evaluate Select clause of SQL query. If the clause is 'SELECT *' the same DataFrame is
* returned unchanged. UDFs in the Select clause are also evaluated here. TODO: add support for
* DISTINCT queries
* Evaluate only the UDFs of SQL query and return a Map of column names -> double arrays.
* If there are no UDFs (i.e. @param udfCols is empty), the Map is empty.
*
* @param inputDf The DataFrame to apply the Select clause on
* @param select The Select clause to evaluate
* @return A new DataFrame with the result of the Select clause applied
* @param inputDf The DataFrame to evaluate the UDFs on
* @param udfCols The List of UDFs to evaluate
* @return The Map of new columns to be added
*/
private DataFrame evaluateSelectClause(final DataFrame inputDf, final Select select)
private Map<String, double[]> evaluateUDFs(final DataFrame inputDf, final List<SingleColumn> udfCols)
throws MacrobaseException {
List<SelectItem> items = Lists.newArrayList(select.getSelectItems());
List<SingleColumn> udfCols = removeUDFsInSelect(
items);
// items has now been modified so that it no longer has UDFs
final DataFrame dfWithNoUdfs = evaluateSelectWithNoUDFs(inputDf, items);
// create shallow copy, so modifications don't
final DataFrame resultDf = dfWithNoUdfs.copy();
final Map<String, double[]> newColumns = new HashMap<>();
for (SingleColumn udfCol : udfCols) {
final FunctionCall func = (FunctionCall) udfCol.getExpression();
// for now, if UDFs is a.b.c.d(), ignore "a.b.c."
// for now, if UDF is a.b.c.d(), ignore "a.b.c."
final String funcName = func.getName().getSuffix();
// for now, assume func.getArguments returns at least 1 argument, always grab the first
final MBFunction mbFunction = MBFunction.getFunction(funcName,
func.getArguments().stream().map(Expression::toString).findFirst().get());

// column name for the UDF is either 1) the user-provided alias, or 2) the function name
// and arguments concatenated by "_"
final String colName = udfCol.getAlias().map(Identifier::toString).orElse(
funcName + "_" + Joiner.on("_")
.join(
func.getArguments().stream().map(Expression::toString).collect(toList())));
// modify resultDf in place, add column; mbFunction is evaluated on input DataFrame
resultDf.addColumn(colName, mbFunction.apply(inputDf));
newColumns.put(udfCol.toString(), mbFunction.apply(inputDf));
}
return resultDf;
return newColumns;
}

/**
* Evaluate Select clause of SQL query, but only once all UDFs from the clause have been
* removed. If the clause is 'SELECT *' the same DataFrame is returned unchanged.
* removed. If the clause is 'SELECT *' the same DataFrame is returned unchanged. TODO: add
* support for DISTINCT queries
*
* @param df The DataFrame to apply the Select clause on
* @param items The list of individual columns included in the Select clause
* @return A new DataFrame with the result of the Select clause applied
*/
private DataFrame evaluateSelectWithNoUDFs(DataFrame df, List<SelectItem> items) {
private DataFrame evaluateSelectClause(DataFrame df, List<SelectItem> items) {
if (items.size() == 1 && items.get(0) instanceof AllColumns) {
// SELECT * -> relation is unchanged
return df;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

import static java.util.Objects.requireNonNull;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collector;

public class SingleColumn extends SelectItem {

Expand Down Expand Up @@ -79,11 +81,25 @@ public int hashCode() {

@Override
public String toString() {
if (alias.isPresent()) {
return expression.toString() + " " + alias.get();
// column name for the UDF is either 1) the user-provided alias, or 2) the function name
// and arguments concatenated by "_"
return alias.map(Identifier::toString).orElseGet(() -> formatForCol(expression));
}

/**
* @return If the Expression is a Function Call (e.g., a UDF), concatenate the function name
* and the arguments with "_". Otherwise, return the output of toString()
*/
private String formatForCol(final Expression expr) {
if (expr instanceof FunctionCall) {
final FunctionCall func = (FunctionCall) expr;
// for now, if UDF is a.b.c.d(), ignore "a.b.c."
final String funcName = func.getName().getSuffix();
return funcName + "_" + Joiner.on("_")
.join(func.getArguments().stream().map(Expression::toString).iterator());
}
return expr.toString();

return expression.toString();
}

@Override
Expand Down

0 comments on commit 5370e94

Please sign in to comment.