Commit 367650e

Fix tests by making them respect sequence_number order
1 parent cdfcad3 commit 367650e

2 files changed (+22 -21 lines)

2 files changed

+22
-21
lines changed

core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalSpec.scala (+18 -17)
@@ -62,7 +62,6 @@ trait PartitionedJournalSpecTestCases {
   "A journal" must {
     "store events concurrently without any gaps or duplicates among ordering (offset) values" in {
       //given
-      val perId = "perId-1"
       val numOfSenders = 5
       val batchSize = 1000
       val senders = List.fill(numOfSenders)(TestProbe()).zipWithIndex
@@ -72,29 +71,31 @@ trait PartitionedJournalSpecTestCases {
         .sequence {
           senders.map { case (sender, idx) =>
             Future {
-              writeMessages((idx * batchSize) + 1, (idx + 1) * batchSize, perId, sender.ref, writerUuid)
+              writeMessages((idx * batchSize) + 1, (idx + 1) * batchSize, s"perId-${idx + 1}", sender.ref, writerUuid)
             }
           }
         }
         .futureValue(Timeout(Span(1, Minute)))

       //then
-      val journalOps = new ScalaPostgresReadJournalOperations(system)
-      journalOps.withCurrentEventsByPersistenceId()(perId) { tp =>
-        tp.request(Long.MaxValue)
-        val replayedMessages = (1 to batchSize * numOfSenders).map { _ =>
-          tp.expectNext()
-        }
-        tp.expectComplete()
-        val orderings = replayedMessages.map(_.offset).collect { case Sequence(value) =>
-          value
-        }
-        orderings.size should equal(batchSize * numOfSenders)
-        val minOrd = orderings.min
-        val maxOrd = orderings.max
-        val expectedOrderings = (minOrd to maxOrd).toList
+      senders.foreach { case (_, idx) =>
+        val journalOps = new ScalaPostgresReadJournalOperations(system)
+        journalOps.withCurrentEventsByPersistenceId()(s"perId-${idx + 1}") { tp =>
+          tp.request(Long.MaxValue)
+          val replayedMessages = (1 to batchSize).map { _ =>
+            tp.expectNext()
+          }
+          tp.expectComplete()
+          val orderings = replayedMessages.map(_.offset).collect { case Sequence(value) =>
+            value
+          }
+          orderings.size should equal(batchSize)
+          val minOrd = orderings.min
+          val maxOrd = orderings.max
+          val expectedOrderings = (minOrd to maxOrd).toList

-        (orderings.sorted should contain).theSameElementsInOrderAs(expectedOrderings)
+          (orderings.sorted should contain).theSameElementsInOrderAs(expectedOrderings)
+        }
       }
     }
   }
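
Note: the assertion kept by this test checks that the ordering (offset) values read back for a single persistence id contain no gaps and no duplicates, by comparing the sorted values against the full range min..max. A minimal standalone sketch of that check, assuming ScalaTest 3.x should-Matchers and using made-up sample values that are not part of the commit:

    import org.scalatest.matchers.should.Matchers._

    // hypothetical ordering (offset) values replayed for one persistence id
    val orderings: Seq[Long] = Seq(7L, 5L, 8L, 6L)

    // sorted values must equal the full range min..max:
    // equal size rules out duplicates, element-wise equality rules out gaps
    val expectedOrderings = (orderings.min to orderings.max).toList
    orderings.size should equal(expectedOrderings.size)
    (orderings.sorted should contain).theSameElementsInOrderAs(expectedOrderings)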

core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala (+4 -4)
@@ -6,7 +6,6 @@
 package akka.persistence.postgres.query

 import java.util.concurrent.atomic.AtomicLong
-
 import akka.actor.{ ActorRef, ActorSystem }
 import akka.pattern.ask
 import akka.persistence.postgres.config.JournalSequenceRetrievalConfig
@@ -27,6 +26,7 @@ import slick.jdbc.{ JdbcBackend, JdbcCapabilities }

 import scala.concurrent.Future
 import scala.concurrent.duration._
+import scala.util.Random

 abstract class JournalSequenceActorTest(val schemaType: SchemaType) extends QueryTestSpec(schemaType.configName) {
   private val log = LoggerFactory.getLogger(classOf[JournalSequenceActorTest])
@@ -76,7 +76,7 @@ abstract class JournalSequenceActorTest(val schemaType: SchemaType) extends Quer
           JournalRow(id, deleted = false, "id", id, Array(0.toByte), Nil, emptyJson)
         }
         .grouped(10000)
-        .mapAsync(4) { rows =>
+        .mapAsync(1) { rows =>
           db.run(journalTable.forceInsertAll(rows))
         }
         .runWith(Sink.ignore)
@@ -112,7 +112,7 @@ abstract class JournalSequenceActorTest(val schemaType: SchemaType) extends Quer
           JournalRow(id, deleted = false, "id", id, Array(0.toByte), Nil, emptyJson)
         }
         .grouped(10000)
-        .mapAsync(4) { rows =>
+        .mapAsync(1) { rows =>
           db.run(journalTable.forceInsertAll(rows))
         }
         .runWith(Sink.ignore)
@@ -145,7 +145,7 @@ abstract class JournalSequenceActorTest(val schemaType: SchemaType) extends Quer
           JournalRow(id, deleted = false, "id", id, Array(0.toByte), Nil, emptyJson)
         }
         .grouped(10000)
-        .mapAsync(4) { rows =>
+        .mapAsync(1) { rows =>
           db.run(journalTable.forceInsertAll(rows))
         }
         .runWith(Sink.ignore)
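
Note: the mapAsync(4) -> mapAsync(1) change matters because Akka Streams' mapAsync only preserves the order in which results are emitted downstream; with parallelism above 1 the side effects (here, the batch inserts) still run concurrently and can commit out of order. A minimal sketch of that behaviour, outside this test suite; MapAsyncOrderSketch and insertBatch are made-up names:

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{ Sink, Source }
    import scala.concurrent.Future

    object MapAsyncOrderSketch extends App {
      implicit val system: ActorSystem = ActorSystem("sketch")
      import system.dispatcher

      // stand-in for db.run(journalTable.forceInsertAll(rows))
      def insertBatch(rows: Seq[Int]): Future[Unit] =
        Future(println(s"inserting rows ${rows.head}..${rows.last}"))

      Source(1 to 40)
        .grouped(10)
        // with parallelism 1 each batch completes before the next starts, so rows
        // reach the table in id order; mapAsync(4) would allow four inserts in
        // flight at once, and batches with higher ids could be committed first
        .mapAsync(1)(insertBatch)
        .runWith(Sink.ignore)
        .onComplete(_ => system.terminate())
    }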
