This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Merge branch 'bi-queries'
szarnyasg committed Feb 8, 2018
2 parents 670c6c3 + 45a8e80 commit 99de8d1
Showing 12 changed files with 220 additions and 971 deletions.
908 changes: 15 additions & 893 deletions compiler/src/test/scala/ingraph/sandbox/BiCompilerTest.scala

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion compiler/src/test/scala/ingraph/sandbox/CompilerTest.scala
@@ -26,7 +26,7 @@ case class CompilerTestConfig(querySuitePath: Option[String] = None
def printAny: Boolean = printQuery || printCypher || printQPlan || printJPlan || printFPlan
}

class CompilerTest extends FunSuite {
abstract class CompilerTest extends FunSuite {
val config = CompilerTestConfig()
val separatorLength = 77

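The only change in this file is that CompilerTest becomes abstract, so it no longer runs as a suite on its own; concrete suites (such as the reworked BiCompilerTest above) extend it and supply their own configuration. A hypothetical subclass, purely to illustrate the pattern — the class is not part of the commit, and only the querySuitePath parameter is taken from the hunk above:

package ingraph.sandbox

// Illustrative only, not part of the commit: a concrete suite extends the
// now-abstract CompilerTest and can supply its own CompilerTestConfig.
class ExampleCompilerTest extends CompilerTest {
  override val config = CompilerTestConfig(querySuitePath = Some("example")) // parameter shown in the hunk above

  test("placeholder query compiles") {
    // the compilation helpers inherited from CompilerTest would be exercised here
  }
}
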
6 changes: 3 additions & 3 deletions compiler/src/test/scala/ingraph/sandbox/DmlTest.scala
@@ -29,19 +29,19 @@ class DmlTest extends FunSuite {
println(plan)
}

ignore("should compile simple DELETE for a vertex") {
test("should compile simple DELETE for a vertex") {
val cypher = CypherParser.parseString("MATCH (n:Person) DELETE n")
val plan = CypherToQPlan.build(cypher)
println(plan)
}

ignore("should compile simple DELETE for an edge") {
test("should compile simple DELETE for an edge") {
val cypher = CypherParser.parseString("MATCH (p1:Person)-[r:KNOWS]-(p2:Person) DELETE r")
val plan = CypherToQPlan.build(cypher)
println(plan)
}

ignore("should compile simple DELETE for an entire pattern") {
test("should compile simple DELETE for an entire pattern") {
val cypher = CypherParser.parseString("MATCH (p1:Person)-[r:KNOWS]-(p2:Person) DELETE p1, r, p2")
val plan = CypherToQPlan.build(cypher)
println(plan)
@@ -1,16 +1,18 @@
package ingraph.parse

import hu.bme.mit.ire.datatypes.Tuple
import hu.bme.mit.ire.nodes.unary.aggregation._
import hu.bme.mit.ire.util.GenericMath
import ingraph.expressionparser.FunctionLookup
import ingraph.model.expr.{FunctionInvocation, TupleIndexLiteralAttribute}
import ingraph.model.expr.{FunctionInvocation, Parameter, TupleIndexLiteralAttribute}
import ingraph.model.misc.FunctionCategory
import org.apache.spark.sql.catalyst.expressions.{Add, And, BinaryArithmetic, BinaryComparison, BinaryOperator, CaseWhen, Divide, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Multiply, Not, Or, Pmod, Remainder, Subtract}
import org.apache.spark.unsafe.types.UTF8String


object ExpressionParser {
def apply[T](exp: Expression): Tuple => T = {
val e = parse(exp)
val e: Tuple => Any = parse(exp)
t => e(t).asInstanceOf[T]
}

@@ -64,10 +66,18 @@ object ExpressionParser {
case invoc: FunctionInvocation =>
val children: Seq[Tuple => Any] = invoc.children.map(parse)
children.length match {
case 0 => tuple => FunctionLookup.fun0(invoc.functor)()
case 1 => tuple => FunctionLookup.fun1(invoc.functor)(children.head(tuple))
case 2 => tuple => FunctionLookup.fun2(invoc.functor)(children(0)(tuple), children(1)(tuple))
case 3 => tuple => FunctionLookup.fun3(invoc.functor)(children(0)(tuple), children(1)(tuple), children(2)(tuple))
case 0 =>
val fun = FunctionLookup.fun0(invoc.functor)
tuple => fun()
case 1 =>
val fun = FunctionLookup.fun1(invoc.functor)
tuple => fun(children.head(tuple))
case 2 =>
val fun = FunctionLookup.fun2(invoc.functor)
tuple => fun(children(0)(tuple), children(1)(tuple))
case 3 =>
val fun = FunctionLookup.fun3(invoc.functor)
tuple => fun(children(0)(tuple), children(1)(tuple), children(2)(tuple))
}
case exp: CaseWhen =>
println(exp)
@@ -81,27 +91,22 @@ object ExpressionParser {
fallback(tuple)
}
caseFunction
case par: Parameter => tuple => "ot"
}
//
//

// def parseAggregate(exp: Expression, lookup: Map[String, Integer]): List[(String, () => StatefulAggregate)] = exp match {
// case exp: FunctionExpression if exp.getFunctor.getCategory == FunctionCategory.AGGREGATION =>
// if (exp.getFunctor != COLLECT) {
// val variable = exp.getArguments.get(0).asInstanceOf[VariableExpression].getVariable
// val index = lookup(variable.fullName)
// List((exp.fullName, exp.getFunctor match {
// case AVG => () => new StatefulAverage(index)
// case COUNT => () => new NullAwareStatefulCount(index)
// case COUNT_ALL => () => new StatefulCount()
// case MAX => () => new StatefulMax(index)
// case MIN => () => new StatefulMin(index)
// case SUM => () => new StatefulSum(index)
// }))
// } else {
// val list = parseListExpression(exp.getArguments.get(0).asInstanceOf[ListExpression])
// val indices = list.map(e => lookup(e.asInstanceOf[VariableExpression].getVariable.fullName)).map(_.toInt)
// List((exp.fullName, () => new StatefulCollect(indices)))
// }

def parseAggregate(exp: Expression): Option[(Int, () => StatefulAggregate)] = exp match {
case FunctionInvocation(functor, Seq(TupleIndexLiteralAttribute(index, _)), _) =>
import ingraph.model.misc.Function._
val factory = functor match {
case AVG => () => new StatefulAverage(index)
case COUNT => () => new NullAwareStatefulCount(index)
case COUNT_ALL => () => new StatefulCount()
case MAX => () => new StatefulMax(index)
case MIN => () => new StatefulMin(index)
case SUM => () => new StatefulSum(index)
}
Some((index, factory))
//TODO: collect
case _ => None
}
}
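
Two details worth noting in these hunks: the FunctionInvocation branch now resolves FunctionLookup.funN(invoc.functor) once, outside the returned closure, so the lookup no longer happens per tuple; and the commented-out, variable-lookup-based parseAggregate is replaced by a pattern match on a FunctionInvocation over a TupleIndexLiteralAttribute that yields the column index together with an aggregator factory. The factory shape (() => StatefulAggregate) matters because every group key needs its own mutable aggregator. A minimal, self-contained sketch with simplified stand-in types (not the ingraph classes):

// Illustrative sketch with simplified stand-ins, not the ingraph classes.
// parseAggregate returns () => StatefulAggregate so that every distinct group key
// can get its own mutable aggregator, mirroring AggregationNode's
// data.withDefault(f => factories()).
import scala.collection.mutable

trait StatefulAgg { def add(x: Long): Unit; def value: Long }
class SumAgg extends StatefulAgg { private var s = 0L; def add(x: Long): Unit = s += x; def value: Long = s }

object AggregatorFactoryDemo extends App {
  val factory: () => StatefulAgg = () => new SumAgg            // analogous to () => new StatefulSum(index)
  val perKey = mutable.Map[String, StatefulAgg]()

  for ((key, x) <- Seq("a" -> 1L, "b" -> 2L, "a" -> 3L))
    perKey.getOrElseUpdate(key, factory()).add(x)              // a fresh aggregator per distinct key

  println(perKey("a").value)                                   // 4
  println(perKey("b").value)                                   // 2
}
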
32 changes: 16 additions & 16 deletions ire-adapter/src/main/scala/ingraph/ire/EngineFactory.scala
@@ -7,6 +7,7 @@ import hu.bme.mit.ire.engine.RelationalEngine
import hu.bme.mit.ire.messages.{ChangeSet, ReteMessage}
import hu.bme.mit.ire.nodes.binary.{AntiJoinNode, JoinNode, LeftOuterJoinNode, UnionNode}
import hu.bme.mit.ire.nodes.unary._
import hu.bme.mit.ire.nodes.unary.aggregation.{AggregationNode, StatefulAggregate}
import hu.bme.mit.ire.util.BufferMultimap
import hu.bme.mit.ire.util.Utils.conversions._
import ingraph.model.expr._
@@ -53,7 +54,7 @@ object EngineFactory {
case op: UnaryTNode =>
val node: (ReteMessage) => Unit = op match {
case op: Production => production
// case op: Grouping => instantiateGrouping(op, expr)
case op: Grouping => grouping(op, expr)
case op: SortAndTop => sortAndTop(op, expr)
case op: Selection => selection(op, expr)
case op: Projection => projection(op, expr)
@@ -126,21 +127,20 @@

// unary nodes

// private def instantiateGrouping(op: Grouping, expr: ForwardConnection) = {
// val variableLookup = getSchema(op.child)
// val aggregates = op.fnode.jnode.projectList.flatMap(
// e => ExpressionParser.parseAggregate(e, variableLookup)
// )
// val functions = () => aggregates.map(
// _._2() // GOOD LUCK UNDERSTANDING THIS
// )
// val aggregationCriteria = op.fnode.jnode.aggregationCriteria.map(e => (e, ExpressionParser.parseValue(e, variableLookup)))
// val projectionVariableLookup: Map[String, Int] =
// aggregationCriteria.zipWithIndex.map( a => a._1._1.fullName -> a._2.asInstanceOf[Integer] ).toMap ++
// aggregates.zipWithIndex.map( az => az._1._1 -> (az._2 + op.fnode.jnode.aggregationCriteria.size()))
// val projectionExpressions = op.internalSchema.map( e => ExpressionParser.parseValue(e, projectionVariableLookup))
// newLocal(Props(new AggregationNode(expr.child, aggregationCriteria.map(_._2), functions, projectionExpressions)))
// }
private def grouping(op: Grouping, expr: ForwardConnection) = {
val aggregates = op.projectionTuple.map(e => ExpressionParser.parseAggregate(e))
val factories = aggregates.flatten.map(_._2()).toVector
val nonAggregates = op.projectionTuple.filter(op.aggregationCriteria.contains)
var normalIndex = 0
var aggregateIndex = op.projectionTuple.size - op.aggregationCriteria.size - 1
val projections = aggregates.map {
case None => normalIndex += 1; normalIndex - 1
case Some(_) => aggregateIndex += 1; aggregateIndex - 1
}
val aggregationMask = op.aggregationCriteria.map(e => ExpressionParser[Any](e)).toVector

newLocal(Props(new AggregationNode(expr.child, aggregationMask, () => factories, projections.toVector)))
}

private def selection(op: Selection, expr: ForwardConnection) = {
newLocal(Props(new SelectionNode(expr.child, ExpressionParser[Boolean](op.condition))))
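
With this change the Grouping operator is actually wired in: grouping(op, expr) builds the three arguments the reworked AggregationNode expects — an aggregation mask (one ExpressionParser[Any] extractor per grouping criterion), a factory producing a fresh vector of StatefulAggregates, and an integer projection over the grouped tuple. A small sketch of how such a mask drives grouping, with simplified types (the real Tuple comes from hu.bme.mit.ire.datatypes; the data here is made up):

// Sketch only, with simplified types: how the aggregationMask handed to AggregationNode
// is used. Incoming tuples are grouped by applying every extractor in the mask —
// the same groupBy expression that appears in AggregationNode.onChangeSet.
object AggregationMaskDemo extends App {
  type Tuple = Vector[Any]

  val mask: Vector[Tuple => Any] = Vector(t => t(0))           // group by the first column only
  val tuples: Vector[Tuple] = Vector(Vector("alice", 1L), Vector("bob", 2L), Vector("alice", 3L))

  val grouped = tuples.groupBy(t => mask.map(m => m(t)))
  println(grouped.keySet)                                      // Set(Vector(alice), Vector(bob))
}
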
127 changes: 111 additions & 16 deletions ire-adapter/src/test/scala/ingraph/ire/BiEngineTest.scala
@@ -1,25 +1,120 @@
package ingraph.ire

import org.scalatest.FunSuite
class BiEngineTest extends EngineTest {

class BiEngineTest extends FunSuite {
override val queryDir: String = "ldbc-snb-bi"

ignore("test 1") {
val indexer = new Indexer()
test("bi-101: Count persons") {
run("""MATCH (p:Person)-[:IS_LOCATED_IN]->(c:country)
|RETURN p, count(c) AS cc
""".stripMargin)
}

// compiles
ignore("bi-01 from file: Posting summary") {
runFromFile("bi-01")
}

ignore("bi-02 from file: Top tags for country, age, gender, time") {
runFromFile("bi-02")
}

ignore("bi-03 from file: Tag evolution") {
runFromFile("bi-03")
}

test("bi-04 from file: Popular topics in a country") {
runFromFile("bi-04")
}

ignore("bi-05 from file: Top posters in a country") {
runFromFile("bi-05")
}

test("bi-06 from file: Most active Posters of a given Topic") {
runFromFile("bi-06")
}

test("bi-07 from file: Most authoritative users on a given topic") {
runFromFile("bi-07")
}

test("bi-08 from file: Related Topics") {
runFromFile("bi-08")
}

// compiles
ignore("bi-09 from file: Forum with related Tags") {
runFromFile("bi-09")
}

ignore("bi-10 from file: Central Person for a Tag") {
runFromFile("bi-10")
}

val readQuery =
"""MATCH (country:Country {name: 'Austria'})
|MATCH (a:Person)-[:isLocatedIn]->(:City)-[:isPartOf]->(country)
|MATCH (b:Person)-[:isLocatedIn]->(:City)-[:isPartOf]->(country)
|MATCH (c:Person)-[:isLocatedIn]->(:City)-[:isPartOf]->(country)
|MATCH (a)-[:knows]-(b), (b)-[:knows]-(c), (c)-[:knows]-(a)
|WHERE a.id < b.id
| AND b.id < c.id
|RETURN count(*)
""".stripMargin
ignore("bi-11 from file: Unrelated replies") {
runFromFile("bi-11")
}

test("bi-12 from file: Trending Posts") {
runFromFile("bi-12")
}

val readAdapter = new IngraphIncrementalAdapter(readQuery, "read", indexer)
val result = readAdapter.result()
ignore("bi-13 from file: Popular Tags per month in a country") {
runFromFile("bi-13")
}

// compiles
ignore("bi-14 from file: Top thread initiators") {
runFromFile("bi-14")
}

// compiles
ignore("bi-15 from file: Social normals") {
runFromFile("bi-15")
}

ignore("bi-16 from file: Experts in social circle") {
runFromFile("bi-16")
}

test("bi-17 from file: Friend triangles") {
runFromFile("bi-17")
}

// compiles
ignore("bi-18 from file: How many persons have a given number of posts") {
runFromFile("bi-18")
}

// compiles
ignore("bi-19 from file: Stranger's interaction") {
runFromFile("bi-19")
}

// compiles
ignore("bi-20 from file: High-level topics") {
runFromFile("bi-20")
}

ignore("bi-21 from file: Zombies in a country") {
runFromFile("bi-21")
}

ignore("bi-22 from file: International dialog") {
runFromFile("bi-22")
}

test("bi-23 from file: Holiday destinations") {
runFromFile("bi-23")
}

// compiles
ignore ("bi-24 from file: Messages by Topic and Continent") {
runFromFile("bi-24")
}

ignore("bi-25 from file: Weighted paths") {
runFromFile("bi-25")
}
}
21 changes: 21 additions & 0 deletions ire-adapter/src/test/scala/ingraph/ire/EngineTest.scala
@@ -0,0 +1,21 @@
package ingraph.ire

import org.scalatest.FunSuite

abstract class EngineTest extends FunSuite {

val queryDir: String

def run(readQuery: String): Unit = {
val indexer = new Indexer()
val readAdapter = new IngraphIncrementalAdapter(readQuery, "read", indexer)
val result = readAdapter.result()
}

def runFromFile(fileBaseName: String): Unit = {
val source = scala.io.Source.fromFile(s"../queries/${queryDir}/${fileBaseName}.cypher")
val queryString = try source.getLines.mkString("\n") finally source.close()
run(queryString)
}

}
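
EngineTest is the new shared base for the IRE integration suites: run() feeds a query string into an IngraphIncrementalAdapter over a fresh Indexer, and runFromFile() loads ../queries/<queryDir>/<fileBaseName>.cypher. A hypothetical subclass, only to show the intended usage — the directory, file name, and query are made up:

package ingraph.ire

// Hypothetical subclass, for illustration only; not part of the commit.
class ExampleEngineTest extends EngineTest {

  override val queryDir: String = "example-queries"            // resolves to ../queries/example-queries

  test("count vertices, inline query") {
    run("MATCH (n) RETURN count(n)")
  }

  test("query loaded from file") {
    runFromFile("example-01")                                   // reads ../queries/example-queries/example-01.cypher
  }
}
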
@@ -41,4 +41,10 @@ class TrainbenchmarkBatchIntegrationTest extends FunSuite {
val expected = (7 to 9).reverse.map(n => Vector(n.toLong))
assert(results == expected)
}

test("basic aggregations") {
val query = "MATCH (s: Switch) RETURN count(s)"
val results = TrainbenchmarkUtils.readModelAndGetResults(query, 1)
assert(results == List(Vector(40)))
}
}
@@ -9,23 +9,23 @@ import scala.collection.immutable.VectorBuilder
import scala.collection.mutable

class AggregationNode(override val next: (ReteMessage) => Unit,
mask: Vector[Tuple => Any], functions: () => Vector[StatefulAggregate],
projection: Vector[Tuple => Any]) extends UnaryNode with SingleForwarder {
mask: Vector[Tuple => Any], factories: () => Vector[StatefulAggregate],
projection: Vector[Int]) extends UnaryNode with SingleForwarder {
private val keyCount = mutable.Map[Tuple, Int]().withDefault(f => 0)
private val data = mutable.Map[Tuple, Vector[StatefulAggregate]]().withDefault(f => functions())
private val data = mutable.Map[Tuple, Vector[StatefulAggregate]]().withDefault(f => factories())

override def onChangeSet(changeSet: ChangeSet): Unit = {
val oldValues = mutable.Map[Tuple, (Tuple, Int)]()
for ((key, tuples) <- changeSet.positive.groupBy(t => mask.map(m => m(t)))) {
val aggregators = data.getOrElseUpdate(key, functions())
val aggregators = data.getOrElseUpdate(key, factories())

oldValues.getOrElseUpdate(key, (aggregators.map(_.value()), keyCount(key)))
for (aggregator <- aggregators)
aggregator.maintainPositive(tuples)
keyCount(key) += tuples.size
}
for ((key, tuples) <- changeSet.negative.groupBy(t => mask.map(m => m(t)))) {
val aggregators = data.getOrElseUpdate(key, functions())
val aggregators = data.getOrElseUpdate(key, factories())
oldValues.getOrElseUpdate(key, (aggregators.map(_.value()), keyCount(key)))
for (aggregator <- aggregators)
aggregator.maintainNegative(tuples)
@@ -44,8 +44,8 @@ class AggregationNode(override val next: (ReteMessage) => Unit,
negative += key ++ oldValues
}
}
val positiveBag: TupleBag = positive.result().map((t: Tuple) => projection.map(_(t)))
val negativeBag: TupleBag = negative.result().map((t: Tuple) => projection.map(_(t)))
val positiveBag: TupleBag = positive.result().map(t => projection.map(t))
val negativeBag: TupleBag = negative.result().map(t => projection.map(t))
forward(ChangeSet(
positive = positiveBag,
negative = negativeBag))
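
Besides the functions-to-factories rename, the projection argument changes from Vector[Tuple => Any] to Vector[Int]: the tuples this node forwards are built as the group key concatenated with the aggregate values, so output rows can be assembled with plain index lookups via projection.map(t). A sketch of that indexing, assuming one grouping key and two aggregates (values made up):

// Sketch only: how the Vector[Int] projection is applied. The forwarded tuples are
// built as key ++ aggregateValues (see "negative += key ++ oldValues" above), so an
// output row is just a reshuffling of positions in that concatenation.
object ProjectionDemo extends App {
  type Tuple = Vector[Any]

  val key: Tuple             = Vector("alice")                 // group-by values extracted by the mask
  val aggregateValues: Tuple = Vector(4L, 2L)                  // e.g. a sum and a count
  val stored: Tuple          = key ++ aggregateValues          // Vector(alice, 4, 2)

  val projection: Vector[Int] = Vector(2, 0, 1)                // output column i comes from stored(projection(i))
  println(projection.map(stored))                              // Vector(2, alice, 4)
}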