aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-05-07 22:07:02 -0300
committerDiego <diegolparra@gmail.com>2014-05-07 22:07:02 -0300
commit290aabd4608bffc58cf8fe734371ee7bbe78df4f (patch)
tree4813ac52bdc74dbd1ba6cbed82bfd82893b17b50
parent5e413d21a2fe5e604eca5b2c2f029206a61c83aa (diff)
downloadKamon-290aabd4608bffc58cf8fe734371ee7bbe78df4f.tar.gz
Kamon-290aabd4608bffc58cf8fe734371ee7bbe78df4f.tar.bz2
Kamon-290aabd4608bffc58cf8fe734371ee7bbe78df4f.zip
+ core: MinMaxCounter for actor mailbox size
-rw-r--r--kamon-core/src/main/resources/reference.conf27
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala32
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/instruments/counter/MinMaxCounter.scala57
-rw-r--r--kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala24
4 files changed, 128 insertions, 12 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index f29b9220..e5168929 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -56,6 +56,33 @@ kamon {
}
}
+ default-dispatcher {
+ # Dispatcher is the name of the event-based dispatcher
+ type = Dispatcher
+
+ # What kind of ExecutionService to use
+ executor = "fork-join-executor"
+
+ # Configuration for the fork join pool
+ fork-join-executor {
+
+ # Min number of threads to cap factor-based parallelism number to
+ parallelism-min = 2
+
+ # Parallelism (threads) ... ceil(available processors * factor)
+ parallelism-factor = 2.0
+
+ # Max number of threads to cap factor-based parallelism number to
+ parallelism-max = 10
+ }
+
+ # Throughput defines the maximum number of messages to be
+ # processed per actor before the thread jumps to the next actor.
+ # Set to 1 for as fair as possible.
+ throughput = 100
+ }
+
+
trace {
# If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask`
# pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index 78c170de..7766b3a1 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
+
package akka.instrumentation
import org.aspectj.lang.annotation._
@@ -23,7 +24,7 @@ import kamon.trace._
import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
import kamon.metrics.ActorMetrics.ActorMetricRecorder
-import java.util.concurrent.atomic.AtomicInteger
+import kamon.metrics.instruments.counter.{ Counter, MinMaxCounter }
@Aspect
class BehaviourInvokeTracing {
@@ -33,12 +34,27 @@ class BehaviourInvokeTracing {
@After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+ import scala.concurrent.duration._
+
val metricsExtension = Kamon(Metrics)(system)
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
cellWithMetrics.metricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
+
+ val executor = system.dispatchers.lookup("kamon.default-dispatcher")
+
+ system.scheduler.schedule(0 milliseconds, 100 milliseconds) {
+ cellWithMetrics.actorMetricsRecorder.map {
+ am ⇒
+ val (min, max, sum) = cellWithMetrics.queueSize.collect()
+
+ am.mailboxSize.record(min)
+ am.mailboxSize.record(max)
+ am.mailboxSize.record(sum)
+ }
+ }(executor)
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -59,10 +75,7 @@ class BehaviourInvokeTracing {
am ⇒
am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
-
- val currentMailboxSize = cellWithMetrics.queueSize.decrementAndGet()
- if (currentMailboxSize >= 0)
- am.mailboxSize.record(currentMailboxSize)
+ cellWithMetrics.queueSize.decrement()
}
}
}
@@ -73,12 +86,7 @@ class BehaviourInvokeTracing {
@After("sendingMessageToActorCell(cell)")
def afterSendMessageToActorCell(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map {
- am ⇒
- val currentMailboxSize = cellWithMetrics.queueSize.incrementAndGet()
- if (currentMailboxSize >= 0)
- am.mailboxSize.record(currentMailboxSize)
- }
+ cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment())
}
@Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
@@ -106,7 +114,7 @@ class BehaviourInvokeTracing {
trait ActorCellMetrics {
var metricIdentity: ActorMetrics = _
var actorMetricsRecorder: Option[ActorMetricRecorder] = _
- val queueSize = new AtomicInteger
+ val queueSize: Counter = MinMaxCounter()
}
@Aspect
diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/counter/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/counter/MinMaxCounter.scala
new file mode 100644
index 00000000..052c7bcd
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/instruments/counter/MinMaxCounter.scala
@@ -0,0 +1,57 @@
+package kamon.metrics.instruments.counter
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import java.lang.Math._
+import jsr166e.LongMaxUpdater
+import kamon.util.PaddedAtomicLong
+
+sealed trait Counter {
+ def increment(value: Long = 1L): Unit
+ def decrement(value: Long = 1L): Unit
+ def collect(): (Long, Long, Long)
+}
+
+class MinMaxCounter extends Counter {
+ private val min = new LongMaxUpdater
+ private val max = new LongMaxUpdater
+ private val sum = new PaddedAtomicLong
+
+ min.update(0L)
+ max.update(0L)
+
+ def increment(value: Long = 1L): Unit = {
+ val currentValue = sum.addAndGet(value)
+ max.update(currentValue)
+ }
+
+ def decrement(value: Long = 1L): Unit = {
+ val currentValue = sum.addAndGet(-value)
+ min.update(-currentValue)
+ }
+
+ def collect(): (Long, Long, Long) = {
+ val currentValue = sum.get()
+ val result = (abs(min.max()), max.max(), currentValue)
+ max.update(currentValue)
+ min.update(-currentValue)
+ result
+ }
+}
+
+object MinMaxCounter {
+ def apply(): Counter = new MinMaxCounter()
+}
diff --git a/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala
new file mode 100644
index 00000000..9c926372
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala
@@ -0,0 +1,24 @@
+package kamon.util
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import java.util.concurrent.atomic.AtomicLong
+
+class PaddedAtomicLong(value: Long = 0) extends AtomicLong(value) {
+ @volatile var p1, p2, p3, p4, p5, p6 = 7L
+
+ protected def sumPaddingToPreventOptimisation() = p1 + p2 + p3 + p4 + p5 + p6
+} \ No newline at end of file