-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMain.scala
91 lines (80 loc) · 2.19 KB
/
Main.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import monix.eval.{Callback, Task}
import monix.nio.tcp._
import monix.reactive.Observable
object Main extends App {
implicit val ctx = monix.execution.Scheduler.Implicits.global
def response =
"""HTTP/1.1 200 OK
|Date: Mon, 27 Jul 2009 12:28:53 GMT
|Server: Apache/2.2.14 (Win32)
|Last-Modified: Wed, 22 Jul 2009 19:15:56 GMT
|Content-Length: 88
|Content-Type: text/html
|Connection: Closed
|
|<html>
|<body>
|<h1>Hello, World!</h1>
|</body>
|</html>""".stripMargin
def forkTask(server: TaskServerSocketChannel): Task[_] = {
for {
socket <- server.accept()
_ <- setServer(socket)
} yield {
// new Thread(() => {
// forkTask(server).runAsync
// }).start()
Task.unit
}
}
def setServer(socket: TaskSocketChannel): Task[_] = {
for {
conn <- {
Task.now(readWriteAsync(socket))
}
reader <- {
conn.tcpObservable
}
writer <- conn.tcpConsumer
_ <- reader
.doOnTerminateEval(_ => conn.stopWriting())
.map(_ => response.toArray.map(_.toByte))
.take(1)
.consumeWith(writer)
} yield {
writer
}
}
val serverProgramT = for {
server <- asyncServer(java.net.InetAddress.getByName(null).getHostName, 9002)
finish <- {
val obs = Observable
.fromIterable(1 to 10000)
.mapTask(_ => server.accept())
.mapParallelUnordered(4) { setServer }
val futrue = Task {
obs.doOnTerminateEval(_ => server.close())
.publish
.connect()
}.runAsync
Task.fromFuture(futrue)
}
// _ <- forkTask(server)
} yield {
finish
}
// serverProgramT.runAsync(new Callback[String] {
// override def onSuccess(value: String): Unit = println(s"Echoed $value bytes.")
// override def onError(ex: Throwable): Unit = println(ex)
// })
serverProgramT.runAsync
// def runServer(): Unit = serverProgramT.runAsync(new Callback[String] {
// override def onSuccess(value: String): Unit = {
// println(s"Echoed $value bytes.")
// runServer()
// }
// override def onError(ex: Throwable): Unit = println(ex)
// })
// runServer()
}