From d69483662a596beb13cf6e128450a1c51881a6f6 Mon Sep 17 00:00:00 2001 From: Vojin Jovanovic Date: Mon, 24 Sep 2012 14:58:55 +0200 Subject: SI-6305 fix. --- src/actors-migration/scala/actors/MigrationSystem.scala | 5 +++-- src/actors-migration/scala/actors/Pattern.scala | 5 +++-- src/actors-migration/scala/actors/Props.scala | 4 +++- src/actors-migration/scala/actors/StashingActor.scala | 4 +++- src/actors-migration/scala/actors/Timeout.scala | 2 +- test/files/jvm/actmig-PinS_1.scala | 1 + test/files/jvm/actmig-PinS_2.scala | 3 ++- test/files/jvm/actmig-PinS_3.scala | 5 +++-- test/files/jvm/actmig-public-methods_1.scala | 5 +++-- test/files/jvm/actmig-react-receive.scala | 5 +++-- test/osgi/src/BasicLibrary.scala | 2 +- 11 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/actors-migration/scala/actors/MigrationSystem.scala b/src/actors-migration/scala/actors/MigrationSystem.scala index ffc93d9c6f..3dcb38e634 100644 --- a/src/actors-migration/scala/actors/MigrationSystem.scala +++ b/src/actors-migration/scala/actors/MigrationSystem.scala @@ -1,10 +1,11 @@ -package scala.actors +package scala.actors.migration +import scala.actors._ import scala.collection._ object MigrationSystem { - private[actors] val contextStack = new ThreadLocal[immutable.Stack[Boolean]] { + private[migration] val contextStack = new ThreadLocal[immutable.Stack[Boolean]] { override def initialValue() = immutable.Stack[Boolean]() } diff --git a/src/actors-migration/scala/actors/Pattern.scala b/src/actors-migration/scala/actors/Pattern.scala index a97ee3ba07..297bcc7263 100644 --- a/src/actors-migration/scala/actors/Pattern.scala +++ b/src/actors-migration/scala/actors/Pattern.scala @@ -1,7 +1,8 @@ -package scala.actors +package scala.actors.migration +import scala.actors._ import scala.concurrent.duration.Duration -import scala.language.implicitConversions +import language.implicitConversions object pattern { diff --git a/src/actors-migration/scala/actors/Props.scala b/src/actors-migration/scala/actors/Props.scala index 891e23213a..c12384ea55 100644 --- a/src/actors-migration/scala/actors/Props.scala +++ b/src/actors-migration/scala/actors/Props.scala @@ -1,4 +1,6 @@ -package scala.actors +package scala.actors.migration + +import scala.actors._ /** * ActorRef configuration object. It represents the minimal subset of Akka Props class. diff --git a/src/actors-migration/scala/actors/StashingActor.scala b/src/actors-migration/scala/actors/StashingActor.scala index 775d115d0b..9c3917b65e 100644 --- a/src/actors-migration/scala/actors/StashingActor.scala +++ b/src/actors-migration/scala/actors/StashingActor.scala @@ -1,5 +1,7 @@ -package scala.actors +package scala.actors.migration +import scala.actors._ +import scala.actors.Actor._ import scala.collection._ import scala.concurrent.duration.Duration import java.util.concurrent.TimeUnit diff --git a/src/actors-migration/scala/actors/Timeout.scala b/src/actors-migration/scala/actors/Timeout.scala index c3017d8569..32ea5f20fc 100644 --- a/src/actors-migration/scala/actors/Timeout.scala +++ b/src/actors-migration/scala/actors/Timeout.scala @@ -6,7 +6,7 @@ ** |/ ** \* */ -package scala.actors +package scala.actors.migration import scala.concurrent.duration.Duration import java.util.concurrent.TimeUnit diff --git a/test/files/jvm/actmig-PinS_1.scala b/test/files/jvm/actmig-PinS_1.scala index 640684f728..db3ced3f0e 100644 --- a/test/files/jvm/actmig-PinS_1.scala +++ b/test/files/jvm/actmig-PinS_1.scala @@ -1,4 +1,5 @@ import scala.actors._ +import scala.actors.migration._ import scala.concurrent.duration._ import scala.concurrent.{ Promise, Await } diff --git a/test/files/jvm/actmig-PinS_2.scala b/test/files/jvm/actmig-PinS_2.scala index 761df6b5a7..b2627b4628 100644 --- a/test/files/jvm/actmig-PinS_2.scala +++ b/test/files/jvm/actmig-PinS_2.scala @@ -1,4 +1,5 @@ -import scala.actors.{ MigrationSystem, StashingActor, ActorRef, Props, Exit } +import scala.actors._ +import scala.actors.migration._ import scala.concurrent.duration._ import scala.concurrent.{ Promise, Await } diff --git a/test/files/jvm/actmig-PinS_3.scala b/test/files/jvm/actmig-PinS_3.scala index de9788724d..8a5eded443 100644 --- a/test/files/jvm/actmig-PinS_3.scala +++ b/test/files/jvm/actmig-PinS_3.scala @@ -1,4 +1,5 @@ -import scala.actors.{ MigrationSystem, StashingActor, ActorRef, Terminated, Props } +import scala.actors._ +import scala.actors.migration._ import scala.concurrent.duration._ import scala.concurrent.{ Promise, Await } @@ -158,4 +159,4 @@ object Test extends App { stash(m) } }, "default-stash-dispatcher")) -} \ No newline at end of file +} diff --git a/test/files/jvm/actmig-public-methods_1.scala b/test/files/jvm/actmig-public-methods_1.scala index 4bbdc9a71f..74fdee947b 100644 --- a/test/files/jvm/actmig-public-methods_1.scala +++ b/test/files/jvm/actmig-public-methods_1.scala @@ -1,10 +1,11 @@ import scala.collection.mutable.ArrayBuffer import scala.actors.Actor._ import scala.actors._ +import scala.actors.migration._ import scala.util._ import java.util.concurrent.{ TimeUnit, CountDownLatch } -import scala.concurrent.duration.Duration -import scala.actors.pattern._ +import scala.concurrent.duration._ +import scala.actors.migration.pattern._ object Test { val NUMBER_OF_TESTS = 6 diff --git a/test/files/jvm/actmig-react-receive.scala b/test/files/jvm/actmig-react-receive.scala index 280582511f..af36c2ac70 100644 --- a/test/files/jvm/actmig-react-receive.scala +++ b/test/files/jvm/actmig-react-receive.scala @@ -1,6 +1,7 @@ -import scala.actors.MigrationSystem._ +import scala.actors.migration.MigrationSystem._ import scala.actors.Actor._ -import scala.actors.{ Actor, StashingActor, ActorRef, Props, MigrationSystem, PoisonPill } +import scala.actors._ +import scala.actors.migration._ import java.util.concurrent.{ TimeUnit, CountDownLatch } import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ diff --git a/test/osgi/src/BasicLibrary.scala b/test/osgi/src/BasicLibrary.scala index 38dea69e99..6618f02102 100644 --- a/test/osgi/src/BasicLibrary.scala +++ b/test/osgi/src/BasicLibrary.scala @@ -29,7 +29,7 @@ class BasicLibraryTest extends ScalaOsgiHelper { def everythingLoads(): Unit = { // Note - This tests sun.misc usage. import scala.concurrent._ - import scala.concurrent.util.Duration.Inf + import scala.concurrent.duration.Duration.Inf import ExecutionContext.Implicits._ val x = Future(2) map (_ + 1) assertEquals(3, Await.result(x, Inf)) -- cgit v1.2.3 From b92becd757d8319129fa8bd0a93af8c6fd2b23b7 Mon Sep 17 00:00:00 2001 From: Vojin Jovanovic Date: Mon, 24 Sep 2012 17:04:29 +0200 Subject: Support for scala.concurrent for the ActorRef. Review by @phaller --- src/actors-migration/scala/actors/Pattern.scala | 6 +- src/actors/scala/actors/ActorRef.scala | 23 ++++--- test/files/jvm/actmig-PinS.scala | 4 ++ test/files/jvm/actmig-PinS_1.scala | 4 ++ test/files/jvm/actmig-PinS_2.scala | 4 ++ test/files/jvm/actmig-PinS_3.scala | 4 ++ test/files/jvm/actmig-public-methods.check | 4 +- test/files/jvm/actmig-public-methods_1.check | 4 +- test/files/jvm/actmig-public-methods_1.scala | 85 +++++++++++++++---------- test/files/jvm/actmig-react-receive.scala | 4 ++ 10 files changed, 92 insertions(+), 50 deletions(-) diff --git a/src/actors-migration/scala/actors/Pattern.scala b/src/actors-migration/scala/actors/Pattern.scala index 297bcc7263..25ba191ce7 100644 --- a/src/actors-migration/scala/actors/Pattern.scala +++ b/src/actors-migration/scala/actors/Pattern.scala @@ -6,7 +6,7 @@ import language.implicitConversions object pattern { - implicit def askSupport(ar: ActorRef): AskableActorRef = + implicit def ask(ar: ActorRef): AskableActorRef = new AskableActorRef(ar) } @@ -17,9 +17,9 @@ class AskableActorRef(val ar: ActorRef) extends ActorRef { def !(message: Any)(implicit sender: ActorRef = null): Unit = ar.!(message)(sender) - def ?(message: Any)(timeout: Timeout): Future[Any] = ar.?(message, timeout.duration) + def ?(message: Any)(implicit timeout: Timeout): scala.concurrent.Future[Any] = ar.?(message, timeout.duration) - private[actors] def ?(message: Any, timeout: Duration): Future[Any] = ar.?(message, timeout) + private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = ar.?(message, timeout) def forward(message: Any) = ar.forward(message) diff --git a/src/actors/scala/actors/ActorRef.scala b/src/actors/scala/actors/ActorRef.scala index 7768f04c2b..5e0ca1554a 100644 --- a/src/actors/scala/actors/ActorRef.scala +++ b/src/actors/scala/actors/ActorRef.scala @@ -2,6 +2,8 @@ package scala.actors import java.util.concurrent.TimeoutException import scala.concurrent.duration.Duration +import scala.concurrent.Promise +import scala.concurrent.ExecutionContext.Implicits.global /** * Trait used for migration of Scala actors to Akka. @@ -28,7 +30,7 @@ trait ActorRef { /** * Sends a message asynchronously, returning a future which may eventually hold the reply. */ - private[actors] def ?(message: Any, timeout: Duration): Future[Any] + private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] /** * Forwards the message and passes the original sender actor as the sender. @@ -43,7 +45,7 @@ trait ActorRef { private[actors] class OutputChannelRef(val actor: OutputChannel[Any]) extends ActorRef { - override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = + override private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = throw new UnsupportedOperationException("Output channel does not support ?") /** @@ -88,14 +90,19 @@ private[actors] final class InternalActorRef(override val actor: InternalActor) /** * Sends a message asynchronously, returning a future which may eventually hold the reply. */ - override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = - Futures.future { - val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2) - actor !? (dur, message) match { - case Some(x) => x - case None => new AskTimeoutException("? operation timed out.") + override private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = { + val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2) + val replyPromise = Promise[Any] + scala.concurrent.future { + scala.concurrent.blocking { + actor !? (dur, message) + } match { + case Some(x) => replyPromise success x + case None => replyPromise failure new AskTimeoutException("? operation timed out.") } } + replyPromise.future + } override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (message == PoisonPill) diff --git a/test/files/jvm/actmig-PinS.scala b/test/files/jvm/actmig-PinS.scala index 30307f3737..3f07fab12e 100644 --- a/test/files/jvm/actmig-PinS.scala +++ b/test/files/jvm/actmig-PinS.scala @@ -1,3 +1,7 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ import scala.actors._ import scala.concurrent.duration._ import scala.concurrent.{ Promise, Await } diff --git a/test/files/jvm/actmig-PinS_1.scala b/test/files/jvm/actmig-PinS_1.scala index db3ced3f0e..876688ca75 100644 --- a/test/files/jvm/actmig-PinS_1.scala +++ b/test/files/jvm/actmig-PinS_1.scala @@ -1,3 +1,7 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ import scala.actors._ import scala.actors.migration._ import scala.concurrent.duration._ diff --git a/test/files/jvm/actmig-PinS_2.scala b/test/files/jvm/actmig-PinS_2.scala index b2627b4628..7d12578f71 100644 --- a/test/files/jvm/actmig-PinS_2.scala +++ b/test/files/jvm/actmig-PinS_2.scala @@ -1,3 +1,7 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ import scala.actors._ import scala.actors.migration._ import scala.concurrent.duration._ diff --git a/test/files/jvm/actmig-PinS_3.scala b/test/files/jvm/actmig-PinS_3.scala index 8a5eded443..c2943008b0 100644 --- a/test/files/jvm/actmig-PinS_3.scala +++ b/test/files/jvm/actmig-PinS_3.scala @@ -1,3 +1,7 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ import scala.actors._ import scala.actors.migration._ import scala.concurrent.duration._ diff --git a/test/files/jvm/actmig-public-methods.check b/test/files/jvm/actmig-public-methods.check index bb6530c926..c861c90e63 100644 --- a/test/files/jvm/actmig-public-methods.check +++ b/test/files/jvm/actmig-public-methods.check @@ -1,6 +1,6 @@ None Some(bang qmark after 1) bang +bang bang in the future after 0 bang qmark after 0 -bang qmark in future after 0 -typed bang qmark in future after 0 +typed bang bang in the future after 0 diff --git a/test/files/jvm/actmig-public-methods_1.check b/test/files/jvm/actmig-public-methods_1.check index bb6530c926..c861c90e63 100644 --- a/test/files/jvm/actmig-public-methods_1.check +++ b/test/files/jvm/actmig-public-methods_1.check @@ -1,6 +1,6 @@ None Some(bang qmark after 1) bang +bang bang in the future after 0 bang qmark after 0 -bang qmark in future after 0 -typed bang qmark in future after 0 +typed bang bang in the future after 0 diff --git a/test/files/jvm/actmig-public-methods_1.scala b/test/files/jvm/actmig-public-methods_1.scala index 74fdee947b..15516a5d51 100644 --- a/test/files/jvm/actmig-public-methods_1.scala +++ b/test/files/jvm/actmig-public-methods_1.scala @@ -1,11 +1,18 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ import scala.collection.mutable.ArrayBuffer import scala.actors.Actor._ import scala.actors._ import scala.actors.migration._ import scala.util._ +import scala.concurrent._ +import scala.concurrent.duration._ import java.util.concurrent.{ TimeUnit, CountDownLatch } import scala.concurrent.duration._ import scala.actors.migration.pattern._ +import scala.concurrent.ExecutionContext.Implicits.global object Test { val NUMBER_OF_TESTS = 6 @@ -40,45 +47,53 @@ object Test { respActor ! "bang" - implicit val timeout = Timeout(Duration(500, TimeUnit.MILLISECONDS)) - val msg = ("bang qmark", 0L) - val res1 = respActor.?(msg)(Timeout(Duration.Inf)) - append(res1().toString) - latch.countDown() - - val msg1 = ("bang qmark", 1L) - val res2 = respActor.?(msg1)(Timeout(Duration(500, TimeUnit.MILLISECONDS))) - append((res2() match { - case x: AskTimeoutException => None - case v => Some(v) - }).toString) - latch.countDown() - - // this one should time out - val msg11 = ("bang qmark", 500L) - val res21 = respActor.?(msg11)(Timeout(Duration(1, TimeUnit.MILLISECONDS))) - append((res21() match { - case x: AskTimeoutException => None - case v => Some(v) - }).toString) - latch.countDown() - - val msg2 = ("bang qmark in future", 0L) - val fut1 = respActor.?(msg2)(Duration.Inf) - append(fut1().toString()) - latch.countDown() - - val handler: PartialFunction[Any, String] = { - case x: String => x.toString + { + val msg = ("bang qmark", 0L) + val res = respActor.?(msg)(Timeout(Duration.Inf)) + append(Await.result(res, Duration.Inf).toString) + latch.countDown() + } + + { + val msg = ("bang qmark", 1L) + val res = respActor.?(msg)(Timeout(5 seconds)) + + val promise = Promise[Option[Any]]() + res.onComplete(v => promise.success(v.toOption)) + append(Await.result(promise.future, Duration.Inf).toString) + + latch.countDown() + } + + { + val msg = ("bang qmark", 5000L) + val res = respActor.?(msg)(Timeout(1 millisecond)) + val promise = Promise[Option[Any]]() + res.onComplete(v => promise.success(v.toOption)) + append(Await.result(promise.future, Duration.Inf).toString) + latch.countDown() + } + + { + val msg = ("bang bang in the future", 0L) + val fut = respActor.?(msg)(Timeout(Duration.Inf)) + append(Await.result(fut, Duration.Inf).toString) + latch.countDown() } - val msg3 = ("typed bang qmark in future", 0L) - val fut2 = (respActor.?(msg3)(Duration.Inf)) - append(Futures.future { handler.apply(fut2()) }().toString) - latch.countDown() + { + val handler: PartialFunction[Any, String] = { + case x: String => x + } + + val msg = ("typed bang bang in the future", 0L) + val fut = (respActor.?(msg)(Timeout(Duration.Inf))) + append((Await.result(fut.map(handler), Duration.Inf)).toString) + latch.countDown() + } // output - latch.await(200, TimeUnit.MILLISECONDS) + latch.await(10, TimeUnit.SECONDS) if (latch.getCount() > 0) { println("Error: Tasks have not finished!!!") } diff --git a/test/files/jvm/actmig-react-receive.scala b/test/files/jvm/actmig-react-receive.scala index af36c2ac70..6adeac8b52 100644 --- a/test/files/jvm/actmig-react-receive.scala +++ b/test/files/jvm/actmig-react-receive.scala @@ -1,3 +1,7 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ import scala.actors.migration.MigrationSystem._ import scala.actors.Actor._ import scala.actors._ -- cgit v1.2.3 From 3ba88113f23a1cd614366d770cb22fd2c6336771 Mon Sep 17 00:00:00 2001 From: Vojin Jovanovic Date: Mon, 24 Sep 2012 17:06:08 +0200 Subject: Additional Actor Migration Tests. Review by @phaller. --- test/files/jvm/actmig-hierarchy.scala | 47 +++++++ test/files/jvm/actmig-hierarchy_1.scala | 45 +++++++ test/files/jvm/actmig-instantiation.scala | 96 ++++++++++++++ test/files/jvm/actmig-loop-react.check | 1 + test/files/jvm/actmig-loop-react.scala | 196 +++++++++++++++++++++++++++++ test/files/jvm/actmig-public-methods.scala | 74 +++++++++++ test/files/jvm/actmig-react-within.check | 2 + test/files/jvm/actmig-react-within.scala | 48 +++++++ test/files/jvm/actmig-receive.check | 27 ++++ test/files/jvm/actmig-receive.scala | 120 ++++++++++++++++++ 10 files changed, 656 insertions(+) create mode 100644 test/files/jvm/actmig-hierarchy.scala create mode 100644 test/files/jvm/actmig-hierarchy_1.scala create mode 100644 test/files/jvm/actmig-instantiation.scala create mode 100644 test/files/jvm/actmig-loop-react.scala create mode 100644 test/files/jvm/actmig-public-methods.scala create mode 100644 test/files/jvm/actmig-react-within.check create mode 100644 test/files/jvm/actmig-react-within.scala create mode 100644 test/files/jvm/actmig-receive.check create mode 100644 test/files/jvm/actmig-receive.scala diff --git a/test/files/jvm/actmig-hierarchy.scala b/test/files/jvm/actmig-hierarchy.scala new file mode 100644 index 0000000000..17a44fda7a --- /dev/null +++ b/test/files/jvm/actmig-hierarchy.scala @@ -0,0 +1,47 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ +import scala.actors._ + + +class ReactorActor extends Reactor[String] { + def act() { + var cond = true + loopWhile(cond) { + react { + case x if x == "hello1" => println("hello") + case "exit" => cond = false + } + } + } +} + +class ReplyActor extends ReplyReactor { + def act() { + var cond = true + loopWhile(cond) { + react { + case "hello" => println("hello") + case "exit" => cond = false; + } + } + } +} + + +object Test { + + def main(args: Array[String]) { + val reactorActor = new ReactorActor + val replyActor = new ReplyActor + reactorActor.start() + replyActor.start() + + reactorActor ! "hello1" + replyActor ! "hello" + + reactorActor ! "exit" + replyActor ! "exit" + } +} \ No newline at end of file diff --git a/test/files/jvm/actmig-hierarchy_1.scala b/test/files/jvm/actmig-hierarchy_1.scala new file mode 100644 index 0000000000..14f03c9d48 --- /dev/null +++ b/test/files/jvm/actmig-hierarchy_1.scala @@ -0,0 +1,45 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ +import scala.actors._ + +class ReactorActor extends Actor { + def act() { + var cond = true + loopWhile(cond) { + react { + case x: String if x == "hello1" => println("hello") + case "exit" => cond = false + } + } + } +} + +class ReplyActor extends Actor { + def act() { + var cond = true + loopWhile(cond) { + react { + case "hello" => println("hello") + case "exit" => cond = false; + } + } + } +} + +object Test { + + def main(args: Array[String]) { + val reactorActor = new ReactorActor + val replyActor = new ReplyActor + reactorActor.start() + replyActor.start() + + reactorActor ! "hello1" + replyActor ! "hello" + + reactorActor ! "exit" + replyActor ! "exit" + } +} \ No newline at end of file diff --git a/test/files/jvm/actmig-instantiation.scala b/test/files/jvm/actmig-instantiation.scala new file mode 100644 index 0000000000..d54dff9558 --- /dev/null +++ b/test/files/jvm/actmig-instantiation.scala @@ -0,0 +1,96 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ +import scala.actors.migration.MigrationSystem._ +import scala.actors.migration._ +import scala.actors.Actor._ +import scala.actors._ +import java.util.concurrent.{ TimeUnit, CountDownLatch } +import scala.collection.mutable.ArrayBuffer + +class TestStashingActor extends StashingActor { + + def receive = { case v: Int => Test.append(v); Test.latch.countDown() } + +} + +object Test { + val NUMBER_OF_TESTS = 5 + + // used for sorting non-deterministic output + val buff = ArrayBuffer[Int](0) + val latch = new CountDownLatch(NUMBER_OF_TESTS) + val toStop = ArrayBuffer[ActorRef]() + + def append(v: Int) = synchronized { + buff += v + } + + def main(args: Array[String]) = { + // plain scala actor + val a1 = actor { + react { case v: Int => Test.append(v); Test.latch.countDown() } + } + a1 ! 100 + + // simple instantiation + val a2 = MigrationSystem.actorOf(Props(() => new TestStashingActor, "akka.actor.default-stash-dispatcher")) + a2 ! 200 + toStop += a2 + + // actor of with scala actor + val a3 = MigrationSystem.actorOf(Props(() => actor { + react { case v: Int => Test.append(v); Test.latch.countDown() } + }, "akka.actor.default-stash-dispatcher")) + a3 ! 300 + + // using the manifest + val a4 = MigrationSystem.actorOf(Props(() => new TestStashingActor, "akka.actor.default-stash-dispatcher")) + a4 ! 400 + toStop += a4 + + // deterministic part of a test + // creation without actorOf + try { + val a3 = new TestStashingActor + a3 ! -1 + } catch { + case e => println("OK error: " + e) + } + + // actorOf double creation + try { + val a3 = MigrationSystem.actorOf(Props(() => { + new TestStashingActor + new TestStashingActor + }, "akka.actor.default-stash-dispatcher")) + a3 ! -1 + } catch { + case e => println("OK error: " + e) + } + + // actorOf nesting + try { + val a5 = MigrationSystem.actorOf(Props(() => { + val a6 = MigrationSystem.actorOf(Props(() => new TestStashingActor, "akka.actor.default-stash-dispatcher")) + toStop += a6 + new TestStashingActor + }, "akka.actor.default-stash-dispatcher")) + + a5 ! 500 + toStop += a5 + } catch { + case e => println("Should not throw an exception: " + e) + } + + // output + latch.await(5, TimeUnit.SECONDS) + if (latch.getCount() > 0) { + println("Error: Tasks have not finished!!!") + } + + buff.sorted.foreach(println) + toStop.foreach(_ ! PoisonPill) + } +} \ No newline at end of file diff --git a/test/files/jvm/actmig-loop-react.check b/test/files/jvm/actmig-loop-react.check index 54cbe942c0..2474cbe71b 100644 --- a/test/files/jvm/actmig-loop-react.check +++ b/test/files/jvm/actmig-loop-react.check @@ -13,3 +13,4 @@ after react do task 1 do string I am a String do task 42 +after react diff --git a/test/files/jvm/actmig-loop-react.scala b/test/files/jvm/actmig-loop-react.scala new file mode 100644 index 0000000000..7f4c6f96dc --- /dev/null +++ b/test/files/jvm/actmig-loop-react.scala @@ -0,0 +1,196 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ +import scala.actors.migration.MigrationSystem._ +import scala.actors.Actor._ +import scala.actors._ +import scala.actors.migration._ +import java.util.concurrent.{ TimeUnit, CountDownLatch } +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ +import scala.concurrent.{ Promise, Await } + +object Test { + val finishedLWCR, finishedTNR, finishedEH = Promise[Boolean] + val finishedLWCR1, finishedTNR1, finishedEH1 = Promise[Boolean] + + def testLoopWithConditionReact() = { + // Snippet showing composition of receives + // Loop with Condition Snippet - before + val myActor = actor { + var c = true + loopWhile(c) { + react { + case x: Int => + // do task + println("do task") + if (x == 42) { + c = false + finishedLWCR1.success(true) + } + } + } + } + + myActor.start() + myActor ! 1 + myActor ! 42 + + Await.ready(finishedLWCR1.future, 5 seconds) + + // Loop with Condition Snippet - migrated + val myAkkaActor = MigrationSystem.actorOf(Props(() => new StashingActor { + + def receive = { + case x: Int => + // do task + println("do task") + if (x == 42) { + finishedLWCR.success(true) + context.stop(self) + } + } + }, "default-stashing-dispatcher")) + myAkkaActor ! 1 + myAkkaActor ! 42 + } + + def testNestedReact() = { + // Snippet showing composition of receives + // Loop with Condition Snippet - before + val myActor = actor { + var c = true + loopWhile(c) { + react { + case x: Int => + // do task + println("do task " + x) + if (x == 42) { + c = false + } else { + react { + case y: String => + println("do string " + y) + } + } + println("after react") + finishedTNR1.success(true) + } + } + } + myActor.start() + + myActor ! 1 + myActor ! "I am a String" + myActor ! 42 + + Await.ready(finishedTNR1.future, 5 seconds) + + // Loop with Condition Snippet - migrated + val myAkkaActor = MigrationSystem.actorOf(Props(() => new StashingActor { + + def receive = { + case x: Int => + // do task + println("do task " + x) + if (x == 42) { + println("after react") + finishedTNR.success(true) + context.stop(self) + } else + context.become(({ + case y: String => + println("do string " + y) + }: Receive).andThen(x => { + unstashAll() + context.unbecome() + }).orElse { case x => stash() }) + } + }, "default-stashing-dispatcher")) + + myAkkaActor ! 1 + myAkkaActor ! "I am a String" + myAkkaActor ! 42 + + } + + def exceptionHandling() = { + // Stashing actor with act and exception handler + val myActor = MigrationSystem.actorOf(Props(() => new StashingActor { + + def receive = { case _ => println("Dummy method.") } + override def act() = { + loop { + react { + case "fail" => + throw new Exception("failed") + case "work" => + println("working") + case "die" => + finishedEH1.success(true) + exit() + } + } + } + + override def exceptionHandler = { + case x: Exception => println("scala got exception") + } + + }, "default-stashing-dispatcher")) + + myActor ! "work" + myActor ! "fail" + myActor ! "die" + + Await.ready(finishedEH1.future, 5 seconds) + // Stashing actor in Akka style + val myAkkaActor = MigrationSystem.actorOf(Props(() => new StashingActor { + def receive = PFCatch({ + case "fail" => + throw new Exception("failed") + case "work" => + println("working") + case "die" => + finishedEH.success(true) + context.stop(self) + }, { case x: Exception => println("akka got exception") }) + }, "default-stashing-dispatcher")) + + myAkkaActor ! "work" + myAkkaActor ! "fail" + myAkkaActor ! "die" + } + + def main(args: Array[String]) = { + testLoopWithConditionReact() + Await.ready(finishedLWCR.future, 5 seconds) + exceptionHandling() + Await.ready(finishedEH.future, 5 seconds) + testNestedReact() + Await.ready(finishedTNR.future, 5 seconds) + } + +} + +// As per Jim Mcbeath's blog (http://jim-mcbeath.blogspot.com/2008/07/actor-exceptions.html) +class PFCatch(f: PartialFunction[Any, Unit], + handler: PartialFunction[Exception, Unit]) + extends PartialFunction[Any, Unit] { + + def apply(x: Any) = { + try { + f(x) + } catch { + case e: Exception if handler.isDefinedAt(e) => handler(e) + } + } + + def isDefinedAt(x: Any) = f.isDefinedAt(x) +} + +object PFCatch { + def apply(f: PartialFunction[Any, Unit], + handler: PartialFunction[Exception, Unit]) = new PFCatch(f, handler) +} diff --git a/test/files/jvm/actmig-public-methods.scala b/test/files/jvm/actmig-public-methods.scala new file mode 100644 index 0000000000..58d7a1a9d4 --- /dev/null +++ b/test/files/jvm/actmig-public-methods.scala @@ -0,0 +1,74 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ +import scala.collection.mutable.ArrayBuffer +import scala.actors.Actor._ +import scala.actors._ +import scala.actors.migration.MigrationSystem +import scala.util.continuations._ +import java.util.concurrent.{ TimeUnit, CountDownLatch } + +object Test { + val NUMBER_OF_TESTS = 6 + + // used for sorting non-deterministic output + val buff = ArrayBuffer[String]() + val latch = new CountDownLatch(NUMBER_OF_TESTS) + val toStop = ArrayBuffer[Actor]() + + def append(v: String) = synchronized { + buff += v + } + + def main(args: Array[String]) = { + + val respActor = actor { + loop { + react { + case (x: String, time: Long) => + Thread.sleep(time) + reply(x + " after " + time) + case str: String => + append(str) + latch.countDown() + case _ => exit() + } + } + } + + toStop += respActor + + respActor ! ("bang") + + val res1 = respActor !? (("bang qmark", 0L)) + append(res1.toString) + latch.countDown() + + val res2 = respActor !? (5000, ("bang qmark", 1L)) + append(res2.toString) + latch.countDown() + + // this one should timeout + val res21 = respActor !? (1, ("bang qmark", 5000L)) + append(res21.toString) + latch.countDown() + + val fut1 = respActor !! (("bang bang in the future", 0L)) + append(fut1().toString()) + latch.countDown() + + val fut2 = respActor !! (("typed bang bang in the future", 0L), { case x: String => x }) + append(fut2()) + latch.countDown() + + // output + latch.await(10, TimeUnit.SECONDS) + if (latch.getCount() > 0) { + println("Error: Tasks have not finished!!!") + } + + buff.sorted.foreach(println) + toStop.foreach(_ ! 'stop) + } +} \ No newline at end of file diff --git a/test/files/jvm/actmig-react-within.check b/test/files/jvm/actmig-react-within.check new file mode 100644 index 0000000000..57798dbefb --- /dev/null +++ b/test/files/jvm/actmig-react-within.check @@ -0,0 +1,2 @@ +received +received diff --git a/test/files/jvm/actmig-react-within.scala b/test/files/jvm/actmig-react-within.scala new file mode 100644 index 0000000000..43350ef120 --- /dev/null +++ b/test/files/jvm/actmig-react-within.scala @@ -0,0 +1,48 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ +import scala.actors.migration.MigrationSystem._ +import scala.actors.Actor._ +import scala.actors._ +import scala.actors.migration._ +import java.util.concurrent.{ TimeUnit, CountDownLatch } +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ +import scala.concurrent.{ Promise, Await } + +object Test { + val finished = Promise[Boolean] + + def testReactWithin() = { + val sActor = actor { + loop { + reactWithin(1) { + case scala.actors.TIMEOUT => + println("received") + exit() + case _ => + println("Should not occur.") + } + } + } + + val myActor = MigrationSystem.actorOf(Props(() => new StashingActor { + context.setReceiveTimeout(1 millisecond) + def receive = { + case ReceiveTimeout => + println("received") + finished.success(true) + context.stop(self) + case _ => + println("Should not occur.") + } + }, "default-stashing-dispatcher")) + } + + def main(args: Array[String]) = { + testReactWithin() + Await.ready(finished.future, 5 seconds) + } + +} diff --git a/test/files/jvm/actmig-receive.check b/test/files/jvm/actmig-receive.check new file mode 100644 index 0000000000..30886140e1 --- /dev/null +++ b/test/files/jvm/actmig-receive.check @@ -0,0 +1,27 @@ +Original +do before +receive 1 +do in between +receive 1 +do after +Transformed +do before +receive 1 +do in between +receive 1 +do after +Test Loop Receive +Original +do before body +receive 1 +do after receive +do before body +do after receive +after loop +Transformed +do before body +receive 1 +do after receive +do before body +do after receive +after loop diff --git a/test/files/jvm/actmig-receive.scala b/test/files/jvm/actmig-receive.scala new file mode 100644 index 0000000000..03dc1be63b --- /dev/null +++ b/test/files/jvm/actmig-receive.scala @@ -0,0 +1,120 @@ +/** + * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change + * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov. + */ +import scala.actors.migration.MigrationSystem._ +import scala.actors.Actor._ +import scala.actors._ +import scala.actors.migration._ +import java.util.concurrent.{ TimeUnit, CountDownLatch } +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ +import scala.concurrent.{ Promise, Await } + +object Test { + val finishedSingle, finishedSingle1, finishedLoop, finishedLoop1 = Promise[Boolean] + + def testDoubleReceive() = { + println("Original") + // Snippet that shows how to get rid of receive calls in Scala Actors. + // This snippet is used in the Actors Migration Kit. + val myActor = actor { + println("do before") + receive { + case "hello" => + println("receive 1") + } + println("do in between") + receive { + case "hello" => + println("receive 1") + } + println("do after") + finishedSingle.success(true) + } + + myActor ! "hello" + myActor ! "hello" + + Await.ready(finishedSingle.future, 5 seconds) + println("Transformed") + val myActorReact = actor { + println("do before") + react (({ + case "hello" => + println("receive 1") + }: PartialFunction[Any, Unit]).andThen { x => + println("do in between") + react (({ + case "hello" => + println("receive 1") + }: PartialFunction[Any, Unit]).andThen { x => + println("do after") + finishedSingle1.success(true) + }) + }) + } + + myActorReact ! "hello" + myActorReact ! "hello" + + Await.ready(finishedSingle1.future, 5 seconds) + } + + def testLoopReceive() = { + println("Test Loop Receive") + // Snippet that shows how to get rid of receive calls in loops. + // This snippet is used in the Actors Migration Kit. + println("Original") + val myActor = actor { + var c = true + while (c) { + println("do before body") + receive { + case "hello" => + println("receive 1") + case "exit" => + c = false + } + println("do after receive") + } + println("after loop") + finishedLoop.success(true) + } + + myActor ! "hello" + myActor ! "exit" + Await.ready(finishedLoop.future, 5 seconds) + println("Transformed") + + val myActorReact = actor { + var c = true + loopWhile(c) { + println("do before body") + react (({ + case "hello" => + println("receive 1") + case "exit" => + c = false + }: PartialFunction[Any, Unit]).andThen { x => + println("do after receive") + if (c == false) { + println("after loop") + finishedLoop1.success(true) + } + }) + } + } + + myActorReact ! "hello" + myActorReact ! "exit" + + Await.ready(finishedLoop1.future, 5 seconds) + } + + def main(args: Array[String]) = { + testDoubleReceive() + testLoopReceive() + } + +} -- cgit v1.2.3 From f362bbb40bd3bffb90efb4e67e52b38452573e3e Mon Sep 17 00:00:00 2001 From: Vojin Jovanovic Date: Tue, 25 Sep 2012 15:06:32 +0200 Subject: Moving actor migration to separate dir. --- .../scala/actors/MigrationSystem.scala | 37 --- src/actors-migration/scala/actors/Pattern.scala | 27 --- src/actors-migration/scala/actors/Props.scala | 15 -- .../scala/actors/StashingActor.scala | 258 --------------------- src/actors-migration/scala/actors/Timeout.scala | 40 ---- .../scala/actors/migration/MigrationSystem.scala | 37 +++ .../scala/actors/migration/Pattern.scala | 27 +++ .../scala/actors/migration/Props.scala | 15 ++ .../scala/actors/migration/StashingActor.scala | 258 +++++++++++++++++++++ .../scala/actors/migration/Timeout.scala | 40 ++++ 10 files changed, 377 insertions(+), 377 deletions(-) delete mode 100644 src/actors-migration/scala/actors/MigrationSystem.scala delete mode 100644 src/actors-migration/scala/actors/Pattern.scala delete mode 100644 src/actors-migration/scala/actors/Props.scala delete mode 100644 src/actors-migration/scala/actors/StashingActor.scala delete mode 100644 src/actors-migration/scala/actors/Timeout.scala create mode 100644 src/actors-migration/scala/actors/migration/MigrationSystem.scala create mode 100644 src/actors-migration/scala/actors/migration/Pattern.scala create mode 100644 src/actors-migration/scala/actors/migration/Props.scala create mode 100644 src/actors-migration/scala/actors/migration/StashingActor.scala create mode 100644 src/actors-migration/scala/actors/migration/Timeout.scala diff --git a/src/actors-migration/scala/actors/MigrationSystem.scala b/src/actors-migration/scala/actors/MigrationSystem.scala deleted file mode 100644 index 3dcb38e634..0000000000 --- a/src/actors-migration/scala/actors/MigrationSystem.scala +++ /dev/null @@ -1,37 +0,0 @@ -package scala.actors.migration - -import scala.actors._ -import scala.collection._ - -object MigrationSystem { - - private[migration] val contextStack = new ThreadLocal[immutable.Stack[Boolean]] { - override def initialValue() = immutable.Stack[Boolean]() - } - - private[this] def withCleanContext(block: => ActorRef): ActorRef = { - // push clean marker - val old = contextStack.get - contextStack.set(old.push(true)) - try { - val instance = block - - if (instance eq null) - throw new Exception("Actor instance passed to actorOf can't be 'null'") - - instance - } finally { - val stackAfter = contextStack.get - if (stackAfter.nonEmpty) - contextStack.set(if (!stackAfter.head) stackAfter.pop.pop else stackAfter.pop) - } - } - - def actorOf(props: Props): ActorRef = withCleanContext { - val creator = props.creator() - val r = new InternalActorRef(creator) - creator.start() - r - } - -} \ No newline at end of file diff --git a/src/actors-migration/scala/actors/Pattern.scala b/src/actors-migration/scala/actors/Pattern.scala deleted file mode 100644 index 25ba191ce7..0000000000 --- a/src/actors-migration/scala/actors/Pattern.scala +++ /dev/null @@ -1,27 +0,0 @@ -package scala.actors.migration - -import scala.actors._ -import scala.concurrent.duration.Duration -import language.implicitConversions - -object pattern { - - implicit def ask(ar: ActorRef): AskableActorRef = - new AskableActorRef(ar) -} - -/** - * ActorRef with support for ask(?) operation. - */ -class AskableActorRef(val ar: ActorRef) extends ActorRef { - - def !(message: Any)(implicit sender: ActorRef = null): Unit = ar.!(message)(sender) - - def ?(message: Any)(implicit timeout: Timeout): scala.concurrent.Future[Any] = ar.?(message, timeout.duration) - - private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = ar.?(message, timeout) - - def forward(message: Any) = ar.forward(message) - - private[actors] def localActor: AbstractActor = ar.localActor -} diff --git a/src/actors-migration/scala/actors/Props.scala b/src/actors-migration/scala/actors/Props.scala deleted file mode 100644 index c12384ea55..0000000000 --- a/src/actors-migration/scala/actors/Props.scala +++ /dev/null @@ -1,15 +0,0 @@ -package scala.actors.migration - -import scala.actors._ - -/** - * ActorRef configuration object. It represents the minimal subset of Akka Props class. - */ -case class Props(creator: () ⇒ InternalActor, dispatcher: String) { - - /** - * Returns a new Props with the specified creator set - */ - def withCreator(c: ⇒ InternalActor) = copy(creator = () ⇒ c) - -} diff --git a/src/actors-migration/scala/actors/StashingActor.scala b/src/actors-migration/scala/actors/StashingActor.scala deleted file mode 100644 index 9c3917b65e..0000000000 --- a/src/actors-migration/scala/actors/StashingActor.scala +++ /dev/null @@ -1,258 +0,0 @@ -package scala.actors.migration - -import scala.actors._ -import scala.actors.Actor._ -import scala.collection._ -import scala.concurrent.duration.Duration -import java.util.concurrent.TimeUnit -import scala.language.implicitConversions - -object StashingActor extends Combinators { - implicit def mkBody[A](body: => A) = new InternalActor.Body[A] { - def andThen[B](other: => B): Unit = Actor.rawSelf.seq(body, other) - } -} - -@deprecated("Scala Actors are being removed from the standard library. Please refer to the migration guide.", "2.10") -trait StashingActor extends InternalActor { - type Receive = PartialFunction[Any, Unit] - - // checks if StashingActor is created within the actorOf block - creationCheck; - - private[actors] val ref = new InternalActorRef(this) - - val self: ActorRef = ref - - protected[this] val context: ActorContext = new ActorContext(this) - - @volatile - private var myTimeout: Option[Long] = None - - private val stash = new MQueue[Any]("Stash") - - /** - * Migration notes: - * this method replaces receiveWithin, receive and react methods from Scala Actors. - */ - def receive: Receive - - /** - * User overridable callback. - *

- * Is called when an Actor is started by invoking 'actor'. - */ - def preStart() {} - - /** - * User overridable callback. - *

- * Is called when 'actor.stop()' is invoked. - */ - def postStop() {} - - /** - * User overridable callback. - *

- * Is called on a crashed Actor right BEFORE it is restarted to allow clean - * up of resources before Actor is terminated. - * By default it calls postStop() - */ - def preRestart(reason: Throwable, message: Option[Any]) { postStop() } - - /** - * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. - * Puts the behavior on top of the hotswap stack. - * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack - */ - private def become(behavior: Receive, discardOld: Boolean = true) { - if (discardOld) unbecome() - behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(behavior)) - } - - /** - * Reverts the Actor behavior to the previous one in the hotswap stack. - */ - private def unbecome() { - // never unbecome the initial behavior - if (behaviorStack.size > 1) - behaviorStack = behaviorStack.pop - } - - /** - * User overridable callback. - *

- * Is called when a message isn't handled by the current behavior of the actor - * by default it does: EventHandler.warning(self, message) - */ - def unhandled(message: Any) { - message match { - case Terminated(dead) ⇒ throw new DeathPactException(dead) - case _ ⇒ System.err.println("Unhandeled message " + message) - } - } - - protected def sender: ActorRef = new OutputChannelRef(internalSender) - - override def act(): Unit = internalAct() - - override def start(): StashingActor = { - super.start() - this - } - - override def receive[R](f: PartialFunction[Any, R]): R - - /* - * Internal implementation. - */ - - private[actors] var behaviorStack = immutable.Stack[PartialFunction[Any, Unit]]() - - /* - * Checks that StashingActor can be created only by MigrationSystem.actorOf method. - */ - private[this] def creationCheck = { - - // creation check (see ActorRef) - val context = MigrationSystem.contextStack.get - if (context.isEmpty) - throw new RuntimeException("In order to create StashingActor one must use actorOf.") - else { - if (!context.head) - throw new RuntimeException("Only one actor can be created per actorOf call.") - else - MigrationSystem.contextStack.set(context.push(false)) - } - - } - - private[actors] override def preAct() { - preStart() - } - - /** - * Adds message to a stash, to be processed later. Stashed messages can be fed back into the $actor's - * mailbox using unstashAll(). - * - * Temporarily stashing away messages that the $actor does not (yet) handle simplifies implementing - * certain messaging protocols. - */ - final def stash(msg: Any): Unit = { - stash.append(msg, null) - } - - final def unstashAll(): Unit = { - mailbox.prepend(stash) - stash.clear() - } - - /** - * Wraps any partial function with Exit message handling. - */ - private[actors] def wrapWithSystemMessageHandling(pf: PartialFunction[Any, Unit]): PartialFunction[Any, Unit] = { - - def swapExitHandler(pf: PartialFunction[Any, Unit]) = new PartialFunction[Any, Unit] { - def swapExit(v: Any) = v match { - case Exit(from, reason) => - Terminated(new InternalActorRef(from.asInstanceOf[InternalActor])) - case v => v - } - - def isDefinedAt(v: Any) = pf.isDefinedAt(swapExit(v)) - def apply(v: Any) = pf(swapExit(v)) - } - - swapExitHandler(pf orElse { - case m => unhandled(m) - }) - } - - /** - * Method that models the behavior of Akka actors. - */ - private[actors] def internalAct() { - trapExit = true - behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(receive)) - loop { - if (myTimeout.isDefined) - reactWithin(myTimeout.get)(behaviorStack.top) - else - react(behaviorStack.top) - } - } - - private[actors] override def internalPostStop() = postStop() - - // Used for pattern matching statement similar to Akka - lazy val ReceiveTimeout = TIMEOUT - - /** - * Used to simulate Akka context behavior. Should be used only for migration purposes. - */ - protected[actors] class ActorContext(val actr: StashingActor) { - - /** - * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. - * Puts the behavior on top of the hotswap stack. - * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack - */ - def become(behavior: Receive, discardOld: Boolean = true) = actr.become(behavior, discardOld) - - /** - * Reverts the Actor behavior to the previous one in the hotswap stack. - */ - def unbecome() = actr.unbecome() - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop(subject: ActorRef): Nothing = if (subject != ref) - throw new RuntimeException("Only stoping of self is allowed during migration.") - else - actr.exit() - - /** - * Registers this actor as a Monitor for the provided ActorRef. - * @return the provided ActorRef - */ - def watch(subject: ActorRef): ActorRef = { - actr.watch(subject) - subject - } - - /** - * Unregisters this actor as Monitor for the provided ActorRef. - * @return the provided ActorRef - */ - def unwatch(subject: ActorRef): ActorRef = { - actr unwatch subject - subject - } - - /** - * Defines the receiver timeout value. - */ - final def setReceiveTimeout(timeout: Duration): Unit = - actr.myTimeout = Some(timeout.toMillis) - - /** - * Gets the current receiveTimeout - */ - final def receiveTimeout: Option[Duration] = - actr.myTimeout.map(Duration(_, TimeUnit.MILLISECONDS)) - - } -} - -/** - * This message is thrown by default when an Actor does not handle termination. - */ -class DeathPactException(ref: ActorRef = null) extends Exception { - override def fillInStackTrace() = this //Don't waste cycles generating stack trace -} - -/** - * Message that is sent to a watching actor when the watched actor terminates. - */ -case class Terminated(actor: ActorRef) diff --git a/src/actors-migration/scala/actors/Timeout.scala b/src/actors-migration/scala/actors/Timeout.scala deleted file mode 100644 index 32ea5f20fc..0000000000 --- a/src/actors-migration/scala/actors/Timeout.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors.migration - -import scala.concurrent.duration.Duration -import java.util.concurrent.TimeUnit -import scala.language.implicitConversions - -case class Timeout(duration: Duration) { - def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS)) - def this(length: Long, unit: TimeUnit) = this(Duration(length, unit)) -} - -object Timeout { - - /** - * A timeout with zero duration, will cause most requests to always timeout. - */ - val zero = new Timeout(Duration.Zero) - - /** - * A Timeout with infinite duration. Will never timeout. Use extreme caution with this - * as it may cause memory leaks, blocked threads, or may not even be supported by - * the receiver, which would result in an exception. - */ - val never = new Timeout(Duration.Inf) - - def apply(timeout: Long) = new Timeout(timeout) - def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit) - - implicit def durationToTimeout(duration: Duration) = new Timeout(duration) - implicit def intToTimeout(timeout: Int) = new Timeout(timeout) - implicit def longToTimeout(timeout: Long) = new Timeout(timeout) -} diff --git a/src/actors-migration/scala/actors/migration/MigrationSystem.scala b/src/actors-migration/scala/actors/migration/MigrationSystem.scala new file mode 100644 index 0000000000..3dcb38e634 --- /dev/null +++ b/src/actors-migration/scala/actors/migration/MigrationSystem.scala @@ -0,0 +1,37 @@ +package scala.actors.migration + +import scala.actors._ +import scala.collection._ + +object MigrationSystem { + + private[migration] val contextStack = new ThreadLocal[immutable.Stack[Boolean]] { + override def initialValue() = immutable.Stack[Boolean]() + } + + private[this] def withCleanContext(block: => ActorRef): ActorRef = { + // push clean marker + val old = contextStack.get + contextStack.set(old.push(true)) + try { + val instance = block + + if (instance eq null) + throw new Exception("Actor instance passed to actorOf can't be 'null'") + + instance + } finally { + val stackAfter = contextStack.get + if (stackAfter.nonEmpty) + contextStack.set(if (!stackAfter.head) stackAfter.pop.pop else stackAfter.pop) + } + } + + def actorOf(props: Props): ActorRef = withCleanContext { + val creator = props.creator() + val r = new InternalActorRef(creator) + creator.start() + r + } + +} \ No newline at end of file diff --git a/src/actors-migration/scala/actors/migration/Pattern.scala b/src/actors-migration/scala/actors/migration/Pattern.scala new file mode 100644 index 0000000000..25ba191ce7 --- /dev/null +++ b/src/actors-migration/scala/actors/migration/Pattern.scala @@ -0,0 +1,27 @@ +package scala.actors.migration + +import scala.actors._ +import scala.concurrent.duration.Duration +import language.implicitConversions + +object pattern { + + implicit def ask(ar: ActorRef): AskableActorRef = + new AskableActorRef(ar) +} + +/** + * ActorRef with support for ask(?) operation. + */ +class AskableActorRef(val ar: ActorRef) extends ActorRef { + + def !(message: Any)(implicit sender: ActorRef = null): Unit = ar.!(message)(sender) + + def ?(message: Any)(implicit timeout: Timeout): scala.concurrent.Future[Any] = ar.?(message, timeout.duration) + + private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = ar.?(message, timeout) + + def forward(message: Any) = ar.forward(message) + + private[actors] def localActor: AbstractActor = ar.localActor +} diff --git a/src/actors-migration/scala/actors/migration/Props.scala b/src/actors-migration/scala/actors/migration/Props.scala new file mode 100644 index 0000000000..c12384ea55 --- /dev/null +++ b/src/actors-migration/scala/actors/migration/Props.scala @@ -0,0 +1,15 @@ +package scala.actors.migration + +import scala.actors._ + +/** + * ActorRef configuration object. It represents the minimal subset of Akka Props class. + */ +case class Props(creator: () ⇒ InternalActor, dispatcher: String) { + + /** + * Returns a new Props with the specified creator set + */ + def withCreator(c: ⇒ InternalActor) = copy(creator = () ⇒ c) + +} diff --git a/src/actors-migration/scala/actors/migration/StashingActor.scala b/src/actors-migration/scala/actors/migration/StashingActor.scala new file mode 100644 index 0000000000..9c3917b65e --- /dev/null +++ b/src/actors-migration/scala/actors/migration/StashingActor.scala @@ -0,0 +1,258 @@ +package scala.actors.migration + +import scala.actors._ +import scala.actors.Actor._ +import scala.collection._ +import scala.concurrent.duration.Duration +import java.util.concurrent.TimeUnit +import scala.language.implicitConversions + +object StashingActor extends Combinators { + implicit def mkBody[A](body: => A) = new InternalActor.Body[A] { + def andThen[B](other: => B): Unit = Actor.rawSelf.seq(body, other) + } +} + +@deprecated("Scala Actors are being removed from the standard library. Please refer to the migration guide.", "2.10") +trait StashingActor extends InternalActor { + type Receive = PartialFunction[Any, Unit] + + // checks if StashingActor is created within the actorOf block + creationCheck; + + private[actors] val ref = new InternalActorRef(this) + + val self: ActorRef = ref + + protected[this] val context: ActorContext = new ActorContext(this) + + @volatile + private var myTimeout: Option[Long] = None + + private val stash = new MQueue[Any]("Stash") + + /** + * Migration notes: + * this method replaces receiveWithin, receive and react methods from Scala Actors. + */ + def receive: Receive + + /** + * User overridable callback. + *

+ * Is called when an Actor is started by invoking 'actor'. + */ + def preStart() {} + + /** + * User overridable callback. + *

+ * Is called when 'actor.stop()' is invoked. + */ + def postStop() {} + + /** + * User overridable callback. + *

+ * Is called on a crashed Actor right BEFORE it is restarted to allow clean + * up of resources before Actor is terminated. + * By default it calls postStop() + */ + def preRestart(reason: Throwable, message: Option[Any]) { postStop() } + + /** + * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. + * Puts the behavior on top of the hotswap stack. + * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack + */ + private def become(behavior: Receive, discardOld: Boolean = true) { + if (discardOld) unbecome() + behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(behavior)) + } + + /** + * Reverts the Actor behavior to the previous one in the hotswap stack. + */ + private def unbecome() { + // never unbecome the initial behavior + if (behaviorStack.size > 1) + behaviorStack = behaviorStack.pop + } + + /** + * User overridable callback. + *

+ * Is called when a message isn't handled by the current behavior of the actor + * by default it does: EventHandler.warning(self, message) + */ + def unhandled(message: Any) { + message match { + case Terminated(dead) ⇒ throw new DeathPactException(dead) + case _ ⇒ System.err.println("Unhandeled message " + message) + } + } + + protected def sender: ActorRef = new OutputChannelRef(internalSender) + + override def act(): Unit = internalAct() + + override def start(): StashingActor = { + super.start() + this + } + + override def receive[R](f: PartialFunction[Any, R]): R + + /* + * Internal implementation. + */ + + private[actors] var behaviorStack = immutable.Stack[PartialFunction[Any, Unit]]() + + /* + * Checks that StashingActor can be created only by MigrationSystem.actorOf method. + */ + private[this] def creationCheck = { + + // creation check (see ActorRef) + val context = MigrationSystem.contextStack.get + if (context.isEmpty) + throw new RuntimeException("In order to create StashingActor one must use actorOf.") + else { + if (!context.head) + throw new RuntimeException("Only one actor can be created per actorOf call.") + else + MigrationSystem.contextStack.set(context.push(false)) + } + + } + + private[actors] override def preAct() { + preStart() + } + + /** + * Adds message to a stash, to be processed later. Stashed messages can be fed back into the $actor's + * mailbox using unstashAll(). + * + * Temporarily stashing away messages that the $actor does not (yet) handle simplifies implementing + * certain messaging protocols. + */ + final def stash(msg: Any): Unit = { + stash.append(msg, null) + } + + final def unstashAll(): Unit = { + mailbox.prepend(stash) + stash.clear() + } + + /** + * Wraps any partial function with Exit message handling. + */ + private[actors] def wrapWithSystemMessageHandling(pf: PartialFunction[Any, Unit]): PartialFunction[Any, Unit] = { + + def swapExitHandler(pf: PartialFunction[Any, Unit]) = new PartialFunction[Any, Unit] { + def swapExit(v: Any) = v match { + case Exit(from, reason) => + Terminated(new InternalActorRef(from.asInstanceOf[InternalActor])) + case v => v + } + + def isDefinedAt(v: Any) = pf.isDefinedAt(swapExit(v)) + def apply(v: Any) = pf(swapExit(v)) + } + + swapExitHandler(pf orElse { + case m => unhandled(m) + }) + } + + /** + * Method that models the behavior of Akka actors. + */ + private[actors] def internalAct() { + trapExit = true + behaviorStack = behaviorStack.push(wrapWithSystemMessageHandling(receive)) + loop { + if (myTimeout.isDefined) + reactWithin(myTimeout.get)(behaviorStack.top) + else + react(behaviorStack.top) + } + } + + private[actors] override def internalPostStop() = postStop() + + // Used for pattern matching statement similar to Akka + lazy val ReceiveTimeout = TIMEOUT + + /** + * Used to simulate Akka context behavior. Should be used only for migration purposes. + */ + protected[actors] class ActorContext(val actr: StashingActor) { + + /** + * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. + * Puts the behavior on top of the hotswap stack. + * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack + */ + def become(behavior: Receive, discardOld: Boolean = true) = actr.become(behavior, discardOld) + + /** + * Reverts the Actor behavior to the previous one in the hotswap stack. + */ + def unbecome() = actr.unbecome() + + /** + * Shuts down the actor its dispatcher and message queue. + */ + def stop(subject: ActorRef): Nothing = if (subject != ref) + throw new RuntimeException("Only stoping of self is allowed during migration.") + else + actr.exit() + + /** + * Registers this actor as a Monitor for the provided ActorRef. + * @return the provided ActorRef + */ + def watch(subject: ActorRef): ActorRef = { + actr.watch(subject) + subject + } + + /** + * Unregisters this actor as Monitor for the provided ActorRef. + * @return the provided ActorRef + */ + def unwatch(subject: ActorRef): ActorRef = { + actr unwatch subject + subject + } + + /** + * Defines the receiver timeout value. + */ + final def setReceiveTimeout(timeout: Duration): Unit = + actr.myTimeout = Some(timeout.toMillis) + + /** + * Gets the current receiveTimeout + */ + final def receiveTimeout: Option[Duration] = + actr.myTimeout.map(Duration(_, TimeUnit.MILLISECONDS)) + + } +} + +/** + * This message is thrown by default when an Actor does not handle termination. + */ +class DeathPactException(ref: ActorRef = null) extends Exception { + override def fillInStackTrace() = this //Don't waste cycles generating stack trace +} + +/** + * Message that is sent to a watching actor when the watched actor terminates. + */ +case class Terminated(actor: ActorRef) diff --git a/src/actors-migration/scala/actors/migration/Timeout.scala b/src/actors-migration/scala/actors/migration/Timeout.scala new file mode 100644 index 0000000000..32ea5f20fc --- /dev/null +++ b/src/actors-migration/scala/actors/migration/Timeout.scala @@ -0,0 +1,40 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.actors.migration + +import scala.concurrent.duration.Duration +import java.util.concurrent.TimeUnit +import scala.language.implicitConversions + +case class Timeout(duration: Duration) { + def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS)) + def this(length: Long, unit: TimeUnit) = this(Duration(length, unit)) +} + +object Timeout { + + /** + * A timeout with zero duration, will cause most requests to always timeout. + */ + val zero = new Timeout(Duration.Zero) + + /** + * A Timeout with infinite duration. Will never timeout. Use extreme caution with this + * as it may cause memory leaks, blocked threads, or may not even be supported by + * the receiver, which would result in an exception. + */ + val never = new Timeout(Duration.Inf) + + def apply(timeout: Long) = new Timeout(timeout) + def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit) + + implicit def durationToTimeout(duration: Duration) = new Timeout(duration) + implicit def intToTimeout(timeout: Int) = new Timeout(timeout) + implicit def longToTimeout(timeout: Long) = new Timeout(timeout) +} -- cgit v1.2.3