diff options
author | Seth Tisue <seth@tisue.net> | 2017-03-20 17:13:56 -0700 |
---|---|---|
committer | Seth Tisue <seth@tisue.net> | 2017-03-20 17:24:33 -0700 |
commit | 25048bc73741846107c18ed01e0e9f6f07785379 (patch) | |
tree | c1c9d60002fec74fc13af354e51bb3d688b33902 /test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala | |
parent | 0563c4b23cdc7ed6c05e9defe2a675df4d838347 (diff) | |
download | scala-25048bc73741846107c18ed01e0e9f6f07785379.tar.gz scala-25048bc73741846107c18ed01e0e9f6f07785379.tar.bz2 scala-25048bc73741846107c18ed01e0e9f6f07785379.zip |
rm -r test/{flaky,disabled*,checker-tests,support,debug}
keeping this stuff, somewhere, forever and ever and ever is what
version control is for.
who dares disturb the ancient and accursed tomb of all this code...?
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala')
-rw-r--r-- | test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala | 165 |
1 files changed, 0 insertions, 165 deletions
diff --git a/test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala b/test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala deleted file mode 100644 index bca7936116..0000000000 --- a/test/disabled/presentation/akka/src/akka/dataflow/DataFlow.scala +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dataflow - -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ ConcurrentLinkedQueue, LinkedBlockingQueue } - -import akka.event.EventHandler -import akka.actor.{ Actor, ActorRef } -import akka.actor.Actor._ -import akka.dispatch.CompletableFuture -import akka.AkkaException -import akka.japi.{ Function, Effect } - -/** - * Implements Oz-style dataflow (single assignment) variables. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -object DataFlow { - object Start - object Exit - - class DataFlowVariableException(message: String, cause: Throwable = null) extends AkkaException(message, cause) - - /** - * Executes the supplied thunk in another thread. - */ - def thread(body: => Unit): Unit = spawn(body) - - /** - * JavaAPI. - * Executes the supplied Effect in another thread. - */ - def thread(body: Effect): Unit = spawn(body.apply) - - /** - * Executes the supplied function in another thread. - */ - def thread[A <: AnyRef, R <: AnyRef](body: A => R) = - actorOf(new ReactiveEventBasedThread(body)).start() - - /** - * JavaAPI. - * Executes the supplied Function in another thread. - */ - def thread[A <: AnyRef, R <: AnyRef](body: Function[A, R]) = - actorOf(new ReactiveEventBasedThread(body.apply)).start() - - private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T) - extends Actor { - def receive = { - case Exit => self.stop() - case message => self.reply(body(message.asInstanceOf[A])) - } - } - - private object DataFlowVariable { - private sealed abstract class DataFlowVariableMessage - private case class Set[T <: Any](value: T) extends DataFlowVariableMessage - private object Get extends DataFlowVariableMessage - } - - /** - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ - @deprecated("Superceeded by Future and CompletableFuture as of 1.1", "1.1") - sealed class DataFlowVariable[T <: Any](timeoutMs: Long) { - import DataFlowVariable._ - - def this() = this(1000 * 60) - - private val value = new AtomicReference[Option[T]](None) - private val blockedReaders = new ConcurrentLinkedQueue[ActorRef] - - private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = timeoutMs - def receive = { - case s@Set(v) => - if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { - while (dataFlow.blockedReaders.peek ne null) - dataFlow.blockedReaders.poll ! s - } else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])") - case Exit => self.stop() - } - } - - private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = timeoutMs - private var readerFuture: Option[CompletableFuture[Any]] = None - def receive = { - case Get => dataFlow.value.get match { - case Some(value) => self reply value - case None => readerFuture = self.senderFuture - } - case Set(v: T) => readerFuture.map(_ completeWithResult v) - case Exit => self.stop() - } - } - - private[this] val in = actorOf(new In(this)).start() - - /** - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def <<(ref: DataFlowVariable[T]) { - if (this.value.get.isEmpty) in ! Set(ref()) - else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])") - } - - /** - * JavaAPI. - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def set(ref: DataFlowVariable[T]) { this << ref } - - /** - * Sets the value of this variable (if unset). - */ - def <<(value: T) { - if (this.value.get.isEmpty) in ! Set(value) - else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])") - } - - /** - * JavaAPI. - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def set(value: T) { this << value } - - /** - * Retrieves the value of variable, throws a DataFlowVariableException if it times out. - */ - def get(): T = this() - - /** - * Retrieves the value of variable, throws a DataFlowVariableException if it times out. - */ - def apply(): T = { - value.get getOrElse { - val out = actorOf(new Out(this)).start() - - val result = try { - blockedReaders offer out - (out !! Get).as[T] - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - out ! Exit - throw e - } - - result.getOrElse(throw new DataFlowVariableException( - "Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) - } - } - - def shutdown() { in ! Exit } - } -} |