From 29068fc70a3e5a17a630c2c7fff951572bb5fa21 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 3 Jul 2014 14:36:42 -0300 Subject: ! all: refactor the core metric recording instruments and accomodate UserMetrics This PR is including several changes to the kamon-core, most notably: - Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure that the interfaces are as clean as possible. - Move away from the all Vector[Measurement] based Histogram snapshot to a new approach in which we use a single long to store both the index in the counts array and the frequency on that bucket. The leftmost 2 bytes of each long are used for storing the counts array index and the remaining 6 bytes are used for the actual count, and everything is put into a simple long array. This way only the buckets that actually have values will be included in the snapshot with the smallest possible memory footprint. - Introduce Gauges. - Reorganize the instrumentation for Akka and Scala and rewrite most of the tests of this components to avoid going through the subscription protocol to test. - Introduce trace tests and fixes on various tests. - Necessary changes on new relic, datadog and statsd modules to compile with the new codebase. Pending: - Finish the upgrade of the new relic to the current model. - Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes limits. - More testing, more testing, more testing. - Create the KamonStandalone module. --- .../ActorMessagePassingTracing.scala | 146 ------------ .../ActorSystemMessagePassingTracing.scala | 65 ------ .../akka/instrumentation/AskPatternTracing.scala | 54 ----- .../akka/instrumentation/DispatcherTracing.scala | 161 -------------- .../instrumentation/ActorLoggingTracing.scala | 47 ---- .../kamon/instrumentation/FutureTracing.scala | 47 ---- .../akka/ActorCellInstrumentation.scala | 130 +++++++++++ .../akka/ActorLoggingInstrumentation.scala | 47 ++++ .../akka/ActorSystemMessageInstrumentation.scala | 80 +++++++ .../akka/AskPatternInstrumentation.scala | 55 +++++ .../akka/DispatcherInstrumentation.scala | 163 ++++++++++++++ .../AtomicHistogramFieldsAccessor.scala | 35 +++ .../scala/FutureInstrumentation.scala | 48 ++++ .../src/main/scala/kamon/metric/ActorMetrics.scala | 89 ++++++++ .../scala/kamon/metric/DispatcherMetrics.scala | 88 ++++++++ .../main/scala/kamon/metric/EntityMetrics.scala | 75 +++++++ .../main/scala/kamon/metric/MetricsExtension.scala | 110 +++++++++ kamon-core/src/main/scala/kamon/metric/Scale.scala | 31 +++ .../main/scala/kamon/metric/Subscriptions.scala | 128 +++++++++++ .../src/main/scala/kamon/metric/TraceMetrics.scala | 77 +++++++ .../src/main/scala/kamon/metric/UserMetrics.scala | 139 ++++++++++++ .../scala/kamon/metric/instrument/Counter.scala | 59 +++++ .../main/scala/kamon/metric/instrument/Gauge.scala | 78 +++++++ .../scala/kamon/metric/instrument/Histogram.scala | 246 +++++++++++++++++++++ .../kamon/metric/instrument/MinMaxCounter.scala | 116 ++++++++++ .../src/main/scala/kamon/metric/package.scala | 34 +++ .../main/scala/kamon/metrics/ActorMetrics.scala | 70 ------ .../main/scala/kamon/metrics/CustomMetric.scala | 52 ----- .../scala/kamon/metrics/DispatcherMetrics.scala | 71 ------ .../src/main/scala/kamon/metrics/Metrics.scala | 121 ---------- .../scala/kamon/metrics/MetricsExtension.scala | 104 --------- .../src/main/scala/kamon/metrics/Scale.scala | 31 --- .../main/scala/kamon/metrics/Subscriptions.scala | 129 ----------- .../main/scala/kamon/metrics/TraceMetrics.scala | 66 ------ .../instruments/ContinuousHdrRecorder.scala | 52 ----- .../metrics/instruments/CounterRecorder.scala | 38 ---- .../kamon/metrics/instruments/HdrRecorder.scala | 78 ------- .../kamon/metrics/instruments/MinMaxCounter.scala | 58 ----- .../src/main/scala/kamon/metrics/package.scala | 39 ---- .../scala/kamon/standalone/KamonStandalone.scala | 11 + .../src/main/scala/kamon/trace/TraceContext.scala | 8 +- .../src/main/scala/kamon/trace/TraceRecorder.scala | 4 +- 42 files changed, 1845 insertions(+), 1435 deletions(-) delete mode 100644 kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala delete mode 100644 kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala delete mode 100644 kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala delete mode 100644 kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Scale.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Subscriptions.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/UserMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/package.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/Metrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/Scale.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/package.scala create mode 100644 kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala deleted file mode 100644 index 6db86828..00000000 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ /dev/null @@ -1,146 +0,0 @@ -/* =================================================== - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ - -package akka.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import akka.actor._ -import akka.dispatch.{ Envelope, MessageDispatcher } -import kamon.trace._ -import kamon.metrics.{ ActorMetrics, Metrics } -import kamon.Kamon -import kamon.metrics.ActorMetrics.ActorMetricRecorder -import kamon.metrics.instruments.MinMaxCounter -import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement - -@Aspect -class BehaviourInvokeTracing { - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") - def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.metricIdentity = metricIdentity - cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - - if (cellWithMetrics.actorMetricsRecorder.isDefined) { - cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - cellWithMetrics.actorMetricsRecorder.map { am ⇒ - import am.mailboxSize._ - val CounterMeasurement(min, max, current) = cellWithMetrics.queueSize.collect() - - record(min) - record(max) - record(current) - } - } - } - } - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") - def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(cell, envelope)") - def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - try { - TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) { - pjp.proceed() - } - } finally { - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ - am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) - am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) - cellWithMetrics.queueSize.decrement() - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") - def sendingMessageToActorCell(cell: ActorCell): Unit = {} - - @After("sendingMessageToActorCell(cell)") - def afterSendMessageToActorCell(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment()) - } - - @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") - def actorStop(cell: ActorCell): Unit = {} - - @After("actorStop(cell)") - def afterStop(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { p ⇒ - cellWithMetrics.mailboxSizeCollectorCancellable.cancel() - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) - } - } - - @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") - def actorInvokeFailure(cell: ActorCell): Unit = {} - - @Before("actorInvokeFailure(cell)") - def beforeInvokeFailure(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ am.errorCounter.record(1L) - } - } -} - -trait ActorCellMetrics { - var metricIdentity: ActorMetrics = _ - var actorMetricsRecorder: Option[ActorMetricRecorder] = _ - var mailboxSizeCollectorCancellable: Cancellable = _ - val queueSize = MinMaxCounter() -} - -@Aspect -class ActorCellMetricsMixin { - - @DeclareMixin("akka.actor.ActorCell") - def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} -} - -@Aspect -class EnvelopeTraceContextMixin { - - @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala deleted file mode 100644 index 7d03d946..00000000 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala +++ /dev/null @@ -1,65 +0,0 @@ -package akka.instrumentation - -import org.aspectj.lang.annotation._ -import akka.dispatch.sysmsg.EarliestFirstSystemMessageList -import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ TraceRecorder, TraceContextAware } - -@Aspect -class SystemMessageTraceContextMixin { - - @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") - def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} - -@Aspect -class RepointableActorRefTraceContextMixin { - - @DeclareMixin("akka.actor.RepointableActorRef") - def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } - - @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") - def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} - - @Around("repointableActorRefCreation(repointableActorRef)") - def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { - TraceRecorder.withTraceContext(repointableActorRef.traceContext) { - pjp.proceed() - } - } - -} - -@Aspect -class ActorSystemMessagePassingTracing { - - @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") - def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} - - @Around("systemMessageProcessing(messages)") - def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { - if (messages.nonEmpty) { - val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withTraceContext(ctx)(pjp.proceed()) - - } else pjp.proceed() - } -} diff --git a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala deleted file mode 100644 index 31ec92a4..00000000 --- a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.instrumentation - -import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect } -import akka.event.Logging.Warning -import scala.compat.Platform.EOL -import akka.actor.ActorRefProvider -import akka.pattern.{ AskTimeoutException, PromiseActorRef } -import kamon.trace.Trace -import kamon.Kamon - -@Aspect -class AskPatternTracing { - - class StackTraceCaptureException extends Throwable - - @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *, *)", argNames = "provider") - def promiseActorRefApply(provider: ActorRefProvider): Unit = {} - - @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor") - def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = { - val system = promiseActor.provider.guardian.underlying.system - val traceExtension = Kamon(Trace)(system) - - if (traceExtension.enableAskPatternTracing) { - val future = promiseActor.result.future - implicit val ec = system.dispatcher - val stack = new StackTraceCaptureException - - future onFailure { - case timeout: AskTimeoutException ⇒ - val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) - - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing], - "Timeout triggered for ask pattern registered at: " + stackString)) - } - } - } -} diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala deleted file mode 100644 index 60cc4ddf..00000000 --- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.instrumentation - -import org.aspectj.lang.annotation._ -import akka.dispatch.{ Dispatchers, ExecutorServiceDelegate, Dispatcher, MessageDispatcher } -import kamon.metrics.{ Metrics, DispatcherMetrics } -import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder -import kamon.Kamon -import akka.actor.{ Cancellable, ActorSystemImpl } -import scala.concurrent.forkjoin.ForkJoinPool -import java.util.concurrent.ThreadPoolExecutor -import java.lang.reflect.Method -import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement - -@Aspect -class DispatcherTracing { - - @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") - def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} - - @Before("onActorSystemStartup(dispatchers, system)") - def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { - val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] - currentDispatchers.actorSystem = system - } - - @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") - def onDispatchersLookup(dispatchers: Dispatchers) = {} - - @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") - def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { - val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] - - dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem - } - - @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") - def onCreateExecutorService(): Unit = {} - - @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))") - def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {} - - @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)") - def onDispatcherStartup(dispatcher: Dispatcher): Unit = {} - - @After("onDispatcherStartup(dispatcher)") - def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { - - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] - val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) - val metricIdentity = DispatcherMetrics(dispatcher.id) - - dispatcherWithMetrics.metricIdentity = metricIdentity - dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) - - if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) { - dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dm ⇒ - val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = - DispatcherMetricsCollector.collect(dispatcher) - - dm.maximumPoolSize.record(maximumPoolSize) - dm.runningThreadCount.record(runningThreadCount) - dm.queueTaskCount.record(queueTaskCount) - dm.poolSize.record(poolSize) - } - } - } - } - - @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)") - def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {} - - @After("onDispatcherShutdown(dispatcher)") - def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] - - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dispatcher ⇒ - dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() - Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) - } - } -} - -@Aspect -class DispatcherMetricsMixin { - - @DeclareMixin("akka.dispatch.Dispatcher") - def mixinDispatcherMetricsToMessageDispatcher: DispatcherMessageMetrics = new DispatcherMessageMetrics {} - - @DeclareMixin("akka.dispatch.Dispatchers") - def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} -} - -trait DispatcherMessageMetrics { - var metricIdentity: DispatcherMetrics = _ - var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ - var dispatcherCollectorCancellable: Cancellable = _ - var actorSystem: ActorSystemImpl = _ -} - -trait DispatchersWithActorSystem { - var actorSystem: ActorSystemImpl = _ -} - -object DispatcherMetricsCollector { - - case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) - - private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, - (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) - } - - private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) - } - - private val executorServiceMethod: Method = { - // executorService is protected - val method = classOf[Dispatcher].getDeclaredMethod("executorService") - method.setAccessible(true) - method - } - - def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { - dispatcher match { - case x: Dispatcher ⇒ { - val executor = executorServiceMethod.invoke(x) match { - case delegate: ExecutorServiceDelegate ⇒ delegate.executor - case other ⇒ other - } - - executor match { - case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) - case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) - case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } - case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala deleted file mode 100644 index 85d39d3e..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ TraceContextAware, TraceRecorder } - -@Aspect -class ActorLoggingTracing { - - @DeclareMixin("akka.event.Logging.LogEvent+") - def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") - def logEventCreation(event: TraceContextAware): Unit = {} - - @After("logEventCreation(event)") - def captureTraceContext(event: TraceContextAware): Unit = { - // Force initialization of TraceContextAware - event.traceContext - } - - @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} - - @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withTraceContext(logEvent.traceContext) { - pjp.proceed() - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala deleted file mode 100644 index 634c94a1..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ TraceContextAware, TraceRecorder } - -@Aspect -class FutureTracing { - - @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") - def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default - - @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") - def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {} - - @After("futureRelatedRunnableCreation(runnable)") - def afterCreation(runnable: TraceContextAware): Unit = { - // Force traceContext initialization. - runnable.traceContext - } - - @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") - def futureRelatedRunnableExecution(runnable: TraceContextAware) = {} - - @Around("futureRelatedRunnableExecution(runnable)") - def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = { - TraceRecorder.withTraceContext(runnable.traceContext) { - pjp.proceed() - } - } - -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala new file mode 100644 index 00000000..5fce4555 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -0,0 +1,130 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import akka.actor._ +import akka.dispatch.{ Envelope, MessageDispatcher } +import kamon.Kamon +import kamon.metric.ActorMetrics.ActorMetricsRecorder +import kamon.metric.{ ActorMetrics, Metrics } +import kamon.trace._ +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorCellInstrumentation { + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") + def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + + val metricsExtension = Kamon(Metrics)(system) + val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellWithMetrics.metricIdentity = metricIdentity + cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) + } + + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") + def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(cell, envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + + try { + TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) { + pjp.proceed() + } + } finally { + cellWithMetrics.actorMetricsRecorder.map { + am ⇒ + am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) + am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) + am.mailboxSize.decrement() + } + } + } + + @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") + def sendingMessageToActorCell(cell: ActorCell): Unit = {} + + @After("sendingMessageToActorCell(cell)") + def afterSendMessageToActorCell(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment()) + } + + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") + def actorStop(cell: ActorCell): Unit = {} + + @After("actorStop(cell)") + def afterStop(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellWithMetrics.actorMetricsRecorder.map { p ⇒ + cellWithMetrics.mailboxSizeCollectorCancellable.cancel() + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) + } + } + + @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") + def actorInvokeFailure(cell: ActorCell): Unit = {} + + @Before("actorInvokeFailure(cell)") + def beforeInvokeFailure(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellWithMetrics.actorMetricsRecorder.map { + am ⇒ am.errors.increment() + } + } +} + +trait ActorCellMetrics { + var metricIdentity: ActorMetrics = _ + var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ + var mailboxSizeCollectorCancellable: Cancellable = _ +} + +@Aspect +class ActorCellMetricsIntoActorCellMixin { + + @DeclareMixin("akka.actor.ActorCell") + def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} +} + +@Aspect +class TraceContextIntoEnvelopeMixin { + + @DeclareMixin("akka.dispatch.Envelope") + def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala new file mode 100644 index 00000000..ee9d442f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala @@ -0,0 +1,47 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorLoggingInstrumentation { + + @DeclareMixin("akka.event.Logging.LogEvent+") + def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") + def logEventCreation(event: TraceContextAware): Unit = {} + + @After("logEventCreation(event)") + def captureTraceContext(event: TraceContextAware): Unit = { + // Force initialization of TraceContextAware + event.traceContext + } + + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} + + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { + TraceRecorder.withTraceContext(logEvent.traceContext) { + pjp.proceed() + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala new file mode 100644 index 00000000..9b6b6866 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala @@ -0,0 +1,80 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import akka.dispatch.sysmsg.EarliestFirstSystemMessageList +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorSystemMessageInstrumentation { + + @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") + def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} + + @Around("systemMessageProcessing(messages)") + def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { + if (messages.nonEmpty) { + val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext + TraceRecorder.withTraceContext(ctx)(pjp.proceed()) + + } else pjp.proceed() + } +} + +@Aspect +class TraceContextIntoSystemMessageMixin { + + @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") + def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} + +@Aspect +class TraceContextIntoRepointableActorRefMixin { + + @DeclareMixin("akka.actor.RepointableActorRef") + def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } + + @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") + def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} + + @Around("repointableActorRefCreation(repointableActorRef)") + def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { + TraceRecorder.withTraceContext(repointableActorRef.traceContext) { + pjp.proceed() + } + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala new file mode 100644 index 00000000..3bf13ce2 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala @@ -0,0 +1,55 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import akka.actor.ActorRefProvider +import akka.event.Logging.Warning +import akka.pattern.{ AskTimeoutException, PromiseActorRef } +import kamon.Kamon +import kamon.trace.Trace +import org.aspectj.lang.annotation.{ AfterReturning, Aspect, Pointcut } + +import scala.compat.Platform.EOL + +@Aspect +class AskPatternInstrumentation { + + class StackTraceCaptureException extends Throwable + + @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *, *)", argNames = "provider") + def promiseActorRefApply(provider: ActorRefProvider): Unit = {} + + @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor") + def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = { + val system = promiseActor.provider.guardian.underlying.system + val traceExtension = Kamon(Trace)(system) + + if (traceExtension.enableAskPatternTracing) { + val future = promiseActor.result.future + implicit val ec = system.dispatcher + val stack = new StackTraceCaptureException + + future onFailure { + case timeout: AskTimeoutException ⇒ + val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) + + system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], + "Timeout triggered for ask pattern registered at: " + stackString)) + } + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala new file mode 100644 index 00000000..db366e8c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala @@ -0,0 +1,163 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import java.lang.reflect.Method +import java.util.concurrent.ThreadPoolExecutor + +import akka.actor.{ ActorSystemImpl, Cancellable } +import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher } +import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement +import kamon.Kamon +import kamon.metric.DispatcherMetrics.DispatcherMetricRecorder +import kamon.metric.{ DispatcherMetrics, Metrics } +import org.aspectj.lang.annotation._ + +import scala.concurrent.forkjoin.ForkJoinPool + +@Aspect +class DispatcherInstrumentation { + + @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") + def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} + + @Before("onActorSystemStartup(dispatchers, system)") + def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { + val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] + currentDispatchers.actorSystem = system + } + + @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") + def onDispatchersLookup(dispatchers: Dispatchers) = {} + + @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") + def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { + val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] + + dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem + } + + @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") + def onCreateExecutorService(): Unit = {} + + @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))") + def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {} + + @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)") + def onDispatcherStartup(dispatcher: Dispatcher): Unit = {} + + @After("onDispatcherStartup(dispatcher)") + def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { + + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] + val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) + val metricIdentity = DispatcherMetrics(dispatcher.id) + + dispatcherWithMetrics.metricIdentity = metricIdentity + dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) + + if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) { + dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { + dispatcherWithMetrics.dispatcherMetricsRecorder.map { + dm ⇒ + val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = + DispatcherMetricsCollector.collect(dispatcher) + + dm.maximumPoolSize.record(maximumPoolSize) + dm.runningThreadCount.record(runningThreadCount) + dm.queueTaskCount.record(queueTaskCount) + dm.poolSize.record(poolSize) + } + } + } + } + + @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)") + def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {} + + @After("onDispatcherShutdown(dispatcher)") + def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] + + dispatcherWithMetrics.dispatcherMetricsRecorder.map { + dispatcher ⇒ + dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() + Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) + } + } +} + +@Aspect +class DispatcherMetricCollectionInfoIntoDispatcherMixin { + + @DeclareMixin("akka.dispatch.Dispatcher") + def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {} + + @DeclareMixin("akka.dispatch.Dispatchers") + def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} +} + +trait DispatcherMetricCollectionInfo { + var metricIdentity: DispatcherMetrics = _ + var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ + var dispatcherCollectorCancellable: Cancellable = _ + var actorSystem: ActorSystemImpl = _ +} + +trait DispatchersWithActorSystem { + var actorSystem: ActorSystemImpl = _ +} + +object DispatcherMetricsCollector { + + case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) + + private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { + DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, + (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) + } + + private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { + DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) + } + + private val executorServiceMethod: Method = { + // executorService is protected + val method = classOf[Dispatcher].getDeclaredMethod("executorService") + method.setAccessible(true) + method + } + + def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { + dispatcher match { + case x: Dispatcher ⇒ { + val executor = executorServiceMethod.invoke(x) match { + case delegate: ExecutorServiceDelegate ⇒ delegate.executor + case other ⇒ other + } + + executor match { + case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) + case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) + case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) + } + } + case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala new file mode 100644 index 00000000..e79090a8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala @@ -0,0 +1,35 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package org.HdrHistogram + +import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater } + +trait AtomicHistogramFieldsAccessor { + self: AtomicHistogram ⇒ + + def countsArray(): AtomicLongArray = self.counts + + def unitMagnitude(): Int = self.unitMagnitude + + def subBucketHalfCount(): Int = self.subBucketHalfCount + + def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude +} + +object AtomicHistogramFieldsAccessor { + def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala new file mode 100644 index 00000000..d8f2b620 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala @@ -0,0 +1,48 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.instrumentation.scala + +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class FutureInstrumentation { + + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default + + @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") + def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {} + + @After("futureRelatedRunnableCreation(runnable)") + def afterCreation(runnable: TraceContextAware): Unit = { + // Force traceContext initialization. + runnable.traceContext + } + + @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") + def futureRelatedRunnableExecution(runnable: TraceContextAware) = {} + + @Around("futureRelatedRunnableExecution(runnable)") + def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = { + TraceRecorder.withTraceContext(runnable.traceContext) { + pjp.proceed() + } + } + +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala new file mode 100644 index 00000000..bb412f79 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala @@ -0,0 +1,89 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram } + +case class ActorMetrics(name: String) extends MetricGroupIdentity { + val category = ActorMetrics +} + +object ActorMetrics extends MetricGroupCategory { + val name = "actor" + + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, + errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.collect(context), + timeInMailbox.collect(context), + mailboxSize.collect(context), + errors.collect(context)) + + def cleanup: Unit = { + processingTime.cleanup + mailboxSize.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, + mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = ActorMetricSnapshot + + def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + mailboxSize.merge(that.mailboxSize, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (ProcessingTime -> processingTime), + (MailboxSize -> mailboxSize), + (TimeInMailbox -> timeInMailbox), + (Errors -> errors)) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = ActorMetricsRecorder + + def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { + val settings = config.getConfig("precision.actor") + + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + val mailboxSizeConfig = settings.getConfig("mailbox-size") + + new ActorMetricsRecorder( + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + MinMaxCounter.fromConfig(mailboxSizeConfig, system), + Counter()) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala new file mode 100644 index 00000000..fbce783c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala @@ -0,0 +1,88 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ Histogram, HdrHistogram } + +case class DispatcherMetrics(name: String) extends MetricGroupIdentity { + val category = DispatcherMetrics +} + +object DispatcherMetrics extends MetricGroupCategory { + val name = "dispatcher" + + case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } + case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } + case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } + case object PoolSize extends MetricIdentity { val name = "pool-size" } + + case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, + queueTaskCount: Histogram, poolSize: Histogram) + extends MetricGroupRecorder { + + def collect(context: CollectionContext): MetricGroupSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.collect(context), + runningThreadCount.collect(context), + queueTaskCount.collect(context), + poolSize.collect(context)) + + def cleanup: Unit = {} + + } + + case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, + queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = DispatcherMetricSnapshot + + def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.merge(that.maximumPoolSize, context), + runningThreadCount.merge(that.runningThreadCount, context), + queueTaskCount.merge(that.queueTaskCount, context), + poolSize.merge(that.poolSize, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (MaximumPoolSize -> maximumPoolSize), + (RunningThreadCount -> runningThreadCount), + (QueueTaskCount -> queueTaskCount), + (PoolSize -> poolSize)) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = DispatcherMetricRecorder + + def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { + val settings = config.getConfig("precision.dispatcher") + + val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") + val runningThreadCountConfig = settings.getConfig("running-thread-count") + val queueTaskCountConfig = settings.getConfig("queued-task-count") + val poolSizeConfig = settings.getConfig("pool-size") + + new DispatcherMetricRecorder( + Histogram.fromConfig(maximumPoolSizeConfig), + Histogram.fromConfig(runningThreadCountConfig), + Histogram.fromConfig(queueTaskCountConfig), + Histogram.fromConfig(poolSizeConfig)) + } + } +} + diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala new file mode 100644 index 00000000..325dd216 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala @@ -0,0 +1,75 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.{ LongBuffer } +import akka.actor.ActorSystem +import com.typesafe.config.Config + +trait MetricGroupCategory { + def name: String +} + +trait MetricGroupIdentity { + def name: String + def category: MetricGroupCategory +} + +trait MetricIdentity { + def name: String +} + +trait CollectionContext { + def buffer: LongBuffer +} + +object CollectionContext { + def default: CollectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } +} + +trait MetricGroupRecorder { + def collect(context: CollectionContext): MetricGroupSnapshot + def cleanup: Unit +} + +trait MetricSnapshot { + type SnapshotType + + def merge(that: SnapshotType, context: CollectionContext): SnapshotType +} + +trait MetricGroupSnapshot { + type GroupSnapshotType + + def metrics: Map[MetricIdentity, MetricSnapshot] + def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType +} + +private[kamon] trait MetricRecorder { + type SnapshotType <: MetricSnapshot + + def collect(context: CollectionContext): SnapshotType + def cleanup: Unit +} + +trait MetricGroupFactory { + type GroupRecorder <: MetricGroupRecorder + def create(config: Config, system: ActorSystem): GroupRecorder +} + diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala new file mode 100644 index 00000000..1025f0de --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -0,0 +1,110 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.{ LongBuffer, ByteBuffer } + +import scala.collection.concurrent.TrieMap +import akka.actor._ +import com.typesafe.config.Config +import kamon.util.GlobPathFilter +import kamon.Kamon +import akka.actor +import kamon.metric.Metrics.MetricGroupFilter +import kamon.metric.Subscriptions.Subscribe +import java.util.concurrent.TimeUnit + +class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") + + /** Configured Dispatchers */ + val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) + val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) + + /** Configuration Settings */ + val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS) + + val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() + val filters = loadFilters(metricsExtConfig) + lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") + + def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { + if (shouldTrack(identity)) + Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) + else + None + } + + def unregister(identity: MetricGroupIdentity): Unit = { + storage.remove(identity) + } + + def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { + subscriptions.tell(Subscribe(category, selection, permanently), receiver) + } + + def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = { + // TODO: Improve the way in which we are getting the context. + val context = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(50000) + } + (for ((identity, recorder) ← storage) yield (identity, recorder.collect(context))).toMap + } + + def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { + import scala.concurrent.duration._ + + system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { + body + }(gaugeRecordingsDispatcher) + } + + private def shouldTrack(identity: MetricGroupIdentity): Boolean = { + filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true) + } + + def loadFilters(config: Config): Map[String, MetricGroupFilter] = { + import scala.collection.JavaConverters._ + + val filters = config.getObjectList("filters").asScala + + val allFilters = + for ( + filter ← filters; + entry ← filter.entrySet().asScala + ) yield { + val key = entry.getKey + val keyBasedConfig = entry.getValue.atKey(key) + + val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList + val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + + (key, MetricGroupFilter(includes, excludes)) + } + + allFilters.toMap + } +} + +object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) + + case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { + def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/Scale.scala b/kamon-core/src/main/scala/kamon/metric/Scale.scala new file mode 100644 index 00000000..2f27c1a3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Scale.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +class Scale(val numericValue: Double) extends AnyVal + +object Scale { + val Nano = new Scale(1E-9) + val Micro = new Scale(1E-6) + val Milli = new Scale(1E-3) + val Unit = new Scale(1) + val Kilo = new Scale(1E3) + val Mega = new Scale(1E6) + val Giga = new Scale(1E9) + + def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue +} diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala new file mode 100644 index 00000000..a9f4c721 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala @@ -0,0 +1,128 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.{ Props, ActorRef, Actor } +import kamon.metric.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } +import kamon.util.GlobPathFilter +import scala.concurrent.duration.{ FiniteDuration, Duration } +import java.util.concurrent.TimeUnit +import kamon.Kamon +import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer + +class Subscriptions extends Actor { + import context.system + + val config = context.system.settings.config + val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var lastTick: Long = System.currentTimeMillis() + var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + + def receive = { + case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent) + case FlushMetrics ⇒ flush() + } + + def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = { + val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) + if (permanent) { + val receivers = subscribedPermanently.get(filter).getOrElse(Nil) + subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) + + } else { + val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) + subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) + } + + } + + def flush(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(Metrics).collect + + dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) + dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) + + lastTick = currentTick + subscribedForOneShot = Map.empty + } + + def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], + snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { + + for ((filter, receivers) ← subscriptions) yield { + val selection = snapshots.filter(group ⇒ filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + receivers.foreach(_ ! tickMetrics) + } + } +} + +object Subscriptions { + case object FlushMetrics + case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false) + case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) + + case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) { + def accept(identity: MetricGroupIdentity): Boolean = { + category.equals(identity.category) && globFilter.accept(identity.name) + } + } +} + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + val collectionContext = CollectionContext.default + + def receive = empty + + def empty: Actor.Receive = { + case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) + case FlushBuffer ⇒ // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) ⇒ + val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become (buffering(combinedSnapshot)) + + case FlushBuffer ⇒ + receiver ! buffered + context become (empty) + + } + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } + + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext))) +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) +} diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala new file mode 100644 index 00000000..1ee1eab4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -0,0 +1,77 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import kamon.metric.instrument.{ Histogram } + +import scala.collection.concurrent.TrieMap +import com.typesafe.config.Config + +case class TraceMetrics(name: String) extends MetricGroupIdentity { + val category = TraceMetrics +} + +object TraceMetrics extends MetricGroupCategory { + val name = "trace" + + case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } + case class HttpClientRequest(name: String) extends MetricIdentity + + case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) + extends MetricGroupRecorder { + + private val segments = TrieMap[MetricIdentity, Histogram]() + + def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = + segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) + + def collect(context: CollectionContext): TraceMetricsSnapshot = + TraceMetricsSnapshot( + elapsedTime.collect(context), + segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap) + + def cleanup: Unit = {} + } + + case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot]) + extends MetricGroupSnapshot { + + type GroupSnapshotType = TraceMetricsSnapshot + + def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = + TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) + + def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = TraceMetricRecorder + + def create(config: Config, system: ActorSystem): TraceMetricRecorder = { + + val settings = config.getConfig("precision.trace") + val elapsedTimeConfig = settings.getConfig("elapsed-time") + val segmentConfig = settings.getConfig("segment") + + new TraceMetricRecorder( + Histogram.fromConfig(elapsedTimeConfig), + () ⇒ Histogram.fromConfig(segmentConfig)) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala new file mode 100644 index 00000000..dea03968 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -0,0 +1,139 @@ +package kamon.metric + +import akka.actor +import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import com.typesafe.config.Config +import kamon.Kamon +import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.FiniteDuration + +class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + lazy val userMetricsRecorder = Kamon(Metrics)(system).register(UserMetrics, UserMetrics.Factory).get + + def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = + userMetricsRecorder.buildHistogram(name, precision, highestTrackableValue) + + def registerHistogram(name: String): Histogram = + userMetricsRecorder.buildHistogram(name) + + def registerCounter(name: String): Counter = + userMetricsRecorder.buildCounter(name) + + def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration): MinMaxCounter = { + userMetricsRecorder.buildMinMaxCounter(name, precision, highestTrackableValue, refreshInterval) + } + + def registerMinMaxCounter(name: String): MinMaxCounter = + userMetricsRecorder.buildMinMaxCounter(name) + + def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + userMetricsRecorder.buildGauge(name)(currentValueCollector) + + def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + userMetricsRecorder.buildGauge(name, precision, highestTrackableValue, refreshInterval, currentValueCollector) +} + +object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider with MetricGroupIdentity { + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system) + + val name: String = "user-metrics-recorder" + val category = new MetricGroupCategory { + val name: String = "user-metrics" + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = UserMetricsRecorder + def create(config: Config, system: ActorSystem): UserMetricsRecorder = new UserMetricsRecorder(system) + } + + class UserMetricsRecorder(system: ActorSystem) extends MetricGroupRecorder { + val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision") + val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision") + val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision") + val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") + + val histograms = TrieMap[String, Histogram]() + val counters = TrieMap[String, Counter]() + val minMaxCounters = TrieMap[String, MinMaxCounter]() + val gauges = TrieMap[String, Gauge]() + + def buildHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = + histograms.getOrElseUpdate(name, Histogram(highestTrackableValue, precision, Scale.Unit)) + + def buildHistogram(name: String): Histogram = + histograms.getOrElseUpdate(name, Histogram.fromConfig(defaultHistogramPrecisionConfig)) + + def buildCounter(name: String): Counter = + counters.getOrElseUpdate(name, Counter()) + + def buildMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration): MinMaxCounter = { + minMaxCounters.getOrElseUpdate(name, MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) + } + + def buildMinMaxCounter(name: String): MinMaxCounter = + minMaxCounters.getOrElseUpdate(name, MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) + + def buildGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration, currentValueCollector: Gauge.CurrentValueCollector): Gauge = + gauges.getOrElseUpdate(name, Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) + + def buildGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + gauges.getOrElseUpdate(name, Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) + + def collect(context: CollectionContext): UserMetricsSnapshot = { + val histogramSnapshots = histograms.map { + case (name, histogram) ⇒ + (UserHistogram(name), histogram.collect(context)) + } toMap + + val counterSnapshots = counters.map { + case (name, counter) ⇒ + (UserCounter(name), counter.collect(context)) + } toMap + + val minMaxCounterSnapshots = minMaxCounters.map { + case (name, minMaxCounter) ⇒ + (UserMinMaxCounter(name), minMaxCounter.collect(context)) + } toMap + + val gaugeSnapshots = gauges.map { + case (name, gauge) ⇒ + (UserGauge(name), gauge.collect(context)) + } toMap + + UserMetricsSnapshot(histogramSnapshots, counterSnapshots, minMaxCounterSnapshots, gaugeSnapshots) + } + + def cleanup: Unit = {} + } + + case class UserHistogram(name: String) extends MetricIdentity + case class UserCounter(name: String) extends MetricIdentity + case class UserMinMaxCounter(name: String) extends MetricIdentity + case class UserGauge(name: String) extends MetricIdentity + + case class UserMetricsSnapshot(histograms: Map[UserHistogram, Histogram.Snapshot], + counters: Map[UserCounter, Counter.Snapshot], + minMaxCounters: Map[UserMinMaxCounter, Histogram.Snapshot], + gauges: Map[UserGauge, Histogram.Snapshot]) + extends MetricGroupSnapshot { + + type GroupSnapshotType = UserMetricsSnapshot + + def merge(that: UserMetricsSnapshot, context: CollectionContext): UserMetricsSnapshot = + UserMetricsSnapshot( + combineMaps(histograms, that.histograms)((l, r) ⇒ l.merge(r, context)), + combineMaps(counters, that.counters)((l, r) ⇒ l.merge(r, context)), + combineMaps(minMaxCounters, that.minMaxCounters)((l, r) ⇒ l.merge(r, context)), + combineMaps(gauges, that.gauges)((l, r) ⇒ l.merge(r, context))) + + def metrics: Map[MetricIdentity, MetricSnapshot] = histograms ++ counters ++ minMaxCounters ++ gauges + } + +} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala new file mode 100644 index 00000000..b592bcd3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -0,0 +1,59 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import jsr166e.LongAdder +import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder } + +trait Counter extends MetricRecorder { + type SnapshotType = Counter.Snapshot + + def increment(): Unit + def increment(times: Long): Unit +} + +object Counter { + + def apply(): Counter = new LongAdderCounter + + trait Snapshot extends MetricSnapshot { + type SnapshotType = Counter.Snapshot + + def count: Long + def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot + } +} + +class LongAdderCounter extends Counter { + private val counter = new LongAdder + + def increment(): Unit = counter.increment() + + def increment(times: Long): Unit = { + if (times < 0) + throw new UnsupportedOperationException("Counters cannot be decremented") + counter.add(times) + } + + def collect(context: CollectionContext): Counter.Snapshot = CounterSnapshot(counter.sumThenReset()) + + def cleanup: Unit = {} +} + +case class CounterSnapshot(count: Long) extends Counter.Snapshot { + def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala new file mode 100644 index 00000000..1efff2bc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -0,0 +1,78 @@ +package kamon.metric.instrument + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ Cancellable, ActorSystem } +import com.typesafe.config.Config +import kamon.metric.{ CollectionContext, Scale, MetricRecorder } + +import scala.concurrent.duration.FiniteDuration + +trait Gauge extends MetricRecorder { + type SnapshotType = Histogram.Snapshot + + def record(value: Long) + def record(value: Long, count: Long) +} + +object Gauge { + + trait CurrentValueCollector { + def currentValue: Long + } + + def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration, + system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { + + val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector) + + val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + gauge.refreshValue() + }(system.dispatcher) // TODO: Move this to Kamon dispatchers + + gauge.refreshValuesSchedule.set(refreshValuesSchedule) + gauge + } + + def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge = + fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction)) + + def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = { + val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision") + fromConfig(config, system)(currentValueCollector) + } + + def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { + import scala.concurrent.duration._ + + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) + + Gauge(Histogram.Precision(significantDigits), highest, Scale.Unit, refreshInterval.millis, system)(currentValueCollector) + } + + implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f.apply() + } +} + +class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge { + val refreshValuesSchedule = new AtomicReference[Cancellable]() + + def record(value: Long): Unit = underlyingHistogram.record(value) + + def record(value: Long, count: Long): Unit = underlyingHistogram.record(value, count) + + def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context) + + def cleanup: Unit = { + if (refreshValuesSchedule.get() != null) + refreshValuesSchedule.get().cancel() + } + + def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala new file mode 100644 index 00000000..9ae077f4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -0,0 +1,246 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import java.nio.LongBuffer +import com.typesafe.config.Config +import org.HdrHistogram.AtomicHistogramFieldsAccessor +import org.HdrHistogram.AtomicHistogram +import kamon.metric._ + +trait Histogram extends MetricRecorder { + type SnapshotType = Histogram.Snapshot + + def record(value: Long) + def record(value: Long, count: Long) +} + +object Histogram { + + def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram = + new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale) + + def fromConfig(config: Config): Histogram = { + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + + new HdrHistogram(1L, highest, significantDigits) + } + + object HighestTrackableValue { + val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L + } + + case class Precision(significantDigits: Int) + object Precision { + val Low = Precision(1) + val Normal = Precision(2) + val Fine = Precision(3) + } + + trait Record { + def level: Long + def count: Long + + private[kamon] def rawCompactRecord: Long + } + + case class MutableRecord(var level: Long, var count: Long) extends Record { + var rawCompactRecord: Long = 0L + } + + trait Snapshot extends MetricSnapshot { + type SnapshotType = Histogram.Snapshot + + def isEmpty: Boolean = numberOfMeasurements == 0 + def scale: Scale + def numberOfMeasurements: Long + def min: Long + def max: Long + def recordsIterator: Iterator[Record] + def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot + } +} + +/** + * This implementation is meant to be used for real time data collection where data snapshots are taken often over time. + * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still + * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. + */ +class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit) + extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits) + with Histogram with AtomicHistogramFieldsAccessor { + + import AtomicHistogramFieldsAccessor.totalCountUpdater + + def record(value: Long): Unit = recordValue(value) + + def record(value: Long, count: Long): Unit = recordValueWithCount(value, count) + + def collect(context: CollectionContext): Histogram.Snapshot = { + import context.buffer + buffer.clear() + val nrOfMeasurements = writeSnapshotTo(buffer) + + buffer.flip() + + val measurementsArray = Array.ofDim[Long](buffer.limit()) + buffer.get(measurementsArray, 0, measurementsArray.length) + new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + } + + def cleanup: Unit = {} + + private def writeSnapshotTo(buffer: LongBuffer): Long = { + val counts = countsArray() + val countsLength = counts.length() + + var nrOfMeasurements = 0L + var index = 0L + while (index < countsLength) { + val countAtIndex = counts.getAndSet(index.toInt, 0L) + + if (countAtIndex > 0) { + buffer.put(CompactHdrSnapshot.compactRecord(index, countAtIndex)) + nrOfMeasurements += countAtIndex + } + + index += 1 + } + + reestablishTotalCount(nrOfMeasurements) + nrOfMeasurements + } + + private def reestablishTotalCount(diff: Long): Unit = { + def tryUpdateTotalCount: Boolean = { + val previousTotalCount = getTotalCount + val newTotalCount = previousTotalCount - diff + + totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) + } + + while (!tryUpdateTotalCount) {} + } + +} + +class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, + subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot { + + def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0)) + def max: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(compactRecords.length - 1)) + + def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = { + if (that.isEmpty) this else if (this.isEmpty) that else { + import context.buffer + buffer.clear() + + val selfIterator = recordsIterator + val thatIterator = that.recordsIterator + var thatCurrentRecord: Histogram.Record = null + var mergedNumberOfMeasurements = 0L + + def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null + def addToBuffer(compactRecord: Long): Unit = { + mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) + buffer.put(compactRecord) + } + + while (selfIterator.hasNext) { + val selfCurrentRecord = selfIterator.next() + + // Advance that to no further than the level of selfCurrentRecord + thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord + while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { + addToBuffer(thatCurrentRecord.rawCompactRecord) + thatCurrentRecord = nextOrNull(thatIterator) + } + + // Include the current record of self and optionally merge if has the same level as thatCurrentRecord + if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { + addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) + thatCurrentRecord = nextOrNull(thatIterator) + } else { + addToBuffer(selfCurrentRecord.rawCompactRecord) + } + } + + // Include everything that might have been left from that + if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) + while (thatIterator.hasNext) { + addToBuffer(thatIterator.next().rawCompactRecord) + } + + buffer.flip() + val compactRecords = Array.ofDim[Long](buffer.limit()) + buffer.get(compactRecords) + + new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) + } + } + + @inline private def mergeCompactRecords(left: Long, right: Long): Long = { + val index = left >> 48 + val leftCount = countFromCompactRecord(left) + val rightCount = countFromCompactRecord(right) + + CompactHdrSnapshot.compactRecord(index, leftCount + rightCount) + } + + @inline private def levelFromCompactRecord(compactRecord: Long): Long = { + val countsArrayIndex = (compactRecord >> 48).toInt + var bucketIndex: Int = (countsArrayIndex >> subBucketHalfCountMagnitude) - 1 + var subBucketIndex: Int = (countsArrayIndex & (subBucketHalfCount - 1)) + subBucketHalfCount + if (bucketIndex < 0) { + subBucketIndex -= subBucketHalfCount + bucketIndex = 0 + } + + subBucketIndex.toLong << (bucketIndex + unitMagnitude) + } + + @inline private def countFromCompactRecord(compactRecord: Long): Long = + compactRecord & CompactHdrSnapshot.CompactRecordCountMask + + def recordsIterator: Iterator[Histogram.Record] = new Iterator[Histogram.Record] { + var currentIndex = 0 + val mutableRecord = Histogram.MutableRecord(0, 0) + + override def hasNext: Boolean = currentIndex < compactRecords.length + + override def next(): Histogram.Record = { + if (hasNext) { + val measurement = compactRecords(currentIndex) + mutableRecord.rawCompactRecord = measurement + mutableRecord.level = levelFromCompactRecord(measurement) + mutableRecord.count = countFromCompactRecord(measurement) + currentIndex += 1 + + mutableRecord + } else { + throw new IllegalStateException("The iterator has already been consumed.") + } + } + } +} + +object CompactHdrSnapshot { + val CompactRecordCountMask = 0xFFFFFFFFFFFFL + + def compactRecord(index: Long, count: Long): Long = (index << 48) | count +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala new file mode 100644 index 00000000..471e7bd4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -0,0 +1,116 @@ +package kamon.metric.instrument + +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +import java.lang.Math.abs +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference +import akka.actor.{ ActorSystem, Cancellable } +import com.typesafe.config.Config +import jsr166e.LongMaxUpdater +import kamon.metric.{ Scale, MetricRecorder, CollectionContext } +import kamon.util.PaddedAtomicLong +import scala.concurrent.duration.FiniteDuration + +trait MinMaxCounter extends MetricRecorder { + override type SnapshotType = Histogram.Snapshot + + def increment(): Unit + def increment(times: Long): Unit + def decrement() + def decrement(times: Long) +} + +object MinMaxCounter { + + def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration, + system: ActorSystem): MinMaxCounter = { + + val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram) + + val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + minMaxCounter.refreshValues() + }(system.dispatcher) // TODO: Move this to Kamon dispatchers + + minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule) + minMaxCounter + } + + def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = { + import scala.concurrent.duration._ + + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) + + apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system) + } +} + +class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter { + private val min = new LongMaxUpdater + private val max = new LongMaxUpdater + private val sum = new PaddedAtomicLong + val refreshValuesSchedule = new AtomicReference[Cancellable]() + + min.update(0L) + max.update(0L) + + def increment(): Unit = increment(1L) + + def increment(times: Long): Unit = { + val currentValue = sum.addAndGet(times) + max.update(currentValue) + } + + def decrement(): Unit = decrement(1L) + + def decrement(times: Long): Unit = { + val currentValue = sum.addAndGet(-times) + min.update(-currentValue) + } + + def collect(context: CollectionContext): Histogram.Snapshot = { + refreshValues() + underlyingHistogram.collect(context) + } + + def cleanup: Unit = { + if (refreshValuesSchedule.get() != null) + refreshValuesSchedule.get().cancel() + } + + def refreshValues(): Unit = { + val currentValue = { + val value = sum.get() + if (value < 0) 0 else value + } + + val currentMin = { + val minAbs = abs(min.maxThenReset()) + if (minAbs <= currentValue) minAbs else 0 + } + + underlyingHistogram.record(currentValue) + underlyingHistogram.record(currentMin) + underlyingHistogram.record(max.maxThenReset()) + + max.update(currentValue) + min.update(-currentValue) + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/package.scala b/kamon-core/src/main/scala/kamon/metric/package.scala new file mode 100644 index 00000000..43166058 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/package.scala @@ -0,0 +1,34 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon + +import scala.annotation.tailrec +import com.typesafe.config.Config + +package object metric { + + @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { + if (right.isEmpty) + left + else { + val (key, rightValue) = right.head + val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) + + combineMaps(left.updated(key, value), right.tail)(valueMerger) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala deleted file mode 100644 index 9e19dced..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import com.typesafe.config.Config -import kamon.metrics.instruments.CounterRecorder -import org.HdrHistogram.HdrRecorder - -case class ActorMetrics(name: String) extends MetricGroupIdentity { - val category = ActorMetrics -} - -object ActorMetrics extends MetricGroupCategory { - val name = "actor" - - case object ProcessingTime extends MetricIdentity { val name, tag = "processing-time" } - case object MailboxSize extends MetricIdentity { val name, tag = "mailbox-size" } - case object TimeInMailbox extends MetricIdentity { val name, tag = "time-in-mailbox" } - case object ErrorCounter extends MetricIdentity { val name, tag = "errors" } - - case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder, errorCounter: MetricRecorder) - extends MetricGroupRecorder { - - def collect: MetricGroupSnapshot = { - ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect(), errorCounter.collect()) - } - } - - case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike, errorCounter: MetricSnapshotLike) - extends MetricGroupSnapshot { - - val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map( - (ProcessingTime -> processingTime), - (MailboxSize -> mailboxSize), - (TimeInMailbox -> timeInMailbox), - (ErrorCounter -> errorCounter)) - } - - val Factory = new MetricGroupFactory { - type GroupRecorder = ActorMetricRecorder - - def create(config: Config): ActorMetricRecorder = { - val settings = config.getConfig("precision.actor") - - val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time")) - val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size")) - val timeInMailboxConfig = extractPrecisionConfig(settings.getConfig("time-in-mailbox")) - - new ActorMetricRecorder( - HdrRecorder(processingTimeConfig.highestTrackableValue, processingTimeConfig.significantValueDigits, Scale.Nano), - HdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit), - HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano), - CounterRecorder()) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala deleted file mode 100644 index cd0afac1..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import kamon.metrics.instruments.ContinuousHdrRecorder -import org.HdrHistogram.HdrRecorder -import com.typesafe.config.Config - -case class CustomMetric(name: String) extends MetricGroupIdentity { - val category = CustomMetric -} - -object CustomMetric extends MetricGroupCategory { - val name = "custom-metric" - val RecordedValues = new MetricIdentity { val name, tag = "recorded-values" } - - def histogram(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale, continuous: Boolean = false) = - new MetricGroupFactory { - - type GroupRecorder = CustomMetricRecorder - - def create(config: Config): CustomMetricRecorder = { - val recorder = - if (continuous) ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) - else HdrRecorder(highestTrackableValue, significantValueDigits, scale) - - new CustomMetricRecorder(RecordedValues, recorder) - } - } - - class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HdrRecorder) - extends MetricGroupRecorder { - - def record(value: Long): Unit = underlyingRecorder.record(value) - - def collect: MetricGroupSnapshot = DefaultMetricGroupSnapshot(Map((identity, underlyingRecorder.collect()))) - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala deleted file mode 100644 index f41e0c3f..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import com.typesafe.config.Config -import org.HdrHistogram.HdrRecorder - -case class DispatcherMetrics(name: String) extends MetricGroupIdentity { - val category = DispatcherMetrics -} - -object DispatcherMetrics extends MetricGroupCategory { - val name = "dispatcher" - - case object MaximumPoolSize extends MetricIdentity { val name, tag = "maximum-pool-size" } - case object RunningThreadCount extends MetricIdentity { val name, tag = "running-thread-count" } - case object QueueTaskCount extends MetricIdentity { val name, tag = "queued-task-count" } - case object PoolSize extends MetricIdentity { val name, tag = "pool-size" } - - case class DispatcherMetricRecorder(maximumPoolSize: MetricRecorder, runningThreadCount: MetricRecorder, queueTaskCount: MetricRecorder, poolSize: MetricRecorder) - extends MetricGroupRecorder { - - def collect: MetricGroupSnapshot = { - DispatcherMetricSnapshot(maximumPoolSize.collect(), runningThreadCount.collect(), queueTaskCount.collect(), poolSize.collect()) - } - } - - case class DispatcherMetricSnapshot(maximumPoolSize: MetricSnapshotLike, runningThreadCount: MetricSnapshotLike, queueTaskCount: MetricSnapshotLike, poolSize: MetricSnapshotLike) - extends MetricGroupSnapshot { - - val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map( - (MaximumPoolSize -> maximumPoolSize), - (RunningThreadCount -> runningThreadCount), - (QueueTaskCount -> queueTaskCount), - (PoolSize -> poolSize)) - } - - val Factory = new MetricGroupFactory { - type GroupRecorder = DispatcherMetricRecorder - - def create(config: Config): DispatcherMetricRecorder = { - val settings = config.getConfig("precision.dispatcher") - - val MaximumPoolSizeConfig = extractPrecisionConfig(settings.getConfig("maximum-pool-size")) - val RunningThreadCountConfig = extractPrecisionConfig(settings.getConfig("running-thread-count")) - val QueueTaskCountConfig = extractPrecisionConfig(settings.getConfig("queued-task-count")) - val PoolSizeConfig = extractPrecisionConfig(settings.getConfig("pool-size")) - - new DispatcherMetricRecorder( - HdrRecorder(MaximumPoolSizeConfig.highestTrackableValue, MaximumPoolSizeConfig.significantValueDigits, Scale.Unit), - HdrRecorder(RunningThreadCountConfig.highestTrackableValue, RunningThreadCountConfig.significantValueDigits, Scale.Unit), - HdrRecorder(QueueTaskCountConfig.highestTrackableValue, QueueTaskCountConfig.significantValueDigits, Scale.Unit), - HdrRecorder(PoolSizeConfig.highestTrackableValue, PoolSizeConfig.significantValueDigits, Scale.Unit)) - } - } -} - diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala deleted file mode 100644 index f07bf38e..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import annotation.tailrec -import com.typesafe.config.Config -import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.InstrumentType - -trait MetricGroupCategory { - def name: String -} - -trait MetricGroupIdentity { - def name: String - def category: MetricGroupCategory -} - -trait MetricIdentity { - def name: String - def tag: String -} - -trait MetricGroupRecorder { - def collect: MetricGroupSnapshot -} - -trait MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshotLike] -} - -case class DefaultMetricGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot - -trait MetricRecorder { - def record(value: Long) - def collect(): MetricSnapshotLike -} - -object InstrumentTypes { - sealed trait InstrumentType - case object Histogram extends InstrumentType - case object Gauge extends InstrumentType - case object Counter extends InstrumentType -} - -trait MetricSnapshotLike { - def instrumentType: InstrumentType - def numberOfMeasurements: Long - def scale: Scale - def measurements: Vector[Measurement] - - def max: Long = measurements.lastOption.map(_.value).getOrElse(0) - def min: Long = measurements.headOption.map(_.value).getOrElse(0) - - def merge(that: MetricSnapshotLike): MetricSnapshotLike = { - val mergedMeasurements = Vector.newBuilder[Measurement] - - @tailrec def go(left: Vector[Measurement], right: Vector[Measurement], totalNrOfMeasurements: Long): Long = { - if (left.nonEmpty && right.nonEmpty) { - val leftValue = left.head - val rightValue = right.head - - if (rightValue.value == leftValue.value) { - val merged = rightValue.merge(leftValue) - mergedMeasurements += merged - go(left.tail, right.tail, totalNrOfMeasurements + merged.count) - } else { - if (leftValue.value < rightValue.value) { - mergedMeasurements += leftValue - go(left.tail, right, totalNrOfMeasurements + leftValue.count) - } else { - mergedMeasurements += rightValue - go(left, right.tail, totalNrOfMeasurements + rightValue.count) - } - } - } else { - if (left.isEmpty && right.nonEmpty) { - mergedMeasurements += right.head - go(left, right.tail, totalNrOfMeasurements + right.head.count) - } else { - if (left.nonEmpty && right.isEmpty) { - mergedMeasurements += left.head - go(left.tail, right, totalNrOfMeasurements + left.head.count) - } else totalNrOfMeasurements - } - } - } - - val totalNrOfMeasurements = go(measurements, that.measurements, 0) - MetricSnapshot(instrumentType, totalNrOfMeasurements, scale, mergedMeasurements.result()) - } -} - -case class MetricSnapshot(instrumentType: InstrumentType, numberOfMeasurements: Long, scale: Scale, - measurements: Vector[MetricSnapshot.Measurement]) extends MetricSnapshotLike - -object MetricSnapshot { - case class Measurement(value: Long, count: Long) { - def merge(that: Measurement) = Measurement(value, count + that.count) - } -} - -trait MetricGroupFactory { - type GroupRecorder <: MetricGroupRecorder - def create(config: Config): GroupRecorder -} - diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala deleted file mode 100644 index c60babb2..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import scala.collection.concurrent.TrieMap -import akka.actor._ -import com.typesafe.config.Config -import kamon.util.GlobPathFilter -import kamon.Kamon -import akka.actor -import kamon.metrics.Metrics.MetricGroupFilter -import kamon.metrics.Subscriptions.Subscribe -import java.util.concurrent.TimeUnit - -class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") - - /** Configured Dispatchers */ - val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) - val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) - - /** Configuration Settings */ - val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS) - - val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() - val filters = loadFilters(metricsExtConfig) - lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") - - def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { - if (shouldTrack(identity)) - Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig)).asInstanceOf[factory.GroupRecorder]) - else - None - } - - def unregister(identity: MetricGroupIdentity): Unit = { - storage.remove(identity) - } - - def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { - subscriptions.tell(Subscribe(category, selection, permanently), receiver) - } - - def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = { - (for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap - } - - def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { - import scala.concurrent.duration._ - - system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { - body - }(gaugeRecordingsDispatcher) - } - - private def shouldTrack(identity: MetricGroupIdentity): Boolean = { - filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false) - } - - def loadFilters(config: Config): Map[String, MetricGroupFilter] = { - import scala.collection.JavaConverters._ - - val filters = config.getObjectList("filters").asScala - - val allFilters = - for ( - filter ← filters; - entry ← filter.entrySet().asScala - ) yield { - val key = entry.getKey - val keyBasedConfig = entry.getValue.atKey(key) - - val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList - val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList - - (key, MetricGroupFilter(includes, excludes)) - } - - allFilters.toMap - } -} - -object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) - - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/Scale.scala b/kamon-core/src/main/scala/kamon/metrics/Scale.scala deleted file mode 100644 index 6899490a..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/Scale.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -class Scale(val numericValue: Double) extends AnyVal - -object Scale { - val Nano = new Scale(1E-9) - val Micro = new Scale(1E-6) - val Milli = new Scale(1E-3) - val Unit = new Scale(1) - val Kilo = new Scale(1E3) - val Mega = new Scale(1E6) - val Giga = new Scale(1E9) - - def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue -} diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala deleted file mode 100644 index c9990229..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import akka.actor.{ Props, ActorRef, Actor } -import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } -import kamon.util.GlobPathFilter -import scala.concurrent.duration.{ FiniteDuration, Duration } -import java.util.concurrent.TimeUnit -import kamon.Kamon -import kamon.metrics.TickMetricSnapshotBuffer.{ Combined, FlushBuffer } - -class Subscriptions extends Actor { - import context.system - - val config = context.system.settings.config - val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) - val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) - - var lastTick: Long = System.currentTimeMillis() - var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty - var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty - - def receive = { - case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent) - case FlushMetrics ⇒ flush() - } - - def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = { - val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) - if (permanent) { - val receivers = subscribedPermanently.get(filter).getOrElse(Nil) - subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) - - } else { - val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) - subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) - } - - } - - def flush(): Unit = { - val currentTick = System.currentTimeMillis() - val snapshots = Kamon(Metrics).collect - - dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) - dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) - - lastTick = currentTick - subscribedForOneShot = Map.empty - } - - def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], - snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { - - for ((filter, receivers) ← subscriptions) yield { - val selection = snapshots.filter(group ⇒ filter.accept(group._1)) - val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) - - receivers.foreach(_ ! tickMetrics) - } - } -} - -object Subscriptions { - case object FlushMetrics - case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false) - case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) - - case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) { - def accept(identity: MetricGroupIdentity): Boolean = { - category.equals(identity.category) && globFilter.accept(identity.name) - } - } -} - -class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { - val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) - - def receive = empty - - def empty: Actor.Receive = { - case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) - case FlushBuffer ⇒ // Nothing to flush. - } - - def buffering(buffered: TickMetricSnapshot): Actor.Receive = { - case TickMetricSnapshot(_, to, tickMetrics) ⇒ - val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) - val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) - - context become (buffering(combinedSnapshot)) - - case FlushBuffer ⇒ - receiver ! buffered - context become (empty) - - } - - override def postStop(): Unit = { - flushSchedule.cancel() - super.postStop() - } - - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r))) -} - -object TickMetricSnapshotBuffer { - case object FlushBuffer - - case class Combined(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot - - def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = - Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) -} diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala deleted file mode 100644 index 5454edf5..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import org.HdrHistogram.HdrRecorder -import scala.collection.concurrent.TrieMap -import com.typesafe.config.Config - -case class TraceMetrics(name: String) extends MetricGroupIdentity { - val category = TraceMetrics -} - -object TraceMetrics extends MetricGroupCategory { - val name = "trace" - - case object ElapsedTime extends MetricIdentity { val name, tag = "elapsed-time" } - case class HttpClientRequest(name: String, tag: String) extends MetricIdentity - - class TraceMetricRecorder(val elapsedTime: HdrRecorder, private val segmentRecorderFactory: () ⇒ HdrRecorder) - extends MetricGroupRecorder { - - private val segments = TrieMap[MetricIdentity, HdrRecorder]() - - def segmentRecorder(segmentIdentity: MetricIdentity): HdrRecorder = - segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) - - def collect: MetricGroupSnapshot = TraceMetricSnapshot(elapsedTime.collect(), - segments.map { case (identity, recorder) ⇒ (identity, recorder.collect()) }.toMap) - } - - case class TraceMetricSnapshot(elapsedTime: MetricSnapshotLike, segments: Map[MetricIdentity, MetricSnapshotLike]) - extends MetricGroupSnapshot { - - def metrics: Map[MetricIdentity, MetricSnapshotLike] = segments + (ElapsedTime -> elapsedTime) - } - - val Factory = new MetricGroupFactory { - type GroupRecorder = TraceMetricRecorder - - def create(config: Config): TraceMetricRecorder = { - - val settings = config.getConfig("precision.trace") - val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time")) - val segmentConfig = extractPrecisionConfig(settings.getConfig("segment")) - - new TraceMetricRecorder( - HdrRecorder(elapsedTimeConfig.highestTrackableValue, elapsedTimeConfig.significantValueDigits, Scale.Nano), - () ⇒ HdrRecorder(segmentConfig.highestTrackableValue, segmentConfig.significantValueDigits, Scale.Nano)) - } - } - -} diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala deleted file mode 100644 index 3a39ec69..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics.instruments - -import org.HdrHistogram.HdrRecorder -import kamon.metrics.{ Scale, MetricSnapshotLike } - -/** - * This recorder keeps track of the last value recoded and automatically adds it after collecting a snapshot. This is - * useful in cases where the absence of recordings does not necessarily mean the absence of values. For example, if this - * recorder is used for recording the mailbox size of an actor, and it only gets updated upon message enqueue o dequeue, - * the absence of recordings during 1 second means that the size hasn't change (example: the actor being blocked doing - * some work) and it should keep its last known value, instead of dropping to zero and then going back to the real value - * after a new event is processed. - * - */ -class ContinuousHdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) - extends HdrRecorder(highestTrackableValue, significantValueDigits, scale) { - - @volatile private var lastRecordedValue: Long = 0 - - override def record(value: Long): Unit = { - lastRecordedValue = value - super.record(value) - } - - override def collect(): MetricSnapshotLike = { - val snapshot = super.collect() - super.record(lastRecordedValue) - - snapshot - } -} - -object ContinuousHdrRecorder { - def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) = - new ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala deleted file mode 100644 index e5efbc15..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala +++ /dev/null @@ -1,38 +0,0 @@ -package kamon.metrics.instruments -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -import kamon.metrics._ -import kamon.metrics.MetricSnapshot.Measurement - -import jsr166e.LongAdder - -class CounterRecorder extends MetricRecorder { - private val counter = new LongAdder - - def record(value: Long): Unit = { - counter.add(value) - } - - def collect(): MetricSnapshotLike = { - val sum = counter.sumThenReset() - MetricSnapshot(InstrumentTypes.Counter, sum, Scale.Unit, Vector(Measurement(1, sum))) - } -} - -object CounterRecorder { - def apply(): CounterRecorder = new CounterRecorder() -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala deleted file mode 100644 index ce4fd76d..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package org.HdrHistogram - -import java.util.concurrent.atomic.AtomicLongFieldUpdater -import scala.annotation.tailrec -import kamon.metrics._ - -/** - * This implementation aims to be used for real time data collection where data snapshots are taken often over time. - * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still - * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. - */ -class HdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) - extends AtomicHistogram(1L, highestTrackableValue, significantValueDigits) with MetricRecorder { - - import HdrRecorder.totalCountUpdater - - def record(value: Long): Unit = recordValue(value) - - def collect(): MetricSnapshotLike = { - val entries = Vector.newBuilder[MetricSnapshot.Measurement] - val countsLength = counts.length() - - @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { - if (index < countsLength) { - val currentValue = previousValue + increment - val countAtValue = counts.getAndSet(index, 0) - - if (countAtValue > 0) - entries += MetricSnapshot.Measurement(currentValue, countAtValue) - - if (currentValue == bucketLimit) - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) - else - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) - } else { - nrOfRecordings - } - } - - val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1) - - def tryUpdateTotalCount: Boolean = { - val previousTotalCount = getTotalCount - val newTotalCount = previousTotalCount - nrOfRecordings - - totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) - } - - while (!tryUpdateTotalCount) {} - - MetricSnapshot(InstrumentTypes.Histogram, nrOfRecordings, scale, entries.result()) - } - -} - -object HdrRecorder { - val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") - - def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale): HdrRecorder = - new HdrRecorder(highestTrackableValue, significantValueDigits, scale) - -} diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala deleted file mode 100644 index ba2550af..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala +++ /dev/null @@ -1,58 +0,0 @@ -package kamon.metrics.instruments - -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -import java.lang.Math._ -import jsr166e.LongMaxUpdater -import kamon.util.PaddedAtomicLong -import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement - -class MinMaxCounter { - private val min = new LongMaxUpdater - private val max = new LongMaxUpdater - private val sum = new PaddedAtomicLong - - min.update(0L) - max.update(0L) - - def increment(value: Long = 1L): Unit = { - val currentValue = sum.addAndGet(value) - max.update(currentValue) - } - - def decrement(value: Long = 1L): Unit = { - val currentValue = sum.addAndGet(-value) - min.update(-currentValue) - } - - def collect(): CounterMeasurement = { - val currentValue = { - val value = sum.get() - if (value < 0) 0 else value - } - val result = CounterMeasurement(abs(min.maxThenReset()), max.maxThenReset(), currentValue) - max.update(currentValue) - min.update(-currentValue) - result - } -} - -object MinMaxCounter { - def apply() = new MinMaxCounter() - - case class CounterMeasurement(min: Long, max: Long, current: Long) -} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala deleted file mode 100644 index 640157a9..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/package.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon - -import scala.annotation.tailrec -import com.typesafe.config.Config - -package object metrics { - - case class HdrPrecisionConfig(highestTrackableValue: Long, significantValueDigits: Int) - - def extractPrecisionConfig(config: Config): HdrPrecisionConfig = - HdrPrecisionConfig(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) - - @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { - if (right.isEmpty) - left - else { - val (key, rightValue) = right.head - val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) - - combineMaps(left.updated(key, value), right.tail)(valueMerger) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala new file mode 100644 index 00000000..258cc1b2 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala @@ -0,0 +1,11 @@ +package kamon.standalone + +import akka.actor.ActorSystem + +object KamonStandalone { + private lazy val system = ActorSystem("kamon-standalone") + + def registerHistogram(name: String) = { + + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 307cf17a..9ce3cd4e 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -18,11 +18,11 @@ package kamon.trace import akka.actor.ActorSystem import kamon.Kamon -import kamon.metrics._ +import kamon.metric._ import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware import kamon.trace.TraceContext.SegmentIdentity -import kamon.metrics.TraceMetrics.TraceMetricRecorder +import kamon.metric.TraceMetrics.TraceMetricRecorder trait TraceContext { def name: String @@ -41,7 +41,7 @@ object TraceContext { } trait SegmentCompletionHandle { - def finish(metadata: Map[String, String]) + def finish(metadata: Map[String, String] = Map.empty) } case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String]) @@ -76,7 +76,7 @@ object SegmentCompletionHandleAware { } class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String], - val system: ActorSystem) extends TraceContext { + val system: ActorSystem) extends TraceContext { @volatile private var _isOpen = true val levelOfDetail = OnlyMetrics val startMark = System.nanoTime() diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 0e264cd2..efe08cdb 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -33,7 +33,7 @@ object TraceRecorder { def newToken = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet()) private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String], - system: ActorSystem): TraceContext = { + system: ActorSystem): TraceContext = { // In the future this should select between implementations. val finalToken = token.getOrElse(newToken) @@ -51,7 +51,7 @@ object TraceRecorder { traceContextStorage.set(Some(ctx)) } - def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): Option[SegmentCompletionHandle] = + def startSegment(identity: SegmentIdentity, metadata: Map[String, String] = Map.empty): Option[SegmentCompletionHandle] = currentContext.map(_.startSegment(identity, metadata)) def rename(name: String): Unit = currentContext.map(_.rename(name)) -- cgit v1.2.3