From aad6b22ff9382bb1780efaa3e97af04c92a672f3 Mon Sep 17 00:00:00 2001
From: Mars Hall
Date: Fri, 17 Nov 2017 15:25:46 -0800
Subject: [PATCH] Parallelize batchpredict with Scala parallel collections
 instead of Spark RDD
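
This change replaces the Spark-based query pipeline with Scala's parallel
collections. For reviewers unfamiliar with that API, here is a minimal,
self-contained sketch of the read/predict/write pattern the new code follows.
`fakePredict` and the literal file names are hypothetical stand-ins for the
engine's query and serving machinery, and `Source.fromFile` is used because
the default input path is a plain file path (the patch reads via
`Source.fromURL`, which expects a valid URL such as `file:///...`).

```scala
import java.io.{BufferedWriter, File, FileWriter}
import scala.collection.parallel.ParSeq
import scala.io.Source

object ParallelBatchSketch {
  // Hypothetical stand-in for the engine's query -> prediction round trip.
  def fakePredict(queryJson: String): String =
    s"""{"query":${queryJson},"prediction":{}}"""

  def main(args: Array[String]): Unit = {
    // `.par` converts the materialized Seq into a ParSeq; the `map` below
    // then runs on a fork-join pool sized to the available processors.
    val queries: ParSeq[String] = Source.fromFile("batchpredict-input.json").
      getLines.
      filter(_.trim.nonEmpty).
      toSeq.
      par

    val predictions: ParSeq[String] = queries.map(fakePredict)

    // `.seq` drops back to a sequential view, so a single thread writes
    // the result lines in input order.
    val output = new BufferedWriter(
      new FileWriter(new File("batchpredict-output.json")))
    try {
      predictions.seq.foreach { line => output.write(line + "\n") }
    } finally {
      output.close()
    }
  }
}
```

In the patch itself, `foreach` runs directly on the `ParSeq`, so result lines
are written in nondeterministic order; each `write` call synchronizes on the
writer's internal lock, so individual lines should stay intact.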
---
 .../predictionio/workflow/BatchPredict.scala  | 48 ++++++++------
 docs/manual/source/batchpredict/index.html.md | 64 ++++---------------
 .../predictionio/tools/RunBatchPredict.scala  |  4 --
 .../predictionio/tools/console/Console.scala  | 17 ++---
 4 files changed, 42 insertions(+), 91 deletions(-)

diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
index 69525b11cf..aeda5bb6a1 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
@@ -18,7 +18,8 @@
 package org.apache.predictionio.workflow
 
-import java.io.Serializable
+import java.io.{BufferedWriter, File, FileWriter, Serializable}
+import java.net.URI
 
 import com.twitter.bijection.Injection
 import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
@@ -32,12 +33,13 @@ import org.apache.predictionio.workflow.CleanupFunctions
 import org.apache.spark.rdd.RDD
 import org.json4s._
 import org.json4s.native.JsonMethods._
+import scala.collection.parallel.ParSeq
+import scala.io.Source
 
 import scala.language.existentials
 
 case class BatchPredictConfig(
   inputFilePath: String = "batchpredict-input.json",
   outputFilePath: String = "batchpredict-output.json",
-  queryPartitions: Option[Int] = None,
   engineInstanceId: String = "",
   engineId: Option[String] = None,
   engineVersion: Option[String] = None,
@@ -78,10 +80,6 @@ object BatchPredict extends Logging {
       c.copy(outputFilePath = x)
     } text("Path to file containing output predictions; a " +
       "multi-object JSON file with one object per line.")
-    opt[Int]("query-partitions") action { (x, c) =>
-      c.copy(queryPartitions = Some(x))
-    } text("Limit concurrency of predictions by setting the number " +
-      "of partitions used internally for the RDD of queries.")
     opt[String]("engineId") action { (x, c) =>
       c.copy(engineId = Some(x))
     } text("Engine ID.")
@@ -148,6 +146,14 @@ engine: Engine[_, _, _, Q, P, _]): Unit = {
 
     try {
+      val outputFile = try {
+        new File(new URI(config.outputFilePath))
+      } catch {
+        case e: Throwable => new File(config.outputFilePath)
+      }
+      outputFile.delete
+      outputFile.createNewFile
+
       val engineParams = engine.engineInstanceToEngineParams(
         engineInstance, config.jsonExtractor)
@@ -180,21 +186,14 @@
     val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
       servingParamsWithName._2)
 
-    val runSparkContext = WorkflowContext(
-      batch = engineInstance.engineFactory,
-      executorEnv = engineInstance.env,
-      mode = "Batch Predict (runner)",
-      sparkEnv = engineInstance.sparkConf)
+    val inputQueries: ParSeq[String] = Source.
+      fromURL(config.inputFilePath).
+      getLines.
+      filter(_.trim.nonEmpty).
+      toSeq.
+      par
 
-    val inputRDD: RDD[String] = runSparkContext.
-      textFile(config.inputFilePath).
-      filter(_.trim.nonEmpty)
-    val queriesRDD: RDD[String] = config.queryPartitions match {
-      case Some(p) => inputRDD.repartition(p)
-      case None => inputRDD
-    }
-
-    val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
+    val parallelPredictions: ParSeq[String] = inputQueries.map { queryString =>
       val jsonExtractorOption = config.jsonExtractor
       // Extract Query from Json
       val query = JsonExtractor.extract(
@@ -226,7 +225,14 @@
       compact(render(predictionJValue))
     }
 
-    predictionsRDD.saveAsTextFile(config.outputFilePath)
+    val output = new BufferedWriter(new FileWriter(outputFile, true))
+    try {
+      parallelPredictions.foreach { line =>
+        output.write(s"${line}\n")
+      }
+    } finally {
+      output.close
+    }
 
   } finally {
     CleanupFunctions.run()
diff --git a/docs/manual/source/batchpredict/index.html.md b/docs/manual/source/batchpredict/index.html.md
index 38ddb3bb89..5e8eee33b8 100644
--- a/docs/manual/source/batchpredict/index.html.md
+++ b/docs/manual/source/batchpredict/index.html.md
@@ -20,9 +20,9 @@ limitations under the License.
 -->
 
 ##Overview
 
-Process predictions for many queries using efficient parallelization
-through Spark. Useful for mass auditing of predictions and for
-generating predictions to push into other systems.
+Process predictions for many queries using efficient parallelization. Useful
+for mass auditing of predictions and for generating predictions to push into
+other systems.
 
 Batch predict reads and writes multi-object JSON files similar to the
 [batch import](/datacollection/batchimport/) format. JSON objects are separated
 by newlines and cannot themselves contain unencoded newlines.
 
 ##Compatibility
 
 `pio batchpredict` loads the engine and processes queries exactly like
-`pio deploy`. There is only one additional requirement for engines
-to utilize batch predict:
+`pio deploy`.
 
-WARNING: All algorithm classes used in the engine must be
-[serializable](https://www.scala-lang.org/api/2.11.8/index.html#scala.Serializable).
-**This is already true for PredictionIO's base algorithm classes**, but may be broken
-by including non-serializable fields in their constructor. Using the
-[`@transient` annotation](http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/)
-may help in these cases.
-
-This requirement is due to processing the input queries as a
-[Spark RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds)
-which enables high-performance parallelization, even on a single machine.
+WARNING: This feature has changed since its initial release in PredictionIO
+0.12.0-incubating. Queries are no longer parallelized with Spark; they are now
+processed with Scala's built-in parallel collections. As a result, the output
+is a single JSON file rather than a directory of Hadoop `part-*` files.
 
 ##Usage
 
 Command to process bulk predictions. Takes the same options as `pio deploy`
 plus:
 
 ### `--input <path>`
 
 Path to file containing queries; a multi-object JSON file with one
-query object per line. Accepts any valid Hadoop file URL.
+query object per line.
 
 Default: `batchpredict-input.json`
 
 ### `--output <path>`
 
 Path to file to receive results; a multi-object JSON file with one
-object per line, the prediction + original query. Accepts any
-valid Hadoop file URL. Actual output will be written as Hadoop
-partition files in a directory with the output name.
+object per line, the prediction + original query.
 
 Default: `batchpredict-output.json`
 
-### `--query-partitions <num>`
-
-Configure the concurrency of predictions by setting the number of partitions
-used internally for the RDD of queries. This will directly effect the
-number of resulting `part-*` output files. While setting to `1` may seem
-appealing to get a single output file, this will remove parallelization
-for the batch process, reducing performance and possibly exhausting memory.
-
-Default: number created by Spark context's `textFile` (probably the number
-of cores available on the local machine)
-
 ### `--engine-instance-id <id>`
 
 Identifier for the trained instance to use for batch predict.
 
 Default: the latest trained instance.
 
 A multi-object JSON file of queries as they would be sent to the engine's
 HTTP Queries API.
 
-NOTE: Read via
-[SparkContext's `textFile`](https://spark.apache.org/docs/latest/rdd-programming-guide.html#external-datasets)
-and so may be a single file or any supported Hadoop format.
-
 File: `batchpredict-input.json`
 
 ```json
 {"user":"1"}
 {"user":"2"}
 {"user":"3"}
 {"user":"4"}
 {"user":"5"}
 ```
 
 This command will run to completion, aborting if any errors are encountered.
 
 A multi-object JSON file of predictions + original queries. The predictions
 are JSON objects as they would be returned from the engine's HTTP Queries API.
 
-NOTE: Results are written via Spark RDD's `saveAsTextFile` so each partition
-will be written to its own `part-*` file.
-See [post-processing results](#post-processing-results).
-
-File 1: `batchpredict-output.json/part-00000`
+File: `batchpredict-output.json`
 
 ```json
 {"query":{"user":"1"},"prediction":{"itemScores":[{"item":"1","score":33},{"item":"2","score":32}]}}
 {"query":{"user":"3"},"prediction":{"itemScores":[{"item":"2","score":16},{"item":"3","score":12}]}}
 {"query":{"user":"4"},"prediction":{"itemScores":[{"item":"3","score":19},{"item":"1","score":18}]}}
-```
-
-File 2: `batchpredict-output.json/part-00001`
-
-```json
 {"query":{"user":"2"},"prediction":{"itemScores":[{"item":"5","score":55},{"item":"3","score":28}]}}
 {"query":{"user":"5"},"prediction":{"itemScores":[{"item":"1","score":24},{"item":"4","score":14}]}}
 ```
-
-###Post-processing Results
-
-After the process exits successfully, the parts may be concatenated into a
-single output file using a command like:
-
-```bash
-cat batchpredict-output.json/part-* > batchpredict-output-all.json
-```
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
index c76d2037a5..d007e892ee 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
@@ -31,7 +31,6 @@ import scala.sys.process._
 
 case class BatchPredictArgs(
   inputFilePath: String = "batchpredict-input.json",
   outputFilePath: String = "batchpredict-output.json",
-  queryPartitions: Option[Int] = None,
   variantJson: Option[File] = None,
   jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
 
@@ -59,9 +58,6 @@ object RunBatchPredict extends Logging {
         "--engine-variant",
         batchPredictArgs.variantJson.getOrElse(
           new File(engineDirPath, "engine.json")).getCanonicalPath) ++
-      (if (batchPredictArgs.queryPartitions.isEmpty) Nil
-        else Seq("--query-partitions",
-          batchPredictArgs.queryPartitions.get.toString)) ++
       (if (verbose) Seq("--verbose") else Nil) ++
       Seq("--json-extractor", batchPredictArgs.jsonExtractor.toString)
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
index 04df82f634..10b96cd469 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
@@ -331,28 +331,20 @@ object Console extends Logging {
       cmd("batchpredict").
         text("Use an engine instance to process batch predictions. This\n" +
           "command will pass all pass-through arguments to its underlying\n" +
-          "spark-submit command. All algorithm classes used in the engine\n" +
-          "must be serializable.").
+          "spark-submit command.").
         action { (_, c) =>
           c.copy(commands = c.commands :+ "batchpredict")
         } children(
           opt[String]("input") action { (x, c) =>
             c.copy(batchPredict = c.batchPredict.copy(inputFilePath = x))
           } text("Path to file containing queries; a multi-object JSON file\n" +
-            "with one query object per line. Accepts any valid Hadoop\n" +
-            "file URL. Default: batchpredict-input.json"),
+            "with one query object per line.\n" +
+            "Default: batchpredict-input.json"),
           opt[String]("output") action { (x, c) =>
             c.copy(batchPredict = c.batchPredict.copy(outputFilePath = x))
           } text("Path to file to receive results; a multi-object JSON file\n" +
             "with one object per line, the prediction + original query.\n" +
-            "Accepts any valid Hadoop file URL. Actual output will be\n" +
-            "written as Hadoop partition files in a directory with the\n" +
-            "output name. Default: batchpredict-output.json"),
-          opt[Int]("query-partitions") action { (x, c) =>
-            c.copy(batchPredict = c.batchPredict.copy(queryPartitions = Some(x)))
-          } text("Limit concurrency of predictions by setting the number\n" +
-            "of partitions used internally for the RDD of queries.\n" +
-            "Default: number created by Spark context's `textFile`"),
+            "Default: batchpredict-output.json"),
           opt[String]("engine-instance-id") action { (x, c) =>
             c.copy(engineInstanceId = Some(x))
           } text("Engine instance ID."),
@@ -696,7 +688,6 @@ object Console extends Logging {
         BatchPredictArgs(
           ca.batchPredict.inputFilePath,
           ca.batchPredict.outputFilePath,
-          ca.batchPredict.queryPartitions,
           ca.workflow.variantJson,
           ca.workflow.jsonExtractor),
         ca.spark,
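
One trade-off worth noting: the removed `--query-partitions` flag has no
direct equivalent here, since a parallel collection defaults to a fork-join
pool sized to the machine's available processors. If throttling were ever
needed again, a custom task support could be attached to the collection. A
hypothetical sketch against the Scala 2.11 API, not part of this patch:

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.io.Source

// Cap prediction concurrency at 4 worker threads, roughly mirroring
// what --query-partitions did for the RDD-based implementation.
val inputQueries = Source.fromFile("batchpredict-input.json").
  getLines.
  filter(_.trim.nonEmpty).
  toSeq.
  par
inputQueries.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
```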