Skip to content

Commit 2371cc6

Browse files
zhuyaogaipan3793
authored andcommitted
[KYUUBI #5216] Workaround for negative counter in SessionLimiter
### _Why are the changes needed?_ Fix: #5216 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5217 from zhuyaogai/issue-5216. Closes #5216 b8d2e17 [Fantasy-Jay] Limit counter resource leak in SessionLimiter. cda3702 [Fantasy-Jay] Limit counter resource leak in SessionLimiter. 36272d1 [Fantasy-Jay] fix test bug. 1e282d2 [Fantasy-Jay] Limit counter resource leak in SessionLimiter. 7fc389f [Fantasy-Jay] Limit counter resource leak in SessionLimiter. Authored-by: Fantasy-Jay <[email protected]> Signed-off-by: Cheng Pan <[email protected]>
1 parent 9ea0a1b commit 2371cc6

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ class SessionLimiterImpl(userLimit: Int, ipAddressLimit: Int, userIpAddressLimit
9595

9696
private def decrLimitCount(key: String): Unit = {
9797
_counters.get(key) match {
98-
case count: AtomicInteger => count.decrementAndGet()
98+
case count: AtomicInteger =>
99+
count.accumulateAndGet(1, (l, r) => if (l > 0) l - r else l)
99100
case _ =>
100101
}
101102
}
@@ -121,12 +122,6 @@ class SessionLimiterWithAccessControlListImpl(
121122
}
122123
}
123124

124-
override def decrement(userIpAddress: UserIpAddress): Unit = {
125-
if (!unlimitedUsers.contains(userIpAddress.user)) {
126-
super.decrement(userIpAddress)
127-
}
128-
}
129-
130125
private[kyuubi] def setUnlimitedUsers(unlimitedUsers: Set[String]): Unit = {
131126
this.unlimitedUsers = unlimitedUsers
132127
}

kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ import java.util.concurrent.{CountDownLatch, Executors}
2020
import java.util.concurrent.atomic.LongAdder
2121

2222
import scala.collection.JavaConverters._
23+
import scala.util.Random
2324

2425
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
26+
import org.apache.kyuubi.util.ThreadUtils
2527

2628
class SessionLimiterSuite extends KyuubiFunSuite {
2729

@@ -149,4 +151,52 @@ class SessionLimiterSuite extends KyuubiFunSuite {
149151
assert(caught.getMessage.equals(
150152
"Connection denied because the user is in the deny user list. (user: user002)"))
151153
}
154+
155+
test("test refresh unlimited users and deny users") {
156+
val random: Random = new Random()
157+
val latch = new CountDownLatch(600)
158+
val userLimit = 100
159+
val ipAddressLimit = 101
160+
val userIpAddressLimit = 102
161+
val limiter =
162+
SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, Set.empty, Set.empty)
163+
val threadPool = ThreadUtils.newDaemonCachedThreadPool("test-refresh-config")
164+
165+
def checkUserLimit(userIpAddress: UserIpAddress): Unit = {
166+
for (i <- 0 until 200) {
167+
threadPool.execute(() => {
168+
try {
169+
Thread.sleep(random.nextInt(200))
170+
limiter.increment(userIpAddress)
171+
} catch {
172+
case _: Throwable =>
173+
} finally {
174+
Thread.sleep(random.nextInt(500))
175+
// finally call limiter#decrement method.
176+
limiter.decrement(userIpAddress)
177+
latch.countDown()
178+
}
179+
})
180+
}
181+
}
182+
183+
checkUserLimit(UserIpAddress("user001", "127.0.0.1"))
184+
checkUserLimit(UserIpAddress("user002", "127.0.0.2"))
185+
checkUserLimit(UserIpAddress("user003", "127.0.0.3"))
186+
187+
Thread.sleep(100)
188+
// set unlimited users and deny users
189+
SessionLimiter.resetUnlimitedUsers(limiter, Set("user001"))
190+
SessionLimiter.resetDenyUsers(limiter, Set("user002"))
191+
192+
Thread.sleep(300)
193+
// unset unlimited users and deny users
194+
SessionLimiter.resetUnlimitedUsers(limiter, Set.empty)
195+
SessionLimiter.resetDenyUsers(limiter, Set.empty)
196+
197+
latch.await()
198+
threadPool.shutdown()
199+
limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values
200+
.foreach(c => assert(c.get() == 0))
201+
}
152202
}

0 commit comments

Comments
 (0)