-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathPiCostWeb.scala
186 lines (152 loc) · 4.71 KB
/
PiCostWeb.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// Web-based management and monitoring of Akka actors
package com.earldouglas.picostweb
import akka.actor._
import akka.routing.Routing._
import akka.routing.CyclicIterator
import org.scalatra._
class PiCostWeb extends ScalatraServlet with UrlSupport {
val accumulator = Actor.actorOf[Accumulator].start
var coordinator = new Coordinator(accumulator)
new Thread(coordinator).start
before {
contentType = "text/html"
}
get("/") {
<html>
<head>
<title>Pi Cost</title>
</head>
<body>
<h1>Pi: <span id="pi"></span></h1>
<div>Work size: {coordinator.work} <a href="harder">increase</a> <a href="softer">decrease</a></div>
<div>Work delay: {coordinator.delay} ms <a href="faster">faster</a> <a href="slower">slower</a></div>
<div>Workers: {coordinator.workerCount} <a href="addworker">add</a> <a href="removeworker">remove</a></div>
<div>Actors: <span id="status"></span></div>
<script type="text/javascript">
<!--
function ajaxThingy(url, id) {
var xmlHttpReq = false;
if (window.XMLHttpRequest) {
xmlHttpReq = new XMLHttpRequest();
} else if (window.ActiveXObject) {
xmlHttpReq = new ActiveXObject("Microsoft.XMLHTTP");
}
xmlHttpReq.open('GET', url, true);
xmlHttpReq.onreadystatechange = function() {
if (xmlHttpReq.readyState == 4) {
document.getElementById(id).innerHTML = xmlHttpReq.responseText;
}
}
xmlHttpReq.send();
}
ajaxThingy("/pi", "pi");
setInterval('ajaxThingy("/pi", "pi")', 250)
setInterval('ajaxThingy("/status", "status")', 250)
document.forms['compute'].onsubmit = new Function('computePi(); return false')
-->
</script>
</body>
</html>
}
get("/pi") {
(accumulator !! GetEstimate).getOrElse(4)
}
get("/status") {
(accumulator !! GetStatus).getOrElse(Nil) match {
case ss: Statuses =>
xml.NodeSeq.fromSeq(List((ss.values.map(status(_)))).flatten)
}
}
get("/addworker") {
coordinator.addWorker()
redirect("/")
}
get("/removeworker") {
coordinator.removeWorker()
redirect("/")
}
get("/faster") {
coordinator.goFaster
redirect("/")
}
get("/slower") {
coordinator.goSlower
redirect("/")
}
get("/harder") {
coordinator.workHarder
redirect("/")
}
get("/softer") {
coordinator.hardlyWork
redirect("/")
}
private def status(status: Status) = {
<div><span>{status.uuid}: </span><span>{status.mailboxSize} in queue</span></div>
}
protected def contextPath = request.getContextPath
}
class Coordinator(accumulator: ActorRef) extends Runnable {
var activeWorkers: List[ActorRef] = Nil
var inactiveWorkers: List[ActorRef] = Nil
var workers: ActorRef = _
implicit val accumulatorOption = Option(accumulator)
var x = 0
var sleepTime = 1024
var workSize = 1024
def addWorker() = {
inactiveWorkers match {
case Nil => activeWorkers = Actor.actorOf[Worker].start :: activeWorkers
case head :: tail =>
activeWorkers = inactiveWorkers.head :: activeWorkers
inactiveWorkers = inactiveWorkers.tail
}
workers = loadBalancerActor(new CyclicIterator(activeWorkers))
}
def removeWorker() = {
inactiveWorkers = activeWorkers.head :: inactiveWorkers
activeWorkers = activeWorkers.tail
workers = loadBalancerActor(new CyclicIterator(activeWorkers))
}
def workerCount = activeWorkers.size
def delay = sleepTime
def goFaster = sleepTime /= 2
def goSlower = sleepTime *= 2
def work = workSize
def workHarder = workSize *= 2
def hardlyWork = workSize /= 2
def run() = {
while(true) {
Thread.sleep(sleepTime)
val length = workSize
if (activeWorkers.size >= 1) {
workers ! (x until x + length)
x += length
}
}
}
}
case object GetEstimate
case object GetStatus
case class Statuses(val values: Iterable[Status])
case class Status(val uuid: Uuid, val mailboxSize: Int)
case class Result(val value: Double, val status: Status)
class Accumulator extends Actor {
var pi: Double = _
val statuses = new collection.mutable.HashMap[Uuid, Status]
def receive = {
case GetEstimate => self reply pi
case GetStatus => self.reply(new Statuses(statuses.values))
case result: Result =>
pi += result.value
statuses(result.status.uuid) = result.status
}
}
class Worker extends Actor {
def receive = {
case range: Range =>
val value = (for (k <- range) yield (4 * math.pow(-1, k) / (2 * k + 1))).sum
val result = new Result(value, new Status(self.uuid, self.mailboxSize))
self reply result
}
}