blob: bca7936116894cea01f82d3e535875dccfef45cc (
plain) (
tree)
|
|
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dataflow
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ConcurrentLinkedQueue, LinkedBlockingQueue }
import akka.event.EventHandler
import akka.actor.{ Actor, ActorRef }
import akka.actor.Actor._
import akka.dispatch.CompletableFuture
import akka.AkkaException
import akka.japi.{ Function, Effect }
/**
* Implements Oz-style dataflow (single assignment) variables.
*
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
object DataFlow {
/** Marker object; it is not referenced anywhere else in the visible part of this file. */
object Start
/** Control message: every actor defined in this file stops itself when it receives `Exit`. */
object Exit
/**
 * Raised in this file when an already-bound dataflow variable is assigned again,
 * and when reading a variable yields no result within the timeout.
 */
class DataFlowVariableException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
/**
 * Executes the supplied thunk in another thread by handing it to `spawn`.
 *
 * @param body by-name block to run; it is passed on unevaluated
 */
def thread(body: => Unit): Unit = {
  spawn { body }
}
/**
 * JavaAPI.
 * Executes the supplied Effect in another thread: its `apply` is invoked inside `spawn`.
 *
 * @param body the Effect to run
 */
def thread(body: Effect): Unit = {
  spawn { body.apply }
}
/**
 * Starts an actor that applies `body` to each message it receives and replies
 * with the result; sending it `Exit` stops the actor.
 *
 * @param body function applied to every incoming message
 * @return the started actor
 */
def thread[A <: AnyRef, R <: AnyRef](body: A => R) = {
  val worker = actorOf(new ReactiveEventBasedThread(body))
  worker.start()
}
/**
 * JavaAPI.
 * Starts an actor that applies the supplied Function to each message it receives
 * and replies with the result; sending it `Exit` stops the actor.
 *
 * @param body Function applied to every incoming message
 * @return the started actor
 */
def thread[A <: AnyRef, R <: AnyRef](body: Function[A, R]) = {
  val worker = actorOf(new ReactiveEventBasedThread((input: A) => body.apply(input)))
  worker.start()
}
/**
 * Actor backing the function-based `thread` overloads.
 *
 * `Exit` stops the actor. Any other message is cast to `A` (unchecked), passed to
 * `body`, and the result is sent back with `self.reply`.
 */
private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
  extends Actor {
  def receive = {
    case Exit =>
      self.stop()
    case input =>
      val result = body(input.asInstanceOf[A])
      self.reply(result)
  }
}
/** Internal message protocol exchanged between a DataFlowVariable and its actors. */
private object DataFlowVariable {
  /** Base type of the internal messages. */
  private sealed abstract class DataFlowVariableMessage

  /** Carries the value a variable is being bound to. */
  private case class Set[T](value: T) extends DataFlowVariableMessage

