aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-06-19 00:47:07 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-06-19 00:47:07 -0300
commit1150d528eb5231993e542c086e2df90cf760d8a7 (patch)
treee1c86f9a115872073c46890b5ef32dbe1bd0bd3d /kamon-core
parent35b8a715d78ddd194d410ba0cc2119b5a1caa924 (diff)
parent4abab8df49d1bc5d9a051a8b54852e0712be7b74 (diff)
downloadKamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.gz
Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.bz2
Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.zip
Merge branch 'master' into release-0.2
Conflicts: kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala project/Dependencies.scala project/Projects.scala version.sbt
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml4
-rw-r--r--kamon-core/src/main/resources/reference.conf41
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala51
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala161
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala21
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala71
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala26
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala38
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala58
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceLocal.scala41
-rw-r--r--kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala24
-rw-r--r--kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala6
-rw-r--r--kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala31
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala39
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala105
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala9
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala110
-rw-r--r--kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala65
20 files changed, 839 insertions, 66 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index 180d905b..3f7dd42d 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -11,6 +11,10 @@
<aspect name="akka.instrumentation.BehaviourInvokeTracing"/>
<aspect name="kamon.instrumentation.ActorLoggingTracing"/>
+ <!-- Dispatchers -->
+ <aspect name="akka.instrumentation.DispatcherTracing"/>
+ <aspect name="akka.instrumentation.DispatcherMetricsMixin"/>
+
<!-- Futures -->
<aspect name="kamon.instrumentation.FutureTracing"/>
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index f29b9220..d2830892 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -3,9 +3,29 @@
# ================================== #
kamon {
+
+ # Default dispatcher for all Kamon components, unless a more specific one is configured.
+ default-dispatcher = "akka.actor.default-dispatcher"
+
metrics {
+
+ # Time interval for collecting all metrics and send the snapshots to all subscribed actors.
tick-interval = 1 second
+ # Time interval for recording values on all registered gauges.
+ gauge-recording-interval = 100 milliseconds
+
+
+ dispatchers {
+
+ # Dispatcher for periodical gauge value recordings.
+ gauge-recordings = ${kamon.default-dispatcher}
+
+ # Dispatcher for subscriptions and metrics collection actors.
+ metric-subscriptions = ${kamon.default-dispatcher}
+ }
+
+
filters = [
{
actor {
@@ -53,10 +73,30 @@ kamon {
significant-value-digits = 2
}
}
+
+ dispatcher {
+ maximum-pool-size {
+ highest-trackable-value = 999999999
+ significant-value-digits = 2
+ }
+ running-thread-count {
+ highest-trackable-value = 999999999
+ significant-value-digits = 2
+ }
+ queued-task-count {
+ highest-trackable-value = 999999999
+ significant-value-digits = 2
+ }
+ pool-size {
+ highest-trackable-value = 999999999
+ significant-value-digits = 2
+ }
+ }
}
}
trace {
+
# If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask`
# pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment
# the future was created.
@@ -64,6 +104,7 @@ kamon {
}
weaver {
+
# AspectJ options supported by LTW
# showWeaveInfo: show informational messages whenever the weaver touches a class file.
# verbose: show informational messages about the weaving process.
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index e52a66b2..6db86828 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
+
package akka.instrumentation
import org.aspectj.lang.annotation._
@@ -23,7 +24,8 @@ import kamon.trace._
import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
import kamon.metrics.ActorMetrics.ActorMetricRecorder
-import java.util.concurrent.atomic.AtomicInteger
+import kamon.metrics.instruments.MinMaxCounter
+import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement
@Aspect
class BehaviourInvokeTracing {
@@ -33,12 +35,26 @@ class BehaviourInvokeTracing {
@After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+
val metricsExtension = Kamon(Metrics)(system)
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
cellWithMetrics.metricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
+
+ if (cellWithMetrics.actorMetricsRecorder.isDefined) {
+ cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
+ cellWithMetrics.actorMetricsRecorder.map { am ⇒
+ import am.mailboxSize._
+ val CounterMeasurement(min, max, current) = cellWithMetrics.queueSize.collect()
+
+ record(min)
+ record(max)
+ record(current)
+ }
+ }
+ }
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -59,10 +75,7 @@ class BehaviourInvokeTracing {
am ⇒
am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
-
- val currentMailboxSize = cellWithMetrics.queueSize.decrementAndGet()
- if (currentMailboxSize >= 0)
- am.mailboxSize.record(currentMailboxSize)
+ cellWithMetrics.queueSize.decrement()
}
}
}
@@ -73,12 +86,7 @@ class BehaviourInvokeTracing {
@After("sendingMessageToActorCell(cell)")
def afterSendMessageToActorCell(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map {
- am ⇒
- val currentMailboxSize = cellWithMetrics.queueSize.incrementAndGet()
- if (currentMailboxSize >= 0)
- am.mailboxSize.record(currentMailboxSize)
- }
+ cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment())
}
@Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
@@ -87,14 +95,31 @@ class BehaviourInvokeTracing {
@After("actorStop(cell)")
def afterStop(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map(p ⇒ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity))
+
+ cellWithMetrics.actorMetricsRecorder.map { p ⇒
+ cellWithMetrics.mailboxSizeCollectorCancellable.cancel()
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+ }
+ }
+
+ @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)")
+ def actorInvokeFailure(cell: ActorCell): Unit = {}
+
+ @Before("actorInvokeFailure(cell)")
+ def beforeInvokeFailure(cell: ActorCell): Unit = {
+ val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+
+ cellWithMetrics.actorMetricsRecorder.map {
+ am ⇒ am.errorCounter.record(1L)
+ }
}
}
trait ActorCellMetrics {
var metricIdentity: ActorMetrics = _
var actorMetricsRecorder: Option[ActorMetricRecorder] = _
- val queueSize = new AtomicInteger
+ var mailboxSizeCollectorCancellable: Cancellable = _
+ val queueSize = MinMaxCounter()
}
@Aspect
diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
new file mode 100644
index 00000000..60cc4ddf
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
@@ -0,0 +1,161 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import akka.dispatch.{ Dispatchers, ExecutorServiceDelegate, Dispatcher, MessageDispatcher }
+import kamon.metrics.{ Metrics, DispatcherMetrics }
+import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder
+import kamon.Kamon
+import akka.actor.{ Cancellable, ActorSystemImpl }
+import scala.concurrent.forkjoin.ForkJoinPool
+import java.util.concurrent.ThreadPoolExecutor
+import java.lang.reflect.Method
+import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement
+
+@Aspect
+class DispatcherTracing {
+
+ @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))")
+ def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {}
+
+ @Before("onActorSystemStartup(dispatchers, system)")
+ def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = {
+ val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem]
+ currentDispatchers.actorSystem = system
+ }
+
+ @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)")
+ def onDispatchersLookup(dispatchers: Dispatchers) = {}
+
+ @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher")
+ def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = {
+ val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem]
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+
+ dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem
+ }
+
+ @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))")
+ def onCreateExecutorService(): Unit = {}
+
+ @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))")
+ def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {}
+
+ @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)")
+ def onDispatcherStartup(dispatcher: Dispatcher): Unit = {}
+
+ @After("onDispatcherStartup(dispatcher)")
+ def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = {
+
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+ val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem)
+ val metricIdentity = DispatcherMetrics(dispatcher.id)
+
+ dispatcherWithMetrics.metricIdentity = metricIdentity
+ dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory)
+
+ if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) {
+ dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
+ dispatcherWithMetrics.dispatcherMetricsRecorder.map {
+ dm ⇒
+ val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) =
+ DispatcherMetricsCollector.collect(dispatcher)
+
+ dm.maximumPoolSize.record(maximumPoolSize)
+ dm.runningThreadCount.record(runningThreadCount)
+ dm.queueTaskCount.record(queueTaskCount)
+ dm.poolSize.record(poolSize)
+ }
+ }
+ }
+ }
+
+ @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)")
+ def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {}
+
+ @After("onDispatcherShutdown(dispatcher)")
+ def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+
+ dispatcherWithMetrics.dispatcherMetricsRecorder.map {
+ dispatcher ⇒
+ dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
+ Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
+ }
+ }
+}
+
+@Aspect
+class DispatcherMetricsMixin {
+
+ @DeclareMixin("akka.dispatch.Dispatcher")
+ def mixinDispatcherMetricsToMessageDispatcher: DispatcherMessageMetrics = new DispatcherMessageMetrics {}
+
+ @DeclareMixin("akka.dispatch.Dispatchers")
+ def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {}
+}
+
+trait DispatcherMessageMetrics {
+ var metricIdentity: DispatcherMetrics = _
+ var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _
+ var dispatcherCollectorCancellable: Cancellable = _
+ var actorSystem: ActorSystemImpl = _
+}
+
+trait DispatchersWithActorSystem {
+ var actorSystem: ActorSystemImpl = _
+}
+
+object DispatcherMetricsCollector {
+
+ case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long)
+
+ private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = {
+ DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount,
+ (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
+ }
+
+ private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = {
+ DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
+ }
+
+ private val executorServiceMethod: Method = {
+ // executorService is protected
+ val method = classOf[Dispatcher].getDeclaredMethod("executorService")
+ method.setAccessible(true)
+ method
+ }
+
+ def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = {
+ dispatcher match {
+ case x: Dispatcher ⇒ {
+ val executor = executorServiceMethod.invoke(x) match {
+ case delegate: ExecutorServiceDelegate ⇒ delegate.executor
+ case other ⇒ other
+ }
+
+ executor match {
+ case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
+ case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
+ case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
+ }
+ }
+ case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
index bd0d628b..9e19dced 100644
--- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -17,7 +17,7 @@
package kamon.metrics
import com.typesafe.config.Config
-import kamon.metrics.instruments.ContinuousHdrRecorder
+import kamon.metrics.instruments.CounterRecorder
import org.HdrHistogram.HdrRecorder
case class ActorMetrics(name: String) extends MetricGroupIdentity {
@@ -30,29 +30,31 @@ object ActorMetrics extends MetricGroupCategory {
case object ProcessingTime extends MetricIdentity { val name, tag = "processing-time" }
case object MailboxSize extends MetricIdentity { val name, tag = "mailbox-size" }
case object TimeInMailbox extends MetricIdentity { val name, tag = "time-in-mailbox" }
+ case object ErrorCounter extends MetricIdentity { val name, tag = "errors" }
- case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder)
+ case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder, errorCounter: MetricRecorder)
extends MetricGroupRecorder {
def collect: MetricGroupSnapshot = {
- ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect())
+ ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect(), errorCounter.collect())
}
}
- case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike)
+ case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike, errorCounter: MetricSnapshotLike)
extends MetricGroupSnapshot {
val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map(
(ProcessingTime -> processingTime),
(MailboxSize -> mailboxSize),
- (TimeInMailbox -> timeInMailbox))
+ (TimeInMailbox -> timeInMailbox),
+ (ErrorCounter -> errorCounter))
}
val Factory = new MetricGroupFactory {
type GroupRecorder = ActorMetricRecorder
def create(config: Config): ActorMetricRecorder = {
- val settings = config.getConfig("kamon.metrics.precision.actor")
+ val settings = config.getConfig("precision.actor")
val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time"))
val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size"))
@@ -60,8 +62,9 @@ object ActorMetrics extends MetricGroupCategory {
new ActorMetricRecorder(
HdrRecorder(processingTimeConfig.highestTrackableValue, processingTimeConfig.significantValueDigits, Scale.Nano),
- ContinuousHdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit),
- HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano))
+ HdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit),
+ HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano),
+ CounterRecorder())
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala
new file mode 100644
index 00000000..f41e0c3f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala
@@ -0,0 +1,71 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import com.typesafe.config.Config
+import org.HdrHistogram.HdrRecorder
+
+case class DispatcherMetrics(name: String) extends MetricGroupIdentity {
+ val category = DispatcherMetrics
+}
+
+object DispatcherMetrics extends MetricGroupCategory {
+ val name = "dispatcher"
+
+ case object MaximumPoolSize extends MetricIdentity { val name, tag = "maximum-pool-size" }
+ case object RunningThreadCount extends MetricIdentity { val name, tag = "running-thread-count" }
+ case object QueueTaskCount extends MetricIdentity { val name, tag = "queued-task-count" }
+ case object PoolSize extends MetricIdentity { val name, tag = "pool-size" }
+
+ case class DispatcherMetricRecorder(maximumPoolSize: MetricRecorder, runningThreadCount: MetricRecorder, queueTaskCount: MetricRecorder, poolSize: MetricRecorder)
+ extends MetricGroupRecorder {
+
+ def collect: MetricGroupSnapshot = {
+ DispatcherMetricSnapshot(maximumPoolSize.collect(), runningThreadCount.collect(), queueTaskCount.collect(), poolSize.collect())
+ }
+ }
+
+ case class DispatcherMetricSnapshot(maximumPoolSize: MetricSnapshotLike, runningThreadCount: MetricSnapshotLike, queueTaskCount: MetricSnapshotLike, poolSize: MetricSnapshotLike)
+ extends MetricGroupSnapshot {
+
+ val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map(
+ (MaximumPoolSize -> maximumPoolSize),
+ (RunningThreadCount -> runningThreadCount),
+ (QueueTaskCount -> queueTaskCount),
+ (PoolSize -> poolSize))
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = DispatcherMetricRecorder
+
+ def create(config: Config): DispatcherMetricRecorder = {
+ val settings = config.getConfig("precision.dispatcher")
+
+ val MaximumPoolSizeConfig = extractPrecisionConfig(settings.getConfig("maximum-pool-size"))
+ val RunningThreadCountConfig = extractPrecisionConfig(settings.getConfig("running-thread-count"))
+ val QueueTaskCountConfig = extractPrecisionConfig(settings.getConfig("queued-task-count"))
+ val PoolSizeConfig = extractPrecisionConfig(settings.getConfig("pool-size"))
+
+ new DispatcherMetricRecorder(
+ HdrRecorder(MaximumPoolSizeConfig.highestTrackableValue, MaximumPoolSizeConfig.significantValueDigits, Scale.Unit),
+ HdrRecorder(RunningThreadCountConfig.highestTrackableValue, RunningThreadCountConfig.significantValueDigits, Scale.Unit),
+ HdrRecorder(QueueTaskCountConfig.highestTrackableValue, QueueTaskCountConfig.significantValueDigits, Scale.Unit),
+ HdrRecorder(PoolSizeConfig.highestTrackableValue, PoolSizeConfig.significantValueDigits, Scale.Unit))
+ }
+ }
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
index 359540fc..c60babb2 100644
--- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
@@ -24,16 +24,25 @@ import kamon.Kamon
import akka.actor
import kamon.metrics.Metrics.MetricGroupFilter
import kamon.metrics.Subscriptions.Subscribe
+import java.util.concurrent.TimeUnit
class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val config = system.settings.config
+ val metricsExtConfig = system.settings.config.getConfig("kamon.metrics")
+
+ /** Configured Dispatchers */
+ val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions"))
+ val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings"))
+
+ /** Configuration Settings */
+ val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS)
+
val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
- val filters = loadFilters(config)
+ val filters = loadFilters(metricsExtConfig)
lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")
def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
if (shouldTrack(identity))
- Some(storage.getOrElseUpdate(identity, factory.create(config)).asInstanceOf[factory.GroupRecorder])
+ Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig)).asInstanceOf[factory.GroupRecorder])
else
None
}
@@ -50,6 +59,14 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
(for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap
}
+ def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
+ import scala.concurrent.duration._
+
+ system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) {
+ body
+ }(gaugeRecordingsDispatcher)
+ }
+
private def shouldTrack(identity: MetricGroupIdentity): Boolean = {
filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false)
}
@@ -57,7 +74,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
import scala.collection.JavaConverters._
- val filters = config.getObjectList("kamon.metrics.filters").asScala
+ val filters = config.getObjectList("filters").asScala
val allFilters =
for (
@@ -75,7 +92,6 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
allFilters.toMap
}
-
}
object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
index 7c197166..5454edf5 100644
--- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
@@ -53,7 +53,7 @@ object TraceMetrics extends MetricGroupCategory {
def create(config: Config): TraceMetricRecorder = {
- val settings = config.getConfig("kamon.metrics.precision.trace")
+ val settings = config.getConfig("precision.trace")
val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time"))
val segmentConfig = extractPrecisionConfig(settings.getConfig("segment"))
diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala
new file mode 100644
index 00000000..e5efbc15
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala
@@ -0,0 +1,38 @@
+package kamon.metrics.instruments
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import kamon.metrics._
+import kamon.metrics.MetricSnapshot.Measurement
+
+import jsr166e.LongAdder
+
+class CounterRecorder extends MetricRecorder {
+ private val counter = new LongAdder
+
+ def record(value: Long): Unit = {
+ counter.add(value)
+ }
+
+ def collect(): MetricSnapshotLike = {
+ val sum = counter.sumThenReset()
+ MetricSnapshot(InstrumentTypes.Counter, sum, Scale.Unit, Vector(Measurement(1, sum)))
+ }
+}
+
+object CounterRecorder {
+ def apply(): CounterRecorder = new CounterRecorder()
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala
new file mode 100644
index 00000000..ba2550af
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala
@@ -0,0 +1,58 @@
+package kamon.metrics.instruments
+
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import java.lang.Math._
+import jsr166e.LongMaxUpdater
+import kamon.util.PaddedAtomicLong
+import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement
+
+class MinMaxCounter {
+ private val min = new LongMaxUpdater
+ private val max = new LongMaxUpdater
+ private val sum = new PaddedAtomicLong
+
+ min.update(0L)
+ max.update(0L)
+
+ def increment(value: Long = 1L): Unit = {
+ val currentValue = sum.addAndGet(value)
+ max.update(currentValue)
+ }
+
+ def decrement(value: Long = 1L): Unit = {
+ val currentValue = sum.addAndGet(-value)
+ min.update(-currentValue)
+ }
+
+ def collect(): CounterMeasurement = {
+ val currentValue = {
+ val value = sum.get()
+ if (value < 0) 0 else value
+ }
+ val result = CounterMeasurement(abs(min.maxThenReset()), max.maxThenReset(), currentValue)
+ max.update(currentValue)
+ min.update(-currentValue)
+ result
+ }
+}
+
+object MinMaxCounter {
+ def apply() = new MinMaxCounter()
+
+ case class CounterMeasurement(min: Long, max: Long, current: Long)
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 9980a022..307cf17a 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -32,6 +32,8 @@ trait TraceContext {
def levelOfDetail: TracingLevelOfDetail
def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle
def finish(metadata: Map[String, String])
+
+ private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
}
object TraceContext {
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
new file mode 100644
index 00000000..3ff074b6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
@@ -0,0 +1,41 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import scala.collection.concurrent.TrieMap
+import kamon.trace.TraceLocal.TraceLocalKey
+
+object TraceLocal {
+ trait TraceLocalKey {
+ type ValueType
+ }
+
+ def store(key: TraceLocalKey)(value: key.ValueType): Unit =
+ TraceRecorder.currentContext.map(_.traceLocalStorage.store(key)(value))
+
+ def retrieve(key: TraceLocalKey): Option[key.ValueType] =
+ TraceRecorder.currentContext.flatMap(_.traceLocalStorage.retrieve(key))
+
+}
+
+class TraceLocalStorage {
+ val underlyingStorage = TrieMap[TraceLocal.TraceLocalKey, Any]()
+
+ def store(key: TraceLocalKey)(value: key.ValueType): Unit = underlyingStorage.put(key, value)
+
+ def retrieve(key: TraceLocalKey): Option[key.ValueType] = underlyingStorage.get(key).map(_.asInstanceOf[key.ValueType])
+}
diff --git a/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala
new file mode 100644
index 00000000..9c926372
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala
@@ -0,0 +1,24 @@
+package kamon.util
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import java.util.concurrent.atomic.AtomicLong
+
+class PaddedAtomicLong(value: Long = 0) extends AtomicLong(value) {
+ @volatile var p1, p2, p3, p4, p5, p6 = 7L
+
+ protected def sumPaddingToPreventOptimisation() = p1 + p2 + p3 + p4 + p5 + p6
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala b/kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala
index bf047898..12f7f549 100644
--- a/kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala
+++ b/kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala
@@ -17,8 +17,8 @@
package kamon.weaver.logging
import org.aspectj.bridge.{ IMessage, IMessageHandler }
-import org.slf4j.LoggerFactory
import com.typesafe.config.ConfigFactory
+import java.util.logging.Logger
/**
* Implementation of AspectJ's IMessageHandler interface that routes AspectJ weaving messages and controls them through kamon configuration.
@@ -26,7 +26,7 @@ import com.typesafe.config.ConfigFactory
class KamonWeaverMessageHandler extends IMessageHandler {
import IMessage._
- private val log = LoggerFactory.getLogger("AspectJ Weaver")
+ private val log = Logger.getLogger("AspectJ Weaver")
private val conf = ConfigFactory.load().getConfig("kamon.weaver")
private val isVerbose = conf.getBoolean("verbose")
@@ -54,7 +54,7 @@ class KamonWeaverMessageHandler extends IMessageHandler {
}
private def showErrorMessage(msg: IMessage): Boolean = {
- log.error(msg.getMessage)
+ log.severe(msg.getMessage)
true
}
}
diff --git a/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala b/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala
deleted file mode 100644
index 5108af25..00000000
--- a/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon
-
-import org.scalatest.{ WordSpecLike, WordSpec }
-import akka.testkit.TestKit
-import akka.actor.{ Props, ActorSystem }
-
-class MailboxSizeMetricsSpec extends TestKit(ActorSystem("mailbox-size-metrics-spec")) with WordSpecLike {
-
- "the mailbox size metrics instrumentation" should {
- "register a counter for mailbox size upon actor creation" in {
- val target = system.actorOf(Props.empty, "sample")
-
- //Metrics.registry.getHistograms.get("akka://mailbox-size-metrics-spec/sample:MAILBOX")
- }
- }
-}
diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
index 61d15bc9..645ca96a 100644
--- a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
@@ -87,9 +87,10 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
// process the tick in which the actor is stalled.
val stalledTickMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 2 seconds)
- stalledTickMetrics.mailboxSize.numberOfMeasurements should equal(1)
+ stalledTickMetrics.mailboxSize.numberOfMeasurements should equal(30)
// only the automatic last-value recording should be taken, and includes the message being currently processed.
- stalledTickMetrics.mailboxSize.measurements should contain only (Measurement(10, 1))
+ stalledTickMetrics.mailboxSize.measurements should contain only (Measurement(10, 30))
+ stalledTickMetrics.mailboxSize.min should equal(10)
stalledTickMetrics.mailboxSize.max should equal(10)
stalledTickMetrics.processingTime.numberOfMeasurements should be(0L)
stalledTickMetrics.timeInMailbox.numberOfMeasurements should be(0L)
@@ -101,6 +102,17 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
afterStallTickMetrics.processingTime.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos)
afterStallTickMetrics.timeInMailbox.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos)
}
+
+ "track the number of errors" in new ErrorActorFixture {
+ val (error, metricsListener) = failedActor("tracked-errors")
+
+ for (_ ← 1 to 5) {
+ error ! Error
+ }
+
+ val actorMetrics = expectActorMetrics("user/tracked-errors", metricsListener, 3 seconds)
+ actorMetrics.errorCounter.numberOfMeasurements should be(5L)
+ }
}
def expectActorMetrics(actorPath: String, listener: TestProbe, waitTime: FiniteDuration): ActorMetricSnapshot = {
@@ -124,6 +136,19 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
(actor, metricsListener)
}
}
+
+ trait ErrorActorFixture {
+ def failedActor(name: String): (ActorRef, TestProbe) = {
+ val actor = system.actorOf(Props[FailedActor], name)
+ val metricsListener = TestProbe()
+
+ Kamon(Metrics).subscribe(ActorMetrics, "user/" + name, metricsListener.ref, permanently = true)
+ // Wait for one empty snapshot before proceeding to the test.
+ metricsListener.expectMsgType[TickMetricSnapshot]
+
+ (actor, metricsListener)
+ }
+ }
}
class DelayableActor extends Actor {
@@ -133,5 +158,15 @@ class DelayableActor extends Actor {
}
}
+class FailedActor extends Actor {
+ def receive = {
+ case Error ⇒ 1 / 0
+ case Discard ⇒
+ }
+}
+
case object Discard
+
case class Delay(time: FiniteDuration)
+
+case class Error()
diff --git a/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala
new file mode 100644
index 00000000..2a9cb6b4
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala
@@ -0,0 +1,105 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import org.scalatest.{ WordSpecLike, Matchers }
+import akka.testkit.{ TestProbe, TestKitBase }
+import akka.actor.{ ActorRef, Props, ActorSystem }
+import com.typesafe.config.ConfigFactory
+import scala.concurrent.duration._
+import kamon.Kamon
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metrics.DispatcherMetrics.DispatcherMetricSnapshot
+
+class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
+ implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | filters = [
+ | {
+ | dispatcher {
+ | includes = ["*"]
+ | excludes = ["dispatcher-explicitly-excluded"]
+ | }
+ | }
+ | ]
+ |}
+ |
+ |dispatcher-explicitly-excluded {
+ | type = "Dispatcher"
+ | executor = "fork-join-executor"
+ |}
+ |
+ |tracked-dispatcher {
+ | type = "Dispatcher"
+ | executor = "thread-pool-executor"
+ |}
+ |
+ """.stripMargin))
+
+ "the Kamon dispatcher metrics" should {
+ "respect the configured include and exclude filters" in {
+ system.actorOf(Props[DelayableActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher")
+ system.actorOf(Props[DelayableActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher")
+
+ Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true)
+ expectMsgType[TickMetricSnapshot]
+
+ within(2 seconds) {
+ val tickSnapshot = expectMsgType[TickMetricSnapshot]
+ tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher"))
+ tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded"))
+ }
+ }
+
+ "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture {
+ val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher")
+
+ for (_ ← 1 to 100) {
+ delayable ! Discard
+ }
+
+ val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds)
+ dispatcherMetrics.maximumPoolSize.max should be <= 64L //fail in travis
+ dispatcherMetrics.poolSize.max should be <= 22L //fail in travis
+ dispatcherMetrics.queueTaskCount.max should be(0L)
+ dispatcherMetrics.runningThreadCount.max should be(0L)
+ }
+
+ }
+
+ def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = {
+ val tickSnapshot = within(waitTime) {
+ listener.expectMsgType[TickMetricSnapshot]
+ }
+ val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId))
+ dispatcherMetricsOption should not be empty
+ dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot]
+ }
+
+ trait DelayableActorFixture {
+ def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = {
+ val actor = system.actorOf(Props[DelayableActor].withDispatcher(dispatcher), name)
+ val metricsListener = TestProbe()
+
+ Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true)
+ // Wait for one empty snapshot before proceeding to the test.
+ metricsListener.expectMsgType[TickMetricSnapshot]
+
+ (actor, metricsListener)
+ }
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala
index b4f33ec3..4d6ebc49 100644
--- a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala
@@ -25,25 +25,28 @@ class MetricSnapshotSpec extends WordSpec with Matchers {
"support a max operation" in new SnapshotFixtures {
snapshotA.max should be(17)
snapshotB.max should be(10)
+ snapshotC.max should be(1)
}
"support a min operation" in new SnapshotFixtures {
snapshotA.min should be(1)
snapshotB.min should be(2)
+ snapshotC.min should be(1)
}
"be able to merge with other snapshot" in new SnapshotFixtures {
- val merged = snapshotA.merge(snapshotB)
+ val merged = snapshotA.merge(snapshotB).merge(snapshotC)
merged.min should be(1)
merged.max should be(17)
- merged.numberOfMeasurements should be(200)
+ merged.numberOfMeasurements should be(300)
merged.measurements.map(_.value) should contain inOrderOnly (1, 2, 4, 5, 7, 10, 17)
}
"be able to merge with empty snapshots" in new SnapshotFixtures {
snapshotA.merge(emptySnapshot) should be(snapshotA)
emptySnapshot.merge(snapshotA).merge(emptySnapshot) should be(snapshotA)
+ snapshotC.merge(emptySnapshot) should be(snapshotC)
}
}
@@ -63,5 +66,7 @@ class MetricSnapshotSpec extends WordSpec with Matchers {
Measurement(4, 48),
Measurement(5, 39),
Measurement(10, 7)))
+
+ val snapshotC = MetricSnapshot(InstrumentTypes.Counter, 100, Scale.Unit, Vector(Measurement(1, 100)))
}
}
diff --git a/kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala b/kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala
new file mode 100644
index 00000000..14f1573f
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala
@@ -0,0 +1,110 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package kamon.metrics.instrument
+
+import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.metrics.instruments.MinMaxCounter
+import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement
+
+class MinMaxCounterSpec extends WordSpecLike with Matchers {
+
+ "the MinMaxCounter" should {
+ "increment" in {
+ val counter = MinMaxCounter()
+
+ counter.increment()
+ counter.increment()
+ counter.increment()
+ counter.increment()
+ counter.increment()
+
+ val CounterMeasurement(_, _, current) = counter.collect()
+
+ current should be(5)
+ }
+
+ "decrement" in {
+ val counter = MinMaxCounter()
+ counter.increment(5L)
+
+ counter.decrement()
+ counter.decrement()
+ counter.decrement()
+ counter.decrement()
+ counter.decrement()
+
+ val CounterMeasurement(_, _, current) = counter.collect()
+
+ current should be(0)
+ }
+
+ "reset the min and max with the sum value when the collect method is called" in {
+ val counter = MinMaxCounter()
+
+ counter.increment(10)
+ counter.increment(20)
+ counter.increment(30)
+ counter.increment(40)
+ counter.increment(50)
+
+ counter.collect() //only for check the last value after reset min max
+
+ val CounterMeasurement(min, max, current) = counter.collect()
+
+ min should be(current)
+ max should be(current)
+ current should be(150)
+ }
+ }
+
+ "track the min value" in {
+ val counter = MinMaxCounter()
+
+ counter.increment(10)
+ counter.increment(20)
+ counter.increment(30)
+ counter.increment(40)
+ counter.increment(50)
+
+ val CounterMeasurement(min, _, _) = counter.collect()
+
+ min should be(0)
+
+ counter.increment(50)
+
+ val CounterMeasurement(minAfterCollectAndAddSomeValues, _, _) = counter.collect()
+
+ minAfterCollectAndAddSomeValues should be(150)
+ }
+
+ "track the max value" in {
+ val counter = MinMaxCounter()
+ counter.increment(10)
+ counter.increment(20)
+ counter.increment(30)
+ counter.increment(40)
+ counter.increment(50)
+
+ val CounterMeasurement(_, max, _) = counter.collect()
+
+ max should be(150)
+
+ counter.increment(200)
+
+ val CounterMeasurement(_, maxAfterCollectAndAddSomeValues, _) = counter.collect()
+
+ maxAfterCollectAndAddSomeValues should be(350)
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala
new file mode 100644
index 00000000..927573c2
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala
@@ -0,0 +1,65 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import akka.testkit.TestKit
+import akka.actor.ActorSystem
+import org.scalatest.{ OptionValues, Matchers, WordSpecLike }
+import org.scalatest.concurrent.PatienceConfiguration
+
+class TraceLocalSpec extends TestKit(ActorSystem("trace-local-spec")) with WordSpecLike with Matchers
+ with PatienceConfiguration with OptionValues {
+
+ object SampleTraceLocalKey extends TraceLocal.TraceLocalKey { type ValueType = String }
+
+ "the TraceLocal storage" should {
+ "allow storing and retrieving values" in {
+ TraceRecorder.withNewTraceContext("store-and-retrieve-trace-local") {
+ val testString = "Hello World"
+
+ TraceLocal.store(SampleTraceLocalKey)(testString)
+ TraceLocal.retrieve(SampleTraceLocalKey).value should equal(testString)
+ }
+ }
+
+ "return None when retrieving a non existent key" in {
+ TraceRecorder.withNewTraceContext("non-existent-key") {
+ TraceLocal.retrieve(SampleTraceLocalKey) should equal(None)
+ }
+ }
+
+ "return None when retrieving a key without a current TraceContext" in {
+ TraceLocal.retrieve(SampleTraceLocalKey) should equal(None)
+ }
+
+ "be attached to the TraceContext when it is propagated" in {
+ val testString = "Hello World"
+ val testContext = TraceRecorder.withNewTraceContext("manually-propagated-trace-local") {
+ TraceLocal.store(SampleTraceLocalKey)(testString)
+ TraceLocal.retrieve(SampleTraceLocalKey).value should equal(testString)
+ TraceRecorder.currentContext
+ }
+
+ /** No TraceLocal should be available here */
+ TraceLocal.retrieve(SampleTraceLocalKey) should equal(None)
+
+ TraceRecorder.withTraceContext(testContext) {
+ TraceLocal.retrieve(SampleTraceLocalKey).value should equal(testString)
+ }
+ }
+ }
+}