From f08cb7395f1b18cd473d876fa862bfdebe61788c Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 7 May 2014 22:07:02 -0300 Subject: + core: MinMaxCounter for actor mailbox size --- .../ActorMessagePassingTracing.scala | 32 ++++++++++++++-------- 1 file changed, 20 insertions(+), 12 deletions(-) (limited to 'kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala') diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 78c170de..7766b3a1 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. * ========================================================== */ + package akka.instrumentation import org.aspectj.lang.annotation._ @@ -23,7 +24,7 @@ import kamon.trace._ import kamon.metrics.{ ActorMetrics, Metrics } import kamon.Kamon import kamon.metrics.ActorMetrics.ActorMetricRecorder -import java.util.concurrent.atomic.AtomicInteger +import kamon.metrics.instruments.counter.{ Counter, MinMaxCounter } @Aspect class BehaviourInvokeTracing { @@ -33,12 +34,27 @@ class BehaviourInvokeTracing { @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + import scala.concurrent.duration._ + val metricsExtension = Kamon(Metrics)(system) val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] cellWithMetrics.metricIdentity = metricIdentity cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) + + val executor = system.dispatchers.lookup("kamon.default-dispatcher") + + system.scheduler.schedule(0 milliseconds, 100 milliseconds) { + cellWithMetrics.actorMetricsRecorder.map { + am ⇒ + val (min, max, sum) = cellWithMetrics.queueSize.collect() + + am.mailboxSize.record(min) + am.mailboxSize.record(max) + am.mailboxSize.record(sum) + } + }(executor) } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") @@ -59,10 +75,7 @@ class BehaviourInvokeTracing { am ⇒ am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) - - val currentMailboxSize = cellWithMetrics.queueSize.decrementAndGet() - if (currentMailboxSize >= 0) - am.mailboxSize.record(currentMailboxSize) + cellWithMetrics.queueSize.decrement() } } } @@ -73,12 +86,7 @@ class BehaviourInvokeTracing { @After("sendingMessageToActorCell(cell)") def afterSendMessageToActorCell(cell: ActorCell): Unit = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ - val currentMailboxSize = cellWithMetrics.queueSize.incrementAndGet() - if (currentMailboxSize >= 0) - am.mailboxSize.record(currentMailboxSize) - } + cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment()) } @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") @@ -106,7 +114,7 @@ class BehaviourInvokeTracing { trait ActorCellMetrics { var metricIdentity: ActorMetrics = _ var actorMetricsRecorder: Option[ActorMetricRecorder] = _ - val queueSize = new AtomicInteger + val queueSize: Counter = MinMaxCounter() } @Aspect -- cgit v1.2.3