summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala
diff options
context:
space:
mode:
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala')
-rw-r--r--test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala165
1 files changed, 165 insertions, 0 deletions
diff --git a/test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala b/test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala
new file mode 100644
index 0000000000..bca7936116
--- /dev/null
+++ b/test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.dataflow
+
+import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.{ ConcurrentLinkedQueue, LinkedBlockingQueue }
+
+import akka.event.EventHandler
+import akka.actor.{ Actor, ActorRef }
+import akka.actor.Actor._
+import akka.dispatch.CompletableFuture
+import akka.AkkaException
+import akka.japi.{ Function, Effect }
+
+/**
+ * Implements Oz-style dataflow (single assignment) variables.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+object DataFlow {
+ object Start
+ object Exit
+
+ class DataFlowVariableException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
+
+ /**
+ * Executes the supplied thunk in another thread.
+ */
+ def thread(body: => Unit): Unit = spawn(body)
+
+ /**
+ * JavaAPI.
+ * Executes the supplied Effect in another thread.
+ */
+ def thread(body: Effect): Unit = spawn(body.apply)
+
+ /**
+ * Executes the supplied function in another thread.
+ */
+ def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
+ actorOf(new ReactiveEventBasedThread(body)).start()
+
+ /**
+ * JavaAPI.
+ * Executes the supplied Function in another thread.
+ */
+ def thread[A <: AnyRef, R <: AnyRef](body: Function[A, R]) =
+ actorOf(new ReactiveEventBasedThread(body.apply)).start()
+
+ private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
+ extends Actor {
+ def receive = {
+ case Exit => self.stop()
+ case message => self.reply(body(message.asInstanceOf[A]))
+ }
+ }
+
+ private object DataFlowVariable {
+ private sealed abstract class DataFlowVariableMessage
+ private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
+ private object Get extends DataFlowVariableMessage
+ }
+
+ /**
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+ @deprecated("Superceeded by Future and CompletableFuture as of 1.1", "1.1")
+ sealed class DataFlowVariable[T <: Any](timeoutMs: Long) {
+ import DataFlowVariable._
+
+ def this() = this(1000 * 60)
+
+ private val value = new AtomicReference[Option[T]](None)
+ private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]
+
+ private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
+ self.timeout = timeoutMs
+ def receive = {
+ case s@Set(v) =>
+ if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
+ while (dataFlow.blockedReaders.peek ne null)
+ dataFlow.blockedReaders.poll ! s
+ } else throw new DataFlowVariableException(
+ "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
+ case Exit => self.stop()
+ }
+ }
+
+ private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
+ self.timeout = timeoutMs
+ private var readerFuture: Option[CompletableFuture[Any]] = None
+ def receive = {
+ case Get => dataFlow.value.get match {
+ case Some(value) => self reply value
+ case None => readerFuture = self.senderFuture
+ }
+ case Set(v: T) => readerFuture.map(_ completeWithResult v)
+ case Exit => self.stop()
+ }
+ }
+
+ private[this] val in = actorOf(new In(this)).start()
+
+ /**
+ * Sets the value of this variable (if unset) with the value of the supplied variable.
+ */
+ def <<(ref: DataFlowVariable[T]) {
+ if (this.value.get.isEmpty) in ! Set(ref())
+ else throw new DataFlowVariableException(
+ "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])")
+ }
+
+ /**
+ * JavaAPI.
+ * Sets the value of this variable (if unset) with the value of the supplied variable.
+ */
+ def set(ref: DataFlowVariable[T]) { this << ref }
+
+ /**
+ * Sets the value of this variable (if unset).
+ */
+ def <<(value: T) {
+ if (this.value.get.isEmpty) in ! Set(value)
+ else throw new DataFlowVariableException(
+ "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])")
+ }
+
+ /**
+ * JavaAPI.
+ * Sets the value of this variable (if unset) with the value of the supplied variable.
+ */
+ def set(value: T) { this << value }
+
+ /**
+ * Retrieves the value of variable, throws a DataFlowVariableException if it times out.
+ */
+ def get(): T = this()
+
+ /**
+ * Retrieves the value of variable, throws a DataFlowVariableException if it times out.
+ */
+ def apply(): T = {
+ value.get getOrElse {
+ val out = actorOf(new Out(this)).start()
+
+ val result = try {
+ blockedReaders offer out
+ (out !! Get).as[T]
+ } catch {
+ case e: Exception =>
+ EventHandler.error(e, this, e.getMessage)
+ out ! Exit
+ throw e
+ }
+
+ result.getOrElse(throw new DataFlowVariableException(
+ "Timed out (after " + timeoutMs + " milliseconds) while waiting for result"))
+ }
+ }
+
+ def shutdown() { in ! Exit }
+ }
+}