summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/pi.scala
diff options
context:
space:
mode:
Diffstat (limited to 'test/disabled/presentation/akka/src/pi.scala')
-rw-r--r--test/disabled/presentation/akka/src/pi.scala108
1 files changed, 108 insertions, 0 deletions
diff --git a/test/disabled/presentation/akka/src/pi.scala b/test/disabled/presentation/akka/src/pi.scala
new file mode 100644
index 0000000000..b4c644052c
--- /dev/null
+++ b/test/disabled/presentation/akka/src/pi.scala
@@ -0,0 +1,108 @@
+
+import akka.actor.{Actor, PoisonPill}
+import Actor._
+import akka.routing.{Routing, CyclicIterator}
+import Routing._
+
+import java.util.concurrent.CountDownLatch
+
+object Pi extends App {
+
+ calculate/*#*/(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
+
+ // ====================
+ // ===== Messages =====
+ // ====================
+ sealed trait PiMessage
+ case object Calculate extends PiMessage/*#*/
+ case class Work(start: Int, nrOfElements: Int) extends PiMessage
+ case class Result(value: Double) extends PiMessage
+
+ // ==================
+ // ===== Worker =====
+ // ==================
+ class Worker extends Actor/*#*/ {
+
+ // define the work
+ def calculatePiFor(start: Int, nrOfElements: Int): Double = {
+ var acc = 0.0
+ for (i <- start until (start + nrOfElements))
+ acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
+ acc
+ }
+
+ def receive /*?*/ = {
+ case Work(start, nrOfElements) =>
+ self reply/*#*/ Result(calculatePiFor(start, nrOfElements)) // perform the work // TODO: this currently returns wrong position for the symbol
+ }
+ }
+
+ // ==================
+ // ===== Master =====
+ // ==================
+ class Master(
+ nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
+ extends Actor {
+
+ var pi: Double = _
+ var nrOfResults: Int = _
+ var start: Long = _
+
+ // create the workers
+ val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]./*!*/start())
+
+ // wrap them with a load-balancing router
+ val router = Routing./*!*/loadBalancerActor(CyclicIterator(workers))./*!*/start()
+
+ // message handler
+ def receive = {
+ case Calculate =>
+ // schedule work
+ //for (start <- 0 until nrOfMessages) router ! Work(start, nrOfElements)
+ for (i <- 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements)
+
+ // send a PoisonPill to all workers telling them to shut down themselves
+ router./*!*/!(Broadcast(PoisonPill))
+
+ // send a PoisonPill to the router, telling him to shut himself down
+ router ! PoisonPill
+
+ case Result(value) =>
+ // handle result from the worker
+ pi += value
+ nrOfResults/*#*/ += 1
+ if (nrOfResults == nrOfMessages) self./*!*/stop()
+ }
+
+ override def preStart() {
+ start = System.currentTimeMillis
+ }
+
+ override def postStop() {
+ // tell the world that the calculation is complete
+ println(
+ "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
+ .format(pi, (System.currentTimeMillis - start)))
+ latch/*#*/.countDown()
+ }
+ }
+
+ // ==================
+ // ===== Run it =====
+ // ==================
+ def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
+
+ // this latch is only plumbing to know when the calculation is completed
+ val latch = new CountDownLatch(1)
+
+ // create the master
+ val master = actorOf(
+ new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)).start()
+
+ // start the calculation
+ master ! Calculate
+
+ // wait for master to shut down
+ latch.await()
+ }
+}