aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-08-11 15:25:21 -0700
committerReynold Xin <rxin@apache.org>2014-08-11 15:25:21 -0700
commit37338666655909502e424b4639d680271d6d4c12 (patch)
tree816be409334dde9bb9000eb307de2fbbfca8f67a /core
parentdb06a81fb7a413faa3fe0f8c35918f70454cb05d (diff)
downloadspark-37338666655909502e424b4639d680271d6d4c12.tar.gz
spark-37338666655909502e424b4639d680271d6d4c12.tar.bz2
spark-37338666655909502e424b4639d680271d6d4c12.zip
[SPARK-2952] Enable logging actor messages at DEBUG level
Example messages: ``` 14/08/09 21:37:01 DEBUG BlockManagerMasterActor: [actor] received message RegisterBlockManager(BlockManagerId(0, rxin-mbp, 58092, 0),278302556,Actor[akka.tcp://spark@rxin-mbp:58088/user/BlockManagerActor1#-63596539]) from Actor[akka.tcp://spark@rxin-mbp:58088/temp/$c] 14/08/09 21:37:01 DEBUG BlockManagerMasterActor: [actor] handled message (0.279 ms) RegisterBlockManager(BlockManagerId(0, rxin-mbp, 58092, 0),278302556,Actor[akka.tcp://spark@rxin-mbp:58088/user/BlockManagerActor1#-63596539]) from Actor[akka.tcp://spark@rxin-mbp:58088/temp/$c] ``` cc @mengxr @tdas @pwendell Author: Reynold Xin <rxin@apache.org> Closes #1870 from rxin/actorLogging and squashes the following commits: c531ee5 [Reynold Xin] Added license header for ActorLogReceive. f6b1ebe [Reynold Xin] [SPARK-2952] Enable logging actor messages at DEBUG level
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/Client.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala6
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala64
13 files changed, 111 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 24ccce21b6..83ae57b7f1 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -21,6 +21,7 @@ import akka.actor.Actor
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.util.ActorLogReceive
/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -36,8 +37,10 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
/**
* Lives in the driver to receive heartbeats from executors..
*/
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
- override def receive = {
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+ extends Actor with ActorLogReceive with Logging {
+
+ override def receiveWithLogging = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val response = HeartbeatResponse(
!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 8940917614..51705c895a 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -38,10 +38,10 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
/** Actor class for MapOutputTrackerMaster */
private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
- extends Actor with Logging {
+ extends Actor with ActorLogReceive with Logging {
val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
- def receive = {
+ override def receiveWithLogging = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = sender.path.address.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index c07003784e..065ddda50e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,12 +27,14 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
/**
* Proxy that relays messages to the driver.
*/
-private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
+private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
+ extends Actor with ActorLogReceive with Logging {
+
var masterActor: ActorSelection = _
val timeout = AkkaUtils.askTimeout(conf)
@@ -114,7 +116,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
}
}
- override def receive = {
+ override def receiveWithLogging = {
case SubmitDriverResponse(success, driverId, message) =>
println(message)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index d38e9e7920..32790053a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
/**
* Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -56,7 +56,7 @@ private[spark] class AppClient(
var registered = false
var activeMasterUrl: String = null
- class ClientActor extends Actor with Logging {
+ class ClientActor extends Actor with ActorLogReceive with Logging {
var master: ActorSelection = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
var alreadyDead = false // To avoid calling listener.dead() multiple times
@@ -119,7 +119,7 @@ private[spark] class AppClient(
.contains(remoteUrl.hostPort)
}
- override def receive = {
+ override def receiveWithLogging = {
case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
registered = true
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a70ecdb375..cfa2c028a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -42,14 +42,14 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
private[spark] class Master(
host: String,
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager)
- extends Actor with Logging {
+ extends Actor with ActorLogReceive with Logging {
import context.dispatcher // to use Akka's scheduler.schedule()
@@ -167,7 +167,7 @@ private[spark] class Master(
context.stop(leaderElectionAgent)
}
- override def receive = {
+ override def receiveWithLogging = {
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index bacb514ed6..80fde7e4b2 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -51,7 +51,7 @@ private[spark] class Worker(
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
- extends Actor with Logging {
+ extends Actor with ActorLogReceive with Logging {
import context.dispatcher
Utils.checkHost(host, "Expected hostname")
@@ -187,7 +187,7 @@ private[spark] class Worker(
}
}
- override def receive = {
+ override def receiveWithLogging = {
case RegisteredWorker(masterUrl, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterUrl)
registered = true
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 530c147000..6d0d0bbe5e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -22,13 +22,15 @@ import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, Di
import org.apache.spark.Logging
import org.apache.spark.deploy.DeployMessages.SendHeartbeat
+import org.apache.spark.util.ActorLogReceive
/**
* Actor which connects to a worker process and terminates the JVM if the connection is severed.
* Provides fate sharing between a worker and its associated child processes.
*/
-private[spark] class WorkerWatcher(workerUrl: String) extends Actor
- with Logging {
+private[spark] class WorkerWatcher(workerUrl: String)
+ extends Actor with ActorLogReceive with Logging {
+
override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -48,7 +50,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor
def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
- override def receive = {
+ override def receiveWithLogging = {
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
logInfo(s"Successfully connected to $workerUrl")
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 1f46a0f176..13af5b6f58 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,14 +31,15 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
hostPort: String,
cores: Int,
- sparkProperties: Seq[(String, String)]) extends Actor with ExecutorBackend with Logging {
+ sparkProperties: Seq[(String, String)])
+ extends Actor with ActorLogReceive with ExecutorBackend with Logging {
Utils.checkHostPort(hostPort, "Expected hostport")
@@ -52,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend(
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
- override def receive = {
+ override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 33500d967e..2a3711ae2a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -30,7 +30,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
import org.apache.spark.ui.JettyUtils
/**
@@ -61,7 +61,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
- class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+ class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
+
+ override protected def log = CoarseGrainedSchedulerBackend.this.log
+
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
@@ -79,7 +82,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
- def receive = {
+ def receiveWithLogging = {
case RegisterExecutor(executorId, hostPort, cores) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorActor.contains(executorId)) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 3d1cf312cc..bec9502f20 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -23,9 +23,9 @@ import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.{Logging, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.{TaskMetrics, Executor, ExecutorBackend}
+import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.ActorLogReceive
private case class ReviveOffers()
@@ -43,7 +43,7 @@ private case class StopExecutor()
private[spark] class LocalActor(
scheduler: TaskSchedulerImpl,
executorBackend: LocalBackend,
- private val totalCores: Int) extends Actor with Logging {
+ private val totalCores: Int) extends Actor with ActorLogReceive with Logging {
private var freeCores = totalCores
@@ -53,7 +53,7 @@ private[spark] class LocalActor(
val executor = new Executor(
localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
- def receive = {
+ override def receiveWithLogging = {
case ReviveOffers =>
reviveOffers()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index bd31e3c5a1..3ab07703b6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -39,7 +39,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus)
- extends Actor with Logging {
+ extends Actor with ActorLogReceive with Logging {
// Mapping from block manager id to the block manager's information.
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
@@ -55,8 +55,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000))
- val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs",
- 60000)
+ val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)
var timeoutCheckingTask: Cancellable = null
@@ -67,9 +66,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
super.preStart()
}
- def receive = {
+ override def receiveWithLogging = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
- logInfo("received a register")
register(blockManagerId, maxMemSize, slaveActor)
sender ! true
@@ -118,7 +116,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
sender ! true
case StopBlockManagerMaster =>
- logInfo("Stopping BlockManagerMaster")
sender ! true
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index 6d4db064df..c194e0fed3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -23,6 +23,7 @@ import akka.actor.{ActorRef, Actor}
import org.apache.spark.{Logging, MapOutputTracker}
import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ActorLogReceive
/**
* An actor to take commands from the master to execute options. For example,
@@ -32,12 +33,12 @@ private[storage]
class BlockManagerSlaveActor(
blockManager: BlockManager,
mapOutputTracker: MapOutputTracker)
- extends Actor with Logging {
+ extends Actor with ActorLogReceive with Logging {
import context.dispatcher
// Operations that involve removing blocks may be slow and should be done asynchronously
- override def receive = {
+ override def receiveWithLogging = {
case RemoveBlock(blockId) =>
doAsync[Boolean]("removing block " + blockId, sender) {
blockManager.removeBlock(blockId)
diff --git a/core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala b/core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala
new file mode 100644
index 0000000000..332d0cbb2d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import akka.actor.Actor
+import org.slf4j.Logger
+
+/**
+ * A trait to enable logging all Akka actor messages. Here's an example of using this:
+ *
+ * {{{
+ * class BlockManagerMasterActor extends Actor with ActorLogReceive with Logging {
+ * ...
+ * override def receiveWithLogging = {
+ * case GetLocations(blockId) =>
+ * sender ! getLocations(blockId)
+ * ...
+ * }
+ * ...
+ * }
+ * }}}
+ *
+ */
+private[spark] trait ActorLogReceive {
+ self: Actor =>
+
+ override def receive: Actor.Receive = new Actor.Receive {
+
+ private val _receiveWithLogging = receiveWithLogging
+
+ override def isDefinedAt(o: Any): Boolean = _receiveWithLogging.isDefinedAt(o)
+
+ override def apply(o: Any): Unit = {
+ if (log.isDebugEnabled) {
+ log.debug(s"[actor] received message $o from ${self.sender}")
+ }
+ val start = System.nanoTime
+ _receiveWithLogging.apply(o)
+ val timeTaken = (System.nanoTime - start).toDouble / 1000000
+ if (log.isDebugEnabled) {
+ log.debug(s"[actor] handled message ($timeTaken ms) $o from ${self.sender}")
+ }
+ }
+ }
+
+ def receiveWithLogging: Actor.Receive
+
+ protected def log: Logger
+}