From 290aabd4608bffc58cf8fe734371ee7bbe78df4f Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 7 May 2014 22:07:02 -0300 Subject: + core: MinMaxCounter for actor mailbox size --- kamon-core/src/main/resources/reference.conf | 27 ++++++++++ .../ActorMessagePassingTracing.scala | 32 +++++++----- .../instruments/counter/MinMaxCounter.scala | 57 ++++++++++++++++++++++ .../main/scala/kamon/util/PaddedAtomicLong.scala | 24 +++++++++ 4 files changed, 128 insertions(+), 12 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/counter/MinMaxCounter.scala create mode 100644 kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index f29b9220..e5168929 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -56,6 +56,33 @@ kamon { } } + default-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + + # What kind of ExecutionService to use + executor = "fork-join-executor" + + # Configuration for the fork join pool + fork-join-executor { + + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 + } + + trace { # If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask` # pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 78c170de..7766b3a1 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. * ========================================================== */ + package akka.instrumentation import org.aspectj.lang.annotation._ @@ -23,7 +24,7 @@ import kamon.trace._ import kamon.metrics.{ ActorMetrics, Metrics } import kamon.Kamon import kamon.metrics.ActorMetrics.ActorMetricRecorder -import java.util.concurrent.atomic.AtomicInteger +import kamon.metrics.instruments.counter.{ Counter, MinMaxCounter } @Aspect class BehaviourInvokeTracing { @@ -33,12 +34,27 @@ class BehaviourInvokeTracing { @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + import scala.concurrent.duration._ + val metricsExtension = Kamon(Metrics)(system) val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] cellWithMetrics.metricIdentity = metricIdentity cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) + + val executor = system.dispatchers.lookup("kamon.default-dispatcher") + + system.scheduler.schedule(0 milliseconds, 100 milliseconds) { + cellWithMetrics.actorMetricsRecorder.map { + am ⇒ + val (min, max, sum) = cellWithMetrics.queueSize.collect() + + am.mailboxSize.record(min) + am.mailboxSize.record(max) + am.mailboxSize.record(sum) + } + }(executor) } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") @@ -59,10 +75,7 @@ class BehaviourInvokeTracing { am ⇒ am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) - - val currentMailboxSize = cellWithMetrics.queueSize.decrementAndGet() - if (currentMailboxSize >= 0) - am.mailboxSize.record(currentMailboxSize) + cellWithMetrics.queueSize.decrement() } } } @@ -73,12 +86,7 @@ class BehaviourInvokeTracing { @After("sendingMessageToActorCell(cell)") def afterSendMessageToActorCell(cell: ActorCell): Unit = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ - val currentMailboxSize = cellWithMetrics.queueSize.incrementAndGet() - if (currentMailboxSize >= 0) - am.mailboxSize.record(currentMailboxSize) - } + cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment()) } @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") @@ -106,7 +114,7 @@ class BehaviourInvokeTracing { trait ActorCellMetrics { var metricIdentity: ActorMetrics = _ var actorMetricsRecorder: Option[ActorMetricRecorder] = _ - val queueSize = new AtomicInteger + val queueSize: Counter = MinMaxCounter() } @Aspect diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/counter/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/counter/MinMaxCounter.scala new file mode 100644 index 00000000..052c7bcd --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/instruments/counter/MinMaxCounter.scala @@ -0,0 +1,57 @@ +package kamon.metrics.instruments.counter +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +import java.lang.Math._ +import jsr166e.LongMaxUpdater +import kamon.util.PaddedAtomicLong + +sealed trait Counter { + def increment(value: Long = 1L): Unit + def decrement(value: Long = 1L): Unit + def collect(): (Long, Long, Long) +} + +class MinMaxCounter extends Counter { + private val min = new LongMaxUpdater + private val max = new LongMaxUpdater + private val sum = new PaddedAtomicLong + + min.update(0L) + max.update(0L) + + def increment(value: Long = 1L): Unit = { + val currentValue = sum.addAndGet(value) + max.update(currentValue) + } + + def decrement(value: Long = 1L): Unit = { + val currentValue = sum.addAndGet(-value) + min.update(-currentValue) + } + + def collect(): (Long, Long, Long) = { + val currentValue = sum.get() + val result = (abs(min.max()), max.max(), currentValue) + max.update(currentValue) + min.update(-currentValue) + result + } +} + +object MinMaxCounter { + def apply(): Counter = new MinMaxCounter() +} diff --git a/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala new file mode 100644 index 00000000..9c926372 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala @@ -0,0 +1,24 @@ +package kamon.util +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +import java.util.concurrent.atomic.AtomicLong + +class PaddedAtomicLong(value: Long = 0) extends AtomicLong(value) { + @volatile var p1, p2, p3, p4, p5, p6 = 7L + + protected def sumPaddingToPreventOptimisation() = p1 + p2 + p3 + p4 + p5 + p6 +} \ No newline at end of file -- cgit v1.2.3