Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

package org.apache.predictionio.workflow

import java.io.Serializable
import java.io.{BufferedWriter, File, FileWriter, Serializable}
import java.net.URI

import com.twitter.bijection.Injection
import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
Expand All @@ -32,12 +33,13 @@ import org.apache.predictionio.workflow.CleanupFunctions
import org.apache.spark.rdd.RDD
import org.json4s._
import org.json4s.native.JsonMethods._
import scala.collection.parallel.ParSeq
import scala.io.Source
import scala.language.existentials

case class BatchPredictConfig(
inputFilePath: String = "batchpredict-input.json",
outputFilePath: String = "batchpredict-output.json",
queryPartitions: Option[Int] = None,
engineInstanceId: String = "",
engineId: Option[String] = None,
engineVersion: Option[String] = None,
Expand Down Expand Up @@ -78,10 +80,6 @@ object BatchPredict extends Logging {
c.copy(outputFilePath = x)
} text("Path to file containing output predictions; a " +
"multi-object JSON file with one object per line.")
opt[Int]("query-partitions") action { (x, c) =>
c.copy(queryPartitions = Some(x))
} text("Limit concurrency of predictions by setting the number " +
"of partitions used internally for the RDD of queries.")
opt[String]("engineId") action { (x, c) =>
c.copy(engineId = Some(x))
} text("Engine ID.")
Expand Down Expand Up @@ -148,6 +146,14 @@ object BatchPredict extends Logging {
engine: Engine[_, _, _, Q, P, _]): Unit = {

try {
val outputFile = try {
new File(new URI(config.outputFilePath))
} catch {
case e: Throwable => new File(config.outputFilePath)
}
outputFile.delete
outputFile.createNewFile

val engineParams = engine.engineInstanceToEngineParams(
engineInstance, config.jsonExtractor)

Expand Down Expand Up @@ -180,21 +186,14 @@ object BatchPredict extends Logging {
val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
servingParamsWithName._2)

val runSparkContext = WorkflowContext(
batch = engineInstance.engineFactory,
executorEnv = engineInstance.env,
mode = "Batch Predict (runner)",
sparkEnv = engineInstance.sparkConf)
val inputQueries: ParSeq[String] = Source.
fromURL(config.inputFilePath).
getLines.
filter(_.trim.nonEmpty).
toSeq.
par

val inputRDD: RDD[String] = runSparkContext.
textFile(config.inputFilePath).
filter(_.trim.nonEmpty)
val queriesRDD: RDD[String] = config.queryPartitions match {
case Some(p) => inputRDD.repartition(p)
case None => inputRDD
}

val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
val parallelPredictions: ParSeq[String] = inputQueries.map { queryString =>
val jsonExtractorOption = config.jsonExtractor
// Extract Query from Json
val query = JsonExtractor.extract(
Expand Down Expand Up @@ -226,7 +225,14 @@ object BatchPredict extends Logging {
compact(render(predictionJValue))
}

predictionsRDD.saveAsTextFile(config.outputFilePath)
val output = new BufferedWriter(new FileWriter(outputFile, true))
try {
parallelPredictions.foreach { line =>
output.write(s"${line}\n")
}
} finally {
output.close
}

} finally {
CleanupFunctions.run()
Expand Down
64 changes: 11 additions & 53 deletions docs/manual/source/batchpredict/index.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,22 @@ limitations under the License.
-->

##Overview
Process predictions for many queries using efficient parallelization
through Spark. Useful for mass auditing of predictions and for
generating predictions to push into other systems.
Process predictions for many queries using efficient parallelization. Useful
for mass auditing of predictions and for generating predictions to push into
other systems.

Batch predict reads and writes multi-object JSON files similar to the
[batch import](/datacollection/batchimport/) format. JSON objects are separated
by newlines and cannot themselves contain unencoded newlines.

##Compatibility
`pio batchpredict` loads the engine and processes queries exactly like
`pio deploy`. There is only one additional requirement for engines
to utilize batch predict:
`pio deploy`.

WARNING: All algorithm classes used in the engine must be
[serializable](https://www.scala-lang.org/api/2.11.8/index.html#scala.Serializable).
**This is already true for PredictionIO's base algorithm classes**, but may be broken
by including non-serializable fields in their constructor. Using the
[`@transient` annotation](http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/)
may help in these cases.

This requirement is due to processing the input queries as a
[Spark RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds)
which enables high-performance parallelization, even on a single machine.
WARNING: This feature has changed since its initial release in PredictionIO
0.12.0-incubating. Queries are no longer parallelized with Spark, instead using
Scala's built-in parallel collections. As a result, the output is now a simple
JSON file, not a multi-part Hadoop sequence file.

##Usage

Expand All @@ -53,30 +46,17 @@ Command to process bulk predictions. Takes the same options as `pio deploy` plus
### `--input <value>`

Path to file containing queries; a multi-object JSON file with one
query object per line. Accepts any valid Hadoop file URL.
query object per line.

Default: `batchpredict-input.json`

### `--output <value>`

Path to file to receive results; a multi-object JSON file with one
object per line, the prediction + original query. Accepts any
valid Hadoop file URL. Actual output will be written as Hadoop
partition files in a directory with the output name.
object per line, the prediction + original query.

Default: `batchpredict-output.json`

### `--query-partitions <value>`

Configure the concurrency of predictions by setting the number of partitions
used internally for the RDD of queries. This will directly affect the
number of resulting `part-*` output files. While setting to `1` may seem
appealing to get a single output file, this will remove parallelization
for the batch process, reducing performance and possibly exhausting memory.

Default: number created by Spark context's `textFile` (probably the number
of cores available on the local machine)

### `--engine-instance-id <value>`

Identifier for the trained instance to use for batch predict.
Expand All @@ -90,10 +70,6 @@ Default: the latest trained instance.
A multi-object JSON file of queries as they would be sent to the engine's
HTTP Queries API.

NOTE: Read via
[SparkContext's `textFile`](https://spark.apache.org/docs/latest/rdd-programming-guide.html#external-datasets)
and so may be a single file or any supported Hadoop format.

File: `batchpredict-input.json`

```json
Expand All @@ -119,30 +95,12 @@ This command will run to completion, aborting if any errors are encountered.
A multi-object JSON file of predictions + original queries. The predictions
are JSON objects as they would be returned from the engine's HTTP Queries API.

NOTE: Results are written via Spark RDD's `saveAsTextFile` so each partition
will be written to its own `part-*` file.
See [post-processing results](#post-processing-results).

File 1: `batchpredict-output.json/part-00000`
File: `batchpredict-output.json`

```json
{"query":{"user":"1"},"prediction":{"itemScores":[{"item":"1","score":33},{"item":"2","score":32}]}}
{"query":{"user":"3"},"prediction":{"itemScores":[{"item":"2","score":16},{"item":"3","score":12}]}}
{"query":{"user":"4"},"prediction":{"itemScores":[{"item":"3","score":19},{"item":"1","score":18}]}}
```

File 2: `batchpredict-output.json/part-00001`

```json
{"query":{"user":"2"},"prediction":{"itemScores":[{"item":"5","score":55},{"item":"3","score":28}]}}
{"query":{"user":"5"},"prediction":{"itemScores":[{"item":"1","score":24},{"item":"4","score":14}]}}
```

###Post-processing Results

After the process exits successfully, the parts may be concatenated into a
single output file using a command like:

```bash
cat batchpredict-output.json/part-* > batchpredict-output-all.json
```
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import scala.sys.process._
// Arguments for the `pio batchpredict` console command, forwarded to the
// underlying batch-predict workflow launcher.
// NOTE(review): this appears in a diff context where `queryPartitions` was
// being removed — confirm against the final file before relying on it.
case class BatchPredictArgs(
// Path to the multi-object JSON file of input queries (one query per line).
inputFilePath: String = "batchpredict-input.json",
// Path to the multi-object JSON file that receives predictions (one per line).
outputFilePath: String = "batchpredict-output.json",
// Optional partition count for the query RDD (legacy Spark-based option).
queryPartitions: Option[Int] = None,
// Optional engine variant JSON; defaults elsewhere to engine.json in the engine dir.
variantJson: Option[File] = None,
// Strategy used to deserialize query JSON into the engine's Query type.
jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)

Expand Down Expand Up @@ -59,9 +58,6 @@ object RunBatchPredict extends Logging {
"--engine-variant",
batchPredictArgs.variantJson.getOrElse(
new File(engineDirPath, "engine.json")).getCanonicalPath) ++
(if (batchPredictArgs.queryPartitions.isEmpty) Nil
else Seq("--query-partitions",
batchPredictArgs.queryPartitions.get.toString)) ++
(if (verbose) Seq("--verbose") else Nil) ++
Seq("--json-extractor", batchPredictArgs.jsonExtractor.toString)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,28 +331,20 @@ object Console extends Logging {
cmd("batchpredict").
text("Use an engine instance to process batch predictions. This\n" +
"command will pass all pass-through arguments to its underlying\n" +
"spark-submit command. All algorithm classes used in the engine\n" +
"must be serializable.").
"spark-submit command.").
action { (_, c) =>
c.copy(commands = c.commands :+ "batchpredict")
} children(
opt[String]("input") action { (x, c) =>
c.copy(batchPredict = c.batchPredict.copy(inputFilePath = x))
} text("Path to file containing queries; a multi-object JSON file\n" +
"with one query object per line. Accepts any valid Hadoop\n" +
"file URL. Default: batchpredict-input.json"),
"with one query object per line.\n" +
"Default: batchpredict-input.json"),
opt[String]("output") action { (x, c) =>
c.copy(batchPredict = c.batchPredict.copy(outputFilePath = x))
} text("Path to file to receive results; a multi-object JSON file\n" +
"with one object per line, the prediction + original query.\n" +
"Accepts any valid Hadoop file URL. Actual output will be\n" +
"written as Hadoop partition files in a directory with the\n" +
"output name. Default: batchpredict-output.json"),
opt[Int]("query-partitions") action { (x, c) =>
c.copy(batchPredict = c.batchPredict.copy(queryPartitions = Some(x)))
} text("Limit concurrency of predictions by setting the number\n" +
"of partitions used internally for the RDD of queries.\n" +
"Default: number created by Spark context's `textFile`"),
"Default: batchpredict-output.json"),
opt[String]("engine-instance-id") action { (x, c) =>
c.copy(engineInstanceId = Some(x))
} text("Engine instance ID."),
Expand Down Expand Up @@ -696,7 +688,6 @@ object Console extends Logging {
BatchPredictArgs(
ca.batchPredict.inputFilePath,
ca.batchPredict.outputFilePath,
ca.batchPredict.queryPartitions,
ca.workflow.variantJson,
ca.workflow.jsonExtractor),
ca.spark,
Expand Down