summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/actor/Actor.scala
blob: b9bc51b635355e64fcccf6a8290d0adfc09e875e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
/** Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */

package akka.actor

import akka.dispatch._
import akka.config.Config._
import akka.util.Helpers.{ narrow, narrowSilently }
import akka.util.ListenerManagement
import akka.AkkaException

import scala.beans.BeanProperty
import akka.util.{ ReflectiveAccess, Duration }
import akka.remoteinterface.RemoteSupport
import akka.japi.{ Creator, Procedure }
import java.lang.reflect.InvocationTargetException

/** Life-cycle messages for the Actors
 */
sealed trait LifeCycleMessage extends Serializable

/* Marker trait to show which Messages are automatically handled by Akka */
sealed trait AutoReceivedMessage { self: LifeCycleMessage => }

case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true)
  extends AutoReceivedMessage with LifeCycleMessage {

  /** Java API
   */
  def this(code: akka.japi.Function[ActorRef, Procedure[Any]], discardOld: Boolean) =
    this((self: ActorRef) => {
      val behavior = code(self)
      val result: Actor.Receive = { case msg => behavior(msg) }
      result
    }, discardOld)

  /** Java API with default non-stacking behavior
   */
  def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true)
}

case object RevertHotSwap extends AutoReceivedMessage with LifeCycleMessage

case class Restart(reason: Throwable) extends AutoReceivedMessage with LifeCycleMessage

case class Exit(dead: ActorRef, killer: Throwable) extends AutoReceivedMessage with LifeCycleMessage

case class Link(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage

case class Unlink(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage

case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage

case object PoisonPill extends AutoReceivedMessage with LifeCycleMessage

case object Kill extends AutoReceivedMessage with LifeCycleMessage

case object ReceiveTimeout extends LifeCycleMessage

case class MaximumNumberOfRestartsWithinTimeRangeReached(
  @BeanProperty val victim: ActorRef,
  @BeanProperty val maxNrOfRetries: Option[Int],
  @BeanProperty val withinTimeRange: Option[Int],
  @BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage

// Exceptions for Actors
class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorKilledException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorInitializationException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class InvalidMessageException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)

/** This message is thrown by default when an Actors behavior doesn't match a message
 */
case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception {
  override def getMessage() = "Actor %s does not handle [%s]".format(ref, msg)
  override def fillInStackTrace() = this //Don't waste cycles generating stack trace
}

/** Actor factory module with factory methods for creating various kinds of Actors.
 *
 *  @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
object Actor extends ListenerManagement {

  /** Add shutdown cleanups
   */
  private[akka] lazy val shutdownHook = {
    val hook = new Runnable {
      override def run {
        // Clear Thread.subclassAudits
        val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits")
        tf.setAccessible(true)
        val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_, _]]
        subclassAudits synchronized { subclassAudits.clear }
      }
    }
    Runtime.getRuntime.addShutdownHook(new Thread(hook))
    hook
  }

  val registry = new ActorRegistry

  lazy val remote: RemoteSupport = {
    ReflectiveAccess
      .Remote
      .defaultRemoteSupport
      .map(_())
      .getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath"))
  }

  private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
  private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)

  /** A Receive is a convenience type that defines actor message behavior currently modeled as
   *  a PartialFunction[Any, Unit].
   */
  type Receive = PartialFunction[Any, Unit]

  private[actor] val actorRefInCreation = new ThreadLocal[Option[ActorRef]] {
    override def initialValue = None
  }

  /** Creates an ActorRef out of the Actor with type T.
   *  <pre>
   *   import Actor._
   *   val actor = actorOf[MyActor]
   *   actor.start()
   *   actor ! message
   *   actor.stop()
   *  </pre>
   *  You can create and start the actor in one statement like this:
   *  <pre>
   *   val actor = actorOf[MyActor].start()
   *  </pre>
   */
  def actorOf[T <: Actor: ClassTag]: ActorRef = actorOf(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]])

  /** Creates an ActorRef out of the Actor of the specified Class.
   *  <pre>
   *   import Actor._
   *   val actor = actorOf(classOf[MyActor])
   *   actor.start()
   *   actor ! message
   *   actor.stop()
   *  </pre>
   *  You can create and start the actor in one statement like this:
   *  <pre>
   *   val actor = actorOf(classOf[MyActor]).start()
   *  </pre>
   */
  def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => {
    import ReflectiveAccess.{ createInstance, noParams, noArgs }
    createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs) match {
      case Right(actor) => actor
      case Left(exception) =>
        val cause = exception match {
          case i: InvocationTargetException => i.getTargetException
          case _                            => exception
        }

        throw new ActorInitializationException(
          "Could not instantiate Actor of " + clazz +
            "\nMake sure Actor is NOT defined inside a class/trait," +
            "\nif so put it outside the class/trait, f.e. in a companion object," +
            "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", cause)
    }

  }, None)

  /** Creates an ActorRef out of the Actor. Allows you to pass in a factory function
   *  that creates the Actor. Please note that this function can be invoked multiple
   *  times if for example the Actor is supervised and needs to be restarted.
   *  <p/>
   *  This function should <b>NOT</b> be used for remote actors.
   *  <pre>
   *   import Actor._
   *   val actor = actorOf(new MyActor)
   *   actor.start()
   *   actor ! message
   *   actor.stop()
   *  </pre>
   *  You can create and start the actor in one statement like this:
   *  <pre>
   *   val actor = actorOf(new MyActor).start()
   *  </pre>
   */
  def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None)

  /** Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>)
   *  that creates the Actor. Please note that this function can be invoked multiple
   *  times if for example the Actor is supervised and needs to be restarted.
   *  <p/>
   *  This function should <b>NOT</b> be used for remote actors.
   *  JAVA API
   */
  def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None)

  /** Use to spawn out a block of code in an event-driven actor. Will shut actor down when
   *  the block has been executed.
   *  <p/>
   *  NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since
   *  there is a method 'spawn[ActorType]' in the Actor trait already.
   *  Example:
   *  <pre>
   *  import Actor.{spawn}
   *
   *  spawn  {
   *   ... // do stuff
   *  }
   *  </pre>
   */
  def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = {
    case object Spawn
    actorOf(new Actor() {
      self.dispatcher = dispatcher
      def receive = {
        case Spawn => try { body } finally { self.stop() }
      }
    }).start() ! Spawn
  }

  /** Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
   *  to convert an Option[Any] to an Option[T].
   */
  implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption)

  /** Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
   *  to convert an Option[Any] to an Option[T].
   *  This means that the following code is equivalent:
   *   (actor !! "foo").as[Int] (Deprecated)
   *   and
   *   (actor !!! "foo").as[Int] (Recommended)
   */
  implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({
    try { anyFuture.await } catch { case t: FutureTimeoutException => }
    anyFuture.resultOrException
  })
}

/** Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
 *  <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
 *  <p/>
 *  An actor has a well-defined (non-cyclic) life-cycle.
 *  <pre>
 *  => NEW (newly created actor) - can't receive messages (yet)
 *     => STARTED (when 'start' is invoked) - can receive messages
 *         => SHUT DOWN (when 'exit' is invoked) - can't do anything
 *  </pre>
 *
 *  <p/>
 *  The Actor's API is available in the 'self' member variable.
 *
 *  <p/>
 *  Here you find functions like:
 *   - !, !!, !!! and forward
 *   - link, unlink, startLink, spawnLink etc
 *   - makeRemote etc.
 *   - start, stop
 *   - etc.
 *
 *  <p/>
 *  Here you also find fields like
 *   - dispatcher = ...
 *   - id = ...
 *   - lifeCycle = ...
 *   - faultHandler = ...
 *   - trapExit = ...
 *   - etc.
 *
 *  <p/>
 *  This means that to use them you have to prefix them with 'self', like this: <tt>self ! Message</tt>
 *
 *  However, for convenience you can import these functions and fields like below, which will allow you do
 *  drop the 'self' prefix:
 *  <pre>
 *  class MyActor extends Actor  {
 *   import self._
 *   id = ...
 *   dispatcher = ...
 *   spawnLink[OtherActor]
 *   ...
 *  }
 *  </pre>
 *
 *  @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait Actor {

  /** Type alias because traits cannot have companion objects.
   */
  type Receive = Actor.Receive

  /*
   * Some[ActorRef] representation of the 'self' ActorRef reference.
   * <p/>
   * Mainly for internal use, functions as the implicit sender references when invoking
   * the 'forward' function.
   */
  @transient
  implicit val someSelf: Some[ActorRef] = {
    val optRef = Actor.actorRefInCreation.get
    if (optRef.isEmpty) throw new ActorInitializationException(
      "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
        "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
        "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
        "\n\tEither use:" +
        "\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
        "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'")
    Actor.actorRefInCreation.set(None)
    optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed?
    optRef.asInstanceOf[Some[ActorRef]]
  }

  /*
   * Option[ActorRef] representation of the 'self' ActorRef reference.
   * <p/>
   * Mainly for internal use, functions as the implicit sender references when invoking
   * one of the message send functions ('!', '!!' and '!!!').
   */
  implicit def optionSelf: Option[ActorRef] = someSelf

  /** The 'self' field holds the ActorRef for this actor.
   *  <p/>
   *  Can be used to send messages to itself:
   *  <pre>
   *  self ! message
   *  </pre>
   *  Here you also find most of the Actor API.
   *  <p/>
   *  For example fields like:
   *  <pre>
   *  self.dispatcher = ...
   *  self.trapExit = ...
   *  self.faultHandler = ...
   *  self.lifeCycle = ...
   *  self.sender
   *  </pre>
   *  <p/>
   *  Here you also find methods like:
   *  <pre>
   *  self.reply(..)
   *  self.link(..)
   *  self.unlink(..)
   *  self.start(..)
   *  self.stop(..)
   *  </pre>
   */
  @transient
  val self: ScalaActorRef = someSelf.get

  /** User overridable callback/setting.
   *  <p/>
   *  Partial function implementing the actor logic.
   *  To be implemented by concrete actor class.
   *  <p/>
   *  Example code:
   *  <pre>
   *   def receive = {
   *     case Ping =&gt;
   *       println("got a 'Ping' message")
   *       self.reply("pong")
   *
   *     case OneWay =&gt;
   *       println("got a 'OneWay' message")
   *
   *     case unknown =&gt;
   *       println("unknown message: " + unknown)
   *  }
   *  </pre>
   */
  protected def receive: Receive

  /** User overridable callback.
   *  <p/>
   *  Is called when an Actor is started by invoking 'actor.start()'.
   */
  def preStart() {}

  /** User overridable callback.
   *  <p/>
   *  Is called when 'actor.stop()' is invoked.
   */
  def postStop() {}

  /** User overridable callback.
   *  <p/>
   *  Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
   */
  def preRestart(reason: Throwable) {}

  /** User overridable callback.
   *  <p/>
   *  Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
   */
  def postRestart(reason: Throwable) {}

  /** User overridable callback.
   *  <p/>
   *  Is called when a message isn't handled by the current behavior of the actor
   *  by default it throws an UnhandledMessageException
   */
  def unhandled(msg: Any) {
    throw new UnhandledMessageException(msg, self)
  }

  /** Is the actor able to handle the message passed in as arguments?
   */
  def isDefinedAt(message: Any): Boolean = {
    val behaviorStack = self.hotswap
    message match { //Same logic as apply(msg) but without the unhandled catch-all
      case l: AutoReceivedMessage => true
      case msg if behaviorStack.nonEmpty &&
        behaviorStack.head.isDefinedAt(msg) => true
      case msg if behaviorStack.isEmpty &&
        processingBehavior.isDefinedAt(msg) => true
      case _ => false
    }
  }

  /** Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
   *  Puts the behavior on top of the hotswap stack.
   *  If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
   */
  def become(behavior: Receive, discardOld: Boolean = true) {
    if (discardOld) unbecome()
    self.hotswap = self.hotswap.push(behavior)
  }

  /** Reverts the Actor behavior to the previous one in the hotswap stack.
   */
  def unbecome(): Unit = {
    val h = self.hotswap
    if (h.nonEmpty) self.hotswap = h.pop
  }

  // =========================================
  // ==== INTERNAL IMPLEMENTATION DETAILS ====
  // =========================================

  private[akka] final def apply(msg: Any) = {
    if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
      throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
    val behaviorStack = self.hotswap
    msg match {
      case l: AutoReceivedMessage => autoReceiveMessage(l)
      case msg if behaviorStack.nonEmpty &&
        behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)
      case msg if behaviorStack.isEmpty &&
        processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg)
      case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior
    }
  }

  private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match {
    case HotSwap(code, discardOld) => become(code(self), discardOld)
    case RevertHotSwap             => unbecome()
    case Exit(dead, reason)        => self.handleTrapExit(dead, reason)
    case Link(child)               => self.link(child)
    case Unlink(child)             => self.unlink(child)
    case UnlinkAndStop(child)      => self.unlink(child); child.stop()
    case Restart(reason)           => throw reason
    case Kill                      => throw new ActorKilledException("Kill")
    case PoisonPill =>
      val f = self.senderFuture
      self.stop()
      if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill"))
  }

  private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior
}

private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {

  /** Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException
   *  if the actual type is not assignable from the given one.
   */
  def as[T]: Option[T] = narrow[T](anyOption)

  /** Convenience helper to cast the given Option of Any to an Option of the given type. Will swallow a possible
   *  ClassCastException and return None in that case.
   */
  def asSilently[T: ClassTag]: Option[T] = narrowSilently[T](anyOption)
}

/** Marker interface for proxyable actors (such as typed actor).
 *
 *  @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait Proxyable {
  private[actor] def swapProxiedActor(newInstance: Actor)
}

/** Represents the different Actor types.
 *
 *  @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
sealed trait ActorType
object ActorType {
  case object ScalaActor extends ActorType
  case object TypedActor extends ActorType
}