Skip to content

Commit dfc1ae4

Browse files
Merge pull request #1 from alexarchambault/develop
Add client API
2 parents d446bb4 + 7a4c2e3 commit dfc1ae4

File tree

10 files changed

+112
-19
lines changed

10 files changed

+112
-19
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package libdaemonjvm.client
2+
3+
import java.net.Socket
4+
import java.nio.channels.SocketChannel
5+
import java.nio.charset.StandardCharsets
6+
import java.nio.file.Files
7+
8+
import libdaemonjvm.internal.{LockProcess, SocketFile, SocketHandler}
9+
import libdaemonjvm.LockFiles
10+
11+
object Connect {
12+
13+
def tryConnect(files: LockFiles): Option[Either[ConnectError, Either[Socket, SocketChannel]]] =
14+
tryConnect(files, LockProcess.default)
15+
16+
def tryConnect(
17+
files: LockFiles,
18+
proc: LockProcess
19+
): Option[Either[ConnectError, Either[Socket, SocketChannel]]] = {
20+
21+
def ifProcessRunning(pid: Int): Either[ConnectError, Either[Socket, SocketChannel]] =
22+
SocketFile.connect(files.socketPaths) match {
23+
case Left(e) =>
24+
Left(new ConnectError.ZombieFound(pid, e))
25+
case Right(s) =>
26+
Right(s)
27+
}
28+
29+
def ifFiles(hasLock: Boolean): Option[Either[ConnectError, Either[Socket, SocketChannel]]] = {
30+
val b = Files.readAllBytes(files.pidFile)
31+
32+
// FIXME Catch malformed content errors here?
33+
val s = new String(b, StandardCharsets.UTF_8).trim()
34+
val pidOpt =
35+
if (s.nonEmpty && s.forall(_.isDigit)) Some(s.toInt)
36+
else None
37+
38+
pidOpt.flatMap { pid =>
39+
if (proc.isRunning(pid)) Some(ifProcessRunning(pid))
40+
else None
41+
}
42+
}
43+
44+
def pidSocketFilesFound(): Boolean =
45+
Files.exists(files.pidFile) &&
46+
(SocketHandler.usesWindowsPipe || Files.exists(files.socketPaths.path))
47+
48+
if (pidSocketFilesFound())
49+
ifFiles(hasLock = false)
50+
else
51+
None
52+
}
53+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package libdaemonjvm.client
2+
3+
sealed abstract class ConnectError(
4+
message: String,
5+
cause: Throwable = null
6+
) extends Exception(message, cause)
7+
8+
object ConnectError {
9+
final class ZombieFound(val pid: Int, val connectionError: Throwable)
10+
extends ConnectError(s"Cannot connect to process $pid", connectionError)
11+
}

library/src/libdaemonjvm/LockProcess.scala renamed to library/src/libdaemonjvm/internal/LockProcess.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
package libdaemonjvm
2-
3-
import libdaemonjvm.internal.{Pid, Processes}
1+
package libdaemonjvm.internal
42

53
trait LockProcess {
64
def pid(): Int

library/src/libdaemonjvm/internal/SocketFile.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,29 @@ import libdaemonjvm.SocketPaths
99

1010
object SocketFile {
1111
def canConnect(paths: SocketPaths): Either[Throwable, Unit] = {
12+
var s: Either[Throwable, Either[Socket, SocketChannel]] = null
13+
try {
14+
s = connect(paths)
15+
s.map(_ => ())
16+
}
17+
finally if (s != null)
18+
s.toOption.foreach(_.merge.close())
19+
}
20+
def connect(paths: SocketPaths): Either[Throwable, Either[Socket, SocketChannel]] = {
1221
var s: Either[Socket, SocketChannel] = null
1322
try {
1423
s = SocketHandler.client(paths)
15-
Right(())
24+
Right(s)
1625
}
1726
catch {
1827
case e: SocketException =>
28+
if (s != null)
29+
s.merge.close()
1930
Left(e)
2031
case e: SocketExceptionLike =>
32+
if (s != null)
33+
s.merge.close()
2134
Left(e)
2235
}
23-
finally if (s != null)
24-
s.merge.close()
2536
}
2637
}

library/src/libdaemonjvm/Lock.scala renamed to library/src/libdaemonjvm/server/Lock.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
package libdaemonjvm
1+
package libdaemonjvm.server
22

33
import java.nio.channels.ServerSocketChannel
44
import java.nio.charset.StandardCharsets
55
import java.nio.file.{Files, Path}
66

7-
import libdaemonjvm.internal.{SocketFile, SocketHandler}
7+
import libdaemonjvm.LockFiles
8+
import libdaemonjvm.internal.{LockProcess, SocketFile, SocketHandler}
89
import java.net.ServerSocket
910

1011
object Lock {

library/src/libdaemonjvm/LockError.scala renamed to library/src/libdaemonjvm/server/LockError.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package libdaemonjvm
1+
package libdaemonjvm.server
22

33
import java.nio.file.Path
44

library/src/libdaemonjvm/SigInt.scala renamed to library/src/libdaemonjvm/server/SigInt.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package libdaemonjvm
1+
package libdaemonjvm.server
22

33
import sun.misc.{Signal, SignalHandler}
44

manual/server/src/libdaemonjvm/TestServer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import scala.concurrent.duration._
66
import java.util.concurrent.atomic.AtomicInteger
77

88
import libdaemonjvm.internal.SocketFile
9+
import libdaemonjvm.server.Lock
910
import java.io.IOException
1011

1112
object TestServer {

tests/test/src/libdaemonjvm/tests/LockTests.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,24 @@ package libdaemonjvm.tests
22

33
import com.eed3si9n.expecty.Expecty.expect
44
import libdaemonjvm._
5-
import libdaemonjvm.internal.SocketFile
5+
import libdaemonjvm.internal._
6+
import libdaemonjvm.server._
67

78
import java.nio.file.Files
8-
import libdaemonjvm.internal.SocketHandler
9+
import libdaemonjvm.client.Connect
910

1011
class LockTests extends munit.FunSuite {
1112

1213
test("simple") {
1314
TestUtil.withTestDir { dir =>
14-
TestUtil.tryAcquire(dir) { (files, maybeServerChannel) =>
15+
val files = TestUtil.lockFiles(dir)
16+
TestUtil.tryAcquire(files) { maybeServerChannel =>
1517
expect(maybeServerChannel.isRight)
16-
val canConnect = SocketFile.canConnect(files.socketPaths)
17-
expect(canConnect.isRight)
18+
val connectRes = Connect.tryConnect(files)
19+
expect(connectRes.exists(_.isRight))
1820
}
21+
val after = Connect.tryConnect(files)
22+
expect(after.exists(_.isLeft)) // zombie found (our own process, not listening on the socket)
1923
}
2024
}
2125

tests/test/src/libdaemonjvm/tests/TestUtil.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@ import java.util.concurrent.atomic.AtomicInteger
55
import java.nio.channels.ServerSocketChannel
66

77
import libdaemonjvm._
8+
import libdaemonjvm.internal._
9+
import libdaemonjvm.server._
810
import java.net.ServerSocket
911
import scala.util.Properties
1012
import java.util.concurrent.CountDownLatch
1113
import java.net.Socket
14+
import java.util.concurrent.atomic.AtomicBoolean
15+
import libdaemonjvm.internal.SocketFile
1216

1317
object TestUtil {
1418
private lazy val testDirBase = {
@@ -61,12 +65,17 @@ object TestUtil {
6165
var maybeServerChannel: Either[LockError, Either[ServerSocket, ServerSocketChannel]] = null
6266
var acceptThreadOpt = Option.empty[Thread]
6367
val accepting = new CountDownLatch(1)
68+
val shouldStop = new AtomicBoolean(false)
6469
try {
6570
maybeServerChannel = Lock.tryAcquire(files, proc)
6671
if (Properties.isWin)
6772
// Windows named pipes seem no to accept clients unless accept is being called on the server socket
6873
acceptThreadOpt =
69-
maybeServerChannel.toOption.flatMap(_.left.toOption.map(acceptAndDiscard(_, accepting)))
74+
maybeServerChannel.toOption.flatMap(_.left.toOption.map(acceptAndDiscard(
75+
_,
76+
accepting,
77+
() => shouldStop.get()
78+
)))
7079
for (t <- acceptThreadOpt) {
7180
t.start()
7281
accepting.await()
@@ -75,14 +84,19 @@ object TestUtil {
7584
f(maybeServerChannel)
7685
}
7786
finally {
87+
shouldStop.set(true)
88+
SocketFile.canConnect(files.socketPaths) // unblock the server thread last accept
7889
for (e <- Option(maybeServerChannel); channel <- e)
7990
channel.merge.close()
80-
acceptThreadOpt.foreach(_.interrupt()) // not sure this has an effect... :|
8191
}
8292
}
8393

8494
val acceptAndDiscardCount = new AtomicInteger
85-
def acceptAndDiscard(s: ServerSocket, accepting: CountDownLatch): Thread =
95+
def acceptAndDiscard(
96+
s: ServerSocket,
97+
accepting: CountDownLatch,
98+
shouldStop: () => Boolean
99+
): Thread =
86100
new Thread(
87101
s"libdaemonjvm-tests-accept-and-discard-${acceptAndDiscardCount.incrementAndGet()}"
88102
) {
@@ -99,7 +113,7 @@ object TestUtil {
99113
}
100114
override def run(): Unit = {
101115
accepting.countDown()
102-
while (true) {
116+
while (!shouldStop()) {
103117
val client = s.accept()
104118
// closing the client socket in the background, as this call seems to block a few seconds
105119
closeSocket(client)

0 commit comments

Comments
 (0)