diff options
Diffstat (limited to 'kamon-core/src/main/scala')
59 files changed, 2858 insertions, 1965 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index dfebd3a5..ab9ce05e 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -14,10 +14,72 @@ */ package kamon -import akka.actor._ +import _root_.akka.actor +import _root_.akka.actor._ +import com.typesafe.config.{ ConfigFactory, Config } +import kamon.metric._ +import kamon.trace.{ TracerExtensionImpl, TracerExtension } object Kamon { - trait Extension extends akka.actor.Extension - def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system) + trait Extension extends actor.Extension + + private case class KamonCoreComponents( + metrics: MetricsExtension, + tracer: TracerExtension, + userMetrics: UserMetricsExtension) + + @volatile private var _system: ActorSystem = _ + @volatile private var _coreComponents: Option[KamonCoreComponents] = None + + def start(config: Config): Unit = synchronized { + def resolveInternalConfig: Config = { + val internalConfig = config.getConfig("kamon.internal-config") + + config + .withoutPath("akka") + .withoutPath("spray") + .withFallback(internalConfig) + } + + if (_coreComponents.isEmpty) { + val metrics = MetricsExtensionImpl(config) + val simpleMetrics = UserMetricsExtensionImpl(metrics) + val tracer = TracerExtensionImpl(metrics, config) + + _coreComponents = Some(KamonCoreComponents(metrics, tracer, simpleMetrics)) + _system = ActorSystem("kamon", resolveInternalConfig) + + metrics.start(_system) + tracer.start(_system) + + } else sys.error("Kamon has already been started.") + } + + def start(): Unit = + start(ConfigFactory.load) + + def metrics: MetricsExtension = + ifStarted(_.metrics) + + def tracer: TracerExtension = + ifStarted(_.tracer) + + def userMetrics: UserMetricsExtension = + ifStarted(_.userMetrics) + + def apply[T <: Kamon.Extension](key: ExtensionId[T]): T = + ifStarted { _ ⇒ + if (_system ne null) + key(_system) + else + sys.error("Cannot retrieve extensions while Kamon is being initialized.") + } + + def extension[T <: Kamon.Extension](key: ExtensionId[T]): T = + apply(key) + + private def ifStarted[T](thunk: KamonCoreComponents ⇒ T): T = + _coreComponents.map(thunk(_)) getOrElse (sys.error("Kamon has not been started yet.")) + } diff --git a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala index 0dd189f6..553d59ed 100644 --- a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala +++ b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala @@ -1,99 +1,41 @@ -package kamon.http - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.Counter -import kamon.metric._ - -import scala.collection.concurrent.TrieMap - -object HttpServerMetrics extends MetricGroupIdentity { - import Metrics.AtomicGetOrElseUpdateForTriemap - - val name: String = "http-server-metrics-recorder" - val category = new MetricGroupCategory { - val name: String = "http-server" - } - - type TraceName = String - type StatusCode = String - - case class CountPerStatusCode(statusCode: String) extends MetricIdentity { - def name: String = statusCode - } - - case class TraceCountPerStatus(traceName: TraceName, statusCode: StatusCode) extends MetricIdentity { - def name: String = traceName + "_" + statusCode - } - - class HttpServerMetricsRecorder extends MetricGroupRecorder { - - private val counters = TrieMap[StatusCode, Counter]() - private val countersPerTrace = TrieMap[TraceName, TrieMap[StatusCode, Counter]]() - - def recordResponse(statusCode: StatusCode): Unit = recordResponse(statusCode, 1L) +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ - def recordResponse(statusCode: StatusCode, count: Long): Unit = - counters.atomicGetOrElseUpdate(statusCode, Counter()).increment(count) - - def recordResponse(traceName: TraceName, statusCode: StatusCode): Unit = recordResponse(traceName, statusCode, 1L) - - def recordResponse(traceName: TraceName, statusCode: StatusCode, count: Long): Unit = { - recordResponse(statusCode, count) - countersPerTrace.atomicGetOrElseUpdate(traceName, TrieMap()).atomicGetOrElseUpdate(statusCode, Counter()).increment(count) - } - - def collect(context: CollectionContext): HttpServerMetricsSnapshot = { - val countsPerStatusCode = counters.map { - case (statusCode, counter) ⇒ (statusCode, counter.collect(context)) - }.toMap - - val countsPerTraceAndStatus = countersPerTrace.map { - case (traceName, countsPerStatus) ⇒ - (traceName, countsPerStatus.map { case (statusCode, counter) ⇒ (statusCode, counter.collect(context)) }.toMap) - }.toMap - - HttpServerMetricsSnapshot(countsPerStatusCode, countsPerTraceAndStatus) - } - - def cleanup: Unit = {} - } +package kamon.http - case class HttpServerMetricsSnapshot(countsPerStatusCode: Map[StatusCode, Counter.Snapshot], - countsPerTraceAndStatusCode: Map[TraceName, Map[StatusCode, Counter.Snapshot]]) extends MetricGroupSnapshot { +import kamon.metric.{ EntityRecorderFactory, GenericEntityRecorder } +import kamon.metric.instrument.InstrumentFactory - type GroupSnapshotType = HttpServerMetricsSnapshot +/** + * Counts HTTP response status codes into per status code and per trace name + status counters. If recording a HTTP + * response with status 500 for the trace "GetUser", the counter with name "500" as well as the counter with name + * "GetUser_500" will be incremented. + */ +class HttpServerMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { - def merge(that: HttpServerMetricsSnapshot, context: CollectionContext): HttpServerMetricsSnapshot = { - val combinedCountsPerStatus = combineMaps(countsPerStatusCode, that.countsPerStatusCode)((l, r) ⇒ l.merge(r, context)) - val combinedCountsPerTraceAndStatus = combineMaps(countsPerTraceAndStatusCode, that.countsPerTraceAndStatusCode) { - (leftCounts, rightCounts) ⇒ combineMaps(leftCounts, rightCounts)((l, r) ⇒ l.merge(r, context)) - } - HttpServerMetricsSnapshot(combinedCountsPerStatus, combinedCountsPerTraceAndStatus) - } + def recordResponse(statusCode: String): Unit = + counter(statusCode).increment() - def metrics: Map[MetricIdentity, MetricSnapshot] = { - countsPerStatusCode.map { - case (statusCode, count) ⇒ (CountPerStatusCode(statusCode), count) - } ++ { - for ( - (traceName, countsPerStatus) ← countsPerTraceAndStatusCode; - (statusCode, count) ← countsPerStatus - ) yield (TraceCountPerStatus(traceName, statusCode), count) - } - } + def recordResponse(traceName: String, statusCode: String): Unit = { + recordResponse(statusCode) + counter(traceName + "_" + statusCode).increment() } - - val Factory = HttpServerMetricGroupFactory } -case object HttpServerMetricGroupFactory extends MetricGroupFactory { - - import HttpServerMetrics._ - - type GroupRecorder = HttpServerMetricsRecorder - - def create(config: Config, system: ActorSystem): HttpServerMetricsRecorder = - new HttpServerMetricsRecorder() - -}
\ No newline at end of file +object HttpServerMetrics extends EntityRecorderFactory[HttpServerMetrics] { + def category: String = "http-server" + def createRecorder(instrumentFactory: InstrumentFactory): HttpServerMetrics = new HttpServerMetrics(instrumentFactory) +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala index 5ca4481e..4dc5ff41 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJWeaverMissingWarning.scala @@ -1,3 +1,19 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.instrumentation import _root_.akka.event.EventStream diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala deleted file mode 100644 index 366f446d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.instrumentation - -import akka.actor._ -import akka.dispatch.{ Envelope, MessageDispatcher } -import akka.routing.RoutedActorCell -import kamon.Kamon -import kamon.metric.ActorMetrics.ActorMetricsRecorder -import kamon.metric.RouterMetrics.RouterMetricsRecorder -import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics } -import kamon.trace._ -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorCellInstrumentation { - - import ActorCellInstrumentation.PimpedActorCellMetrics - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") - def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricIdentity = metricIdentity - cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}") - routedActorCell.routerMetricIdentity = routerMetricIdentity - routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory) - } - } - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") - def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(cell, envelope)") - def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] - - try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { - pjp.proceed() - } - } finally { - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ - val processingTime = System.nanoTime() - timestampBeforeProcessing - val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime - - am.processingTime.record(processingTime) - am.timeInMailbox.record(timeInMailbox) - am.mailboxSize.decrement() - - (processingTime, timeInMailbox) - } map { - case (processingTime, timeInMailbox) ⇒ - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { - rm ⇒ - rm.processingTime.record(processingTime) - rm.timeInMailbox.record(timeInMailbox) - } - } - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") - def sendingMessageToActorCell(cell: ActorCell): Unit = {} - - @After("sendingMessageToActorCell(cell)") - def afterSendMessageToActorCell(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment()) - } - - @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") - def actorStop(cell: ActorCell): Unit = {} - - @After("actorStop(cell)") - def afterStop(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { p ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity) - } - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { rm ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity) - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") - def actorInvokeFailure(cell: ActorCell): Unit = {} - - @Before("actorInvokeFailure(cell)") - def beforeInvokeFailure(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ am.errors.increment() - } - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { - rm ⇒ rm.errors.increment() - } - } - } - -} - -trait ActorCellMetrics { - var actorMetricIdentity: ActorMetrics = _ - var routerMetricIdentity: RouterMetrics = _ - var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ - var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ -} - -@Aspect -class ActorCellMetricsIntoActorCellMixin { - - @DeclareMixin("akka.actor.ActorCell") - def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} -} - -@Aspect -class TraceContextIntoEnvelopeMixin { - - @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TimestampedTraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} - -object ActorCellInstrumentation { - implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) { - def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit): Unit = { - if (cell.isInstanceOf[RoutedActorCell]) - block(cell) - } - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala deleted file mode 100644 index 82b8304d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.instrumentation - -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorLoggingInstrumentation { - - @DeclareMixin("akka.event.Logging.LogEvent+") - def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") - def logEventCreation(event: TraceContextAware): Unit = {} - - @After("logEventCreation(event)") - def captureTraceContext(event: TraceContextAware): Unit = { - // Force initialization of TraceContextAware - event.traceContext - } - - @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} - - @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withInlineTraceContextReplacement(logEvent.traceContext) { - pjp.proceed() - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala deleted file mode 100644 index 7845e90d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.instrumentation - -import akka.dispatch.sysmsg.EarliestFirstSystemMessageList -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorSystemMessageInstrumentation { - - @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") - def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} - - @Around("systemMessageProcessing(messages)") - def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { - if (messages.nonEmpty) { - val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withInlineTraceContextReplacement(ctx)(pjp.proceed()) - - } else pjp.proceed() - } -} - -@Aspect -class TraceContextIntoSystemMessageMixin { - - @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") - def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} - -@Aspect -class TraceContextIntoRepointableActorRefMixin { - - @DeclareMixin("akka.actor.RepointableActorRef") - def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } - - @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") - def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} - - @Around("repointableActorRefCreation(repointableActorRef)") - def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { - TraceRecorder.withInlineTraceContextReplacement(repointableActorRef.traceContext) { - pjp.proceed() - } - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala deleted file mode 100644 index 5e8175fa..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.instrumentation - -import akka.actor.ActorRefProvider -import akka.event.Logging.Warning -import akka.pattern.{ AskTimeoutException, PromiseActorRef } -import kamon.Kamon -import kamon.trace.Trace -import org.aspectj.lang.annotation.{ AfterReturning, Aspect, Pointcut } - -import scala.compat.Platform.EOL - -@Aspect -class AskPatternInstrumentation { - - class StackTraceCaptureException extends Throwable - - @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *)", argNames = "provider") - def promiseActorRefApply(provider: ActorRefProvider): Unit = {} - - @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor") - def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = { - val system = promiseActor.provider.guardian.underlying.system - val traceExtension = Kamon(Trace)(system) - - if (traceExtension.enableAskPatternTracing) { - val future = promiseActor.result.future - implicit val ec = system.dispatcher - val stack = new StackTraceCaptureException - - future onFailure { - case timeout: AskTimeoutException ⇒ - val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) - - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], - "Timeout triggered for ask pattern registered at: " + stackString)) - } - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala deleted file mode 100644 index 8b3af3d6..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.instrumentation - -import java.lang.reflect.Method -import java.util.concurrent.ThreadPoolExecutor - -import akka.actor.{ ActorSystemImpl, Cancellable } -import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher } -import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement -import kamon.Kamon -import kamon.metric.DispatcherMetrics.DispatcherMetricRecorder -import kamon.metric.{ DispatcherMetrics, Metrics } -import org.aspectj.lang.annotation._ - -import scala.concurrent.forkjoin.ForkJoinPool - -@Aspect -class DispatcherInstrumentation { - - @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") - def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} - - @Before("onActorSystemStartup(dispatchers, system)") - def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { - val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] - currentDispatchers.actorSystem = system - } - - @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") - def onDispatchersLookup(dispatchers: Dispatchers) = {} - - @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") - def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { - val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem - } - - @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") - def onCreateExecutorService(): Unit = {} - - @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))") - def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {} - - @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)") - def onDispatcherStartup(dispatcher: Dispatcher): Unit = {} - - @After("onDispatcherStartup(dispatcher)") - def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { - - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) - val metricIdentity = DispatcherMetrics(dispatcher.id) - - dispatcherWithMetrics.metricIdentity = metricIdentity - dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) - - if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) { - dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dm ⇒ - val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = - DispatcherMetricsCollector.collect(dispatcher) - - dm.maximumPoolSize.record(maximumPoolSize) - dm.runningThreadCount.record(runningThreadCount) - dm.queueTaskCount.record(queueTaskCount) - dm.poolSize.record(poolSize) - } - } - } - } - - @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)") - def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {} - - @After("onDispatcherShutdown(dispatcher)") - def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dispatcher ⇒ - dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() - Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) - } - } -} - -@Aspect -class DispatcherMetricCollectionInfoIntoDispatcherMixin { - - @DeclareMixin("akka.dispatch.MessageDispatcher") - def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {} - - @DeclareMixin("akka.dispatch.Dispatchers") - def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} -} - -trait DispatcherMetricCollectionInfo { - var metricIdentity: DispatcherMetrics = _ - var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ - var dispatcherCollectorCancellable: Cancellable = _ - var actorSystem: ActorSystemImpl = _ -} - -trait DispatchersWithActorSystem { - var actorSystem: ActorSystemImpl = _ -} - -object DispatcherMetricsCollector { - - case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) - - private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, - (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) - } - - private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) - } - - private val executorServiceMethod: Method = { - // executorService is protected - val method = classOf[Dispatcher].getDeclaredMethod("executorService") - method.setAccessible(true) - method - } - - def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { - dispatcher match { - case x: Dispatcher ⇒ { - val executor = executorServiceMethod.invoke(x) match { - case delegate: ExecutorServiceDelegate ⇒ delegate.executor - case other ⇒ other - } - - executor match { - case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) - case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) - case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } - case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala deleted file mode 100644 index bda2da78..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.instrumentation.scala - -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class FutureInstrumentation { - - @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") - def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default - - @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") - def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {} - - @After("futureRelatedRunnableCreation(runnable)") - def afterCreation(runnable: TraceContextAware): Unit = { - // Force traceContext initialization. - runnable.traceContext - } - - @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") - def futureRelatedRunnableExecution(runnable: TraceContextAware) = {} - - @Around("futureRelatedRunnableExecution(runnable)") - def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = { - TraceRecorder.withInlineTraceContextReplacement(runnable.traceContext) { - pjp.proceed() - } - } - -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala deleted file mode 100644 index 65caaa8f..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.instrumentation.scalaz - -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class FutureInstrumentation { - - @DeclareMixin("scalaz.concurrent..* && java.util.concurrent.Callable+") - def mixinTraceContextAwareToFutureRelatedCallable: TraceContextAware = - TraceContextAware.default - - @Pointcut("execution((scalaz.concurrent..* && java.util.concurrent.Callable+).new(..)) && this(callable)") - def futureRelatedCallableCreation(callable: TraceContextAware): Unit = {} - - @After("futureRelatedCallableCreation(callable)") - def afterCreation(callable: TraceContextAware): Unit = - // Force traceContext initialization. - callable.traceContext - - @Pointcut("execution(* (scalaz.concurrent..* && java.util.concurrent.Callable+).call()) && this(callable)") - def futureRelatedCallableExecution(callable: TraceContextAware): Unit = {} - - @Around("futureRelatedCallableExecution(callable)") - def aroundExecution(pjp: ProceedingJoinPoint, callable: TraceContextAware): Any = - TraceRecorder.withInlineTraceContextReplacement(callable.traceContext) { - pjp.proceed() - } - -} diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala deleted file mode 100644 index d2cb4e38..00000000 --- a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram } - -case class ActorMetrics(name: String) extends MetricGroupIdentity { - val category = ActorMetrics -} - -object ActorMetrics extends MetricGroupCategory { - val name = "actor" - - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, - errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.collect(context), - timeInMailbox.collect(context), - mailboxSize.collect(context), - errors.collect(context)) - - def cleanup: Unit = { - processingTime.cleanup - mailboxSize.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, - mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = ActorMetricSnapshot - - def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - mailboxSize.merge(that.mailboxSize, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (ProcessingTime -> processingTime), - (MailboxSize -> mailboxSize), - (TimeInMailbox -> timeInMailbox), - (Errors -> errors)) - } - - val Factory = ActorMetricGroupFactory -} - -case object ActorMetricGroupFactory extends MetricGroupFactory { - import ActorMetrics._ - - type GroupRecorder = ActorMetricsRecorder - - def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { - val settings = config.getConfig("precision.actor") - - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - val mailboxSizeConfig = settings.getConfig("mailbox-size") - - new ActorMetricsRecorder( - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - MinMaxCounter.fromConfig(mailboxSizeConfig, system), - Counter()) - } -} diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala deleted file mode 100644 index 126f6333..00000000 --- a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.{ Histogram, HdrHistogram } - -case class DispatcherMetrics(name: String) extends MetricGroupIdentity { - val category = DispatcherMetrics -} - -object DispatcherMetrics extends MetricGroupCategory { - val name = "dispatcher" - - case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } - case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } - case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } - case object PoolSize extends MetricIdentity { val name = "pool-size" } - - case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, - queueTaskCount: Histogram, poolSize: Histogram) - extends MetricGroupRecorder { - - def collect(context: CollectionContext): MetricGroupSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.collect(context), - runningThreadCount.collect(context), - queueTaskCount.collect(context), - poolSize.collect(context)) - - def cleanup: Unit = {} - - } - - case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, - queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = DispatcherMetricSnapshot - - def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.merge(that.maximumPoolSize, context), - runningThreadCount.merge(that.runningThreadCount, context), - queueTaskCount.merge(that.queueTaskCount, context), - poolSize.merge(that.poolSize, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (MaximumPoolSize -> maximumPoolSize), - (RunningThreadCount -> runningThreadCount), - (QueueTaskCount -> queueTaskCount), - (PoolSize -> poolSize)) - } - - val Factory = DispatcherMetricGroupFactory -} - -case object DispatcherMetricGroupFactory extends MetricGroupFactory { - - import DispatcherMetrics._ - - type GroupRecorder = DispatcherMetricRecorder - - def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { - val settings = config.getConfig("precision.dispatcher") - - val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") - val runningThreadCountConfig = settings.getConfig("running-thread-count") - val queueTaskCountConfig = settings.getConfig("queued-task-count") - val poolSizeConfig = settings.getConfig("pool-size") - - new DispatcherMetricRecorder( - Histogram.fromConfig(maximumPoolSizeConfig), - Histogram.fromConfig(runningThreadCountConfig), - Histogram.fromConfig(queueTaskCountConfig), - Histogram.fromConfig(poolSizeConfig)) - } - -} diff --git a/kamon-core/src/main/scala/kamon/metric/Entity.scala b/kamon-core/src/main/scala/kamon/metric/Entity.scala new file mode 100644 index 00000000..8d328f83 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Entity.scala @@ -0,0 +1,58 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +/** + * Identify a `thing` that is being monitored by Kamon. A [[kamon.metric.Entity]] is used to identify tracked `things` + * in both the metrics recording and reporting sides. Only the name and category fields are used with determining + * equality between two entities. + * + * // TODO: Find a better word for `thing`. + */ +class Entity(val name: String, val category: String, val metadata: Map[String, String]) { + + override def equals(o: Any): Boolean = { + if (this eq o.asInstanceOf[AnyRef]) + true + else if ((o.asInstanceOf[AnyRef] eq null) || !o.isInstanceOf[Entity]) + false + else { + val thatAsEntity = o.asInstanceOf[Entity] + category == thatAsEntity.category && name == thatAsEntity.name + } + } + + override def hashCode: Int = { + var result: Int = name.hashCode + result = 31 * result + category.hashCode + return result + } +} + +object Entity { + def apply(name: String, category: String): Entity = + apply(name, category, Map.empty) + + def apply(name: String, category: String, metadata: Map[String, String]): Entity = + new Entity(name, category, metadata) + + def create(name: String, category: String): Entity = + apply(name, category, Map.empty) + + def create(name: String, category: String, metadata: Map[String, String]): Entity = + new Entity(name, category, metadata) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala new file mode 100644 index 00000000..6e0a4248 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala @@ -0,0 +1,173 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.metric.instrument._ + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.FiniteDuration + +trait EntityRecorder { + def collect(collectionContext: CollectionContext): EntitySnapshot + def cleanup: Unit +} + +trait EntityRecorderFactory[T <: EntityRecorder] { + def category: String + def createRecorder(instrumentFactory: InstrumentFactory): T +} + +abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder { + import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax + + private val _instruments = TrieMap.empty[MetricKey, Instrument] + private def register[T <: Instrument](key: MetricKey, instrument: ⇒ T): T = + _instruments.atomicGetOrElseUpdate(key, instrument, _.cleanup).asInstanceOf[T] + + protected def histogram(name: String): Histogram = + register(HistogramKey(name), instrumentFactory.createHistogram(name)) + + protected def histogram(name: String, dynamicRange: DynamicRange): Histogram = + register(HistogramKey(name), instrumentFactory.createHistogram(name, Some(dynamicRange))) + + protected def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram = + register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name)) + + protected def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram = + register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name, Some(dynamicRange))) + + protected def histogram(key: HistogramKey): Histogram = + register(key, instrumentFactory.createHistogram(key.name)) + + protected def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram = + register(key, instrumentFactory.createHistogram(key.name, Some(dynamicRange))) + + protected def removeHistogram(name: String): Unit = + _instruments.remove(HistogramKey(name)) + + protected def removeHistogram(key: HistogramKey): Unit = + _instruments.remove(key) + + protected def minMaxCounter(name: String): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name)) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange))) + + protected def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval))) + + protected def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name)) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange))) + + protected def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval))) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) + + protected def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name)) + + protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange))) + + protected def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name, refreshInterval = Some(refreshInterval))) + + protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange), Some(refreshInterval))) + + protected def removeMinMaxCounter(name: String): Unit = + _instruments.remove(MinMaxCounterKey(name)) + + protected def removeMinMaxCounter(key: MinMaxCounterKey): Unit = + _instruments.remove(key) + + protected def gauge(name: String, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector)) + + protected def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector)) + + protected def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector)) + + protected def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, valueCollector = valueCollector)) + + protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), valueCollector = valueCollector)) + + protected def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector)) + + protected def removeGauge(name: String): Unit = + _instruments.remove(GaugeKey(name)) + + protected def removeGauge(key: GaugeKey): Unit = + _instruments.remove(key) + + protected def counter(name: String): Counter = + register(CounterKey(name), instrumentFactory.createCounter()) + + protected def counter(key: CounterKey): Counter = + register(key, instrumentFactory.createCounter()) + + protected def removeCounter(name: String): Unit = + _instruments.remove(CounterKey(name)) + + protected def removeCounter(key: CounterKey): Unit = + _instruments.remove(key) + + def collect(collectionContext: CollectionContext): EntitySnapshot = { + val snapshots = Map.newBuilder[MetricKey, InstrumentSnapshot] + _instruments.foreach { + case (key, instrument) ⇒ snapshots += key -> instrument.collect(collectionContext) + } + + new DefaultEntitySnapshot(snapshots.result()) + } + + def cleanup: Unit = _instruments.values.foreach(_.cleanup) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala new file mode 100644 index 00000000..7ebb79e2 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala @@ -0,0 +1,63 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import kamon.metric.instrument.{ Counter, Histogram, CollectionContext, InstrumentSnapshot } +import kamon.util.MapMerge +import scala.reflect.ClassTag + +trait EntitySnapshot { + def metrics: Map[MetricKey, InstrumentSnapshot] + def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot + + def histogram(name: String): Option[Histogram.Snapshot] = + find[HistogramKey, Histogram.Snapshot](name) + + def minMaxCounter(name: String): Option[Histogram.Snapshot] = + find[MinMaxCounterKey, Histogram.Snapshot](name) + + def gauge(name: String): Option[Histogram.Snapshot] = + find[GaugeKey, Histogram.Snapshot](name) + + def counter(name: String): Option[Counter.Snapshot] = + find[CounterKey, Counter.Snapshot](name) + + def histograms: Map[HistogramKey, Histogram.Snapshot] = + filterByType[HistogramKey, Histogram.Snapshot] + + def minMaxCounters: Map[MinMaxCounterKey, Histogram.Snapshot] = + filterByType[MinMaxCounterKey, Histogram.Snapshot] + + def gauges: Map[GaugeKey, Histogram.Snapshot] = + filterByType[GaugeKey, Histogram.Snapshot] + + def counters: Map[CounterKey, Counter.Snapshot] = + filterByType[CounterKey, Counter.Snapshot] + + private def filterByType[K <: MetricKey, V <: InstrumentSnapshot](implicit keyCT: ClassTag[K]): Map[K, V] = + metrics.collect { case (k, v) if keyCT.runtimeClass.isInstance(k) ⇒ (k.asInstanceOf[K], v.asInstanceOf[V]) } + + private def find[K <: MetricKey, V <: InstrumentSnapshot](name: String)(implicit keyCT: ClassTag[K]) = + metrics.find { case (k, v) ⇒ keyCT.runtimeClass.isInstance(k) && k.name == name } map (_._2.asInstanceOf[V]) +} + +class DefaultEntitySnapshot(val metrics: Map[MetricKey, InstrumentSnapshot]) extends EntitySnapshot { + import MapMerge.Syntax + + override def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot = + new DefaultEntitySnapshot(metrics.merge(that.metrics, (l, r) ⇒ l.merge(r, collectionContext))) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricKey.scala b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala new file mode 100644 index 00000000..a5d30c81 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala @@ -0,0 +1,169 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import kamon.metric.instrument.{ InstrumentTypes, InstrumentType, UnitOfMeasurement } + +/** + * MetricKeys are used to identify a given metric in entity recorders and snapshots. MetricKeys can be used to encode + * additional metadata for a metric being recorded, as well as the unit of measurement of the data being recorder. + */ +sealed trait MetricKey { + def name: String + def unitOfMeasurement: UnitOfMeasurement + def instrumentType: InstrumentType + def metadata: Map[String, String] +} + +// Wish that there was a shorter way to describe the operations bellow, but apparently there is no way to generalize all +// the apply/create versions that would produce the desired return types when used from Java. + +/** + * MetricKey for all Histogram-based metrics. + */ +case class HistogramKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.Histogram +} + +object HistogramKey { + def apply(name: String): HistogramKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): HistogramKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): HistogramKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): HistogramKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): HistogramKey = + apply(name, unitOfMeasurement, metadata) +} + +/** + * MetricKey for all MinMaxCounter-based metrics. + */ +case class MinMaxCounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.MinMaxCounter +} + +object MinMaxCounterKey { + def apply(name: String): MinMaxCounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): MinMaxCounterKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): MinMaxCounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): MinMaxCounterKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): MinMaxCounterKey = + apply(name, unitOfMeasurement, metadata) +} + +/** + * MetricKey for all Gauge-based metrics. + */ +case class GaugeKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.Gauge +} + +object GaugeKey { + def apply(name: String): GaugeKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): GaugeKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): GaugeKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): GaugeKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): GaugeKey = + apply(name, unitOfMeasurement, metadata) +} + +/** + * MetricKey for all Counter-based metrics. + */ +case class CounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.Counter +} + +object CounterKey { + def apply(name: String): CounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): CounterKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): CounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): CounterKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): CounterKey = + apply(name, unitOfMeasurement, metadata) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index f491cc57..87911352 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -16,129 +16,126 @@ package kamon.metric -import akka.event.Logging.Error -import akka.event.EventStream +import com.typesafe.config.Config +import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe } +import kamon.metric.instrument.{ DefaultRefreshScheduler, InstrumentFactory, CollectionContext } import scala.collection.concurrent.TrieMap import akka.actor._ -import com.typesafe.config.Config -import kamon.util.GlobPathFilter -import kamon.Kamon -import akka.actor -import kamon.metric.Metrics.MetricGroupFilter -import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe } -import java.util.concurrent.TimeUnit - -class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - import Metrics.AtomicGetOrElseUpdateForTriemap - - val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") - printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error")) - - /** Configured Dispatchers */ - val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) - val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) - - /** Configuration Settings */ - val gaugeRecordingInterval: Long = metricsExtConfig.getMilliseconds("gauge-recording-interval") - - val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() - val filters = loadFilters(metricsExtConfig) - lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") - - def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { - if (shouldTrack(identity)) - Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) - else - None - } +import kamon.util.{ LazyActorRef, TriemapAtomicGetOrElseUpdate } - def unregister(identity: MetricGroupIdentity): Unit = { - storage.remove(identity).map(_.cleanup) - } +case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T) - def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit = - subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber) +trait MetricsExtension { + def settings: MetricsExtensionSettings + def shouldTrack(entity: Entity): Boolean + def shouldTrack(entityName: String, category: String): Boolean = + shouldTrack(Entity(entityName, category)) - def unsubscribe(subscriber: ActorRef): Unit = - subscriptions.tell(Unsubscribe(subscriber), subscriber) + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] + def unregister(entity: Entity): Unit - def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { - import scala.concurrent.duration._ + def find(entity: Entity): Option[EntityRecorder] + def find(name: String, category: String): Option[EntityRecorder] - system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { - body - }(gaugeRecordingsDispatcher) - } + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit = + subscribe(filter, subscriber, permanently = false) - private def shouldTrack(identity: MetricGroupIdentity): Boolean = { - filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true) - } + def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently) + + def subscribe(category: String, selection: String, subscriber: ActorRef): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently = false) - def loadFilters(config: Config): Map[String, MetricGroupFilter] = { - import scala.collection.JavaConverters._ + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit - val filters = config.getObjectList("filters").asScala + def unsubscribe(subscriber: ActorRef): Unit + def buildDefaultCollectionContext: CollectionContext + def instrumentFactory(category: String): InstrumentFactory +} + +private[kamon] class MetricsExtensionImpl(config: Config) extends MetricsExtension { + private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] + private val _subscriptions = new LazyActorRef + + val settings = MetricsExtensionSettings(config) + + def shouldTrack(entity: Entity): Boolean = + settings.entityFilters.get(entity.category).map { + filter ⇒ filter.accept(entity.name) - val allFilters = - for ( - filter ← filters; - entry ← filter.entrySet().asScala - ) yield { - val key = entry.getKey - val keyBasedConfig = entry.getValue.atKey(key) + } getOrElse (settings.trackUnmatchedEntities) - val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList - val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] = { + import TriemapAtomicGetOrElseUpdate.Syntax + val entity = Entity(entityName, recorderFactory.category) - (key, MetricGroupFilter(includes, excludes)) - } + if (shouldTrack(entity)) { + val instrumentFactory = settings.instrumentFactories.get(recorderFactory.category).getOrElse(settings.defaultInstrumentFactory) + val recorder = _trackedEntities.atomicGetOrElseUpdate(entity, recorderFactory.createRecorder(instrumentFactory), _.cleanup).asInstanceOf[T] - allFilters.toMap + Some(EntityRegistration(entity, recorder)) + } else None } - def buildDefaultCollectionContext: CollectionContext = - CollectionContext(metricsExtConfig.getInt("default-collection-context-buffer-size")) - - def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = { - if (!disableWeaverMissingError) { - val weaverMissingMessage = - """ - | - | ___ _ ___ _ _ ___ ___ _ _ - | / _ \ | | |_ | | | | | | \/ |(_) (_) - |/ /_\ \ ___ _ __ ___ ___ | |_ | | | | | | ___ __ _ __ __ ___ _ __ | . . | _ ___ ___ _ _ __ __ _ - || _ |/ __|| '_ \ / _ \ / __|| __| | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \ / _` | - || | | |\__ \| |_) || __/| (__ | |_ /\__/ / \ /\ /| __/| (_| | \ V /| __/| | | | | || |\__ \\__ \| || | | || (_| | - |\_| |_/|___/| .__/ \___| \___| \__|\____/ \/ \/ \___| \__,_| \_/ \___||_| \_| |_/|_||___/|___/|_||_| |_| \__, | - | | | __/ | - | |_| |___/ - | - | It seems like your application wasn't started with the -javaagent:/path-to-aspectj-weaver.jar option. Without that Kamon might - | not work properly, if you need help on setting up the weaver go to http://kamon.io/introduction/get-started/ for more info. If - | you are sure that you don't need the weaver (e.g. you are only using KamonStandalone) then you can disable this error message - | by changing the kamon.metrics.disable-aspectj-weaver-missing-error setting in your configuration file. - | - """.stripMargin - - eventStream.publish(Error("MetricsExtension", classOf[MetricsExtension], weaverMissingMessage)) + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] = { + _trackedEntities.put(entity, recorder).map { oldRecorder ⇒ + oldRecorder.cleanup } + + EntityRegistration(entity, recorder) } -} -object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) + def unregister(entity: Entity): Unit = + _trackedEntities.remove(entity).map(_.cleanup) + + def find(entity: Entity): Option[EntityRecorder] = + _trackedEntities.get(entity) - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) + def find(name: String, category: String): Option[EntityRecorder] = + find(Entity(name, category)) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = + _subscriptions.tell(Subscribe(filter, subscriber, permanent)) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(Unsubscribe(subscriber)) + + def buildDefaultCollectionContext: CollectionContext = + CollectionContext(settings.defaultCollectionContextBufferSize) + + def instrumentFactory(category: String): InstrumentFactory = + settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory) + + private[kamon] def collectSnapshots(collectionContext: CollectionContext): Map[Entity, EntitySnapshot] = { + val builder = Map.newBuilder[Entity, EntitySnapshot] + _trackedEntities.foreach { + case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext))) + } + + builder.result() + } + + /** + * Metrics Extension initialization. + */ + private var _system: ActorSystem = null + private lazy val _start = { + _subscriptions.point(_system.actorOf(SubscriptionsDispatcher.props(settings.tickInterval, this), "metrics")) + settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher)) } - implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) { - def atomicGetOrElseUpdate(key: K, op: ⇒ V): V = - trieMap.get(key) match { - case Some(v) ⇒ v - case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d) - } + def start(system: ActorSystem): Unit = synchronized { + _system = system + _start + _system = null } } + +private[kamon] object MetricsExtensionImpl { + + def apply(config: Config) = + new MetricsExtensionImpl(config) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala new file mode 100644 index 00000000..9881ed00 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala @@ -0,0 +1,117 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import com.typesafe.config.Config +import kamon.metric.instrument._ +import kamon.util.GlobPathFilter + +import scala.concurrent.duration.FiniteDuration + +/** + * Configuration settings for the Metrics extension, as read from the `kamon.metric` configuration key. + */ +case class MetricsExtensionSettings( + tickInterval: FiniteDuration, + defaultCollectionContextBufferSize: Int, + trackUnmatchedEntities: Boolean, + entityFilters: Map[String, EntityFilter], + instrumentFactories: Map[String, InstrumentFactory], + defaultInstrumentFactory: InstrumentFactory, + refreshScheduler: RefreshScheduler) { + + private[kamon] def pointScheduler(targetScheduler: RefreshScheduler): Unit = refreshScheduler match { + case lrs: LazyRefreshScheduler ⇒ lrs.point(targetScheduler) + case others ⇒ + } +} + +/** + * + */ +case class EntityFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { + def accept(name: String): Boolean = + includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) +} + +object MetricsExtensionSettings { + import kamon.util.ConfigTools.Syntax + import scala.concurrent.duration._ + + def apply(config: Config): MetricsExtensionSettings = { + val metricConfig = config.getConfig("kamon.metric") + + val tickInterval = metricConfig.getFiniteDuration("tick-interval") + val collectBufferSize = metricConfig.getInt("default-collection-context-buffer-size") + val trackUnmatchedEntities = metricConfig.getBoolean("track-unmatched-entities") + val entityFilters = loadFilters(metricConfig.getConfig("filters")) + val defaultInstrumentSettings = DefaultInstrumentSettings.fromConfig(metricConfig.getConfig("default-instrument-settings")) + + val refreshScheduler = new LazyRefreshScheduler + val instrumentFactories = loadInstrumentFactories(metricConfig.getConfig("instrument-settings"), defaultInstrumentSettings, refreshScheduler) + val defaultInstrumentFactory = new InstrumentFactory(Map.empty, defaultInstrumentSettings, refreshScheduler) + + MetricsExtensionSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories, + defaultInstrumentFactory, refreshScheduler) + } + + /** + * Load all the default filters configured under the `kamon.metric.filters` configuration key. All filters are + * defined with the entity category as a sub-key of the `kamon.metric.filters` key and two sub-keys to it: includes + * and excludes with lists of string glob patterns as values. Example: + * + * {{{ + * + * kamon.metrics.filters { + * actor { + * includes = ["user/test-actor", "user/service/worker-*"] + * excludes = ["user/IO-*"] + * } + * } + * + * }}} + * + * @return a Map from category name to corresponding entity filter. + */ + def loadFilters(filtersConfig: Config): Map[String, EntityFilter] = { + import scala.collection.JavaConverters._ + + filtersConfig.firstLevelKeys map { category: String ⇒ + val includes = filtersConfig.getStringList(s"$category.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList + val excludes = filtersConfig.getStringList(s"$category.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + + (category, EntityFilter(includes, excludes)) + } toMap + } + + /** + * Load any custom configuration settings defined under the `kamon.metric.instrument-settings` configuration key and + * create InstrumentFactories for them. + * + * @return a Map from category name to InstrumentFactory. + */ + def loadInstrumentFactories(instrumentSettings: Config, defaults: DefaultInstrumentSettings, refreshScheduler: RefreshScheduler): Map[String, InstrumentFactory] = { + instrumentSettings.firstLevelKeys.map { category ⇒ + val categoryConfig = instrumentSettings.getConfig(category) + val customSettings = categoryConfig.firstLevelKeys.map { instrumentName ⇒ + (instrumentName, InstrumentCustomSettings.fromConfig(categoryConfig.getConfig(instrumentName))) + } toMap + + (category, new InstrumentFactory(customSettings, defaults, refreshScheduler)) + } toMap + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala deleted file mode 100644 index ddfef416..00000000 --- a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.metric - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.{ Counter, Histogram } - -case class RouterMetrics(name: String) extends MetricGroupIdentity { - val category = RouterMetrics -} - -object RouterMetrics extends MetricGroupCategory { - val name = "router" - - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class RouterMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot(processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) - - def cleanup: Unit = { - processingTime.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class RouterMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = RouterMetricSnapshot - - def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot( - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - ProcessingTime -> processingTime, - TimeInMailbox -> timeInMailbox, - Errors -> errors) - } - - val Factory = RouterMetricGroupFactory -} - -case object RouterMetricGroupFactory extends MetricGroupFactory { - - import RouterMetrics._ - - type GroupRecorder = RouterMetricsRecorder - - def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { - val settings = config.getConfig("precision.router") - - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - - new RouterMetricsRecorder( - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - Counter()) - } -} - diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala deleted file mode 100644 index 1ba9f312..00000000 --- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala +++ /dev/null @@ -1,173 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor._ -import kamon.metric.Subscriptions._ -import kamon.util.GlobPathFilter -import scala.concurrent.duration.{ FiniteDuration, Duration } -import java.util.concurrent.TimeUnit -import kamon.Kamon -import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer - -class Subscriptions extends Actor { - import context.system - - val flushMetricsSchedule = scheduleFlushMessage() - val collectionContext = Kamon(Metrics).buildDefaultCollectionContext - - var lastTick: Long = System.currentTimeMillis() - var oneShotSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty - var permanentSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty - - def receive = { - case Subscribe(category, selection, subscriber, permanent) ⇒ subscribe(category, selection, subscriber, permanent) - case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) - case Terminated(subscriber) ⇒ unsubscribe(subscriber) - case FlushMetrics ⇒ flush() - } - - def subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanent: Boolean): Unit = { - context.watch(subscriber) - val newFilter: MetricSelectionFilter = GroupAndPatternFilter(category, new GlobPathFilter(selection)) - - if (permanent) { - permanentSubscriptions = permanentSubscriptions.updated(subscriber, newFilter combine { - permanentSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty) - }) - } else { - oneShotSubscriptions = oneShotSubscriptions.updated(subscriber, newFilter combine { - oneShotSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty) - }) - } - } - - def unsubscribe(subscriber: ActorRef): Unit = { - if (permanentSubscriptions.contains(subscriber)) - permanentSubscriptions = permanentSubscriptions - subscriber - - if (oneShotSubscriptions.contains(subscriber)) - oneShotSubscriptions = oneShotSubscriptions - subscriber - } - - def flush(): Unit = { - val currentTick = System.currentTimeMillis() - val snapshots = collectAll() - - dispatchSelectedMetrics(lastTick, currentTick, permanentSubscriptions, snapshots) - dispatchSelectedMetrics(lastTick, currentTick, oneShotSubscriptions, snapshots) - - lastTick = currentTick - oneShotSubscriptions = Map.empty - } - - def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = { - val allMetrics = Kamon(Metrics).storage - val builder = Map.newBuilder[MetricGroupIdentity, MetricGroupSnapshot] - - allMetrics.foreach { - case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext))) - } - - builder.result() - } - - def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[ActorRef, MetricSelectionFilter], - snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { - - for ((subscriber, filter) ← subscriptions) { - val selection = snapshots.filter(group ⇒ filter.accept(group._1)) - val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) - - subscriber ! tickMetrics - } - } - - def scheduleFlushMessage(): Cancellable = { - val config = context.system.settings.config - val tickInterval = Duration(config.getMilliseconds("kamon.metrics.tick-interval"), TimeUnit.MILLISECONDS) - context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) - } -} - -object Subscriptions { - case object FlushMetrics - case class Unsubscribe(subscriber: ActorRef) - case class Subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanently: Boolean = false) - case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) - - trait MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean - } - - object MetricSelectionFilter { - val empty = new MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean = false - } - - implicit class CombinableMetricSelectionFilter(msf: MetricSelectionFilter) { - def combine(that: MetricSelectionFilter): MetricSelectionFilter = new MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean = msf.accept(identity) || that.accept(identity) - } - } - } - - case class GroupAndPatternFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) extends MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean = { - category.equals(identity.category) && globFilter.accept(identity.name) - } - } -} - -class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { - val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) - val collectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext - - def receive = empty - - def empty: Actor.Receive = { - case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) - case FlushBuffer ⇒ // Nothing to flush. - } - - def buffering(buffered: TickMetricSnapshot): Actor.Receive = { - case TickMetricSnapshot(_, to, tickMetrics) ⇒ - val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) - val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) - - context become (buffering(combinedSnapshot)) - - case FlushBuffer ⇒ - receiver ! buffered - context become (empty) - - } - - override def postStop(): Unit = { - flushSchedule.cancel() - super.postStop() - } - - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext))) -} - -object TickMetricSnapshotBuffer { - case object FlushBuffer - - def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = - Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) -} diff --git a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala new file mode 100644 index 00000000..68b545a5 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala @@ -0,0 +1,116 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor._ +import kamon.metric.SubscriptionsDispatcher._ +import kamon.util.{ MilliTimestamp, GlobPathFilter } +import scala.concurrent.duration.FiniteDuration + +/** + * Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers. + */ +private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl) extends Actor { + var lastTick = MilliTimestamp.now + var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter] + var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter] + val tickSchedule = context.system.scheduler.schedule(interval, interval, self, Tick)(context.dispatcher) + val collectionContext = metricsExtension.buildDefaultCollectionContext + + def receive = { + case Tick ⇒ processTick() + case Subscribe(filter, subscriber, permanently) ⇒ subscribe(filter, subscriber, permanently) + case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) + case Terminated(subscriber) ⇒ unsubscribe(subscriber) + } + + def processTick(): Unit = + dispatch(metricsExtension.collectSnapshots(collectionContext)) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = { + def addSubscription(storage: Map[ActorRef, SubscriptionFilter]): Map[ActorRef, SubscriptionFilter] = + storage.updated(subscriber, storage.getOrElse(subscriber, SubscriptionFilter.Empty).combine(filter)) + + context.watch(subscriber) + + if (permanent) + permanentSubscriptions = addSubscription(permanentSubscriptions) + else + oneShotSubscriptions = addSubscription(oneShotSubscriptions) + } + + def unsubscribe(subscriber: ActorRef): Unit = { + permanentSubscriptions = permanentSubscriptions - subscriber + oneShotSubscriptions = oneShotSubscriptions - subscriber + } + + def dispatch(snapshots: Map[Entity, EntitySnapshot]): Unit = { + val currentTick = MilliTimestamp.now + + dispatchSelections(lastTick, currentTick, permanentSubscriptions, snapshots) + dispatchSelections(lastTick, currentTick, oneShotSubscriptions, snapshots) + + lastTick = currentTick + oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter] + } + + def dispatchSelections(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, SubscriptionFilter], + snapshots: Map[Entity, EntitySnapshot]): Unit = { + + for ((subscriber, filter) ← subscriptions) { + val selection = snapshots.filter(group ⇒ filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + subscriber ! tickMetrics + } + } +} + +object SubscriptionsDispatcher { + def props(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl): Props = + Props(new SubscriptionsDispatcher(interval, metricsExtension)) + + case object Tick + case class Unsubscribe(subscriber: ActorRef) + case class Subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean = false) + case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[Entity, EntitySnapshot]) + +} + +trait SubscriptionFilter { self ⇒ + + def accept(entity: Entity): Boolean + + final def combine(that: SubscriptionFilter): SubscriptionFilter = new SubscriptionFilter { + override def accept(entity: Entity): Boolean = self.accept(entity) || that.accept(entity) + } +} + +object SubscriptionFilter { + val Empty = new SubscriptionFilter { + def accept(entity: Entity): Boolean = false + } + + def apply(category: String, name: String): SubscriptionFilter = new SubscriptionFilter { + val categoryPattern = new GlobPathFilter(category) + val namePattern = new GlobPathFilter(name) + + def accept(entity: Entity): Boolean = { + categoryPattern.accept(entity.category) && namePattern.accept(entity.name) + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala new file mode 100644 index 00000000..dfc5d5f0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala @@ -0,0 +1,65 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.{ Props, Actor, ActorRef } +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer +import kamon.metric.instrument.CollectionContext +import kamon.util.MapMerge + +import scala.concurrent.duration.FiniteDuration + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + import MapMerge.Syntax + + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + val collectionContext: CollectionContext = Kamon.metrics.buildDefaultCollectionContext + + def receive = empty + + def empty: Actor.Receive = { + case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) + case FlushBuffer ⇒ // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) ⇒ + val combinedMetrics = buffered.metrics.merge(tickMetrics, (l, r) ⇒ l.merge(r, collectionContext)) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become (buffering(combinedSnapshot)) + + case FlushBuffer ⇒ + receiver ! buffered + context become (empty) + + } + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) +} diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index eaad6e0d..3da9c1d4 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -16,67 +16,29 @@ package kamon.metric -import akka.actor.ActorSystem -import kamon.metric.instrument.{ Histogram } +import kamon.metric.instrument.{ Time, InstrumentFactory, Histogram } -import scala.collection.concurrent.TrieMap -import com.typesafe.config.Config +class TraceMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + import TraceMetrics.segmentKey -case class TraceMetrics(name: String) extends MetricGroupIdentity { - val category = TraceMetrics -} - -object TraceMetrics extends MetricGroupCategory { - import Metrics.AtomicGetOrElseUpdateForTriemap - - val name = "trace" - - case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } - - case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) - extends MetricGroupRecorder { - - val segments = TrieMap[MetricIdentity, Histogram]() - - def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = - segments.atomicGetOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) - - def collect(context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot( - elapsedTime.collect(context), - segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap) + /** + * Records blah blah + */ + val ElapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds) - def cleanup: Unit = {} - } - - case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot]) - extends MetricGroupSnapshot { - - type GroupSnapshotType = TraceMetricsSnapshot - - def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), combineMaps(segments, that.segments)((l, r) ⇒ l.merge(r, context))) - - def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) - } - - val Factory = TraceMetricGroupFactory + /** + * Records Blah Blah. + * + */ + def segment(name: String, category: String, library: String): Histogram = + histogram(segmentKey(name, category, library)) } -case object TraceMetricGroupFactory extends MetricGroupFactory { - - import TraceMetrics._ - - type GroupRecorder = TraceMetricRecorder - - def create(config: Config, system: ActorSystem): TraceMetricRecorder = { - val settings = config.getConfig("precision.trace") - val elapsedTimeConfig = settings.getConfig("elapsed-time") - val segmentConfig = settings.getConfig("segment") +object TraceMetrics extends EntityRecorderFactory[TraceMetrics] { + def category: String = "trace" + def createRecorder(instrumentFactory: InstrumentFactory): TraceMetrics = new TraceMetrics(instrumentFactory) - new TraceMetricRecorder( - Histogram.fromConfig(elapsedTimeConfig, Scale.Nano), - () ⇒ Histogram.fromConfig(segmentConfig, Scale.Nano)) - } + def segmentKey(name: String, category: String, library: String): HistogramKey = + HistogramKey(name, Time.Nanoseconds, Map("category" -> category, "library" -> library)) }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala index b7ac1ac5..e0818292 100644 --- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -1,189 +1,204 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.metric -import akka.actor -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } -import kamon.Kamon -import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.metric.instrument._ import scala.concurrent.duration.FiniteDuration -class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - import Metrics.AtomicGetOrElseUpdateForTriemap - import UserMetrics._ +trait UserMetricsExtension { + def histogram(name: String): Histogram + def histogram(name: String, dynamicRange: DynamicRange): Histogram + def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram + def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram + def histogram(key: HistogramKey): Histogram + def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram + def removeHistogram(name: String): Unit + def removeHistogram(key: HistogramKey): Unit + + def minMaxCounter(name: String): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter + def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter + def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter + def removeMinMaxCounter(name: String): Unit + def removeMinMaxCounter(key: MinMaxCounterKey): Unit + + def gauge(name: String, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def removeGauge(name: String): Unit + def removeGauge(key: GaugeKey): Unit + + def counter(name: String): Counter + def counter(key: CounterKey): Counter + def removeCounter(name: String): Unit + def removeCounter(key: CounterKey): Unit - lazy val metricsExtension = Kamon(Metrics)(system) - val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision") +} - val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision") - val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision") - val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") +private[kamon] class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension { + override def histogram(name: String): Histogram = + super.histogram(name) - def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = { - metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { - UserHistogramRecorder(Histogram(highestTrackableValue, precision, Scale.Unit)) - }).asInstanceOf[UserHistogramRecorder].histogram - } + override def histogram(name: String, dynamicRange: DynamicRange): Histogram = + super.histogram(name, dynamicRange) - def registerHistogram(name: String): Histogram = { - metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { - UserHistogramRecorder(Histogram.fromConfig(defaultHistogramPrecisionConfig)) - }).asInstanceOf[UserHistogramRecorder].histogram - } + override def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram = + super.histogram(name, unitOfMeasurement) - def registerCounter(name: String): Counter = { - metricsExtension.storage.atomicGetOrElseUpdate(UserCounter(name), { - UserCounterRecorder(Counter()) - }).asInstanceOf[UserCounterRecorder].counter - } + override def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram = + super.histogram(name, dynamicRange, unitOfMeasurement) - def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration): MinMaxCounter = { - metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { - UserMinMaxCounterRecorder(MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) - }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter - } + override def histogram(key: HistogramKey): Histogram = + super.histogram(key) - def registerMinMaxCounter(name: String): MinMaxCounter = { - metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { - UserMinMaxCounterRecorder(MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) - }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter - } + override def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram = + super.histogram(key, dynamicRange) - def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { - UserGaugeRecorder(Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) - }).asInstanceOf[UserGaugeRecorder].gauge - } + override def removeHistogram(name: String): Unit = + super.removeHistogram(name) - def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { - UserGaugeRecorder(Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) - }).asInstanceOf[UserGaugeRecorder].gauge - } + override def removeHistogram(key: HistogramKey): Unit = + super.removeHistogram(key) - def removeHistogram(name: String): Unit = - metricsExtension.unregister(UserHistogram(name)) + override def minMaxCounter(name: String): MinMaxCounter = + super.minMaxCounter(name) - def removeCounter(name: String): Unit = - metricsExtension.unregister(UserCounter(name)) + override def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter = + super.minMaxCounter(name, dynamicRange) - def removeMinMaxCounter(name: String): Unit = - metricsExtension.unregister(UserMinMaxCounter(name)) + override def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(name, refreshInterval) - def removeGauge(name: String): Unit = - metricsExtension.unregister(UserGauge(name)) -} + override def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, unitOfMeasurement) -object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(name, dynamicRange, refreshInterval) - def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system) + override def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, dynamicRange, unitOfMeasurement) - sealed trait UserMetricGroup - // - // Histograms - // + override def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, refreshInterval, unitOfMeasurement) - case class UserHistogram(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserHistograms - } + override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, dynamicRange, refreshInterval, unitOfMeasurement) - case class UserHistogramRecorder(histogram: Histogram) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserHistogramSnapshot(histogram.collect(context)) + override def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter = + super.minMaxCounter(key) - def cleanup: Unit = histogram.cleanup - } + override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter = + super.minMaxCounter(key, dynamicRange) - case class UserHistogramSnapshot(histogramSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserHistogramSnapshot + override def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(key, refreshInterval) - def merge(that: UserHistogramSnapshot, context: CollectionContext): UserHistogramSnapshot = - UserHistogramSnapshot(that.histogramSnapshot.merge(histogramSnapshot, context)) + override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(key, dynamicRange, refreshInterval) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, histogramSnapshot)) - } + override def removeMinMaxCounter(name: String): Unit = + super.removeMinMaxCounter(name) - // - // Counters - // + override def removeMinMaxCounter(key: MinMaxCounterKey): Unit = + super.removeMinMaxCounter(key) - case class UserCounter(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserCounters - } + override def gauge(name: String, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, valueCollector) - case class UserCounterRecorder(counter: Counter) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserCounterSnapshot(counter.collect(context)) + override def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, valueCollector) - def cleanup: Unit = counter.cleanup - } + override def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, refreshInterval, valueCollector) - case class UserCounterSnapshot(counterSnapshot: Counter.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserCounterSnapshot + override def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, unitOfMeasurement, valueCollector) - def merge(that: UserCounterSnapshot, context: CollectionContext): UserCounterSnapshot = - UserCounterSnapshot(that.counterSnapshot.merge(counterSnapshot, context)) + override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, refreshInterval, valueCollector) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((Count, counterSnapshot)) - } + override def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, unitOfMeasurement, valueCollector) - // - // MinMaxCounters - // + override def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, refreshInterval, unitOfMeasurement, valueCollector) - case class UserMinMaxCounter(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserMinMaxCounters - } + override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, refreshInterval, unitOfMeasurement, valueCollector) - case class UserMinMaxCounterRecorder(minMaxCounter: MinMaxCounter) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserMinMaxCounterSnapshot(minMaxCounter.collect(context)) + override def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, valueCollector) - def cleanup: Unit = minMaxCounter.cleanup - } + override def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, dynamicRange, valueCollector) - case class UserMinMaxCounterSnapshot(minMaxCounterSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserMinMaxCounterSnapshot + override def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, refreshInterval, valueCollector) - def merge(that: UserMinMaxCounterSnapshot, context: CollectionContext): UserMinMaxCounterSnapshot = - UserMinMaxCounterSnapshot(that.minMaxCounterSnapshot.merge(minMaxCounterSnapshot, context)) + override def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, dynamicRange, refreshInterval, valueCollector) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, minMaxCounterSnapshot)) - } + override def removeGauge(name: String): Unit = + super.removeGauge(name) - // - // Gauges - // + override def removeGauge(key: GaugeKey): Unit = + super.removeGauge(key) - case class UserGauge(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserGauges - } + override def counter(name: String): Counter = + super.counter(name) - case class UserGaugeRecorder(gauge: Gauge) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserGaugeSnapshot(gauge.collect(context)) + override def counter(key: CounterKey): Counter = + super.counter(key) - def cleanup: Unit = gauge.cleanup - } + override def removeCounter(name: String): Unit = + super.removeCounter(name) - case class UserGaugeSnapshot(gaugeSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserGaugeSnapshot - - def merge(that: UserGaugeSnapshot, context: CollectionContext): UserGaugeSnapshot = - UserGaugeSnapshot(that.gaugeSnapshot.merge(gaugeSnapshot, context)) - - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, gaugeSnapshot)) - } + override def removeCounter(key: CounterKey): Unit = + super.removeCounter(key) +} - case object UserHistograms extends MetricGroupCategory { val name: String = "histogram" } - case object UserCounters extends MetricGroupCategory { val name: String = "counter" } - case object UserMinMaxCounters extends MetricGroupCategory { val name: String = "min-max-counter" } - case object UserGauges extends MetricGroupCategory { val name: String = "gauge" } +private[kamon] object UserMetricsExtensionImpl { + val UserMetricEntity = Entity("user-metric", "user-metric") - case object RecordedValues extends MetricIdentity { val name: String = "values" } - case object Count extends MetricIdentity { val name: String = "count" } + def apply(metricsExtension: MetricsExtension): UserMetricsExtensionImpl = { + val instrumentFactory = metricsExtension.instrumentFactory(UserMetricEntity.category) + val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory) -} + metricsExtension.register(UserMetricEntity, userMetricsExtension).recorder + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala index e79090a8..e79090a8 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala index 0f29ba6f..c1b69cbe 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -17,9 +17,8 @@ package kamon.metric.instrument import kamon.jsr166.LongAdder -import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder } -trait Counter extends MetricRecorder { +trait Counter extends Instrument { type SnapshotType = Counter.Snapshot def increment(): Unit @@ -29,12 +28,11 @@ trait Counter extends MetricRecorder { object Counter { def apply(): Counter = new LongAdderCounter + def create(): Counter = apply() - trait Snapshot extends MetricSnapshot { - type SnapshotType = Counter.Snapshot - + trait Snapshot extends InstrumentSnapshot { def count: Long - def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot + def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot } } @@ -55,5 +53,8 @@ class LongAdderCounter extends Counter { } case class CounterSnapshot(count: Long) extends Counter.Snapshot { - def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count) + def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot = that match { + case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount) + case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.") + } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala index 0c1815c3..80214510 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -1,70 +1,102 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.metric.instrument -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicLong, AtomicLongFieldUpdater, AtomicReference } -import akka.actor.{ Cancellable, ActorSystem } -import com.typesafe.config.Config -import kamon.metric.{ CollectionContext, Scale, MetricRecorder } +import akka.actor.Cancellable +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange import scala.concurrent.duration.FiniteDuration -trait Gauge extends MetricRecorder { +trait Gauge extends Instrument { type SnapshotType = Histogram.Snapshot - def record(value: Long) - def record(value: Long, count: Long) + def record(value: Long): Unit + def record(value: Long, count: Long): Unit + def refreshValue(): Unit } object Gauge { - trait CurrentValueCollector { - def currentValue: Long - } - - def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration, - system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - - val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) - val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector) - - val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = { + val underlyingHistogram = Histogram(dynamicRange) + val gauge = new HistogramBackedGauge(underlyingHistogram, valueCollector) + val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ { gauge.refreshValue() - }(system.dispatcher) // TODO: Move this to Kamon dispatchers + }) - gauge.refreshValuesSchedule.set(refreshValuesSchedule) + gauge.automaticValueCollectorSchedule.set(refreshValuesSchedule) gauge } - def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge = - fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction)) + def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = + apply(dynamicRange, refreshInterval, scheduler, valueCollector) - def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = { - val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision") - fromConfig(config, system)(currentValueCollector) + trait CurrentValueCollector { + def currentValue: Long } - def fromConfig(config: Config, system: ActorSystem, scale: Scale)(currentValueCollector: CurrentValueCollector): Gauge = { - import scala.concurrent.duration._ + implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f.apply() + } +} - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - val refreshInterval = config.getMilliseconds("refresh-interval").toInt +/** + * Helper for cases in which a gauge shouldn't store the current value of a observed value but the difference between + * the current observed value and the previously observed value. Should only be used if the observed value is always + * increasing or staying steady, but is never able to decrease. + * + * Note: The first time a value is collected, this wrapper will always return zero, afterwards, the difference between + * the current value and the last value will be returned. + */ +class DifferentialValueCollector(wrappedValueCollector: CurrentValueCollector) extends CurrentValueCollector { + @volatile private var _readAtLeastOnce = false + private val _lastObservedValue = new AtomicLong(0) + + def currentValue: Long = { + if (_readAtLeastOnce) { + val wrappedCurrent = wrappedValueCollector.currentValue + val diff = wrappedCurrent - _lastObservedValue.getAndSet(wrappedCurrent) + + if (diff >= 0) diff else 0L + + } else { + _lastObservedValue.set(wrappedValueCollector.currentValue) + _readAtLeastOnce = true + 0L + } - Gauge(Histogram.Precision(significantDigits), highest, scale, refreshInterval.millis, system)(currentValueCollector) } +} - def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - fromConfig(config, system, Scale.Unit)(currentValueCollector) - } +object DifferentialValueCollector { + def apply(wrappedValueCollector: CurrentValueCollector): CurrentValueCollector = + new DifferentialValueCollector(wrappedValueCollector) - implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { - def currentValue: Long = f.apply() - } + def apply(wrappedValueCollector: ⇒ Long): CurrentValueCollector = + new DifferentialValueCollector(new CurrentValueCollector { + def currentValue: Long = wrappedValueCollector + }) } class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge { - val refreshValuesSchedule = new AtomicReference[Cancellable]() + private[kamon] val automaticValueCollectorSchedule = new AtomicReference[Cancellable]() def record(value: Long): Unit = underlyingHistogram.record(value) @@ -73,10 +105,12 @@ class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context) def cleanup: Unit = { - if (refreshValuesSchedule.get() != null) - refreshValuesSchedule.get().cancel() + if (automaticValueCollectorSchedule.get() != null) + automaticValueCollectorSchedule.get().cancel() } - def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue) + def refreshValue(): Unit = + underlyingHistogram.record(currentValueCollector.currentValue) + } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala index bed75fc8..5c4c7f71 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -17,12 +17,11 @@ package kamon.metric.instrument import java.nio.LongBuffer -import com.typesafe.config.Config import org.HdrHistogram.AtomicHistogramFieldsAccessor +import kamon.metric.instrument.Histogram.{ Snapshot, DynamicRange } import org.HdrHistogram.AtomicHistogram -import kamon.metric._ -trait Histogram extends MetricRecorder { +trait Histogram extends Instrument { type SnapshotType = Histogram.Snapshot def record(value: Long) @@ -31,30 +30,40 @@ trait Histogram extends MetricRecorder { object Histogram { - def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram = - new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale) - - def fromConfig(config: Config): Histogram = { - fromConfig(config, Scale.Unit) - } - - def fromConfig(config: Config, scale: Scale): Histogram = { - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - - new HdrHistogram(1L, highest, significantDigits, scale) - } - - object HighestTrackableValue { - val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L - } - - case class Precision(significantDigits: Int) - object Precision { - val Low = Precision(1) - val Normal = Precision(2) - val Fine = Precision(3) - } + /** + * Scala API: + * + * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given + * [[kamon.metric.instrument.Histogram.DynamicRange]]. + */ + def apply(dynamicRange: DynamicRange): Histogram = new HdrHistogram(dynamicRange) + + /** + * Java API: + * + * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given + * [[kamon.metric.instrument.Histogram.DynamicRange]]. + */ + def create(dynamicRange: DynamicRange): Histogram = apply(dynamicRange) + + /** + * DynamicRange is a configuration object used to supply range and precision configuration to a + * [[kamon.metric.instrument.HdrHistogram]]. See the [[http://hdrhistogram.github.io/HdrHistogram/ HdrHistogram website]] + * for more details on how it works and the effects of these configuration values. + * + * @param lowestDiscernibleValue + * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that + * is >= 1. May be internally rounded down to nearest power of 2. + * + * @param highestTrackableValue + * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue). + * Must not be larger than (Long.MAX_VALUE/2). + * + * @param precision + * The number of significant decimal digits to which the histogram will maintain value resolution and separation. + * Must be a non-negative integer between 1 and 3. + */ + case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, precision: Int) trait Record { def level: Long @@ -67,29 +76,28 @@ object Histogram { var rawCompactRecord: Long = 0L } - trait Snapshot extends MetricSnapshot { - type SnapshotType = Histogram.Snapshot + trait Snapshot extends InstrumentSnapshot { def isEmpty: Boolean = numberOfMeasurements == 0 - def scale: Scale def numberOfMeasurements: Long def min: Long def max: Long def sum: Long def percentile(percentile: Double): Long def recordsIterator: Iterator[Record] + def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot } object Snapshot { - def empty(targetScale: Scale) = new Snapshot { + val empty = new Snapshot { override def min: Long = 0L override def max: Long = 0L override def sum: Long = 0L override def percentile(percentile: Double): Long = 0L override def recordsIterator: Iterator[Record] = Iterator.empty - override def merge(that: Snapshot, context: CollectionContext): Snapshot = that - override def scale: Scale = targetScale + override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that + override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that override def numberOfMeasurements: Long = 0L } } @@ -100,10 +108,8 @@ object Histogram { * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. */ -class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit) - extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits) - with Histogram with AtomicHistogramFieldsAccessor { - +class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRange.lowestDiscernibleValue, + dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram with AtomicHistogramFieldsAccessor { import AtomicHistogramFieldsAccessor.totalCountUpdater def record(value: Long): Unit = recordValue(value) @@ -119,7 +125,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign val measurementsArray = Array.ofDim[Long](buffer.limit()) buffer.get(measurementsArray, 0, measurementsArray.length) - new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) } def getCounts = countsArray().length() @@ -160,7 +166,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign } -case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, +case class CompactHdrSnapshot(val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot { def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0)) @@ -182,53 +188,61 @@ case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, percentileLevel } - def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = { - if (that.isEmpty) this else if (this.isEmpty) that else { - import context.buffer - buffer.clear() + def merge(that: Histogram.Snapshot, context: CollectionContext): Snapshot = + merge(that.asInstanceOf[InstrumentSnapshot], context) - val selfIterator = recordsIterator - val thatIterator = that.recordsIterator - var thatCurrentRecord: Histogram.Record = null - var mergedNumberOfMeasurements = 0L + def merge(that: InstrumentSnapshot, context: CollectionContext): Histogram.Snapshot = that match { + case thatSnapshot: CompactHdrSnapshot ⇒ + if (thatSnapshot.isEmpty) this else if (this.isEmpty) thatSnapshot else { + import context.buffer + buffer.clear() - def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null - def addToBuffer(compactRecord: Long): Unit = { - mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) - buffer.put(compactRecord) - } + val selfIterator = recordsIterator + val thatIterator = thatSnapshot.recordsIterator + var thatCurrentRecord: Histogram.Record = null + var mergedNumberOfMeasurements = 0L - while (selfIterator.hasNext) { - val selfCurrentRecord = selfIterator.next() + def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null + def addToBuffer(compactRecord: Long): Unit = { + mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) + buffer.put(compactRecord) + } - // Advance that to no further than the level of selfCurrentRecord - thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord - while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { - addToBuffer(thatCurrentRecord.rawCompactRecord) - thatCurrentRecord = nextOrNull(thatIterator) + while (selfIterator.hasNext) { + val selfCurrentRecord = selfIterator.next() + + // Advance that to no further than the level of selfCurrentRecord + thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord + while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { + addToBuffer(thatCurrentRecord.rawCompactRecord) + thatCurrentRecord = nextOrNull(thatIterator) + } + + // Include the current record of self and optionally merge if has the same level as thatCurrentRecord + if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { + addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) + thatCurrentRecord = nextOrNull(thatIterator) + } else { + addToBuffer(selfCurrentRecord.rawCompactRecord) + } } - // Include the current record of self and optionally merge if has the same level as thatCurrentRecord - if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { - addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) - thatCurrentRecord = nextOrNull(thatIterator) - } else { - addToBuffer(selfCurrentRecord.rawCompactRecord) + // Include everything that might have been left from that + if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) + while (thatIterator.hasNext) { + addToBuffer(thatIterator.next().rawCompactRecord) } - } - // Include everything that might have been left from that - if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) - while (thatIterator.hasNext) { - addToBuffer(thatIterator.next().rawCompactRecord) + buffer.flip() + val compactRecords = Array.ofDim[Long](buffer.limit()) + buffer.get(compactRecords) + + new CompactHdrSnapshot(mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) } - buffer.flip() - val compactRecords = Array.ofDim[Long](buffer.limit()) - buffer.get(compactRecords) + case other ⇒ + sys.error(s"Cannot merge a CompactHdrSnapshot with the incompatible [${other.getClass.getName}] type.") - new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) - } } @inline private def mergeCompactRecords(left: Long, right: Long): Long = { diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala index 3761f5a5..59b4b443 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -14,23 +14,31 @@ * ========================================================================================= */ -package kamon.metric +package kamon.metric.instrument -import java.nio.{ LongBuffer } -import akka.actor.ActorSystem -import com.typesafe.config.Config +import java.nio.LongBuffer -trait MetricGroupCategory { - def name: String +import akka.actor.{ Scheduler, Cancellable } +import akka.dispatch.MessageDispatcher +import scala.concurrent.duration.FiniteDuration + +private[kamon] trait Instrument { + type SnapshotType <: InstrumentSnapshot + + def collect(context: CollectionContext): SnapshotType + def cleanup: Unit } -trait MetricGroupIdentity { - def name: String - def category: MetricGroupCategory +trait InstrumentSnapshot { + def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot } -trait MetricIdentity { - def name: String +class InstrumentType private[kamon] (val id: Int) extends AnyVal +object InstrumentTypes { + val Histogram = new InstrumentType(1) + val MinMaxCounter = new InstrumentType(2) + val Gauge = new InstrumentType(3) + val Counter = new InstrumentType(4) } trait CollectionContext { @@ -43,33 +51,3 @@ object CollectionContext { } } -trait MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot - def cleanup: Unit -} - -trait MetricSnapshot { - type SnapshotType - - def merge(that: SnapshotType, context: CollectionContext): SnapshotType -} - -trait MetricGroupSnapshot { - type GroupSnapshotType - - def metrics: Map[MetricIdentity, MetricSnapshot] - def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType -} - -private[kamon] trait MetricRecorder { - type SnapshotType <: MetricSnapshot - - def collect(context: CollectionContext): SnapshotType - def cleanup: Unit -} - -trait MetricGroupFactory { - type GroupRecorder <: MetricGroupRecorder - def create(config: Config, system: ActorSystem): GroupRecorder -} - diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala new file mode 100644 index 00000000..7c0201f7 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala @@ -0,0 +1,51 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange + +import scala.concurrent.duration.FiniteDuration + +case class InstrumentFactory(configurations: Map[String, InstrumentCustomSettings], defaults: DefaultInstrumentSettings, scheduler: RefreshScheduler) { + + private def resolveSettings(instrumentName: String, codeSettings: Option[InstrumentSettings], default: InstrumentSettings): InstrumentSettings = { + configurations.get(instrumentName).flatMap { customSettings ⇒ + codeSettings.map(cs ⇒ customSettings.combine(cs)) orElse (Some(customSettings.combine(default))) + + } getOrElse (codeSettings.getOrElse(default)) + } + + def createHistogram(name: String, dynamicRange: Option[DynamicRange] = None): Histogram = { + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, None)), defaults.histogram) + Histogram(settings.dynamicRange) + } + + def createMinMaxCounter(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None): MinMaxCounter = { + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.minMaxCounter) + MinMaxCounter(settings.dynamicRange, settings.refreshInterval.get, scheduler) + } + + def createGauge(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None, + valueCollector: CurrentValueCollector): Gauge = { + + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.gauge) + Gauge(settings.dynamicRange, settings.refreshInterval.get, scheduler, valueCollector) + } + + def createCounter(): Counter = Counter() +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala new file mode 100644 index 00000000..29f8f46b --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala @@ -0,0 +1,65 @@ +package kamon.metric.instrument + +import java.util.concurrent.TimeUnit + +import com.typesafe.config.Config +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.util.ConfigTools.Syntax + +import scala.concurrent.duration.FiniteDuration + +case class InstrumentCustomSettings(lowestDiscernibleValue: Option[Long], highestTrackableValue: Option[Long], + precision: Option[Int], refreshInterval: Option[FiniteDuration]) { + + def combine(that: InstrumentSettings): InstrumentSettings = + InstrumentSettings( + DynamicRange( + lowestDiscernibleValue.getOrElse(that.dynamicRange.lowestDiscernibleValue), + highestTrackableValue.getOrElse(that.dynamicRange.highestTrackableValue), + precision.getOrElse(that.dynamicRange.precision)), + refreshInterval.orElse(that.refreshInterval)) +} + +object InstrumentCustomSettings { + + def fromConfig(config: Config): InstrumentCustomSettings = + InstrumentCustomSettings( + if (config.hasPath("lowest-discernible-value")) Some(config.getLong("lowest-discernible-value")) else None, + if (config.hasPath("highest-trackable-value")) Some(config.getLong("highest-trackable-value")) else None, + if (config.hasPath("precision")) Some(InstrumentSettings.parsePrecision(config.getString("precision"))) else None, + if (config.hasPath("refresh-interval")) Some(config.getFiniteDuration("refresh-interval")) else None) + +} + +case class InstrumentSettings(dynamicRange: DynamicRange, refreshInterval: Option[FiniteDuration]) + +object InstrumentSettings { + + def readDynamicRange(config: Config): DynamicRange = + DynamicRange( + config.getLong("lowest-discernible-value"), + config.getLong("highest-trackable-value"), + parsePrecision(config.getString("precision"))) + + def parsePrecision(stringValue: String): Int = stringValue match { + case "low" ⇒ 1 + case "normal" ⇒ 2 + case "fine" ⇒ 3 + case other ⇒ sys.error(s"Invalid precision configuration [$other] found, valid options are: [low|normal|fine].") + } +} + +case class DefaultInstrumentSettings(histogram: InstrumentSettings, minMaxCounter: InstrumentSettings, gauge: InstrumentSettings) + +object DefaultInstrumentSettings { + + def fromConfig(config: Config): DefaultInstrumentSettings = { + val histogramSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("histogram")), None) + val minMaxCounterSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("min-max-counter")), + Some(config.getFiniteDuration("min-max-counter.refresh-interval"))) + val gaugeSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("gauge")), + Some(config.getFiniteDuration("gauge.refresh-interval"))) + + DefaultInstrumentSettings(histogramSettings, minMaxCounterSettings, gaugeSettings) + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala index 61cee02a..0828c8a9 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -17,16 +17,14 @@ package kamon.metric.instrument */ import java.lang.Math.abs -import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import akka.actor.{ ActorSystem, Cancellable } -import com.typesafe.config.Config +import akka.actor.Cancellable import kamon.jsr166.LongMaxUpdater -import kamon.metric.{ Scale, MetricRecorder, CollectionContext } +import kamon.metric.instrument.Histogram.DynamicRange import kamon.util.PaddedAtomicLong import scala.concurrent.duration.FiniteDuration -trait MinMaxCounter extends MetricRecorder { +trait MinMaxCounter extends Instrument { override type SnapshotType = Histogram.Snapshot def increment(): Unit @@ -38,29 +36,20 @@ trait MinMaxCounter extends MetricRecorder { object MinMaxCounter { - def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration, - system: ActorSystem): MinMaxCounter = { - - val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = { + val underlyingHistogram = Histogram(dynamicRange) val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram) - - val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ { minMaxCounter.refreshValues() - }(system.dispatcher) // TODO: Move this to Kamon dispatchers + }) minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule) minMaxCounter } - def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = { - import scala.concurrent.duration._ + def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = + apply(dynamicRange, refreshInterval, scheduler) - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - val refreshInterval = config.getMilliseconds("refresh-interval").toInt - - apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system) - } } class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala new file mode 100644 index 00000000..adb08713 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala @@ -0,0 +1,99 @@ +package kamon.metric.instrument + +import akka.actor.{ Scheduler, Cancellable } +import org.HdrHistogram.WriterReaderPhaser + +import scala.collection.concurrent.TrieMap +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration + +trait RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable +} + +/** + * Default implementation of RefreshScheduler that simply uses an [[akka.actor.Scheduler]] to schedule tasks to be run + * in the provided ExecutionContext. + */ +class DefaultRefreshScheduler(scheduler: Scheduler, dispatcher: ExecutionContext) extends RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = + scheduler.schedule(interval, interval)(refresh.apply())(dispatcher) +} + +object DefaultRefreshScheduler { + def apply(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler = + new DefaultRefreshScheduler(scheduler, dispatcher) + + def create(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler = + apply(scheduler, dispatcher) +} + +/** + * RefreshScheduler implementation that accumulates all the scheduled actions until it is pointed to another refresh + * scheduler. Once it is pointed, all subsequent calls to `schedule` will immediately be scheduled in the pointed + * scheduler. + */ +class LazyRefreshScheduler extends RefreshScheduler { + private val _schedulerPhaser = new WriterReaderPhaser + private val _backlog = new TrieMap[(FiniteDuration, () ⇒ Unit), RepointableCancellable]() + @volatile private var _target: Option[RefreshScheduler] = None + + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = { + val criticalEnter = _schedulerPhaser.writerCriticalSectionEnter() + try { + _target.map { scheduler ⇒ + scheduler.schedule(interval, refresh) + + } getOrElse { + val entry = (interval, refresh) + val cancellable = new RepointableCancellable(entry) + + _backlog.put(entry, cancellable) + cancellable + } + + } finally { + _schedulerPhaser.writerCriticalSectionExit(criticalEnter) + } + } + + def point(target: RefreshScheduler): Unit = try { + _schedulerPhaser.readerLock() + + if (_target.isEmpty) { + _target = Some(target) + _schedulerPhaser.flipPhase(10000L) + _backlog.dropWhile { + case ((interval, refresh), repointableCancellable) ⇒ + repointableCancellable.point(target.schedule(interval, refresh)) + true + } + } else sys.error("A LazyRefreshScheduler cannot be pointed more than once.") + } finally { _schedulerPhaser.readerUnlock() } + + class RepointableCancellable(entry: (FiniteDuration, () ⇒ Unit)) extends Cancellable { + private var _isCancelled = false + private var _cancellable: Option[Cancellable] = None + + def isCancelled: Boolean = synchronized { + _cancellable.map(_.isCancelled).getOrElse(_isCancelled) + } + + def cancel(): Boolean = synchronized { + _isCancelled = true + _cancellable.map(_.cancel()).getOrElse(_backlog.remove(entry).nonEmpty) + } + + def point(cancellable: Cancellable): Unit = synchronized { + if (_cancellable.isEmpty) { + _cancellable = Some(cancellable) + + if (_isCancelled) + cancellable.cancel() + + } else sys.error("A RepointableCancellable cannot be pointed more than once.") + + } + } +} + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala new file mode 100644 index 00000000..f2a061d1 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala @@ -0,0 +1,71 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +trait UnitOfMeasurement { + def name: String + def label: String + def factor: Double +} + +object UnitOfMeasurement { + case object Unknown extends UnitOfMeasurement { + val name = "unknown" + val label = "unknown" + val factor = 1D + } + + def isUnknown(uom: UnitOfMeasurement): Boolean = + uom == Unknown + + def isTime(uom: UnitOfMeasurement): Boolean = + uom.isInstanceOf[Time] + +} + +case class Time(factor: Double, label: String) extends UnitOfMeasurement { + val name = "time" + + /** + * Scale a value from this scale factor to a different scale factor. + * + * @param toUnit Time unit of the expected result. + * @param value Value to scale. + * @return Equivalent of value on the target time unit. + */ + def scale(toUnit: Time)(value: Long): Double = + (value * factor) / toUnit.factor +} + +object Time { + val Nanoseconds = Time(1E-9, "n") + val Microseconds = Time(1E-6, "µs") + val Milliseconds = Time(1E-3, "ms") + val Seconds = Time(1, "s") +} + +case class Memory(factor: Double, label: String) extends UnitOfMeasurement { + val name = "bytes" +} + +object Memory { + val Bytes = Memory(1, "b") + val KiloBytes = Memory(1024, "Kb") + val MegaBytes = Memory(1024E2, "Mb") + val GigaBytes = Memory(1024E3, "Gb") +} + diff --git a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala deleted file mode 100644 index 490bc127..00000000 --- a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala +++ /dev/null @@ -1,61 +0,0 @@ -package kamon.standalone - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.Kamon -import kamon.metric.UserMetrics -import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } - -import scala.concurrent.duration.FiniteDuration - -trait KamonStandalone { - private[kamon] def system: ActorSystem - - def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = - Kamon(UserMetrics)(system).registerHistogram(name, precision, highestTrackableValue) - - def registerHistogram(name: String): Histogram = - Kamon(UserMetrics)(system).registerHistogram(name) - - def registerCounter(name: String): Counter = - Kamon(UserMetrics)(system).registerCounter(name) - - def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration): MinMaxCounter = - Kamon(UserMetrics)(system).registerMinMaxCounter(name, precision, highestTrackableValue, refreshInterval) - - def registerMinMaxCounter(name: String): MinMaxCounter = - Kamon(UserMetrics)(system).registerMinMaxCounter(name) - - def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = - Kamon(UserMetrics)(system).registerGauge(name)(currentValueCollector) - - def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = - Kamon(UserMetrics)(system).registerGauge(name, precision, highestTrackableValue, refreshInterval)(currentValueCollector) - - def removeHistogram(name: String): Unit = - Kamon(UserMetrics)(system).removeHistogram(name) - - def removeCounter(name: String): Unit = - Kamon(UserMetrics)(system).removeCounter(name) - - def removeMinMaxCounter(name: String): Unit = - Kamon(UserMetrics)(system).removeMinMaxCounter(name) - - def removeGauge(name: String): Unit = - Kamon(UserMetrics)(system).removeGauge(name) -} - -object KamonStandalone { - - def buildFromConfig(config: Config): KamonStandalone = buildFromConfig(config, "kamon-standalone") - - def buildFromConfig(config: Config, actorSystemName: String): KamonStandalone = new KamonStandalone { - val system: ActorSystem = ActorSystem(actorSystemName, config) - } -} - -object EmbeddedKamonStandalone extends KamonStandalone { - private[kamon] lazy val system = ActorSystem("kamon-standalone") -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/Scale.scala b/kamon-core/src/main/scala/kamon/supervisor/AspectJPresent.scala index 2f27c1a3..0df9539f 100644 --- a/kamon-core/src/main/scala/kamon/metric/Scale.scala +++ b/kamon-core/src/main/scala/kamon/supervisor/AspectJPresent.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -14,18 +14,18 @@ * ========================================================================================= */ -package kamon.metric +package kamon.supervisor -class Scale(val numericValue: Double) extends AnyVal +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } -object Scale { - val Nano = new Scale(1E-9) - val Micro = new Scale(1E-6) - val Milli = new Scale(1E-3) - val Unit = new Scale(1) - val Kilo = new Scale(1E3) - val Mega = new Scale(1E6) - val Giga = new Scale(1E9) +@Aspect +class AspectJPresent { + + @Pointcut("execution(* kamon.supervisor.KamonSupervisor.isAspectJPresent())") + def isAspectJPresentAtModuleSupervisor(): Unit = {} + + @Around("isAspectJPresentAtModuleSupervisor()") + def aroundIsAspectJPresentAtModuleSupervisor(pjp: ProceedingJoinPoint): Boolean = true - def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue } diff --git a/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala new file mode 100644 index 00000000..ddce63fb --- /dev/null +++ b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorExtension.scala @@ -0,0 +1,125 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.supervisor + +import akka.actor +import akka.actor._ +import kamon.Kamon +import kamon.supervisor.KamonSupervisor.CreateModule + +import scala.concurrent.{ Promise, Future } +import scala.util.Success + +object ModuleSupervisor extends ExtensionId[ModuleSupervisorExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = ModuleSupervisor + def createExtension(system: ExtendedActorSystem): ModuleSupervisorExtension = new ModuleSupervisorExtensionImpl(system) +} + +trait ModuleSupervisorExtension extends actor.Extension { + def createModule(name: String, props: Props): Future[ActorRef] +} + +class ModuleSupervisorExtensionImpl(system: ExtendedActorSystem) extends ModuleSupervisorExtension { + import system.dispatcher + + private val _settings = ModuleSupervisorSettings(system) + private val _supervisor = system.actorOf(KamonSupervisor.props(_settings, system.dynamicAccess), "kamon") + + def createModule(name: String, props: Props): Future[ActorRef] = Future {} flatMap { _: Unit ⇒ + val modulePromise = Promise[ActorRef]() + _supervisor ! CreateModule(name, props, modulePromise) + modulePromise.future + } +} + +class KamonSupervisor(settings: ModuleSupervisorSettings, dynamicAccess: DynamicAccess) extends Actor with ActorLogging { + + init() + + def receive = { + case CreateModule(name, props, childPromise) ⇒ createChildModule(name, props, childPromise) + } + + def createChildModule(name: String, props: Props, childPromise: Promise[ActorRef]): Unit = + context.child(name).map { alreadyAvailableModule ⇒ + log.warning("Received a request to create module [{}] but the module is already available, returning the existent instance.") + childPromise.complete(Success(alreadyAvailableModule)) + + } getOrElse (childPromise.complete(Success(context.actorOf(props, name)))) + + def init(): Unit = { + if (settings.modulesRequiringAspectJ.nonEmpty && !isAspectJPresent && settings.showAspectJMissingWarning) + logAspectJWeaverMissing(settings.modulesRequiringAspectJ) + + // Force initialization of all modules marked with auto-start. + settings.availableModules.filter(_.autoStart).foreach { module ⇒ + if (module.extensionClass == "none") + log.debug("Ignoring auto start of the [{}] module with no extension class.") + else + dynamicAccess.getObjectFor[ExtensionId[Kamon.Extension]](module.extensionClass).map { moduleID ⇒ + moduleID.get(context.system) + log.debug("Auto starting the [{}] module.", module.name) + + } recover { + case th: Throwable ⇒ log.error(th, "Failed to auto start the [{}] module.", module.name) + } + + } + } + + // When AspectJ is present the kamon.supervisor.AspectJPresent aspect will make this return true. + def isAspectJPresent: Boolean = false + + def logAspectJWeaverMissing(modulesRequiringAspectJ: List[AvailableModuleInfo]): Unit = { + val moduleNames = modulesRequiringAspectJ.map(_.name).mkString(", ") + val weaverMissingMessage = + """ + | + | ___ _ ___ _ _ ___ ___ _ _ + | / _ \ | | |_ | | | | | | \/ |(_) (_) + |/ /_\ \ ___ _ __ ___ ___ | |_ | | | | | | ___ __ _ __ __ ___ _ __ | . . | _ ___ ___ _ _ __ __ _ + || _ |/ __|| '_ \ / _ \ / __|| __| | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \ / _` | + || | | |\__ \| |_) || __/| (__ | |_ /\__/ / \ /\ /| __/| (_| | \ V /| __/| | | | | || |\__ \\__ \| || | | || (_| | + |\_| |_/|___/| .__/ \___| \___| \__|\____/ \/ \/ \___| \__,_| \_/ \___||_| \_| |_/|_||___/|___/|_||_| |_| \__, | + | | | __/ | + | |_| |___/ + | + | It seems like your application was not started with the -javaagent:/path-to-aspectj-weaver.jar option but Kamon detected + | the following modules which require AspecJ to work properly: + | + """.stripMargin + moduleNames + + """ + | + | If you need help on setting up the aspectj weaver go to http://kamon.io/introduction/get-started/ for more info. On the + | other hand, if you are sure that you do not need or do not want to use the weaver then you can disable this error message + | by changing the kamon.show-aspectj-missing-warning setting in your configuration file. + | + """.stripMargin + + log.error(weaverMissingMessage) + } + +} + +object KamonSupervisor { + case class CreateModule(name: String, props: Props, childPromise: Promise[ActorRef]) + + def props(settings: ModuleSupervisorSettings, dynamicAccess: DynamicAccess): Props = + Props(new KamonSupervisor(settings, dynamicAccess)) + +} + diff --git a/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala new file mode 100644 index 00000000..c04157aa --- /dev/null +++ b/kamon-core/src/main/scala/kamon/supervisor/ModuleSupervisorSettings.scala @@ -0,0 +1,49 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.supervisor + +import akka.actor.ActorSystem + +case class AvailableModuleInfo(name: String, extensionClass: String, requiresAspectJ: Boolean, autoStart: Boolean) +case class ModuleSupervisorSettings(showAspectJMissingWarning: Boolean, availableModules: List[AvailableModuleInfo]) { + val modulesRequiringAspectJ = availableModules.filter(_.requiresAspectJ) +} + +object ModuleSupervisorSettings { + + def apply(system: ActorSystem): ModuleSupervisorSettings = { + import kamon.util.ConfigTools.Syntax + + val config = system.settings.config.getConfig("kamon.modules") + val showAspectJMissingWarning = system.settings.config.getBoolean("kamon.show-aspectj-missing-warning") + + val modules = config.firstLevelKeys + val availableModules = modules.map { moduleName ⇒ + val moduleConfig = config.getConfig(moduleName) + + AvailableModuleInfo( + moduleName, + moduleConfig.getString("extension-id"), + moduleConfig.getBoolean("requires-aspectj"), + moduleConfig.getBoolean("auto-start")) + + } toList + + ModuleSupervisorSettings(showAspectJMissingWarning, availableModules) + } + +} diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala new file mode 100644 index 00000000..19ea4f39 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -0,0 +1,97 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util.concurrent.TimeUnit + +import akka.actor.{ ActorLogging, Props, Actor, ActorRef } +import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } +import kamon.util.{ NanoInterval, RelativeNanoTimestamp } +import scala.annotation.tailrec +import scala.collection.immutable.Queue +import scala.concurrent.duration._ + +class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging { + import kamon.util.ConfigTools.Syntax + import context.dispatcher + val config = context.system.settings.config.getConfig("kamon.trace.incubator") + + val minIncubationTime = new NanoInterval(config.getFiniteDuration("min-incubation-time").toNanos) + val maxIncubationTime = new NanoInterval(config.getFiniteDuration("max-incubation-time").toNanos) + val checkInterval = config.getFiniteDuration("check-interval") + + val checkSchedule = context.system.scheduler.schedule(checkInterval, checkInterval, self, CheckForCompletedTraces) + var waitingForMinimumIncubation = Queue.empty[IncubatingTrace] + var waitingForIncubationFinish = List.empty[IncubatingTrace] + + def receive = { + case tc: TracingContext ⇒ incubate(tc) + case CheckForCompletedTraces ⇒ + checkWaitingForMinimumIncubation() + checkWaitingForIncubationFinish() + } + + def incubate(tc: TracingContext): Unit = + waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now)) + + @tailrec private def checkWaitingForMinimumIncubation(): Unit = { + if (waitingForMinimumIncubation.nonEmpty) { + val it = waitingForMinimumIncubation.head + if (NanoInterval.since(it.incubationStart) >= minIncubationTime) { + waitingForMinimumIncubation = waitingForMinimumIncubation.tail + + if (it.tc.shouldIncubate) + waitingForIncubationFinish = it :: waitingForIncubationFinish + else + dispatchTraceInfo(it.tc) + + checkWaitingForMinimumIncubation() + } + } + } + + private def checkWaitingForIncubationFinish(): Unit = { + waitingForIncubationFinish = waitingForIncubationFinish.filter { + case IncubatingTrace(context, incubationStart) ⇒ + if (!context.shouldIncubate) { + dispatchTraceInfo(context) + false + } else { + if (NanoInterval.since(incubationStart) >= maxIncubationTime) { + log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token) + dispatchTraceInfo(context); + false + } else true + } + } + } + + def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo + + override def postStop(): Unit = { + super.postStop() + checkSchedule.cancel() + } +} + +object Incubator { + + def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions)) + + case object CheckForCompletedTraces + case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp) +} diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala new file mode 100644 index 00000000..5f7fdff5 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -0,0 +1,120 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue + +import akka.event.LoggingAdapter +import kamon.metric.{ MetricsExtension, TraceMetrics } +import kamon.util.{ NanoInterval, RelativeNanoTimestamp } + +import scala.annotation.tailrec + +private[kamon] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, + val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension) + extends TraceContext { + + @volatile private var _name = traceName + @volatile private var _isOpen = izOpen + @volatile protected var _elapsedTime = NanoInterval.default + + private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() + private val _traceLocalStorage = new TraceLocalStorage + + def rename(newName: String): Unit = + if (isOpen) + _name = newName + else if (log.isWarningEnabled) + log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) + + def name: String = _name + def isEmpty: Boolean = false + def isOpen: Boolean = _isOpen + def addMetadata(key: String, value: String): Unit = {} + + def finish(): Unit = { + _isOpen = false + val traceElapsedTime = NanoInterval.since(startTimestamp) + _elapsedTime = traceElapsedTime + + metricsExtension.register(TraceMetrics, name).map { registration ⇒ + registration.recorder.ElapsedTime.record(traceElapsedTime.nanos) + drainFinishedSegments(registration.recorder) + } + } + + def startSegment(segmentName: String, category: String, library: String): Segment = + new MetricsOnlySegment(segmentName, category, library) + + @tailrec private def drainFinishedSegments(recorder: TraceMetrics): Unit = { + val segment = _finishedSegments.poll() + if (segment != null) { + recorder.segment(segment.name, segment.category, segment.library).record(segment.duration.nanos) + drainFinishedSegments(recorder) + } + } + + protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration)) + + if (isClosed) { + metricsExtension.register(TraceMetrics, name).map { registration ⇒ + drainFinishedSegments(registration.recorder) + } + } + } + + // Should only be used by the TraceLocal utilities. + def traceLocalStorage: TraceLocalStorage = _traceLocalStorage + + // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default + // will be returned. + def elapsedTime: NanoInterval = _elapsedTime + + class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment { + private val _startTimestamp = RelativeNanoTimestamp.now + @volatile private var _segmentName = segmentName + @volatile private var _elapsedTime = NanoInterval.default + @volatile private var _isOpen = true + + def name: String = _segmentName + def isEmpty: Boolean = false + def addMetadata(key: String, value: String): Unit = {} + def isOpen: Boolean = _isOpen + + def rename(newName: String): Unit = + if (isOpen) + _segmentName = newName + else if (log.isWarningEnabled) + log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) + + def finish: Unit = { + _isOpen = false + val segmentElapsedTime = NanoInterval.since(_startTimestamp) + _elapsedTime = segmentElapsedTime + + finishSegment(name, category, library, segmentElapsedTime) + } + + // Handle with care and make sure that the segment is closed before calling this method, otherwise + // NanoInterval.default will be returned. + def elapsedTime: NanoInterval = _elapsedTime + def startTimestamp: RelativeNanoTimestamp = _startTimestamp + } +} + +case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval) diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala new file mode 100644 index 00000000..827840d7 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -0,0 +1,73 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import kamon.util.{ NanoInterval, Sequencer } +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.forkjoin.ThreadLocalRandom + +trait Sampler { + def shouldTrace: Boolean + def shouldReport(traceElapsedTime: NanoInterval): Boolean +} + +object NoSampling extends Sampler { + def shouldTrace: Boolean = false + def shouldReport(traceElapsedTime: NanoInterval): Boolean = false +} + +object SampleAll extends Sampler { + def shouldTrace: Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class RandomSampler(chance: Int) extends Sampler { + require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0") + require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100") + + def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class OrderedSampler(interval: Int) extends Sampler { + import OrderedSampler._ + + require(interval > 0, "kamon.trace.ordered-sampler.interval cannot be <= 0") + assume(interval isPowerOfTwo, "kamon.trace.ordered-sampler.interval must be power of two") + + private val sequencer = Sequencer() + + def shouldTrace: Boolean = (sequencer.next() fastMod interval) == 0 + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +object OrderedSampler { + implicit class EnhancedInt(i: Int) { + def isPowerOfTwo = (i & (i - 1)) == 0 + } + + implicit class EnhancedLong(dividend: Long) { + def fastMod(divisor: Int) = dividend & (divisor - 1) + } +} + +class ThresholdSampler(threshold: FiniteDuration) extends Sampler { + + def shouldTrace: Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime.nanos >= threshold.toNanos +} + diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 5b74e6b2..48e56153 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -17,146 +17,113 @@ package kamon.trace import java.io.ObjectStreamException - -import akka.actor.ActorSystem -import kamon.Kamon -import kamon.metric._ -import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.metric.TraceMetrics.TraceMetricRecorder - -import scala.annotation.tailrec +import kamon.util.RelativeNanoTimestamp -sealed trait TraceContext { +trait TraceContext { def name: String def token: String - def rename(name: String): Unit - def finish(): Unit - def origin: TraceContextOrigin - def isOpen: Boolean - def isClosed: Boolean = !isOpen def isEmpty: Boolean def nonEmpty: Boolean = !isEmpty + def isOpen: Boolean + def isClosed: Boolean = !isOpen + + def finish(): Unit + def rename(newName: String): Unit + def startSegment(segmentName: String, category: String, library: String): Segment - def nanoTimestamp: Long + def addMetadata(key: String, value: String) + + def startTimestamp: RelativeNanoTimestamp } -sealed trait Segment { +object TraceContext { + private[kamon] val _traceContextStorage = new ThreadLocal[TraceContext] { + override def initialValue(): TraceContext = EmptyTraceContext + } + + def currentContext: TraceContext = + _traceContextStorage.get() + + def setCurrentContext(context: TraceContext): Unit = + _traceContextStorage.set(context) + + def clearCurrentContext: Unit = + _traceContextStorage.remove() + + def withContext[T](context: TraceContext)(code: ⇒ T): T = { + val oldContext = _traceContextStorage.get() + _traceContextStorage.set(context) + + try code finally _traceContextStorage.set(oldContext) + } + + def map[T](f: TraceContext ⇒ T): Option[T] = { + val current = currentContext + if (current.nonEmpty) + Some(f(current)) + else None + } + +} + +trait Segment { def name: String - def rename(newName: String): Unit def category: String def library: String - def finish(): Unit def isEmpty: Boolean + def nonEmpty: Boolean = !isEmpty + def isOpen: Boolean + def isClosed: Boolean = !isOpen + + def finish(): Unit + def rename(newName: String): Unit + def addMetadata(key: String, value: String) } case object EmptyTraceContext extends TraceContext { def name: String = "empty-trace" def token: String = "" - def rename(name: String): Unit = {} - def finish(): Unit = {} - def origin: TraceContextOrigin = TraceContextOrigin.Local - def isOpen: Boolean = false def isEmpty: Boolean = true + def isOpen: Boolean = false + + def finish(): Unit = {} + def rename(name: String): Unit = {} def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment - def nanoTimestamp: Long = 0L + def addMetadata(key: String, value: String): Unit = {} + def startTimestamp = new RelativeNanoTimestamp(0L) case object EmptySegment extends Segment { val name: String = "empty-segment" val category: String = "empty-category" val library: String = "empty-library" def isEmpty: Boolean = true - def rename(newName: String): Unit = {} - def finish: Unit = {} - } -} - -class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, - val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext { - - val isEmpty: Boolean = false - @volatile private var _name = traceName - @volatile private var _isOpen = izOpen - - private val _nanoTimestamp = nanoTimeztamp - private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() - private val metricsExtension = Kamon(Metrics)(system) - private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage - - def name: String = _name - def rename(newName: String): Unit = - if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace. - - def isOpen: Boolean = _isOpen - def nanoTimestamp: Long = _nanoTimestamp - - def finish(): Unit = { - _isOpen = false - val elapsedNanoTime = System.nanoTime() - _nanoTimestamp - val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) - - metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(elapsedNanoTime) - drainFinishedSegments(traceMetrics) - } - } - - def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library) + def isOpen: Boolean = false - @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { - val segment = finishedSegments.poll() - if (segment != null) { - metricRecorder.segmentRecorder(segment.identity).record(segment.duration) - drainFinishedSegments(metricRecorder) - } - } - - private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { - finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration)) - - if (isClosed) { - metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ - drainFinishedSegments(traceMetrics) - } - } - } - - class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment { - private val _segmentStartNanoTime = System.nanoTime() - @volatile private var _segmentName = segmentName - @volatile private var _isOpen = true - - def name: String = _segmentName - def rename(newName: String): Unit = _segmentName = newName - def isEmpty: Boolean = false - - def finish: Unit = { - val segmentFinishNanoTime = System.nanoTime() - finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime)) - } + def finish: Unit = {} + def rename(newName: String): Unit = {} + def addMetadata(key: String, value: String): Unit = {} } } -case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity -case class SegmentData(identity: SegmentMetricIdentity, duration: Long) - object SegmentCategory { val HttpClient = "http-client" + val Database = "database" +} + +class LOD private[trace] (val level: Int) extends AnyVal +object LOD { + val MetricsOnly = new LOD(1) + val SimpleTrace = new LOD(2) } sealed trait LevelOfDetail object LevelOfDetail { - case object OnlyMetrics extends LevelOfDetail + case object MetricsOnly extends LevelOfDetail case object SimpleTrace extends LevelOfDetail case object FullTrace extends LevelOfDetail } -sealed trait TraceContextOrigin -object TraceContextOrigin { - case object Local extends TraceContextOrigin - case object Remote extends TraceContextOrigin -} - trait TraceContextAware extends Serializable { def traceContext: TraceContext } @@ -165,7 +132,7 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - @transient val traceContext = TraceRecorder.currentContext + @transient val traceContext = TraceContext.currentContext // // Beware of this hack, it might bite us in the future! diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala deleted file mode 100644 index a59abc18..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } -import akka.actor -import kamon.util.GlobPathFilter -import kamon.Kamon - -class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val config = system.settings.config.getConfig("kamon.trace") - val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing") -} - -object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Trace - def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) - - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala index 0766af74..057f564e 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala @@ -16,23 +16,43 @@ package kamon.trace -import scala.collection.concurrent.TrieMap import kamon.trace.TraceLocal.TraceLocalKey +import scala.collection.concurrent.TrieMap + object TraceLocal { + trait TraceLocalKey { type ValueType } - def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value) - case EmptyTraceContext ⇒ // Can't store in the empty context. + trait AvailableToMdc extends TraceLocalKey { + override type ValueType = String + def mdcKey: String + } + + object AvailableToMdc { + case class DefaultKeyAvailableToMdc(mdcKey: String) extends AvailableToMdc + + def fromKey(mdcKey: String): AvailableToMdc = DefaultKeyAvailableToMdc(mdcKey) + def apply(mdcKey: String): AvailableToMdc = fromKey(mdcKey) + } + + case class HttpContext(agent: String, uri: String, xforwarded: String) + + object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext } + + def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceContext.currentContext match { + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value) + case EmptyTraceContext ⇒ // Can't store in the empty context. } - def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key) - case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. + def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceContext.currentContext match { + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key) + case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. } + + def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value) } class TraceLocalStorage { diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala deleted file mode 100644 index 8da187cb..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import scala.language.experimental.macros -import java.util.concurrent.atomic.AtomicLong -import kamon.macros.InlineTraceContextMacro - -import scala.util.Try -import java.net.InetAddress -import akka.actor.ActorSystem - -object TraceRecorder { - private val traceContextStorage = new ThreadLocal[TraceContext] { - override def initialValue(): TraceContext = EmptyTraceContext - } - - private val tokenCounter = new AtomicLong - private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - - def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet()) - - private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = { - new DefaultTraceContext( - name, - token.getOrElse(newToken), - izOpen = true, - LevelOfDetail.OnlyMetrics, - TraceContextOrigin.Local, - nanoTimeztamp = System.nanoTime, - system) - } - - def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { - val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) - new DefaultTraceContext( - traceName, - traceToken, - isOpen, - LevelOfDetail.OnlyMetrics, - TraceContextOrigin.Remote, - equivalentNanotime, - system) - } - - def setContext(context: TraceContext): Unit = traceContextStorage.set(context) - - def clearContext: Unit = traceContextStorage.set(EmptyTraceContext) - - def currentContext: TraceContext = traceContextStorage.get() - - def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = { - val ctx = newTraceContext(name, token, system) - traceContextStorage.set(ctx) - } - - def rename(name: String): Unit = currentContext.rename(name) - - def withNewTraceContext[T](name: String, token: Option[String] = None)(thunk: ⇒ T)(implicit system: ActorSystem): T = - withTraceContext(newTraceContext(name, token, system))(thunk) - - def withTraceContext[T](context: TraceContext)(thunk: ⇒ T): T = { - val oldContext = currentContext - setContext(context) - - try thunk finally setContext(oldContext) - } - - def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match { - case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system)) - case EmptyTraceContext ⇒ None - } - - def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext] - - def finish(): Unit = currentContext.finish() - -} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala new file mode 100644 index 00000000..f2da404c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala @@ -0,0 +1,45 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import akka.actor.{ Terminated, ActorRef, Actor } + +class TraceSubscriptions extends Actor { + import TraceSubscriptions._ + + var subscribers: List[ActorRef] = Nil + + def receive = { + case Subscribe(newSubscriber) ⇒ + if (!subscribers.contains(newSubscriber)) + subscribers = context.watch(newSubscriber) :: subscribers + + case Unsubscribe(leavingSubscriber) ⇒ + subscribers = subscribers.filter(_ == leavingSubscriber) + + case Terminated(terminatedSubscriber) ⇒ + subscribers = subscribers.filter(_ == terminatedSubscriber) + + case trace: TraceInfo ⇒ + subscribers.foreach(_ ! trace) + } +} + +object TraceSubscriptions { + case class Subscribe(subscriber: ActorRef) + case class Unsubscribe(subscriber: ActorRef) +} diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala new file mode 100644 index 00000000..be565154 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala @@ -0,0 +1,110 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.net.InetAddress +import java.util.concurrent.atomic.AtomicLong + +import akka.actor._ +import com.typesafe.config.Config +import kamon.metric.MetricsExtension +import kamon.util._ + +import scala.util.Try + +trait TracerExtension { + def newContext(name: String): TraceContext + def newContext(name: String, token: String): TraceContext + def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext + + def subscribe(subscriber: ActorRef): Unit + def unsubscribe(subscriber: ActorRef): Unit +} + +private[kamon] class TracerExtensionImpl(metricsExtension: MetricsExtension, config: Config) extends TracerExtension { + private val _settings = TraceSettings(config) + private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + private val _tokenCounter = new AtomicLong + + private val _subscriptions = new LazyActorRef + private val _incubator = new LazyActorRef + + private def newToken: String = + _hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet()) + + def newContext(name: String): TraceContext = + createTraceContext(name) + + def newContext(name: String, token: String): TraceContext = + createTraceContext(name, token) + + def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext = + createTraceContext(name, token, timestamp, isOpen, isLocal) + + private def createTraceContext(traceName: String, token: String = newToken, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now, + isOpen: Boolean = true, isLocal: Boolean = true): TraceContext = { + + def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, null, metricsExtension) + + if (_settings.levelOfDetail == LevelOfDetail.MetricsOnly || !isLocal) + newMetricsOnlyContext + else { + if (!_settings.sampler.shouldTrace) + newMetricsOnlyContext + else + new TracingContext(traceName, token, true, _settings.levelOfDetail, isLocal, startTimestamp, null, metricsExtension, this, dispatchTracingContext) + } + } + + def subscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(TraceSubscriptions.Subscribe(subscriber)) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(TraceSubscriptions.Unsubscribe(subscriber)) + + private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = + if (_settings.sampler.shouldReport(trace.elapsedTime)) + if (trace.shouldIncubate) + _incubator.tell(trace) + else + _subscriptions.tell(trace.generateTraceInfo) + + /** + * Tracer Extension initialization. + */ + private var _system: ActorSystem = null + private lazy val _start = { + val subscriptions = _system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") + _subscriptions.point(subscriptions) + _incubator.point(_system.actorOf(Incubator.props(subscriptions))) + } + + def start(system: ActorSystem): Unit = synchronized { + _system = system + _start + _system = null + } +} + +private[kamon] object TracerExtensionImpl { + + def apply(metricsExtension: MetricsExtension, config: Config) = + new TracerExtensionImpl(metricsExtension, config) +} + +case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String])
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala new file mode 100644 index 00000000..79f30f23 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala @@ -0,0 +1,46 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import com.typesafe.config.Config + +case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler) + +object TraceSettings { + import kamon.util.ConfigTools.Syntax + + def apply(config: Config): TraceSettings = { + val tracerConfig = config.getConfig("kamon.trace") + + val detailLevel: LevelOfDetail = tracerConfig.getString("level-of-detail") match { + case "metrics-only" ⇒ LevelOfDetail.MetricsOnly + case "simple-trace" ⇒ LevelOfDetail.SimpleTrace + case other ⇒ sys.error(s"Unknown tracer level of detail [$other] present in the configuration file.") + } + + val sampler: Sampler = + if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling + else tracerConfig.getString("sampling") match { + case "all" ⇒ SampleAll + case "random" ⇒ new RandomSampler(tracerConfig.getInt("random-sampler.chance")) + case "ordered" ⇒ new OrderedSampler(tracerConfig.getInt("ordered-sampler.interval")) + case "threshold" ⇒ new ThresholdSampler(tracerConfig.getFiniteDuration("threshold-sampler.minimum-elapsed-time")) + } + + TraceSettings(detailLevel, sampler) + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala new file mode 100644 index 00000000..3d324886 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -0,0 +1,92 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger + +import akka.event.LoggingAdapter +import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp } +import kamon.metric.MetricsExtension + +import scala.collection.concurrent.TrieMap + +private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, + isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, + traceExtension: TracerExtensionImpl, traceInfoSink: TracingContext ⇒ Unit) + extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log, metricsExtension) { + + private val _openSegments = new AtomicInteger(0) + private val _startTimestamp = NanoTimestamp.now + private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]() + private val _metadata = TrieMap.empty[String, String] + + override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value) + + override def startSegment(segmentName: String, category: String, library: String): Segment = { + _openSegments.incrementAndGet() + val newSegment = new TracingSegment(segmentName, category, library) + _allSegments.add(newSegment) + newSegment + } + + override def finish(): Unit = { + super.finish() + traceInfoSink(this) + } + + override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + _openSegments.decrementAndGet() + super.finishSegment(segmentName, category, library, duration) + } + + def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0 + + // Handle with care, should only be used after a trace is finished. + def generateTraceInfo: TraceInfo = { + require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.") + + val currentSegments = _allSegments.iterator() + var segmentsInfo = List.newBuilder[SegmentInfo] + + while (currentSegments.hasNext()) { + val segment = currentSegments.next() + if (segment.isClosed) + segmentsInfo += segment.createSegmentInfo(_startTimestamp, startTimestamp) + else + log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) + } + + TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result()) + } + + class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) { + private val metadata = TrieMap.empty[String, String] + override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + + // Handle with care, should only be used after the segment has finished. + def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = { + require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.") + + // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both + // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it. + val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos) + + SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap) + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala index f052f009..961c3099 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala @@ -17,11 +17,11 @@ package kamon.trace.logging import ch.qos.logback.classic.pattern.ClassicConverter import ch.qos.logback.classic.spi.ILoggingEvent -import kamon.trace.TraceRecorder +import kamon.trace.TraceContext class LogbackTraceTokenConverter extends ClassicConverter { def convert(event: ILoggingEvent): String = { - val ctx = TraceRecorder.currentContext + val ctx = TraceContext.currentContext if (ctx.isEmpty) "undefined" else diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala new file mode 100644 index 00000000..4970d97e --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala @@ -0,0 +1,39 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace.logging + +import kamon.trace.TraceLocal.AvailableToMdc +import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext } + +import org.slf4j.MDC + +trait MdcKeysSupport { + + def withMdc[A](thunk: ⇒ A): A = { + val keys = copyToMdc(TraceContext.currentContext) + try thunk finally keys.foreach(key ⇒ MDC.remove(key)) + } + + private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match { + case ctx: MetricsOnlyContext ⇒ + ctx.traceLocalStorage.underlyingStorage.collect { + case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey -> String.valueOf(value)) + }.map { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } }.flatten + + case EmptyTraceContext ⇒ Iterable.empty[String] + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/package.scala b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala index 43166058..483278bf 100644 --- a/kamon-core/src/main/scala/kamon/metric/package.scala +++ b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -14,21 +14,29 @@ * ========================================================================================= */ -package kamon +package kamon.util + +import java.util.concurrent.TimeUnit -import scala.annotation.tailrec import com.typesafe.config.Config -package object metric { +import scala.concurrent.duration.FiniteDuration + +object ConfigTools { + implicit class Syntax(val config: Config) extends AnyVal { + // We are using the deprecated .getNanoseconds option to keep Kamon source code compatible with + // versions of Akka using older typesafe-config versions. - @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { - if (right.isEmpty) - left - else { - val (key, rightValue) = right.head - val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) + def getFiniteDuration(path: String): FiniteDuration = + FiniteDuration(config.getNanoseconds(path), TimeUnit.NANOSECONDS) - combineMaps(left.updated(key, value), right.tail)(valueMerger) + def firstLevelKeys: Set[String] = { + import scala.collection.JavaConverters._ + + config.entrySet().asScala.map { + case entry ⇒ entry.getKey.takeWhile(_ != '.') + } toSet } } + } diff --git a/kamon-core/src/main/scala/kamon/util/FastDispatch.scala b/kamon-core/src/main/scala/kamon/util/FastDispatch.scala new file mode 100644 index 00000000..d2748847 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/FastDispatch.scala @@ -0,0 +1,38 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import akka.actor.ActorRef + +import scala.concurrent.{ ExecutionContext, Future } + +/** + * Extension for Future[ActorRef]. Try to dispatch a message to a Future[ActorRef] in the same thread if it has already + * completed or do the regular scheduling otherwise. Specially useful when using the ModuleSupervisor extension to + * create actors. + */ +object FastDispatch { + implicit class Syntax(val target: Future[ActorRef]) extends AnyVal { + + def fastDispatch(message: Any)(implicit ec: ExecutionContext): Unit = + if (target.isCompleted) + target.value.get.map(_ ! message) + else + target.map(_ ! message) + } + +} diff --git a/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala new file mode 100644 index 00000000..855bf1fc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala @@ -0,0 +1,53 @@ +package kamon.util + +import java.util +import java.util.concurrent.ConcurrentLinkedQueue + +import akka.actor.{ Actor, ActorRef } +import org.HdrHistogram.WriterReaderPhaser + +import scala.annotation.tailrec + +/** + * A LazyActorRef accumulates messages sent to an actor that doesn't exist yet. Once the actor is created and + * the LazyActorRef is pointed to it, all the accumulated messages are flushed and any new message sent to the + * LazyActorRef will immediately be sent to the pointed ActorRef. + * + * This is intended to be used during Kamon's initialization where some components need to use ActorRefs to work + * (like subscriptions and the trace incubator) but our internal ActorSystem is not yet ready to create the + * required actors. + */ +class LazyActorRef { + private val _refPhaser = new WriterReaderPhaser + private val _backlog = new ConcurrentLinkedQueue[(Any, ActorRef)]() + @volatile private var _target: Option[ActorRef] = None + + def tell(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { + val criticalEnter = _refPhaser.writerCriticalSectionEnter() + try { + _target.map(_.tell(message, sender)) getOrElse { + _backlog.add((message, sender)) + } + + } finally { _refPhaser.writerCriticalSectionExit(criticalEnter) } + } + + def point(target: ActorRef): Unit = { + @tailrec def drain(q: util.Queue[(Any, ActorRef)]): Unit = if (!q.isEmpty) { + val (msg, sender) = q.poll() + target.tell(msg, sender) + drain(q) + } + + try { + _refPhaser.readerLock() + + if (_target.isEmpty) { + _target = Some(target) + _refPhaser.flipPhase(1000L) + drain(_backlog) + + } else sys.error("A LazyActorRef cannot be pointed more than once.") + } finally { _refPhaser.readerUnlock() } + } +} diff --git a/kamon-core/src/main/scala/kamon/util/MapMerge.scala b/kamon-core/src/main/scala/kamon/util/MapMerge.scala new file mode 100644 index 00000000..64b4f7ae --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/MapMerge.scala @@ -0,0 +1,43 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +object MapMerge { + + /** + * Merge to immutable maps with the same key and value types, using the provided valueMerge function. + */ + implicit class Syntax[K, V](val map: Map[K, V]) extends AnyVal { + def merge(that: Map[K, V], valueMerge: (V, V) ⇒ V): Map[K, V] = { + val merged = Map.newBuilder[K, V] + + map.foreach { + case (key, value) ⇒ + val mergedValue = that.get(key).map(v ⇒ valueMerge(value, v)).getOrElse(value) + merged += key -> mergedValue + } + + that.foreach { + case kv @ (key, _) if !map.contains(key) ⇒ merged += kv + case other ⇒ // ignore, already included. + } + + merged.result(); + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala index 9c926372..9019eb4c 100644 --- a/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala +++ b/kamon-core/src/main/scala/kamon/util/PaddedAtomicLong.scala @@ -1,4 +1,3 @@ -package kamon.util /* * ========================================================================================= * Copyright © 2013-2014 the kamon project <http://kamon.io/> @@ -15,6 +14,8 @@ package kamon.util * ========================================================================================= */ +package kamon.util + import java.util.concurrent.atomic.AtomicLong class PaddedAtomicLong(value: Long = 0) extends AtomicLong(value) { diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/util/Sequencer.scala index b7050c59..f368e54f 100644 --- a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala +++ b/kamon-core/src/main/scala/kamon/util/Sequencer.scala @@ -14,17 +14,27 @@ * ========================================================================================= */ -package kamon +package kamon.util -import akka.actor.{ Extension, ActorSystem, ExtensionId } -import java.util.concurrent.ConcurrentHashMap +/** + * This class implements an extremely efficient, thread-safe way to generate a + * incrementing sequence of Longs with a simple Long overflow protection. + */ +class Sequencer { + private val CloseToOverflow = Long.MaxValue - 1000000000 + private val sequenceNumber = new PaddedAtomicLong(1L) -object AkkaExtensionSwap { - def swap(system: ActorSystem, key: ExtensionId[_], value: Extension): Unit = { - val extensionsField = system.getClass.getDeclaredField("extensions") - extensionsField.setAccessible(true) + def next(): Long = { + val current = sequenceNumber.getAndIncrement - val extensions = extensionsField.get(system).asInstanceOf[ConcurrentHashMap[ExtensionId[_], AnyRef]] - extensions.put(key, value) + // check if this value is getting close to overflow? + if (current > CloseToOverflow) { + sequenceNumber.set(current - CloseToOverflow) // we need maintain the order + } + current } } + +object Sequencer { + def apply(): Sequencer = new Sequencer() +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/util/Timestamp.scala b/kamon-core/src/main/scala/kamon/util/Timestamp.scala new file mode 100644 index 00000000..eadc6690 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/Timestamp.scala @@ -0,0 +1,101 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +/** + * Epoch time stamp. + */ +class Timestamp(val seconds: Long) extends AnyVal { + def <(that: Timestamp): Boolean = this.seconds < that.seconds + def >(that: Timestamp): Boolean = this.seconds > that.seconds + def ==(that: Timestamp): Boolean = this.seconds == that.seconds + def >=(that: Timestamp): Boolean = this.seconds >= that.seconds + def <=(that: Timestamp): Boolean = this.seconds <= that.seconds + + override def toString: String = String.valueOf(seconds) + ".seconds" +} + +object Timestamp { + def now: Timestamp = new Timestamp(System.currentTimeMillis() / 1000) + def earlier(l: Timestamp, r: Timestamp): Timestamp = if (l <= r) l else r + def later(l: Timestamp, r: Timestamp): Timestamp = if (l >= r) l else r +} + +/** + * Epoch time stamp in milliseconds. + */ +class MilliTimestamp(val millis: Long) extends AnyVal { + override def toString: String = String.valueOf(millis) + ".millis" + + def toTimestamp: Timestamp = new Timestamp(millis / 1000) + def toRelativeNanoTimestamp: RelativeNanoTimestamp = { + val diff = (System.currentTimeMillis() - millis) * 1000000 + new RelativeNanoTimestamp(System.nanoTime() - diff) + } +} + +object MilliTimestamp { + def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis()) +} + +/** + * Epoch time stamp in nanoseconds. + * + * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch + * timestamp in nanoseconds. + */ +class NanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoTimestamp { + def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000) +} + +/** + * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime() + */ +class RelativeNanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" + + def toMilliTimestamp: MilliTimestamp = + new MilliTimestamp(System.currentTimeMillis - ((System.nanoTime - nanos) / 1000000)) +} + +object RelativeNanoTimestamp { + def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime()) + def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp = + new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000) +} + +/** + * Number of nanoseconds that passed between two points in time. + */ +class NanoInterval(val nanos: Long) extends AnyVal { + def <(that: NanoInterval): Boolean = this.nanos < that.nanos + def >(that: NanoInterval): Boolean = this.nanos > that.nanos + def ==(that: NanoInterval): Boolean = this.nanos == that.nanos + def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos + def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos + + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoInterval { + def default: NanoInterval = new NanoInterval(0L) + def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos) +} diff --git a/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala b/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala new file mode 100644 index 00000000..d1197a5a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala @@ -0,0 +1,45 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ +package kamon.util + +import scala.collection.concurrent.TrieMap + +object TriemapAtomicGetOrElseUpdate { + + /** + * Workaround to the non thread-safe [[scala.collection.concurrent.TrieMap#getOrElseUpdate]] method. More details on + * why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]]. + */ + implicit class Syntax[K, V](val trieMap: TrieMap[K, V]) extends AnyVal { + + def atomicGetOrElseUpdate(key: K, op: ⇒ V): V = + atomicGetOrElseUpdate(key, op, { v: V ⇒ Unit }) + + def atomicGetOrElseUpdate(key: K, op: ⇒ V, cleanup: V ⇒ Unit): V = + trieMap.get(key) match { + case Some(v) ⇒ v + case None ⇒ + val d = op + trieMap.putIfAbsent(key, d).map { oldValue ⇒ + // If there was an old value then `d` was never added + // and thus need to be cleanup. + cleanup(d) + oldValue + + } getOrElse (d) + } + } +} |