diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-05-09 15:36:16 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-05-09 15:36:16 +0200 |
commit | 76f503b8f954e1b149bea3adb8927704f7095876 (patch) | |
tree | 59cabc6055e855c006115c5847a85f879dc36dd9 /kamon-core/src/main | |
parent | 520895a6a9a6b48b83efe01cf289708efd045b42 (diff) | |
parent | d69f14710b1d933d58412edd63b465b13a09a9d0 (diff) | |
download | Kamon-76f503b8f954e1b149bea3adb8927704f7095876.tar.gz Kamon-76f503b8f954e1b149bea3adb8927704f7095876.tar.bz2 Kamon-76f503b8f954e1b149bea3adb8927704f7095876.zip |
Merge branch 'master' into release-legacy-akka-2.2
Conflicts:
kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala
kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala
kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala
kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala
kamon-play/src/main/scala/kamon/play/action/KamonTraceActions.scala
kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
project/Dependencies.scala
project/Settings.scala
Diffstat (limited to 'kamon-core/src/main')
32 files changed, 965 insertions, 939 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 2ffb8b09..b13f9aac 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -4,7 +4,7 @@ <aspects> <!-- Notify that AspectJ is present --> - <aspect name="kamon.supervisor.AspectJPresent"/> + <aspect name="kamon.AspectJPresent"/> </aspects> diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index ab9ce05e..d819588c 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -18,15 +18,12 @@ import _root_.akka.actor import _root_.akka.actor._ import com.typesafe.config.{ ConfigFactory, Config } import kamon.metric._ -import kamon.trace.{ TracerExtensionImpl, TracerExtension } +import kamon.trace.{ TracerModuleImpl, TracerModule } object Kamon { trait Extension extends actor.Extension - private case class KamonCoreComponents( - metrics: MetricsExtension, - tracer: TracerExtension, - userMetrics: UserMetricsExtension) + private case class KamonCoreComponents(metrics: MetricsModule, tracer: TracerModule) @volatile private var _system: ActorSystem = _ @volatile private var _coreComponents: Option[KamonCoreComponents] = None @@ -42,15 +39,15 @@ object Kamon { } if (_coreComponents.isEmpty) { - val metrics = MetricsExtensionImpl(config) - val simpleMetrics = UserMetricsExtensionImpl(metrics) - val tracer = TracerExtensionImpl(metrics, config) + val metrics = MetricsModuleImpl(config) + val tracer = TracerModuleImpl(metrics, config) - _coreComponents = Some(KamonCoreComponents(metrics, tracer, simpleMetrics)) + _coreComponents = Some(KamonCoreComponents(metrics, tracer)) _system = ActorSystem("kamon", resolveInternalConfig) metrics.start(_system) tracer.start(_system) + _system.registerExtension(ModuleLoader) } else sys.error("Kamon has already been started.") } @@ -58,15 +55,18 @@ object Kamon { def start(): Unit = start(ConfigFactory.load) - def metrics: MetricsExtension = + def shutdown(): Unit = { + _coreComponents = None + _system.shutdown() + _system = null + } + + def metrics: MetricsModule = ifStarted(_.metrics) - def tracer: TracerExtension = + def tracer: TracerModule = ifStarted(_.tracer) - def userMetrics: UserMetricsExtension = - ifStarted(_.userMetrics) - def apply[T <: Kamon.Extension](key: ExtensionId[T]): T = ifStarted { _ ⇒ if (_system ne null) diff --git a/kamon-core/src/main/scala/kamon/ModuleLoader.scala b/kamon-core/src/main/scala/kamon/ModuleLoader.scala new file mode 100644 index 00000000..a26fd339 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ModuleLoader.scala @@ -0,0 +1,123 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon + +import _root_.akka.actor +import _root_.akka.actor._ +import _root_.akka.event.Logging +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect } + +private[kamon] object ModuleLoader extends ExtensionId[ModuleLoaderExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = ModuleLoader + def createExtension(system: ExtendedActorSystem): ModuleLoaderExtension = new ModuleLoaderExtension(system) +} + +private[kamon] class ModuleLoaderExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val log = Logging(system, "ModuleLoader") + val settings = ModuleLoaderSettings(system) + + if (settings.modulesRequiringAspectJ.nonEmpty && !isAspectJPresent && settings.showAspectJMissingWarning) + logAspectJWeaverMissing(settings.modulesRequiringAspectJ) + + // Force initialization of all modules marked with auto-start. + settings.availableModules.filter(_.autoStart).foreach { module ⇒ + if (module.extensionClass == "none") + log.debug("Ignoring auto start of the [{}] module with no extension class.", module.name) + else + system.dynamicAccess.getObjectFor[ExtensionId[Kamon.Extension]](module.extensionClass).map { moduleID ⇒ + log.debug("Auto starting the [{}] module.", module.name) + moduleID.get(system) + + } recover { + case th: Throwable ⇒ log.error(th, "Failed to auto start the [{}] module.", module.name) + } + + } + + // When AspectJ is present the kamon.supervisor.AspectJPresent aspect will make this return true. + def isAspectJPresent: Boolean = false + + def logAspectJWeaverMissing(modulesRequiringAspectJ: List[AvailableModuleInfo]): Unit = { + val moduleNames = modulesRequiringAspectJ.map(_.name).mkString(", ") + val weaverMissingMessage = + """ + | + | ___ _ ___ _ _ ___ ___ _ _ + | / _ \ | | |_ | | | | | | \/ |(_) (_) + |/ /_\ \ ___ _ __ ___ ___ | |_ | | | | | | ___ __ _ __ __ ___ _ __ | . . | _ ___ ___ _ _ __ __ _ + || _ |/ __|| '_ \ / _ \ / __|| __| | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \ / _` | + || | | |\__ \| |_) || __/| (__ | |_ /\__/ / \ /\ /| __/| (_| | \ V /| __/| | | | | || |\__ \\__ \| || | | || (_| | + |\_| |_/|___/| .__/ \___| \___| \__|\____/ \/ \/ \___| \__,_| \_/ \___||_| \_| |_/|_||___/|___/|_||_| |_| \__, | + | | | __/ | + | |_| |___/ + | + | It seems like your application was not started with the -javaagent:/path-to-aspectj-weaver.jar option but Kamon detected + | the following modules which require AspectJ to work properly: + | + """.stripMargin + moduleNames + + """ + | + | If you need help on setting up the aspectj weaver go to http://kamon.io/introduction/get-started/ for more info. On the + | other hand, if you are sure that you do not need or do not want to use the weaver then you can disable this error message + | by changing the kamon.show-aspectj-missing-warning setting in your configuration file. + | + """.stripMargin + + log.error(weaverMissingMessage) + } +} + +private[kamon] case class AvailableModuleInfo(name: String, extensionClass: String, requiresAspectJ: Boolean, autoStart: Boolean) +private[kamon] case class ModuleLoaderSettings(showAspectJMissingWarning: Boolean, availableModules: List[AvailableModuleInfo]) { + val modulesRequiringAspectJ = availableModules.filter(_.requiresAspectJ) +} + +private[kamon] object ModuleLoaderSettings { + + def apply(system: ActorSystem): ModuleLoaderSettings = { + import kamon.util.ConfigTools.Syntax + + val config = system.settings.config.getConfig("kamon.modules") + val showAspectJMissingWarning = system.settings.config.getBoolean("kamon.show-aspectj-missing-warning") + + val modules = config.firstLevelKeys + val availableModules = modules.map { moduleName ⇒ + val moduleConfig = config.getConfig(moduleName) + + AvailableModuleInfo( + moduleName, + moduleConfig.getString("extension-id"), + moduleConfig.getBoolean("requires-aspectj"), + moduleConfig.getBoolean("auto-start")) + + } toList + + ModuleLoaderSettings(showAspectJMissingWarning, availableModules) + } +} + +@Aspect +private[kamon] class AspectJPresent { + + @Pointcut("execution(* kamon.ModuleLoaderExtension.isAspectJPresent())") + def isAspectJPresentAtModuleSupervisor(): Unit = {} + + @Around("isAspectJPresentAtModuleSupervisor()") + def aroundIsAspectJPresentAtModuleSupervisor(pjp: ProceedingJoinPoint): Boolean = true + +} diff --git a/kamon-core/src/main/scala/kamon/metric/Entity.scala b/kamon-core/src/main/scala/kamon/metric/Entity.scala index 8d328f83..91249af0 100644 --- a/kamon-core/src/main/scala/kamon/metric/Entity.scala +++ b/kamon-core/src/main/scala/kamon/metric/Entity.scala @@ -23,36 +23,15 @@ package kamon.metric * * // TODO: Find a better word for `thing`. */ -class Entity(val name: String, val category: String, val metadata: Map[String, String]) { - - override def equals(o: Any): Boolean = { - if (this eq o.asInstanceOf[AnyRef]) - true - else if ((o.asInstanceOf[AnyRef] eq null) || !o.isInstanceOf[Entity]) - false - else { - val thatAsEntity = o.asInstanceOf[Entity] - category == thatAsEntity.category && name == thatAsEntity.name - } - } - - override def hashCode: Int = { - var result: Int = name.hashCode - result = 31 * result + category.hashCode - return result - } -} +case class Entity(name: String, category: String, tags: Map[String, String]) object Entity { def apply(name: String, category: String): Entity = apply(name, category, Map.empty) - def apply(name: String, category: String, metadata: Map[String, String]): Entity = - new Entity(name, category, metadata) - def create(name: String, category: String): Entity = apply(name, category, Map.empty) - def create(name: String, category: String, metadata: Map[String, String]): Entity = - new Entity(name, category, metadata) + def create(name: String, category: String, tags: Map[String, String]): Entity = + new Entity(name, category, tags) }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala index 6e0a4248..65dafa9a 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala @@ -19,6 +19,7 @@ package kamon.metric import kamon.metric.instrument.Gauge.CurrentValueCollector import kamon.metric.instrument.Histogram.DynamicRange import kamon.metric.instrument._ +import kamon.util.Function import scala.collection.concurrent.TrieMap import scala.concurrent.duration.FiniteDuration @@ -33,6 +34,64 @@ trait EntityRecorderFactory[T <: EntityRecorder] { def createRecorder(instrumentFactory: InstrumentFactory): T } +object EntityRecorderFactory { + def apply[T <: EntityRecorder](entityCategory: String, factory: InstrumentFactory ⇒ T): EntityRecorderFactory[T] = + new EntityRecorderFactory[T] { + def category: String = entityCategory + def createRecorder(instrumentFactory: InstrumentFactory): T = factory(instrumentFactory) + } + + def create[T <: EntityRecorder](entityCategory: String, factory: Function[InstrumentFactory, T]): EntityRecorderFactory[T] = + new EntityRecorderFactory[T] { + def category: String = entityCategory + def createRecorder(instrumentFactory: InstrumentFactory): T = factory(instrumentFactory) + } +} + +private[kamon] sealed trait SingleInstrumentEntityRecorder extends EntityRecorder { + def key: MetricKey + def instrument: Instrument + + def collect(collectionContext: CollectionContext): EntitySnapshot = + new DefaultEntitySnapshot(Map(key -> instrument.collect(collectionContext))) + + def cleanup: Unit = instrument.cleanup +} + +object SingleInstrumentEntityRecorder { + val Histogram = "histogram" + val MinMaxCounter = "min-max-counter" + val Gauge = "gauge" + val Counter = "counter" + + val AllCategories = List("histogram", "gauge", "counter", "min-max-counter") +} + +/** + * Entity recorder for a single Counter instrument. + */ +case class CounterRecorder(key: MetricKey, instrument: Counter) extends SingleInstrumentEntityRecorder + +/** + * Entity recorder for a single Histogram instrument. + */ +case class HistogramRecorder(key: MetricKey, instrument: Histogram) extends SingleInstrumentEntityRecorder + +/** + * Entity recorder for a single MinMaxCounter instrument. + */ +case class MinMaxCounterRecorder(key: MetricKey, instrument: MinMaxCounter) extends SingleInstrumentEntityRecorder + +/** + * Entity recorder for a single Gauge instrument. + */ +case class GaugeRecorder(key: MetricKey, instrument: Gauge) extends SingleInstrumentEntityRecorder + +/** + * Base class with plenty of utility methods to facilitate the creation of [[EntityRecorder]] implementations. + * It is not required to use this base class for defining custom a custom [[EntityRecorder]], but it is certainly + * the most convenient way to do it and the preferred approach throughout the Kamon codebase. + */ abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder { import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax @@ -41,10 +100,10 @@ abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) exten _instruments.atomicGetOrElseUpdate(key, instrument, _.cleanup).asInstanceOf[T] protected def histogram(name: String): Histogram = - register(HistogramKey(name), instrumentFactory.createHistogram(name)) + register(HistogramKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createHistogram(name)) protected def histogram(name: String, dynamicRange: DynamicRange): Histogram = - register(HistogramKey(name), instrumentFactory.createHistogram(name, Some(dynamicRange))) + register(HistogramKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createHistogram(name, Some(dynamicRange))) protected def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram = register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name)) @@ -52,32 +111,26 @@ abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) exten protected def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram = register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name, Some(dynamicRange))) - protected def histogram(key: HistogramKey): Histogram = - register(key, instrumentFactory.createHistogram(key.name)) - - protected def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram = - register(key, instrumentFactory.createHistogram(key.name, Some(dynamicRange))) - protected def removeHistogram(name: String): Unit = - _instruments.remove(HistogramKey(name)) + _instruments.remove(HistogramKey(name, UnitOfMeasurement.Unknown)) - protected def removeHistogram(key: HistogramKey): Unit = - _instruments.remove(key) + protected def removeHistogram(name: String, unitOfMeasurement: UnitOfMeasurement): Unit = + _instruments.remove(HistogramKey(name, unitOfMeasurement)) protected def minMaxCounter(name: String): MinMaxCounter = - register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name)) + register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name)) protected def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter = - register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange))) + register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange))) protected def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter = - register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval))) + register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval))) protected def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name)) protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = - register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) + register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) protected def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange))) @@ -86,7 +139,7 @@ abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) exten register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval))) protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = - register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) + register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) protected def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter = register(key, instrumentFactory.createMinMaxCounter(key.name)) @@ -101,31 +154,31 @@ abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) exten register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange), Some(refreshInterval))) protected def removeMinMaxCounter(name: String): Unit = - _instruments.remove(MinMaxCounterKey(name)) + _instruments.remove(MinMaxCounterKey(name, UnitOfMeasurement.Unknown)) protected def removeMinMaxCounter(key: MinMaxCounterKey): Unit = _instruments.remove(key) protected def gauge(name: String, valueCollector: CurrentValueCollector): Gauge = - register(GaugeKey(name), instrumentFactory.createGauge(name, valueCollector = valueCollector)) + register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, valueCollector = valueCollector)) protected def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = - register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector)) + register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector)) protected def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = - register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) protected def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, valueCollector = valueCollector)) protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = - register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector)) + register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector)) protected def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector)) protected def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = - register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector)) @@ -143,19 +196,19 @@ abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) exten register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector)) protected def removeGauge(name: String): Unit = - _instruments.remove(GaugeKey(name)) + _instruments.remove(GaugeKey(name, UnitOfMeasurement.Unknown)) protected def removeGauge(key: GaugeKey): Unit = _instruments.remove(key) protected def counter(name: String): Counter = - register(CounterKey(name), instrumentFactory.createCounter()) + register(CounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createCounter()) protected def counter(key: CounterKey): Counter = register(key, instrumentFactory.createCounter()) protected def removeCounter(name: String): Unit = - _instruments.remove(CounterKey(name)) + _instruments.remove(CounterKey(name, UnitOfMeasurement.Unknown)) protected def removeCounter(key: CounterKey): Unit = _instruments.remove(key) diff --git a/kamon-core/src/main/scala/kamon/metric/MetricKey.scala b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala index a5d30c81..0d4e0163 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricKey.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala @@ -16,154 +16,32 @@ package kamon.metric -import kamon.metric.instrument.{ InstrumentTypes, InstrumentType, UnitOfMeasurement } +import kamon.metric.instrument.UnitOfMeasurement /** - * MetricKeys are used to identify a given metric in entity recorders and snapshots. MetricKeys can be used to encode - * additional metadata for a metric being recorded, as well as the unit of measurement of the data being recorder. + * MetricKeys are used to identify a given metric in entity recorders and snapshots. */ sealed trait MetricKey { def name: String def unitOfMeasurement: UnitOfMeasurement - def instrumentType: InstrumentType - def metadata: Map[String, String] } -// Wish that there was a shorter way to describe the operations bellow, but apparently there is no way to generalize all -// the apply/create versions that would produce the desired return types when used from Java. - /** * MetricKey for all Histogram-based metrics. */ -case class HistogramKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { - val instrumentType = InstrumentTypes.Histogram -} - -object HistogramKey { - def apply(name: String): HistogramKey = - apply(name, UnitOfMeasurement.Unknown) - - def apply(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey = - apply(name, unitOfMeasurement, Map.empty) - - def apply(name: String, metadata: Map[String, String]): HistogramKey = - apply(name, UnitOfMeasurement.Unknown, Map.empty) - - /** - * Java friendly versions: - */ - - def create(name: String): HistogramKey = - apply(name, UnitOfMeasurement.Unknown) - - def create(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey = - apply(name, unitOfMeasurement) - - def create(name: String, metadata: Map[String, String]): HistogramKey = - apply(name, metadata) - - def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): HistogramKey = - apply(name, unitOfMeasurement, metadata) -} +private[kamon] case class HistogramKey(name: String, unitOfMeasurement: UnitOfMeasurement) extends MetricKey /** * MetricKey for all MinMaxCounter-based metrics. */ -case class MinMaxCounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { - val instrumentType = InstrumentTypes.MinMaxCounter -} - -object MinMaxCounterKey { - def apply(name: String): MinMaxCounterKey = - apply(name, UnitOfMeasurement.Unknown) - - def apply(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey = - apply(name, unitOfMeasurement, Map.empty) - - def apply(name: String, metadata: Map[String, String]): MinMaxCounterKey = - apply(name, UnitOfMeasurement.Unknown, Map.empty) - - /** - * Java friendly versions: - */ - - def create(name: String): MinMaxCounterKey = - apply(name, UnitOfMeasurement.Unknown) - - def create(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey = - apply(name, unitOfMeasurement) - - def create(name: String, metadata: Map[String, String]): MinMaxCounterKey = - apply(name, metadata) - - def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): MinMaxCounterKey = - apply(name, unitOfMeasurement, metadata) -} +private[kamon] case class MinMaxCounterKey(name: String, unitOfMeasurement: UnitOfMeasurement) extends MetricKey /** * MetricKey for all Gauge-based metrics. */ -case class GaugeKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { - val instrumentType = InstrumentTypes.Gauge -} - -object GaugeKey { - def apply(name: String): GaugeKey = - apply(name, UnitOfMeasurement.Unknown) - - def apply(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey = - apply(name, unitOfMeasurement, Map.empty) - - def apply(name: String, metadata: Map[String, String]): GaugeKey = - apply(name, UnitOfMeasurement.Unknown, Map.empty) - - /** - * Java friendly versions: - */ - - def create(name: String): GaugeKey = - apply(name, UnitOfMeasurement.Unknown) - - def create(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey = - apply(name, unitOfMeasurement) - - def create(name: String, metadata: Map[String, String]): GaugeKey = - apply(name, metadata) - - def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): GaugeKey = - apply(name, unitOfMeasurement, metadata) -} +private[kamon] case class GaugeKey(name: String, unitOfMeasurement: UnitOfMeasurement) extends MetricKey /** * MetricKey for all Counter-based metrics. */ -case class CounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { - val instrumentType = InstrumentTypes.Counter -} - -object CounterKey { - def apply(name: String): CounterKey = - apply(name, UnitOfMeasurement.Unknown) - - def apply(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey = - apply(name, unitOfMeasurement, Map.empty) - - def apply(name: String, metadata: Map[String, String]): CounterKey = - apply(name, UnitOfMeasurement.Unknown, Map.empty) - - /** - * Java friendly versions: - */ - - def create(name: String): CounterKey = - apply(name, UnitOfMeasurement.Unknown) - - def create(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey = - apply(name, unitOfMeasurement) - - def create(name: String, metadata: Map[String, String]): CounterKey = - apply(name, metadata) - - def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): CounterKey = - apply(name, unitOfMeasurement, metadata) -}
\ No newline at end of file +private[kamon] case class CounterKey(name: String, unitOfMeasurement: UnitOfMeasurement) extends MetricKey diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala deleted file mode 100644 index 87911352..00000000 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import com.typesafe.config.Config -import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe } -import kamon.metric.instrument.{ DefaultRefreshScheduler, InstrumentFactory, CollectionContext } - -import scala.collection.concurrent.TrieMap -import akka.actor._ -import kamon.util.{ LazyActorRef, TriemapAtomicGetOrElseUpdate } - -case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T) - -trait MetricsExtension { - def settings: MetricsExtensionSettings - def shouldTrack(entity: Entity): Boolean - def shouldTrack(entityName: String, category: String): Boolean = - shouldTrack(Entity(entityName, category)) - - def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] - def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] - def unregister(entity: Entity): Unit - - def find(entity: Entity): Option[EntityRecorder] - def find(name: String, category: String): Option[EntityRecorder] - - def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit = - subscribe(filter, subscriber, permanently = false) - - def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit = - subscribe(SubscriptionFilter(category, selection), subscriber, permanently) - - def subscribe(category: String, selection: String, subscriber: ActorRef): Unit = - subscribe(SubscriptionFilter(category, selection), subscriber, permanently = false) - - def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit - - def unsubscribe(subscriber: ActorRef): Unit - def buildDefaultCollectionContext: CollectionContext - def instrumentFactory(category: String): InstrumentFactory -} - -private[kamon] class MetricsExtensionImpl(config: Config) extends MetricsExtension { - private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] - private val _subscriptions = new LazyActorRef - - val settings = MetricsExtensionSettings(config) - - def shouldTrack(entity: Entity): Boolean = - settings.entityFilters.get(entity.category).map { - filter ⇒ filter.accept(entity.name) - - } getOrElse (settings.trackUnmatchedEntities) - - def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] = { - import TriemapAtomicGetOrElseUpdate.Syntax - val entity = Entity(entityName, recorderFactory.category) - - if (shouldTrack(entity)) { - val instrumentFactory = settings.instrumentFactories.get(recorderFactory.category).getOrElse(settings.defaultInstrumentFactory) - val recorder = _trackedEntities.atomicGetOrElseUpdate(entity, recorderFactory.createRecorder(instrumentFactory), _.cleanup).asInstanceOf[T] - - Some(EntityRegistration(entity, recorder)) - } else None - } - - def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] = { - _trackedEntities.put(entity, recorder).map { oldRecorder ⇒ - oldRecorder.cleanup - } - - EntityRegistration(entity, recorder) - } - - def unregister(entity: Entity): Unit = - _trackedEntities.remove(entity).map(_.cleanup) - - def find(entity: Entity): Option[EntityRecorder] = - _trackedEntities.get(entity) - - def find(name: String, category: String): Option[EntityRecorder] = - find(Entity(name, category)) - - def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = - _subscriptions.tell(Subscribe(filter, subscriber, permanent)) - - def unsubscribe(subscriber: ActorRef): Unit = - _subscriptions.tell(Unsubscribe(subscriber)) - - def buildDefaultCollectionContext: CollectionContext = - CollectionContext(settings.defaultCollectionContextBufferSize) - - def instrumentFactory(category: String): InstrumentFactory = - settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory) - - private[kamon] def collectSnapshots(collectionContext: CollectionContext): Map[Entity, EntitySnapshot] = { - val builder = Map.newBuilder[Entity, EntitySnapshot] - _trackedEntities.foreach { - case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext))) - } - - builder.result() - } - - /** - * Metrics Extension initialization. - */ - private var _system: ActorSystem = null - private lazy val _start = { - _subscriptions.point(_system.actorOf(SubscriptionsDispatcher.props(settings.tickInterval, this), "metrics")) - settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher)) - } - - def start(system: ActorSystem): Unit = synchronized { - _system = system - _start - _system = null - } -} - -private[kamon] object MetricsExtensionImpl { - - def apply(config: Config) = - new MetricsExtensionImpl(config) -} - diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsModule.scala b/kamon-core/src/main/scala/kamon/metric/MetricsModule.scala new file mode 100644 index 00000000..9f2bbbef --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsModule.scala @@ -0,0 +1,364 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor._ +import com.typesafe.config.Config +import kamon.metric.SubscriptionsDispatcher.{ Subscribe, Unsubscribe } +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.metric.instrument._ +import kamon.util.LazyActorRef + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.FiniteDuration + +case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T) + +trait MetricsModule { + def settings: MetricsSettings + + def shouldTrack(entity: Entity): Boolean + + def shouldTrack(entityName: String, category: String): Boolean = + shouldTrack(Entity(entityName, category)) + + // + // Histograms registration and removal + // + + def histogram(name: String): Histogram = + registerHistogram(name) + + def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram = + registerHistogram(name, unitOfMeasurement = Some(unitOfMeasurement)) + + def histogram(name: String, dynamicRange: DynamicRange): Histogram = + registerHistogram(name, dynamicRange = Some(dynamicRange)) + + def histogram(name: String, unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange): Histogram = + registerHistogram(name, unitOfMeasurement = Some(unitOfMeasurement), dynamicRange = Some(dynamicRange)) + + def histogram(name: String, tags: Map[String, String]): Histogram = + registerHistogram(name, tags) + + def histogram(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement): Histogram = + registerHistogram(name, tags, Some(unitOfMeasurement)) + + def histogram(name: String, tags: Map[String, String], dynamicRange: DynamicRange): Histogram = + registerHistogram(name, tags, dynamicRange = Some(dynamicRange)) + + def histogram(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange): Histogram = + registerHistogram(name, tags, Some(unitOfMeasurement), Some(dynamicRange)) + + def removeHistogram(name: String): Boolean = + removeHistogram(name, Map.empty) + + def registerHistogram(name: String, tags: Map[String, String] = Map.empty, unitOfMeasurement: Option[UnitOfMeasurement] = None, + dynamicRange: Option[DynamicRange] = None): Histogram + + def removeHistogram(name: String, tags: Map[String, String]): Boolean + + // + // MinMaxCounter registration and removal + // + + def minMaxCounter(name: String): MinMaxCounter = + registerMinMaxCounter(name) + + def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + registerMinMaxCounter(name, unitOfMeasurement = Some(unitOfMeasurement)) + + def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter = + registerMinMaxCounter(name, dynamicRange = Some(dynamicRange)) + + def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter = + registerMinMaxCounter(name, refreshInterval = Some(refreshInterval)) + + def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + registerMinMaxCounter(name, dynamicRange = Some(dynamicRange), refreshInterval = Some(refreshInterval)) + + def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange): MinMaxCounter = + registerMinMaxCounter(name, unitOfMeasurement = Some(unitOfMeasurement), dynamicRange = Some(dynamicRange)) + + def minMaxCounter(name: String, tags: Map[String, String]): MinMaxCounter = + registerMinMaxCounter(name, tags) + + def minMaxCounter(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + registerMinMaxCounter(name, tags, Some(unitOfMeasurement)) + + def minMaxCounter(name: String, tags: Map[String, String], dynamicRange: DynamicRange): MinMaxCounter = + registerMinMaxCounter(name, tags, dynamicRange = Some(dynamicRange)) + + def minMaxCounter(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange): MinMaxCounter = + registerMinMaxCounter(name, tags, Some(unitOfMeasurement), Some(dynamicRange)) + + def removeMinMaxCounter(name: String): Boolean = + removeMinMaxCounter(name, Map.empty) + + def removeMinMaxCounter(name: String, tags: Map[String, String]): Boolean + + def registerMinMaxCounter(name: String, tags: Map[String, String] = Map.empty, unitOfMeasurement: Option[UnitOfMeasurement] = None, + dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None): MinMaxCounter + + // + // Gauge registration and removal + // + + def gauge(name: String)(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector) + + def gauge(name: String, unitOfMeasurement: UnitOfMeasurement)(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector, unitOfMeasurement = Some(unitOfMeasurement)) + + def gauge(name: String, dynamicRange: DynamicRange)(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector, dynamicRange = Some(dynamicRange)) + + def gauge(name: String, refreshInterval: FiniteDuration)(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector, refreshInterval = Some(refreshInterval)) + + def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration)(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector, dynamicRange = Some(dynamicRange), refreshInterval = Some(refreshInterval)) + + def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange)(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector, unitOfMeasurement = Some(unitOfMeasurement), dynamicRange = Some(dynamicRange)) + + def gauge(name: String, tags: Map[String, String])(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector, tags) + + def gauge(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement)(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector, tags, Some(unitOfMeasurement)) + + def gauge(name: String, tags: Map[String, String], dynamicRange: DynamicRange)(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector, tags, dynamicRange = Some(dynamicRange)) + + def gauge(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange)(valueCollector: CurrentValueCollector): Gauge = + registerGauge(name, valueCollector, tags, Some(unitOfMeasurement), Some(dynamicRange)) + + def removeGauge(name: String): Boolean = + removeGauge(name, Map.empty) + + def removeGauge(name: String, tags: Map[String, String]): Boolean + + def registerGauge(name: String, valueCollector: CurrentValueCollector, tags: Map[String, String] = Map.empty, + unitOfMeasurement: Option[UnitOfMeasurement] = None, dynamicRange: Option[DynamicRange] = None, + refreshInterval: Option[FiniteDuration] = None): Gauge + + // + // Counters registration and removal + // + + def counter(name: String): Counter = + registerCounter(name) + + def counter(name: String, unitOfMeasurement: UnitOfMeasurement): Counter = + registerCounter(name, unitOfMeasurement = Some(unitOfMeasurement)) + + def counter(name: String, tags: Map[String, String]): Counter = + registerCounter(name, tags) + + def counter(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement): Counter = + registerCounter(name, tags, Some(unitOfMeasurement)) + + def removeCounter(name: String): Boolean = + removeCounter(name, Map.empty) + + def removeCounter(name: String, tags: Map[String, String]): Boolean + + def registerCounter(name: String, tags: Map[String, String] = Map.empty, unitOfMeasurement: Option[UnitOfMeasurement] = None, + dynamicRange: Option[DynamicRange] = None): Counter + + // + // Entities registration and removal + // + + def entity[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], name: String): T = + entity(recorderFactory, Entity(name, recorderFactory.category)) + + def entity[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], name: String, tags: Map[String, String]): T = + entity(recorderFactory, Entity(name, recorderFactory.category, tags)) + + def removeEntity(name: String, category: String): Boolean = + removeEntity(Entity(name, category, Map.empty)) + + def removeEntity(name: String, category: String, tags: Map[String, String]): Boolean = + removeEntity(Entity(name, category, tags)) + + def removeEntity(entity: Entity): Boolean + + def entity[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entity: Entity): T + + def find(name: String, category: String): Option[EntityRecorder] = + find(Entity(name, category)) + + def find(name: String, category: String, tags: Map[String, String]): Option[EntityRecorder] = + find(Entity(name, category, tags)) + + def find(entity: Entity): Option[EntityRecorder] + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit = + subscribe(filter, subscriber, permanently = true) + + def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently) + + def subscribe(category: String, selection: String, subscriber: ActorRef): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently = true) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit + + def unsubscribe(subscriber: ActorRef): Unit + + def buildDefaultCollectionContext: CollectionContext + + def instrumentFactory(category: String): InstrumentFactory +} + +private[kamon] class MetricsModuleImpl(config: Config) extends MetricsModule { + import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax + + private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] + private val _subscriptions = new LazyActorRef + + val settings = MetricsSettings(config) + + def shouldTrack(entity: Entity): Boolean = + settings.entityFilters.get(entity.category).map { + filter ⇒ filter.accept(entity.name) + + } getOrElse (settings.trackUnmatchedEntities) + + def registerHistogram(name: String, tags: Map[String, String], unitOfMeasurement: Option[UnitOfMeasurement], + dynamicRange: Option[DynamicRange]): Histogram = { + + val histogramEntity = Entity(name, SingleInstrumentEntityRecorder.Histogram, tags) + val recorder = _trackedEntities.atomicGetOrElseUpdate(histogramEntity, { + val factory = instrumentFactory(histogramEntity.category) + HistogramRecorder(HistogramKey(histogramEntity.category, unitOfMeasurement.getOrElse(UnitOfMeasurement.Unknown)), + factory.createHistogram(name, dynamicRange)) + }, _.cleanup) + + recorder.asInstanceOf[HistogramRecorder].instrument + } + + def removeHistogram(name: String, tags: Map[String, String]): Boolean = + _trackedEntities.remove(Entity(name, SingleInstrumentEntityRecorder.Histogram, tags)).isDefined + + def registerMinMaxCounter(name: String, tags: Map[String, String], unitOfMeasurement: Option[UnitOfMeasurement], dynamicRange: Option[DynamicRange], + refreshInterval: Option[FiniteDuration]): MinMaxCounter = { + + val minMaxCounterEntity = Entity(name, SingleInstrumentEntityRecorder.MinMaxCounter, tags) + val recorder = _trackedEntities.atomicGetOrElseUpdate(minMaxCounterEntity, { + val factory = instrumentFactory(minMaxCounterEntity.category) + MinMaxCounterRecorder(MinMaxCounterKey(minMaxCounterEntity.category, unitOfMeasurement.getOrElse(UnitOfMeasurement.Unknown)), + factory.createMinMaxCounter(name, dynamicRange, refreshInterval)) + }, _.cleanup) + + recorder.asInstanceOf[MinMaxCounterRecorder].instrument + } + + def removeMinMaxCounter(name: String, tags: Map[String, String]): Boolean = + _trackedEntities.remove(Entity(name, SingleInstrumentEntityRecorder.MinMaxCounter, tags)).isDefined + + def registerGauge(name: String, valueCollector: CurrentValueCollector, tags: Map[String, String] = Map.empty, + unitOfMeasurement: Option[UnitOfMeasurement] = None, dynamicRange: Option[DynamicRange] = None, + refreshInterval: Option[FiniteDuration] = None): Gauge = { + + val gaugeEntity = Entity(name, SingleInstrumentEntityRecorder.Gauge, tags) + val recorder = _trackedEntities.atomicGetOrElseUpdate(gaugeEntity, { + val factory = instrumentFactory(gaugeEntity.category) + GaugeRecorder(MinMaxCounterKey(gaugeEntity.category, unitOfMeasurement.getOrElse(UnitOfMeasurement.Unknown)), + factory.createGauge(name, dynamicRange, refreshInterval, valueCollector)) + }, _.cleanup) + + recorder.asInstanceOf[GaugeRecorder].instrument + } + + def removeGauge(name: String, tags: Map[String, String]): Boolean = + _trackedEntities.remove(Entity(name, SingleInstrumentEntityRecorder.Gauge, tags)).isDefined + + def registerCounter(name: String, tags: Map[String, String] = Map.empty, unitOfMeasurement: Option[UnitOfMeasurement] = None, + dynamicRange: Option[DynamicRange] = None): Counter = { + + val counterEntity = Entity(name, SingleInstrumentEntityRecorder.Counter, tags) + val recorder = _trackedEntities.atomicGetOrElseUpdate(counterEntity, { + val factory = instrumentFactory(counterEntity.category) + CounterRecorder(CounterKey(counterEntity.category, unitOfMeasurement.getOrElse(UnitOfMeasurement.Unknown)), + factory.createCounter()) + }, _.cleanup) + + recorder.asInstanceOf[CounterRecorder].instrument + } + + def removeCounter(name: String, tags: Map[String, String]): Boolean = + _trackedEntities.remove(Entity(name, SingleInstrumentEntityRecorder.Counter, tags)).isDefined + + def entity[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entity: Entity): T = { + _trackedEntities.atomicGetOrElseUpdate(entity, { + recorderFactory.createRecorder(instrumentFactory(recorderFactory.category)) + }, _.cleanup).asInstanceOf[T] + } + + def removeEntity(entity: Entity): Boolean = + _trackedEntities.remove(entity).isDefined + + def find(entity: Entity): Option[EntityRecorder] = + _trackedEntities.get(entity) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = + _subscriptions.tell(Subscribe(filter, subscriber, permanent)) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(Unsubscribe(subscriber)) + + def buildDefaultCollectionContext: CollectionContext = + CollectionContext(settings.defaultCollectionContextBufferSize) + + def instrumentFactory(category: String): InstrumentFactory = + settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory) + + private[kamon] def collectSnapshots(collectionContext: CollectionContext): Map[Entity, EntitySnapshot] = { + val builder = Map.newBuilder[Entity, EntitySnapshot] + _trackedEntities.foreach { + case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext))) + } + + builder.result() + } + + /** + * Metrics Extension initialization. + */ + private var _system: ActorSystem = null + private lazy val _start = { + _subscriptions.point(_system.actorOf(SubscriptionsDispatcher.props(settings.tickInterval, this), "metrics")) + settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher)) + } + + def start(system: ActorSystem): Unit = synchronized { + _system = system + _start + _system = null + } +} + +private[kamon] object MetricsModuleImpl { + + def apply(config: Config) = + new MetricsModuleImpl(config) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala b/kamon-core/src/main/scala/kamon/metric/MetricsSettings.scala index 9881ed00..a472a89b 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsSettings.scala @@ -25,7 +25,7 @@ import scala.concurrent.duration.FiniteDuration /** * Configuration settings for the Metrics extension, as read from the `kamon.metric` configuration key. */ -case class MetricsExtensionSettings( +case class MetricsSettings( tickInterval: FiniteDuration, defaultCollectionContextBufferSize: Int, trackUnmatchedEntities: Boolean, @@ -48,11 +48,10 @@ case class EntityFilter(includes: List[GlobPathFilter], excludes: List[GlobPathF includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) } -object MetricsExtensionSettings { +object MetricsSettings { import kamon.util.ConfigTools.Syntax - import scala.concurrent.duration._ - def apply(config: Config): MetricsExtensionSettings = { + def apply(config: Config): MetricsSettings = { val metricConfig = config.getConfig("kamon.metric") val tickInterval = metricConfig.getFiniteDuration("tick-interval") @@ -65,7 +64,7 @@ object MetricsExtensionSettings { val instrumentFactories = loadInstrumentFactories(metricConfig.getConfig("instrument-settings"), defaultInstrumentSettings, refreshScheduler) val defaultInstrumentFactory = new InstrumentFactory(Map.empty, defaultInstrumentSettings, refreshScheduler) - MetricsExtensionSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories, + MetricsSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories, defaultInstrumentFactory, refreshScheduler) } diff --git a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala index 68b545a5..9f8c7be3 100644 --- a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala +++ b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala @@ -24,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration /** * Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers. */ -private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl) extends Actor { +private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, metricsExtension: MetricsModuleImpl) extends Actor { var lastTick = MilliTimestamp.now var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter] var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter] @@ -81,7 +81,7 @@ private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, metricsEx } object SubscriptionsDispatcher { - def props(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl): Props = + def props(interval: FiniteDuration, metricsExtension: MetricsModuleImpl): Props = Props(new SubscriptionsDispatcher(interval, metricsExtension)) case object Tick diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index 3da9c1d4..eb4f327a 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -16,29 +16,36 @@ package kamon.metric -import kamon.metric.instrument.{ Time, InstrumentFactory, Histogram } +import kamon.metric.instrument.{ Time, InstrumentFactory } class TraceMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { - import TraceMetrics.segmentKey /** * Records blah blah */ - val ElapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds) - - /** - * Records Blah Blah. - * - */ - def segment(name: String, category: String, library: String): Histogram = - histogram(segmentKey(name, category, library)) - + val elapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds) } object TraceMetrics extends EntityRecorderFactory[TraceMetrics] { def category: String = "trace" def createRecorder(instrumentFactory: InstrumentFactory): TraceMetrics = new TraceMetrics(instrumentFactory) - def segmentKey(name: String, category: String, library: String): HistogramKey = - HistogramKey(name, Time.Nanoseconds, Map("category" -> category, "library" -> library)) + // Java API. + def factory: EntityRecorderFactory[TraceMetrics] = this +} + +class SegmentMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + + /** + * Records blah blah + */ + val elapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds) +} + +object SegmentMetrics extends EntityRecorderFactory[SegmentMetrics] { + def category: String = "trace-segment" + def createRecorder(instrumentFactory: InstrumentFactory): SegmentMetrics = new SegmentMetrics(instrumentFactory) + + // Java API. + def factory: EntityRecorderFactory[SegmentMetrics] = this }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala deleted file mode 100644 index e0818292..00000000 --- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala +++ /dev/null @@ -1,204 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2015 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import kamon.metric.instrument.Gauge.CurrentValueCollector -import kamon.metric.instrument.Histogram.DynamicRange -import kamon.metric.instrument._ - -import scala.concurrent.duration.FiniteDuration - -trait UserMetricsExtension { - def histogram(name: String): Histogram - def histogram(name: String, dynamicRange: DynamicRange): Histogram - def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram - def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram - def histogram(key: HistogramKey): Histogram - def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram - def removeHistogram(name: String): Unit - def removeHistogram(key: HistogramKey): Unit - - def minMaxCounter(name: String): MinMaxCounter - def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter - def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter - def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter - def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter - def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter - def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter - def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter - def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter - def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter - def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter - def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter - def removeMinMaxCounter(name: String): Unit - def removeMinMaxCounter(key: MinMaxCounterKey): Unit - - def gauge(name: String, valueCollector: CurrentValueCollector): Gauge - def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge - def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge - def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge - def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge - def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge - def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge - def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge - def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge - def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge - def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge - def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge - def removeGauge(name: String): Unit - def removeGauge(key: GaugeKey): Unit - - def counter(name: String): Counter - def counter(key: CounterKey): Counter - def removeCounter(name: String): Unit - def removeCounter(key: CounterKey): Unit - -} - -private[kamon] class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension { - override def histogram(name: String): Histogram = - super.histogram(name) - - override def histogram(name: String, dynamicRange: DynamicRange): Histogram = - super.histogram(name, dynamicRange) - - override def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram = - super.histogram(name, unitOfMeasurement) - - override def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram = - super.histogram(name, dynamicRange, unitOfMeasurement) - - override def histogram(key: HistogramKey): Histogram = - super.histogram(key) - - override def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram = - super.histogram(key, dynamicRange) - - override def removeHistogram(name: String): Unit = - super.removeHistogram(name) - - override def removeHistogram(key: HistogramKey): Unit = - super.removeHistogram(key) - - override def minMaxCounter(name: String): MinMaxCounter = - super.minMaxCounter(name) - - override def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter = - super.minMaxCounter(name, dynamicRange) - - override def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter = - super.minMaxCounter(name, refreshInterval) - - override def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = - super.minMaxCounter(name, unitOfMeasurement) - - override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = - super.minMaxCounter(name, dynamicRange, refreshInterval) - - override def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = - super.minMaxCounter(name, dynamicRange, unitOfMeasurement) - - override def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = - super.minMaxCounter(name, refreshInterval, unitOfMeasurement) - - override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = - super.minMaxCounter(name, dynamicRange, refreshInterval, unitOfMeasurement) - - override def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter = - super.minMaxCounter(key) - - override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter = - super.minMaxCounter(key, dynamicRange) - - override def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter = - super.minMaxCounter(key, refreshInterval) - - override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = - super.minMaxCounter(key, dynamicRange, refreshInterval) - - override def removeMinMaxCounter(name: String): Unit = - super.removeMinMaxCounter(name) - - override def removeMinMaxCounter(key: MinMaxCounterKey): Unit = - super.removeMinMaxCounter(key) - - override def gauge(name: String, valueCollector: CurrentValueCollector): Gauge = - super.gauge(name, valueCollector) - - override def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = - super.gauge(name, dynamicRange, valueCollector) - - override def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = - super.gauge(name, refreshInterval, valueCollector) - - override def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = - super.gauge(name, unitOfMeasurement, valueCollector) - - override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = - super.gauge(name, dynamicRange, refreshInterval, valueCollector) - - override def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = - super.gauge(name, dynamicRange, unitOfMeasurement, valueCollector) - - override def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = - super.gauge(name, refreshInterval, unitOfMeasurement, valueCollector) - - override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = - super.gauge(name, dynamicRange, refreshInterval, unitOfMeasurement, valueCollector) - - override def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge = - super.gauge(key, valueCollector) - - override def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = - super.gauge(key, dynamicRange, valueCollector) - - override def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = - super.gauge(key, refreshInterval, valueCollector) - - override def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = - super.gauge(key, dynamicRange, refreshInterval, valueCollector) - - override def removeGauge(name: String): Unit = - super.removeGauge(name) - - override def removeGauge(key: GaugeKey): Unit = - super.removeGauge(key) - - override def counter(name: String): Counter = - super.counter(name) - - override def counter(key: CounterKey): Counter = - super.counter(key) - - override def removeCounter(name: String): Unit = - super.removeCounter(name) - - override def removeCounter(key: CounterKey): Unit = - super.removeCounter(key) -} - -private[kamon] object UserMetricsExtensionImpl { - val UserMetricEntity = Entity("user-metric", "user-metric") - - def apply(metricsExtension: MetricsExtension): UserMetricsExtensionImpl = { - val instrumentFactory = metricsExtension.instrumentFactory(UserMetricEntity.category) - val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory) - - metricsExtension.register(UserMetricEntity, userMetricsExtension).recorder - } - -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala index 80214510..61b53df2 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -55,6 +55,10 @@ object Gauge { implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { def currentValue: Long = f.apply() } + + implicit def callByNameLongAsCurrentValueCollector(f: ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f + } } /** diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala index 59b4b443..089dbeec 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala @@ -33,14 +33,6 @@ trait InstrumentSnapshot { def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot } -class InstrumentType private[kamon] (val id: Int) extends AnyVal -object InstrumentTypes { - val Histogram = new InstrumentType(1) - val MinMaxCounter = new InstrumentType(2) - val Gauge = new InstrumentType(3) - val Counter = new InstrumentType(4) -} - trait CollectionContext { def buffer: LongBuffer } @@ -51,3 +43,11 @@ object CollectionContext { } } +sealed trait InstrumentType + +object InstrumentTypes { + case object Histogram extends InstrumentType + case object MinMaxCounter extends InstrumentType + case object Gauge extends InstrumentType + case object Counter extends InstrumentType +} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala index adb08713..4809ac0d 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala @@ -1,3 +1,19 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.metric.instrument import akka.actor.{ Scheduler, Cancellable } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala index f2a061d1..c5a1b81a 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala @@ -16,17 +16,20 @@ package kamon.metric.instrument +/** + * A UnitOfMeasurement implementation describes the magnitude of a quantity being measured, such as Time and computer + * Memory space. Kamon uses UnitOfMeasurement implementations just as a informative companion to metrics inside entity + * recorders and might be used to scale certain kinds of measurements in metric backends. + */ trait UnitOfMeasurement { def name: String def label: String - def factor: Double } object UnitOfMeasurement { case object Unknown extends UnitOfMeasurement { val name = "unknown" val label = "unknown" - val factor = 1D } def isUnknown(uom: UnitOfMeasurement): Boolean = @@ -35,19 +38,18 @@ object UnitOfMeasurement { def isTime(uom: UnitOfMeasurement): Boolean = uom.isInstanceOf[Time] + def isMemory(uom: UnitOfMeasurement): Boolean = + uom.isInstanceOf[Memory] + } +/** + * UnitOfMeasurement representing time. + */ case class Time(factor: Double, label: String) extends UnitOfMeasurement { val name = "time" - /** - * Scale a value from this scale factor to a different scale factor. - * - * @param toUnit Time unit of the expected result. - * @param value Value to scale. - * @return Equivalent of value on the target time unit. - */ - def scale(toUnit: Time)(value: Long): Double = + def scale(toUnit: Time)(value: Double): Double = (value * factor) / toUnit.factor } @@ -58,8 +60,14 @@ object Time { val Seconds = Time(1, "s") } +/** + * UnitOfMeasurement representing computer memory space. + */ case class Memory(factor: Double, label: String) extends UnitOfMeasurement { val name = "bytes" + + def scale(toUnit: Memory)(value: Double): Double = + (value * factor) / toUnit.factor } object Memory { diff --git a/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala deleted file mode 100644 index ddce63fb..00000000 --- a/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2015 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.supervisor - -import akka.actor -import akka.actor._ -import kamon.Kamon -import kamon.supervisor.KamonSupervisor.CreateModule - -import scala.concurrent.{ Promise, Future } -import scala.util.Success - -object ModuleSupervisor extends ExtensionId[ModuleSupervisorExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = ModuleSupervisor - def createExtension(system: ExtendedActorSystem): ModuleSupervisorExtension = new ModuleSupervisorExtensionImpl(system) -} - -trait ModuleSupervisorExtension extends actor.Extension { - def createModule(name: String, props: Props): Future[ActorRef] -} - -class ModuleSupervisorExtensionImpl(system: ExtendedActorSystem) extends ModuleSupervisorExtension { - import system.dispatcher - - private val _settings = ModuleSupervisorSettings(system) - private val _supervisor = system.actorOf(KamonSupervisor.props(_settings, system.dynamicAccess), "kamon") - - def createModule(name: String, props: Props): Future[ActorRef] = Future {} flatMap { _: Unit ⇒ - val modulePromise = Promise[ActorRef]() - _supervisor ! CreateModule(name, props, modulePromise) - modulePromise.future - } -} - -class KamonSupervisor(settings: ModuleSupervisorSettings, dynamicAccess: DynamicAccess) extends Actor with ActorLogging { - - init() - - def receive = { - case CreateModule(name, props, childPromise) ⇒ createChildModule(name, props, childPromise) - } - - def createChildModule(name: String, props: Props, childPromise: Promise[ActorRef]): Unit = - context.child(name).map { alreadyAvailableModule ⇒ - log.warning("Received a request to create module [{}] but the module is already available, returning the existent instance.") - childPromise.complete(Success(alreadyAvailableModule)) - - } getOrElse (childPromise.complete(Success(context.actorOf(props, name)))) - - def init(): Unit = { - if (settings.modulesRequiringAspectJ.nonEmpty && !isAspectJPresent && settings.showAspectJMissingWarning) - logAspectJWeaverMissing(settings.modulesRequiringAspectJ) - - // Force initialization of all modules marked with auto-start. - settings.availableModules.filter(_.autoStart).foreach { module ⇒ - if (module.extensionClass == "none") - log.debug("Ignoring auto start of the [{}] module with no extension class.") - else - dynamicAccess.getObjectFor[ExtensionId[Kamon.Extension]](module.extensionClass).map { moduleID ⇒ - moduleID.get(context.system) - log.debug("Auto starting the [{}] module.", module.name) - - } recover { - case th: Throwable ⇒ log.error(th, "Failed to auto start the [{}] module.", module.name) - } - - } - } - - // When AspectJ is present the kamon.supervisor.AspectJPresent aspect will make this return true. - def isAspectJPresent: Boolean = false - - def logAspectJWeaverMissing(modulesRequiringAspectJ: List[AvailableModuleInfo]): Unit = { - val moduleNames = modulesRequiringAspectJ.map(_.name).mkString(", ") - val weaverMissingMessage = - """ - | - | ___ _ ___ _ _ ___ ___ _ _ - | / _ \ | | |_ | | | | | | \/ |(_) (_) - |/ /_\ \ ___ _ __ ___ ___ | |_ | | | | | | ___ __ _ __ __ ___ _ __ | . . | _ ___ ___ _ _ __ __ _ - || _ |/ __|| '_ \ / _ \ / __|| __| | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \ / _` | - || | | |\__ \| |_) || __/| (__ | |_ /\__/ / \ /\ /| __/| (_| | \ V /| __/| | | | | || |\__ \\__ \| || | | || (_| | - |\_| |_/|___/| .__/ \___| \___| \__|\____/ \/ \/ \___| \__,_| \_/ \___||_| \_| |_/|_||___/|___/|_||_| |_| \__, | - | | | __/ | - | |_| |___/ - | - | It seems like your application was not started with the -javaagent:/path-to-aspectj-weaver.jar option but Kamon detected - | the following modules which require AspecJ to work properly: - | - """.stripMargin + moduleNames + - """ - | - | If you need help on setting up the aspectj weaver go to http://kamon.io/introduction/get-started/ for more info. On the - | other hand, if you are sure that you do not need or do not want to use the weaver then you can disable this error message - | by changing the kamon.show-aspectj-missing-warning setting in your configuration file. - | - """.stripMargin - - log.error(weaverMissingMessage) - } - -} - -object KamonSupervisor { - case class CreateModule(name: String, props: Props, childPromise: Promise[ActorRef]) - - def props(settings: ModuleSupervisorSettings, dynamicAccess: DynamicAccess): Props = - Props(new KamonSupervisor(settings, dynamicAccess)) - -} - diff --git a/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala deleted file mode 100644 index c04157aa..00000000 --- a/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2015 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.supervisor - -import akka.actor.ActorSystem - -case class AvailableModuleInfo(name: String, extensionClass: String, requiresAspectJ: Boolean, autoStart: Boolean) -case class ModuleSupervisorSettings(showAspectJMissingWarning: Boolean, availableModules: List[AvailableModuleInfo]) { - val modulesRequiringAspectJ = availableModules.filter(_.requiresAspectJ) -} - -object ModuleSupervisorSettings { - - def apply(system: ActorSystem): ModuleSupervisorSettings = { - import kamon.util.ConfigTools.Syntax - - val config = system.settings.config.getConfig("kamon.modules") - val showAspectJMissingWarning = system.settings.config.getBoolean("kamon.show-aspectj-missing-warning") - - val modules = config.firstLevelKeys - val availableModules = modules.map { moduleName ⇒ - val moduleConfig = config.getConfig(moduleName) - - AvailableModuleInfo( - moduleName, - moduleConfig.getString("extension-id"), - moduleConfig.getBoolean("requires-aspectj"), - moduleConfig.getBoolean("auto-start")) - - } toList - - ModuleSupervisorSettings(showAspectJMissingWarning, availableModules) - } - -} diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala index 5f7fdff5..17be661b 100644 --- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -19,13 +19,14 @@ package kamon.trace import java.util.concurrent.ConcurrentLinkedQueue import akka.event.LoggingAdapter -import kamon.metric.{ MetricsExtension, TraceMetrics } +import kamon.Kamon +import kamon.metric.{ SegmentMetrics, MetricsModule, TraceMetrics } import kamon.util.{ NanoInterval, RelativeNanoTimestamp } import scala.annotation.tailrec private[kamon] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, - val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension) + val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter) extends TraceContext { @volatile private var _name = traceName @@ -51,20 +52,23 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz val traceElapsedTime = NanoInterval.since(startTimestamp) _elapsedTime = traceElapsedTime - metricsExtension.register(TraceMetrics, name).map { registration ⇒ - registration.recorder.ElapsedTime.record(traceElapsedTime.nanos) - drainFinishedSegments(registration.recorder) - } + Kamon.metrics.entity(TraceMetrics, name).elapsedTime.record(traceElapsedTime.nanos) + drainFinishedSegments() } def startSegment(segmentName: String, category: String, library: String): Segment = new MetricsOnlySegment(segmentName, category, library) - @tailrec private def drainFinishedSegments(recorder: TraceMetrics): Unit = { + @tailrec private def drainFinishedSegments(): Unit = { val segment = _finishedSegments.poll() if (segment != null) { - recorder.segment(segment.name, segment.category, segment.library).record(segment.duration.nanos) - drainFinishedSegments(recorder) + val segmentTags = Map( + "trace" -> name, + "category" -> segment.category, + "library" -> segment.library) + + Kamon.metrics.entity(SegmentMetrics, segment.name, segmentTags).elapsedTime.record(segment.duration.nanos) + drainFinishedSegments() } } @@ -72,9 +76,7 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration)) if (isClosed) { - metricsExtension.register(TraceMetrics, name).map { registration ⇒ - drainFinishedSegments(registration.recorder) - } + drainFinishedSegments() } } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 48e56153..85b7396f 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -18,7 +18,9 @@ package kamon.trace import java.io.ObjectStreamException import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.util.RelativeNanoTimestamp +import kamon.util.{ SameThreadExecutionContext, Supplier, Function, RelativeNanoTimestamp } + +import scala.concurrent.Future trait TraceContext { def name: String @@ -35,34 +37,31 @@ trait TraceContext { def addMetadata(key: String, value: String) def startTimestamp: RelativeNanoTimestamp -} - -object TraceContext { - private[kamon] val _traceContextStorage = new ThreadLocal[TraceContext] { - override def initialValue(): TraceContext = EmptyTraceContext - } - - def currentContext: TraceContext = - _traceContextStorage.get() - def setCurrentContext(context: TraceContext): Unit = - _traceContextStorage.set(context) - - def clearCurrentContext: Unit = - _traceContextStorage.remove() + def collect[T](f: TraceContext ⇒ T): Option[T] = + if (nonEmpty) + Some(f(this)) + else None - def withContext[T](context: TraceContext)(code: ⇒ T): T = { - val oldContext = _traceContextStorage.get() - _traceContextStorage.set(context) + def collect[T](f: Function[TraceContext, T]): Option[T] = + if (nonEmpty) + Some(f(this)) + else None - try code finally _traceContextStorage.set(oldContext) + def withNewSegment[T](segmentName: String, category: String, library: String)(code: ⇒ T): T = { + val segment = startSegment(segmentName, category, library) + try code finally segment.finish() } - def map[T](f: TraceContext ⇒ T): Option[T] = { - val current = currentContext - if (current.nonEmpty) - Some(f(current)) - else None + // Java variant. + def withNewSegment[T](segmentName: String, category: String, library: String, code: Supplier[T]): T = + withNewSegment(segmentName, category, library)(code.get) + + def withNewAsyncSegment[T](segmentName: String, category: String, library: String)(code: ⇒ Future[T]): Future[T] = { + val segment = startSegment(segmentName, category, library) + val result = code + code.onComplete(_ ⇒ segment.finish())(SameThreadExecutionContext) + result } } @@ -132,7 +131,7 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - @transient val traceContext = TraceContext.currentContext + @transient val traceContext = Tracer.currentContext // // Beware of this hack, it might bite us in the future! diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala index 057f564e..0a0a120a 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala @@ -42,12 +42,12 @@ object TraceLocal { object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext } - def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceContext.currentContext match { + def store(key: TraceLocalKey)(value: key.ValueType): Unit = Tracer.currentContext match { case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value) case EmptyTraceContext ⇒ // Can't store in the empty context. } - def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceContext.currentContext match { + def retrieve(key: TraceLocalKey): Option[key.ValueType] = Tracer.currentContext match { case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key) case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. } diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala deleted file mode 100644 index be565154..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import java.net.InetAddress -import java.util.concurrent.atomic.AtomicLong - -import akka.actor._ -import com.typesafe.config.Config -import kamon.metric.MetricsExtension -import kamon.util._ - -import scala.util.Try - -trait TracerExtension { - def newContext(name: String): TraceContext - def newContext(name: String, token: String): TraceContext - def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext - - def subscribe(subscriber: ActorRef): Unit - def unsubscribe(subscriber: ActorRef): Unit -} - -private[kamon] class TracerExtensionImpl(metricsExtension: MetricsExtension, config: Config) extends TracerExtension { - private val _settings = TraceSettings(config) - private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - private val _tokenCounter = new AtomicLong - - private val _subscriptions = new LazyActorRef - private val _incubator = new LazyActorRef - - private def newToken: String = - _hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet()) - - def newContext(name: String): TraceContext = - createTraceContext(name) - - def newContext(name: String, token: String): TraceContext = - createTraceContext(name, token) - - def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext = - createTraceContext(name, token, timestamp, isOpen, isLocal) - - private def createTraceContext(traceName: String, token: String = newToken, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now, - isOpen: Boolean = true, isLocal: Boolean = true): TraceContext = { - - def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, null, metricsExtension) - - if (_settings.levelOfDetail == LevelOfDetail.MetricsOnly || !isLocal) - newMetricsOnlyContext - else { - if (!_settings.sampler.shouldTrace) - newMetricsOnlyContext - else - new TracingContext(traceName, token, true, _settings.levelOfDetail, isLocal, startTimestamp, null, metricsExtension, this, dispatchTracingContext) - } - } - - def subscribe(subscriber: ActorRef): Unit = - _subscriptions.tell(TraceSubscriptions.Subscribe(subscriber)) - - def unsubscribe(subscriber: ActorRef): Unit = - _subscriptions.tell(TraceSubscriptions.Unsubscribe(subscriber)) - - private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = - if (_settings.sampler.shouldReport(trace.elapsedTime)) - if (trace.shouldIncubate) - _incubator.tell(trace) - else - _subscriptions.tell(trace.generateTraceInfo) - - /** - * Tracer Extension initialization. - */ - private var _system: ActorSystem = null - private lazy val _start = { - val subscriptions = _system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") - _subscriptions.point(subscriptions) - _incubator.point(_system.actorOf(Incubator.props(subscriptions))) - } - - def start(system: ActorSystem): Unit = synchronized { - _system = system - _start - _system = null - } -} - -private[kamon] object TracerExtensionImpl { - - def apply(metricsExtension: MetricsExtension, config: Config) = - new TracerExtensionImpl(metricsExtension, config) -} - -case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) -case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String])
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracerModule.scala b/kamon-core/src/main/scala/kamon/trace/TracerModule.scala new file mode 100644 index 00000000..416af20e --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracerModule.scala @@ -0,0 +1,172 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.net.InetAddress +import java.util.concurrent.atomic.AtomicLong + +import akka.actor._ +import com.typesafe.config.Config +import kamon.Kamon +import kamon.metric.MetricsModule +import kamon.util._ + +import scala.util.Try + +trait TracerModule { + def newContext(name: String): TraceContext + def newContext(name: String, token: Option[String]): TraceContext + def newContext(name: String, token: Option[String], timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext + + def subscribe(subscriber: ActorRef): Unit + def unsubscribe(subscriber: ActorRef): Unit +} + +object Tracer { + private[kamon] val _traceContextStorage = new ThreadLocal[TraceContext] { + override def initialValue(): TraceContext = EmptyTraceContext + } + + def currentContext: TraceContext = + _traceContextStorage.get() + + def setCurrentContext(context: TraceContext): Unit = + _traceContextStorage.set(context) + + def clearCurrentContext: Unit = + _traceContextStorage.remove() + + def withContext[T](context: TraceContext)(code: ⇒ T): T = { + val oldContext = _traceContextStorage.get() + _traceContextStorage.set(context) + + try code finally _traceContextStorage.set(oldContext) + } + + // Java variant. + def withContext[T](context: TraceContext, code: Supplier[T]): T = + withContext(context)(code.get) + + def withNewContext[T](traceName: String, traceToken: Option[String], autoFinish: Boolean)(code: ⇒ T): T = { + withContext(Kamon.tracer.newContext(traceName, traceToken)) { + val codeResult = code + if (autoFinish) + currentContext.finish() + + codeResult + } + } + + def withNewContext[T](traceName: String)(code: ⇒ T): T = + withNewContext(traceName, None, false)(code) + + def withNewContext[T](traceName: String, traceToken: Option[String])(code: ⇒ T): T = + withNewContext(traceName, traceToken, false)(code) + + def withNewContext[T](traceName: String, autoFinish: Boolean)(code: ⇒ T): T = + withNewContext(traceName, None, autoFinish)(code) + + // Java variants. + def withNewContext[T](traceName: String, traceToken: Option[String], autoFinish: Boolean, code: Supplier[T]): T = + withNewContext(traceName, traceToken, autoFinish)(code.get) + + def withNewContext[T](traceName: String, code: Supplier[T]): T = + withNewContext(traceName, None, false)(code.get) + + def withNewContext[T](traceName: String, traceToken: Option[String], code: Supplier[T]): T = + withNewContext(traceName, traceToken, false)(code.get) + + def withNewContext[T](traceName: String, autoFinish: Boolean, code: Supplier[T]): T = + withNewContext(traceName, None, autoFinish)(code.get) +} + +private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: Config) extends TracerModule { + private val _settings = TraceSettings(config) + private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + private val _tokenCounter = new AtomicLong + + private val _subscriptions = new LazyActorRef + private val _incubator = new LazyActorRef + + private def newToken: String = + _hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet()) + + def newContext(name: String): TraceContext = + createTraceContext(name, None) + + def newContext(name: String, token: Option[String]): TraceContext = + createTraceContext(name, token) + + def newContext(name: String, token: Option[String], timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext = + createTraceContext(name, token, timestamp, isOpen, isLocal) + + private def createTraceContext(traceName: String, token: Option[String], startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now, + isOpen: Boolean = true, isLocal: Boolean = true): TraceContext = { + + def newMetricsOnlyContext(token: String): TraceContext = + new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, null) + + val traceToken = token.getOrElse(newToken) + + if (_settings.levelOfDetail == LevelOfDetail.MetricsOnly || !isLocal) + newMetricsOnlyContext(traceToken) + else { + if (!_settings.sampler.shouldTrace) + newMetricsOnlyContext(traceToken) + else + new TracingContext(traceName, traceToken, true, _settings.levelOfDetail, isLocal, startTimestamp, null, dispatchTracingContext) + } + } + + def subscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(TraceSubscriptions.Subscribe(subscriber)) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(TraceSubscriptions.Unsubscribe(subscriber)) + + private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = + if (_settings.sampler.shouldReport(trace.elapsedTime)) + if (trace.shouldIncubate) + _incubator.tell(trace) + else + _subscriptions.tell(trace.generateTraceInfo) + + /** + * Tracer Extension initialization. + */ + private var _system: ActorSystem = null + private lazy val _start = { + val subscriptions = _system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") + _subscriptions.point(subscriptions) + _incubator.point(_system.actorOf(Incubator.props(subscriptions))) + } + + def start(system: ActorSystem): Unit = synchronized { + _system = system + _start + _system = null + } +} + +private[kamon] object TracerModuleImpl { + + def apply(metricsExtension: MetricsModule, config: Config) = + new TracerModuleImpl(metricsExtension, config) +} + +case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String])
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala index 3d324886..9269a99e 100644 --- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -21,14 +21,13 @@ import java.util.concurrent.atomic.AtomicInteger import akka.event.LoggingAdapter import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp } -import kamon.metric.MetricsExtension +import kamon.metric.MetricsModule import scala.collection.concurrent.TrieMap private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, - isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, - traceExtension: TracerExtensionImpl, traceInfoSink: TracingContext ⇒ Unit) - extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log, metricsExtension) { + isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, traceInfoSink: TracingContext ⇒ Unit) + extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log) { private val _openSegments = new AtomicInteger(0) private val _startTimestamp = NanoTimestamp.now diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala index 961c3099..8177ed14 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala @@ -17,14 +17,10 @@ package kamon.trace.logging import ch.qos.logback.classic.pattern.ClassicConverter import ch.qos.logback.classic.spi.ILoggingEvent -import kamon.trace.TraceContext +import kamon.trace.Tracer class LogbackTraceTokenConverter extends ClassicConverter { - def convert(event: ILoggingEvent): String = { - val ctx = TraceContext.currentContext - if (ctx.isEmpty) - "undefined" - else - ctx.token - } + + def convert(event: ILoggingEvent): String = + Tracer.currentContext.collect(_.token).getOrElse("undefined") } diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala index 4970d97e..abe56cd3 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala @@ -17,17 +17,21 @@ package kamon.trace.logging import kamon.trace.TraceLocal.AvailableToMdc -import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext } +import kamon.trace.{ Tracer, EmptyTraceContext, MetricsOnlyContext, TraceContext } +import kamon.util.Supplier import org.slf4j.MDC trait MdcKeysSupport { def withMdc[A](thunk: ⇒ A): A = { - val keys = copyToMdc(TraceContext.currentContext) + val keys = copyToMdc(Tracer.currentContext) try thunk finally keys.foreach(key ⇒ MDC.remove(key)) } + // Java variant. + def withMdc[A](thunk: Supplier[A]): A = withMdc(thunk.get) + private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match { case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.underlyingStorage.collect { @@ -37,3 +41,5 @@ trait MdcKeysSupport { case EmptyTraceContext ⇒ Iterable.empty[String] } } + +object MdcKeysSupport extends MdcKeysSupport
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/supervisor/AspectJPresent.scala b/kamon-core/src/main/scala/kamon/util/FunctionalInterfaces.scala index 0df9539f..8dab6519 100644 --- a/kamon-core/src/main/scala/kamon/supervisor/AspectJPresent.scala +++ b/kamon-core/src/main/scala/kamon/util/FunctionalInterfaces.scala @@ -14,18 +14,12 @@ * ========================================================================================= */ -package kamon.supervisor - -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } - -@Aspect -class AspectJPresent { - - @Pointcut("execution(* kamon.supervisor.KamonSupervisor.isAspectJPresent())") - def isAspectJPresentAtModuleSupervisor(): Unit = {} - - @Around("isAspectJPresentAtModuleSupervisor()") - def aroundIsAspectJPresentAtModuleSupervisor(pjp: ProceedingJoinPoint): Boolean = true +package kamon.util +trait Supplier[T] { + def get: T } + +trait Function[T, R] { + def apply(t: T): R +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/util/JavaTags.scala b/kamon-core/src/main/scala/kamon/util/JavaTags.scala new file mode 100644 index 00000000..90bece5c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/JavaTags.scala @@ -0,0 +1,14 @@ +package kamon.util + +object JavaTags { + + /** + * Helper method to transform Java maps into Scala maps. Typically this will be used as a static import from + * Java code that wants to use tags, as tags are defined as scala.collection.mutable.Map[String, String] and + * creating them directly from Java is quite verbose. + */ + def tagsFromMap(tags: java.util.Map[String, String]): Map[String, String] = { + import scala.collection.JavaConversions._ + tags.toMap + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala b/kamon-core/src/main/scala/kamon/util/Latency.scala index 4dc5ff41..52e044f8 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala +++ b/kamon-core/src/main/scala/kamon/util/Latency.scala @@ -14,20 +14,16 @@ * ========================================================================================= */ -package kamon.instrumentation +package kamon.util -import _root_.akka.event.EventStream -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect } +import kamon.metric.instrument.Histogram -@Aspect -class AspectJWeaverMissingWarning { - - @Pointcut("execution(* kamon.metric.MetricsExtension.printInitializationMessage(..)) && args(eventStream, *)") - def printInitializationMessage(eventStream: EventStream): Unit = {} - - @Around("printInitializationMessage(eventStream)") - def aroundPrintInitializationMessage(pjp: ProceedingJoinPoint, eventStream: EventStream): Unit = { - pjp.proceed(Array[AnyRef](eventStream, Boolean.box(true))) +object Latency { + def measure[A](histogram: Histogram)(thunk: ⇒ A): A = { + val start = RelativeNanoTimestamp.now + try thunk finally { + val latency = NanoInterval.since(start).nanos + histogram.record(latency) + } } } diff --git a/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala index 855bf1fc..e2fb747a 100644 --- a/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala +++ b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala @@ -1,3 +1,19 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.util import java.util diff --git a/kamon-core/src/main/scala/kamon/util/MapMerge.scala b/kamon-core/src/main/scala/kamon/util/MapMerge.scala index 64b4f7ae..8573358b 100644 --- a/kamon-core/src/main/scala/kamon/util/MapMerge.scala +++ b/kamon-core/src/main/scala/kamon/util/MapMerge.scala @@ -19,7 +19,7 @@ package kamon.util object MapMerge { /** - * Merge to immutable maps with the same key and value types, using the provided valueMerge function. + * Merge two immutable maps with the same key and value types, using the provided valueMerge function. */ implicit class Syntax[K, V](val map: Map[K, V]) extends AnyVal { def merge(that: Map[K, V], valueMerge: (V, V) ⇒ V): Map[K, V] = { diff --git a/kamon-core/src/main/scala/kamon/util/SameThreadExecutionContext.scala b/kamon-core/src/main/scala/kamon/util/SameThreadExecutionContext.scala new file mode 100644 index 00000000..2aae526f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/SameThreadExecutionContext.scala @@ -0,0 +1,30 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import scala.concurrent.ExecutionContext +import org.slf4j.LoggerFactory + +/** + * For small code blocks that don't need to be run on a separate thread. + */ +object SameThreadExecutionContext extends ExecutionContext { + val logger = LoggerFactory.getLogger("SameThreadExecutionContext") + + override def execute(runnable: Runnable): Unit = runnable.run + override def reportFailure(t: Throwable): Unit = logger.error(t.getMessage, t) +} |