aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala68
-rw-r--r--kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala124
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala16
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala176
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala47
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala80
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala55
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala163
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala48
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala47
-rw-r--r--kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala93
-rw-r--r--kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala93
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Entity.scala58
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala173
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala63
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricKey.scala169
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala199
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala117
-rw-r--r--kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala82
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Subscriptions.scala173
-rw-r--r--kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala116
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala65
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala74
-rw-r--r--kamon-core/src/main/scala/kamon/metric/UserMetrics.scala293
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala (renamed from kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala)0
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala15
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala118
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala164
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala (renamed from kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala)62
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala51
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala65
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala29
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala99
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala71
-rw-r--r--kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala61
-rw-r--r--kamon-core/src/main/scala/kamon/supervisor/AspectJPresent.scala (renamed from kamon-core/src/main/scala/kamon/metric/Scale.scala)24
-rw-r--r--kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala125
-rw-r--r--kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala49
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Incubator.scala97
-rw-r--r--kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala120
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Sampler.scala73
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala169
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceExtension.scala36
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceLocal.scala34
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala92
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala45
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracerExtension.scala110
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala46
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracingContext.scala92
-rw-r--r--kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala39
-rw-r--r--kamon-core/src/main/scala/kamon/util/ConfigTools.scala (renamed from kamon-core/src/main/scala/kamon/metric/package.scala)30
-rw-r--r--kamon-core/src/main/scala/kamon/util/FastDispatch.scala38
-rw-r--r--kamon-core/src/main/scala/kamon/util/LazyActorRef.scala53
-rw-r--r--kamon-core/src/main/scala/kamon/util/MapMerge.scala43
-rw-r--r--kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala3
-rw-r--r--kamon-core/src/main/scala/kamon/util/Sequencer.scala (renamed from kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala)28
-rw-r--r--kamon-core/src/main/scala/kamon/util/Timestamp.scala101
-rw-r--r--kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala45
59 files changed, 2858 insertions, 1965 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index dfebd3a5..ab9ce05e 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -14,10 +14,72 @@
*/
package kamon
-import akka.actor._
+import _root_.akka.actor
+import _root_.akka.actor._
+import com.typesafe.config.{ ConfigFactory, Config }
+import kamon.metric._
+import kamon.trace.{ TracerExtensionImpl, TracerExtension }
object Kamon {
- trait Extension extends akka.actor.Extension
- def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system)
+ trait Extension extends actor.Extension
+
+ private case class KamonCoreComponents(
+ metrics: MetricsExtension,
+ tracer: TracerExtension,
+ userMetrics: UserMetricsExtension)
+
+ @volatile private var _system: ActorSystem = _
+ @volatile private var _coreComponents: Option[KamonCoreComponents] = None
+
+ def start(config: Config): Unit = synchronized {
+ def resolveInternalConfig: Config = {
+ val internalConfig = config.getConfig("kamon.internal-config")
+
+ config
+ .withoutPath("akka")
+ .withoutPath("spray")
+ .withFallback(internalConfig)
+ }
+
+ if (_coreComponents.isEmpty) {
+ val metrics = MetricsExtensionImpl(config)
+ val simpleMetrics = UserMetricsExtensionImpl(metrics)
+ val tracer = TracerExtensionImpl(metrics, config)
+
+ _coreComponents = Some(KamonCoreComponents(metrics, tracer, simpleMetrics))
+ _system = ActorSystem("kamon", resolveInternalConfig)
+
+ metrics.start(_system)
+ tracer.start(_system)
+
+ } else sys.error("Kamon has already been started.")
+ }
+
+ def start(): Unit =
+ start(ConfigFactory.load)
+
+ def metrics: MetricsExtension =
+ ifStarted(_.metrics)
+
+ def tracer: TracerExtension =
+ ifStarted(_.tracer)
+
+ def userMetrics: UserMetricsExtension =
+ ifStarted(_.userMetrics)
+
+ def apply[T <: Kamon.Extension](key: ExtensionId[T]): T =
+ ifStarted { _ ⇒
+ if (_system ne null)
+ key(_system)
+ else
+ sys.error("Cannot retrieve extensions while Kamon is being initialized.")
+ }
+
+ def extension[T <: Kamon.Extension](key: ExtensionId[T]): T =
+ apply(key)
+
+ private def ifStarted[T](thunk: KamonCoreComponents ⇒ T): T =
+ _coreComponents.map(thunk(_)) getOrElse (sys.error("Kamon has not been started yet."))
+
}
diff --git a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
index 0dd189f6..553d59ed 100644
--- a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
@@ -1,99 +1,41 @@
-package kamon.http
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import kamon.metric.instrument.Counter
-import kamon.metric._
-
-import scala.collection.concurrent.TrieMap
-
-object HttpServerMetrics extends MetricGroupIdentity {
- import Metrics.AtomicGetOrElseUpdateForTriemap
-
- val name: String = "http-server-metrics-recorder"
- val category = new MetricGroupCategory {
- val name: String = "http-server"
- }
-
- type TraceName = String
- type StatusCode = String
-
- case class CountPerStatusCode(statusCode: String) extends MetricIdentity {
- def name: String = statusCode
- }
-
- case class TraceCountPerStatus(traceName: TraceName, statusCode: StatusCode) extends MetricIdentity {
- def name: String = traceName + "_" + statusCode
- }
-
- class HttpServerMetricsRecorder extends MetricGroupRecorder {
-
- private val counters = TrieMap[StatusCode, Counter]()
- private val countersPerTrace = TrieMap[TraceName, TrieMap[StatusCode, Counter]]()
-
- def recordResponse(statusCode: StatusCode): Unit = recordResponse(statusCode, 1L)
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
- def recordResponse(statusCode: StatusCode, count: Long): Unit =
- counters.atomicGetOrElseUpdate(statusCode, Counter()).increment(count)
-
- def recordResponse(traceName: TraceName, statusCode: StatusCode): Unit = recordResponse(traceName, statusCode, 1L)
-
- def recordResponse(traceName: TraceName, statusCode: StatusCode, count: Long): Unit = {
- recordResponse(statusCode, count)
- countersPerTrace.atomicGetOrElseUpdate(traceName, TrieMap()).atomicGetOrElseUpdate(statusCode, Counter()).increment(count)
- }
-
- def collect(context: CollectionContext): HttpServerMetricsSnapshot = {
- val countsPerStatusCode = counters.map {
- case (statusCode, counter) ⇒ (statusCode, counter.collect(context))
- }.toMap
-
- val countsPerTraceAndStatus = countersPerTrace.map {
- case (traceName, countsPerStatus) ⇒
- (traceName, countsPerStatus.map { case (statusCode, counter) ⇒ (statusCode, counter.collect(context)) }.toMap)
- }.toMap
-
- HttpServerMetricsSnapshot(countsPerStatusCode, countsPerTraceAndStatus)
- }
-
- def cleanup: Unit = {}
- }
+package kamon.http
- case class HttpServerMetricsSnapshot(countsPerStatusCode: Map[StatusCode, Counter.Snapshot],
- countsPerTraceAndStatusCode: Map[TraceName, Map[StatusCode, Counter.Snapshot]]) extends MetricGroupSnapshot {
+import kamon.metric.{ EntityRecorderFactory, GenericEntityRecorder }
+import kamon.metric.instrument.InstrumentFactory
- type GroupSnapshotType = HttpServerMetricsSnapshot
+/**
+ * Counts HTTP response status codes into per status code and per trace name + status counters. If recording a HTTP
+ * response with status 500 for the trace "GetUser", the counter with name "500" as well as the counter with name
+ * "GetUser_500" will be incremented.
+ */
+class HttpServerMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
- def merge(that: HttpServerMetricsSnapshot, context: CollectionContext): HttpServerMetricsSnapshot = {
- val combinedCountsPerStatus = combineMaps(countsPerStatusCode, that.countsPerStatusCode)((l, r) ⇒ l.merge(r, context))
- val combinedCountsPerTraceAndStatus = combineMaps(countsPerTraceAndStatusCode, that.countsPerTraceAndStatusCode) {
- (leftCounts, rightCounts) ⇒ combineMaps(leftCounts, rightCounts)((l, r) ⇒ l.merge(r, context))
- }
- HttpServerMetricsSnapshot(combinedCountsPerStatus, combinedCountsPerTraceAndStatus)
- }
+ def recordResponse(statusCode: String): Unit =
+ counter(statusCode).increment()
- def metrics: Map[MetricIdentity, MetricSnapshot] = {
- countsPerStatusCode.map {
- case (statusCode, count) ⇒ (CountPerStatusCode(statusCode), count)
- } ++ {
- for (
- (traceName, countsPerStatus) ← countsPerTraceAndStatusCode;
- (statusCode, count) ← countsPerStatus
- ) yield (TraceCountPerStatus(traceName, statusCode), count)
- }
- }
+ def recordResponse(traceName: String, statusCode: String): Unit = {
+ recordResponse(statusCode)
+ counter(traceName + "_" + statusCode).increment()
}
-
- val Factory = HttpServerMetricGroupFactory
}
-case object HttpServerMetricGroupFactory extends MetricGroupFactory {
-
- import HttpServerMetrics._
-
- type GroupRecorder = HttpServerMetricsRecorder
-
- def create(config: Config, system: ActorSystem): HttpServerMetricsRecorder =
- new HttpServerMetricsRecorder()
-
-} \ No newline at end of file
+object HttpServerMetrics extends EntityRecorderFactory[HttpServerMetrics] {
+ def category: String = "http-server"
+ def createRecorder(instrumentFactory: InstrumentFactory): HttpServerMetrics = new HttpServerMetrics(instrumentFactory)
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala
index 5ca4481e..4dc5ff41 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala
@@ -1,3 +1,19 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.instrumentation
import _root_.akka.event.EventStream
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
deleted file mode 100644
index 366f446d..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package akka.instrumentation
-
-import akka.actor._
-import akka.dispatch.{ Envelope, MessageDispatcher }
-import akka.routing.RoutedActorCell
-import kamon.Kamon
-import kamon.metric.ActorMetrics.ActorMetricsRecorder
-import kamon.metric.RouterMetrics.RouterMetricsRecorder
-import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics }
-import kamon.trace._
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class ActorCellInstrumentation {
-
- import ActorCellInstrumentation.PimpedActorCellMetrics
-
- @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)")
- def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
-
- @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
- def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
-
- val metricsExtension = Kamon(Metrics)(system)
- val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
- val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
-
- cellWithMetrics.actorMetricIdentity = metricIdentity
- cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
-
- cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
- val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}")
- routedActorCell.routerMetricIdentity = routerMetricIdentity
- routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory)
- }
- }
-
- @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
- def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {}
-
- @Around("invokingActorBehaviourAtActorCell(cell, envelope)")
- def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
- val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- val timestampBeforeProcessing = System.nanoTime()
- val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware]
-
- try {
- TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) {
- pjp.proceed()
- }
- } finally {
- cellWithMetrics.actorMetricsRecorder.map {
- am ⇒
- val processingTime = System.nanoTime() - timestampBeforeProcessing
- val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime
-
- am.processingTime.record(processingTime)
- am.timeInMailbox.record(timeInMailbox)
- am.mailboxSize.decrement()
-
- (processingTime, timeInMailbox)
- } map {
- case (processingTime, timeInMailbox) ⇒
- cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
- routedActorCell.routerMetricsRecorder.map {
- rm ⇒
- rm.processingTime.record(processingTime)
- rm.timeInMailbox.record(timeInMailbox)
- }
- }
- }
- }
- }
-
- @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)")
- def sendingMessageToActorCell(cell: ActorCell): Unit = {}
-
- @After("sendingMessageToActorCell(cell)")
- def afterSendMessageToActorCell(cell: ActorCell): Unit = {
- val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment())
- }
-
- @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
- def actorStop(cell: ActorCell): Unit = {}
-
- @After("actorStop(cell)")
- def afterStop(cell: ActorCell): Unit = {
- val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
-
- cellWithMetrics.actorMetricsRecorder.map { p ⇒
- Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity)
- }
-
- cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
- routedActorCell.routerMetricsRecorder.map { rm ⇒
- Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity)
- }
- }
- }
-
- @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)")
- def actorInvokeFailure(cell: ActorCell): Unit = {}
-
- @Before("actorInvokeFailure(cell)")
- def beforeInvokeFailure(cell: ActorCell): Unit = {
- val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
-
- cellWithMetrics.actorMetricsRecorder.map {
- am ⇒ am.errors.increment()
- }
-
- cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
- routedActorCell.routerMetricsRecorder.map {
- rm ⇒ rm.errors.increment()
- }
- }
- }
-
-}
-
-trait ActorCellMetrics {
- var actorMetricIdentity: ActorMetrics = _
- var routerMetricIdentity: RouterMetrics = _
- var actorMetricsRecorder: Option[ActorMetricsRecorder] = _
- var routerMetricsRecorder: Option[RouterMetricsRecorder] = _
-}
-
-@Aspect
-class ActorCellMetricsIntoActorCellMixin {
-
- @DeclareMixin("akka.actor.ActorCell")
- def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {}
-}
-
-@Aspect
-class TraceContextIntoEnvelopeMixin {
-
- @DeclareMixin("akka.dispatch.Envelope")
- def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default
-
- @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
- def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {}
-
- @After("envelopeCreation(ctx)")
- def afterEnvelopeCreation(ctx: TimestampedTraceContextAware): Unit = {
- // Necessary to force the initialization of ContextAware at the moment of creation.
- ctx.traceContext
- }
-}
-
-object ActorCellInstrumentation {
- implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) {
- def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit): Unit = {
- if (cell.isInstanceOf[RoutedActorCell])
- block(cell)
- }
- }
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala
deleted file mode 100644
index 82b8304d..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package akka.instrumentation
-
-import kamon.trace.{ TraceContextAware, TraceRecorder }
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class ActorLoggingInstrumentation {
-
- @DeclareMixin("akka.event.Logging.LogEvent+")
- def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default
-
- @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)")
- def logEventCreation(event: TraceContextAware): Unit = {}
-
- @After("logEventCreation(event)")
- def captureTraceContext(event: TraceContextAware): Unit = {
- // Force initialization of TraceContextAware
- event.traceContext
- }
-
- @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)")
- def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {}
-
- @Around("withMdcInvocation(logSource, logEvent, logStatement)")
- def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {
- TraceRecorder.withInlineTraceContextReplacement(logEvent.traceContext) {
- pjp.proceed()
- }
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala
deleted file mode 100644
index 7845e90d..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package akka.instrumentation
-
-import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
-import kamon.trace.{ TraceContextAware, TraceRecorder }
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class ActorSystemMessageInstrumentation {
-
- @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)")
- def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {}
-
- @Around("systemMessageProcessing(messages)")
- def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = {
- if (messages.nonEmpty) {
- val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext
- TraceRecorder.withInlineTraceContextReplacement(ctx)(pjp.proceed())
-
- } else pjp.proceed()
- }
-}
-
-@Aspect
-class TraceContextIntoSystemMessageMixin {
-
- @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+")
- def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default
-
- @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)")
- def envelopeCreation(ctx: TraceContextAware): Unit = {}
-
- @After("envelopeCreation(ctx)")
- def afterEnvelopeCreation(ctx: TraceContextAware): Unit = {
- // Necessary to force the initialization of ContextAware at the moment of creation.
- ctx.traceContext
- }
-}
-
-@Aspect
-class TraceContextIntoRepointableActorRefMixin {
-
- @DeclareMixin("akka.actor.RepointableActorRef")
- def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default
-
- @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)")
- def envelopeCreation(ctx: TraceContextAware): Unit = {}
-
- @After("envelopeCreation(ctx)")
- def afterEnvelopeCreation(ctx: TraceContextAware): Unit = {
- // Necessary to force the initialization of ContextAware at the moment of creation.
- ctx.traceContext
- }
-
- @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)")
- def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {}
-
- @Around("repointableActorRefCreation(repointableActorRef)")
- def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = {
- TraceRecorder.withInlineTraceContextReplacement(repointableActorRef.traceContext) {
- pjp.proceed()
- }
- }
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
deleted file mode 100644
index 5e8175fa..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package akka.instrumentation
-
-import akka.actor.ActorRefProvider
-import akka.event.Logging.Warning
-import akka.pattern.{ AskTimeoutException, PromiseActorRef }
-import kamon.Kamon
-import kamon.trace.Trace
-import org.aspectj.lang.annotation.{ AfterReturning, Aspect, Pointcut }
-
-import scala.compat.Platform.EOL
-
-@Aspect
-class AskPatternInstrumentation {
-
- class StackTraceCaptureException extends Throwable
-
- @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *)", argNames = "provider")
- def promiseActorRefApply(provider: ActorRefProvider): Unit = {}
-
- @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor")
- def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = {
- val system = promiseActor.provider.guardian.underlying.system
- val traceExtension = Kamon(Trace)(system)
-
- if (traceExtension.enableAskPatternTracing) {
- val future = promiseActor.result.future
- implicit val ec = system.dispatcher
- val stack = new StackTraceCaptureException
-
- future onFailure {
- case timeout: AskTimeoutException ⇒
- val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL)
-
- system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation],
- "Timeout triggered for ask pattern registered at: " + stackString))
- }
- }
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala
deleted file mode 100644
index 8b3af3d6..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package akka.instrumentation
-
-import java.lang.reflect.Method
-import java.util.concurrent.ThreadPoolExecutor
-
-import akka.actor.{ ActorSystemImpl, Cancellable }
-import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher }
-import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement
-import kamon.Kamon
-import kamon.metric.DispatcherMetrics.DispatcherMetricRecorder
-import kamon.metric.{ DispatcherMetrics, Metrics }
-import org.aspectj.lang.annotation._
-
-import scala.concurrent.forkjoin.ForkJoinPool
-
-@Aspect
-class DispatcherInstrumentation {
-
- @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))")
- def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {}
-
- @Before("onActorSystemStartup(dispatchers, system)")
- def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = {
- val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem]
- currentDispatchers.actorSystem = system
- }
-
- @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)")
- def onDispatchersLookup(dispatchers: Dispatchers) = {}
-
- @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher")
- def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = {
- val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem]
- val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
-
- dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem
- }
-
- @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))")
- def onCreateExecutorService(): Unit = {}
-
- @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))")
- def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {}
-
- @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)")
- def onDispatcherStartup(dispatcher: Dispatcher): Unit = {}
-
- @After("onDispatcherStartup(dispatcher)")
- def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = {
-
- val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
- val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem)
- val metricIdentity = DispatcherMetrics(dispatcher.id)
-
- dispatcherWithMetrics.metricIdentity = metricIdentity
- dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory)
-
- if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) {
- dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
- dispatcherWithMetrics.dispatcherMetricsRecorder.map {
- dm ⇒
- val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) =
- DispatcherMetricsCollector.collect(dispatcher)
-
- dm.maximumPoolSize.record(maximumPoolSize)
- dm.runningThreadCount.record(runningThreadCount)
- dm.queueTaskCount.record(queueTaskCount)
- dm.poolSize.record(poolSize)
- }
- }
- }
- }
-
- @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)")
- def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {}
-
- @After("onDispatcherShutdown(dispatcher)")
- def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
- val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
-
- dispatcherWithMetrics.dispatcherMetricsRecorder.map {
- dispatcher ⇒
- dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
- Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
- }
- }
-}
-
-@Aspect
-class DispatcherMetricCollectionInfoIntoDispatcherMixin {
-
- @DeclareMixin("akka.dispatch.MessageDispatcher")
- def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {}
-
- @DeclareMixin("akka.dispatch.Dispatchers")
- def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {}
-}
-
-trait DispatcherMetricCollectionInfo {
- var metricIdentity: DispatcherMetrics = _
- var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _
- var dispatcherCollectorCancellable: Cancellable = _
- var actorSystem: ActorSystemImpl = _
-}
-
-trait DispatchersWithActorSystem {
- var actorSystem: ActorSystemImpl = _
-}
-
-object DispatcherMetricsCollector {
-
- case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long)
-
- private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = {
- DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount,
- (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
- }
-
- private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = {
- DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
- }
-
- private val executorServiceMethod: Method = {
- // executorService is protected
- val method = classOf[Dispatcher].getDeclaredMethod("executorService")
- method.setAccessible(true)
- method
- }
-
- def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = {
- dispatcher match {
- case x: Dispatcher ⇒ {
- val executor = executorServiceMethod.invoke(x) match {
- case delegate: ExecutorServiceDelegate ⇒ delegate.executor
- case other ⇒ other
- }
-
- executor match {
- case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
- case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
- case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
- }
- }
- case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
- }
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala
deleted file mode 100644
index bda2da78..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.instrumentation.scala
-
-import kamon.trace.{ TraceContextAware, TraceRecorder }
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class FutureInstrumentation {
-
- @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
- def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default
-
- @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)")
- def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {}
-
- @After("futureRelatedRunnableCreation(runnable)")
- def afterCreation(runnable: TraceContextAware): Unit = {
- // Force traceContext initialization.
- runnable.traceContext
- }
-
- @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)")
- def futureRelatedRunnableExecution(runnable: TraceContextAware) = {}
-
- @Around("futureRelatedRunnableExecution(runnable)")
- def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = {
- TraceRecorder.withInlineTraceContextReplacement(runnable.traceContext) {
- pjp.proceed()
- }
- }
-
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala
deleted file mode 100644
index 65caaa8f..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.instrumentation.scalaz
-
-import kamon.trace.{ TraceContextAware, TraceRecorder }
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class FutureInstrumentation {
-
- @DeclareMixin("scalaz.concurrent..* && java.util.concurrent.Callable+")
- def mixinTraceContextAwareToFutureRelatedCallable: TraceContextAware =
- TraceContextAware.default
-
- @Pointcut("execution((scalaz.concurrent..* && java.util.concurrent.Callable+).new(..)) && this(callable)")
- def futureRelatedCallableCreation(callable: TraceContextAware): Unit = {}
-
- @After("futureRelatedCallableCreation(callable)")
- def afterCreation(callable: TraceContextAware): Unit =
- // Force traceContext initialization.
- callable.traceContext
-
- @Pointcut("execution(* (scalaz.concurrent..* && java.util.concurrent.Callable+).call()) && this(callable)")
- def futureRelatedCallableExecution(callable: TraceContextAware): Unit = {}
-
- @Around("futureRelatedCallableExecution(callable)")
- def aroundExecution(pjp: ProceedingJoinPoint, callable: TraceContextAware): Any =
- TraceRecorder.withInlineTraceContextReplacement(callable.traceContext) {
- pjp.proceed()
- }
-
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala
deleted file mode 100644
index d2cb4e38..00000000
--- a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metric
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram }
-
-case class ActorMetrics(name: String) extends MetricGroupIdentity {
- val category = ActorMetrics
-}
-
-object ActorMetrics extends MetricGroupCategory {
- val name = "actor"
-
- case object ProcessingTime extends MetricIdentity { val name = "processing-time" }
- case object MailboxSize extends MetricIdentity { val name = "mailbox-size" }
- case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" }
- case object Errors extends MetricIdentity { val name = "errors" }
-
- case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter,
- errors: Counter) extends MetricGroupRecorder {
-
- def collect(context: CollectionContext): ActorMetricSnapshot =
- ActorMetricSnapshot(
- processingTime.collect(context),
- timeInMailbox.collect(context),
- mailboxSize.collect(context),
- errors.collect(context))
-
- def cleanup: Unit = {
- processingTime.cleanup
- mailboxSize.cleanup
- timeInMailbox.cleanup
- errors.cleanup
- }
- }
-
- case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot,
- mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot {
-
- type GroupSnapshotType = ActorMetricSnapshot
-
- def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot =
- ActorMetricSnapshot(
- processingTime.merge(that.processingTime, context),
- timeInMailbox.merge(that.timeInMailbox, context),
- mailboxSize.merge(that.mailboxSize, context),
- errors.merge(that.errors, context))
-
- lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
- (ProcessingTime -> processingTime),
- (MailboxSize -> mailboxSize),
- (TimeInMailbox -> timeInMailbox),
- (Errors -> errors))
- }
-
- val Factory = ActorMetricGroupFactory
-}
-
-case object ActorMetricGroupFactory extends MetricGroupFactory {
- import ActorMetrics._
-
- type GroupRecorder = ActorMetricsRecorder
-
- def create(config: Config, system: ActorSystem): ActorMetricsRecorder = {
- val settings = config.getConfig("precision.actor")
-
- val processingTimeConfig = settings.getConfig("processing-time")
- val timeInMailboxConfig = settings.getConfig("time-in-mailbox")
- val mailboxSizeConfig = settings.getConfig("mailbox-size")
-
- new ActorMetricsRecorder(
- Histogram.fromConfig(processingTimeConfig),
- Histogram.fromConfig(timeInMailboxConfig),
- MinMaxCounter.fromConfig(mailboxSizeConfig, system),
- Counter())
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala
deleted file mode 100644
index 126f6333..00000000
--- a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metric
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import kamon.metric.instrument.{ Histogram, HdrHistogram }
-
-case class DispatcherMetrics(name: String) extends MetricGroupIdentity {
- val category = DispatcherMetrics
-}
-
-object DispatcherMetrics extends MetricGroupCategory {
- val name = "dispatcher"
-
- case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" }
- case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" }
- case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" }
- case object PoolSize extends MetricIdentity { val name = "pool-size" }
-
- case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram,
- queueTaskCount: Histogram, poolSize: Histogram)
- extends MetricGroupRecorder {
-
- def collect(context: CollectionContext): MetricGroupSnapshot =
- DispatcherMetricSnapshot(
- maximumPoolSize.collect(context),
- runningThreadCount.collect(context),
- queueTaskCount.collect(context),
- poolSize.collect(context))
-
- def cleanup: Unit = {}
-
- }
-
- case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot,
- queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot {
-
- type GroupSnapshotType = DispatcherMetricSnapshot
-
- def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot =
- DispatcherMetricSnapshot(
- maximumPoolSize.merge(that.maximumPoolSize, context),
- runningThreadCount.merge(that.runningThreadCount, context),
- queueTaskCount.merge(that.queueTaskCount, context),
- poolSize.merge(that.poolSize, context))
-
- lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
- (MaximumPoolSize -> maximumPoolSize),
- (RunningThreadCount -> runningThreadCount),
- (QueueTaskCount -> queueTaskCount),
- (PoolSize -> poolSize))
- }
-
- val Factory = DispatcherMetricGroupFactory
-}
-
-case object DispatcherMetricGroupFactory extends MetricGroupFactory {
-
- import DispatcherMetrics._
-
- type GroupRecorder = DispatcherMetricRecorder
-
- def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = {
- val settings = config.getConfig("precision.dispatcher")
-
- val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size")
- val runningThreadCountConfig = settings.getConfig("running-thread-count")
- val queueTaskCountConfig = settings.getConfig("queued-task-count")
- val poolSizeConfig = settings.getConfig("pool-size")
-
- new DispatcherMetricRecorder(
- Histogram.fromConfig(maximumPoolSizeConfig),
- Histogram.fromConfig(runningThreadCountConfig),
- Histogram.fromConfig(queueTaskCountConfig),
- Histogram.fromConfig(poolSizeConfig))
- }
-
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/Entity.scala b/kamon-core/src/main/scala/kamon/metric/Entity.scala
new file mode 100644
index 00000000..8d328f83
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/Entity.scala
@@ -0,0 +1,58 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+/**
+ * Identify a `thing` that is being monitored by Kamon. A [[kamon.metric.Entity]] is used to identify tracked `things`
+ * in both the metrics recording and reporting sides. Only the name and category fields are used with determining
+ * equality between two entities.
+ *
+ * // TODO: Find a better word for `thing`.
+ */
+class Entity(val name: String, val category: String, val metadata: Map[String, String]) {
+
+ override def equals(o: Any): Boolean = {
+ if (this eq o.asInstanceOf[AnyRef])
+ true
+ else if ((o.asInstanceOf[AnyRef] eq null) || !o.isInstanceOf[Entity])
+ false
+ else {
+ val thatAsEntity = o.asInstanceOf[Entity]
+ category == thatAsEntity.category && name == thatAsEntity.name
+ }
+ }
+
+ override def hashCode: Int = {
+ var result: Int = name.hashCode
+ result = 31 * result + category.hashCode
+ return result
+ }
+}
+
+object Entity {
+ def apply(name: String, category: String): Entity =
+ apply(name, category, Map.empty)
+
+ def apply(name: String, category: String, metadata: Map[String, String]): Entity =
+ new Entity(name, category, metadata)
+
+ def create(name: String, category: String): Entity =
+ apply(name, category, Map.empty)
+
+ def create(name: String, category: String, metadata: Map[String, String]): Entity =
+ new Entity(name, category, metadata)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
new file mode 100644
index 00000000..6e0a4248
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
@@ -0,0 +1,173 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.metric.instrument._
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+
+trait EntityRecorder {
+ def collect(collectionContext: CollectionContext): EntitySnapshot
+ def cleanup: Unit
+}
+
+trait EntityRecorderFactory[T <: EntityRecorder] {
+ def category: String
+ def createRecorder(instrumentFactory: InstrumentFactory): T
+}
+
+abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder {
+ import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax
+
+ private val _instruments = TrieMap.empty[MetricKey, Instrument]
+ private def register[T <: Instrument](key: MetricKey, instrument: ⇒ T): T =
+ _instruments.atomicGetOrElseUpdate(key, instrument, _.cleanup).asInstanceOf[T]
+
+ protected def histogram(name: String): Histogram =
+ register(HistogramKey(name), instrumentFactory.createHistogram(name))
+
+ protected def histogram(name: String, dynamicRange: DynamicRange): Histogram =
+ register(HistogramKey(name), instrumentFactory.createHistogram(name, Some(dynamicRange)))
+
+ protected def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name))
+
+ protected def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name, Some(dynamicRange)))
+
+ protected def histogram(key: HistogramKey): Histogram =
+ register(key, instrumentFactory.createHistogram(key.name))
+
+ protected def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram =
+ register(key, instrumentFactory.createHistogram(key.name, Some(dynamicRange)))
+
+ protected def removeHistogram(name: String): Unit =
+ _instruments.remove(HistogramKey(name))
+
+ protected def removeHistogram(key: HistogramKey): Unit =
+ _instruments.remove(key)
+
+ protected def minMaxCounter(name: String): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange)))
+
+ protected def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval)))
+
+ protected def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval)))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange)))
+
+ protected def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval)))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval)))
+
+ protected def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name))
+
+ protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange)))
+
+ protected def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name, refreshInterval = Some(refreshInterval)))
+
+ protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange), Some(refreshInterval)))
+
+ protected def removeMinMaxCounter(name: String): Unit =
+ _instruments.remove(MinMaxCounterKey(name))
+
+ protected def removeMinMaxCounter(key: MinMaxCounterKey): Unit =
+ _instruments.remove(key)
+
+ protected def gauge(name: String, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector))
+
+ protected def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector))
+
+ protected def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector))
+
+ protected def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, valueCollector = valueCollector))
+
+ protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), valueCollector = valueCollector))
+
+ protected def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def removeGauge(name: String): Unit =
+ _instruments.remove(GaugeKey(name))
+
+ protected def removeGauge(key: GaugeKey): Unit =
+ _instruments.remove(key)
+
+ protected def counter(name: String): Counter =
+ register(CounterKey(name), instrumentFactory.createCounter())
+
+ protected def counter(key: CounterKey): Counter =
+ register(key, instrumentFactory.createCounter())
+
+ protected def removeCounter(name: String): Unit =
+ _instruments.remove(CounterKey(name))
+
+ protected def removeCounter(key: CounterKey): Unit =
+ _instruments.remove(key)
+
+ def collect(collectionContext: CollectionContext): EntitySnapshot = {
+ val snapshots = Map.newBuilder[MetricKey, InstrumentSnapshot]
+ _instruments.foreach {
+ case (key, instrument) ⇒ snapshots += key -> instrument.collect(collectionContext)
+ }
+
+ new DefaultEntitySnapshot(snapshots.result())
+ }
+
+ def cleanup: Unit = _instruments.values.foreach(_.cleanup)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala
new file mode 100644
index 00000000..7ebb79e2
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala
@@ -0,0 +1,63 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import kamon.metric.instrument.{ Counter, Histogram, CollectionContext, InstrumentSnapshot }
+import kamon.util.MapMerge
+import scala.reflect.ClassTag
+
+trait EntitySnapshot {
+ def metrics: Map[MetricKey, InstrumentSnapshot]
+ def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot
+
+ def histogram(name: String): Option[Histogram.Snapshot] =
+ find[HistogramKey, Histogram.Snapshot](name)
+
+ def minMaxCounter(name: String): Option[Histogram.Snapshot] =
+ find[MinMaxCounterKey, Histogram.Snapshot](name)
+
+ def gauge(name: String): Option[Histogram.Snapshot] =
+ find[GaugeKey, Histogram.Snapshot](name)
+
+ def counter(name: String): Option[Counter.Snapshot] =
+ find[CounterKey, Counter.Snapshot](name)
+
+ def histograms: Map[HistogramKey, Histogram.Snapshot] =
+ filterByType[HistogramKey, Histogram.Snapshot]
+
+ def minMaxCounters: Map[MinMaxCounterKey, Histogram.Snapshot] =
+ filterByType[MinMaxCounterKey, Histogram.Snapshot]
+
+ def gauges: Map[GaugeKey, Histogram.Snapshot] =
+ filterByType[GaugeKey, Histogram.Snapshot]
+
+ def counters: Map[CounterKey, Counter.Snapshot] =
+ filterByType[CounterKey, Counter.Snapshot]
+
+ private def filterByType[K <: MetricKey, V <: InstrumentSnapshot](implicit keyCT: ClassTag[K]): Map[K, V] =
+ metrics.collect { case (k, v) if keyCT.runtimeClass.isInstance(k) ⇒ (k.asInstanceOf[K], v.asInstanceOf[V]) }
+
+ private def find[K <: MetricKey, V <: InstrumentSnapshot](name: String)(implicit keyCT: ClassTag[K]) =
+ metrics.find { case (k, v) ⇒ keyCT.runtimeClass.isInstance(k) && k.name == name } map (_._2.asInstanceOf[V])
+}
+
+class DefaultEntitySnapshot(val metrics: Map[MetricKey, InstrumentSnapshot]) extends EntitySnapshot {
+ import MapMerge.Syntax
+
+ override def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot =
+ new DefaultEntitySnapshot(metrics.merge(that.metrics, (l, r) ⇒ l.merge(r, collectionContext)))
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricKey.scala b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala
new file mode 100644
index 00000000..a5d30c81
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala
@@ -0,0 +1,169 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import kamon.metric.instrument.{ InstrumentTypes, InstrumentType, UnitOfMeasurement }
+
+/**
+ * MetricKeys are used to identify a given metric in entity recorders and snapshots. MetricKeys can be used to encode
+ * additional metadata for a metric being recorded, as well as the unit of measurement of the data being recorder.
+ */
+sealed trait MetricKey {
+ def name: String
+ def unitOfMeasurement: UnitOfMeasurement
+ def instrumentType: InstrumentType
+ def metadata: Map[String, String]
+}
+
+// Wish that there was a shorter way to describe the operations bellow, but apparently there is no way to generalize all
+// the apply/create versions that would produce the desired return types when used from Java.
+
+/**
+ * MetricKey for all Histogram-based metrics.
+ */
+case class HistogramKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey {
+ val instrumentType = InstrumentTypes.Histogram
+}
+
+object HistogramKey {
+ def apply(name: String): HistogramKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def apply(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey =
+ apply(name, unitOfMeasurement, Map.empty)
+
+ def apply(name: String, metadata: Map[String, String]): HistogramKey =
+ apply(name, UnitOfMeasurement.Unknown, Map.empty)
+
+ /**
+ * Java friendly versions:
+ */
+
+ def create(name: String): HistogramKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey =
+ apply(name, unitOfMeasurement)
+
+ def create(name: String, metadata: Map[String, String]): HistogramKey =
+ apply(name, metadata)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): HistogramKey =
+ apply(name, unitOfMeasurement, metadata)
+}
+
+/**
+ * MetricKey for all MinMaxCounter-based metrics.
+ */
+case class MinMaxCounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey {
+ val instrumentType = InstrumentTypes.MinMaxCounter
+}
+
+object MinMaxCounterKey {
+ def apply(name: String): MinMaxCounterKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def apply(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey =
+ apply(name, unitOfMeasurement, Map.empty)
+
+ def apply(name: String, metadata: Map[String, String]): MinMaxCounterKey =
+ apply(name, UnitOfMeasurement.Unknown, Map.empty)
+
+ /**
+ * Java friendly versions:
+ */
+
+ def create(name: String): MinMaxCounterKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey =
+ apply(name, unitOfMeasurement)
+
+ def create(name: String, metadata: Map[String, String]): MinMaxCounterKey =
+ apply(name, metadata)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): MinMaxCounterKey =
+ apply(name, unitOfMeasurement, metadata)
+}
+
+/**
+ * MetricKey for all Gauge-based metrics.
+ */
+case class GaugeKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey {
+ val instrumentType = InstrumentTypes.Gauge
+}
+
+object GaugeKey {
+ def apply(name: String): GaugeKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def apply(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey =
+ apply(name, unitOfMeasurement, Map.empty)
+
+ def apply(name: String, metadata: Map[String, String]): GaugeKey =
+ apply(name, UnitOfMeasurement.Unknown, Map.empty)
+
+ /**
+ * Java friendly versions:
+ */
+
+ def create(name: String): GaugeKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey =
+ apply(name, unitOfMeasurement)
+
+ def create(name: String, metadata: Map[String, String]): GaugeKey =
+ apply(name, metadata)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): GaugeKey =
+ apply(name, unitOfMeasurement, metadata)
+}
+
+/**
+ * MetricKey for all Counter-based metrics.
+ */
+case class CounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey {
+ val instrumentType = InstrumentTypes.Counter
+}
+
+object CounterKey {
+ def apply(name: String): CounterKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def apply(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey =
+ apply(name, unitOfMeasurement, Map.empty)
+
+ def apply(name: String, metadata: Map[String, String]): CounterKey =
+ apply(name, UnitOfMeasurement.Unknown, Map.empty)
+
+ /**
+ * Java friendly versions:
+ */
+
+ def create(name: String): CounterKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey =
+ apply(name, unitOfMeasurement)
+
+ def create(name: String, metadata: Map[String, String]): CounterKey =
+ apply(name, metadata)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): CounterKey =
+ apply(name, unitOfMeasurement, metadata)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
index f491cc57..87911352 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -16,129 +16,126 @@
package kamon.metric
-import akka.event.Logging.Error
-import akka.event.EventStream
+import com.typesafe.config.Config
+import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe }
+import kamon.metric.instrument.{ DefaultRefreshScheduler, InstrumentFactory, CollectionContext }
import scala.collection.concurrent.TrieMap
import akka.actor._
-import com.typesafe.config.Config
-import kamon.util.GlobPathFilter
-import kamon.Kamon
-import akka.actor
-import kamon.metric.Metrics.MetricGroupFilter
-import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe }
-import java.util.concurrent.TimeUnit
-
-class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- import Metrics.AtomicGetOrElseUpdateForTriemap
-
- val metricsExtConfig = system.settings.config.getConfig("kamon.metrics")
- printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error"))
-
- /** Configured Dispatchers */
- val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions"))
- val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings"))
-
- /** Configuration Settings */
- val gaugeRecordingInterval: Long = metricsExtConfig.getMilliseconds("gauge-recording-interval")
-
- val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
- val filters = loadFilters(metricsExtConfig)
- lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")
-
- def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
- if (shouldTrack(identity))
- Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder])
- else
- None
- }
+import kamon.util.{ LazyActorRef, TriemapAtomicGetOrElseUpdate }
- def unregister(identity: MetricGroupIdentity): Unit = {
- storage.remove(identity).map(_.cleanup)
- }
+case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T)
- def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit =
- subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber)
+trait MetricsExtension {
+ def settings: MetricsExtensionSettings
+ def shouldTrack(entity: Entity): Boolean
+ def shouldTrack(entityName: String, category: String): Boolean =
+ shouldTrack(Entity(entityName, category))
- def unsubscribe(subscriber: ActorRef): Unit =
- subscriptions.tell(Unsubscribe(subscriber), subscriber)
+ def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]]
+ def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T]
+ def unregister(entity: Entity): Unit
- def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
- import scala.concurrent.duration._
+ def find(entity: Entity): Option[EntityRecorder]
+ def find(name: String, category: String): Option[EntityRecorder]
- system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) {
- body
- }(gaugeRecordingsDispatcher)
- }
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit =
+ subscribe(filter, subscriber, permanently = false)
- private def shouldTrack(identity: MetricGroupIdentity): Boolean = {
- filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true)
- }
+ def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit =
+ subscribe(SubscriptionFilter(category, selection), subscriber, permanently)
+
+ def subscribe(category: String, selection: String, subscriber: ActorRef): Unit =
+ subscribe(SubscriptionFilter(category, selection), subscriber, permanently = false)
- def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
- import scala.collection.JavaConverters._
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit
- val filters = config.getObjectList("filters").asScala
+ def unsubscribe(subscriber: ActorRef): Unit
+ def buildDefaultCollectionContext: CollectionContext
+ def instrumentFactory(category: String): InstrumentFactory
+}
+
+private[kamon] class MetricsExtensionImpl(config: Config) extends MetricsExtension {
+ private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder]
+ private val _subscriptions = new LazyActorRef
+
+ val settings = MetricsExtensionSettings(config)
+
+ def shouldTrack(entity: Entity): Boolean =
+ settings.entityFilters.get(entity.category).map {
+ filter ⇒ filter.accept(entity.name)
- val allFilters =
- for (
- filter ← filters;
- entry ← filter.entrySet().asScala
- ) yield {
- val key = entry.getKey
- val keyBasedConfig = entry.getValue.atKey(key)
+ } getOrElse (settings.trackUnmatchedEntities)
- val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList
- val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList
+ def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] = {
+ import TriemapAtomicGetOrElseUpdate.Syntax
+ val entity = Entity(entityName, recorderFactory.category)
- (key, MetricGroupFilter(includes, excludes))
- }
+ if (shouldTrack(entity)) {
+ val instrumentFactory = settings.instrumentFactories.get(recorderFactory.category).getOrElse(settings.defaultInstrumentFactory)
+ val recorder = _trackedEntities.atomicGetOrElseUpdate(entity, recorderFactory.createRecorder(instrumentFactory), _.cleanup).asInstanceOf[T]
- allFilters.toMap
+ Some(EntityRegistration(entity, recorder))
+ } else None
}
- def buildDefaultCollectionContext: CollectionContext =
- CollectionContext(metricsExtConfig.getInt("default-collection-context-buffer-size"))
-
- def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = {
- if (!disableWeaverMissingError) {
- val weaverMissingMessage =
- """
- |
- | ___ _ ___ _ _ ___ ___ _ _
- | / _ \ | | |_ | | | | | | \/ |(_) (_)
- |/ /_\ \ ___ _ __ ___ ___ | |_ | | | | | | ___ __ _ __ __ ___ _ __ | . . | _ ___ ___ _ _ __ __ _
- || _ |/ __|| '_ \ / _ \ / __|| __| | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \ / _` |
- || | | |\__ \| |_) || __/| (__ | |_ /\__/ / \ /\ /| __/| (_| | \ V /| __/| | | | | || |\__ \\__ \| || | | || (_| |
- |\_| |_/|___/| .__/ \___| \___| \__|\____/ \/ \/ \___| \__,_| \_/ \___||_| \_| |_/|_||___/|___/|_||_| |_| \__, |
- | | | __/ |
- | |_| |___/
- |
- | It seems like your application wasn't started with the -javaagent:/path-to-aspectj-weaver.jar option. Without that Kamon might
- | not work properly, if you need help on setting up the weaver go to http://kamon.io/introduction/get-started/ for more info. If
- | you are sure that you don't need the weaver (e.g. you are only using KamonStandalone) then you can disable this error message
- | by changing the kamon.metrics.disable-aspectj-weaver-missing-error setting in your configuration file.
- |
- """.stripMargin
-
- eventStream.publish(Error("MetricsExtension", classOf[MetricsExtension], weaverMissingMessage))
+ def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] = {
+ _trackedEntities.put(entity, recorder).map { oldRecorder ⇒
+ oldRecorder.cleanup
}
+
+ EntityRegistration(entity, recorder)
}
-}
-object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
- def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
- def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system)
+ def unregister(entity: Entity): Unit =
+ _trackedEntities.remove(entity).map(_.cleanup)
+
+ def find(entity: Entity): Option[EntityRecorder] =
+ _trackedEntities.get(entity)
- case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {
- def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
+ def find(name: String, category: String): Option[EntityRecorder] =
+ find(Entity(name, category))
+
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit =
+ _subscriptions.tell(Subscribe(filter, subscriber, permanent))
+
+ def unsubscribe(subscriber: ActorRef): Unit =
+ _subscriptions.tell(Unsubscribe(subscriber))
+
+ def buildDefaultCollectionContext: CollectionContext =
+ CollectionContext(settings.defaultCollectionContextBufferSize)
+
+ def instrumentFactory(category: String): InstrumentFactory =
+ settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory)
+
+ private[kamon] def collectSnapshots(collectionContext: CollectionContext): Map[Entity, EntitySnapshot] = {
+ val builder = Map.newBuilder[Entity, EntitySnapshot]
+ _trackedEntities.foreach {
+ case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext)))
+ }
+
+ builder.result()
+ }
+
+ /**
+ * Metrics Extension initialization.
+ */
+ private var _system: ActorSystem = null
+ private lazy val _start = {
+ _subscriptions.point(_system.actorOf(SubscriptionsDispatcher.props(settings.tickInterval, this), "metrics"))
+ settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher))
}
- implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) {
- def atomicGetOrElseUpdate(key: K, op: ⇒ V): V =
- trieMap.get(key) match {
- case Some(v) ⇒ v
- case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d)
- }
+ def start(system: ActorSystem): Unit = synchronized {
+ _system = system
+ _start
+ _system = null
}
}
+
+private[kamon] object MetricsExtensionImpl {
+
+ def apply(config: Config) =
+ new MetricsExtensionImpl(config)
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
new file mode 100644
index 00000000..9881ed00
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
@@ -0,0 +1,117 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import com.typesafe.config.Config
+import kamon.metric.instrument._
+import kamon.util.GlobPathFilter
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Configuration settings for the Metrics extension, as read from the `kamon.metric` configuration key.
+ */
+case class MetricsExtensionSettings(
+ tickInterval: FiniteDuration,
+ defaultCollectionContextBufferSize: Int,
+ trackUnmatchedEntities: Boolean,
+ entityFilters: Map[String, EntityFilter],
+ instrumentFactories: Map[String, InstrumentFactory],
+ defaultInstrumentFactory: InstrumentFactory,
+ refreshScheduler: RefreshScheduler) {
+
+ private[kamon] def pointScheduler(targetScheduler: RefreshScheduler): Unit = refreshScheduler match {
+ case lrs: LazyRefreshScheduler ⇒ lrs.point(targetScheduler)
+ case others ⇒
+ }
+}
+
+/**
+ *
+ */
+case class EntityFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {
+ def accept(name: String): Boolean =
+ includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
+}
+
+object MetricsExtensionSettings {
+ import kamon.util.ConfigTools.Syntax
+ import scala.concurrent.duration._
+
+ def apply(config: Config): MetricsExtensionSettings = {
+ val metricConfig = config.getConfig("kamon.metric")
+
+ val tickInterval = metricConfig.getFiniteDuration("tick-interval")
+ val collectBufferSize = metricConfig.getInt("default-collection-context-buffer-size")
+ val trackUnmatchedEntities = metricConfig.getBoolean("track-unmatched-entities")
+ val entityFilters = loadFilters(metricConfig.getConfig("filters"))
+ val defaultInstrumentSettings = DefaultInstrumentSettings.fromConfig(metricConfig.getConfig("default-instrument-settings"))
+
+ val refreshScheduler = new LazyRefreshScheduler
+ val instrumentFactories = loadInstrumentFactories(metricConfig.getConfig("instrument-settings"), defaultInstrumentSettings, refreshScheduler)
+ val defaultInstrumentFactory = new InstrumentFactory(Map.empty, defaultInstrumentSettings, refreshScheduler)
+
+ MetricsExtensionSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories,
+ defaultInstrumentFactory, refreshScheduler)
+ }
+
+ /**
+ * Load all the default filters configured under the `kamon.metric.filters` configuration key. All filters are
+ * defined with the entity category as a sub-key of the `kamon.metric.filters` key and two sub-keys to it: includes
+ * and excludes with lists of string glob patterns as values. Example:
+ *
+ * {{{
+ *
+ * kamon.metrics.filters {
+ * actor {
+ * includes = ["user/test-actor", "user/service/worker-*"]
+ * excludes = ["user/IO-*"]
+ * }
+ * }
+ *
+ * }}}
+ *
+ * @return a Map from category name to corresponding entity filter.
+ */
+ def loadFilters(filtersConfig: Config): Map[String, EntityFilter] = {
+ import scala.collection.JavaConverters._
+
+ filtersConfig.firstLevelKeys map { category: String ⇒
+ val includes = filtersConfig.getStringList(s"$category.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList
+ val excludes = filtersConfig.getStringList(s"$category.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList
+
+ (category, EntityFilter(includes, excludes))
+ } toMap
+ }
+
+ /**
+ * Load any custom configuration settings defined under the `kamon.metric.instrument-settings` configuration key and
+ * create InstrumentFactories for them.
+ *
+ * @return a Map from category name to InstrumentFactory.
+ */
+ def loadInstrumentFactories(instrumentSettings: Config, defaults: DefaultInstrumentSettings, refreshScheduler: RefreshScheduler): Map[String, InstrumentFactory] = {
+ instrumentSettings.firstLevelKeys.map { category ⇒
+ val categoryConfig = instrumentSettings.getConfig(category)
+ val customSettings = categoryConfig.firstLevelKeys.map { instrumentName ⇒
+ (instrumentName, InstrumentCustomSettings.fromConfig(categoryConfig.getConfig(instrumentName)))
+ } toMap
+
+ (category, new InstrumentFactory(customSettings, defaults, refreshScheduler))
+ } toMap
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala
deleted file mode 100644
index ddfef416..00000000
--- a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-package kamon.metric
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import kamon.metric.instrument.{ Counter, Histogram }
-
-case class RouterMetrics(name: String) extends MetricGroupIdentity {
- val category = RouterMetrics
-}
-
-object RouterMetrics extends MetricGroupCategory {
- val name = "router"
-
- case object ProcessingTime extends MetricIdentity { val name = "processing-time" }
- case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" }
- case object Errors extends MetricIdentity { val name = "errors" }
-
- case class RouterMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder {
-
- def collect(context: CollectionContext): RouterMetricSnapshot =
- RouterMetricSnapshot(processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context))
-
- def cleanup: Unit = {
- processingTime.cleanup
- timeInMailbox.cleanup
- errors.cleanup
- }
- }
-
- case class RouterMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot {
-
- type GroupSnapshotType = RouterMetricSnapshot
-
- def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot =
- RouterMetricSnapshot(
- processingTime.merge(that.processingTime, context),
- timeInMailbox.merge(that.timeInMailbox, context),
- errors.merge(that.errors, context))
-
- lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
- ProcessingTime -> processingTime,
- TimeInMailbox -> timeInMailbox,
- Errors -> errors)
- }
-
- val Factory = RouterMetricGroupFactory
-}
-
-case object RouterMetricGroupFactory extends MetricGroupFactory {
-
- import RouterMetrics._
-
- type GroupRecorder = RouterMetricsRecorder
-
- def create(config: Config, system: ActorSystem): RouterMetricsRecorder = {
- val settings = config.getConfig("precision.router")
-
- val processingTimeConfig = settings.getConfig("processing-time")
- val timeInMailboxConfig = settings.getConfig("time-in-mailbox")
-
- new RouterMetricsRecorder(
- Histogram.fromConfig(processingTimeConfig),
- Histogram.fromConfig(timeInMailboxConfig),
- Counter())
- }
-}
-
diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
deleted file mode 100644
index 1ba9f312..00000000
--- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metric
-
-import akka.actor._
-import kamon.metric.Subscriptions._
-import kamon.util.GlobPathFilter
-import scala.concurrent.duration.{ FiniteDuration, Duration }
-import java.util.concurrent.TimeUnit
-import kamon.Kamon
-import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer
-
-class Subscriptions extends Actor {
- import context.system
-
- val flushMetricsSchedule = scheduleFlushMessage()
- val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
-
- var lastTick: Long = System.currentTimeMillis()
- var oneShotSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty
- var permanentSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty
-
- def receive = {
- case Subscribe(category, selection, subscriber, permanent) ⇒ subscribe(category, selection, subscriber, permanent)
- case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
- case Terminated(subscriber) ⇒ unsubscribe(subscriber)
- case FlushMetrics ⇒ flush()
- }
-
- def subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanent: Boolean): Unit = {
- context.watch(subscriber)
- val newFilter: MetricSelectionFilter = GroupAndPatternFilter(category, new GlobPathFilter(selection))
-
- if (permanent) {
- permanentSubscriptions = permanentSubscriptions.updated(subscriber, newFilter combine {
- permanentSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty)
- })
- } else {
- oneShotSubscriptions = oneShotSubscriptions.updated(subscriber, newFilter combine {
- oneShotSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty)
- })
- }
- }
-
- def unsubscribe(subscriber: ActorRef): Unit = {
- if (permanentSubscriptions.contains(subscriber))
- permanentSubscriptions = permanentSubscriptions - subscriber
-
- if (oneShotSubscriptions.contains(subscriber))
- oneShotSubscriptions = oneShotSubscriptions - subscriber
- }
-
- def flush(): Unit = {
- val currentTick = System.currentTimeMillis()
- val snapshots = collectAll()
-
- dispatchSelectedMetrics(lastTick, currentTick, permanentSubscriptions, snapshots)
- dispatchSelectedMetrics(lastTick, currentTick, oneShotSubscriptions, snapshots)
-
- lastTick = currentTick
- oneShotSubscriptions = Map.empty
- }
-
- def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = {
- val allMetrics = Kamon(Metrics).storage
- val builder = Map.newBuilder[MetricGroupIdentity, MetricGroupSnapshot]
-
- allMetrics.foreach {
- case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext)))
- }
-
- builder.result()
- }
-
- def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[ActorRef, MetricSelectionFilter],
- snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
-
- for ((subscriber, filter) ← subscriptions) {
- val selection = snapshots.filter(group ⇒ filter.accept(group._1))
- val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
-
- subscriber ! tickMetrics
- }
- }
-
- def scheduleFlushMessage(): Cancellable = {
- val config = context.system.settings.config
- val tickInterval = Duration(config.getMilliseconds("kamon.metrics.tick-interval"), TimeUnit.MILLISECONDS)
- context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
- }
-}
-
-object Subscriptions {
- case object FlushMetrics
- case class Unsubscribe(subscriber: ActorRef)
- case class Subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanently: Boolean = false)
- case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot])
-
- trait MetricSelectionFilter {
- def accept(identity: MetricGroupIdentity): Boolean
- }
-
- object MetricSelectionFilter {
- val empty = new MetricSelectionFilter {
- def accept(identity: MetricGroupIdentity): Boolean = false
- }
-
- implicit class CombinableMetricSelectionFilter(msf: MetricSelectionFilter) {
- def combine(that: MetricSelectionFilter): MetricSelectionFilter = new MetricSelectionFilter {
- def accept(identity: MetricGroupIdentity): Boolean = msf.accept(identity) || that.accept(identity)
- }
- }
- }
-
- case class GroupAndPatternFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) extends MetricSelectionFilter {
- def accept(identity: MetricGroupIdentity): Boolean = {
- category.equals(identity.category) && globFilter.accept(identity.name)
- }
- }
-}
-
-class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
- val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
- val collectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext
-
- def receive = empty
-
- def empty: Actor.Receive = {
- case tick: TickMetricSnapshot ⇒ context become (buffering(tick))
- case FlushBuffer ⇒ // Nothing to flush.
- }
-
- def buffering(buffered: TickMetricSnapshot): Actor.Receive = {
- case TickMetricSnapshot(_, to, tickMetrics) ⇒
- val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup)
- val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics)
-
- context become (buffering(combinedSnapshot))
-
- case FlushBuffer ⇒
- receiver ! buffered
- context become (empty)
-
- }
-
- override def postStop(): Unit = {
- flushSchedule.cancel()
- super.postStop()
- }
-
- def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext)))
-}
-
-object TickMetricSnapshotBuffer {
- case object FlushBuffer
-
- def props(flushInterval: FiniteDuration, receiver: ActorRef): Props =
- Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver))
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
new file mode 100644
index 00000000..68b545a5
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
@@ -0,0 +1,116 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor._
+import kamon.metric.SubscriptionsDispatcher._
+import kamon.util.{ MilliTimestamp, GlobPathFilter }
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers.
+ */
+private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl) extends Actor {
+ var lastTick = MilliTimestamp.now
+ var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
+ var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
+ val tickSchedule = context.system.scheduler.schedule(interval, interval, self, Tick)(context.dispatcher)
+ val collectionContext = metricsExtension.buildDefaultCollectionContext
+
+ def receive = {
+ case Tick ⇒ processTick()
+ case Subscribe(filter, subscriber, permanently) ⇒ subscribe(filter, subscriber, permanently)
+ case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
+ case Terminated(subscriber) ⇒ unsubscribe(subscriber)
+ }
+
+ def processTick(): Unit =
+ dispatch(metricsExtension.collectSnapshots(collectionContext))
+
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = {
+ def addSubscription(storage: Map[ActorRef, SubscriptionFilter]): Map[ActorRef, SubscriptionFilter] =
+ storage.updated(subscriber, storage.getOrElse(subscriber, SubscriptionFilter.Empty).combine(filter))
+
+ context.watch(subscriber)
+
+ if (permanent)
+ permanentSubscriptions = addSubscription(permanentSubscriptions)
+ else
+ oneShotSubscriptions = addSubscription(oneShotSubscriptions)
+ }
+
+ def unsubscribe(subscriber: ActorRef): Unit = {
+ permanentSubscriptions = permanentSubscriptions - subscriber
+ oneShotSubscriptions = oneShotSubscriptions - subscriber
+ }
+
+ def dispatch(snapshots: Map[Entity, EntitySnapshot]): Unit = {
+ val currentTick = MilliTimestamp.now
+
+ dispatchSelections(lastTick, currentTick, permanentSubscriptions, snapshots)
+ dispatchSelections(lastTick, currentTick, oneShotSubscriptions, snapshots)
+
+ lastTick = currentTick
+ oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
+ }
+
+ def dispatchSelections(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, SubscriptionFilter],
+ snapshots: Map[Entity, EntitySnapshot]): Unit = {
+
+ for ((subscriber, filter) ← subscriptions) {
+ val selection = snapshots.filter(group ⇒ filter.accept(group._1))
+ val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
+
+ subscriber ! tickMetrics
+ }
+ }
+}
+
+object SubscriptionsDispatcher {
+ def props(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl): Props =
+ Props(new SubscriptionsDispatcher(interval, metricsExtension))
+
+ case object Tick
+ case class Unsubscribe(subscriber: ActorRef)
+ case class Subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean = false)
+ case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[Entity, EntitySnapshot])
+
+}
+
+trait SubscriptionFilter { self ⇒
+
+ def accept(entity: Entity): Boolean
+
+ final def combine(that: SubscriptionFilter): SubscriptionFilter = new SubscriptionFilter {
+ override def accept(entity: Entity): Boolean = self.accept(entity) || that.accept(entity)
+ }
+}
+
+object SubscriptionFilter {
+ val Empty = new SubscriptionFilter {
+ def accept(entity: Entity): Boolean = false
+ }
+
+ def apply(category: String, name: String): SubscriptionFilter = new SubscriptionFilter {
+ val categoryPattern = new GlobPathFilter(category)
+ val namePattern = new GlobPathFilter(name)
+
+ def accept(entity: Entity): Boolean = {
+ categoryPattern.accept(entity.category) && namePattern.accept(entity.name)
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
new file mode 100644
index 00000000..dfc5d5f0
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
@@ -0,0 +1,65 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.{ Props, Actor, ActorRef }
+import kamon.Kamon
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer
+import kamon.metric.instrument.CollectionContext
+import kamon.util.MapMerge
+
+import scala.concurrent.duration.FiniteDuration
+
+class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
+ import MapMerge.Syntax
+
+ val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
+ val collectionContext: CollectionContext = Kamon.metrics.buildDefaultCollectionContext
+
+ def receive = empty
+
+ def empty: Actor.Receive = {
+ case tick: TickMetricSnapshot ⇒ context become (buffering(tick))
+ case FlushBuffer ⇒ // Nothing to flush.
+ }
+
+ def buffering(buffered: TickMetricSnapshot): Actor.Receive = {
+ case TickMetricSnapshot(_, to, tickMetrics) ⇒
+ val combinedMetrics = buffered.metrics.merge(tickMetrics, (l, r) ⇒ l.merge(r, collectionContext))
+ val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics)
+
+ context become (buffering(combinedSnapshot))
+
+ case FlushBuffer ⇒
+ receiver ! buffered
+ context become (empty)
+
+ }
+
+ override def postStop(): Unit = {
+ flushSchedule.cancel()
+ super.postStop()
+ }
+}
+
+object TickMetricSnapshotBuffer {
+ case object FlushBuffer
+
+ def props(flushInterval: FiniteDuration, receiver: ActorRef): Props =
+ Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver))
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
index eaad6e0d..3da9c1d4 100644
--- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
@@ -16,67 +16,29 @@
package kamon.metric
-import akka.actor.ActorSystem
-import kamon.metric.instrument.{ Histogram }
+import kamon.metric.instrument.{ Time, InstrumentFactory, Histogram }
-import scala.collection.concurrent.TrieMap
-import com.typesafe.config.Config
+class TraceMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+ import TraceMetrics.segmentKey
-case class TraceMetrics(name: String) extends MetricGroupIdentity {
- val category = TraceMetrics
-}
-
-object TraceMetrics extends MetricGroupCategory {
- import Metrics.AtomicGetOrElseUpdateForTriemap
-
- val name = "trace"
-
- case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" }
-
- case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram)
- extends MetricGroupRecorder {
-
- val segments = TrieMap[MetricIdentity, Histogram]()
-
- def segmentRecorder(segmentIdentity: MetricIdentity): Histogram =
- segments.atomicGetOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply())
-
- def collect(context: CollectionContext): TraceMetricsSnapshot =
- TraceMetricsSnapshot(
- elapsedTime.collect(context),
- segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap)
+ /**
+ * Records blah blah
+ */
+ val ElapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds)
- def cleanup: Unit = {}
- }
-
- case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot])
- extends MetricGroupSnapshot {
-
- type GroupSnapshotType = TraceMetricsSnapshot
-
- def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot =
- TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), combineMaps(segments, that.segments)((l, r) ⇒ l.merge(r, context)))
-
- def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime)
- }
-
- val Factory = TraceMetricGroupFactory
+ /**
+ * Records Blah Blah.
+ *
+ */
+ def segment(name: String, category: String, library: String): Histogram =
+ histogram(segmentKey(name, category, library))
}
-case object TraceMetricGroupFactory extends MetricGroupFactory {
-
- import TraceMetrics._
-
- type GroupRecorder = TraceMetricRecorder
-
- def create(config: Config, system: ActorSystem): TraceMetricRecorder = {
- val settings = config.getConfig("precision.trace")
- val elapsedTimeConfig = settings.getConfig("elapsed-time")
- val segmentConfig = settings.getConfig("segment")
+object TraceMetrics extends EntityRecorderFactory[TraceMetrics] {
+ def category: String = "trace"
+ def createRecorder(instrumentFactory: InstrumentFactory): TraceMetrics = new TraceMetrics(instrumentFactory)
- new TraceMetricRecorder(
- Histogram.fromConfig(elapsedTimeConfig, Scale.Nano),
- () ⇒ Histogram.fromConfig(segmentConfig, Scale.Nano))
- }
+ def segmentKey(name: String, category: String, library: String): HistogramKey =
+ HistogramKey(name, Time.Nanoseconds, Map("category" -> category, "library" -> library))
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
index b7ac1ac5..e0818292 100644
--- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
@@ -1,189 +1,204 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.metric
-import akka.actor
-import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
-import kamon.Kamon
-import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram }
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.metric.instrument._
import scala.concurrent.duration.FiniteDuration
-class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- import Metrics.AtomicGetOrElseUpdateForTriemap
- import UserMetrics._
+trait UserMetricsExtension {
+ def histogram(name: String): Histogram
+ def histogram(name: String, dynamicRange: DynamicRange): Histogram
+ def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram
+ def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram
+ def histogram(key: HistogramKey): Histogram
+ def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram
+ def removeHistogram(name: String): Unit
+ def removeHistogram(key: HistogramKey): Unit
+
+ def minMaxCounter(name: String): MinMaxCounter
+ def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter
+ def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter
+ def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter
+ def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter
+ def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter
+ def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter
+ def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter
+ def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter
+ def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter
+ def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter
+ def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter
+ def removeMinMaxCounter(name: String): Unit
+ def removeMinMaxCounter(key: MinMaxCounterKey): Unit
+
+ def gauge(name: String, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge
+ def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge
+ def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge
+ def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge
+ def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge
+ def removeGauge(name: String): Unit
+ def removeGauge(key: GaugeKey): Unit
+
+ def counter(name: String): Counter
+ def counter(key: CounterKey): Counter
+ def removeCounter(name: String): Unit
+ def removeCounter(key: CounterKey): Unit
- lazy val metricsExtension = Kamon(Metrics)(system)
- val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision")
+}
- val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision")
- val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision")
- val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision")
+private[kamon] class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension {
+ override def histogram(name: String): Histogram =
+ super.histogram(name)
- def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), {
- UserHistogramRecorder(Histogram(highestTrackableValue, precision, Scale.Unit))
- }).asInstanceOf[UserHistogramRecorder].histogram
- }
+ override def histogram(name: String, dynamicRange: DynamicRange): Histogram =
+ super.histogram(name, dynamicRange)
- def registerHistogram(name: String): Histogram = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), {
- UserHistogramRecorder(Histogram.fromConfig(defaultHistogramPrecisionConfig))
- }).asInstanceOf[UserHistogramRecorder].histogram
- }
+ override def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ super.histogram(name, unitOfMeasurement)
- def registerCounter(name: String): Counter = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserCounter(name), {
- UserCounterRecorder(Counter())
- }).asInstanceOf[UserCounterRecorder].counter
- }
+ override def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ super.histogram(name, dynamicRange, unitOfMeasurement)
- def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
- refreshInterval: FiniteDuration): MinMaxCounter = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), {
- UserMinMaxCounterRecorder(MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system))
- }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter
- }
+ override def histogram(key: HistogramKey): Histogram =
+ super.histogram(key)
- def registerMinMaxCounter(name: String): MinMaxCounter = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), {
- UserMinMaxCounterRecorder(MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system))
- }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter
- }
+ override def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram =
+ super.histogram(key, dynamicRange)
- def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), {
- UserGaugeRecorder(Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector))
- }).asInstanceOf[UserGaugeRecorder].gauge
- }
+ override def removeHistogram(name: String): Unit =
+ super.removeHistogram(name)
- def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
- refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), {
- UserGaugeRecorder(Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector))
- }).asInstanceOf[UserGaugeRecorder].gauge
- }
+ override def removeHistogram(key: HistogramKey): Unit =
+ super.removeHistogram(key)
- def removeHistogram(name: String): Unit =
- metricsExtension.unregister(UserHistogram(name))
+ override def minMaxCounter(name: String): MinMaxCounter =
+ super.minMaxCounter(name)
- def removeCounter(name: String): Unit =
- metricsExtension.unregister(UserCounter(name))
+ override def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter =
+ super.minMaxCounter(name, dynamicRange)
- def removeMinMaxCounter(name: String): Unit =
- metricsExtension.unregister(UserMinMaxCounter(name))
+ override def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter =
+ super.minMaxCounter(name, refreshInterval)
- def removeGauge(name: String): Unit =
- metricsExtension.unregister(UserGauge(name))
-}
+ override def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ super.minMaxCounter(name, unitOfMeasurement)
-object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider {
- def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
+ override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ super.minMaxCounter(name, dynamicRange, refreshInterval)
- def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system)
+ override def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ super.minMaxCounter(name, dynamicRange, unitOfMeasurement)
- sealed trait UserMetricGroup
- //
- // Histograms
- //
+ override def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ super.minMaxCounter(name, refreshInterval, unitOfMeasurement)
- case class UserHistogram(name: String) extends MetricGroupIdentity with UserMetricGroup {
- val category = UserHistograms
- }
+ override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ super.minMaxCounter(name, dynamicRange, refreshInterval, unitOfMeasurement)
- case class UserHistogramRecorder(histogram: Histogram) extends MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot =
- UserHistogramSnapshot(histogram.collect(context))
+ override def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter =
+ super.minMaxCounter(key)
- def cleanup: Unit = histogram.cleanup
- }
+ override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter =
+ super.minMaxCounter(key, dynamicRange)
- case class UserHistogramSnapshot(histogramSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot {
- type GroupSnapshotType = UserHistogramSnapshot
+ override def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter =
+ super.minMaxCounter(key, refreshInterval)
- def merge(that: UserHistogramSnapshot, context: CollectionContext): UserHistogramSnapshot =
- UserHistogramSnapshot(that.histogramSnapshot.merge(histogramSnapshot, context))
+ override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ super.minMaxCounter(key, dynamicRange, refreshInterval)
- def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, histogramSnapshot))
- }
+ override def removeMinMaxCounter(name: String): Unit =
+ super.removeMinMaxCounter(name)
- //
- // Counters
- //
+ override def removeMinMaxCounter(key: MinMaxCounterKey): Unit =
+ super.removeMinMaxCounter(key)
- case class UserCounter(name: String) extends MetricGroupIdentity with UserMetricGroup {
- val category = UserCounters
- }
+ override def gauge(name: String, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, valueCollector)
- case class UserCounterRecorder(counter: Counter) extends MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot =
- UserCounterSnapshot(counter.collect(context))
+ override def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, dynamicRange, valueCollector)
- def cleanup: Unit = counter.cleanup
- }
+ override def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, refreshInterval, valueCollector)
- case class UserCounterSnapshot(counterSnapshot: Counter.Snapshot) extends MetricGroupSnapshot {
- type GroupSnapshotType = UserCounterSnapshot
+ override def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, unitOfMeasurement, valueCollector)
- def merge(that: UserCounterSnapshot, context: CollectionContext): UserCounterSnapshot =
- UserCounterSnapshot(that.counterSnapshot.merge(counterSnapshot, context))
+ override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, dynamicRange, refreshInterval, valueCollector)
- def metrics: Map[MetricIdentity, MetricSnapshot] = Map((Count, counterSnapshot))
- }
+ override def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, dynamicRange, unitOfMeasurement, valueCollector)
- //
- // MinMaxCounters
- //
+ override def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, refreshInterval, unitOfMeasurement, valueCollector)
- case class UserMinMaxCounter(name: String) extends MetricGroupIdentity with UserMetricGroup {
- val category = UserMinMaxCounters
- }
+ override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, dynamicRange, refreshInterval, unitOfMeasurement, valueCollector)
- case class UserMinMaxCounterRecorder(minMaxCounter: MinMaxCounter) extends MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot =
- UserMinMaxCounterSnapshot(minMaxCounter.collect(context))
+ override def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(key, valueCollector)
- def cleanup: Unit = minMaxCounter.cleanup
- }
+ override def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(key, dynamicRange, valueCollector)
- case class UserMinMaxCounterSnapshot(minMaxCounterSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot {
- type GroupSnapshotType = UserMinMaxCounterSnapshot
+ override def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(key, refreshInterval, valueCollector)
- def merge(that: UserMinMaxCounterSnapshot, context: CollectionContext): UserMinMaxCounterSnapshot =
- UserMinMaxCounterSnapshot(that.minMaxCounterSnapshot.merge(minMaxCounterSnapshot, context))
+ override def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(key, dynamicRange, refreshInterval, valueCollector)
- def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, minMaxCounterSnapshot))
- }
+ override def removeGauge(name: String): Unit =
+ super.removeGauge(name)
- //
- // Gauges
- //
+ override def removeGauge(key: GaugeKey): Unit =
+ super.removeGauge(key)
- case class UserGauge(name: String) extends MetricGroupIdentity with UserMetricGroup {
- val category = UserGauges
- }
+ override def counter(name: String): Counter =
+ super.counter(name)
- case class UserGaugeRecorder(gauge: Gauge) extends MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot =
- UserGaugeSnapshot(gauge.collect(context))
+ override def counter(key: CounterKey): Counter =
+ super.counter(key)
- def cleanup: Unit = gauge.cleanup
- }
+ override def removeCounter(name: String): Unit =
+ super.removeCounter(name)
- case class UserGaugeSnapshot(gaugeSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot {
- type GroupSnapshotType = UserGaugeSnapshot
-
- def merge(that: UserGaugeSnapshot, context: CollectionContext): UserGaugeSnapshot =
- UserGaugeSnapshot(that.gaugeSnapshot.merge(gaugeSnapshot, context))
-
- def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, gaugeSnapshot))
- }
+ override def removeCounter(key: CounterKey): Unit =
+ super.removeCounter(key)
+}
- case object UserHistograms extends MetricGroupCategory { val name: String = "histogram" }
- case object UserCounters extends MetricGroupCategory { val name: String = "counter" }
- case object UserMinMaxCounters extends MetricGroupCategory { val name: String = "min-max-counter" }
- case object UserGauges extends MetricGroupCategory { val name: String = "gauge" }
+private[kamon] object UserMetricsExtensionImpl {
+ val UserMetricEntity = Entity("user-metric", "user-metric")
- case object RecordedValues extends MetricIdentity { val name: String = "values" }
- case object Count extends MetricIdentity { val name: String = "count" }
+ def apply(metricsExtension: MetricsExtension): UserMetricsExtensionImpl = {
+ val instrumentFactory = metricsExtension.instrumentFactory(UserMetricEntity.category)
+ val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory)
-}
+ metricsExtension.register(UserMetricEntity, userMetricsExtension).recorder
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala
index e79090a8..e79090a8 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
index 0f29ba6f..c1b69cbe 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
@@ -17,9 +17,8 @@
package kamon.metric.instrument
import kamon.jsr166.LongAdder
-import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder }
-trait Counter extends MetricRecorder {
+trait Counter extends Instrument {
type SnapshotType = Counter.Snapshot
def increment(): Unit
@@ -29,12 +28,11 @@ trait Counter extends MetricRecorder {
object Counter {
def apply(): Counter = new LongAdderCounter
+ def create(): Counter = apply()
- trait Snapshot extends MetricSnapshot {
- type SnapshotType = Counter.Snapshot
-
+ trait Snapshot extends InstrumentSnapshot {
def count: Long
- def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot
+ def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot
}
}
@@ -55,5 +53,8 @@ class LongAdderCounter extends Counter {
}
case class CounterSnapshot(count: Long) extends Counter.Snapshot {
- def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count)
+ def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot = that match {
+ case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount)
+ case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.")
+ }
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
index 0c1815c3..80214510 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
@@ -1,70 +1,102 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.metric.instrument
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{ AtomicLong, AtomicLongFieldUpdater, AtomicReference }
-import akka.actor.{ Cancellable, ActorSystem }
-import com.typesafe.config.Config
-import kamon.metric.{ CollectionContext, Scale, MetricRecorder }
+import akka.actor.Cancellable
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
import scala.concurrent.duration.FiniteDuration
-trait Gauge extends MetricRecorder {
+trait Gauge extends Instrument {
type SnapshotType = Histogram.Snapshot
- def record(value: Long)
- def record(value: Long, count: Long)
+ def record(value: Long): Unit
+ def record(value: Long, count: Long): Unit
+ def refreshValue(): Unit
}
object Gauge {
- trait CurrentValueCollector {
- def currentValue: Long
- }
-
- def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration,
- system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
-
- val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
- val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector)
-
- val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = {
+ val underlyingHistogram = Histogram(dynamicRange)
+ val gauge = new HistogramBackedGauge(underlyingHistogram, valueCollector)
+ val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ {
gauge.refreshValue()
- }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+ })
- gauge.refreshValuesSchedule.set(refreshValuesSchedule)
+ gauge.automaticValueCollectorSchedule.set(refreshValuesSchedule)
gauge
}
- def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge =
- fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction))
+ def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge =
+ apply(dynamicRange, refreshInterval, scheduler, valueCollector)
- def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = {
- val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision")
- fromConfig(config, system)(currentValueCollector)
+ trait CurrentValueCollector {
+ def currentValue: Long
}
- def fromConfig(config: Config, system: ActorSystem, scale: Scale)(currentValueCollector: CurrentValueCollector): Gauge = {
- import scala.concurrent.duration._
+ implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
+ def currentValue: Long = f.apply()
+ }
+}
- val highest = config.getLong("highest-trackable-value")
- val significantDigits = config.getInt("significant-value-digits")
- val refreshInterval = config.getMilliseconds("refresh-interval").toInt
+/**
+ * Helper for cases in which a gauge shouldn't store the current value of a observed value but the difference between
+ * the current observed value and the previously observed value. Should only be used if the observed value is always
+ * increasing or staying steady, but is never able to decrease.
+ *
+ * Note: The first time a value is collected, this wrapper will always return zero, afterwards, the difference between
+ * the current value and the last value will be returned.
+ */
+class DifferentialValueCollector(wrappedValueCollector: CurrentValueCollector) extends CurrentValueCollector {
+ @volatile private var _readAtLeastOnce = false
+ private val _lastObservedValue = new AtomicLong(0)
+
+ def currentValue: Long = {
+ if (_readAtLeastOnce) {
+ val wrappedCurrent = wrappedValueCollector.currentValue
+ val diff = wrappedCurrent - _lastObservedValue.getAndSet(wrappedCurrent)
+
+ if (diff >= 0) diff else 0L
+
+ } else {
+ _lastObservedValue.set(wrappedValueCollector.currentValue)
+ _readAtLeastOnce = true
+ 0L
+ }
- Gauge(Histogram.Precision(significantDigits), highest, scale, refreshInterval.millis, system)(currentValueCollector)
}
+}
- def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
- fromConfig(config, system, Scale.Unit)(currentValueCollector)
- }
+object DifferentialValueCollector {
+ def apply(wrappedValueCollector: CurrentValueCollector): CurrentValueCollector =
+ new DifferentialValueCollector(wrappedValueCollector)
- implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
- def currentValue: Long = f.apply()
- }
+ def apply(wrappedValueCollector: ⇒ Long): CurrentValueCollector =
+ new DifferentialValueCollector(new CurrentValueCollector {
+ def currentValue: Long = wrappedValueCollector
+ })
}
class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge {
- val refreshValuesSchedule = new AtomicReference[Cancellable]()
+ private[kamon] val automaticValueCollectorSchedule = new AtomicReference[Cancellable]()
def record(value: Long): Unit = underlyingHistogram.record(value)
@@ -73,10 +105,12 @@ class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector
def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context)
def cleanup: Unit = {
- if (refreshValuesSchedule.get() != null)
- refreshValuesSchedule.get().cancel()
+ if (automaticValueCollectorSchedule.get() != null)
+ automaticValueCollectorSchedule.get().cancel()
}
- def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue)
+ def refreshValue(): Unit =
+ underlyingHistogram.record(currentValueCollector.currentValue)
+
}
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
index bed75fc8..5c4c7f71 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
@@ -17,12 +17,11 @@
package kamon.metric.instrument
import java.nio.LongBuffer
-import com.typesafe.config.Config
import org.HdrHistogram.AtomicHistogramFieldsAccessor
+import kamon.metric.instrument.Histogram.{ Snapshot, DynamicRange }
import org.HdrHistogram.AtomicHistogram
-import kamon.metric._
-trait Histogram extends MetricRecorder {
+trait Histogram extends Instrument {
type SnapshotType = Histogram.Snapshot
def record(value: Long)
@@ -31,30 +30,40 @@ trait Histogram extends MetricRecorder {
object Histogram {
- def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram =
- new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale)
-
- def fromConfig(config: Config): Histogram = {
- fromConfig(config, Scale.Unit)
- }
-
- def fromConfig(config: Config, scale: Scale): Histogram = {
- val highest = config.getLong("highest-trackable-value")
- val significantDigits = config.getInt("significant-value-digits")
-
- new HdrHistogram(1L, highest, significantDigits, scale)
- }
-
- object HighestTrackableValue {
- val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L
- }
-
- case class Precision(significantDigits: Int)
- object Precision {
- val Low = Precision(1)
- val Normal = Precision(2)
- val Fine = Precision(3)
- }
+ /**
+ * Scala API:
+ *
+ * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given
+ * [[kamon.metric.instrument.Histogram.DynamicRange]].
+ */
+ def apply(dynamicRange: DynamicRange): Histogram = new HdrHistogram(dynamicRange)
+
+ /**
+ * Java API:
+ *
+ * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given
+ * [[kamon.metric.instrument.Histogram.DynamicRange]].
+ */
+ def create(dynamicRange: DynamicRange): Histogram = apply(dynamicRange)
+
+ /**
+ * DynamicRange is a configuration object used to supply range and precision configuration to a
+ * [[kamon.metric.instrument.HdrHistogram]]. See the [[http://hdrhistogram.github.io/HdrHistogram/ HdrHistogram website]]
+ * for more details on how it works and the effects of these configuration values.
+ *
+ * @param lowestDiscernibleValue
+ * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that
+ * is >= 1. May be internally rounded down to nearest power of 2.
+ *
+ * @param highestTrackableValue
+ * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue).
+ * Must not be larger than (Long.MAX_VALUE/2).
+ *
+ * @param precision
+ * The number of significant decimal digits to which the histogram will maintain value resolution and separation.
+ * Must be a non-negative integer between 1 and 3.
+ */
+ case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, precision: Int)
trait Record {
def level: Long
@@ -67,29 +76,28 @@ object Histogram {
var rawCompactRecord: Long = 0L
}
- trait Snapshot extends MetricSnapshot {
- type SnapshotType = Histogram.Snapshot
+ trait Snapshot extends InstrumentSnapshot {
def isEmpty: Boolean = numberOfMeasurements == 0
- def scale: Scale
def numberOfMeasurements: Long
def min: Long
def max: Long
def sum: Long
def percentile(percentile: Double): Long
def recordsIterator: Iterator[Record]
+ def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot
def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot
}
object Snapshot {
- def empty(targetScale: Scale) = new Snapshot {
+ val empty = new Snapshot {
override def min: Long = 0L
override def max: Long = 0L
override def sum: Long = 0L
override def percentile(percentile: Double): Long = 0L
override def recordsIterator: Iterator[Record] = Iterator.empty
- override def merge(that: Snapshot, context: CollectionContext): Snapshot = that
- override def scale: Scale = targetScale
+ override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that
+ override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that
override def numberOfMeasurements: Long = 0L
}
}
@@ -100,10 +108,8 @@ object Histogram {
* The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still
* leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken.
*/
-class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit)
- extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits)
- with Histogram with AtomicHistogramFieldsAccessor {
-
+class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRange.lowestDiscernibleValue,
+ dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram with AtomicHistogramFieldsAccessor {
import AtomicHistogramFieldsAccessor.totalCountUpdater
def record(value: Long): Unit = recordValue(value)
@@ -119,7 +125,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign
val measurementsArray = Array.ofDim[Long](buffer.limit())
buffer.get(measurementsArray, 0, measurementsArray.length)
- new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude())
+ new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude())
}
def getCounts = countsArray().length()
@@ -160,7 +166,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign
}
-case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int,
+case class CompactHdrSnapshot(val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int,
subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot {
def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0))
@@ -182,53 +188,61 @@ case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long,
percentileLevel
}
- def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = {
- if (that.isEmpty) this else if (this.isEmpty) that else {
- import context.buffer
- buffer.clear()
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Snapshot =
+ merge(that.asInstanceOf[InstrumentSnapshot], context)
- val selfIterator = recordsIterator
- val thatIterator = that.recordsIterator
- var thatCurrentRecord: Histogram.Record = null
- var mergedNumberOfMeasurements = 0L
+ def merge(that: InstrumentSnapshot, context: CollectionContext): Histogram.Snapshot = that match {
+ case thatSnapshot: CompactHdrSnapshot ⇒
+ if (thatSnapshot.isEmpty) this else if (this.isEmpty) thatSnapshot else {
+ import context.buffer
+ buffer.clear()
- def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null
- def addToBuffer(compactRecord: Long): Unit = {
- mergedNumberOfMeasurements += countFromCompactRecord(compactRecord)
- buffer.put(compactRecord)
- }
+ val selfIterator = recordsIterator
+ val thatIterator = thatSnapshot.recordsIterator
+ var thatCurrentRecord: Histogram.Record = null
+ var mergedNumberOfMeasurements = 0L
- while (selfIterator.hasNext) {
- val selfCurrentRecord = selfIterator.next()
+ def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null
+ def addToBuffer(compactRecord: Long): Unit = {
+ mergedNumberOfMeasurements += countFromCompactRecord(compactRecord)
+ buffer.put(compactRecord)
+ }
- // Advance that to no further than the level of selfCurrentRecord
- thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord
- while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) {
- addToBuffer(thatCurrentRecord.rawCompactRecord)
- thatCurrentRecord = nextOrNull(thatIterator)
+ while (selfIterator.hasNext) {
+ val selfCurrentRecord = selfIterator.next()
+
+ // Advance that to no further than the level of selfCurrentRecord
+ thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord
+ while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) {
+ addToBuffer(thatCurrentRecord.rawCompactRecord)
+ thatCurrentRecord = nextOrNull(thatIterator)
+ }
+
+ // Include the current record of self and optionally merge if has the same level as thatCurrentRecord
+ if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) {
+ addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord))
+ thatCurrentRecord = nextOrNull(thatIterator)
+ } else {
+ addToBuffer(selfCurrentRecord.rawCompactRecord)
+ }
}
- // Include the current record of self and optionally merge if has the same level as thatCurrentRecord
- if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) {
- addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord))
- thatCurrentRecord = nextOrNull(thatIterator)
- } else {
- addToBuffer(selfCurrentRecord.rawCompactRecord)
+ // Include everything that might have been left from that
+ if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord)
+ while (thatIterator.hasNext) {
+ addToBuffer(thatIterator.next().rawCompactRecord)
}
- }
- // Include everything that might have been left from that
- if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord)
- while (thatIterator.hasNext) {
- addToBuffer(thatIterator.next().rawCompactRecord)
+ buffer.flip()
+ val compactRecords = Array.ofDim[Long](buffer.limit())
+ buffer.get(compactRecords)
+
+ new CompactHdrSnapshot(mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude)
}
- buffer.flip()
- val compactRecords = Array.ofDim[Long](buffer.limit())
- buffer.get(compactRecords)
+ case other ⇒
+ sys.error(s"Cannot merge a CompactHdrSnapshot with the incompatible [${other.getClass.getName}] type.")
- new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude)
- }
}
@inline private def mergeCompactRecords(left: Long, right: Long): Long = {
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
index 3761f5a5..59b4b443 100644
--- a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -14,23 +14,31 @@
* =========================================================================================
*/
-package kamon.metric
+package kamon.metric.instrument
-import java.nio.{ LongBuffer }
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
+import java.nio.LongBuffer
-trait MetricGroupCategory {
- def name: String
+import akka.actor.{ Scheduler, Cancellable }
+import akka.dispatch.MessageDispatcher
+import scala.concurrent.duration.FiniteDuration
+
+private[kamon] trait Instrument {
+ type SnapshotType <: InstrumentSnapshot
+
+ def collect(context: CollectionContext): SnapshotType
+ def cleanup: Unit
}
-trait MetricGroupIdentity {
- def name: String
- def category: MetricGroupCategory
+trait InstrumentSnapshot {
+ def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot
}
-trait MetricIdentity {
- def name: String
+class InstrumentType private[kamon] (val id: Int) extends AnyVal
+object InstrumentTypes {
+ val Histogram = new InstrumentType(1)
+ val MinMaxCounter = new InstrumentType(2)
+ val Gauge = new InstrumentType(3)
+ val Counter = new InstrumentType(4)
}
trait CollectionContext {
@@ -43,33 +51,3 @@ object CollectionContext {
}
}
-trait MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot
- def cleanup: Unit
-}
-
-trait MetricSnapshot {
- type SnapshotType
-
- def merge(that: SnapshotType, context: CollectionContext): SnapshotType
-}
-
-trait MetricGroupSnapshot {
- type GroupSnapshotType
-
- def metrics: Map[MetricIdentity, MetricSnapshot]
- def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType
-}
-
-private[kamon] trait MetricRecorder {
- type SnapshotType <: MetricSnapshot
-
- def collect(context: CollectionContext): SnapshotType
- def cleanup: Unit
-}
-
-trait MetricGroupFactory {
- type GroupRecorder <: MetricGroupRecorder
- def create(config: Config, system: ActorSystem): GroupRecorder
-}
-
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala
new file mode 100644
index 00000000..7c0201f7
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala
@@ -0,0 +1,51 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+
+import scala.concurrent.duration.FiniteDuration
+
+case class InstrumentFactory(configurations: Map[String, InstrumentCustomSettings], defaults: DefaultInstrumentSettings, scheduler: RefreshScheduler) {
+
+ private def resolveSettings(instrumentName: String, codeSettings: Option[InstrumentSettings], default: InstrumentSettings): InstrumentSettings = {
+ configurations.get(instrumentName).flatMap { customSettings ⇒
+ codeSettings.map(cs ⇒ customSettings.combine(cs)) orElse (Some(customSettings.combine(default)))
+
+ } getOrElse (codeSettings.getOrElse(default))
+ }
+
+ def createHistogram(name: String, dynamicRange: Option[DynamicRange] = None): Histogram = {
+ val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, None)), defaults.histogram)
+ Histogram(settings.dynamicRange)
+ }
+
+ def createMinMaxCounter(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None): MinMaxCounter = {
+ val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.minMaxCounter)
+ MinMaxCounter(settings.dynamicRange, settings.refreshInterval.get, scheduler)
+ }
+
+ def createGauge(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None,
+ valueCollector: CurrentValueCollector): Gauge = {
+
+ val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.gauge)
+ Gauge(settings.dynamicRange, settings.refreshInterval.get, scheduler, valueCollector)
+ }
+
+ def createCounter(): Counter = Counter()
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala
new file mode 100644
index 00000000..29f8f46b
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala
@@ -0,0 +1,65 @@
+package kamon.metric.instrument
+
+import java.util.concurrent.TimeUnit
+
+import com.typesafe.config.Config
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.util.ConfigTools.Syntax
+
+import scala.concurrent.duration.FiniteDuration
+
+case class InstrumentCustomSettings(lowestDiscernibleValue: Option[Long], highestTrackableValue: Option[Long],
+ precision: Option[Int], refreshInterval: Option[FiniteDuration]) {
+
+ def combine(that: InstrumentSettings): InstrumentSettings =
+ InstrumentSettings(
+ DynamicRange(
+ lowestDiscernibleValue.getOrElse(that.dynamicRange.lowestDiscernibleValue),
+ highestTrackableValue.getOrElse(that.dynamicRange.highestTrackableValue),
+ precision.getOrElse(that.dynamicRange.precision)),
+ refreshInterval.orElse(that.refreshInterval))
+}
+
+object InstrumentCustomSettings {
+
+ def fromConfig(config: Config): InstrumentCustomSettings =
+ InstrumentCustomSettings(
+ if (config.hasPath("lowest-discernible-value")) Some(config.getLong("lowest-discernible-value")) else None,
+ if (config.hasPath("highest-trackable-value")) Some(config.getLong("highest-trackable-value")) else None,
+ if (config.hasPath("precision")) Some(InstrumentSettings.parsePrecision(config.getString("precision"))) else None,
+ if (config.hasPath("refresh-interval")) Some(config.getFiniteDuration("refresh-interval")) else None)
+
+}
+
+case class InstrumentSettings(dynamicRange: DynamicRange, refreshInterval: Option[FiniteDuration])
+
+object InstrumentSettings {
+
+ def readDynamicRange(config: Config): DynamicRange =
+ DynamicRange(
+ config.getLong("lowest-discernible-value"),
+ config.getLong("highest-trackable-value"),
+ parsePrecision(config.getString("precision")))
+
+ def parsePrecision(stringValue: String): Int = stringValue match {
+ case "low" ⇒ 1
+ case "normal" ⇒ 2
+ case "fine" ⇒ 3
+ case other ⇒ sys.error(s"Invalid precision configuration [$other] found, valid options are: [low|normal|fine].")
+ }
+}
+
+case class DefaultInstrumentSettings(histogram: InstrumentSettings, minMaxCounter: InstrumentSettings, gauge: InstrumentSettings)
+
+object DefaultInstrumentSettings {
+
+ def fromConfig(config: Config): DefaultInstrumentSettings = {
+ val histogramSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("histogram")), None)
+ val minMaxCounterSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("min-max-counter")),
+ Some(config.getFiniteDuration("min-max-counter.refresh-interval")))
+ val gaugeSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("gauge")),
+ Some(config.getFiniteDuration("gauge.refresh-interval")))
+
+ DefaultInstrumentSettings(histogramSettings, minMaxCounterSettings, gaugeSettings)
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
index 61cee02a..0828c8a9 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
@@ -17,16 +17,14 @@ package kamon.metric.instrument
*/
import java.lang.Math.abs
-import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
-import akka.actor.{ ActorSystem, Cancellable }
-import com.typesafe.config.Config
+import akka.actor.Cancellable
import kamon.jsr166.LongMaxUpdater
-import kamon.metric.{ Scale, MetricRecorder, CollectionContext }
+import kamon.metric.instrument.Histogram.DynamicRange
import kamon.util.PaddedAtomicLong
import scala.concurrent.duration.FiniteDuration
-trait MinMaxCounter extends MetricRecorder {
+trait MinMaxCounter extends Instrument {
override type SnapshotType = Histogram.Snapshot
def increment(): Unit
@@ -38,29 +36,20 @@ trait MinMaxCounter extends MetricRecorder {
object MinMaxCounter {
- def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration,
- system: ActorSystem): MinMaxCounter = {
-
- val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
+ def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = {
+ val underlyingHistogram = Histogram(dynamicRange)
val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram)
-
- val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ {
minMaxCounter.refreshValues()
- }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+ })
minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule)
minMaxCounter
}
- def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = {
- import scala.concurrent.duration._
+ def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter =
+ apply(dynamicRange, refreshInterval, scheduler)
- val highest = config.getLong("highest-trackable-value")
- val significantDigits = config.getInt("significant-value-digits")
- val refreshInterval = config.getMilliseconds("refresh-interval").toInt
-
- apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system)
- }
}
class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter {
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala
new file mode 100644
index 00000000..adb08713
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala
@@ -0,0 +1,99 @@
+package kamon.metric.instrument
+
+import akka.actor.{ Scheduler, Cancellable }
+import org.HdrHistogram.WriterReaderPhaser
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+
+trait RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable
+}
+
+/**
+ * Default implementation of RefreshScheduler that simply uses an [[akka.actor.Scheduler]] to schedule tasks to be run
+ * in the provided ExecutionContext.
+ */
+class DefaultRefreshScheduler(scheduler: Scheduler, dispatcher: ExecutionContext) extends RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable =
+ scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)
+}
+
+object DefaultRefreshScheduler {
+ def apply(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ new DefaultRefreshScheduler(scheduler, dispatcher)
+
+ def create(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ apply(scheduler, dispatcher)
+}
+
+/**
+ * RefreshScheduler implementation that accumulates all the scheduled actions until it is pointed to another refresh
+ * scheduler. Once it is pointed, all subsequent calls to `schedule` will immediately be scheduled in the pointed
+ * scheduler.
+ */
+class LazyRefreshScheduler extends RefreshScheduler {
+ private val _schedulerPhaser = new WriterReaderPhaser
+ private val _backlog = new TrieMap[(FiniteDuration, () ⇒ Unit), RepointableCancellable]()
+ @volatile private var _target: Option[RefreshScheduler] = None
+
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = {
+ val criticalEnter = _schedulerPhaser.writerCriticalSectionEnter()
+ try {
+ _target.map { scheduler ⇒
+ scheduler.schedule(interval, refresh)
+
+ } getOrElse {
+ val entry = (interval, refresh)
+ val cancellable = new RepointableCancellable(entry)
+
+ _backlog.put(entry, cancellable)
+ cancellable
+ }
+
+ } finally {
+ _schedulerPhaser.writerCriticalSectionExit(criticalEnter)
+ }
+ }
+
+ def point(target: RefreshScheduler): Unit = try {
+ _schedulerPhaser.readerLock()
+
+ if (_target.isEmpty) {
+ _target = Some(target)
+ _schedulerPhaser.flipPhase(10000L)
+ _backlog.dropWhile {
+ case ((interval, refresh), repointableCancellable) ⇒
+ repointableCancellable.point(target.schedule(interval, refresh))
+ true
+ }
+ } else sys.error("A LazyRefreshScheduler cannot be pointed more than once.")
+ } finally { _schedulerPhaser.readerUnlock() }
+
+ class RepointableCancellable(entry: (FiniteDuration, () ⇒ Unit)) extends Cancellable {
+ private var _isCancelled = false
+ private var _cancellable: Option[Cancellable] = None
+
+ def isCancelled: Boolean = synchronized {
+ _cancellable.map(_.isCancelled).getOrElse(_isCancelled)
+ }
+
+ def cancel(): Boolean = synchronized {
+ _isCancelled = true
+ _cancellable.map(_.cancel()).getOrElse(_backlog.remove(entry).nonEmpty)
+ }
+
+ def point(cancellable: Cancellable): Unit = synchronized {
+ if (_cancellable.isEmpty) {
+ _cancellable = Some(cancellable)
+
+ if (_isCancelled)
+ cancellable.cancel()
+
+ } else sys.error("A RepointableCancellable cannot be pointed more than once.")
+
+ }
+ }
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala
new file mode 100644
index 00000000..f2a061d1
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala
@@ -0,0 +1,71 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+trait UnitOfMeasurement {
+ def name: String
+ def label: String
+ def factor: Double
+}
+
+object UnitOfMeasurement {
+ case object Unknown extends UnitOfMeasurement {
+ val name = "unknown"
+ val label = "unknown"
+ val factor = 1D
+ }
+
+ def isUnknown(uom: UnitOfMeasurement): Boolean =
+ uom == Unknown
+
+ def isTime(uom: UnitOfMeasurement): Boolean =
+ uom.isInstanceOf[Time]
+
+}
+
+case class Time(factor: Double, label: String) extends UnitOfMeasurement {
+ val name = "time"
+
+ /**
+ * Scale a value from this scale factor to a different scale factor.
+ *
+ * @param toUnit Time unit of the expected result.
+ * @param value Value to scale.
+ * @return Equivalent of value on the target time unit.
+ */
+ def scale(toUnit: Time)(value: Long): Double =
+ (value * factor) / toUnit.factor
+}
+
+object Time {
+ val Nanoseconds = Time(1E-9, "n")
+ val Microseconds = Time(1E-6, "µs")
+ val Milliseconds = Time(1E-3, "ms")
+ val Seconds = Time(1, "s")
+}
+
+case class Memory(factor: Double, label: String) extends UnitOfMeasurement {
+ val name = "bytes"
+}
+
+object Memory {
+ val Bytes = Memory(1, "b")
+ val KiloBytes = Memory(1024, "Kb")
+ val MegaBytes = Memory(1024E2, "Mb")
+ val GigaBytes = Memory(1024E3, "Gb")
+}
+
diff --git a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala
deleted file mode 100644
index 490bc127..00000000
--- a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-package kamon.standalone
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import kamon.Kamon
-import kamon.metric.UserMetrics
-import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram }
-
-import scala.concurrent.duration.FiniteDuration
-
-trait KamonStandalone {
- private[kamon] def system: ActorSystem
-
- def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram =
- Kamon(UserMetrics)(system).registerHistogram(name, precision, highestTrackableValue)
-
- def registerHistogram(name: String): Histogram =
- Kamon(UserMetrics)(system).registerHistogram(name)
-
- def registerCounter(name: String): Counter =
- Kamon(UserMetrics)(system).registerCounter(name)
-
- def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
- refreshInterval: FiniteDuration): MinMaxCounter =
- Kamon(UserMetrics)(system).registerMinMaxCounter(name, precision, highestTrackableValue, refreshInterval)
-
- def registerMinMaxCounter(name: String): MinMaxCounter =
- Kamon(UserMetrics)(system).registerMinMaxCounter(name)
-
- def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
- Kamon(UserMetrics)(system).registerGauge(name)(currentValueCollector)
-
- def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
- refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
- Kamon(UserMetrics)(system).registerGauge(name, precision, highestTrackableValue, refreshInterval)(currentValueCollector)
-
- def removeHistogram(name: String): Unit =
- Kamon(UserMetrics)(system).removeHistogram(name)
-
- def removeCounter(name: String): Unit =
- Kamon(UserMetrics)(system).removeCounter(name)
-
- def removeMinMaxCounter(name: String): Unit =
- Kamon(UserMetrics)(system).removeMinMaxCounter(name)
-
- def removeGauge(name: String): Unit =
- Kamon(UserMetrics)(system).removeGauge(name)
-}
-
-object KamonStandalone {
-
- def buildFromConfig(config: Config): KamonStandalone = buildFromConfig(config, "kamon-standalone")
-
- def buildFromConfig(config: Config, actorSystemName: String): KamonStandalone = new KamonStandalone {
- val system: ActorSystem = ActorSystem(actorSystemName, config)
- }
-}
-
-object EmbeddedKamonStandalone extends KamonStandalone {
- private[kamon] lazy val system = ActorSystem("kamon-standalone")
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/Scale.scala b/kamon-core/src/main/scala/kamon/supervisor/AspectJPresent.scala
index 2f27c1a3..0df9539f 100644
--- a/kamon-core/src/main/scala/kamon/metric/Scale.scala
+++ b/kamon-core/src/main/scala/kamon/supervisor/AspectJPresent.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -14,18 +14,18 @@
* =========================================================================================
*/
-package kamon.metric
+package kamon.supervisor
-class Scale(val numericValue: Double) extends AnyVal
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut }
-object Scale {
- val Nano = new Scale(1E-9)
- val Micro = new Scale(1E-6)
- val Milli = new Scale(1E-3)
- val Unit = new Scale(1)
- val Kilo = new Scale(1E3)
- val Mega = new Scale(1E6)
- val Giga = new Scale(1E9)
+@Aspect
+class AspectJPresent {
+
+ @Pointcut("execution(* kamon.supervisor.KamonSupervisor.isAspectJPresent())")
+ def isAspectJPresentAtModuleSupervisor(): Unit = {}
+
+ @Around("isAspectJPresentAtModuleSupervisor()")
+ def aroundIsAspectJPresentAtModuleSupervisor(pjp: ProceedingJoinPoint): Boolean = true
- def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue
}
diff --git a/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala
new file mode 100644
index 00000000..ddce63fb
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala
@@ -0,0 +1,125 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.supervisor
+
+import akka.actor
+import akka.actor._
+import kamon.Kamon
+import kamon.supervisor.KamonSupervisor.CreateModule
+
+import scala.concurrent.{ Promise, Future }
+import scala.util.Success
+
+object ModuleSupervisor extends ExtensionId[ModuleSupervisorExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: actor.Extension] = ModuleSupervisor
+ def createExtension(system: ExtendedActorSystem): ModuleSupervisorExtension = new ModuleSupervisorExtensionImpl(system)
+}
+
+trait ModuleSupervisorExtension extends actor.Extension {
+ def createModule(name: String, props: Props): Future[ActorRef]
+}
+
+class ModuleSupervisorExtensionImpl(system: ExtendedActorSystem) extends ModuleSupervisorExtension {
+ import system.dispatcher
+
+ private val _settings = ModuleSupervisorSettings(system)
+ private val _supervisor = system.actorOf(KamonSupervisor.props(_settings, system.dynamicAccess), "kamon")
+
+ def createModule(name: String, props: Props): Future[ActorRef] = Future {} flatMap { _: Unit ⇒
+ val modulePromise = Promise[ActorRef]()
+ _supervisor ! CreateModule(name, props, modulePromise)
+ modulePromise.future
+ }
+}
+
+class KamonSupervisor(settings: ModuleSupervisorSettings, dynamicAccess: DynamicAccess) extends Actor with ActorLogging {
+
+ init()
+
+ def receive = {
+ case CreateModule(name, props, childPromise) ⇒ createChildModule(name, props, childPromise)
+ }
+
+ def createChildModule(name: String, props: Props, childPromise: Promise[ActorRef]): Unit =
+ context.child(name).map { alreadyAvailableModule ⇒
+ log.warning("Received a request to create module [{}] but the module is already available, returning the existent instance.")
+ childPromise.complete(Success(alreadyAvailableModule))
+
+ } getOrElse (childPromise.complete(Success(context.actorOf(props, name))))
+
+ def init(): Unit = {
+ if (settings.modulesRequiringAspectJ.nonEmpty && !isAspectJPresent && settings.showAspectJMissingWarning)
+ logAspectJWeaverMissing(settings.modulesRequiringAspectJ)
+
+ // Force initialization of all modules marked with auto-start.
+ settings.availableModules.filter(_.autoStart).foreach { module ⇒
+ if (module.extensionClass == "none")
+ log.debug("Ignoring auto start of the [{}] module with no extension class.")
+ else
+ dynamicAccess.getObjectFor[ExtensionId[Kamon.Extension]](module.extensionClass).map { moduleID ⇒
+ moduleID.get(context.system)
+ log.debug("Auto starting the [{}] module.", module.name)
+
+ } recover {
+ case th: Throwable ⇒ log.error(th, "Failed to auto start the [{}] module.", module.name)
+ }
+
+ }
+ }
+
+ // When AspectJ is present the kamon.supervisor.AspectJPresent aspect will make this return true.
+ def isAspectJPresent: Boolean = false
+
+ def logAspectJWeaverMissing(modulesRequiringAspectJ: List[AvailableModuleInfo]): Unit = {
+ val moduleNames = modulesRequiringAspectJ.map(_.name).mkString(", ")
+ val weaverMissingMessage =
+ """
+ |
+ | ___ _ ___ _ _ ___ ___ _ _
+ | / _ \ | | |_ | | | | | | \/ |(_) (_)
+ |/ /_\ \ ___ _ __ ___ ___ | |_ | | | | | | ___ __ _ __ __ ___ _ __ | . . | _ ___ ___ _ _ __ __ _
+ || _ |/ __|| '_ \ / _ \ / __|| __| | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \ / _` |
+ || | | |\__ \| |_) || __/| (__ | |_ /\__/ / \ /\ /| __/| (_| | \ V /| __/| | | | | || |\__ \\__ \| || | | || (_| |
+ |\_| |_/|___/| .__/ \___| \___| \__|\____/ \/ \/ \___| \__,_| \_/ \___||_| \_| |_/|_||___/|___/|_||_| |_| \__, |
+ | | | __/ |
+ | |_| |___/
+ |
+ | It seems like your application was not started with the -javaagent:/path-to-aspectj-weaver.jar option but Kamon detected
+ | the following modules which require AspecJ to work properly:
+ |
+ """.stripMargin + moduleNames +
+ """
+ |
+ | If you need help on setting up the aspectj weaver go to http://kamon.io/introduction/get-started/ for more info. On the
+ | other hand, if you are sure that you do not need or do not want to use the weaver then you can disable this error message
+ | by changing the kamon.show-aspectj-missing-warning setting in your configuration file.
+ |
+ """.stripMargin
+
+ log.error(weaverMissingMessage)
+ }
+
+}
+
+object KamonSupervisor {
+ case class CreateModule(name: String, props: Props, childPromise: Promise[ActorRef])
+
+ def props(settings: ModuleSupervisorSettings, dynamicAccess: DynamicAccess): Props =
+ Props(new KamonSupervisor(settings, dynamicAccess))
+
+}
+
diff --git a/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala
new file mode 100644
index 00000000..c04157aa
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala
@@ -0,0 +1,49 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.supervisor
+
+import akka.actor.ActorSystem
+
+case class AvailableModuleInfo(name: String, extensionClass: String, requiresAspectJ: Boolean, autoStart: Boolean)
+case class ModuleSupervisorSettings(showAspectJMissingWarning: Boolean, availableModules: List[AvailableModuleInfo]) {
+ val modulesRequiringAspectJ = availableModules.filter(_.requiresAspectJ)
+}
+
+object ModuleSupervisorSettings {
+
+ def apply(system: ActorSystem): ModuleSupervisorSettings = {
+ import kamon.util.ConfigTools.Syntax
+
+ val config = system.settings.config.getConfig("kamon.modules")
+ val showAspectJMissingWarning = system.settings.config.getBoolean("kamon.show-aspectj-missing-warning")
+
+ val modules = config.firstLevelKeys
+ val availableModules = modules.map { moduleName ⇒
+ val moduleConfig = config.getConfig(moduleName)
+
+ AvailableModuleInfo(
+ moduleName,
+ moduleConfig.getString("extension-id"),
+ moduleConfig.getBoolean("requires-aspectj"),
+ moduleConfig.getBoolean("auto-start"))
+
+ } toList
+
+ ModuleSupervisorSettings(showAspectJMissingWarning, availableModules)
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
new file mode 100644
index 00000000..19ea4f39
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
@@ -0,0 +1,97 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{ ActorLogging, Props, Actor, ActorRef }
+import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace }
+import kamon.util.{ NanoInterval, RelativeNanoTimestamp }
+import scala.annotation.tailrec
+import scala.collection.immutable.Queue
+import scala.concurrent.duration._
+
+class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging {
+ import kamon.util.ConfigTools.Syntax
+ import context.dispatcher
+ val config = context.system.settings.config.getConfig("kamon.trace.incubator")
+
+ val minIncubationTime = new NanoInterval(config.getFiniteDuration("min-incubation-time").toNanos)
+ val maxIncubationTime = new NanoInterval(config.getFiniteDuration("max-incubation-time").toNanos)
+ val checkInterval = config.getFiniteDuration("check-interval")
+
+ val checkSchedule = context.system.scheduler.schedule(checkInterval, checkInterval, self, CheckForCompletedTraces)
+ var waitingForMinimumIncubation = Queue.empty[IncubatingTrace]
+ var waitingForIncubationFinish = List.empty[IncubatingTrace]
+
+ def receive = {
+ case tc: TracingContext ⇒ incubate(tc)
+ case CheckForCompletedTraces ⇒
+ checkWaitingForMinimumIncubation()
+ checkWaitingForIncubationFinish()
+ }
+
+ def incubate(tc: TracingContext): Unit =
+ waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now))
+
+ @tailrec private def checkWaitingForMinimumIncubation(): Unit = {
+ if (waitingForMinimumIncubation.nonEmpty) {
+ val it = waitingForMinimumIncubation.head
+ if (NanoInterval.since(it.incubationStart) >= minIncubationTime) {
+ waitingForMinimumIncubation = waitingForMinimumIncubation.tail
+
+ if (it.tc.shouldIncubate)
+ waitingForIncubationFinish = it :: waitingForIncubationFinish
+ else
+ dispatchTraceInfo(it.tc)
+
+ checkWaitingForMinimumIncubation()
+ }
+ }
+ }
+
+ private def checkWaitingForIncubationFinish(): Unit = {
+ waitingForIncubationFinish = waitingForIncubationFinish.filter {
+ case IncubatingTrace(context, incubationStart) ⇒
+ if (!context.shouldIncubate) {
+ dispatchTraceInfo(context)
+ false
+ } else {
+ if (NanoInterval.since(incubationStart) >= maxIncubationTime) {
+ log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token)
+ dispatchTraceInfo(context);
+ false
+ } else true
+ }
+ }
+ }
+
+ def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo
+
+ override def postStop(): Unit = {
+ super.postStop()
+ checkSchedule.cancel()
+ }
+}
+
+object Incubator {
+
+ def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions))
+
+ case object CheckForCompletedTraces
+ case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp)
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
new file mode 100644
index 00000000..5f7fdff5
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
@@ -0,0 +1,120 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import akka.event.LoggingAdapter
+import kamon.metric.{ MetricsExtension, TraceMetrics }
+import kamon.util.{ NanoInterval, RelativeNanoTimestamp }
+
+import scala.annotation.tailrec
+
+private[kamon] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
+ val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension)
+ extends TraceContext {
+
+ @volatile private var _name = traceName
+ @volatile private var _isOpen = izOpen
+ @volatile protected var _elapsedTime = NanoInterval.default
+
+ private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]()
+ private val _traceLocalStorage = new TraceLocalStorage
+
+ def rename(newName: String): Unit =
+ if (isOpen)
+ _name = newName
+ else if (log.isWarningEnabled)
+ log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName)
+
+ def name: String = _name
+ def isEmpty: Boolean = false
+ def isOpen: Boolean = _isOpen
+ def addMetadata(key: String, value: String): Unit = {}
+
+ def finish(): Unit = {
+ _isOpen = false
+ val traceElapsedTime = NanoInterval.since(startTimestamp)
+ _elapsedTime = traceElapsedTime
+
+ metricsExtension.register(TraceMetrics, name).map { registration ⇒
+ registration.recorder.ElapsedTime.record(traceElapsedTime.nanos)
+ drainFinishedSegments(registration.recorder)
+ }
+ }
+
+ def startSegment(segmentName: String, category: String, library: String): Segment =
+ new MetricsOnlySegment(segmentName, category, library)
+
+ @tailrec private def drainFinishedSegments(recorder: TraceMetrics): Unit = {
+ val segment = _finishedSegments.poll()
+ if (segment != null) {
+ recorder.segment(segment.name, segment.category, segment.library).record(segment.duration.nanos)
+ drainFinishedSegments(recorder)
+ }
+ }
+
+ protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = {
+ _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration))
+
+ if (isClosed) {
+ metricsExtension.register(TraceMetrics, name).map { registration ⇒
+ drainFinishedSegments(registration.recorder)
+ }
+ }
+ }
+
+ // Should only be used by the TraceLocal utilities.
+ def traceLocalStorage: TraceLocalStorage = _traceLocalStorage
+
+ // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default
+ // will be returned.
+ def elapsedTime: NanoInterval = _elapsedTime
+
+ class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment {
+ private val _startTimestamp = RelativeNanoTimestamp.now
+ @volatile private var _segmentName = segmentName
+ @volatile private var _elapsedTime = NanoInterval.default
+ @volatile private var _isOpen = true
+
+ def name: String = _segmentName
+ def isEmpty: Boolean = false
+ def addMetadata(key: String, value: String): Unit = {}
+ def isOpen: Boolean = _isOpen
+
+ def rename(newName: String): Unit =
+ if (isOpen)
+ _segmentName = newName
+ else if (log.isWarningEnabled)
+ log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName)
+
+ def finish: Unit = {
+ _isOpen = false
+ val segmentElapsedTime = NanoInterval.since(_startTimestamp)
+ _elapsedTime = segmentElapsedTime
+
+ finishSegment(name, category, library, segmentElapsedTime)
+ }
+
+ // Handle with care and make sure that the segment is closed before calling this method, otherwise
+ // NanoInterval.default will be returned.
+ def elapsedTime: NanoInterval = _elapsedTime
+ def startTimestamp: RelativeNanoTimestamp = _startTimestamp
+ }
+}
+
+case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval)
diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
new file mode 100644
index 00000000..827840d7
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
@@ -0,0 +1,73 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import kamon.util.{ NanoInterval, Sequencer }
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.forkjoin.ThreadLocalRandom
+
+trait Sampler {
+ def shouldTrace: Boolean
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean
+}
+
+object NoSampling extends Sampler {
+ def shouldTrace: Boolean = false
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = false
+}
+
+object SampleAll extends Sampler {
+ def shouldTrace: Boolean = true
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = true
+}
+
+class RandomSampler(chance: Int) extends Sampler {
+ require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0")
+ require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100")
+
+ def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = true
+}
+
+class OrderedSampler(interval: Int) extends Sampler {
+ import OrderedSampler._
+
+ require(interval > 0, "kamon.trace.ordered-sampler.interval cannot be <= 0")
+ assume(interval isPowerOfTwo, "kamon.trace.ordered-sampler.interval must be power of two")
+
+ private val sequencer = Sequencer()
+
+ def shouldTrace: Boolean = (sequencer.next() fastMod interval) == 0
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = true
+}
+
+object OrderedSampler {
+ implicit class EnhancedInt(i: Int) {
+ def isPowerOfTwo = (i & (i - 1)) == 0
+ }
+
+ implicit class EnhancedLong(dividend: Long) {
+ def fastMod(divisor: Int) = dividend & (divisor - 1)
+ }
+}
+
+class ThresholdSampler(threshold: FiniteDuration) extends Sampler {
+
+ def shouldTrace: Boolean = true
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime.nanos >= threshold.toNanos
+}
+
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 5b74e6b2..48e56153 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -17,146 +17,113 @@
package kamon.trace
import java.io.ObjectStreamException
-
-import akka.actor.ActorSystem
-import kamon.Kamon
-import kamon.metric._
-import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
-import kamon.metric.TraceMetrics.TraceMetricRecorder
-
-import scala.annotation.tailrec
+import kamon.util.RelativeNanoTimestamp
-sealed trait TraceContext {
+trait TraceContext {
def name: String
def token: String
- def rename(name: String): Unit
- def finish(): Unit
- def origin: TraceContextOrigin
- def isOpen: Boolean
- def isClosed: Boolean = !isOpen
def isEmpty: Boolean
def nonEmpty: Boolean = !isEmpty
+ def isOpen: Boolean
+ def isClosed: Boolean = !isOpen
+
+ def finish(): Unit
+ def rename(newName: String): Unit
+
def startSegment(segmentName: String, category: String, library: String): Segment
- def nanoTimestamp: Long
+ def addMetadata(key: String, value: String)
+
+ def startTimestamp: RelativeNanoTimestamp
}
-sealed trait Segment {
+object TraceContext {
+ private[kamon] val _traceContextStorage = new ThreadLocal[TraceContext] {
+ override def initialValue(): TraceContext = EmptyTraceContext
+ }
+
+ def currentContext: TraceContext =
+ _traceContextStorage.get()
+
+ def setCurrentContext(context: TraceContext): Unit =
+ _traceContextStorage.set(context)
+
+ def clearCurrentContext: Unit =
+ _traceContextStorage.remove()
+
+ def withContext[T](context: TraceContext)(code: ⇒ T): T = {
+ val oldContext = _traceContextStorage.get()
+ _traceContextStorage.set(context)
+
+ try code finally _traceContextStorage.set(oldContext)
+ }
+
+ def map[T](f: TraceContext ⇒ T): Option[T] = {
+ val current = currentContext
+ if (current.nonEmpty)
+ Some(f(current))
+ else None
+ }
+
+}
+
+trait Segment {
def name: String
- def rename(newName: String): Unit
def category: String
def library: String
- def finish(): Unit
def isEmpty: Boolean
+ def nonEmpty: Boolean = !isEmpty
+ def isOpen: Boolean
+ def isClosed: Boolean = !isOpen
+
+ def finish(): Unit
+ def rename(newName: String): Unit
+ def addMetadata(key: String, value: String)
}
case object EmptyTraceContext extends TraceContext {
def name: String = "empty-trace"
def token: String = ""
- def rename(name: String): Unit = {}
- def finish(): Unit = {}
- def origin: TraceContextOrigin = TraceContextOrigin.Local
- def isOpen: Boolean = false
def isEmpty: Boolean = true
+ def isOpen: Boolean = false
+
+ def finish(): Unit = {}
+ def rename(name: String): Unit = {}
def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment
- def nanoTimestamp: Long = 0L
+ def addMetadata(key: String, value: String): Unit = {}
+ def startTimestamp = new RelativeNanoTimestamp(0L)
case object EmptySegment extends Segment {
val name: String = "empty-segment"
val category: String = "empty-category"
val library: String = "empty-library"
def isEmpty: Boolean = true
- def rename(newName: String): Unit = {}
- def finish: Unit = {}
- }
-}
-
-class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
- val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext {
-
- val isEmpty: Boolean = false
- @volatile private var _name = traceName
- @volatile private var _isOpen = izOpen
-
- private val _nanoTimestamp = nanoTimeztamp
- private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
- private val metricsExtension = Kamon(Metrics)(system)
- private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
-
- def name: String = _name
- def rename(newName: String): Unit =
- if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace.
-
- def isOpen: Boolean = _isOpen
- def nanoTimestamp: Long = _nanoTimestamp
-
- def finish(): Unit = {
- _isOpen = false
- val elapsedNanoTime = System.nanoTime() - _nanoTimestamp
- val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
-
- metricRecorder.map { traceMetrics ⇒
- traceMetrics.elapsedTime.record(elapsedNanoTime)
- drainFinishedSegments(traceMetrics)
- }
- }
-
- def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library)
+ def isOpen: Boolean = false
- @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
- val segment = finishedSegments.poll()
- if (segment != null) {
- metricRecorder.segmentRecorder(segment.identity).record(segment.duration)
- drainFinishedSegments(metricRecorder)
- }
- }
-
- private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = {
- finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration))
-
- if (isClosed) {
- metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
- drainFinishedSegments(traceMetrics)
- }
- }
- }
-
- class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment {
- private val _segmentStartNanoTime = System.nanoTime()
- @volatile private var _segmentName = segmentName
- @volatile private var _isOpen = true
-
- def name: String = _segmentName
- def rename(newName: String): Unit = _segmentName = newName
- def isEmpty: Boolean = false
-
- def finish: Unit = {
- val segmentFinishNanoTime = System.nanoTime()
- finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime))
- }
+ def finish: Unit = {}
+ def rename(newName: String): Unit = {}
+ def addMetadata(key: String, value: String): Unit = {}
}
}
-case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity
-case class SegmentData(identity: SegmentMetricIdentity, duration: Long)
-
object SegmentCategory {
val HttpClient = "http-client"
+ val Database = "database"
+}
+
+class LOD private[trace] (val level: Int) extends AnyVal
+object LOD {
+ val MetricsOnly = new LOD(1)
+ val SimpleTrace = new LOD(2)
}
sealed trait LevelOfDetail
object LevelOfDetail {
- case object OnlyMetrics extends LevelOfDetail
+ case object MetricsOnly extends LevelOfDetail
case object SimpleTrace extends LevelOfDetail
case object FullTrace extends LevelOfDetail
}
-sealed trait TraceContextOrigin
-object TraceContextOrigin {
- case object Local extends TraceContextOrigin
- case object Remote extends TraceContextOrigin
-}
-
trait TraceContextAware extends Serializable {
def traceContext: TraceContext
}
@@ -165,7 +132,7 @@ object TraceContextAware {
def default: TraceContextAware = new DefaultTraceContextAware
class DefaultTraceContextAware extends TraceContextAware {
- @transient val traceContext = TraceRecorder.currentContext
+ @transient val traceContext = TraceContext.currentContext
//
// Beware of this hack, it might bite us in the future!
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
deleted file mode 100644
index a59abc18..00000000
--- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.trace
-
-import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
-import akka.actor
-import kamon.util.GlobPathFilter
-import kamon.Kamon
-
-class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val config = system.settings.config.getConfig("kamon.trace")
- val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing")
-}
-
-object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
- def lookup(): ExtensionId[_ <: actor.Extension] = Trace
- def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system)
-
- case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {
- def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
index 0766af74..057f564e 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
@@ -16,23 +16,43 @@
package kamon.trace
-import scala.collection.concurrent.TrieMap
import kamon.trace.TraceLocal.TraceLocalKey
+import scala.collection.concurrent.TrieMap
+
object TraceLocal {
+
trait TraceLocalKey {
type ValueType
}
- def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value)
- case EmptyTraceContext ⇒ // Can't store in the empty context.
+ trait AvailableToMdc extends TraceLocalKey {
+ override type ValueType = String
+ def mdcKey: String
+ }
+
+ object AvailableToMdc {
+ case class DefaultKeyAvailableToMdc(mdcKey: String) extends AvailableToMdc
+
+ def fromKey(mdcKey: String): AvailableToMdc = DefaultKeyAvailableToMdc(mdcKey)
+ def apply(mdcKey: String): AvailableToMdc = fromKey(mdcKey)
+ }
+
+ case class HttpContext(agent: String, uri: String, xforwarded: String)
+
+ object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext }
+
+ def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceContext.currentContext match {
+ case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value)
+ case EmptyTraceContext ⇒ // Can't store in the empty context.
}
- def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key)
- case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context.
+ def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceContext.currentContext match {
+ case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key)
+ case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context.
}
+
+ def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value)
}
class TraceLocalStorage {
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
deleted file mode 100644
index 8da187cb..00000000
--- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.trace
-
-import scala.language.experimental.macros
-import java.util.concurrent.atomic.AtomicLong
-import kamon.macros.InlineTraceContextMacro
-
-import scala.util.Try
-import java.net.InetAddress
-import akka.actor.ActorSystem
-
-object TraceRecorder {
- private val traceContextStorage = new ThreadLocal[TraceContext] {
- override def initialValue(): TraceContext = EmptyTraceContext
- }
-
- private val tokenCounter = new AtomicLong
- private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
-
- def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet())
-
- private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = {
- new DefaultTraceContext(
- name,
- token.getOrElse(newToken),
- izOpen = true,
- LevelOfDetail.OnlyMetrics,
- TraceContextOrigin.Local,
- nanoTimeztamp = System.nanoTime,
- system)
- }
-
- def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = {
- val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000)
- new DefaultTraceContext(
- traceName,
- traceToken,
- isOpen,
- LevelOfDetail.OnlyMetrics,
- TraceContextOrigin.Remote,
- equivalentNanotime,
- system)
- }
-
- def setContext(context: TraceContext): Unit = traceContextStorage.set(context)
-
- def clearContext: Unit = traceContextStorage.set(EmptyTraceContext)
-
- def currentContext: TraceContext = traceContextStorage.get()
-
- def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = {
- val ctx = newTraceContext(name, token, system)
- traceContextStorage.set(ctx)
- }
-
- def rename(name: String): Unit = currentContext.rename(name)
-
- def withNewTraceContext[T](name: String, token: Option[String] = None)(thunk: ⇒ T)(implicit system: ActorSystem): T =
- withTraceContext(newTraceContext(name, token, system))(thunk)
-
- def withTraceContext[T](context: TraceContext)(thunk: ⇒ T): T = {
- val oldContext = currentContext
- setContext(context)
-
- try thunk finally setContext(oldContext)
- }
-
- def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match {
- case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system))
- case EmptyTraceContext ⇒ None
- }
-
- def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext]
-
- def finish(): Unit = currentContext.finish()
-
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala
new file mode 100644
index 00000000..f2da404c
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala
@@ -0,0 +1,45 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import akka.actor.{ Terminated, ActorRef, Actor }
+
+class TraceSubscriptions extends Actor {
+ import TraceSubscriptions._
+
+ var subscribers: List[ActorRef] = Nil
+
+ def receive = {
+ case Subscribe(newSubscriber) ⇒
+ if (!subscribers.contains(newSubscriber))
+ subscribers = context.watch(newSubscriber) :: subscribers
+
+ case Unsubscribe(leavingSubscriber) ⇒
+ subscribers = subscribers.filter(_ == leavingSubscriber)
+
+ case Terminated(terminatedSubscriber) ⇒
+ subscribers = subscribers.filter(_ == terminatedSubscriber)
+
+ case trace: TraceInfo ⇒
+ subscribers.foreach(_ ! trace)
+ }
+}
+
+object TraceSubscriptions {
+ case class Subscribe(subscriber: ActorRef)
+ case class Unsubscribe(subscriber: ActorRef)
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala
new file mode 100644
index 00000000..be565154
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala
@@ -0,0 +1,110 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import java.net.InetAddress
+import java.util.concurrent.atomic.AtomicLong
+
+import akka.actor._
+import com.typesafe.config.Config
+import kamon.metric.MetricsExtension
+import kamon.util._
+
+import scala.util.Try
+
+trait TracerExtension {
+ def newContext(name: String): TraceContext
+ def newContext(name: String, token: String): TraceContext
+ def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext
+
+ def subscribe(subscriber: ActorRef): Unit
+ def unsubscribe(subscriber: ActorRef): Unit
+}
+
+private[kamon] class TracerExtensionImpl(metricsExtension: MetricsExtension, config: Config) extends TracerExtension {
+ private val _settings = TraceSettings(config)
+ private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
+ private val _tokenCounter = new AtomicLong
+
+ private val _subscriptions = new LazyActorRef
+ private val _incubator = new LazyActorRef
+
+ private def newToken: String =
+ _hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet())
+
+ def newContext(name: String): TraceContext =
+ createTraceContext(name)
+
+ def newContext(name: String, token: String): TraceContext =
+ createTraceContext(name, token)
+
+ def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext =
+ createTraceContext(name, token, timestamp, isOpen, isLocal)
+
+ private def createTraceContext(traceName: String, token: String = newToken, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now,
+ isOpen: Boolean = true, isLocal: Boolean = true): TraceContext = {
+
+ def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, null, metricsExtension)
+
+ if (_settings.levelOfDetail == LevelOfDetail.MetricsOnly || !isLocal)
+ newMetricsOnlyContext
+ else {
+ if (!_settings.sampler.shouldTrace)
+ newMetricsOnlyContext
+ else
+ new TracingContext(traceName, token, true, _settings.levelOfDetail, isLocal, startTimestamp, null, metricsExtension, this, dispatchTracingContext)
+ }
+ }
+
+ def subscribe(subscriber: ActorRef): Unit =
+ _subscriptions.tell(TraceSubscriptions.Subscribe(subscriber))
+
+ def unsubscribe(subscriber: ActorRef): Unit =
+ _subscriptions.tell(TraceSubscriptions.Unsubscribe(subscriber))
+
+ private[kamon] def dispatchTracingContext(trace: TracingContext): Unit =
+ if (_settings.sampler.shouldReport(trace.elapsedTime))
+ if (trace.shouldIncubate)
+ _incubator.tell(trace)
+ else
+ _subscriptions.tell(trace.generateTraceInfo)
+
+ /**
+ * Tracer Extension initialization.
+ */
+ private var _system: ActorSystem = null
+ private lazy val _start = {
+ val subscriptions = _system.actorOf(Props[TraceSubscriptions], "trace-subscriptions")
+ _subscriptions.point(subscriptions)
+ _incubator.point(_system.actorOf(Incubator.props(subscriptions)))
+ }
+
+ def start(system: ActorSystem): Unit = synchronized {
+ _system = system
+ _start
+ _system = null
+ }
+}
+
+private[kamon] object TracerExtensionImpl {
+
+ def apply(metricsExtension: MetricsExtension, config: Config) =
+ new TracerExtensionImpl(metricsExtension, config)
+}
+
+case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo])
+case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala
new file mode 100644
index 00000000..79f30f23
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala
@@ -0,0 +1,46 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import com.typesafe.config.Config
+
+case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler)
+
+object TraceSettings {
+ import kamon.util.ConfigTools.Syntax
+
+ def apply(config: Config): TraceSettings = {
+ val tracerConfig = config.getConfig("kamon.trace")
+
+ val detailLevel: LevelOfDetail = tracerConfig.getString("level-of-detail") match {
+ case "metrics-only" ⇒ LevelOfDetail.MetricsOnly
+ case "simple-trace" ⇒ LevelOfDetail.SimpleTrace
+ case other ⇒ sys.error(s"Unknown tracer level of detail [$other] present in the configuration file.")
+ }
+
+ val sampler: Sampler =
+ if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling
+ else tracerConfig.getString("sampling") match {
+ case "all" ⇒ SampleAll
+ case "random" ⇒ new RandomSampler(tracerConfig.getInt("random-sampler.chance"))
+ case "ordered" ⇒ new OrderedSampler(tracerConfig.getInt("ordered-sampler.interval"))
+ case "threshold" ⇒ new ThresholdSampler(tracerConfig.getFiniteDuration("threshold-sampler.minimum-elapsed-time"))
+ }
+
+ TraceSettings(detailLevel, sampler)
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
new file mode 100644
index 00000000..3d324886
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
@@ -0,0 +1,92 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.event.LoggingAdapter
+import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp }
+import kamon.metric.MetricsExtension
+
+import scala.collection.concurrent.TrieMap
+
+private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail,
+ isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension,
+ traceExtension: TracerExtensionImpl, traceInfoSink: TracingContext ⇒ Unit)
+ extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log, metricsExtension) {
+
+ private val _openSegments = new AtomicInteger(0)
+ private val _startTimestamp = NanoTimestamp.now
+ private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]()
+ private val _metadata = TrieMap.empty[String, String]
+
+ override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value)
+
+ override def startSegment(segmentName: String, category: String, library: String): Segment = {
+ _openSegments.incrementAndGet()
+ val newSegment = new TracingSegment(segmentName, category, library)
+ _allSegments.add(newSegment)
+ newSegment
+ }
+
+ override def finish(): Unit = {
+ super.finish()
+ traceInfoSink(this)
+ }
+
+ override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = {
+ _openSegments.decrementAndGet()
+ super.finishSegment(segmentName, category, library, duration)
+ }
+
+ def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0
+
+ // Handle with care, should only be used after a trace is finished.
+ def generateTraceInfo: TraceInfo = {
+ require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.")
+
+ val currentSegments = _allSegments.iterator()
+ var segmentsInfo = List.newBuilder[SegmentInfo]
+
+ while (currentSegments.hasNext()) {
+ val segment = currentSegments.next()
+ if (segment.isClosed)
+ segmentsInfo += segment.createSegmentInfo(_startTimestamp, startTimestamp)
+ else
+ log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name)
+ }
+
+ TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result())
+ }
+
+ class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) {
+ private val metadata = TrieMap.empty[String, String]
+ override def addMetadata(key: String, value: String): Unit = metadata.put(key, value)
+
+ // Handle with care, should only be used after the segment has finished.
+ def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = {
+ require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.")
+
+ // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both
+ // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it.
+ val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos)
+
+ SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap)
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala
index f052f009..961c3099 100644
--- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala
+++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala
@@ -17,11 +17,11 @@ package kamon.trace.logging
import ch.qos.logback.classic.pattern.ClassicConverter
import ch.qos.logback.classic.spi.ILoggingEvent
-import kamon.trace.TraceRecorder
+import kamon.trace.TraceContext
class LogbackTraceTokenConverter extends ClassicConverter {
def convert(event: ILoggingEvent): String = {
- val ctx = TraceRecorder.currentContext
+ val ctx = TraceContext.currentContext
if (ctx.isEmpty)
"undefined"
else
diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
new file mode 100644
index 00000000..4970d97e
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
@@ -0,0 +1,39 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace.logging
+
+import kamon.trace.TraceLocal.AvailableToMdc
+import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext }
+
+import org.slf4j.MDC
+
+trait MdcKeysSupport {
+
+ def withMdc[A](thunk: ⇒ A): A = {
+ val keys = copyToMdc(TraceContext.currentContext)
+ try thunk finally keys.foreach(key ⇒ MDC.remove(key))
+ }
+
+ private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match {
+ case ctx: MetricsOnlyContext ⇒
+ ctx.traceLocalStorage.underlyingStorage.collect {
+ case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey -> String.valueOf(value))
+ }.map { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } }.flatten
+
+ case EmptyTraceContext ⇒ Iterable.empty[String]
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/package.scala b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala
index 43166058..483278bf 100644
--- a/kamon-core/src/main/scala/kamon/metric/package.scala
+++ b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -14,21 +14,29 @@
* =========================================================================================
*/
-package kamon
+package kamon.util
+
+import java.util.concurrent.TimeUnit
-import scala.annotation.tailrec
import com.typesafe.config.Config
-package object metric {
+import scala.concurrent.duration.FiniteDuration
+
+object ConfigTools {
+ implicit class Syntax(val config: Config) extends AnyVal {
+ // We are using the deprecated .getNanoseconds option to keep Kamon source code compatible with
+ // versions of Akka using older typesafe-config versions.
- @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = {
- if (right.isEmpty)
- left
- else {
- val (key, rightValue) = right.head
- val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue)
+ def getFiniteDuration(path: String): FiniteDuration =
+ FiniteDuration(config.getNanoseconds(path), TimeUnit.NANOSECONDS)
- combineMaps(left.updated(key, value), right.tail)(valueMerger)
+ def firstLevelKeys: Set[String] = {
+ import scala.collection.JavaConverters._
+
+ config.entrySet().asScala.map {
+ case entry ⇒ entry.getKey.takeWhile(_ != '.')
+ } toSet
}
}
+
}
diff --git a/kamon-core/src/main/scala/kamon/util/FastDispatch.scala b/kamon-core/src/main/scala/kamon/util/FastDispatch.scala
new file mode 100644
index 00000000..d2748847
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/FastDispatch.scala
@@ -0,0 +1,38 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.util
+
+import akka.actor.ActorRef
+
+import scala.concurrent.{ ExecutionContext, Future }
+
+/**
+ * Extension for Future[ActorRef]. Try to dispatch a message to a Future[ActorRef] in the same thread if it has already
+ * completed or do the regular scheduling otherwise. Specially useful when using the ModuleSupervisor extension to
+ * create actors.
+ */
+object FastDispatch {
+ implicit class Syntax(val target: Future[ActorRef]) extends AnyVal {
+
+ def fastDispatch(message: Any)(implicit ec: ExecutionContext): Unit =
+ if (target.isCompleted)
+ target.value.get.map(_ ! message)
+ else
+ target.map(_ ! message)
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala
new file mode 100644
index 00000000..855bf1fc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala
@@ -0,0 +1,53 @@
+package kamon.util
+
+import java.util
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import akka.actor.{ Actor, ActorRef }
+import org.HdrHistogram.WriterReaderPhaser
+
+import scala.annotation.tailrec
+
+/**
+ * A LazyActorRef accumulates messages sent to an actor that doesn't exist yet. Once the actor is created and
+ * the LazyActorRef is pointed to it, all the accumulated messages are flushed and any new message sent to the
+ * LazyActorRef will immediately be sent to the pointed ActorRef.
+ *
+ * This is intended to be used during Kamon's initialization where some components need to use ActorRefs to work
+ * (like subscriptions and the trace incubator) but our internal ActorSystem is not yet ready to create the
+ * required actors.
+ */
+class LazyActorRef {
+ private val _refPhaser = new WriterReaderPhaser
+ private val _backlog = new ConcurrentLinkedQueue[(Any, ActorRef)]()
+ @volatile private var _target: Option[ActorRef] = None
+
+ def tell(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
+ val criticalEnter = _refPhaser.writerCriticalSectionEnter()
+ try {
+ _target.map(_.tell(message, sender)) getOrElse {
+ _backlog.add((message, sender))
+ }
+
+ } finally { _refPhaser.writerCriticalSectionExit(criticalEnter) }
+ }
+
+ def point(target: ActorRef): Unit = {
+ @tailrec def drain(q: util.Queue[(Any, ActorRef)]): Unit = if (!q.isEmpty) {
+ val (msg, sender) = q.poll()
+ target.tell(msg, sender)
+ drain(q)
+ }
+
+ try {
+ _refPhaser.readerLock()
+
+ if (_target.isEmpty) {
+ _target = Some(target)
+ _refPhaser.flipPhase(1000L)
+ drain(_backlog)
+
+ } else sys.error("A LazyActorRef cannot be pointed more than once.")
+ } finally { _refPhaser.readerUnlock() }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/util/MapMerge.scala b/kamon-core/src/main/scala/kamon/util/MapMerge.scala
new file mode 100644
index 00000000..64b4f7ae
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/MapMerge.scala
@@ -0,0 +1,43 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.util
+
+object MapMerge {
+
+ /**
+ * Merge to immutable maps with the same key and value types, using the provided valueMerge function.
+ */
+ implicit class Syntax[K, V](val map: Map[K, V]) extends AnyVal {
+ def merge(that: Map[K, V], valueMerge: (V, V) ⇒ V): Map[K, V] = {
+ val merged = Map.newBuilder[K, V]
+
+ map.foreach {
+ case (key, value) ⇒
+ val mergedValue = that.get(key).map(v ⇒ valueMerge(value, v)).getOrElse(value)
+ merged += key -> mergedValue
+ }
+
+ that.foreach {
+ case kv @ (key, _) if !map.contains(key) ⇒ merged += kv
+ case other ⇒ // ignore, already included.
+ }
+
+ merged.result();
+ }
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala
index 9c926372..9019eb4c 100644
--- a/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala
+++ b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala
@@ -1,4 +1,3 @@
-package kamon.util
/*
* =========================================================================================
* Copyright © 2013-2014 the kamon project <http://kamon.io/>
@@ -15,6 +14,8 @@ package kamon.util
* =========================================================================================
*/
+package kamon.util
+
import java.util.concurrent.atomic.AtomicLong
class PaddedAtomicLong(value: Long = 0) extends AtomicLong(value) {
diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/util/Sequencer.scala
index b7050c59..f368e54f 100644
--- a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala
+++ b/kamon-core/src/main/scala/kamon/util/Sequencer.scala
@@ -14,17 +14,27 @@
* =========================================================================================
*/
-package kamon
+package kamon.util
-import akka.actor.{ Extension, ActorSystem, ExtensionId }
-import java.util.concurrent.ConcurrentHashMap
+/**
+ * This class implements an extremely efficient, thread-safe way to generate a
+ * incrementing sequence of Longs with a simple Long overflow protection.
+ */
+class Sequencer {
+ private val CloseToOverflow = Long.MaxValue - 1000000000
+ private val sequenceNumber = new PaddedAtomicLong(1L)
-object AkkaExtensionSwap {
- def swap(system: ActorSystem, key: ExtensionId[_], value: Extension): Unit = {
- val extensionsField = system.getClass.getDeclaredField("extensions")
- extensionsField.setAccessible(true)
+ def next(): Long = {
+ val current = sequenceNumber.getAndIncrement
- val extensions = extensionsField.get(system).asInstanceOf[ConcurrentHashMap[ExtensionId[_], AnyRef]]
- extensions.put(key, value)
+ // check if this value is getting close to overflow?
+ if (current > CloseToOverflow) {
+ sequenceNumber.set(current - CloseToOverflow) // we need maintain the order
+ }
+ current
}
}
+
+object Sequencer {
+ def apply(): Sequencer = new Sequencer()
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/util/Timestamp.scala b/kamon-core/src/main/scala/kamon/util/Timestamp.scala
new file mode 100644
index 00000000..eadc6690
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/Timestamp.scala
@@ -0,0 +1,101 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.util
+
+/**
+ * Epoch time stamp.
+ */
+class Timestamp(val seconds: Long) extends AnyVal {
+ def <(that: Timestamp): Boolean = this.seconds < that.seconds
+ def >(that: Timestamp): Boolean = this.seconds > that.seconds
+ def ==(that: Timestamp): Boolean = this.seconds == that.seconds
+ def >=(that: Timestamp): Boolean = this.seconds >= that.seconds
+ def <=(that: Timestamp): Boolean = this.seconds <= that.seconds
+
+ override def toString: String = String.valueOf(seconds) + ".seconds"
+}
+
+object Timestamp {
+ def now: Timestamp = new Timestamp(System.currentTimeMillis() / 1000)
+ def earlier(l: Timestamp, r: Timestamp): Timestamp = if (l <= r) l else r
+ def later(l: Timestamp, r: Timestamp): Timestamp = if (l >= r) l else r
+}
+
+/**
+ * Epoch time stamp in milliseconds.
+ */
+class MilliTimestamp(val millis: Long) extends AnyVal {
+ override def toString: String = String.valueOf(millis) + ".millis"
+
+ def toTimestamp: Timestamp = new Timestamp(millis / 1000)
+ def toRelativeNanoTimestamp: RelativeNanoTimestamp = {
+ val diff = (System.currentTimeMillis() - millis) * 1000000
+ new RelativeNanoTimestamp(System.nanoTime() - diff)
+ }
+}
+
+object MilliTimestamp {
+ def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis())
+}
+
+/**
+ * Epoch time stamp in nanoseconds.
+ *
+ * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch
+ * timestamp in nanoseconds.
+ */
+class NanoTimestamp(val nanos: Long) extends AnyVal {
+ override def toString: String = String.valueOf(nanos) + ".nanos"
+}
+
+object NanoTimestamp {
+ def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000)
+}
+
+/**
+ * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime()
+ */
+class RelativeNanoTimestamp(val nanos: Long) extends AnyVal {
+ override def toString: String = String.valueOf(nanos) + ".nanos"
+
+ def toMilliTimestamp: MilliTimestamp =
+ new MilliTimestamp(System.currentTimeMillis - ((System.nanoTime - nanos) / 1000000))
+}
+
+object RelativeNanoTimestamp {
+ def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime())
+ def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp =
+ new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000)
+}
+
+/**
+ * Number of nanoseconds that passed between two points in time.
+ */
+class NanoInterval(val nanos: Long) extends AnyVal {
+ def <(that: NanoInterval): Boolean = this.nanos < that.nanos
+ def >(that: NanoInterval): Boolean = this.nanos > that.nanos
+ def ==(that: NanoInterval): Boolean = this.nanos == that.nanos
+ def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos
+ def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos
+
+ override def toString: String = String.valueOf(nanos) + ".nanos"
+}
+
+object NanoInterval {
+ def default: NanoInterval = new NanoInterval(0L)
+ def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos)
+}
diff --git a/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala b/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala
new file mode 100644
index 00000000..d1197a5a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala
@@ -0,0 +1,45 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package kamon.util
+
+import scala.collection.concurrent.TrieMap
+
+object TriemapAtomicGetOrElseUpdate {
+
+ /**
+ * Workaround to the non thread-safe [[scala.collection.concurrent.TrieMap#getOrElseUpdate]] method. More details on
+ * why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]].
+ */
+ implicit class Syntax[K, V](val trieMap: TrieMap[K, V]) extends AnyVal {
+
+ def atomicGetOrElseUpdate(key: K, op: ⇒ V): V =
+ atomicGetOrElseUpdate(key, op, { v: V ⇒ Unit })
+
+ def atomicGetOrElseUpdate(key: K, op: ⇒ V, cleanup: V ⇒ Unit): V =
+ trieMap.get(key) match {
+ case Some(v) ⇒ v
+ case None ⇒
+ val d = op
+ trieMap.putIfAbsent(key, d).map { oldValue ⇒
+ // If there was an old value then `d` was never added
+ // and thus need to be cleanup.
+ cleanup(d)
+ oldValue
+
+ } getOrElse (d)
+ }
+ }
+}