Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions src/main/scala/mimir/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ case class Database(backend: RABackend, metadataBackend: MetadataBackend)
val ra = new mimir.sql.RAToSql(this)
val functions = new mimir.algebra.function.FunctionRegistry
val aggregates = new mimir.algebra.function.AggregateRegistry
val types:mimir.algebra.typeregistry.TypeRegistry
= mimir.algebra.typeregistry.DefaultTypeRegistry

//// Logic
val compiler = new mimir.exec.Compiler(this)
Expand All @@ -113,11 +115,17 @@ case class Database(backend: RABackend, metadataBackend: MetadataBackend)
val typechecker = new mimir.algebra.Typechecker(
functions = Some(functions),
aggregates = Some(aggregates),
models = Some(models)
models = Some(models),
types = types
)
val interpreter = new mimir.algebra.Eval(
functions = Some(functions)
)
//// Translation
val gpromTranslator = new mimir.algebra.gprom.OperatorTranslation(this)
val sparkTranslator = new mimir.algebra.spark.OperatorTranslation(this)


val metadataTables = Seq("MIMIR_ADAPTIVE_SCHEMAS", "MIMIR_MODEL_OWNERS", "MIMIR_MODELS", "MIMIR_VIEWS", "MIMIR_SYS_TABLES", "MIMIR_SYS_ATTRS")
/**
* Optimize and evaluate the specified query. Applies all Mimir-specific optimizations
Expand Down Expand Up @@ -239,6 +247,11 @@ case class Database(backend: RABackend, metadataBackend: MetadataBackend)
case None => backend.getTableSchema(name)
}
}
/**
 * Look up the schema of the specified table, with each column's type
 * passed through `types.rootType`.
 *
 * @param name  The name of the table to look up
 * @return      The (column name, root type) pairs for the table, or None
 *              when `tableSchema` yields no schema for that name
 */
def tableBaseSchema(name: String): Option[Seq[(String,BaseType)]] =
  for ( schema <- tableSchema(name) ) yield
    schema.map { case (column, columnType) => (column, types.rootType(columnType)) }

