From 3f53cb29a12806bb6e47cfb2631b0cb35ec6dca8 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 8 Apr 2014 11:27:04 -0300 Subject: + core: move logic in UnboundedMailboxInstrumentation to ActoMessagePassingTracing in order to do at most two calls --- kamon-core/src/main/resources/META-INF/aop.xml | 3 -- .../ActorMessagePassingTracing.scala | 32 +++++++----- .../UnboundedMailboxInstrumentation.scala | 58 ---------------------- 3 files changed, 19 insertions(+), 74 deletions(-) delete mode 100644 kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala (limited to 'kamon-core/src/main') diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 27a047b0..180d905b 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -16,9 +16,6 @@ - - - diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index dcdf6f94..85a47ee3 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -1,5 +1,5 @@ /* =================================================== - * Copyright © 2013 the kamon project + * Copyright © 2013-2014 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,11 +18,12 @@ package akka.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor._ -import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.dispatch.{Envelope, MessageDispatcher} import kamon.trace._ -import kamon.metrics.{ ActorMetrics, Metrics } +import kamon.metrics.{ActorMetrics, Metrics} import kamon.Kamon import kamon.metrics.ActorMetrics.ActorMetricRecorder +import java.util.concurrent.atomic.AtomicInteger @Aspect class BehaviourInvokeTracing { @@ -49,14 +50,17 @@ class BehaviourInvokeTracing { val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) { - pjp.proceed() - } - - cellWithMetrics.actorMetricsRecorder.map { am ⇒ - am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) - am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) - am.mailboxSize.record(cell.numberOfMessages) + try { + TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) { + pjp.proceed() + } + } finally { + cellWithMetrics.actorMetricsRecorder.map { + am ⇒ + am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) + am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) + am.mailboxSize.record(cellWithMetrics.queueSize.decrementAndGet()) + } } } @@ -66,8 +70,9 @@ class BehaviourInvokeTracing { @After("sendingMessageToActorCell(cell)") def afterSendMessageToActorCell(cell: ActorCell): Unit = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map { am ⇒ - am.mailboxSize.record(cell.numberOfMessages) + cellWithMetrics.actorMetricsRecorder.map { + am ⇒ + am.mailboxSize.record(cellWithMetrics.queueSize.incrementAndGet()) } } @@ -84,6 +89,7 @@ class BehaviourInvokeTracing { trait ActorCellMetrics { var metricIdentity: ActorMetrics = _ var actorMetricsRecorder: Option[ActorMetricRecorder] = _ + val queueSize = new AtomicInteger } @Aspect diff --git a/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala b/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala deleted file mode 100644 index 94cd82eb..00000000 --- a/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package akka.instrumentation - -import org.aspectj.lang.annotation._ -import java.util.concurrent.atomic.AtomicInteger -import org.aspectj.lang.ProceedingJoinPoint - -/** - * This aspect adds a counter to the message queue to get O(1) time in the .numberOfMessages method implementation - */ -@Aspect -class UnboundedMailboxInstrumentation { - - @DeclareMixin("akka.dispatch.UnboundedMailbox$MessageQueue") - def mixinMailboxToMailboxSizeAware: MessageQueueSizeCounting = new MessageQueueSizeCounting {} - - @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.enqueue(..)) && this(queue)") - def enqueuePointcut(queue: MessageQueueSizeCounting): Unit = {} - - @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.dequeue()) && this(queue)") - def dequeuePointcut(queue: MessageQueueSizeCounting): Unit = {} - - @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.numberOfMessages()) && this(queue)") - def numberOfMessagesPointcut(queue: MessageQueueSizeCounting): Unit = {} - - @After("dequeuePointcut(queue)") - def afterDequeuePointcut(queue: MessageQueueSizeCounting): Unit = { - if (queue.internalQueueSize.get() > 0) queue.internalQueueSize.decrementAndGet() - } - - @After("enqueuePointcut(queue)") - def afterEnqueue(queue: MessageQueueSizeCounting): Unit = { - queue.internalQueueSize.incrementAndGet() - } - - @Around("numberOfMessagesPointcut(queue)") - def aroundNumberOfMessages(pjp: ProceedingJoinPoint, queue: MessageQueueSizeCounting): Any = { - queue.internalQueueSize.get() - } -} - -trait MessageQueueSizeCounting { - val internalQueueSize = new AtomicInteger -} -- cgit v1.2.3