diff options
24 files changed, 794 insertions, 107 deletions
diff --git a/kamon-autoweave/src/main/resources/reference.conf b/kamon-autoweave/src/main/resources/reference.conf new file mode 100644 index 00000000..c47b1a5d --- /dev/null +++ b/kamon-autoweave/src/main/resources/reference.conf @@ -0,0 +1,12 @@ +# ====================================== # +# Kamon-Autowave Reference Configuration # +# ====================================== # + +kamon { + autowave { + options { + verbose = false + showWeaveInfo = false + } + } +}
\ No newline at end of file diff --git a/kamon-autoweave/src/main/scala/kamon/autoweave/Autoweave.scala b/kamon-autoweave/src/main/scala/kamon/autoweave/Autoweave.scala new file mode 100644 index 00000000..58111b48 --- /dev/null +++ b/kamon-autoweave/src/main/scala/kamon/autoweave/Autoweave.scala @@ -0,0 +1,31 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.autoweave + +import kamon.Kamon +import kamon.autoweave.loader.AgentLoader +import org.aspectj.weaver.loadtime.Agent + +object Autoweave { + val config = Kamon.config.getConfig("kamon.autowave.options") + val verbose = config.getBoolean("verbose") + val showWeaveInfo = config.getBoolean("showWeaveInfo") + + System.setProperty("aj.weaving.verbose", verbose.toString) + System.setProperty("org.aspectj.weaver.showWeaveInfo", showWeaveInfo.toString) + + AgentLoader.attachAgentToJVM(classOf[Agent]) +} diff --git a/kamon-autoweave/src/main/scala/kamon/autoweave/loader/AgentLoader.scala b/kamon-autoweave/src/main/scala/kamon/autoweave/loader/AgentLoader.scala new file mode 100644 index 00000000..2f26a39d --- /dev/null +++ b/kamon-autoweave/src/main/scala/kamon/autoweave/loader/AgentLoader.scala @@ -0,0 +1,123 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.autoweave.loader + +import java.io.{ File, FileOutputStream, InputStream } +import java.lang.management.ManagementFactory +import java.util.jar.Attributes.Name +import java.util.jar.{ JarEntry, JarOutputStream, Manifest } + +import scala.util.control.NoStackTrace + +object AgentLoader { + + /** + * Gets the current JVM PID + * + * @return Returns the PID + */ + + private def getPidFromRuntimeMBean: String = { + val jvm = ManagementFactory.getRuntimeMXBean.getName + val pid = jvm.substring(0, jvm.indexOf('@')) + pid + } + + /** + * Loads an agent into a JVM. + * + * @param agent The main agent class. + * @param resources Array of classes to be included with agent. + */ + def attachAgentToJVM(agent: Class[_], resources: Seq[Class[_]] = Seq.empty): Unit = attachToRunningJVM(agent, resources) + + /** + * Java variant + * + * @param agent + */ + def attachAgentToJVM(agent: Class[_]): Unit = attachAgentToJVM(agent, Seq.empty) + + /** + * Generates a temporary agent file to be loaded. + * + * @param agent The main agent class. + * @param resources Array of classes to be included with agent. + * @return Returns a temporary jar file with the specified classes included. + */ + private def generateAgentJar(agent: Class[_], resources: Seq[Class[_]]): File = { + val jarFile = File.createTempFile("agent", ".jar") + jarFile.deleteOnExit() + + val manifest = new Manifest() + val mainAttributes = manifest.getMainAttributes + // Create manifest stating that agent is allowed to transform classes + mainAttributes.put(Name.MANIFEST_VERSION, "1.0") + mainAttributes.put(new Name("Agent-Class"), agent.getName) + mainAttributes.put(new Name("Can-Retransform-Classes"), "true") + mainAttributes.put(new Name("Can-Redefine-Classes"), "true") + mainAttributes.put(new Name("Can-Set-Native-Method-Prefix"), "true") + + val jos = new JarOutputStream(new FileOutputStream(jarFile), manifest) + + jos.putNextEntry(new JarEntry(agent.getName.replace('.', '/') + ".class")) + + jos.write(getBytesFromStream(agent.getClassLoader.getResourceAsStream(unqualify(agent)))) + jos.closeEntry() + + for (clazz ← resources) { + val name = unqualify(clazz) + jos.putNextEntry(new JarEntry(name)) + jos.write(getBytesFromStream(clazz.getClassLoader.getResourceAsStream(name))) + jos.closeEntry() + } + + jos.close() + jarFile + } + + /** + * Attach to the running JVM. + * + * @return + * Returns the attached VirtualMachine + */ + private def attachToRunningJVM(agent: Class[_], resources: Seq[Class[_]]): Unit = { + AttachmentProviders.resolve() match { + case Some(virtualMachine) ⇒ + val virtualMachineInstance = virtualMachine.getDeclaredMethod("attach", classOf[String]).invoke(null, getPidFromRuntimeMBean) + virtualMachine.getDeclaredMethod("loadAgent", classOf[String], classOf[String]) + .invoke(virtualMachineInstance, generateAgentJar(agent, resources).getAbsolutePath, "") + virtualMachine.getDeclaredMethod("detach").invoke(virtualMachineInstance) + case None ⇒ throw new RuntimeException(s"Error trying to use Attach API") with NoStackTrace + } + } + + /** + * Gets bytes from InputStream. + * + * @param stream + * The InputStream. + * @return + * Returns a byte[] representation of given stream. + */ + private def getBytesFromStream(stream: InputStream): Array[Byte] = { + Stream.continually(stream.read).takeWhile(_ != -1).map(_.toByte).toArray + } + + private def unqualify(clazz: Class[_]): String = clazz.getName.replace('.', '/') + ".class" +} + diff --git a/kamon-autoweave/src/main/scala/kamon/autoweave/loader/AttachmentProviders.scala b/kamon-autoweave/src/main/scala/kamon/autoweave/loader/AttachmentProviders.scala new file mode 100644 index 00000000..40f58502 --- /dev/null +++ b/kamon-autoweave/src/main/scala/kamon/autoweave/loader/AttachmentProviders.scala @@ -0,0 +1,69 @@ +/* ========================================================================================= + * Copyright © 2013-2016 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.autoweave.loader + +import java.io.File +import java.net.{ URL, URLClassLoader } +import java.security.{ PrivilegedAction, AccessController } + +object AttachmentProviders extends { + + val VirtualMachineTyeName = "com.sun.tools.attach.VirtualMachine" + + sealed trait AttachmentProvider { + def toolsJarPath: String + + /** + * Gets the current HotSpotVirtualMachine implementation otherwise a None. + * + * @return + * Returns the HotSpotVirtualMachine implementation of the running JVM. + */ + def resolve(): Option[Class[_]] = { + val toolsJar = new File(System.getProperty("java.home").replace('\\', '/') + "/../" + toolsJarPath) + if (toolsJar.isFile && toolsJar.canRead) + Some(AccessController.doPrivileged(new ClassLoaderCreationAction(toolsJar)).loadClass(VirtualMachineTyeName)) + else None + } + } + + case object JVM extends AttachmentProvider { val toolsJarPath = "../lib/tools.jar" } + case object JDK extends AttachmentProvider { val toolsJarPath = "lib/tools.jar" } + case object MAC extends AttachmentProvider { val toolsJarPath = "../Classes/classes.jar" } + + private val providers = Seq(JVM, JDK, MAC) + + private final class ClassLoaderCreationAction(toolsJar: File) extends PrivilegedAction[ClassLoader] { + override def run(): ClassLoader = new URLClassLoader(Array[URL](toolsJar.toURI.toURL), null) + } + + def resolve(): Option[Class[_]] = { + import scala.util.control.Breaks._ + + var vmClazz: Option[Class[_]] = None + + breakable { + for (provider ← providers) { + val vmClass = provider.resolve() + if (vmClass.isDefined) { + vmClazz = vmClass + break + } + } + } + vmClazz + } +} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index e6b93459..b5ea0c28 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -16,13 +16,14 @@ package kamon import _root_.akka.actor import _root_.akka.actor._ +import _root_.scala.util.{ Success, Failure, Try } import com.typesafe.config.{ Config, ConfigFactory } import kamon.metric._ import kamon.trace.TracerModuleImpl import kamon.util.logger.LazyLogger object Kamon { - private val log = LazyLogger(getClass) + private val log = LazyLogger("Kamon") trait Extension extends actor.Extension @@ -40,6 +41,8 @@ object Kamon { log.info("Initializing Kamon...") + tryLoadAutoweaveModule() + ActorSystem("kamon", patchedConfig) } @@ -52,10 +55,22 @@ object Kamon { def start(): Unit = _start def shutdown(): Unit = { - // TODO: Define what a proper shutdown should be like. _system.shutdown() } + private def tryLoadAutoweaveModule(): Unit = { + val color = (msg: String) ⇒ s"""\u001B[32m${msg}\u001B[0m""" + + log.info("Trying to load kamon-autoweave...") + + Try(Class.forName("kamon.autoweave.Autoweave$")) match { + case Success(_) ⇒ + log.info(color("Kamon-autoweave has been successfully loaded.")) + log.info(color("The AspectJ loadtime weaving agent is now attached to the JVM (you don't need to use -javaagent).")) + case Failure(reason) ⇒ log.debug(s"Kamon-autoweave failed to load. Reason: we have not found the ${reason.getMessage} class in the classpath.") + } + } + private def resolveConfiguration: Config = { val defaultConfig = ConfigFactory.load() diff --git a/kamon-core/src/main/scala/kamon/ModuleLoader.scala b/kamon-core/src/main/scala/kamon/ModuleLoader.scala index 55874a33..b594d4cf 100644 --- a/kamon-core/src/main/scala/kamon/ModuleLoader.scala +++ b/kamon-core/src/main/scala/kamon/ModuleLoader.scala @@ -20,7 +20,7 @@ import _root_.akka.actor import _root_.akka.actor._ import kamon.util.logger.LazyLogger import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation.{Around, Aspect, Pointcut} +import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } private[kamon] object ModuleLoader extends ExtensionId[ModuleLoaderExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = ModuleLoader diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala index e1e89b79..c1392d4d 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala @@ -95,7 +95,7 @@ case class GaugeRecorder(key: MetricKey, instrument: Gauge) extends SingleInstru /** * Base class with plenty of utility methods to facilitate the creation of [[EntityRecorder]] implementations. - * It is not required to use this base class for defining custom a custom [[EntityRecorder]], but it is certainly + * It is not required to use this base class for defining a custom [[EntityRecorder]], but it is certainly * the most convenient way to do it and the preferred approach throughout the Kamon codebase. */ abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder { diff --git a/kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala b/kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala new file mode 100644 index 00000000..e096429d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala @@ -0,0 +1,55 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.{ Actor, ActorRef, Props } +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument._ + +/** + * Can be used as a decorator to scale TickMetricSnapshot messages to given `timeUnits` and/or `memoryUnits` + * before forwarding to original receiver + * @param timeUnits Optional time units to scale time metrics to + * @param memoryUnits Optional memory units to scale memory metrics to + * @param receiver Receiver of scaled metrics snapshot, usually a backend sender + */ +class MetricScaleDecorator(timeUnits: Option[Time], memoryUnits: Option[Memory], receiver: ActorRef) extends Actor { + require(timeUnits.isDefined || memoryUnits.isDefined, + "Use MetricScaleDecorator only when any of units is defined") + + override def receive: Receive = { + case tick: TickMetricSnapshot ⇒ + val scaled = tick.copy(metrics = tick.metrics.mapValues { entitySnapshot ⇒ + new DefaultEntitySnapshot(entitySnapshot.metrics.map { + case (metricKey, metricSnapshot) ⇒ + val scaledSnapshot = (metricKey.unitOfMeasurement, timeUnits, memoryUnits) match { + case (time: Time, Some(to), _) ⇒ metricSnapshot.scale(time, to) + case (memory: Memory, _, Some(to)) ⇒ metricSnapshot.scale(memory, to) + case _ ⇒ metricSnapshot + } + metricKey -> scaledSnapshot + }) + }) + receiver forward scaled + } +} + +object MetricScaleDecorator { + def props(timeUnits: Option[Time], memoryUnits: Option[Memory], receiver: ActorRef): Props = + Props(new MetricScaleDecorator(timeUnits, memoryUnits, receiver)) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala index c1b69cbe..349a12bd 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -33,6 +33,7 @@ object Counter { trait Snapshot extends InstrumentSnapshot { def count: Long def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot + def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Counter.Snapshot } } @@ -57,4 +58,8 @@ case class CounterSnapshot(count: Long) extends Counter.Snapshot { case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount) case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.") } + + override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Counter.Snapshot = + CounterSnapshot(from.tryScale(to)(count).toLong) + }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala index 5c4c7f71..dc9a4bbf 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -17,9 +17,9 @@ package kamon.metric.instrument import java.nio.LongBuffer -import org.HdrHistogram.AtomicHistogramFieldsAccessor -import kamon.metric.instrument.Histogram.{ Snapshot, DynamicRange } -import org.HdrHistogram.AtomicHistogram + +import kamon.metric.instrument.Histogram.{ DynamicRange, Snapshot } +import org.HdrHistogram.ModifiedAtomicHistogram trait Histogram extends Instrument { type SnapshotType = Histogram.Snapshot @@ -54,11 +54,9 @@ object Histogram { * @param lowestDiscernibleValue * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that * is >= 1. May be internally rounded down to nearest power of 2. - * * @param highestTrackableValue * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue). * Must not be larger than (Long.MAX_VALUE/2). - * * @param precision * The number of significant decimal digits to which the histogram will maintain value resolution and separation. * Must be a non-negative integer between 1 and 3. @@ -87,6 +85,39 @@ object Histogram { def recordsIterator: Iterator[Record] def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot + + override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = + new ScaledSnapshot(from, to, this) + } + + class ScaledSnapshot(from: UnitOfMeasurement, to: UnitOfMeasurement, snapshot: Snapshot) extends Snapshot { + private def doScale(v: Long) = from.tryScale(to)(v).toLong + override def numberOfMeasurements: Long = snapshot.numberOfMeasurements + + override def max: Long = doScale(snapshot.max) + + override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = snapshot.merge(that, context) + + override def merge(that: Snapshot, context: CollectionContext): Snapshot = snapshot.merge(that, context) + + override def percentile(percentile: Double): Long = doScale(snapshot.percentile(percentile)) + + override def min: Long = doScale(snapshot.min) + + override def sum: Long = doScale(snapshot.sum) + + override def recordsIterator: Iterator[Record] = { + snapshot.recordsIterator.map(record ⇒ new Record { + override def count: Long = record.count + + override def level: Long = doScale(record.level) + + override private[kamon] def rawCompactRecord: Long = record.rawCompactRecord + }) + } + + override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = + if (this.from == from && this.to == to) this else super.scale(from, to) } object Snapshot { @@ -99,6 +130,7 @@ object Histogram { override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that override def numberOfMeasurements: Long = 0L + override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = this } } } @@ -108,9 +140,8 @@ object Histogram { * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. */ -class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRange.lowestDiscernibleValue, - dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram with AtomicHistogramFieldsAccessor { - import AtomicHistogramFieldsAccessor.totalCountUpdater +class HdrHistogram(dynamicRange: DynamicRange) extends ModifiedAtomicHistogram(dynamicRange.lowestDiscernibleValue, + dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram { def record(value: Long): Unit = recordValue(value) @@ -125,7 +156,7 @@ class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRa val measurementsArray = Array.ofDim[Long](buffer.limit()) buffer.get(measurementsArray, 0, measurementsArray.length) - new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude()) } def getCounts = countsArray().length() @@ -148,22 +179,8 @@ class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRa index += 1 } - - reestablishTotalCount(nrOfMeasurements) nrOfMeasurements } - - private def reestablishTotalCount(diff: Long): Unit = { - def tryUpdateTotalCount: Boolean = { - val previousTotalCount = totalCountUpdater.get(this) - val newTotalCount = previousTotalCount - diff - - totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) - } - - while (!tryUpdateTotalCount) {} - } - } case class CompactHdrSnapshot(val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala index 089dbeec..2c4b4319 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala @@ -18,10 +18,6 @@ package kamon.metric.instrument import java.nio.LongBuffer -import akka.actor.{ Scheduler, Cancellable } -import akka.dispatch.MessageDispatcher -import scala.concurrent.duration.FiniteDuration - private[kamon] trait Instrument { type SnapshotType <: InstrumentSnapshot @@ -31,6 +27,8 @@ private[kamon] trait Instrument { trait InstrumentSnapshot { def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot + + def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): InstrumentSnapshot } trait CollectionContext { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala index 4423964a..f7516262 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala @@ -1,7 +1,5 @@ package kamon.metric.instrument -import java.util.concurrent.TimeUnit - import com.typesafe.config.Config import kamon.metric.instrument.Histogram.DynamicRange diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala index e79090a8..eb01d114 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala @@ -16,20 +16,16 @@ package org.HdrHistogram -import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater } +import java.util.concurrent.atomic.AtomicLongArray -trait AtomicHistogramFieldsAccessor { - self: AtomicHistogram ⇒ +abstract class ModifiedAtomicHistogram(low: Long, high: Long, precision: Int) + extends AtomicHistogram(low, high, precision) { self ⇒ - def countsArray(): AtomicLongArray = self.counts + override def incrementTotalCount(): Unit = {} + override def addToTotalCount(value: Long): Unit = {} - def unitMagnitude(): Int = self.unitMagnitude - - def subBucketHalfCount(): Int = self.subBucketHalfCount - - def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude -} - -object AtomicHistogramFieldsAccessor { - def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater -} + def countsArray(): AtomicLongArray = counts + def protectedUnitMagnitude(): Int = unitMagnitude + def protectedSubBucketHalfCount(): Int = subBucketHalfCount + def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala index c5a1b81a..5952b906 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala @@ -22,14 +22,27 @@ package kamon.metric.instrument * recorders and might be used to scale certain kinds of measurements in metric backends. */ trait UnitOfMeasurement { + type U <: UnitOfMeasurement + def name: String def label: String + def scale(toUnit: U)(value: Double): Double = value + + def tryScale(toUnit: UnitOfMeasurement)(value: Double): Double = + if (canScale(toUnit)) scale(toUnit.asInstanceOf[U])(value) + else throw new IllegalArgumentException(s"Can't scale different types of units `$name` and `${toUnit.name}`") + + protected def canScale(toUnit: UnitOfMeasurement): Boolean + } object UnitOfMeasurement { case object Unknown extends UnitOfMeasurement { + override type U = Unknown.type val name = "unknown" val label = "unknown" + + override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isUnknown(toUnit) } def isUnknown(uom: UnitOfMeasurement): Boolean = @@ -47,10 +60,13 @@ object UnitOfMeasurement { * UnitOfMeasurement representing time. */ case class Time(factor: Double, label: String) extends UnitOfMeasurement { + override type U = Time val name = "time" - def scale(toUnit: Time)(value: Double): Double = + override def scale(toUnit: Time)(value: Double): Double = (value * factor) / toUnit.factor + + override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isTime(toUnit) } object Time { @@ -58,22 +74,36 @@ object Time { val Microseconds = Time(1E-6, "µs") val Milliseconds = Time(1E-3, "ms") val Seconds = Time(1, "s") + + val units = List(Nanoseconds, Microseconds, Milliseconds, Seconds) + + def apply(time: String): Time = units.find(_.label.toLowerCase == time.toLowerCase) getOrElse { + throw new IllegalArgumentException(s"Can't recognize time unit '$time'") + } } /** * UnitOfMeasurement representing computer memory space. */ case class Memory(factor: Double, label: String) extends UnitOfMeasurement { + override type U = Memory val name = "bytes" - def scale(toUnit: Memory)(value: Double): Double = + override def scale(toUnit: Memory)(value: Double): Double = (value * factor) / toUnit.factor + + override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isMemory(toUnit) } object Memory { val Bytes = Memory(1, "b") val KiloBytes = Memory(1024, "Kb") - val MegaBytes = Memory(1024E2, "Mb") - val GigaBytes = Memory(1024E3, "Gb") -} + val MegaBytes = Memory(1024 * 1024, "Mb") + val GigaBytes = Memory(1024 * 1024 * 1024, "Gb") + val units = List(Bytes, KiloBytes, MegaBytes, GigaBytes) + + def apply(memory: String): Memory = units.find(_.label.toLowerCase == memory.toLowerCase) getOrElse { + throw new IllegalArgumentException(s"Can't recognize memory unit '$memory'") + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala index 869bcc12..c044719c 100644 --- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -52,7 +52,8 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz val traceElapsedTime = NanoInterval.since(startTimestamp) _elapsedTime = traceElapsedTime - Kamon.metrics.entity(TraceMetrics, name).elapsedTime.record(traceElapsedTime.nanos) + if (Kamon.metrics.shouldTrack(name, TraceMetrics.category)) + Kamon.metrics.entity(TraceMetrics, name).elapsedTime.record(traceElapsedTime.nanos) drainFinishedSegments() } @@ -67,7 +68,8 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz "category" -> segment.category, "library" -> segment.library) - Kamon.metrics.entity(SegmentMetrics, segment.name, segmentTags).elapsedTime.record(segment.duration.nanos) + if (Kamon.metrics.shouldTrack(segment.name, SegmentMetrics.category)) + Kamon.metrics.entity(SegmentMetrics, segment.name, segmentTags).elapsedTime.record(segment.duration.nanos) drainFinishedSegments() } } diff --git a/kamon-core/src/main/scala/kamon/util/ConfigTools.scala b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala index bcec22c3..d0665764 100644 --- a/kamon-core/src/main/scala/kamon/util/ConfigTools.scala +++ b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala @@ -22,6 +22,8 @@ import com.typesafe.config.Config import scala.concurrent.duration.FiniteDuration +import kamon.metric.instrument.{ Memory, Time } + object ConfigTools { implicit class Syntax(val config: Config) extends AnyVal { // We are using the deprecated .getNanoseconds option to keep Kamon source code compatible with @@ -37,6 +39,10 @@ object ConfigTools { case entry ⇒ entry.getKey.takeWhile(_ != '.') } toSet } + + def time(path: String): Time = Time(config.getString(path)) + + def memory(path: String): Memory = Memory(config.getString(path)) } } diff --git a/kamon-core/src/main/scala/kamon/util/NeedToScale.scala b/kamon-core/src/main/scala/kamon/util/NeedToScale.scala new file mode 100644 index 00000000..19e1ae06 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/NeedToScale.scala @@ -0,0 +1,37 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import com.typesafe.config.Config +import kamon.metric.instrument.{ Memory, Time } +import kamon.util.ConfigTools._ + +object NeedToScale { + val TimeUnits = "time-units" + val MemoryUnits = "memory-units" + + def unapply(config: Config): Option[(Option[Time], Option[Memory])] = { + val scaleTimeTo: Option[Time] = + if (config.hasPath(TimeUnits)) Some(config.time(TimeUnits)) else None + + val scaleMemoryTo: Option[Memory] = + if (config.hasPath(MemoryUnits)) Some(config.memory(MemoryUnits)) else None + if (scaleTimeTo.isDefined || scaleMemoryTo.isDefined) Some(scaleTimeTo -> scaleMemoryTo) + else None + } +} + diff --git a/kamon-core/src/main/scala/kamon/util/executors/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/util/executors/ExecutorServiceMetrics.scala index 7a87163f..98d2ea0c 100644 --- a/kamon-core/src/main/scala/kamon/util/executors/ExecutorServiceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/util/executors/ExecutorServiceMetrics.scala @@ -16,10 +16,10 @@ package kamon.util.executors +import java.util.concurrent.{ ExecutorService, ForkJoinPool ⇒ JavaForkJoinPool, ThreadPoolExecutor } + import kamon.Kamon import kamon.metric.Entity -import java.util.concurrent.{ ForkJoinPool ⇒ JavaForkJoinPool } -import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor } import scala.concurrent.forkjoin.ForkJoinPool import scala.util.control.NoStackTrace @@ -48,8 +48,10 @@ object ExecutorServiceMetrics { * @param threadPool The intance of the [[ThreadPoolExecutor]] * @param tags The tags associated to the [[ThreadPoolExecutor]] */ - private def registerThreadPool(name: String, threadPool: ThreadPoolExecutor, tags: Map[String, String]): Unit = { - Kamon.metrics.entity(ThreadPoolExecutorMetrics.factory(threadPool, Category), Entity(name, Category, tags)) + @inline private def registerThreadPool(name: String, threadPool: ThreadPoolExecutor, tags: Map[String, String]): Entity = { + val threadPoolEntity = Entity(name, Category, tags + ("executor-type" -> "thread-pool-executor")) + Kamon.metrics.entity(ThreadPoolExecutorMetrics.factory(threadPool, Category), threadPoolEntity) + threadPoolEntity } /** @@ -60,8 +62,10 @@ object ExecutorServiceMetrics { * @param forkJoinPool The instance of the [[ForkJoinPool]] * @param tags The tags associated to the [[ForkJoinPool]] */ - private def registerScalaForkJoin(name: String, forkJoinPool: ForkJoinPool, tags: Map[String, String] = Map.empty): Unit = { - Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), Entity(name, Category, tags)) + @inline private def registerScalaForkJoin(name: String, forkJoinPool: ForkJoinPool, tags: Map[String, String]): Entity = { + val forkJoinEntity = Entity(name, Category, tags + ("executor-type" -> "fork-join-pool")) + Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), forkJoinEntity) + forkJoinEntity } /** @@ -72,8 +76,10 @@ object ExecutorServiceMetrics { * @param forkJoinPool The instance of the [[JavaForkJoinPool]] * @param tags The tags associated to the [[JavaForkJoinPool]] */ - private def registerJavaForkJoin(name: String, forkJoinPool: JavaForkJoinPool, tags: Map[String, String] = Map.empty): Unit = { - Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), Entity(name, Category, tags)) + @inline private def registerJavaForkJoin(name: String, forkJoinPool: JavaForkJoinPool, tags: Map[String, String]): Entity = { + val forkJoinEntity = Entity(name, Category, tags + ("executor-type" -> "fork-join-pool")) + Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), forkJoinEntity) + forkJoinEntity } /** @@ -84,7 +90,7 @@ object ExecutorServiceMetrics { * @param executorService The instance of the [[ExecutorService]] * @param tags The tags associated to the [[ExecutorService]] */ - def register(name: String, executorService: ExecutorService, tags: Map[String, String]): Unit = executorService match { + def register(name: String, executorService: ExecutorService, tags: Map[String, String]): Entity = executorService match { case threadPoolExecutor: ThreadPoolExecutor ⇒ registerThreadPool(name, threadPoolExecutor, tags) case scalaForkJoinPool: ForkJoinPool if scalaForkJoinPool.getClass.isAssignableFrom(ScalaForkJoinPool) ⇒ registerScalaForkJoin(name, scalaForkJoinPool, tags) case javaForkJoinPool: JavaForkJoinPool if javaForkJoinPool.getClass.isAssignableFrom(JavaForkJoinPool) ⇒ registerJavaForkJoin(name, javaForkJoinPool, tags) @@ -94,36 +100,24 @@ object ExecutorServiceMetrics { case other ⇒ throw new NotSupportedException(s"The ExecutorService $name is not supported.") } - //Java variants - def register(name: String, executorService: ExecutorService): Unit = { - register(name, executorService, Map.empty[String, String]) - } - - def register(name: String, executorService: ExecutorService, tags: java.util.Map[String, String]): Unit = { + //Java variant + def register(name: String, executorService: ExecutorService, tags: java.util.Map[String, String]): Entity = { import scala.collection.JavaConverters._ register(name, executorService, tags.asScala.toMap) } /** * - * Remove the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]] to Monitor. + * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]] to Monitor. * * @param name The name of the [[ExecutorService]] - * @param tags The tags associated to the [[ExecutorService]] + * @param executorService The instance of the [[ExecutorService]] */ - def remove(name: String, tags: Map[String, String]): Unit = { - Kamon.metrics.removeEntity(name, Category, tags) - } - - //Java variants - def remove(name: String): Unit = { - remove(name, Map.empty[String, String]) + def register(name: String, executorService: ExecutorService): Entity = { + register(name, executorService, Map.empty[String, String]) } - def remove(name: String, tags: java.util.Map[String, String]): Unit = { - import scala.collection.JavaConverters._ - remove(name, tags.asScala.toMap) - } + def remove(entity: Entity): Unit = Kamon.metrics.removeEntity(entity) /** * INTERNAL USAGE ONLY diff --git a/kamon-core/src/test/scala/kamon/metric/MetricScaleDecoratorSpec.scala b/kamon-core/src/test/scala/kamon/metric/MetricScaleDecoratorSpec.scala new file mode 100644 index 00000000..902102cd --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/MetricScaleDecoratorSpec.scala @@ -0,0 +1,100 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument.{ InstrumentFactory, Memory, Time, UnitOfMeasurement } +import kamon.testkit.BaseKamonSpec +import kamon.util.MilliTimestamp +import org.scalatest.OptionValues._ + +class MetricScaleDecoratorSpec extends BaseKamonSpec("metrics-scale-decorator-spec") with SnapshotFixtures { + "the MetricScaleDecorator" when { + "receives a snapshot" which { + + val scaleDecorator = system.actorOf(MetricScaleDecorator.props( + Some(Time.Milliseconds), Some(Memory.KiloBytes), testActor)) + "is empty" should { + "do nothing for empty snapshots" in { + scaleDecorator ! emptySnapshot + expectMsg(emptySnapshot) + } + } + "is non empty" should { + scaleDecorator ! nonEmptySnapshot + val scaled = expectMsgType[TickMetricSnapshot] + val snapshot = scaled.metrics(testEntity) + + "scale time metrics" in { + snapshot.histogram("nano-time").value.max should be(10L +- 1L) + snapshot.counter("micro-time").value.count should be(1000L) + } + "scale memory metrics" in { + snapshot.histogram("byte-memory").value.max should be(1) + snapshot.counter("kbyte-memory").value.count should be(100L) + } + "do nothing with unknown metrics" in { + snapshot.histogram("unknown-histogram").value.max should be(1000L) + snapshot.counter("unknown-counter").value.count should be(10L) + } + "not change from and to" in { + scaled.from.millis should be(1000) + scaled.to.millis should be(2000) + } + } + } + } +} + +trait SnapshotFixtures { + self: BaseKamonSpec ⇒ + + class ScaleDecoratorTestMetrics(instrumentFactory: InstrumentFactory) + extends GenericEntityRecorder(instrumentFactory) { + val nanoTime = histogram("nano-time", Time.Nanoseconds) + val microTime = counter("micro-time", Time.Microseconds) + val byteMemory = histogram("byte-memory", Memory.Bytes) + val kbyteMemory = counter("kbyte-memory", Memory.KiloBytes) + val unknownHistogram = histogram("unknown-histogram", UnitOfMeasurement.Unknown) + val unknownCounter = counter("unknown-counter", UnitOfMeasurement.Unknown) + } + + object ScaleDecoratorTestMetrics extends EntityRecorderFactory[ScaleDecoratorTestMetrics] { + override def category: String = "decorator-spec" + + override def createRecorder(instrumentFactory: InstrumentFactory): ScaleDecoratorTestMetrics = + new ScaleDecoratorTestMetrics(instrumentFactory) + } + + val testEntity = Entity("metrics-scale-decorator-spec", "decorator-spec") + val recorder = Kamon.metrics.entity(ScaleDecoratorTestMetrics, "metrics-scale-decorator-spec") + + val emptySnapshot = TickMetricSnapshot(new MilliTimestamp(1000), new MilliTimestamp(2000), Map.empty) + + recorder.unknownCounter.increment(10) + recorder.unknownHistogram.record(1000L) + recorder.nanoTime.record(10000000L) + recorder.microTime.increment(1000000L) + recorder.byteMemory.record(1024L) + recorder.kbyteMemory.increment(100L) + + val nonEmptySnapshot = TickMetricSnapshot(new MilliTimestamp(1000), new MilliTimestamp(2000), Map( + (testEntity -> recorder.collect(collectionContext)))) + +} + diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala index 094baf4c..850200d4 100644 --- a/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala @@ -56,6 +56,15 @@ class CounterSpec extends WordSpec with Matchers { counterBSnapshot.merge(counterASnapshot, collectionContext).count should be(300) } + "produce a snapshot that can be scaled" in new CounterFixture { + counter.increment(100) + + val counterSnapshot = takeSnapshotFrom(counter) + + val scaledSnapshot = counterSnapshot.scale(Time.Milliseconds, Time.Microseconds) + scaledSnapshot.count should be(100000) + } + } trait CounterFixture { diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala index 9a50e149..adfcd826 100644 --- a/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala @@ -48,7 +48,7 @@ class HistogramSpec extends WordSpec with Matchers { } "produce a snapshot" which { - "supports min, max, percentile, sum and numberOfMeasurements operations" in new HistogramFixture { + "supports min, max, percentile, sum, numberOfMeasurements and recordsIterator operations" in new HistogramFixture { histogram.record(100) histogram.record(200, count = 200) histogram.record(300) @@ -64,6 +64,36 @@ class HistogramSpec extends WordSpec with Matchers { snapshot.sum should be(41300) snapshot.numberOfMeasurements should be(203) + val records = snapshot.recordsIterator.map(r ⇒ r.level -> r.count).toSeq + records.size should be(4) + records(0) should be(100 -> 1) + records(1) should be(200 -> 200) + records(2) should be(300 -> 1) + records(3) should be(900 -> 1) + } + + "can be scaled" in new HistogramFixture { + histogram.record(100) + histogram.record(200, count = 200) + histogram.record(300) + histogram.record(900) + + val snapshot = takeSnapshot().scale(Time.Seconds, Time.Milliseconds) + + snapshot.min should equal(100000L +- 1000L) + snapshot.max should equal(900000L +- 9000L) + snapshot.percentile(50.0D) should be(200000) + snapshot.percentile(99.5D) should be(300000) + snapshot.percentile(99.9D) should be(900000) + snapshot.sum should be(41300000) + snapshot.numberOfMeasurements should be(203) + + val records = snapshot.recordsIterator.map(r ⇒ r.level -> r.count).toSeq + records.size should be(4) + records(0) should be(100000 -> 1) + records(1) should be(200000 -> 200) + records(2) should be(300000 -> 1) + records(3) should be(900000 -> 1) } "can be merged with another snapshot" in new MultipleHistogramFixture { diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/UnitOfMeasurementSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/UnitOfMeasurementSpec.scala new file mode 100644 index 00000000..10604fe5 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/instrument/UnitOfMeasurementSpec.scala @@ -0,0 +1,98 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import kamon.metric.instrument.UnitOfMeasurement.Unknown +import org.scalatest.{ Matchers, WordSpec } + +class UnitOfMeasurementSpec extends WordSpec with Matchers { + + "Time unit" should { + "resolve Time Unit by valid name" in { + Time("s") should be(Time.Seconds) + Time("n") should be(Time.Nanoseconds) + Time("ms") should be(Time.Milliseconds) + Time("µs") should be(Time.Microseconds) + } + "fail to resolve Time Unit by invalid name" in { + val ex = intercept[IllegalArgumentException](Time("boo")) + ex.getMessage should be("Can't recognize time unit 'boo'") + } + "scale time properly" in { + val epsilon = 0.0001 + + Time.Nanoseconds.scale(Time.Nanoseconds)(1000000D) should be(1000000D +- epsilon) + Time.Nanoseconds.scale(Time.Microseconds)(1000000D) should be(1000D +- epsilon) + Time.Nanoseconds.scale(Time.Milliseconds)(1000000D) should be(1D +- epsilon) + Time.Nanoseconds.scale(Time.Seconds)(1000000D) should be(0.001D +- epsilon) + Time.Seconds.scale(Time.Nanoseconds)(1D) should be(1000000000D +- epsilon) + } + "allow scale only time" in { + intercept[IllegalArgumentException](Time.Nanoseconds.tryScale(Unknown)(100)) + .getMessage should be("Can't scale different types of units `time` and `unknown`") + intercept[IllegalArgumentException](Time.Nanoseconds.tryScale(Memory.Bytes)(100)) + .getMessage should be("Can't scale different types of units `time` and `bytes`") + val epsilon = 0.0001 + + Time.Nanoseconds.tryScale(Time.Nanoseconds)(100D) should be(100D +- epsilon) + } + } + + "Memory unit" should { + "resolve Memory Unit by valid name" in { + Memory("b") should be(Memory.Bytes) + Memory("Kb") should be(Memory.KiloBytes) + Memory("Mb") should be(Memory.MegaBytes) + Memory("Gb") should be(Memory.GigaBytes) + } + "fail to resolve Memory Unit by invalid name" in { + val ex = intercept[IllegalArgumentException](Memory("boo")) + ex.getMessage should be("Can't recognize memory unit 'boo'") + } + "scale memory properly" in { + val epsilon = 0.0001 + + Memory.Bytes.scale(Memory.Bytes)(1000000D) should be(1000000D +- epsilon) + Memory.Bytes.scale(Memory.KiloBytes)(1000000D) should be(976.5625D +- epsilon) + Memory.Bytes.scale(Memory.MegaBytes)(1000000D) should be(0.9536D +- epsilon) + Memory.Bytes.scale(Memory.GigaBytes)(1000000D) should be(9.3132E-4D +- epsilon) + Memory.MegaBytes.scale(Memory.Bytes)(1D) should be(1048576D +- epsilon) + } + "allow scale only memory" in { + intercept[IllegalArgumentException](Memory.Bytes.tryScale(Unknown)(100)) + .getMessage should be("Can't scale different types of units `bytes` and `unknown`") + intercept[IllegalArgumentException](Memory.Bytes.tryScale(Time.Nanoseconds)(100)) + .getMessage should be("Can't scale different types of units `bytes` and `time`") + val epsilon = 0.0001 + + Memory.Bytes.tryScale(Memory.Bytes)(100D) should be(100D +- epsilon) + } + + } + + "Unknown unit" should { + "allow scale only Unknown" in { + intercept[IllegalArgumentException](Unknown.tryScale(Memory.Bytes)(100)) + .getMessage should be("Can't scale different types of units `unknown` and `bytes`") + intercept[IllegalArgumentException](Unknown.tryScale(Time.Nanoseconds)(100)) + .getMessage should be("Can't scale different types of units `unknown` and `time`") + + Unknown.scale(Unknown)(100D) should be(100D) + } + + } +} diff --git a/kamon-core/src/test/scala/kamon/util/NeedToScaleSpec.scala b/kamon-core/src/test/scala/kamon/util/NeedToScaleSpec.scala new file mode 100644 index 00000000..a53451b6 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/util/NeedToScaleSpec.scala @@ -0,0 +1,64 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import com.typesafe.config.ConfigFactory +import kamon.metric.instrument.{ Memory, Time } +import org.scalatest.{ Matchers, WordSpec } + +class NeedToScaleSpec extends WordSpec with Matchers { + + "NeedToScale" should { + "extract time unit to scale to from config" in { + val config = ConfigFactory.parseString( + """ + |time-units = "ms" + """.stripMargin) + + config match { + case NeedToScale(timeUnits, memoryUnits) ⇒ + timeUnits should be(Some(Time.Milliseconds)) + memoryUnits should be(None) + } + } + "extract memory unit to scale to from config" in { + val config = ConfigFactory.parseString( + """ + |memory-units = "kb" + """.stripMargin) + + config match { + case NeedToScale(timeUnits, memoryUnits) ⇒ + timeUnits should be(None) + memoryUnits should be(Some(Memory.KiloBytes)) + } + } + "extract nothing if config has no proper keys" in { + val config = ConfigFactory.parseString( + """ + |some-other-key = "value" + """.stripMargin) + + config match { + case NeedToScale(timeUnits, memoryUnits) ⇒ + fail("Should not match") + case _ ⇒ + } + } + } + +} diff --git a/kamon-core/src/test/scala/kamon/util/executors/ExecutorServiceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/util/executors/ExecutorServiceMetricsSpec.scala index b3bc7623..4e5394f8 100644 --- a/kamon-core/src/test/scala/kamon/util/executors/ExecutorServiceMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/util/executors/ExecutorServiceMetricsSpec.scala @@ -19,7 +19,7 @@ package kamon.util.executors import java.util.concurrent.Executors import kamon.Kamon -import kamon.metric.EntityRecorder +import kamon.metric.{Entity, EntityRecorder} import kamon.testkit.BaseKamonSpec class ExecutorServiceMetricsSpec extends BaseKamonSpec("executor-service-metrics-spec") { @@ -27,51 +27,49 @@ class ExecutorServiceMetricsSpec extends BaseKamonSpec("executor-service-metrics "the ExecutorServiceMetrics" should { "register a SingleThreadPool, collect their metrics and remove it" in { val singleThreadPoolExecutor = Executors.newSingleThreadExecutor() - ExecutorServiceMetrics.register("single-thread-pool", singleThreadPoolExecutor) - findExecutorRecorder("single-thread-pool") should not be empty + val singleThreadPoolExecutorEntity = ExecutorServiceMetrics.register("single-thread-pool", singleThreadPoolExecutor) + findExecutorRecorder(singleThreadPoolExecutorEntity) should not be empty - ExecutorServiceMetrics.remove("single-thread-pool") - findExecutorRecorder("single-thread-pool") should be(empty) + ExecutorServiceMetrics.remove(singleThreadPoolExecutorEntity) + findExecutorRecorder(singleThreadPoolExecutorEntity) should be(empty) } "register a ThreadPoolExecutor, collect their metrics and remove it" in { val threadPoolExecutor = Executors.newCachedThreadPool() - ExecutorServiceMetrics.register("thread-pool-executor", threadPoolExecutor) - findExecutorRecorder("thread-pool-executor") should not be empty + val threadPoolExecutorEntity = ExecutorServiceMetrics.register("thread-pool-executor", threadPoolExecutor) + findExecutorRecorder(threadPoolExecutorEntity) should not be empty - ExecutorServiceMetrics.remove("thread-pool-executor") - findExecutorRecorder("thread-pool-executor") should be(empty) + ExecutorServiceMetrics.remove(threadPoolExecutorEntity) + findExecutorRecorder(threadPoolExecutorEntity) should be(empty) } "register a ScheduledThreadPoolExecutor, collect their metrics and remove it" in { val scheduledThreadPoolExecutor = Executors.newSingleThreadScheduledExecutor() - ExecutorServiceMetrics.register("scheduled-thread-pool-executor", scheduledThreadPoolExecutor) - findExecutorRecorder("scheduled-thread-pool-executor") should not be empty + val scheduledThreadPoolEntity = ExecutorServiceMetrics.register("scheduled-thread-pool-executor", scheduledThreadPoolExecutor) + findExecutorRecorder(scheduledThreadPoolEntity) should not be empty - ExecutorServiceMetrics.remove("scheduled-thread-pool-executor") - findExecutorRecorder("scheduled-thread-pool-executor") should be(empty) + ExecutorServiceMetrics.remove(scheduledThreadPoolEntity) + findExecutorRecorder(scheduledThreadPoolEntity) should be(empty) } "register a Java ForkJoinPool, collect their metrics and remove it" in { val javaForkJoinPool = Executors.newWorkStealingPool() - ExecutorServiceMetrics.register("java-fork-join-pool", javaForkJoinPool) - findExecutorRecorder("java-fork-join-pool") should not be empty + val javaForkJoinPoolEntity = ExecutorServiceMetrics.register("java-fork-join-pool", javaForkJoinPool) + findExecutorRecorder(javaForkJoinPoolEntity) should not be empty - ExecutorServiceMetrics.remove("java-fork-join-pool") - findExecutorRecorder("java-fork-join-pool") should be(empty) + ExecutorServiceMetrics.remove(javaForkJoinPoolEntity) + findExecutorRecorder(javaForkJoinPoolEntity) should be(empty) } "register a Scala ForkJoinPool, collect their metrics and remove it" in { val scalaForkJoinPool = new scala.concurrent.forkjoin.ForkJoinPool() - ExecutorServiceMetrics.register("scala-fork-join-pool", scalaForkJoinPool) - findExecutorRecorder("scala-fork-join-pool") should not be empty + val scalaForkJoinPoolEntity = ExecutorServiceMetrics.register("scala-fork-join-pool", scalaForkJoinPool) + findExecutorRecorder(scalaForkJoinPoolEntity) should not be empty - ExecutorServiceMetrics.remove("scala-fork-join-pool") - findExecutorRecorder("scala-fork-join-pool") should be(empty) + ExecutorServiceMetrics.remove(scalaForkJoinPoolEntity) + findExecutorRecorder(scalaForkJoinPoolEntity) should be(empty) } - def findExecutorRecorder(name: String): Option[EntityRecorder] = - Kamon.metrics.find(name, ExecutorServiceMetrics.Category, Map.empty) + def findExecutorRecorder(entity: Entity): Option[EntityRecorder] = Kamon.metrics.find(entity) } - } |