Skip to content

Commit cddf1b6

Browse files
committed
Done
1 parent 49f8762 commit cddf1b6

25 files changed

+413
-292
lines changed

README.md

+30-51
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,19 @@
11
# Akka Persistence Plugin for MongoDB
22
[![CircleCI](https://circleci.com/gh/null-vector/akka-reactivemongo-plugin.svg?style=svg)](https://circleci.com/gh/null-vector/akka-reactivemongo-plugin)
33
[![codecov](https://codecov.io/gh/null-vector/akka-reactivemongo-plugin/branch/master/graph/badge.svg)](https://codecov.io/gh/null-vector/akka-reactivemongo-plugin)
4+
[ ![Download](https://api.bintray.com/packages/null-vector/releases/akka-reactivemongo-plugin/images/download.svg) ](https://bintray.com/null-vector/releases/akka-reactivemongo-plugin/_latestVersion)
45

56
This implementation use the [reactivemongo drive](http://reactivemongo.org/).
67

78
## Installation
8-
This plugin support scala `2.12` and `2.13`, akka `2.6.1` and reactivemongo `0.18.x` and `0.20.x`.
9+
This plugin support scala `2.13`, akka `2.6.10` and reactivemongo `1.0.0`.
910

1011
Add in your `build.sbt` the following lines:
1112
```scala
1213
resolvers += Resolver.bintrayRepo("null-vector", "releases")
1314
```
14-
For reactivemongo `0.18.x` use:
15-
16-
[![Download](https://api.bintray.com/packages/null-vector/releases/akka-reactivemongo-plugin/images/download.svg?version=1.2.11) ](https://bintray.com/null-vector/releases/akka-reactivemongo-plugin/1.2.11/link)
17-
1815
```scala
19-
libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.2.x"
20-
```
21-
For reactivemongo `0.20.x` use:
22-
23-
[ ![Download](https://api.bintray.com/packages/null-vector/releases/akka-reactivemongo-plugin/images/download.svg) ](https://bintray.com/null-vector/releases/akka-reactivemongo-plugin/_latestVersion)
24-
25-
```scala
26-
libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.3.x"
16+
libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.4.0"
2717
```
2818

2919
## Configuration
@@ -39,44 +29,15 @@ akka-persistence-reactivemongo {
3929
See [Connect to a database](http://reactivemongo.org/releases/0.1x/documentation/tutorial/connect-database.html) for more information.
4030

4131
## Events Adapters
42-
Before save any event for you `PersistentActor` it is needed to add the corresponding `EventAdapter`.
43-
44-
Events adapters must extends from `org.nullvector.EventAdapter[E]`, for example:
45-
46-
```scala
47-
class UserAddedEventAdapter extends EventAdapter[UserAdded] {
48-
49-
private implicit val userAddedMapping: BSONDocumentHandler[UserAdded] = Macros.handler[UserAdded]
50-
51-
override val manifest: String = "UserAdded"
52-
53-
override def payloadToBson(payload: UserAdded): BSONDocument = BSON.writeDocument(payload).get
54-
55-
override def bsonToPayload(doc: BSONDocument): UserAdded = BSON.readDocument(doc).get
56-
57-
}
58-
```
59-
And then you have to register the new Adapter:
60-
```scala
61-
val serializer = ReactiveMongoEventSerializer(system)
62-
63-
serializer.addEventAdapter(new UserAddedEventAdapter)
64-
```
65-
A more simple way to create an event adapter by hand is using `EventAdapterMapping`:
66-
```scala
67-
implicit val mapping: BSONDocumentMapping[SolarPlanet] = EventAdapterFactory.mappingOf[SolarPlanet]
68-
val eventAdapter = new EventAdapterMapping[SolarPlanet](manifest = "planet")
69-
```
70-
## EventAdapter Factory
71-
To avoid writing boilerplate code creating Event Adapters, we can use the `EventAdapterFactory`:
32+
Before save any event from your persistent actor it is needed to register the corresponding `EventAdapter`.
7233
```scala
7334
case class ProductId(id: String) extends AnyVal
7435
case class InvoiceItem(productId: ProductId, price: BigDecimal, tax: BigDecimal)
7536
case class InvoiceItemAdded(invoiceItem: InvoiceItem)
7637

7738
val eventAdapter = EventAdapterFactory.adapt[InviceItemAdded](withManifest = "InvoceItemAdded")
7839

79-
ReactiveMongoEventSerializer(ActorSystem()).addEventAdapter(eventAdapter)
40+
ReactiveMongoEventSerializer(actorSystem).addEventAdapter(eventAdapter)
8041
```
8142
It is also possible to override mappings or add unsupported mappings. All added mappings must extends from `BSONReader[_]` or `BSONWriter[_]` or both.
8243
```scala
@@ -101,16 +62,27 @@ case class InvoiceLineAdded(line: InvoiceLine)
10162
implicit val conf = MacroConfiguration(discriminator = "_type", typeNaming = TypeNaming.SimpleName)
10263
val eventAdapter = EventAdapterFactory.adapt[InvoceLineAdded](withManifest = "InvoiceLineAdded")
10364
```
65+
Behind the scene `EventAdapterFactory` use the ReactiveMongo Macros, so you can configure the BSON mappings:
66+
```scala
67+
implicit val conf: Aux[MacroOptions] = MacroConfiguration(discriminator = "_type", typeNaming = TypeNaming.SimpleName)
68+
```
69+
### Custom mappings
70+
You can create mappings by hand:
71+
```scala
72+
implicit val a: BSONDocumentMapping[SolarPlanet] = EventAdapterFactory.mappingOf[SolarPlanet]
73+
val eventAdapter = new EventAdapterMapping[SolarPlanet]("planet")
74+
75+
serializer.addEventAdapter(eventAdapter)
76+
```
10477

10578
## Persistence Id
106-
By default the persistence id has the following form: `<Aggregate>-<Id>`, and the aggregate will be the name of the journal collection.
79+
By default, the persistence id has the following form: `<Aggregate>-<Id>`, and the aggregate will be the name of the MongoDB collection.
10780

108-
You can change the persistence id format by adding your own collection extractor name, implementing the trait `org.nullvector.CollectionNameMapping`,
109-
and registering in the configuration:
81+
You can change the persistence id separator character:
11082
```
11183
akka-persistence-reactivemongo {
11284
mongo-uri = "mongodb://localhost/test?rm.failover=900ms:21x1.30"
113-
collection-name-mapping = "org.nullvector.DefaultCollectionNameMapping"
85+
persistence-id-separator = |
11486
}
11587
```
11688

@@ -119,17 +91,16 @@ akka-persistence-reactivemongo {
11991
Here are some examples of how to use persistence query:
12092
```scala
12193
val readJournal = ReactiveMongoJournalProvider(system).scaladslReadJournal
122-
12394
val tagsSource: Source[EventEnvelope, NotUsed] = readJournal.currentEventsByTag("some_tag", NoOffset)
12495

12596
tagsSource.runWith(Sink.foreach{ envelope => envelope.event match {
12697
case UserAdded(name, age) => // Do Something
12798
}})
12899
```
129100

130-
Sometime is necesary to create an Offset:
101+
Sometime is necessary to create an Offset:
131102
```scala
132-
val offset = ObjectIdOffset(DateTime.now())
103+
val offset = ObjectIdOffset.fromDateTime(DateTime.now()) // A Joda DateTime
133104
```
134105
For streams that never complete like `#persistenceIds`, `#eventsByTag`, etc. it is possible to configure the interval that pulls from the journal:
135106
```
@@ -147,3 +118,11 @@ If you want different refresh intervals from different query, you can add a `Ref
147118
.addAttributes(RefreshInterval(700.millis))
148119
.runWith(Sink.foreach(println))
149120
```
121+
122+
# Test Driven Development
123+
Here is a great feature for TDD lovers: it is possible to configure the plugin to persist in memory and reduce the test latency more than half.
124+
```
125+
akka-persistence-reactivemongo {
126+
persist-in-memory = true
127+
}
128+
```

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ lazy val rxmongoVersion = "1.0.0"
77
lazy val commonSettings = Seq(
88
name := "akka-reactivemongo-plugin",
99
organization := "null-vector",
10-
version := "1.4.0-SNAPSHOT",
10+
version := "1.4.0",
1111
scalaVersion := scala213,
1212
crossScalaVersions := supportedScalaVersions,
1313
scalacOptions := Seq(

core/src/main/resources/reference.conf

+9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ akka-persistence-reactivemongo-snapshot {
2323

2424
akka-persistence-reactivemongo {
2525
collection-name-mapping = "org.nullvector.DefaultCollectionNameMapping"
26+
persistence-id-separator = "-"
2627
read-journal {
2728
class = "org.nullvector.query.ReactiveMongoJournalProvider"
2829
refresh-interval = 2s
@@ -31,4 +32,12 @@ akka-persistence-reactivemongo {
3132
prefix-collection-journal = journal
3233
prefix-collection-snapshot = snapshot
3334
persist-in-memory = false
35+
persist-in-memory-dispatcher = {
36+
type = PinnedDispatcher
37+
executor = "thread-pool-executor"
38+
thread-pool-executor {
39+
fixed-pool-size = 1
40+
}
41+
throughput = 1
42+
}
3443
}
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,22 @@
11
package org.nullvector
22

3+
import com.typesafe.config.Config
4+
35
import scala.util.matching.Regex
46

57
trait CollectionNameMapping {
6-
78
def collectionNameOf(persistentId: String): Option[String]
89
}
910

10-
class DefaultCollectionNameMapping extends CollectionNameMapping {
11-
12-
import DefaultCollectionNameMapping._
11+
class DefaultCollectionNameMapping(config: Config) extends CollectionNameMapping {
12+
private val separator: String = config.getString("akka-persistence-reactivemongo.persistence-id-separator")
13+
private val pattern: Regex = buildPattern(separator.head)
1314

1415
override def collectionNameOf(persistentId: String): Option[String] = persistentId match {
15-
case defaultPattern(name, _) => Some(name)
16+
case pattern(name, _) => Some(name)
1617
case _ => None
1718
}
1819

19-
}
20-
21-
object DefaultCollectionNameMapping {
22-
val defaultPattern: Regex = "(\\w+)-(\\w+)".r
20+
private def buildPattern(separator: Char) = s"(\\w+)[$separator](.+)".r
2321
}
2422

0 commit comments

Comments
 (0)