diff options
Diffstat (limited to 'kamon-core/src/main')
34 files changed, 1169 insertions, 748 deletions
diff --git a/kamon-core/src/main/java/kamon/util/Example.java b/kamon-core/src/main/java/kamon/util/Example.java new file mode 100644 index 00000000..a5031182 --- /dev/null +++ b/kamon-core/src/main/java/kamon/util/Example.java @@ -0,0 +1,8 @@ +package kamon.util; + +public class Example { + + public static void main(String args[]) { + + } +} diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 3f7dd42d..e1edaed9 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -3,23 +3,23 @@ <aspectj> <aspects> <!-- Actors --> - <aspect name="akka.instrumentation.RepointableActorRefTraceContextMixin"/> - <aspect name="akka.instrumentation.SystemMessageTraceContextMixin"/> - <aspect name="akka.instrumentation.ActorSystemMessagePassingTracing"/> - <aspect name="akka.instrumentation.EnvelopeTraceContextMixin"/> - <aspect name="akka.instrumentation.ActorCellMetricsMixin"/> - <aspect name="akka.instrumentation.BehaviourInvokeTracing"/> - <aspect name="kamon.instrumentation.ActorLoggingTracing"/> + <aspect name="akka.instrumentation.TraceContextIntoRepointableActorRefMixin"/> + <aspect name="akka.instrumentation.TraceContextIntoSystemMessageMixin"/> + <aspect name="akka.instrumentation.ActorSystemMessageInstrumentation"/> + <aspect name="akka.instrumentation.TraceContextIntoEnvelopeMixin"/> + <aspect name="akka.instrumentation.ActorCellMetricsIntoActorCellMixin"/> + <aspect name="akka.instrumentation.ActorCellInstrumentation"/> + <aspect name="akka.instrumentation.ActorLoggingInstrumentation"/> <!-- Dispatchers --> - <aspect name="akka.instrumentation.DispatcherTracing"/> - <aspect name="akka.instrumentation.DispatcherMetricsMixin"/> + <aspect name="akka.instrumentation.DispatcherInstrumentation"/> + <aspect name="akka.instrumentation.DispatcherMetricCollectionInfoIntoDispatcherMixin"/> <!-- Futures --> - <aspect name="kamon.instrumentation.FutureTracing"/> + <aspect name="kamon.instrumentation.scala.FutureInstrumentation"/> <!-- Patterns --> - <aspect name="akka.instrumentation.AskPatternTracing"/> + <aspect name="akka.instrumentation.AskPatternInstrumentation"/> </aspects> <weaver options="-XmessageHandlerClass:kamon.weaver.logging.KamonWeaverMessageHandler"> diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index d2830892..b7f5c70e 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -48,30 +48,33 @@ kamon { ] precision { + default-histogram-precision { + highest-trackable-value = 3600000000000 + significant-value-digits = 2 + } + + default-min-max-counter-precision { + refresh-interval = 100 milliseconds + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + + default-gauge-precision { + refresh-interval = 100 milliseconds + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + + actor { - processing-time { - highest-trackable-value = 3600000000000 - significant-value-digits = 2 - } - time-in-mailbox { - highest-trackable-value = 3600000000000 - significant-value-digits = 2 - } - mailbox-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } + processing-time = ${kamon.metrics.precision.default-histogram-precision} + time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} + mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} } trace { - elapsed-time { - highest-trackable-value = 3600000000000 - significant-value-digits = 2 - } - segment { - highest-trackable-value = 3600000000000 - significant-value-digits = 2 - } + elapsed-time = ${kamon.metrics.precision.default-histogram-precision} + segment = ${kamon.metrics.precision.default-histogram-precision} } dispatcher { diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala index 6db86828..5fce4555 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -1,34 +1,32 @@ -/* =================================================== +/* + * ========================================================================================= * Copyright © 2013-2014 the kamon project <http://kamon.io/> * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ package akka.instrumentation -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint import akka.actor._ import akka.dispatch.{ Envelope, MessageDispatcher } -import kamon.trace._ -import kamon.metrics.{ ActorMetrics, Metrics } import kamon.Kamon -import kamon.metrics.ActorMetrics.ActorMetricRecorder -import kamon.metrics.instruments.MinMaxCounter -import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement +import kamon.metric.ActorMetrics.ActorMetricsRecorder +import kamon.metric.{ ActorMetrics, Metrics } +import kamon.trace._ +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ @Aspect -class BehaviourInvokeTracing { +class ActorCellInstrumentation { @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} @@ -42,19 +40,6 @@ class BehaviourInvokeTracing { cellWithMetrics.metricIdentity = metricIdentity cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - - if (cellWithMetrics.actorMetricsRecorder.isDefined) { - cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - cellWithMetrics.actorMetricsRecorder.map { am ⇒ - import am.mailboxSize._ - val CounterMeasurement(min, max, current) = cellWithMetrics.queueSize.collect() - - record(min) - record(max) - record(current) - } - } - } } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") @@ -75,7 +60,7 @@ class BehaviourInvokeTracing { am ⇒ am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) - cellWithMetrics.queueSize.decrement() + am.mailboxSize.decrement() } } } @@ -86,7 +71,7 @@ class BehaviourInvokeTracing { @After("sendingMessageToActorCell(cell)") def afterSendMessageToActorCell(cell: ActorCell): Unit = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment()) + cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment()) } @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") @@ -110,27 +95,26 @@ class BehaviourInvokeTracing { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] cellWithMetrics.actorMetricsRecorder.map { - am ⇒ am.errorCounter.record(1L) + am ⇒ am.errors.increment() } } } trait ActorCellMetrics { var metricIdentity: ActorMetrics = _ - var actorMetricsRecorder: Option[ActorMetricRecorder] = _ + var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ var mailboxSizeCollectorCancellable: Cancellable = _ - val queueSize = MinMaxCounter() } @Aspect -class ActorCellMetricsMixin { +class ActorCellMetricsIntoActorCellMixin { @DeclareMixin("akka.actor.ActorCell") def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} } @Aspect -class EnvelopeTraceContextMixin { +class TraceContextIntoEnvelopeMixin { @DeclareMixin("akka.dispatch.Envelope") def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala index 85d39d3e..ee9d442f 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala @@ -1,11 +1,11 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> + * Copyright © 2013-2014 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, @@ -14,14 +14,14 @@ * ========================================================================================= */ -package kamon.instrumentation +package akka.instrumentation -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ @Aspect -class ActorLoggingTracing { +class ActorLoggingInstrumentation { @DeclareMixin("akka.event.Logging.LogEvent+") def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala index 7d03d946..9b6b6866 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala @@ -1,12 +1,44 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package akka.instrumentation -import org.aspectj.lang.annotation._ import akka.dispatch.sysmsg.EarliestFirstSystemMessageList +import kamon.trace.{ TraceContextAware, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ TraceRecorder, TraceContextAware } +import org.aspectj.lang.annotation._ + +@Aspect +class ActorSystemMessageInstrumentation { + + @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") + def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} + + @Around("systemMessageProcessing(messages)") + def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { + if (messages.nonEmpty) { + val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext + TraceRecorder.withTraceContext(ctx)(pjp.proceed()) + + } else pjp.proceed() + } +} @Aspect -class SystemMessageTraceContextMixin { +class TraceContextIntoSystemMessageMixin { @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default @@ -22,7 +54,7 @@ class SystemMessageTraceContextMixin { } @Aspect -class RepointableActorRefTraceContextMixin { +class TraceContextIntoRepointableActorRefMixin { @DeclareMixin("akka.actor.RepointableActorRef") def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default @@ -45,21 +77,4 @@ class RepointableActorRefTraceContextMixin { pjp.proceed() } } - -} - -@Aspect -class ActorSystemMessagePassingTracing { - - @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") - def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} - - @Around("systemMessageProcessing(messages)") - def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { - if (messages.nonEmpty) { - val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withTraceContext(ctx)(pjp.proceed()) - - } else pjp.proceed() - } -} +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala index 31ec92a4..3bf13ce2 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala @@ -1,11 +1,11 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> + * Copyright © 2013-2014 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, @@ -16,16 +16,17 @@ package akka.instrumentation -import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect } -import akka.event.Logging.Warning -import scala.compat.Platform.EOL import akka.actor.ActorRefProvider +import akka.event.Logging.Warning import akka.pattern.{ AskTimeoutException, PromiseActorRef } -import kamon.trace.Trace import kamon.Kamon +import kamon.trace.Trace +import org.aspectj.lang.annotation.{ AfterReturning, Aspect, Pointcut } + +import scala.compat.Platform.EOL @Aspect -class AskPatternTracing { +class AskPatternInstrumentation { class StackTraceCaptureException extends Throwable @@ -46,7 +47,7 @@ class AskPatternTracing { case timeout: AskTimeoutException ⇒ val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing], + system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], "Timeout triggered for ask pattern registered at: " + stackString)) } } diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala index 60cc4ddf..db366e8c 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala @@ -5,7 +5,7 @@ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, @@ -16,19 +16,21 @@ package akka.instrumentation -import org.aspectj.lang.annotation._ -import akka.dispatch.{ Dispatchers, ExecutorServiceDelegate, Dispatcher, MessageDispatcher } -import kamon.metrics.{ Metrics, DispatcherMetrics } -import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder -import kamon.Kamon -import akka.actor.{ Cancellable, ActorSystemImpl } -import scala.concurrent.forkjoin.ForkJoinPool -import java.util.concurrent.ThreadPoolExecutor import java.lang.reflect.Method +import java.util.concurrent.ThreadPoolExecutor + +import akka.actor.{ ActorSystemImpl, Cancellable } +import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher } import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement +import kamon.Kamon +import kamon.metric.DispatcherMetrics.DispatcherMetricRecorder +import kamon.metric.{ DispatcherMetrics, Metrics } +import org.aspectj.lang.annotation._ + +import scala.concurrent.forkjoin.ForkJoinPool @Aspect -class DispatcherTracing { +class DispatcherInstrumentation { @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} @@ -45,7 +47,7 @@ class DispatcherTracing { @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem } @@ -62,7 +64,7 @@ class DispatcherTracing { @After("onDispatcherStartup(dispatcher)") def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) val metricIdentity = DispatcherMetrics(dispatcher.id) @@ -90,7 +92,7 @@ class DispatcherTracing { @After("onDispatcherShutdown(dispatcher)") def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] dispatcherWithMetrics.dispatcherMetricsRecorder.map { dispatcher ⇒ @@ -101,16 +103,16 @@ class DispatcherTracing { } @Aspect -class DispatcherMetricsMixin { +class DispatcherMetricCollectionInfoIntoDispatcherMixin { @DeclareMixin("akka.dispatch.Dispatcher") - def mixinDispatcherMetricsToMessageDispatcher: DispatcherMessageMetrics = new DispatcherMessageMetrics {} + def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {} @DeclareMixin("akka.dispatch.Dispatchers") def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} } -trait DispatcherMessageMetrics { +trait DispatcherMetricCollectionInfo { var metricIdentity: DispatcherMetrics = _ var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ var dispatcherCollectorCancellable: Cancellable = _ diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala b/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala index e5efbc15..e79090a8 100644 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala @@ -1,4 +1,3 @@ -package kamon.metrics.instruments /* * ========================================================================================= * Copyright © 2013-2014 the kamon project <http://kamon.io/> @@ -15,24 +14,22 @@ package kamon.metrics.instruments * ========================================================================================= */ -import kamon.metrics._ -import kamon.metrics.MetricSnapshot.Measurement +package org.HdrHistogram -import jsr166e.LongAdder +import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater } -class CounterRecorder extends MetricRecorder { - private val counter = new LongAdder +trait AtomicHistogramFieldsAccessor { + self: AtomicHistogram ⇒ - def record(value: Long): Unit = { - counter.add(value) - } + def countsArray(): AtomicLongArray = self.counts - def collect(): MetricSnapshotLike = { - val sum = counter.sumThenReset() - MetricSnapshot(InstrumentTypes.Counter, sum, Scale.Unit, Vector(Measurement(1, sum))) - } + def unitMagnitude(): Int = self.unitMagnitude + + def subBucketHalfCount(): Int = self.subBucketHalfCount + + def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude } -object CounterRecorder { - def apply(): CounterRecorder = new CounterRecorder() -}
\ No newline at end of file +object AtomicHistogramFieldsAccessor { + def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala index 634c94a1..d8f2b620 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala @@ -1,26 +1,27 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.instrumentation.scala -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ @Aspect -class FutureTracing { +class FutureInstrumentation { @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala new file mode 100644 index 00000000..bb412f79 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala @@ -0,0 +1,89 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram } + +case class ActorMetrics(name: String) extends MetricGroupIdentity { + val category = ActorMetrics +} + +object ActorMetrics extends MetricGroupCategory { + val name = "actor" + + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, + errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.collect(context), + timeInMailbox.collect(context), + mailboxSize.collect(context), + errors.collect(context)) + + def cleanup: Unit = { + processingTime.cleanup + mailboxSize.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, + mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = ActorMetricSnapshot + + def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + mailboxSize.merge(that.mailboxSize, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (ProcessingTime -> processingTime), + (MailboxSize -> mailboxSize), + (TimeInMailbox -> timeInMailbox), + (Errors -> errors)) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = ActorMetricsRecorder + + def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { + val settings = config.getConfig("precision.actor") + + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + val mailboxSizeConfig = settings.getConfig("mailbox-size") + + new ActorMetricsRecorder( + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + MinMaxCounter.fromConfig(mailboxSizeConfig, system), + Counter()) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala new file mode 100644 index 00000000..fbce783c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala @@ -0,0 +1,88 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ Histogram, HdrHistogram } + +case class DispatcherMetrics(name: String) extends MetricGroupIdentity { + val category = DispatcherMetrics +} + +object DispatcherMetrics extends MetricGroupCategory { + val name = "dispatcher" + + case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } + case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } + case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } + case object PoolSize extends MetricIdentity { val name = "pool-size" } + + case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, + queueTaskCount: Histogram, poolSize: Histogram) + extends MetricGroupRecorder { + + def collect(context: CollectionContext): MetricGroupSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.collect(context), + runningThreadCount.collect(context), + queueTaskCount.collect(context), + poolSize.collect(context)) + + def cleanup: Unit = {} + + } + + case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, + queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = DispatcherMetricSnapshot + + def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.merge(that.maximumPoolSize, context), + runningThreadCount.merge(that.runningThreadCount, context), + queueTaskCount.merge(that.queueTaskCount, context), + poolSize.merge(that.poolSize, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (MaximumPoolSize -> maximumPoolSize), + (RunningThreadCount -> runningThreadCount), + (QueueTaskCount -> queueTaskCount), + (PoolSize -> poolSize)) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = DispatcherMetricRecorder + + def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { + val settings = config.getConfig("precision.dispatcher") + + val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") + val runningThreadCountConfig = settings.getConfig("running-thread-count") + val queueTaskCountConfig = settings.getConfig("queued-task-count") + val poolSizeConfig = settings.getConfig("pool-size") + + new DispatcherMetricRecorder( + Histogram.fromConfig(maximumPoolSizeConfig), + Histogram.fromConfig(runningThreadCountConfig), + Histogram.fromConfig(queueTaskCountConfig), + Histogram.fromConfig(poolSizeConfig)) + } + } +} + diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala new file mode 100644 index 00000000..325dd216 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala @@ -0,0 +1,75 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.{ LongBuffer } +import akka.actor.ActorSystem +import com.typesafe.config.Config + +trait MetricGroupCategory { + def name: String +} + +trait MetricGroupIdentity { + def name: String + def category: MetricGroupCategory +} + +trait MetricIdentity { + def name: String +} + +trait CollectionContext { + def buffer: LongBuffer +} + +object CollectionContext { + def default: CollectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } +} + +trait MetricGroupRecorder { + def collect(context: CollectionContext): MetricGroupSnapshot + def cleanup: Unit +} + +trait MetricSnapshot { + type SnapshotType + + def merge(that: SnapshotType, context: CollectionContext): SnapshotType +} + +trait MetricGroupSnapshot { + type GroupSnapshotType + + def metrics: Map[MetricIdentity, MetricSnapshot] + def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType +} + +private[kamon] trait MetricRecorder { + type SnapshotType <: MetricSnapshot + + def collect(context: CollectionContext): SnapshotType + def cleanup: Unit +} + +trait MetricGroupFactory { + type GroupRecorder <: MetricGroupRecorder + def create(config: Config, system: ActorSystem): GroupRecorder +} + diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index c60babb2..1025f0de 100644 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -14,7 +14,9 @@ * ========================================================================================= */ -package kamon.metrics +package kamon.metric + +import java.nio.{ LongBuffer, ByteBuffer } import scala.collection.concurrent.TrieMap import akka.actor._ @@ -22,8 +24,8 @@ import com.typesafe.config.Config import kamon.util.GlobPathFilter import kamon.Kamon import akka.actor -import kamon.metrics.Metrics.MetricGroupFilter -import kamon.metrics.Subscriptions.Subscribe +import kamon.metric.Metrics.MetricGroupFilter +import kamon.metric.Subscriptions.Subscribe import java.util.concurrent.TimeUnit class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { @@ -42,7 +44,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { if (shouldTrack(identity)) - Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig)).asInstanceOf[factory.GroupRecorder]) + Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) else None } @@ -56,7 +58,11 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { } def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = { - (for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap + // TODO: Improve the way in which we are getting the context. + val context = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(50000) + } + (for ((identity, recorder) ← storage) yield (identity, recorder.collect(context))).toMap } def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { @@ -68,7 +74,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { } private def shouldTrack(identity: MetricGroupIdentity): Boolean = { - filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false) + filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true) } def loadFilters(config: Config): Map[String, MetricGroupFilter] = { diff --git a/kamon-core/src/main/scala/kamon/metrics/Scale.scala b/kamon-core/src/main/scala/kamon/metric/Scale.scala index 6899490a..2f27c1a3 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Scale.scala +++ b/kamon-core/src/main/scala/kamon/metric/Scale.scala @@ -14,7 +14,7 @@ * ========================================================================================= */ -package kamon.metrics +package kamon.metric class Scale(val numericValue: Double) extends AnyVal diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala index c9990229..a9f4c721 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala @@ -14,15 +14,15 @@ * ========================================================================================= */ -package kamon.metrics +package kamon.metric import akka.actor.{ Props, ActorRef, Actor } -import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } +import kamon.metric.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } import kamon.util.GlobPathFilter import scala.concurrent.duration.{ FiniteDuration, Duration } import java.util.concurrent.TimeUnit import kamon.Kamon -import kamon.metrics.TickMetricSnapshotBuffer.{ Combined, FlushBuffer } +import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer class Subscriptions extends Actor { import context.system @@ -65,7 +65,7 @@ class Subscriptions extends Actor { } def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], - snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { + snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { for ((filter, receivers) ← subscriptions) yield { val selection = snapshots.filter(group ⇒ filter.accept(group._1)) @@ -90,6 +90,7 @@ object Subscriptions { class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + val collectionContext = CollectionContext.default def receive = empty @@ -116,14 +117,12 @@ class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef super.postStop() } - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r))) + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext))) } object TickMetricSnapshotBuffer { case object FlushBuffer - case class Combined(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot - def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) } diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala new file mode 100644 index 00000000..1ee1eab4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -0,0 +1,77 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import kamon.metric.instrument.{ Histogram } + +import scala.collection.concurrent.TrieMap +import com.typesafe.config.Config + +case class TraceMetrics(name: String) extends MetricGroupIdentity { + val category = TraceMetrics +} + +object TraceMetrics extends MetricGroupCategory { + val name = "trace" + + case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } + case class HttpClientRequest(name: String) extends MetricIdentity + + case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) + extends MetricGroupRecorder { + + private val segments = TrieMap[MetricIdentity, Histogram]() + + def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = + segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) + + def collect(context: CollectionContext): TraceMetricsSnapshot = + TraceMetricsSnapshot( + elapsedTime.collect(context), + segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap) + + def cleanup: Unit = {} + } + + case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot]) + extends MetricGroupSnapshot { + + type GroupSnapshotType = TraceMetricsSnapshot + + def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = + TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) + + def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = TraceMetricRecorder + + def create(config: Config, system: ActorSystem): TraceMetricRecorder = { + + val settings = config.getConfig("precision.trace") + val elapsedTimeConfig = settings.getConfig("elapsed-time") + val segmentConfig = settings.getConfig("segment") + + new TraceMetricRecorder( + Histogram.fromConfig(elapsedTimeConfig), + () ⇒ Histogram.fromConfig(segmentConfig)) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala new file mode 100644 index 00000000..dea03968 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -0,0 +1,139 @@ +package kamon.metric + +import akka.actor +import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import com.typesafe.config.Config +import kamon.Kamon +import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.FiniteDuration + +class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + lazy val userMetricsRecorder = Kamon(Metrics)(system).register(UserMetrics, UserMetrics.Factory).get + + def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = + userMetricsRecorder.buildHistogram(name, precision, highestTrackableValue) + + def registerHistogram(name: String): Histogram = + userMetricsRecorder.buildHistogram(name) + + def registerCounter(name: String): Counter = + userMetricsRecorder.buildCounter(name) + + def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration): MinMaxCounter = { + userMetricsRecorder.buildMinMaxCounter(name, precision, highestTrackableValue, refreshInterval) + } + + def registerMinMaxCounter(name: String): MinMaxCounter = + userMetricsRecorder.buildMinMaxCounter(name) + + def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + userMetricsRecorder.buildGauge(name)(currentValueCollector) + + def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + userMetricsRecorder.buildGauge(name, precision, highestTrackableValue, refreshInterval, currentValueCollector) +} + +object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider with MetricGroupIdentity { + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system) + + val name: String = "user-metrics-recorder" + val category = new MetricGroupCategory { + val name: String = "user-metrics" + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = UserMetricsRecorder + def create(config: Config, system: ActorSystem): UserMetricsRecorder = new UserMetricsRecorder(system) + } + + class UserMetricsRecorder(system: ActorSystem) extends MetricGroupRecorder { + val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision") + val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision") + val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision") + val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") + + val histograms = TrieMap[String, Histogram]() + val counters = TrieMap[String, Counter]() + val minMaxCounters = TrieMap[String, MinMaxCounter]() + val gauges = TrieMap[String, Gauge]() + + def buildHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = + histograms.getOrElseUpdate(name, Histogram(highestTrackableValue, precision, Scale.Unit)) + + def buildHistogram(name: String): Histogram = + histograms.getOrElseUpdate(name, Histogram.fromConfig(defaultHistogramPrecisionConfig)) + + def buildCounter(name: String): Counter = + counters.getOrElseUpdate(name, Counter()) + + def buildMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration): MinMaxCounter = { + minMaxCounters.getOrElseUpdate(name, MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) + } + + def buildMinMaxCounter(name: String): MinMaxCounter = + minMaxCounters.getOrElseUpdate(name, MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) + + def buildGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration, currentValueCollector: Gauge.CurrentValueCollector): Gauge = + gauges.getOrElseUpdate(name, Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) + + def buildGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + gauges.getOrElseUpdate(name, Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) + + def collect(context: CollectionContext): UserMetricsSnapshot = { + val histogramSnapshots = histograms.map { + case (name, histogram) ⇒ + (UserHistogram(name), histogram.collect(context)) + } toMap + + val counterSnapshots = counters.map { + case (name, counter) ⇒ + (UserCounter(name), counter.collect(context)) + } toMap + + val minMaxCounterSnapshots = minMaxCounters.map { + case (name, minMaxCounter) ⇒ + (UserMinMaxCounter(name), minMaxCounter.collect(context)) + } toMap + + val gaugeSnapshots = gauges.map { + case (name, gauge) ⇒ + (UserGauge(name), gauge.collect(context)) + } toMap + + UserMetricsSnapshot(histogramSnapshots, counterSnapshots, minMaxCounterSnapshots, gaugeSnapshots) + } + + def cleanup: Unit = {} + } + + case class UserHistogram(name: String) extends MetricIdentity + case class UserCounter(name: String) extends MetricIdentity + case class UserMinMaxCounter(name: String) extends MetricIdentity + case class UserGauge(name: String) extends MetricIdentity + + case class UserMetricsSnapshot(histograms: Map[UserHistogram, Histogram.Snapshot], + counters: Map[UserCounter, Counter.Snapshot], + minMaxCounters: Map[UserMinMaxCounter, Histogram.Snapshot], + gauges: Map[UserGauge, Histogram.Snapshot]) + extends MetricGroupSnapshot { + + type GroupSnapshotType = UserMetricsSnapshot + + def merge(that: UserMetricsSnapshot, context: CollectionContext): UserMetricsSnapshot = + UserMetricsSnapshot( + combineMaps(histograms, that.histograms)((l, r) ⇒ l.merge(r, context)), + combineMaps(counters, that.counters)((l, r) ⇒ l.merge(r, context)), + combineMaps(minMaxCounters, that.minMaxCounters)((l, r) ⇒ l.merge(r, context)), + combineMaps(gauges, that.gauges)((l, r) ⇒ l.merge(r, context))) + + def metrics: Map[MetricIdentity, MetricSnapshot] = histograms ++ counters ++ minMaxCounters ++ gauges + } + +} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala new file mode 100644 index 00000000..b592bcd3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -0,0 +1,59 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import jsr166e.LongAdder +import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder } + +trait Counter extends MetricRecorder { + type SnapshotType = Counter.Snapshot + + def increment(): Unit + def increment(times: Long): Unit +} + +object Counter { + + def apply(): Counter = new LongAdderCounter + + trait Snapshot extends MetricSnapshot { + type SnapshotType = Counter.Snapshot + + def count: Long + def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot + } +} + +class LongAdderCounter extends Counter { + private val counter = new LongAdder + + def increment(): Unit = counter.increment() + + def increment(times: Long): Unit = { + if (times < 0) + throw new UnsupportedOperationException("Counters cannot be decremented") + counter.add(times) + } + + def collect(context: CollectionContext): Counter.Snapshot = CounterSnapshot(counter.sumThenReset()) + + def cleanup: Unit = {} +} + +case class CounterSnapshot(count: Long) extends Counter.Snapshot { + def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala new file mode 100644 index 00000000..1efff2bc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -0,0 +1,78 @@ +package kamon.metric.instrument + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ Cancellable, ActorSystem } +import com.typesafe.config.Config +import kamon.metric.{ CollectionContext, Scale, MetricRecorder } + +import scala.concurrent.duration.FiniteDuration + +trait Gauge extends MetricRecorder { + type SnapshotType = Histogram.Snapshot + + def record(value: Long) + def record(value: Long, count: Long) +} + +object Gauge { + + trait CurrentValueCollector { + def currentValue: Long + } + + def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration, + system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { + + val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector) + + val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + gauge.refreshValue() + }(system.dispatcher) // TODO: Move this to Kamon dispatchers + + gauge.refreshValuesSchedule.set(refreshValuesSchedule) + gauge + } + + def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge = + fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction)) + + def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = { + val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision") + fromConfig(config, system)(currentValueCollector) + } + + def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { + import scala.concurrent.duration._ + + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) + + Gauge(Histogram.Precision(significantDigits), highest, Scale.Unit, refreshInterval.millis, system)(currentValueCollector) + } + + implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f.apply() + } +} + +class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge { + val refreshValuesSchedule = new AtomicReference[Cancellable]() + + def record(value: Long): Unit = underlyingHistogram.record(value) + + def record(value: Long, count: Long): Unit = underlyingHistogram.record(value, count) + + def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context) + + def cleanup: Unit = { + if (refreshValuesSchedule.get() != null) + refreshValuesSchedule.get().cancel() + } + + def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala new file mode 100644 index 00000000..9ae077f4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -0,0 +1,246 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import java.nio.LongBuffer +import com.typesafe.config.Config +import org.HdrHistogram.AtomicHistogramFieldsAccessor +import org.HdrHistogram.AtomicHistogram +import kamon.metric._ + +trait Histogram extends MetricRecorder { + type SnapshotType = Histogram.Snapshot + + def record(value: Long) + def record(value: Long, count: Long) +} + +object Histogram { + + def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram = + new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale) + + def fromConfig(config: Config): Histogram = { + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + + new HdrHistogram(1L, highest, significantDigits) + } + + object HighestTrackableValue { + val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L + } + + case class Precision(significantDigits: Int) + object Precision { + val Low = Precision(1) + val Normal = Precision(2) + val Fine = Precision(3) + } + + trait Record { + def level: Long + def count: Long + + private[kamon] def rawCompactRecord: Long + } + + case class MutableRecord(var level: Long, var count: Long) extends Record { + var rawCompactRecord: Long = 0L + } + + trait Snapshot extends MetricSnapshot { + type SnapshotType = Histogram.Snapshot + + def isEmpty: Boolean = numberOfMeasurements == 0 + def scale: Scale + def numberOfMeasurements: Long + def min: Long + def max: Long + def recordsIterator: Iterator[Record] + def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot + } +} + +/** + * This implementation is meant to be used for real time data collection where data snapshots are taken often over time. + * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still + * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. + */ +class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit) + extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits) + with Histogram with AtomicHistogramFieldsAccessor { + + import AtomicHistogramFieldsAccessor.totalCountUpdater + + def record(value: Long): Unit = recordValue(value) + + def record(value: Long, count: Long): Unit = recordValueWithCount(value, count) + + def collect(context: CollectionContext): Histogram.Snapshot = { + import context.buffer + buffer.clear() + val nrOfMeasurements = writeSnapshotTo(buffer) + + buffer.flip() + + val measurementsArray = Array.ofDim[Long](buffer.limit()) + buffer.get(measurementsArray, 0, measurementsArray.length) + new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + } + + def cleanup: Unit = {} + + private def writeSnapshotTo(buffer: LongBuffer): Long = { + val counts = countsArray() + val countsLength = counts.length() + + var nrOfMeasurements = 0L + var index = 0L + while (index < countsLength) { + val countAtIndex = counts.getAndSet(index.toInt, 0L) + + if (countAtIndex > 0) { + buffer.put(CompactHdrSnapshot.compactRecord(index, countAtIndex)) + nrOfMeasurements += countAtIndex + } + + index += 1 + } + + reestablishTotalCount(nrOfMeasurements) + nrOfMeasurements + } + + private def reestablishTotalCount(diff: Long): Unit = { + def tryUpdateTotalCount: Boolean = { + val previousTotalCount = getTotalCount + val newTotalCount = previousTotalCount - diff + + totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) + } + + while (!tryUpdateTotalCount) {} + } + +} + +class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, + subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot { + + def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0)) + def max: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(compactRecords.length - 1)) + + def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = { + if (that.isEmpty) this else if (this.isEmpty) that else { + import context.buffer + buffer.clear() + + val selfIterator = recordsIterator + val thatIterator = that.recordsIterator + var thatCurrentRecord: Histogram.Record = null + var mergedNumberOfMeasurements = 0L + + def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null + def addToBuffer(compactRecord: Long): Unit = { + mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) + buffer.put(compactRecord) + } + + while (selfIterator.hasNext) { + val selfCurrentRecord = selfIterator.next() + + // Advance that to no further than the level of selfCurrentRecord + thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord + while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { + addToBuffer(thatCurrentRecord.rawCompactRecord) + thatCurrentRecord = nextOrNull(thatIterator) + } + + // Include the current record of self and optionally merge if has the same level as thatCurrentRecord + if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { + addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) + thatCurrentRecord = nextOrNull(thatIterator) + } else { + addToBuffer(selfCurrentRecord.rawCompactRecord) + } + } + + // Include everything that might have been left from that + if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) + while (thatIterator.hasNext) { + addToBuffer(thatIterator.next().rawCompactRecord) + } + + buffer.flip() + val compactRecords = Array.ofDim[Long](buffer.limit()) + buffer.get(compactRecords) + + new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) + } + } + + @inline private def mergeCompactRecords(left: Long, right: Long): Long = { + val index = left >> 48 + val leftCount = countFromCompactRecord(left) + val rightCount = countFromCompactRecord(right) + + CompactHdrSnapshot.compactRecord(index, leftCount + rightCount) + } + + @inline private def levelFromCompactRecord(compactRecord: Long): Long = { + val countsArrayIndex = (compactRecord >> 48).toInt + var bucketIndex: Int = (countsArrayIndex >> subBucketHalfCountMagnitude) - 1 + var subBucketIndex: Int = (countsArrayIndex & (subBucketHalfCount - 1)) + subBucketHalfCount + if (bucketIndex < 0) { + subBucketIndex -= subBucketHalfCount + bucketIndex = 0 + } + + subBucketIndex.toLong << (bucketIndex + unitMagnitude) + } + + @inline private def countFromCompactRecord(compactRecord: Long): Long = + compactRecord & CompactHdrSnapshot.CompactRecordCountMask + + def recordsIterator: Iterator[Histogram.Record] = new Iterator[Histogram.Record] { + var currentIndex = 0 + val mutableRecord = Histogram.MutableRecord(0, 0) + + override def hasNext: Boolean = currentIndex < compactRecords.length + + override def next(): Histogram.Record = { + if (hasNext) { + val measurement = compactRecords(currentIndex) + mutableRecord.rawCompactRecord = measurement + mutableRecord.level = levelFromCompactRecord(measurement) + mutableRecord.count = countFromCompactRecord(measurement) + currentIndex += 1 + + mutableRecord + } else { + throw new IllegalStateException("The iterator has already been consumed.") + } + } + } +} + +object CompactHdrSnapshot { + val CompactRecordCountMask = 0xFFFFFFFFFFFFL + + def compactRecord(index: Long, count: Long): Long = (index << 48) | count +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala new file mode 100644 index 00000000..471e7bd4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -0,0 +1,116 @@ +package kamon.metric.instrument + +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +import java.lang.Math.abs +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference +import akka.actor.{ ActorSystem, Cancellable } +import com.typesafe.config.Config +import jsr166e.LongMaxUpdater +import kamon.metric.{ Scale, MetricRecorder, CollectionContext } +import kamon.util.PaddedAtomicLong +import scala.concurrent.duration.FiniteDuration + +trait MinMaxCounter extends MetricRecorder { + override type SnapshotType = Histogram.Snapshot + + def increment(): Unit + def increment(times: Long): Unit + def decrement() + def decrement(times: Long) +} + +object MinMaxCounter { + + def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration, + system: ActorSystem): MinMaxCounter = { + + val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram) + + val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + minMaxCounter.refreshValues() + }(system.dispatcher) // TODO: Move this to Kamon dispatchers + + minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule) + minMaxCounter + } + + def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = { + import scala.concurrent.duration._ + + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) + + apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system) + } +} + +class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter { + private val min = new LongMaxUpdater + private val max = new LongMaxUpdater + private val sum = new PaddedAtomicLong + val refreshValuesSchedule = new AtomicReference[Cancellable]() + + min.update(0L) + max.update(0L) + + def increment(): Unit = increment(1L) + + def increment(times: Long): Unit = { + val currentValue = sum.addAndGet(times) + max.update(currentValue) + } + + def decrement(): Unit = decrement(1L) + + def decrement(times: Long): Unit = { + val currentValue = sum.addAndGet(-times) + min.update(-currentValue) + } + + def collect(context: CollectionContext): Histogram.Snapshot = { + refreshValues() + underlyingHistogram.collect(context) + } + + def cleanup: Unit = { + if (refreshValuesSchedule.get() != null) + refreshValuesSchedule.get().cancel() + } + + def refreshValues(): Unit = { + val currentValue = { + val value = sum.get() + if (value < 0) 0 else value + } + + val currentMin = { + val minAbs = abs(min.maxThenReset()) + if (minAbs <= currentValue) minAbs else 0 + } + + underlyingHistogram.record(currentValue) + underlyingHistogram.record(currentMin) + underlyingHistogram.record(max.maxThenReset()) + + max.update(currentValue) + min.update(-currentValue) + } +} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metric/package.scala index 640157a9..43166058 100644 --- a/kamon-core/src/main/scala/kamon/metrics/package.scala +++ b/kamon-core/src/main/scala/kamon/metric/package.scala @@ -19,12 +19,7 @@ package kamon import scala.annotation.tailrec import com.typesafe.config.Config -package object metrics { - - case class HdrPrecisionConfig(highestTrackableValue: Long, significantValueDigits: Int) - - def extractPrecisionConfig(config: Config): HdrPrecisionConfig = - HdrPrecisionConfig(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) +package object metric { @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { if (right.isEmpty) diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala deleted file mode 100644 index 9e19dced..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import com.typesafe.config.Config -import kamon.metrics.instruments.CounterRecorder -import org.HdrHistogram.HdrRecorder - -case class ActorMetrics(name: String) extends MetricGroupIdentity { - val category = ActorMetrics -} - -object ActorMetrics extends MetricGroupCategory { - val name = "actor" - - case object ProcessingTime extends MetricIdentity { val name, tag = "processing-time" } - case object MailboxSize extends MetricIdentity { val name, tag = "mailbox-size" } - case object TimeInMailbox extends MetricIdentity { val name, tag = "time-in-mailbox" } - case object ErrorCounter extends MetricIdentity { val name, tag = "errors" } - - case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder, errorCounter: MetricRecorder) - extends MetricGroupRecorder { - - def collect: MetricGroupSnapshot = { - ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect(), errorCounter.collect()) - } - } - - case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike, errorCounter: MetricSnapshotLike) - extends MetricGroupSnapshot { - - val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map( - (ProcessingTime -> processingTime), - (MailboxSize -> mailboxSize), - (TimeInMailbox -> timeInMailbox), - (ErrorCounter -> errorCounter)) - } - - val Factory = new MetricGroupFactory { - type GroupRecorder = ActorMetricRecorder - - def create(config: Config): ActorMetricRecorder = { - val settings = config.getConfig("precision.actor") - - val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time")) - val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size")) - val timeInMailboxConfig = extractPrecisionConfig(settings.getConfig("time-in-mailbox")) - - new ActorMetricRecorder( - HdrRecorder(processingTimeConfig.highestTrackableValue, processingTimeConfig.significantValueDigits, Scale.Nano), - HdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit), - HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano), - CounterRecorder()) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala deleted file mode 100644 index cd0afac1..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import kamon.metrics.instruments.ContinuousHdrRecorder -import org.HdrHistogram.HdrRecorder -import com.typesafe.config.Config - -case class CustomMetric(name: String) extends MetricGroupIdentity { - val category = CustomMetric -} - -object CustomMetric extends MetricGroupCategory { - val name = "custom-metric" - val RecordedValues = new MetricIdentity { val name, tag = "recorded-values" } - - def histogram(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale, continuous: Boolean = false) = - new MetricGroupFactory { - - type GroupRecorder = CustomMetricRecorder - - def create(config: Config): CustomMetricRecorder = { - val recorder = - if (continuous) ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) - else HdrRecorder(highestTrackableValue, significantValueDigits, scale) - - new CustomMetricRecorder(RecordedValues, recorder) - } - } - - class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HdrRecorder) - extends MetricGroupRecorder { - - def record(value: Long): Unit = underlyingRecorder.record(value) - - def collect: MetricGroupSnapshot = DefaultMetricGroupSnapshot(Map((identity, underlyingRecorder.collect()))) - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala deleted file mode 100644 index f41e0c3f..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import com.typesafe.config.Config -import org.HdrHistogram.HdrRecorder - -case class DispatcherMetrics(name: String) extends MetricGroupIdentity { - val category = DispatcherMetrics -} - -object DispatcherMetrics extends MetricGroupCategory { - val name = "dispatcher" - - case object MaximumPoolSize extends MetricIdentity { val name, tag = "maximum-pool-size" } - case object RunningThreadCount extends MetricIdentity { val name, tag = "running-thread-count" } - case object QueueTaskCount extends MetricIdentity { val name, tag = "queued-task-count" } - case object PoolSize extends MetricIdentity { val name, tag = "pool-size" } - - case class DispatcherMetricRecorder(maximumPoolSize: MetricRecorder, runningThreadCount: MetricRecorder, queueTaskCount: MetricRecorder, poolSize: MetricRecorder) - extends MetricGroupRecorder { - - def collect: MetricGroupSnapshot = { - DispatcherMetricSnapshot(maximumPoolSize.collect(), runningThreadCount.collect(), queueTaskCount.collect(), poolSize.collect()) - } - } - - case class DispatcherMetricSnapshot(maximumPoolSize: MetricSnapshotLike, runningThreadCount: MetricSnapshotLike, queueTaskCount: MetricSnapshotLike, poolSize: MetricSnapshotLike) - extends MetricGroupSnapshot { - - val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map( - (MaximumPoolSize -> maximumPoolSize), - (RunningThreadCount -> runningThreadCount), - (QueueTaskCount -> queueTaskCount), - (PoolSize -> poolSize)) - } - - val Factory = new MetricGroupFactory { - type GroupRecorder = DispatcherMetricRecorder - - def create(config: Config): DispatcherMetricRecorder = { - val settings = config.getConfig("precision.dispatcher") - - val MaximumPoolSizeConfig = extractPrecisionConfig(settings.getConfig("maximum-pool-size")) - val RunningThreadCountConfig = extractPrecisionConfig(settings.getConfig("running-thread-count")) - val QueueTaskCountConfig = extractPrecisionConfig(settings.getConfig("queued-task-count")) - val PoolSizeConfig = extractPrecisionConfig(settings.getConfig("pool-size")) - - new DispatcherMetricRecorder( - HdrRecorder(MaximumPoolSizeConfig.highestTrackableValue, MaximumPoolSizeConfig.significantValueDigits, Scale.Unit), - HdrRecorder(RunningThreadCountConfig.highestTrackableValue, RunningThreadCountConfig.significantValueDigits, Scale.Unit), - HdrRecorder(QueueTaskCountConfig.highestTrackableValue, QueueTaskCountConfig.significantValueDigits, Scale.Unit), - HdrRecorder(PoolSizeConfig.highestTrackableValue, PoolSizeConfig.significantValueDigits, Scale.Unit)) - } - } -} - diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala deleted file mode 100644 index f07bf38e..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import annotation.tailrec -import com.typesafe.config.Config -import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.InstrumentType - -trait MetricGroupCategory { - def name: String -} - -trait MetricGroupIdentity { - def name: String - def category: MetricGroupCategory -} - -trait MetricIdentity { - def name: String - def tag: String -} - -trait MetricGroupRecorder { - def collect: MetricGroupSnapshot -} - -trait MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshotLike] -} - -case class DefaultMetricGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot - -trait MetricRecorder { - def record(value: Long) - def collect(): MetricSnapshotLike -} - -object InstrumentTypes { - sealed trait InstrumentType - case object Histogram extends InstrumentType - case object Gauge extends InstrumentType - case object Counter extends InstrumentType -} - -trait MetricSnapshotLike { - def instrumentType: InstrumentType - def numberOfMeasurements: Long - def scale: Scale - def measurements: Vector[Measurement] - - def max: Long = measurements.lastOption.map(_.value).getOrElse(0) - def min: Long = measurements.headOption.map(_.value).getOrElse(0) - - def merge(that: MetricSnapshotLike): MetricSnapshotLike = { - val mergedMeasurements = Vector.newBuilder[Measurement] - - @tailrec def go(left: Vector[Measurement], right: Vector[Measurement], totalNrOfMeasurements: Long): Long = { - if (left.nonEmpty && right.nonEmpty) { - val leftValue = left.head - val rightValue = right.head - - if (rightValue.value == leftValue.value) { - val merged = rightValue.merge(leftValue) - mergedMeasurements += merged - go(left.tail, right.tail, totalNrOfMeasurements + merged.count) - } else { - if (leftValue.value < rightValue.value) { - mergedMeasurements += leftValue - go(left.tail, right, totalNrOfMeasurements + leftValue.count) - } else { - mergedMeasurements += rightValue - go(left, right.tail, totalNrOfMeasurements + rightValue.count) - } - } - } else { - if (left.isEmpty && right.nonEmpty) { - mergedMeasurements += right.head - go(left, right.tail, totalNrOfMeasurements + right.head.count) - } else { - if (left.nonEmpty && right.isEmpty) { - mergedMeasurements += left.head - go(left.tail, right, totalNrOfMeasurements + left.head.count) - } else totalNrOfMeasurements - } - } - } - - val totalNrOfMeasurements = go(measurements, that.measurements, 0) - MetricSnapshot(instrumentType, totalNrOfMeasurements, scale, mergedMeasurements.result()) - } -} - -case class MetricSnapshot(instrumentType: InstrumentType, numberOfMeasurements: Long, scale: Scale, - measurements: Vector[MetricSnapshot.Measurement]) extends MetricSnapshotLike - -object MetricSnapshot { - case class Measurement(value: Long, count: Long) { - def merge(that: Measurement) = Measurement(value, count + that.count) - } -} - -trait MetricGroupFactory { - type GroupRecorder <: MetricGroupRecorder - def create(config: Config): GroupRecorder -} - diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala deleted file mode 100644 index 5454edf5..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import org.HdrHistogram.HdrRecorder -import scala.collection.concurrent.TrieMap -import com.typesafe.config.Config - -case class TraceMetrics(name: String) extends MetricGroupIdentity { - val category = TraceMetrics -} - -object TraceMetrics extends MetricGroupCategory { - val name = "trace" - - case object ElapsedTime extends MetricIdentity { val name, tag = "elapsed-time" } - case class HttpClientRequest(name: String, tag: String) extends MetricIdentity - - class TraceMetricRecorder(val elapsedTime: HdrRecorder, private val segmentRecorderFactory: () ⇒ HdrRecorder) - extends MetricGroupRecorder { - - private val segments = TrieMap[MetricIdentity, HdrRecorder]() - - def segmentRecorder(segmentIdentity: MetricIdentity): HdrRecorder = - segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) - - def collect: MetricGroupSnapshot = TraceMetricSnapshot(elapsedTime.collect(), - segments.map { case (identity, recorder) ⇒ (identity, recorder.collect()) }.toMap) - } - - case class TraceMetricSnapshot(elapsedTime: MetricSnapshotLike, segments: Map[MetricIdentity, MetricSnapshotLike]) - extends MetricGroupSnapshot { - - def metrics: Map[MetricIdentity, MetricSnapshotLike] = segments + (ElapsedTime -> elapsedTime) - } - - val Factory = new MetricGroupFactory { - type GroupRecorder = TraceMetricRecorder - - def create(config: Config): TraceMetricRecorder = { - - val settings = config.getConfig("precision.trace") - val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time")) - val segmentConfig = extractPrecisionConfig(settings.getConfig("segment")) - - new TraceMetricRecorder( - HdrRecorder(elapsedTimeConfig.highestTrackableValue, elapsedTimeConfig.significantValueDigits, Scale.Nano), - () ⇒ HdrRecorder(segmentConfig.highestTrackableValue, segmentConfig.significantValueDigits, Scale.Nano)) - } - } - -} diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala deleted file mode 100644 index 3a39ec69..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics.instruments - -import org.HdrHistogram.HdrRecorder -import kamon.metrics.{ Scale, MetricSnapshotLike } - -/** - * This recorder keeps track of the last value recoded and automatically adds it after collecting a snapshot. This is - * useful in cases where the absence of recordings does not necessarily mean the absence of values. For example, if this - * recorder is used for recording the mailbox size of an actor, and it only gets updated upon message enqueue o dequeue, - * the absence of recordings during 1 second means that the size hasn't change (example: the actor being blocked doing - * some work) and it should keep its last known value, instead of dropping to zero and then going back to the real value - * after a new event is processed. - * - */ -class ContinuousHdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) - extends HdrRecorder(highestTrackableValue, significantValueDigits, scale) { - - @volatile private var lastRecordedValue: Long = 0 - - override def record(value: Long): Unit = { - lastRecordedValue = value - super.record(value) - } - - override def collect(): MetricSnapshotLike = { - val snapshot = super.collect() - super.record(lastRecordedValue) - - snapshot - } -} - -object ContinuousHdrRecorder { - def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) = - new ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala deleted file mode 100644 index ce4fd76d..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package org.HdrHistogram - -import java.util.concurrent.atomic.AtomicLongFieldUpdater -import scala.annotation.tailrec -import kamon.metrics._ - -/** - * This implementation aims to be used for real time data collection where data snapshots are taken often over time. - * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still - * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. - */ -class HdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) - extends AtomicHistogram(1L, highestTrackableValue, significantValueDigits) with MetricRecorder { - - import HdrRecorder.totalCountUpdater - - def record(value: Long): Unit = recordValue(value) - - def collect(): MetricSnapshotLike = { - val entries = Vector.newBuilder[MetricSnapshot.Measurement] - val countsLength = counts.length() - - @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { - if (index < countsLength) { - val currentValue = previousValue + increment - val countAtValue = counts.getAndSet(index, 0) - - if (countAtValue > 0) - entries += MetricSnapshot.Measurement(currentValue, countAtValue) - - if (currentValue == bucketLimit) - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) - else - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) - } else { - nrOfRecordings - } - } - - val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1) - - def tryUpdateTotalCount: Boolean = { - val previousTotalCount = getTotalCount - val newTotalCount = previousTotalCount - nrOfRecordings - - totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) - } - - while (!tryUpdateTotalCount) {} - - MetricSnapshot(InstrumentTypes.Histogram, nrOfRecordings, scale, entries.result()) - } - -} - -object HdrRecorder { - val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") - - def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale): HdrRecorder = - new HdrRecorder(highestTrackableValue, significantValueDigits, scale) - -} diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala deleted file mode 100644 index ba2550af..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala +++ /dev/null @@ -1,58 +0,0 @@ -package kamon.metrics.instruments - -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -import java.lang.Math._ -import jsr166e.LongMaxUpdater -import kamon.util.PaddedAtomicLong -import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement - -class MinMaxCounter { - private val min = new LongMaxUpdater - private val max = new LongMaxUpdater - private val sum = new PaddedAtomicLong - - min.update(0L) - max.update(0L) - - def increment(value: Long = 1L): Unit = { - val currentValue = sum.addAndGet(value) - max.update(currentValue) - } - - def decrement(value: Long = 1L): Unit = { - val currentValue = sum.addAndGet(-value) - min.update(-currentValue) - } - - def collect(): CounterMeasurement = { - val currentValue = { - val value = sum.get() - if (value < 0) 0 else value - } - val result = CounterMeasurement(abs(min.maxThenReset()), max.maxThenReset(), currentValue) - max.update(currentValue) - min.update(-currentValue) - result - } -} - -object MinMaxCounter { - def apply() = new MinMaxCounter() - - case class CounterMeasurement(min: Long, max: Long, current: Long) -} diff --git a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala new file mode 100644 index 00000000..258cc1b2 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala @@ -0,0 +1,11 @@ +package kamon.standalone + +import akka.actor.ActorSystem + +object KamonStandalone { + private lazy val system = ActorSystem("kamon-standalone") + + def registerHistogram(name: String) = { + + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 307cf17a..9ce3cd4e 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -18,11 +18,11 @@ package kamon.trace import akka.actor.ActorSystem import kamon.Kamon -import kamon.metrics._ +import kamon.metric._ import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware import kamon.trace.TraceContext.SegmentIdentity -import kamon.metrics.TraceMetrics.TraceMetricRecorder +import kamon.metric.TraceMetrics.TraceMetricRecorder trait TraceContext { def name: String @@ -41,7 +41,7 @@ object TraceContext { } trait SegmentCompletionHandle { - def finish(metadata: Map[String, String]) + def finish(metadata: Map[String, String] = Map.empty) } case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String]) @@ -76,7 +76,7 @@ object SegmentCompletionHandleAware { } class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String], - val system: ActorSystem) extends TraceContext { + val system: ActorSystem) extends TraceContext { @volatile private var _isOpen = true val levelOfDetail = OnlyMetrics val startMark = System.nanoTime() diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 0e264cd2..efe08cdb 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -33,7 +33,7 @@ object TraceRecorder { def newToken = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet()) private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String], - system: ActorSystem): TraceContext = { + system: ActorSystem): TraceContext = { // In the future this should select between implementations. val finalToken = token.getOrElse(newToken) @@ -51,7 +51,7 @@ object TraceRecorder { traceContextStorage.set(Some(ctx)) } - def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): Option[SegmentCompletionHandle] = + def startSegment(identity: SegmentIdentity, metadata: Map[String, String] = Map.empty): Option[SegmentCompletionHandle] = currentContext.map(_.startSegment(identity, metadata)) def rename(name: String): Unit = currentContext.map(_.rename(name)) |