A Parameter Server implementation based on the
[Streaming API](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html) of [Apache Flink](http://flink.apache.org/).

Parameter Server is an abstraction for model-parallel machine learning (see the work of [Li et al.](https://doi.org/10.1145/2640087.2644155)).
Our implementation can be used with the Streaming API:
it can take a `DataStream` of data-points as input, and produce a `DataStream` of model updates. This way, we can implement both online and offline ML algorithms. Currently only asynchronous training is supported.

# Build

Use [SBT](http://www.scala-sbt.org/). It can be published to the local SBT cache

```bash
sbt publish-local
```

and then added to a project as a dependency

```sbt
libraryDependencies += "hu.sztaki.ilab" % "flink-ps" % "0.1.0"
```

# API

We can use the Parameter Server in the following way:

Basically, we can access the Parameter Server by defining a [`WorkerLogic`](https://github.com/gaborhermann/flink-ps/blob/master/src/main/scala/hu/sztaki/ilab/ps/WorkerLogic.scala), which can *pull* or *push* parameters. We provide input data to the worker via a Flink `DataStream`.

We need to implement the `WorkerLogic` trait
```scala
trait WorkerLogic[T, P, WorkerOut] extends Serializable {
  def onRecv(data: T, ps: ParameterServerClient[P, WorkerOut]): Unit
  def onPullRecv(paramId: Int, paramValue: P, ps: ParameterServerClient[P, WorkerOut]): Unit
}
```
where we can handle incoming data (`onRecv`), *pull* parameters from the Parameter Server, handle the answers to the pulls (`onPullRecv`), and *push* parameters to the Parameter Server or *output* results. We can use the `ParameterServerClient`:
```scala
trait ParameterServerClient[P, WorkerOut] extends Serializable {
  def pull(id: Int): Unit
  def push(id: Int, deltaUpdate: P): Unit
  def output(out: WorkerOut): Unit
}
```
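For illustration, here is a minimal, hypothetical worker (not part of the library): each data point is a feature id (`Int`), the parameter for an id is a running count, and the worker pulls the current count, pushes an increment, and outputs what it observed. It assumes both traits live in the `hu.sztaki.ilab.ps` package, as the source paths above suggest.

```scala
import hu.sztaki.ilab.ps.{ParameterServerClient, WorkerLogic}

// Hypothetical example: T = Int (a feature id), P = Int (its count),
// WorkerOut = (Int, Int) (feature id and the count observed at pull time).
class CountingWorkerLogic extends WorkerLogic[Int, Int, (Int, Int)] {

  // For every incoming data point, ask the Parameter Server for the current count.
  override def onRecv(data: Int, ps: ParameterServerClient[Int, (Int, Int)]): Unit =
    ps.pull(data)

  // When the pull is answered, push a delta of +1 and emit the observed count.
  override def onPullRecv(paramId: Int, paramValue: Int,
                          ps: ParameterServerClient[Int, (Int, Int)]): Unit = {
    ps.push(paramId, 1)
    ps.output((paramId, paramValue))
  }
}
```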

Once we have defined our worker logic, we can wire it into a Flink job with the `transform` method of [`FlinkParameterServer`](src/main/scala/hu/sztaki/ilab/ps/FlinkParameterServer.scala).

```scala
def transform[T, P, WorkerOut](
    trainingData: DataStream[T],
    workerLogic: WorkerLogic[T, P, WorkerOut],
    paramInit: => Int => P,
    paramUpdate: => (P, P) => P,
    workerParallelism: Int,
    psParallelism: Int,
    iterationWaitTime: Long): DataStream[Either[WorkerOut, (Int, P)]]
```

Besides the `trainingData` stream and the `workerLogic`, we need to define how the Parameter Server should initialize a parameter based on the parameter id (`paramInit`), and how to update a parameter based on a received push (`paramUpdate`). We must also define how many parallel instances of workers and parameter servers we should use (`workerParallelism` and `psParallelism`), and the `iterationWaitTime` (see [Limitations](README.md#limitations)).
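
As a rough sketch of a complete job, reusing the hypothetical `CountingWorkerLogic` from above and the `transform` signature shown here (the parallelism and wait-time values are arbitrary):

```scala
import org.apache.flink.streaming.api.scala._
import hu.sztaki.ilab.ps.FlinkParameterServer

object ParameterServerExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input: a stream of feature ids.
    val trainingData: DataStream[Int] = env.fromElements(0, 1, 0, 2, 1, 0)

    val output = FlinkParameterServer.transform(
      trainingData,
      new CountingWorkerLogic,
      paramInit = (id: Int) => 0,                                  // every count starts at zero
      paramUpdate = (current: Int, delta: Int) => current + delta, // apply a pushed delta
      workerParallelism = 4,
      psParallelism = 2,
      iterationWaitTime = 10000)

    // Left: worker output, Right: (parameter id, updated parameter value).
    output.print()

    env.execute("Flink Parameter Server example")
  }
}
```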

# Limitations

We implement the two-way communication of workers and the parameter server with Flink Streaming [iterations](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#iterations), which is not yet production-ready. The main issues are
- **Sometimes deadlocks due to cyclic backpressure.** A workaround could be to limit the number of unanswered pulls per worker (e.g. by using [WorkerLogic.addPullLimiter](src/main/scala/hu/sztaki/ilab/ps/WorkerLogic.scala#L169); see the sketch after this list), or to manually limit the input rate of the data stream. In any case, deadlock would still be possible.
- **Termination is not defined for finite input.** As a workaround, we can set `iterationWaitTime` to the number of milliseconds to wait before shutting down when no messages are sent along the iteration (see the Flink [Java Docs](https://ci.apache.org/projects/flink/flink-docs-master/api/java/)).
- **No fault tolerance.**
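
A sketch of the pull-limiting workaround, assuming `WorkerLogic.addPullLimiter` wraps an existing `WorkerLogic` and takes the maximum number of pending pulls (the exact signature should be checked in the linked source):

```scala
import hu.sztaki.ilab.ps.WorkerLogic

object PullLimitExample {
  // Hypothetical: allow at most 100 unanswered pulls per worker at any time,
  // wrapping the hypothetical CountingWorkerLogic sketched earlier.
  val limitedLogic = WorkerLogic.addPullLimiter(new CountingWorkerLogic, 100)
}
```

The wrapped logic can then be passed to `FlinkParameterServer.transform` in place of the original one.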

All these issues are being addressed in [FLIP-15](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132) and [FLIP-16](https://cwiki.apache.org/confluence/display/FLINK/FLIP-16%3A+Loop+Fault+Tolerance) and should soon be fixed. Until then, we need to use the workarounds above.