  /** Request for the variable's value. */
  private object Get extends DataFlowVariableMessage
}
/**
 * A single-assignment (Oz-style) dataflow variable.
 *
 * The value can be bound once through `<<` / `set`. Reading through `apply()` / `get()`
 * returns immediately when the variable is already bound; otherwise the read is
 * delegated to a freshly started reader actor and waits for its reply.
 *
 * Binding goes through a single private `In` actor, which performs the actual
 * compare-and-set and then notifies every queued reader actor.
 *
 * @param timeoutMs timeout in milliseconds; assigned to the internal actors and
 *                  reported in the time-out error raised by `apply()`
 * @tparam T type of the value held by this variable
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
@deprecated("Superceeded by Future and CompletableFuture as of 1.1", "1.1")
sealed class DataFlowVariable[T <: Any](timeoutMs: Long) {
  import DataFlowVariable._

  /** Creates a variable with a timeout of 1000 * 60 milliseconds. */
  def this() = this(1000 * 60)

  // None until the variable has been bound; bound via compare-and-set in `In`.
  private val value = new AtomicReference[Option[T]](None)

  // Reader actors created by `apply()` that are waiting for the variable to be bound.
  private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]

  /**
   * Actor that binds the variable.
   *
   * On `Set` it tries to move `value` from None to Some(v). If that succeeds, the
   * same `Set` message is forwarded to every queued reader; otherwise a
   * DataFlowVariableException is thrown inside the actor. `Exit` stops the actor.
   */
  private class In[V](dataFlow: DataFlowVariable[V]) extends Actor {
    self.timeout = timeoutMs

    def receive = {
      case s @ Set(v) =>
        if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[V]))) {
          // One poll per reader: each queued actor is removed and notified exactly once.
          val readers = Iterator.continually(dataFlow.blockedReaders.poll).takeWhile(_ ne null)
          readers.foreach(_ ! s)
        } else throw new DataFlowVariableException(
          "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
      case Exit => self.stop()
    }
  }

  /**
   * Actor that serves a single read.
   *
   * On `Get` it replies at once if the variable is bound, otherwise it remembers the
   * sender's future. On `Set` it completes the remembered future (if any) with the
   * value. `Exit` stops the actor.
   */
  private class Out[V](dataFlow: DataFlowVariable[V]) extends Actor {
    self.timeout = timeoutMs

    // Future of the reader that asked before the variable was bound, if any.
    private var readerFuture: Option[CompletableFuture[Any]] = None

    def receive = {
      case Get => dataFlow.value.get match {
        case Some(value) => self reply value
        case None        => readerFuture = self.senderFuture
      }
      // No type test on the payload: the element type is erased at runtime anyway.
      case Set(v) => readerFuture.foreach(_ completeWithResult v)
      case Exit   => self.stop()
    }
  }

  private[this] val in = actorOf(new In(this)).start()

  /**
   * Sets the value of this variable (if unset) with the value of the supplied variable.
   *
   * Note: `ref()` is evaluated on both paths, so this call may wait on `ref`
   * (and fail with its time-out) even when this variable is already bound.
   *
   * @param ref variable whose value is read and used to bind this one
   * @throws DataFlowVariableException if this variable is already bound
   */
  def <<(ref: DataFlowVariable[T]): Unit = {
    if (this.value.get.isEmpty) in ! Set(ref())
    else throw new DataFlowVariableException(
      "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])")
  }

  /**
   * JavaAPI.
   * Sets the value of this variable (if unset) with the value of the supplied variable.
   */
  def set(ref: DataFlowVariable[T]): Unit = { this << ref }

  /**
   * Sets the value of this variable (if unset).
   *
   * The binding itself is performed asynchronously by the `In` actor.
   *
   * @param value value to bind
   * @throws DataFlowVariableException if this variable is already bound
   */
  def <<(value: T): Unit = {
    if (this.value.get.isEmpty) in ! Set(value)
    else throw new DataFlowVariableException(
      "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])")
  }

  /**
   * JavaAPI.
   * Sets the value of this variable (if unset).
   */
  def set(value: T): Unit = { this << value }

  /**
   * Retrieves the value of variable, throws a DataFlowVariableException if it times out.
   */
  def get(): T = this()

  /**
   * Retrieves the value of variable, throws a DataFlowVariableException if it times out.
   *
   * If the variable is already bound the value is returned directly. Otherwise a new
   * `Out` actor is started, queued in `blockedReaders` and asked for the value.
   *
   * @return the bound value
   * @throws DataFlowVariableException if no result is available from the reader actor
   */
  def apply(): T = {
    value.get getOrElse {
      val out = actorOf(new Out(this)).start()
      val result = try {
        // Queue the reader before asking, so a concurrent bind can notify it.
        blockedReaders offer out
        (out !! Get).as[T]
      } catch {
        case e: Exception =>
          EventHandler.error(e, this, e.getMessage)
          // NOTE(review): the reader is only sent Exit on this exception path; after a
          // successful read or a time-out it is left running - confirm this is intended.
          out ! Exit
          throw e
      }
      result.getOrElse(throw new DataFlowVariableException(
        "Timed out (after " + timeoutMs + " milliseconds) while waiting for result"))
    }
  }

  /** Sends `Exit` to the `In` actor; reader actors are not touched here. */
  def shutdown(): Unit = { in ! Exit }
}
}
|