Skip to content

Commit d006034

Browse files
Tweak Lock.tryAcquire API
1 parent faa89ae commit d006034

File tree

3 files changed

+58
-48
lines changed

3 files changed

+58
-48
lines changed

library/src/libdaemonjvm/server/Lock.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,20 @@ import java.net.ServerSocket
1010

1111
object Lock {
1212

13-
def tryAcquire[T](files: LockFiles)
14-
: Either[LockError, Either[ServerSocket, ServerSocketChannel]] =
15-
tryAcquire(files, LockProcess.default, SocketHandler.server(files.socketPaths))
1613
def tryAcquire[T](
17-
files: LockFiles,
18-
proc: LockProcess
19-
): Either[LockError, Either[ServerSocket, ServerSocketChannel]] =
20-
tryAcquire(files, proc, SocketHandler.server(files.socketPaths))
14+
files: LockFiles
15+
)(
16+
startListening: Either[ServerSocket, ServerSocketChannel] => T
17+
): Either[LockError, T] =
18+
tryAcquire(files, LockProcess.default) {
19+
val socket = SocketHandler.server(files.socketPaths)
20+
startListening(socket)
21+
}
2122

2223
def tryAcquire[T](
2324
files: LockFiles,
24-
proc: LockProcess,
25+
proc: LockProcess
26+
)(
2527
setup: => T
2628
): Either[LockError, T] = {
2729

manual/server/src/libdaemonjvm/TestServer.scala

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package libdaemonjvm
22

3-
import java.io.IOException
3+
import java.io.{Closeable, IOException}
44
import java.nio.file.{Files, Paths}
55
import java.nio.file.attribute.PosixFilePermission
66
import java.util.concurrent.atomic.AtomicInteger
@@ -15,6 +15,26 @@ import libdaemonjvm.server.Lock
1515
object TestServer {
1616
val delay = 2.seconds
1717
def runTestClients = false
18+
19+
def runServer(incomingConn: () => Closeable): Unit = {
20+
val count = new AtomicInteger
21+
while (true) {
22+
println("Waiting for clients")
23+
val c = incomingConn()
24+
val idx = count.incrementAndGet()
25+
val runnable: Runnable = { () =>
26+
println(s"New incoming connection $idx, closing it in $delay")
27+
Thread.sleep(delay.toMillis)
28+
println(s"Closing incoming connection $idx")
29+
c.close()
30+
println(s"Closed incoming connection $idx")
31+
}
32+
val t = new Thread(runnable)
33+
t.start()
34+
Thread.sleep(1000L) // meh, wait for server to be actually listening
35+
}
36+
}
37+
1838
def main(args: Array[String]): Unit = {
1939
val path = Paths.get("data-dir")
2040
if (!Properties.isWin) {
@@ -29,10 +49,9 @@ object TestServer {
2949
)
3050
}
3151
val files = LockFiles.under(path, "libdaemonjvm\\test-server-client\\pipe")
32-
val incomingConn = Lock.tryAcquire(files) match {
33-
case Left(e) => throw e
34-
case Right(Left(s)) => () => s.accept()
35-
case Right(Right(s)) => () => s.accept()
52+
Lock.tryAcquire(files)(s => runServer(() => s.fold(_.accept(), _.accept()))) match {
53+
case Left(e) => throw e
54+
case Right(()) =>
3655
}
3756

3857
def clientRunnable(idx: Int): Runnable = { () =>
@@ -59,22 +78,5 @@ object TestServer {
5978
runClient(3)
6079
runClient(4)
6180
}
62-
63-
val count = new AtomicInteger
64-
while (true) {
65-
println("Waiting for clients")
66-
val c = incomingConn()
67-
val idx = count.incrementAndGet()
68-
val runnable: Runnable = { () =>
69-
println(s"New incoming connection $idx, closing it in $delay")
70-
Thread.sleep(delay.toMillis)
71-
println(s"Closing incoming connection $idx")
72-
c.close()
73-
println(s"Closed incoming connection $idx")
74-
}
75-
val t = new Thread(runnable)
76-
t.setDaemon(true)
77-
t.start()
78-
}
7981
}
8082
}

tests/test/src/libdaemonjvm/tests/TestUtil.scala

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,24 +68,30 @@ object TestUtil {
6868
files: LockFiles,
6969
proc: LockProcess
7070
)(f: Either[LockError, Either[ServerSocket, ServerSocketChannel]] => T): T = {
71-
var maybeServerChannel: Either[LockError, Either[ServerSocket, ServerSocketChannel]] = null
72-
var acceptThreadOpt = Option.empty[Thread]
73-
val accepting = new CountDownLatch(1)
74-
val shouldStop = new AtomicBoolean(false)
71+
var serverChannel: Either[ServerSocket, ServerSocketChannel] = null
72+
var acceptThreadOpt = Option.empty[Thread]
73+
val accepting = new CountDownLatch(1)
74+
val shouldStop = new AtomicBoolean(false)
7575
try {
76-
maybeServerChannel = Lock.tryAcquire(files, proc)
77-
if (Properties.isWin)
78-
// Windows named pipes seem no to accept clients unless accept is being called on the server socket
79-
acceptThreadOpt =
80-
maybeServerChannel.toOption.flatMap(_.left.toOption.map(acceptAndDiscard(
81-
_,
82-
accepting,
83-
() => shouldStop.get()
84-
)))
85-
for (t <- acceptThreadOpt) {
86-
t.start()
87-
accepting.await()
88-
Thread.sleep(1000L) // waiting so that the accept call below effectively awaits client... :|
76+
val maybeServerChannel = Lock.tryAcquire(files, proc) {
77+
serverChannel = SocketHandler.server(files.socketPaths)
78+
if (Properties.isWin)
79+
// Windows named pipes seem no to accept clients unless accept is being called on the server socket
80+
acceptThreadOpt =
81+
serverChannel.left.toOption.map(acceptAndDiscard(
82+
_,
83+
accepting,
84+
() => shouldStop.get()
85+
))
86+
for (t <- acceptThreadOpt) {
87+
t.start()
88+
accepting.await()
89+
// waiting so that the accept call below effectively awaits client... :|
90+
Thread.sleep(
91+
1000L
92+
)
93+
}
94+
serverChannel
8995
}
9096
f(maybeServerChannel)
9197
}
@@ -96,7 +102,7 @@ object TestUtil {
96102
case NonFatal(e) =>
97103
System.err.println(s"Ignoring $e while trying to unblock last accept")
98104
}
99-
for (e <- Option(maybeServerChannel); channel <- e)
105+
for (channel <- Option(serverChannel))
100106
channel.merge.close()
101107
}
102108
}

0 commit comments

Comments
 (0)