aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/akka
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-05-07 22:07:02 -0300
committerDiego <diegolparra@gmail.com>2014-05-07 22:07:02 -0300
commit290aabd4608bffc58cf8fe734371ee7bbe78df4f (patch)
tree4813ac52bdc74dbd1ba6cbed82bfd82893b17b50 /kamon-core/src/main/scala/akka
parent5e413d21a2fe5e604eca5b2c2f029206a61c83aa (diff)
downloadKamon-290aabd4608bffc58cf8fe734371ee7bbe78df4f.tar.gz
Kamon-290aabd4608bffc58cf8fe734371ee7bbe78df4f.tar.bz2
Kamon-290aabd4608bffc58cf8fe734371ee7bbe78df4f.zip
+ core: MinMaxCounter for actor mailbox size
Diffstat (limited to 'kamon-core/src/main/scala/akka')
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala32
1 files changed, 20 insertions, 12 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index 78c170de..7766b3a1 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
+
package akka.instrumentation
import org.aspectj.lang.annotation._
@@ -23,7 +24,7 @@ import kamon.trace._
import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
import kamon.metrics.ActorMetrics.ActorMetricRecorder
-import java.util.concurrent.atomic.AtomicInteger
+import kamon.metrics.instruments.counter.{ Counter, MinMaxCounter }
@Aspect
class BehaviourInvokeTracing {
@@ -33,12 +34,27 @@ class BehaviourInvokeTracing {
@After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+ import scala.concurrent.duration._
+
val metricsExtension = Kamon(Metrics)(system)
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
cellWithMetrics.metricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
+
+ val executor = system.dispatchers.lookup("kamon.default-dispatcher")
+
+ system.scheduler.schedule(0 milliseconds, 100 milliseconds) {
+ cellWithMetrics.actorMetricsRecorder.map {
+ am ⇒
+ val (min, max, sum) = cellWithMetrics.queueSize.collect()
+
+ am.mailboxSize.record(min)
+ am.mailboxSize.record(max)
+ am.mailboxSize.record(sum)
+ }
+ }(executor)
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -59,10 +75,7 @@ class BehaviourInvokeTracing {
am ⇒
am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
-
- val currentMailboxSize = cellWithMetrics.queueSize.decrementAndGet()
- if (currentMailboxSize >= 0)
- am.mailboxSize.record(currentMailboxSize)
+ cellWithMetrics.queueSize.decrement()
}
}
}
@@ -73,12 +86,7 @@ class BehaviourInvokeTracing {
@After("sendingMessageToActorCell(cell)")
def afterSendMessageToActorCell(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map {
- am ⇒
- val currentMailboxSize = cellWithMetrics.queueSize.incrementAndGet()
- if (currentMailboxSize >= 0)
- am.mailboxSize.record(currentMailboxSize)
- }
+ cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment())
}
@Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
@@ -106,7 +114,7 @@ class BehaviourInvokeTracing {
trait ActorCellMetrics {
var metricIdentity: ActorMetrics = _
var actorMetricsRecorder: Option[ActorMetricRecorder] = _
- val queueSize = new AtomicInteger
+ val queueSize: Counter = MinMaxCounter()
}
@Aspect