/**
* Build a Table operator for the table with the provided name.
Expand Down Expand Up @@ -458,8 +471,6 @@ case class Database(backend: RABackend, metadataBackend: MetadataBackend)
views.init()
lenses.init()
adaptiveSchemas.init()
mimir.algebra.gprom.OperatorTranslation(this)
mimir.algebra.spark.OperatorTranslation(this)
}

/**
Expand Down Expand Up @@ -492,7 +503,7 @@ case class Database(backend: RABackend, metadataBackend: MetadataBackend)
sourceFile: File
) : Unit = loadTable(targetTable, sourceFile, true,
("CSV", Seq(StringPrimitive(","),BoolPrimitive(false),BoolPrimitive(false))),
Some(targetSchema.map(el => (el._1, Type.fromString(el._2)))))
Some(targetSchema.map(el => (el._1, types.rootType(types.fromString(el._2))))))

def loadTable(
targetTable: String,
Expand All @@ -511,7 +522,7 @@ case class Database(backend: RABackend, metadataBackend: MetadataBackend)
sourceFile: File,
force:Boolean,
format:(String, Seq[PrimitiveValue]),
targetSchema: Option[Seq[(String, Type)]]
targetSchema: Option[Seq[(String, BaseType)]]
){
val (delim, typeinference, detectHeaders) = format._1.toUpperCase() match {
case "CSV" => {
Expand All @@ -535,7 +546,7 @@ case class Database(backend: RABackend, metadataBackend: MetadataBackend)
targetTable: String,
sourceFile: File,
force:Boolean,
targetSchema: Option[Seq[(String, Type)]] = None,
targetSchema: Option[Seq[(String, BaseType)]] = None,
inferTypes:Boolean = true,
detectHeaders:Boolean = true,
backendOptions:Map[String, String] = Map(),
Expand Down Expand Up @@ -565,7 +576,7 @@ case class Database(backend: RABackend, metadataBackend: MetadataBackend)
views.create(targetTable.toUpperCase, oper)
} else {
val schema = targetSchema match {
case None => tableSchema(targetTable)
case None => tableBaseSchema(targetTable)
case _ => targetSchema
}
LoadData.handleLoadTableRaw(this, targetTable.toUpperCase, schema, sourceFile, options, format)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/mimir/Mimir.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ object Mimir extends LazyLogging {
if(!conf.quiet()){
output.print("Connecting to " + conf.backend() + "://" + conf.dbname() + "...")
}
sback.sparkTranslator = db.sparkTranslator
db.metadataBackend.open()
db.backend.open()
OperatorTranslation.db = db
sback.registerSparkFunctions(db.functions.functionPrototypes.map(el => el._1).toSeq, db.functions)
sback.registerSparkAggregates(db.aggregates.prototypes.map(el => el._1).toSeq, db.aggregates)

Expand Down
29 changes: 19 additions & 10 deletions src/main/scala/mimir/MimirVizier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ object MimirVizier extends LazyLogging {
val database = Mimir.conf.dbname().split("[\\\\/]").last.replaceAll("\\..*", "")
val sback = new SparkBackend(database)
db = new Database(sback, new JDBCMetadataBackend(Mimir.conf.backend(), Mimir.conf.dbname()))
sback.sparkTranslator = db.sparkTranslator
db.metadataBackend.open()
db.backend.open()
val otherExcludeFuncs = Seq("NOT","AND","!","%","&","*","+","-","/","<","<=","<=>","=","==",">",">=","^")
Expand Down Expand Up @@ -335,8 +336,8 @@ object MimirVizier extends LazyLogging {
logger.debug(s"loadCSV: From Vistrails: [ $file ] inferTypes: $inferTypes detectHeaders: $detectHeaders format: ${format} -> [ ${backendOptions.mkString(",")} ]") ;
val bkOpts = backendOptions.map{
case (optKey:String, optVal:String) => (optKey, optVal)
case hm:java.util.HashMap[String,String] => {
val entry = hm.entrySet().iterator().next()
case hm:java.util.HashMap[_,_] => {
val entry = hm.asInstanceOf[java.util.HashMap[String,String]].entrySet().iterator().next()
(entry.getKey, entry.getValue)
}
case _ => throw new Exception("loadCSV: bad options type")
Expand Down Expand Up @@ -459,11 +460,15 @@ object MimirVizier extends LazyLogging {
val timeRes = logTime("createLens") {
logger.debug("createView: From Vistrails: [" + input + "] [" + query + "]" ) ;
val (viewNameSuffix, inputSubstitutionQuery) = input match {
case aliases:JMapWrapper[String,String] => {
case rawAliases:JMapWrapper[_,_] => {
// The following line is needed to suppress the compiler's type-erasure warning
val aliases = rawAliases.asInstanceOf[JMapWrapper[String,String]]
aliases.map{ case (vizierName, mimirName) => db.sql.registerVizierNameMapping(vizierName.toUpperCase(), mimirName) }
(aliases.unzip._2.mkString(""), query)
}
case inputs:Seq[String] => {
case rawInputs:Seq[_] => {
// The following line is needed to suppress the compiler's type-erasure warning
val inputs = rawInputs.asInstanceOf[Seq[String]]
(inputs.mkString(""),inputs.zipWithIndex.foldLeft(query)((init, curr) => {
init.replaceAll(s"\\{\\{\\s*input_${curr._2}\\s*\\}\\}", curr._1)
}))
Expand Down Expand Up @@ -570,11 +575,15 @@ object MimirVizier extends LazyLogging {

def vistrailsQueryMimirJson(input:Any, query : String, includeUncertainty:Boolean, includeReasons:Boolean) : String = {
val inputSubstitutionQuery = input match {
case aliases:JMapWrapper[String,String] => {
case rawAliases:JMapWrapper[_,_] => {
// The following line is needed to suppress the compiler's type-erasure warning
val aliases = rawAliases.asInstanceOf[JMapWrapper[String,String]]
aliases.map{ case (vizierName, mimirName) => db.sql.registerVizierNameMapping(vizierName.toUpperCase(), mimirName) }
query
}
case inputs:Seq[String] => {
case rawInputs:Seq[_] => {
// The following line is needed to suppress the compiler's type-erasure warning
val inputs = rawInputs.asInstanceOf[Seq[String]]
inputs.zipWithIndex.foldLeft(query)((init, curr) => {
init.replaceAll(s"\\{\\{\\s*input_${curr._2}\\s*\\}\\}", curr._1)
})
Expand Down Expand Up @@ -732,7 +741,7 @@ def vistrailsQueryMimirJson(query : String, includeUncertainty:Boolean, includeR
try{
logger.debug("getSchema: From Vistrails: [" + query + "]" ) ;
val oper = totallyOptimize(db.sql.convert(db.parse(query).head.asInstanceOf[Select]))
JSONBuilder.list( db.typechecker.schemaOf(oper).map( schel => Map( "name" -> schel._1, "type" -> schel._2.toString(), "base_type" -> Type.rootType(schel._2).toString())))
JSONBuilder.list( db.typechecker.schemaOf(oper).map( schel => Map( "name" -> schel._1, "type" -> schel._2.toString(), "base_type" -> db.types.rootType(schel._2).toString())))
} catch {
case t: Throwable => {
logger.error("Error Getting Schema: [" + query + "]", t)
Expand Down Expand Up @@ -1105,7 +1114,7 @@ def vistrailsQueryMimirJson(query : String, includeUncertainty:Boolean, includeR
val resultList = results.toList
val (resultsStrs, prov) = resultList.map(row => (row.tuple.map(cell => cell), row.provenance.asString)).unzip
JSONBuilder.dict(Map(
"schema" -> results.schema.map( schel => Map( "name" -> schel._1, "type" ->schel._2.toString(), "base_type" -> Type.rootType(schel._2).toString())),
"schema" -> results.schema.map( schel => Map( "name" -> schel._1, "type" ->schel._2.toString(), "base_type" -> db.types.rootType(schel._2).toString())),
"data" -> resultsStrs,
"prov" -> prov
))
Expand All @@ -1120,7 +1129,7 @@ def vistrailsQueryMimirJson(query : String, includeUncertainty:Boolean, includeR
val (resultsStrs, colTaint) = resultsStrsColTaint.unzip
val (prov, rowTaint) = provRowTaint.unzip
JSONBuilder.dict(Map(
"schema" -> results.schema.map( schel => Map( "name" -> schel._1, "type" ->schel._2.toString(), "base_type" -> Type.rootType(schel._2).toString())),
"schema" -> results.schema.map( schel => Map( "name" -> schel._1, "type" ->schel._2.toString(), "base_type" -> db.types.rootType(schel._2).toString())),
"data" -> resultsStrs,
"prov" -> prov,
"col_taint" -> colTaint,
Expand All @@ -1138,7 +1147,7 @@ def vistrailsQueryMimirJson(query : String, includeUncertainty:Boolean, includeR
val (prov, rowTaint) = provRowTaint.unzip
val reasons = explainEverything(oper).map(reasonSet => reasonSet.all(db).toSeq.map(_.toJSONWithFeedback))
JSONBuilder.dict(Map(
"schema" -> results.schema.map( schel => Map( "name" -> schel._1, "type" ->schel._2.toString(), "base_type" -> Type.rootType(schel._2).toString())),
"schema" -> results.schema.map( schel => Map( "name" -> schel._1, "type" ->schel._2.toString(), "base_type" -> db.types.rootType(schel._2).toString())),
"data" -> resultsStrs,
"prov" -> prov,
"col_taint" -> colTaint,
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/mimir/adaptive/AdaptiveSchemaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class AdaptiveSchemaManager(db: Database)
){ _.map { row =>
val name = row(0).asString
val mlensType = row(1).asString
val query = Json.toOperator(Json.parse(row(2).asString))
val query = Json.toOperator(Json.parse(row(2).asString), db.types)
val args:Seq[Expression] =
Json.toExpressionList(Json.parse(row(3).asString))
Json.toExpressionList(Json.parse(row(3).asString), db.types)

(
MultilensRegistry.multilenses(mlensType),
Expand All @@ -87,9 +87,9 @@ class AdaptiveSchemaManager(db: Database)
){ _.map { row =>
val name = row(0).asString
val mlensType = row(1).asString
val query = Json.toOperator(Json.parse(row(2).asString))
val query = Json.toOperator(Json.parse(row(2).asString), db.types)
val args:Seq[Expression] =
Json.toExpressionList(Json.parse(row(3).asString))
Json.toExpressionList(Json.parse(row(3).asString), db.types)

(
MultilensRegistry.multilenses(mlensType),
Expand Down Expand Up @@ -166,9 +166,9 @@ class AdaptiveSchemaManager(db: Database)
val row = result.next
val name = row(0).asString
val mlensType = row(1).asString
val query = Json.toOperator(Json.parse(row(2).asString))
val query = Json.toOperator(Json.parse(row(2).asString), db.types)
val args:Seq[Expression] =
Json.toExpressionList(Json.parse(row(3).asString))
Json.toExpressionList(Json.parse(row(3).asString), db.types)

Some((
MultilensRegistry.multilenses(mlensType),
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/mimir/adaptive/DiscalaAbadiNormalizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ object DiscalaAbadiNormalizer
fullSchema.map { x => (x._2.toLong -> x._1._1) }.toMap
)
groupingModel.trainDomain(db)//.reconnectToDatabase(db)
val schemaLookup =
fullSchema.map( x => (x._2 -> x._1) ).toMap
val schemaLookup:Map[Int,(String,BaseType)] =
fullSchema.map( x => (x._2 -> (x._1._1, db.types.rootType(x._1._2))) ).toMap

// for every possible parent/child relationship except for ROOT
val parentKeyRepairs =
Expand Down Expand Up @@ -261,9 +261,9 @@ class DAFDRepairModel(
name: String,
context: String,
source: Operator,
keys: Seq[(String, Type)],
keys: Seq[(String, BaseType)],
target: String,
targetType: Type,
targetType: BaseType,
scoreCol: Option[String],
attrLookup: Map[Long,String]
) extends RepairKeyModel(name, context, source, keys, target, targetType, scoreCol)
Expand Down
24 changes: 12 additions & 12 deletions src/main/scala/mimir/adaptive/SchemaMatching.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object SchemaMatching
val typeName = split(1)
(
varName.toString.toUpperCase ->
Type.fromString(typeName.toString)
db.types.rootType(db.types.fromString(typeName.toString))
)
}).
toList
Expand Down Expand Up @@ -118,7 +118,7 @@ object SchemaMatching
val typeName = split(1)
(
varName.toString.toUpperCase ->
Type.fromString(typeName.toString)
db.types.fromString(typeName.toString)
)
}).toList
targetSchema.tail.foldLeft(
Expand Down Expand Up @@ -149,16 +149,16 @@ object SchemaMatching
{
if(table.equals("DATA")){
val targetSchema =
config.args.
map(field => {
val split = db.interpreter.evalString(field).split(" +")
val varName = split(0).toUpperCase
val typeName = split(1)
(
varName.toString.toUpperCase ->
Type.fromString(typeName.toString)
)
}).toList
config.args.
map(field => {
val split = db.interpreter.evalString(field).split(" +")
val varName = split(0).toUpperCase
val typeName = split(1)
(
varName.toString.toUpperCase ->
db.types.fromString(typeName.toString)
)
}).toList
Some(Project(
targetSchema.map { case (colName, colType) => {
val metaModel = db.models.get(s"${config.schema}:META:$colName")
Expand Down
37 changes: 25 additions & 12 deletions src/main/scala/mimir/adaptive/TypeInference.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ object TypeInference
{


def detectType(v: String): Iterable[Type] = {
Type.tests.flatMap({ case (t, regexp) =>
regexp.findFirstMatchIn(v).map(_ => t)
})++
TypeRegistry.matchers.flatMap({ case (regexp, name) =>
regexp.findFirstMatchIn(v).map(_ => TUser(name))
})
}
// def detectType(v: String, types:TypeRegistry): Iterable[Type] = {
// Type.tests.flatMap({ case (t, regexp) =>
// regexp.findFirstMatchIn(v).map(_ => t)
// })++
// TypeRegistry.matchers.flatMap({ case (regexp, name) =>
// regexp.findFirstMatchIn(v).map(_ => TUser(name))
// })
// }

def initSchema(db: Database, config: MultilensConfig): TraversableOnce[Model] =
{
Expand All @@ -49,7 +49,8 @@ object TypeInference
modelColumns,
stringDefaultScore,
db.backend.asInstanceOf[BackendWithSparkContext].getSparkContext(),
Some(db.backend.execute(config.query.limit(TypeInferenceModel.sampleLimit, 0)))
Some(db.backend.execute(config.query.limit(TypeInferenceModel.sampleLimit, 0))),
db.types.getSerializable
)

val columnIndexes =
Expand Down Expand Up @@ -115,14 +116,26 @@ object TypeInference
if(table.equals("DATA")){
val model = db.models.get(s"MIMIR_TI_ATTR_${config.schema}").asInstanceOf[TypeInferenceModel]
val columnIndexes = model.columns.zipWithIndex.toMap

Some(Project(
config.query.columnNames.map { colName =>
ProjectArg(colName,
if(columnIndexes contains colName){
Function("CAST", Seq(

val targetType =
model.bestGuess(0,
Seq(IntPrimitive(columnIndexes(colName))),
Seq()
).asInstanceOf[TypePrimitive].t

val castExpr = db.types.typeCaster(targetType, Var(colName))

Conditional(
IsNullExpression(Var(colName)),
Var(colName),
model.bestGuess(0, Seq(IntPrimitive(columnIndexes(colName))), Seq())
))
castExpr
)

} else {
Var(colName)
}
Expand Down
20 changes: 7 additions & 13 deletions src/main/scala/mimir/algebra/Cast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import mimir.util._

object Cast
{
def apply(t: Type, x: PrimitiveValue): PrimitiveValue =
def apply(t: BaseType, x: PrimitiveValue): PrimitiveValue =
{
try {
t match {
Expand All @@ -27,25 +27,19 @@ object Cast
case _ => TextUtils.parseInterval(x.asString)
}
case TRowId() => RowIdPrimitive(x.asString)
case TAny() => x
case TBool() => BoolPrimitive(x.asLong != 0)
case TType() => TypePrimitive(Type.fromString(x.asString))
case TUser(name) => {
val (typeRegexp, baseT) = TypeRegistry.registeredTypes(name)
val base = apply(baseT, x)
if(typeRegexp.findFirstMatchIn(base.asString).isEmpty){
NullPrimitive()
} else {
base
}
}
case TType() => TypePrimitive(
BaseType.fromString(x.asString)
.getOrElse{ TUser(x.asString) }
)
case TAny() => x
}
} catch {
case _:TypeException=> NullPrimitive();
case _:NumberFormatException => NullPrimitive();
}
}

def apply(t: Type, x: String): PrimitiveValue =
def apply(t: BaseType, x: String): PrimitiveValue =
apply(t, StringPrimitive(x))
}
Loading