aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-06-19 00:47:07 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-06-19 00:47:07 -0300
commit1150d528eb5231993e542c086e2df90cf760d8a7 (patch)
treee1c86f9a115872073c46890b5ef32dbe1bd0bd3d /kamon-core/src/main/scala
parent35b8a715d78ddd194d410ba0cc2119b5a1caa924 (diff)
parent4abab8df49d1bc5d9a051a8b54852e0712be7b74 (diff)
downloadKamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.gz
Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.bz2
Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.zip
Merge branch 'master' into release-0.2
Conflicts: kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala project/Dependencies.scala project/Projects.scala version.sbt
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala51
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala161
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala21
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala71
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala26
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala38
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala58
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceLocal.scala41
-rw-r--r--kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala24
-rw-r--r--kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala6
12 files changed, 470 insertions, 31 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index e52a66b2..6db86828 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
+
package akka.instrumentation
import org.aspectj.lang.annotation._
@@ -23,7 +24,8 @@ import kamon.trace._
import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
import kamon.metrics.ActorMetrics.ActorMetricRecorder
-import java.util.concurrent.atomic.AtomicInteger
+import kamon.metrics.instruments.MinMaxCounter
+import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement
@Aspect
class BehaviourInvokeTracing {
@@ -33,12 +35,26 @@ class BehaviourInvokeTracing {
@After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+
val metricsExtension = Kamon(Metrics)(system)
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
cellWithMetrics.metricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
+
+ if (cellWithMetrics.actorMetricsRecorder.isDefined) {
+ cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
+ cellWithMetrics.actorMetricsRecorder.map { am ⇒
+ import am.mailboxSize._
+ val CounterMeasurement(min, max, current) = cellWithMetrics.queueSize.collect()
+
+ record(min)
+ record(max)
+ record(current)
+ }
+ }
+ }
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -59,10 +75,7 @@ class BehaviourInvokeTracing {
am ⇒
am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
-
- val currentMailboxSize = cellWithMetrics.queueSize.decrementAndGet()
- if (currentMailboxSize >= 0)
- am.mailboxSize.record(currentMailboxSize)
+ cellWithMetrics.queueSize.decrement()
}
}
}
@@ -73,12 +86,7 @@ class BehaviourInvokeTracing {
@After("sendingMessageToActorCell(cell)")
def afterSendMessageToActorCell(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map {
- am ⇒
- val currentMailboxSize = cellWithMetrics.queueSize.incrementAndGet()
- if (currentMailboxSize >= 0)
- am.mailboxSize.record(currentMailboxSize)
- }
+ cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment())
}
@Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
@@ -87,14 +95,31 @@ class BehaviourInvokeTracing {
@After("actorStop(cell)")
def afterStop(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map(p ⇒ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity))
+
+ cellWithMetrics.actorMetricsRecorder.map { p ⇒
+ cellWithMetrics.mailboxSizeCollectorCancellable.cancel()
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+ }
+ }
+
+ @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)")
+ def actorInvokeFailure(cell: ActorCell): Unit = {}
+
+ @Before("actorInvokeFailure(cell)")
+ def beforeInvokeFailure(cell: ActorCell): Unit = {
+ val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+
+ cellWithMetrics.actorMetricsRecorder.map {
+ am ⇒ am.errorCounter.record(1L)
+ }
}
}
trait ActorCellMetrics {
var metricIdentity: ActorMetrics = _
var actorMetricsRecorder: Option[ActorMetricRecorder] = _
- val queueSize = new AtomicInteger
+ var mailboxSizeCollectorCancellable: Cancellable = _
+ val queueSize = MinMaxCounter()
}
@Aspect
diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
new file mode 100644
index 00000000..60cc4ddf
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
@@ -0,0 +1,161 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import akka.dispatch.{ Dispatchers, ExecutorServiceDelegate, Dispatcher, MessageDispatcher }
+import kamon.metrics.{ Metrics, DispatcherMetrics }
+import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder
+import kamon.Kamon
+import akka.actor.{ Cancellable, ActorSystemImpl }
+import scala.concurrent.forkjoin.ForkJoinPool
+import java.util.concurrent.ThreadPoolExecutor
+import java.lang.reflect.Method
+import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement
+
+@Aspect
+class DispatcherTracing {
+
+ @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))")
+ def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {}
+
+ @Before("onActorSystemStartup(dispatchers, system)")
+ def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = {
+ val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem]
+ currentDispatchers.actorSystem = system
+ }
+
+ @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)")
+ def onDispatchersLookup(dispatchers: Dispatchers) = {}
+
+ @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher")
+ def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = {
+ val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem]
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+
+ dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem
+ }
+
+ @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))")
+ def onCreateExecutorService(): Unit = {}
+
+ @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))")
+ def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {}
+
+ @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)")
+ def onDispatcherStartup(dispatcher: Dispatcher): Unit = {}
+
+ @After("onDispatcherStartup(dispatcher)")
+ def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = {
+
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+ val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem)
+ val metricIdentity = DispatcherMetrics(dispatcher.id)
+
+ dispatcherWithMetrics.metricIdentity = metricIdentity
+ dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory)
+
+ if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) {
+ dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
+ dispatcherWithMetrics.dispatcherMetricsRecorder.map {
+ dm ⇒
+ val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) =
+ DispatcherMetricsCollector.collect(dispatcher)
+
+ dm.maximumPoolSize.record(maximumPoolSize)
+ dm.runningThreadCount.record(runningThreadCount)
+ dm.queueTaskCount.record(queueTaskCount)
+ dm.poolSize.record(poolSize)
+ }
+ }
+ }
+ }
+
+ @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)")
+ def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {}
+
+ @After("onDispatcherShutdown(dispatcher)")
+ def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+
+ dispatcherWithMetrics.dispatcherMetricsRecorder.map {
+ dispatcher ⇒
+ dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
+ Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
+ }
+ }
+}
+
+@Aspect
+class DispatcherMetricsMixin {
+
+ @DeclareMixin("akka.dispatch.Dispatcher")
+ def mixinDispatcherMetricsToMessageDispatcher: DispatcherMessageMetrics = new DispatcherMessageMetrics {}
+
+ @DeclareMixin("akka.dispatch.Dispatchers")
+ def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {}
+}
+
+trait DispatcherMessageMetrics {
+ var metricIdentity: DispatcherMetrics = _
+ var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _
+ var dispatcherCollectorCancellable: Cancellable = _
+ var actorSystem: ActorSystemImpl = _
+}
+
+trait DispatchersWithActorSystem {
+ var actorSystem: ActorSystemImpl = _
+}
+
+object DispatcherMetricsCollector {
+
+ case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long)
+
+ private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = {
+ DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount,
+ (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
+ }
+
+ private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = {
+ DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
+ }
+
+ private val executorServiceMethod: Method = {
+ // executorService is protected
+ val method = classOf[Dispatcher].getDeclaredMethod("executorService")
+ method.setAccessible(true)
+ method
+ }
+
+ def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = {
+ dispatcher match {
+ case x: Dispatcher ⇒ {
+ val executor = executorServiceMethod.invoke(x) match {
+ case delegate: ExecutorServiceDelegate ⇒ delegate.executor
+ case other ⇒ other
+ }
+
+ executor match {
+ case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
+ case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
+ case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
+ }
+ }
+ case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
index bd0d628b..9e19dced 100644
--- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -17,7 +17,7 @@
package kamon.metrics
import com.typesafe.config.Config
-import kamon.metrics.instruments.ContinuousHdrRecorder
+import kamon.metrics.instruments.CounterRecorder
import org.HdrHistogram.HdrRecorder
case class ActorMetrics(name: String) extends MetricGroupIdentity {
@@ -30,29 +30,31 @@ object ActorMetrics extends MetricGroupCategory {
case object ProcessingTime extends MetricIdentity { val name, tag = "processing-time" }
case object MailboxSize extends MetricIdentity { val name, tag = "mailbox-size" }
case object TimeInMailbox extends MetricIdentity { val name, tag = "time-in-mailbox" }
+ case object ErrorCounter extends MetricIdentity { val name, tag = "errors" }
- case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder)
+ case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder, errorCounter: MetricRecorder)
extends MetricGroupRecorder {
def collect: MetricGroupSnapshot = {
- ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect())
+ ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect(), errorCounter.collect())
}
}
- case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike)
+ case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike, errorCounter: MetricSnapshotLike)
extends MetricGroupSnapshot {
val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map(
(ProcessingTime -> processingTime),
(MailboxSize -> mailboxSize),
- (TimeInMailbox -> timeInMailbox))
+ (TimeInMailbox -> timeInMailbox),
+ (ErrorCounter -> errorCounter))
}
val Factory = new MetricGroupFactory {
type GroupRecorder = ActorMetricRecorder
def create(config: Config): ActorMetricRecorder = {
- val settings = config.getConfig("kamon.metrics.precision.actor")
+ val settings = config.getConfig("precision.actor")
val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time"))
val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size"))
@@ -60,8 +62,9 @@ object ActorMetrics extends MetricGroupCategory {
new ActorMetricRecorder(
HdrRecorder(processingTimeConfig.highestTrackableValue, processingTimeConfig.significantValueDigits, Scale.Nano),
- ContinuousHdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit),
- HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano))
+ HdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit),
+ HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano),
+ CounterRecorder())
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala
new file mode 100644
index 00000000..f41e0c3f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala
@@ -0,0 +1,71 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import com.typesafe.config.Config
+import org.HdrHistogram.HdrRecorder
+
+case class DispatcherMetrics(name: String) extends MetricGroupIdentity {
+ val category = DispatcherMetrics
+}
+
+object DispatcherMetrics extends MetricGroupCategory {
+ val name = "dispatcher"
+
+ case object MaximumPoolSize extends MetricIdentity { val name, tag = "maximum-pool-size" }
+ case object RunningThreadCount extends MetricIdentity { val name, tag = "running-thread-count" }
+ case object QueueTaskCount extends MetricIdentity { val name, tag = "queued-task-count" }
+ case object PoolSize extends MetricIdentity { val name, tag = "pool-size" }
+
+ case class DispatcherMetricRecorder(maximumPoolSize: MetricRecorder, runningThreadCount: MetricRecorder, queueTaskCount: MetricRecorder, poolSize: MetricRecorder)
+ extends MetricGroupRecorder {
+
+ def collect: MetricGroupSnapshot = {
+ DispatcherMetricSnapshot(maximumPoolSize.collect(), runningThreadCount.collect(), queueTaskCount.collect(), poolSize.collect())
+ }
+ }
+
+ case class DispatcherMetricSnapshot(maximumPoolSize: MetricSnapshotLike, runningThreadCount: MetricSnapshotLike, queueTaskCount: MetricSnapshotLike, poolSize: MetricSnapshotLike)
+ extends MetricGroupSnapshot {
+
+ val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map(
+ (MaximumPoolSize -> maximumPoolSize),
+ (RunningThreadCount -> runningThreadCount),
+ (QueueTaskCount -> queueTaskCount),
+ (PoolSize -> poolSize))
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = DispatcherMetricRecorder
+
+ def create(config: Config): DispatcherMetricRecorder = {
+ val settings = config.getConfig("precision.dispatcher")
+
+ val MaximumPoolSizeConfig = extractPrecisionConfig(settings.getConfig("maximum-pool-size"))
+ val RunningThreadCountConfig = extractPrecisionConfig(settings.getConfig("running-thread-count"))
+ val QueueTaskCountConfig = extractPrecisionConfig(settings.getConfig("queued-task-count"))
+ val PoolSizeConfig = extractPrecisionConfig(settings.getConfig("pool-size"))
+
+ new DispatcherMetricRecorder(
+ HdrRecorder(MaximumPoolSizeConfig.highestTrackableValue, MaximumPoolSizeConfig.significantValueDigits, Scale.Unit),
+ HdrRecorder(RunningThreadCountConfig.highestTrackableValue, RunningThreadCountConfig.significantValueDigits, Scale.Unit),
+ HdrRecorder(QueueTaskCountConfig.highestTrackableValue, QueueTaskCountConfig.significantValueDigits, Scale.Unit),
+ HdrRecorder(PoolSizeConfig.highestTrackableValue, PoolSizeConfig.significantValueDigits, Scale.Unit))
+ }
+ }
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
index 359540fc..c60babb2 100644
--- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
@@ -24,16 +24,25 @@ import kamon.Kamon
import akka.actor
import kamon.metrics.Metrics.MetricGroupFilter
import kamon.metrics.Subscriptions.Subscribe
+import java.util.concurrent.TimeUnit
class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val config = system.settings.config
+ val metricsExtConfig = system.settings.config.getConfig("kamon.metrics")
+
+ /** Configured Dispatchers */
+ val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions"))
+ val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings"))
+
+ /** Configuration Settings */
+ val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS)
+
val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
- val filters = loadFilters(config)
+ val filters = loadFilters(metricsExtConfig)
lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")
def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
if (shouldTrack(identity))
- Some(storage.getOrElseUpdate(identity, factory.create(config)).asInstanceOf[factory.GroupRecorder])
+ Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig)).asInstanceOf[factory.GroupRecorder])
else
None
}
@@ -50,6 +59,14 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
(for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap
}
+ def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
+ import scala.concurrent.duration._
+
+ system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) {
+ body
+ }(gaugeRecordingsDispatcher)
+ }
+
private def shouldTrack(identity: MetricGroupIdentity): Boolean = {
filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false)
}
@@ -57,7 +74,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
import scala.collection.JavaConverters._
- val filters = config.getObjectList("kamon.metrics.filters").asScala
+ val filters = config.getObjectList("filters").asScala
val allFilters =
for (
@@ -75,7 +92,6 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
allFilters.toMap
}
-
}
object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
index 7c197166..5454edf5 100644
--- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
@@ -53,7 +53,7 @@ object TraceMetrics extends MetricGroupCategory {
def create(config: Config): TraceMetricRecorder = {
- val settings = config.getConfig("kamon.metrics.precision.trace")
+ val settings = config.getConfig("precision.trace")
val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time"))
val segmentConfig = extractPrecisionConfig(settings.getConfig("segment"))
diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala
new file mode 100644
index 00000000..e5efbc15
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala
@@ -0,0 +1,38 @@
+package kamon.metrics.instruments
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import kamon.metrics._
+import kamon.metrics.MetricSnapshot.Measurement
+
+import jsr166e.LongAdder
+
+class CounterRecorder extends MetricRecorder {
+ private val counter = new LongAdder
+
+ def record(value: Long): Unit = {
+ counter.add(value)
+ }
+
+ def collect(): MetricSnapshotLike = {
+ val sum = counter.sumThenReset()
+ MetricSnapshot(InstrumentTypes.Counter, sum, Scale.Unit, Vector(Measurement(1, sum)))
+ }
+}
+
+object CounterRecorder {
+ def apply(): CounterRecorder = new CounterRecorder()
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala
new file mode 100644
index 00000000..ba2550af
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala
@@ -0,0 +1,58 @@
+package kamon.metrics.instruments
+
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import java.lang.Math._
+import jsr166e.LongMaxUpdater
+import kamon.util.PaddedAtomicLong
+import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement
+
+class MinMaxCounter {
+ private val min = new LongMaxUpdater
+ private val max = new LongMaxUpdater
+ private val sum = new PaddedAtomicLong
+
+ min.update(0L)
+ max.update(0L)
+
+ def increment(value: Long = 1L): Unit = {
+ val currentValue = sum.addAndGet(value)
+ max.update(currentValue)
+ }
+
+ def decrement(value: Long = 1L): Unit = {
+ val currentValue = sum.addAndGet(-value)
+ min.update(-currentValue)
+ }
+
+ def collect(): CounterMeasurement = {
+ val currentValue = {
+ val value = sum.get()
+ if (value < 0) 0 else value
+ }
+ val result = CounterMeasurement(abs(min.maxThenReset()), max.maxThenReset(), currentValue)
+ max.update(currentValue)
+ min.update(-currentValue)
+ result
+ }
+}
+
+object MinMaxCounter {
+ def apply() = new MinMaxCounter()
+
+ case class CounterMeasurement(min: Long, max: Long, current: Long)
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 9980a022..307cf17a 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -32,6 +32,8 @@ trait TraceContext {
def levelOfDetail: TracingLevelOfDetail
def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle
def finish(metadata: Map[String, String])
+
+ private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
}
object TraceContext {
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
new file mode 100644
index 00000000..3ff074b6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
@@ -0,0 +1,41 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import scala.collection.concurrent.TrieMap
+import kamon.trace.TraceLocal.TraceLocalKey
+
+object TraceLocal {
+ trait TraceLocalKey {
+ type ValueType
+ }
+
+ def store(key: TraceLocalKey)(value: key.ValueType): Unit =
+ TraceRecorder.currentContext.map(_.traceLocalStorage.store(key)(value))
+
+ def retrieve(key: TraceLocalKey): Option[key.ValueType] =
+ TraceRecorder.currentContext.flatMap(_.traceLocalStorage.retrieve(key))
+
+}
+
+class TraceLocalStorage {
+ val underlyingStorage = TrieMap[TraceLocal.TraceLocalKey, Any]()
+
+ def store(key: TraceLocalKey)(value: key.ValueType): Unit = underlyingStorage.put(key, value)
+
+ def retrieve(key: TraceLocalKey): Option[key.ValueType] = underlyingStorage.get(key).map(_.asInstanceOf[key.ValueType])
+}
diff --git a/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala
new file mode 100644
index 00000000..9c926372
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala
@@ -0,0 +1,24 @@
+package kamon.util
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import java.util.concurrent.atomic.AtomicLong
+
+class PaddedAtomicLong(value: Long = 0) extends AtomicLong(value) {
+ @volatile var p1, p2, p3, p4, p5, p6 = 7L
+
+ protected def sumPaddingToPreventOptimisation() = p1 + p2 + p3 + p4 + p5 + p6
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala b/kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala
index bf047898..12f7f549 100644
--- a/kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala
+++ b/kamon-core/src/main/scala/kamon/weaver/logging/KamonWeaverMessageHandler.scala
@@ -17,8 +17,8 @@
package kamon.weaver.logging
import org.aspectj.bridge.{ IMessage, IMessageHandler }
-import org.slf4j.LoggerFactory
import com.typesafe.config.ConfigFactory
+import java.util.logging.Logger
/**
* Implementation of AspectJ's IMessageHandler interface that routes AspectJ weaving messages and controls them through kamon configuration.
@@ -26,7 +26,7 @@ import com.typesafe.config.ConfigFactory
class KamonWeaverMessageHandler extends IMessageHandler {
import IMessage._
- private val log = LoggerFactory.getLogger("AspectJ Weaver")
+ private val log = Logger.getLogger("AspectJ Weaver")
private val conf = ConfigFactory.load().getConfig("kamon.weaver")
private val isVerbose = conf.getBoolean("verbose")
@@ -54,7 +54,7 @@ class KamonWeaverMessageHandler extends IMessageHandler {
}
private def showErrorMessage(msg: IMessage): Boolean = {
- log.error(msg.getMessage)
+ log.severe(msg.getMessage)
true
}
}