Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ import akka.coordination.lease.scaladsl.Lease
}

private def indirectlyConnectedFromSeenCurrentGossip: Set[UniqueAddress] = {
reachability.records.flatMap { r =>
reachability.records.iterator.flatMap { r =>
if (seenBy(r.subject.address)) r.observer :: r.subject :: Nil
else Nil
}.toSet
Expand Down Expand Up @@ -295,6 +295,7 @@ import akka.coordination.lease.scaladsl.Lease
try {
val intersectionOfObserversAndSubjects = indirectlyConnectedFromIntersectionOfObserversAndSubjects
val haveSeenCurrentGossip = indirectlyConnectedFromSeenCurrentGossip

// remove records between the indirectly connected
_reachability = reachability.filterRecords { r =>
// we only retain records for addresses that are still downable
Expand All @@ -306,13 +307,34 @@ import akka.coordination.lease.scaladsl.Lease
_unreachable = reachability.allUnreachableOrTerminated

val additionalDecision = decide()
if (additionalDecision.isIndirectlyConnected)
throw new IllegalStateException(
s"SBR double $additionalDecision decision, downing all instead. " +
s"originalReachability: [$originalReachability], filtered reachability [$reachability], " +
s"still indirectlyConnected: [$indirectlyConnected], seenBy: [$seenBy]")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should keep this message around and include it in the final IllegalStateException, so that we have full information about both decisions?


nodesToDown(additionalDecision)
if (additionalDecision.isIndirectlyConnected) {
val directlyConnectedObservers =
reachability.allObservers.diff(intersectionOfObserversAndSubjects).diff(haveSeenCurrentGossip)
val unreachableByDirectlyConnectedObservers =
reachability.records.iterator.flatMap { r =>
if (directlyConnectedObservers(r.observer)) Some(r.subject) else None
}.toSet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't that a collect?

          reachability.records.iterator.collect { case r if directlyConnectedObservers(r.observer) =>
               r.subject
            }.toSet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I tend to forget about collect...


// does not change the set of unreachable nodes, we're just ignoring some observations
_reachability = reachability.filterRecords { r =>
// keep observations by the directly connected
directlyConnectedObservers(r.observer) ||
// and also keep observations that no directly-connected observed
!(unreachableByDirectlyConnectedObservers(r.subject))
}
_unreachable = reachability.allUnreachableOrTerminated

val secondOpinion = decide() // or is that third opinion?
if (secondOpinion.isIndirectlyConnected)
throw new IllegalStateException(
s"SBR double $additionalDecision decision, downing all instead. " +
s"originalReachability: [$originalReachability], filtered reachability [$reachability], " +
s"still indirectlyConnected: [$indirectlyConnected], seenBy: [$seenBy]")

// nodesToDown(additionalDecision) is a subset of the already-known indirectly connected,
// so will be union'd in by caller
nodesToDown(secondOpinion)
} else nodesToDown(additionalDecision)

} finally {
_unreachable = originalUnreachable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ class SplitBrainResolverSpec
assertDowningSide(side2, Set(memberA, memberB, memberC, memberD, memberE, memberF, memberG))
}

"double DownIndirectlyConnected when indirectly connected happens before clean partition: {A, B, C} | {(D, E), (F, G)} => {}" in new Setup2(
"down all when indirectly connected happens (covering all of majority side) before clean partition: {A, B, C} | {(D, E), (F, G)} => {}" in new Setup2(
role = None) {
side1 = Set(memberA, memberB, memberC)
side2 = Set(memberD, memberE, memberF, memberG)
Expand All @@ -578,19 +578,70 @@ class SplitBrainResolverSpec

// from side1 of the partition, minority
// D and G are observers and marked E and F as unreachable
// A has marked D and G as unreachable
// D, E, F, G unreachable due to the partition
//
// The records D->E, G->F are not removed in the second decision because they are not detected via seenBy
// due to clean partition. That means that the second decision will also be DownIndirectlyConnected. To bail
// out from this situation the strategy will throw IllegalStateException, which is caught and translated to
// DownAll.
intercept[IllegalStateException] {
assertDowningSide(side1, Set(memberA, memberB, memberC))
}

// from side2 of the partition, majority
// due to clean partition.
// For the third decision, D->E and G->F are removed because A already observes E and F as unreachable
// due to the clean partition
assertDowningSide(side1, Set(memberA, memberB, memberC, memberD, memberG))

// from side2 of the partition, "majority"
// A, B, C are on minority side
// E, F are also unreachable
assertDowningSide(side2, Set(memberA, memberB, memberC, memberD, memberE, memberF, memberG))
}

"retain majority when indirectly connected happens (minority side) before clean partition { (A, B), C } | { D, E, F, G } => { D, E, F, G}" in new Setup2(
role = None) {
side1 = Set(memberA, memberB, memberC)
side2 = Set(memberD, memberE, memberF, memberG)
indirectlyConnected = List(memberA -> memberB)

// from side 1 (minority)
// A observed B unreachable before partition
assertDowningSide(side1, Set(memberA, memberB, memberC))

// from side 2 (majority)
// A observed B unreachable before partition
// A, B, C unreachable due to partition
// A is indirectly connected (A observed and is observed unreachable)
//
// A->B not removed for second decision because B is not indirectly connected => A still indirectly connected
// Since B is observed unreachable from a directly connected node, decide as if we hadn't seen the A->B observation
// side2 is a reachable majority, so DownUnreachable
assertDowningSide(side2, Set(memberA, memberB, memberC))
}

"double DownIndirectlyConnected when indirectly connected happens (large majority side) before clean partition { A } | { (C, E, G), B, D, F, H } => {}" in new Setup2(
role = None) {
side1 = Set(memberA)
side2 = Set(memberB, memberC, memberD, memberE, memberF, memberG, memberH)
indirectlyConnected = List(memberE -> memberG, memberG -> memberC)

assertDowningSide(side1, Set(memberA, memberE, memberG))

// assertDowningSide assumes that all on the majority side have seen latest gossip, which removes the indirectly
// connected when doing the additional decision
{
val strategy = {
val s = createStrategy()
(side1 ++ side2).foreach(s.add)
s
}
val unreachability = (indirectlyConnected ++ side1.map(o => side2.head -> o)).toSet.toList
val r = createReachability(unreachability)
strategy.setReachability(r)

unreachability.foreach { case (_, to) => strategy.addUnreachable(to) }
// let's say that neither C nor G has seen gossip
strategy.setSeenBy(side2.filterNot(m => (m eq memberC) || (m eq memberG)).map(_.address))

an[IllegalStateException] shouldBe thrownBy {
strategy.nodesToDown() should be((side1 ++ side2).map(_.uniqueAddress))
}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more tests covering similar scenarios for the other strategies? KeepOldest, DownAllNodes, LeaseMajority

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be good to expand an existing multi-jvm test, or add a new one, that covers this scenario. You can run it locally with:

multi-jvm:testOnly akka.cluster.sbr.IndirectlyConnected5NodeSpec

Note that we also have SBR tests in akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/


"KeepOldest" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ object TestAddresses {
new Member(UniqueAddress(addressA.copy(host = Some("f")), 0L), 5, Up, Set(defaultDcRole), Version.Zero)
val memberG =
new Member(UniqueAddress(addressA.copy(host = Some("g")), 0L), 6, Up, Set(defaultDcRole), Version.Zero)
val memberH =
new Member(UniqueAddress(addressA.copy(host = Some("h")), 0L), 7, Up, Set(defaultDcRole), Version.Zero)

val memberAWeaklyUp = new Member(memberA.uniqueAddress, Int.MaxValue, WeaklyUp, memberA.roles, Version.Zero)
val memberBWeaklyUp = new Member(memberB.uniqueAddress, Int.MaxValue, WeaklyUp, memberB.roles, Version.Zero)
Expand Down
Loading