From 7a10c0ef2a6566229e8571f6d385ca2ff794cc20 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 2 Jan 2014 18:09:53 -0300 Subject: integrate trace and metrics into the base project --- .../ActorMessagePassingTracing.scala | 93 +++++++++++++ .../ActorSystemMessagePassingTracing.scala | 65 +++++++++ .../akka/instrumentation/AskPatternTracing.scala | 49 +++++++ kamon-core/src/main/scala/kamon/Kamon.scala | 6 +- .../instrumentation/ActorLoggingTracing.scala | 37 ++++++ .../scala/kamon/instrumentation/AspectJPimps.scala | 38 ------ .../instrumentation/ExecutorServiceMetrics.scala | 18 +-- .../kamon/instrumentation/FutureTracing.scala | 47 +++++++ .../instrumentation/MessageQueueMetrics.scala | 4 +- .../metric/ExecutorServiceMetricCollector.scala | 74 ----------- .../main/scala/kamon/metric/GaugeGenerator.scala | 27 ---- .../src/main/scala/kamon/metric/Metrics.scala | 132 ------------------ .../main/scala/kamon/metrics/ActorMetrics.scala | 31 +++++ .../main/scala/kamon/metrics/ActorMetricsOps.scala | 148 +++++++++++++++++++++ .../src/main/scala/kamon/metrics/package.scala | 31 +++++ .../src/main/scala/kamon/trace/Segments.scala | 38 ++++++ kamon-core/src/main/scala/kamon/trace/Trace.scala | 114 ++++++++++++++++ .../src/main/scala/kamon/trace/TraceContext.scala | 51 +++++++ .../src/main/scala/kamon/trace/UowTracing.scala | 82 ++++++++++++ .../kamon/trace/logging/LogbackUowConverter.scala | 24 ++++ 20 files changed, 823 insertions(+), 286 deletions(-) create mode 100644 kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala create mode 100644 kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala create mode 100644 kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/Metrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/package.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/Segments.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/Trace.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TraceContext.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/UowTracing.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala new file mode 100644 index 00000000..6cede344 --- /dev/null +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -0,0 +1,93 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package akka.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{ Cell, Props, ActorSystem, ActorRef } +import akka.dispatch.{ Envelope, MessageDispatcher } +import kamon.trace.{ TraceContext, ContextAware, Trace } +import kamon.metrics.{ HdrActorMetricsRecorder, ActorMetrics } +import kamon.Kamon + +@Aspect("perthis(actorCellCreation(*, *, *, *, *))") +class BehaviourInvokeTracing { + var path: Option[String] = None + var actorMetrics: Option[HdrActorMetricsRecorder] = None + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(system, ref, props, dispatcher, parent)") + def afterCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val metricsExtension = Kamon(ActorMetrics)(system) + val simplePathString = ref.path.elements.mkString("/") + + if (metricsExtension.shouldTrackActor(simplePathString)) { + path = Some(ref.path.toString) + actorMetrics = Some(metricsExtension.registerActor(simplePathString)) + } + } + + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[ContextAndTimestampAware] + + Trace.withContext(contextAndTimestamp.traceContext) { + pjp.proceed() + } + + actorMetrics.map { am ⇒ + am.recordProcessingTime(System.nanoTime() - timestampBeforeProcessing) + am.recordTimeInMailbox(timestampBeforeProcessing - contextAndTimestamp.timestamp) + } + } + + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") + def actorStop(cell: Cell): Unit = {} + + @After("actorStop(cell)") + def afterStop(cell: Cell): Unit = { + path.map(p ⇒ Kamon(ActorMetrics)(cell.system).unregisterActor(p)) + } +} + +@Aspect +class EnvelopeTraceContextMixin { + + @DeclareMixin("akka.dispatch.Envelope") + def mixin: ContextAndTimestampAware = new ContextAndTimestampAware { + val traceContext: Option[TraceContext] = Trace.context() + val timestamp: Long = System.nanoTime() + } + + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def envelopeCreation(ctx: ContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: ContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} + +trait ContextAndTimestampAware extends ContextAware { + def timestamp: Long +} diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala new file mode 100644 index 00000000..7d26016e --- /dev/null +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala @@ -0,0 +1,65 @@ +package akka.instrumentation + +import org.aspectj.lang.annotation._ +import kamon.trace.{ Trace, ContextAware } +import akka.dispatch.sysmsg.EarliestFirstSystemMessageList +import org.aspectj.lang.ProceedingJoinPoint + +@Aspect +class SystemMessageTraceContextMixin { + + @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") + def envelopeCreation(ctx: ContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: ContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} + +@Aspect +class RepointableActorRefTraceContextMixin { + + @DeclareMixin("akka.actor.RepointableActorRef") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") + def envelopeCreation(ctx: ContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: ContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } + + @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") + def repointableActorRefCreation(repointableActorRef: ContextAware): Unit = {} + + @Around("repointableActorRefCreation(repointableActorRef)") + def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: ContextAware): Any = { + Trace.withContext(repointableActorRef.traceContext) { + pjp.proceed() + } + } + +} + +@Aspect +class ActorSystemMessagePassingTracing { + + @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") + def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} + + @Around("systemMessageProcessing(messages)") + def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { + if (messages.nonEmpty) { + val ctx = messages.head.asInstanceOf[ContextAware].traceContext + Trace.withContext(ctx)(pjp.proceed()) + + } else pjp.proceed() + } +} diff --git a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala new file mode 100644 index 00000000..b5b23e61 --- /dev/null +++ b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala @@ -0,0 +1,49 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package akka.instrumentation + +import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect } +import akka.event.Logging.Warning +import scala.compat.Platform.EOL +import akka.actor.ActorRefProvider +import akka.pattern.{ AskTimeoutException, PromiseActorRef } + +@Aspect +class AskPatternTracing { + + class StackTraceCaptureException extends Throwable + + @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *)", argNames = "provider") + def promiseActorRefApply(provider: ActorRefProvider): Unit = { + provider.settings.config.getBoolean("kamon.trace.ask-pattern-tracing") + } + + @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor") + def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = { + val future = promiseActor.result.future + val system = promiseActor.provider.guardian.underlying.system + implicit val ec = system.dispatcher + val stack = new StackTraceCaptureException + + future onFailure { + case timeout: AskTimeoutException ⇒ + val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) + + system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing], + "Timeout triggered for ask pattern registered at: " + stackString)) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index b5c3d552..b72e8fea 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -18,10 +18,8 @@ package kamon import akka.actor._ object Kamon { - trait Extension extends akka.actor.Extension { - def manager: ActorRef - } + trait Extension extends akka.actor.Extension - def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager + def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system) } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala new file mode 100644 index 00000000..297017cf --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala @@ -0,0 +1,37 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } +import org.aspectj.lang.ProceedingJoinPoint +import kamon.trace.{ ContextAware, Trace } + +@Aspect +class ActorLoggingTracing { + + @DeclareMixin("akka.event.Logging.LogEvent+") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = {} + + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = { + Trace.withContext(logEvent.traceContext) { + pjp.proceed() + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala deleted file mode 100644 index 252e5220..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation - -import org.aspectj.lang.ProceedingJoinPoint - -trait ProceedingJoinPointPimp { - import language.implicitConversions - - implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) -} - -object ProceedingJoinPointPimp extends ProceedingJoinPointPimp - -case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { - def proceedWith(newUniqueArg: AnyRef) = { - val args = pjp.getArgs - args.update(0, newUniqueArg) - pjp.proceed(args) - } - - def proceedWithTarget(args: AnyRef*) = { - pjp.proceed(args.toArray) - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index a3da76f7..90d2b270 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -19,10 +19,8 @@ import org.aspectj.lang.annotation._ import java.util.concurrent._ import org.aspectj.lang.ProceedingJoinPoint import java.util -import kamon.metric.{ DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector } import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory } import com.typesafe.config.Config -import kamon.Kamon import scala.concurrent.forkjoin.ForkJoinPool import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool @@ -41,8 +39,8 @@ class ActorSystemInstrumentation { @Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") class ForkJoinPoolInstrumentation { - var activeThreadsHistogram: Histogram = _ - var poolSizeHistogram: Histogram = _ + /* var activeThreadsHistogram: Histogram = _ + var poolSizeHistogram: Histogram = _*/ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} @@ -71,8 +69,8 @@ class ForkJoinPoolInstrumentation { @After("forkJoinScan(fjp)") def updateMetrics(fjp: AkkaForkJoinPool): Unit = { - activeThreadsHistogram.update(fjp.getActiveThreadCount) - poolSizeHistogram.update(fjp.getPoolSize) + /*activeThreadsHistogram.update(fjp.getActiveThreadCount) + poolSizeHistogram.update(fjp.getPoolSize)*/ } } @@ -90,6 +88,7 @@ trait WatchedExecutorService { def collector: ExecutorServiceCollector } +/* trait ExecutorServiceMonitoring { def dispatcherMetrics: DispatcherMetricCollector } @@ -97,6 +96,7 @@ trait ExecutorServiceMonitoring { class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { @volatile var dispatcherMetrics: DispatcherMetricCollector = _ } +*/ case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = delegate.createExecutorService @@ -133,9 +133,9 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { @Around("factoryMethodCall(namedFactory)") def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { val delegate = pjp.proceed().asInstanceOf[ExecutorService] - val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + val executorFullName = "" //MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) - ExecutorServiceMetricCollector.register(executorFullName, delegate) + //ExecutorServiceMetricCollector.register(executorFullName, delegate) new NamedExecutorServiceDelegate(executorFullName, delegate) } @@ -143,7 +143,7 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { def shutdown() = { - ExecutorServiceMetricCollector.deregister(fullName) + //ExecutorServiceMetricCollector.deregister(fullName) delegate.shutdown() } def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala new file mode 100644 index 00000000..5600d582 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala @@ -0,0 +1,47 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import kamon.trace.{ ContextAware, TraceContext, Trace } + +@Aspect +class FutureTracing { + + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") + def futureRelatedRunnableCreation(runnable: ContextAware): Unit = {} + + @After("futureRelatedRunnableCreation(runnable)") + def afterCreation(runnable: ContextAware): Unit = { + // Force traceContext initialization. + runnable.traceContext + } + + @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") + def futureRelatedRunnableExecution(runnable: ContextAware) = {} + + @Around("futureRelatedRunnableExecution(runnable)") + def aroundExecution(pjp: ProceedingJoinPoint, runnable: ContextAware): Any = { + Trace.withContext(runnable.traceContext) { + pjp.proceed() + } + } + +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index da797fa1..44eb8c43 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -15,17 +15,16 @@ * ========================================================== */ package kamon.instrumentation -import com.codahale.metrics.{ ExponentiallyDecayingReservoir, Histogram } import akka.dispatch.{ UnboundedMessageQueueSemantics, Envelope, MessageQueue } import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } import akka.actor.{ ActorSystem, ActorRef } -import kamon.metric.{ Metrics, MetricDirectory } import org.aspectj.lang.ProceedingJoinPoint /** * For Mailboxes we would like to track the queue size and message latency. Currently the latency * will be gathered from the ActorCellMetrics. */ +/* @Aspect class MessageQueueInstrumentation { @@ -74,4 +73,5 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: def hasMessages: Boolean = delegate.hasMessages def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) } +*/ diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala deleted file mode 100644 index 4c4b93e9..00000000 --- a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.metric - -import java.util.concurrent.{ ThreadPoolExecutor, ExecutorService } -import scala.concurrent.forkjoin.ForkJoinPool -import com.codahale.metrics.{ Metric, MetricFilter } - -object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { - - def register(fullName: String, executorService: ExecutorService) = executorService match { - case fjp: ForkJoinPool ⇒ registerForkJoinPool(fullName, fjp) - case tpe: ThreadPoolExecutor ⇒ registerThreadPoolExecutor(fullName, tpe) - case _ ⇒ // If it is a unknown Executor then just do nothing. - } - - def deregister(fullName: String) = { - Metrics.registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) - }) - } -} - -trait ForkJoinPoolMetricCollector { - import GaugeGenerator._ - import BasicExecutorMetricNames._ - - def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { - val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ - - val allMetrics = Map( - fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), - fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), - fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount)) - - allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) } - } -} - -trait ThreadPoolExecutorMetricCollector { - import GaugeGenerator._ - import BasicExecutorMetricNames._ - - def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { - val tpeGauge = newNumericGaugeFor(tpe) _ - - val allMetrics = Map( - fullName + queueSize -> tpeGauge(_.getQueue.size()), - fullName + poolSize -> tpeGauge(_.getPoolSize), - fullName + activeThreads -> tpeGauge(_.getActiveCount)) - - allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) } - } -} - -object BasicExecutorMetricNames { - val queueSize = "queueSize" - val poolSize = "threads/poolSize" - val activeThreads = "threads/activeThreads" -} - diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala deleted file mode 100644 index 9eff2739..00000000 --- a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.metric - -import com.codahale.metrics.Gauge - -trait GaugeGenerator { - - def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T ⇒ V) = new Gauge[V] { - def getValue: V = generator(target) - } -} - -object GaugeGenerator extends GaugeGenerator diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala deleted file mode 100644 index b904ec56..00000000 --- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.metric - -import java.util.concurrent.TimeUnit -import akka.actor.ActorRef -import com.codahale.metrics -import com.codahale.metrics.{ MetricFilter, Metric, ConsoleReporter, MetricRegistry } -import scala.collection.concurrent.TrieMap - -object Metrics { - val registry: MetricRegistry = new MetricRegistry - - val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) - //consoleReporter.build().start(45, TimeUnit.SECONDS) - - //val newrelicReporter = NewRelicReporter(registry) - //newrelicReporter.start(5, TimeUnit.SECONDS) - - def include(name: String, metric: Metric) = { - //registry.register(name, metric) - } - - def exclude(name: String) = { - registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(name) - }) - } - - def deregister(fullName: String) = { - registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) - }) - } -} - -object Watched { - case object Actor - case object Dispatcher -} - -object MetricDirectory { - def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" - - def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox" - - def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/") - - def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon") - - def shouldInstrumentActor(actorPath: String): Boolean = { - !(actorPath.isEmpty || actorPath.startsWith("system")) - } - -} - -case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) - -trait Histogram { - def update(value: Long): Unit - def snapshot: HistogramSnapshot -} - -trait HistogramSnapshot { - def median: Double - def max: Double - def min: Double -} - -case class ActorSystemMetrics(actorSystemName: String) { - val dispatchers = TrieMap.empty[String, DispatcherMetricCollector] - - private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) - - def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { - val stats = createDispatcherCollector - dispatchers.put(dispatcherName, stats) - Some(stats) - } - -} - -case class CodahaleHistogram() extends Histogram { - private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) - - def update(value: Long) = histogram.update(value) - def snapshot: HistogramSnapshot = { - val snapshot = histogram.getSnapshot - - CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin) - } -} - -case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot - -/** - * Dispatcher Metrics that we care about currently with a histogram-like nature: - * - Work Queue Size - * - Total/Active Thread Count - */ - -import annotation.tailrec -import java.util.concurrent.atomic.AtomicReference - -object Atomic { - def apply[T](obj: T) = new Atomic(new AtomicReference(obj)) - implicit def toAtomic[T](ref: AtomicReference[T]): Atomic[T] = new Atomic(ref) -} - -class Atomic[T](val atomic: AtomicReference[T]) { - @tailrec - final def update(f: T ⇒ T): T = { - val oldValue = atomic.get() - val newValue = f(oldValue) - if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) - } - - def get() = atomic.get() -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala new file mode 100644 index 00000000..72e473e8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala @@ -0,0 +1,31 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ + +package kamon.metrics + +import akka.actor.{ Props, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import akka.actor +import kamon.Kamon + +object ActorMetrics extends ExtensionId[ActorMetricsExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = ActorMetrics + def createExtension(system: ExtendedActorSystem): ActorMetricsExtension = new ActorMetricsExtension(system) + +} + +class ActorMetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension with ActorMetricsOps { + lazy val metricsDispatcher = system.actorOf(Props[ActorMetricsDispatcher], "kamon-actor-metrics") +} diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala new file mode 100644 index 00000000..dc4abde0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala @@ -0,0 +1,148 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.HdrHistogram.{ AbstractHistogram, AtomicHistogram } +import kamon.util.GlobPathFilter +import scala.collection.concurrent.TrieMap +import scala.collection.JavaConversions.iterableAsScalaIterable +import akka.actor._ +import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, FlushMetrics } +import kamon.Kamon +import scala.concurrent.duration._ +import java.util.concurrent.TimeUnit +import kamon.metrics.ActorMetricsDispatcher.Subscribe + +trait ActorMetricsOps { + self: ActorMetricsExtension ⇒ + + val config = system.settings.config.getConfig("kamon.metrics.actors") + val actorMetrics = TrieMap[String, HdrActorMetricsRecorder]() + + val trackedActors: Vector[GlobPathFilter] = config.getStringList("tracked").map(glob ⇒ new GlobPathFilter(glob)).toVector + val excludedActors: Vector[GlobPathFilter] = config.getStringList("excluded").map(glob ⇒ new GlobPathFilter(glob)).toVector + + val actorMetricsFactory: () ⇒ HdrActorMetricsRecorder = { + val settings = config.getConfig("hdr-settings") + val processingTimeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("processing-time")) + val timeInMailboxHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("time-in-mailbox")) + val mailboxSizeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("mailbox-size")) + + () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig) + } + + import scala.concurrent.duration._ + system.scheduler.schedule(0.seconds, 10.seconds)( + actorMetrics.collect { + case (name, recorder: HdrActorMetricsRecorder) ⇒ + println(s"Actor: $name") + recorder.processingTimeHistogram.copy.getHistogramData.outputPercentileDistribution(System.out, 1000000D) + })(system.dispatcher) + + def shouldTrackActor(path: String): Boolean = + trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path)) + + def registerActor(path: String): HdrActorMetricsRecorder = actorMetrics.getOrElseUpdate(path, actorMetricsFactory()) + + def unregisterActor(path: String): Unit = actorMetrics.remove(path) +} + +class HdrActorMetricsRecorder(processingTimeHdrConfig: HdrConfiguration, timeInMailboxHdrConfig: HdrConfiguration, + mailboxSizeHdrConfig: HdrConfiguration) { + + val processingTimeHistogram = new AtomicHistogram(processingTimeHdrConfig.highestTrackableValue, processingTimeHdrConfig.significantValueDigits) + val timeInMailboxHistogram = new AtomicHistogram(timeInMailboxHdrConfig.highestTrackableValue, timeInMailboxHdrConfig.significantValueDigits) + val mailboxSizeHistogram = new AtomicHistogram(mailboxSizeHdrConfig.highestTrackableValue, mailboxSizeHdrConfig.significantValueDigits) + + def recordTimeInMailbox(waitTime: Long): Unit = timeInMailboxHistogram.recordValue(waitTime) + + def recordProcessingTime(processingTime: Long): Unit = processingTimeHistogram.recordValue(processingTime) + + def snapshot(): HdrActorMetricsSnapshot = { + HdrActorMetricsSnapshot(processingTimeHistogram.copy(), timeInMailboxHistogram.copy(), mailboxSizeHistogram.copy()) + } + + def reset(): Unit = { + processingTimeHistogram.reset() + timeInMailboxHistogram.reset() + mailboxSizeHistogram.reset() + } +} + +case class HdrActorMetricsSnapshot(processingTimeHistogram: AbstractHistogram, timeInMailboxHistogram: AbstractHistogram, + mailboxSizeHistogram: AbstractHistogram) + +class ActorMetricsDispatcher extends Actor { + val tickInterval = Duration(context.system.settings.config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var subscribedForever: Map[GlobPathFilter, List[ActorRef]] = Map.empty + var subscribedForOne: Map[GlobPathFilter, List[ActorRef]] = Map.empty + var lastTick = System.currentTimeMillis() + + def receive = { + case Subscribe(path, true) ⇒ subscribeForever(path, sender) + case Subscribe(path, false) ⇒ subscribeOneOff(path, sender) + case FlushMetrics ⇒ flushMetrics() + } + + def subscribeForever(path: String, receiver: ActorRef): Unit = subscribedForever = subscribe(receiver, path, subscribedForever) + + def subscribeOneOff(path: String, receiver: ActorRef): Unit = subscribedForOne = subscribe(receiver, path, subscribedForOne) + + def subscribe(receiver: ActorRef, path: String, target: Map[GlobPathFilter, List[ActorRef]]): Map[GlobPathFilter, List[ActorRef]] = { + val pathFilter = new GlobPathFilter(path) + val oldReceivers = target.get(pathFilter).getOrElse(Nil) + target.updated(pathFilter, receiver :: oldReceivers) + } + + def flushMetrics(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(ActorMetrics)(context.system).actorMetrics.map { + case (path, metrics) ⇒ + val snapshot = metrics.snapshot() + metrics.reset() + + (path, snapshot) + }.toMap + + dispatchMetricsTo(subscribedForOne, snapshots, currentTick) + dispatchMetricsTo(subscribedForever, snapshots, currentTick) + + subscribedForOne = Map.empty + lastTick = currentTick + } + + def dispatchMetricsTo(subscribers: Map[GlobPathFilter, List[ActorRef]], snapshots: Map[String, HdrActorMetricsSnapshot], + currentTick: Long): Unit = { + + for ((subscribedPath, receivers) ← subscribers) { + val metrics = snapshots.filterKeys(snapshotPath ⇒ subscribedPath.accept(snapshotPath)) + val actorMetrics = ActorMetricsSnapshot(lastTick, currentTick, metrics) + + receivers.foreach(ref ⇒ ref ! actorMetrics) + } + } +} + +object ActorMetricsDispatcher { + case class Subscribe(path: String, forever: Boolean = false) + case class UnSubscribe(path: String) + + case class ActorMetricsSnapshot(fromMillis: Long, toMillis: Long, metrics: Map[String, HdrActorMetricsSnapshot]) + case object FlushMetrics +} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala new file mode 100644 index 00000000..d6359ead --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/package.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon + +import scala.concurrent.duration._ +import com.typesafe.config.Config + +package object metrics { + val OneHour = 1.hour.toNanos + + case class HdrConfiguration(highestTrackableValue: Long, significantValueDigits: Int) + case object HdrConfiguration { + def fromConfig(config: Config): HdrConfiguration = { + HdrConfiguration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/Segments.scala b/kamon-core/src/main/scala/kamon/trace/Segments.scala new file mode 100644 index 00000000..0bc68ee7 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Segments.scala @@ -0,0 +1,38 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ + +package kamon.trace + +import kamon.trace.Trace.SegmentCompletionHandle + +object Segments { + + trait Category + case object HttpClientRequest extends Category + + case class Start(category: Category, description: String = "", + attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime()) + + case class End(attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime()) + + case class Segment(start: Start, end: End) + + trait SegmentCompletionHandleAware { + var completionHandle: Option[SegmentCompletionHandle] + } + + trait ContextAndSegmentCompletionAware extends ContextAware with SegmentCompletionHandleAware +} diff --git a/kamon-core/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala new file mode 100644 index 00000000..31e8185a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Trace.scala @@ -0,0 +1,114 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace + +import kamon.Kamon +import akka.actor._ +import scala.Some +import kamon.trace.Trace.Register +import scala.concurrent.duration._ +import java.util.concurrent.atomic.AtomicLong + +object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: Extension] = Trace + def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) + + /*** Protocol */ + case object Register + + /** User API */ + //private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None) + private[trace] val traceContext = new ThreadLocal[Option[TraceContext]] { + override def initialValue(): Option[TraceContext] = None + } + private[trace] val tranid = new AtomicLong() + + def context() = traceContext.get + private def set(ctx: Option[TraceContext]) = traceContext.set(ctx) + + def clear: Unit = traceContext.remove() + def start(name: String)(implicit system: ActorSystem): TraceContext = { + val ctx = newTraceContext(name) + ctx.start(name) + set(Some(ctx)) + + ctx + } + + def withContext[T](ctx: Option[TraceContext])(thunk: ⇒ T): T = { + val oldval = context + set(ctx) + + try thunk + finally set(oldval) + } + + def transformContext(f: TraceContext ⇒ TraceContext): Unit = { + context.map(f).foreach(ctx ⇒ set(Some(ctx))) + } + + def finish(): Option[TraceContext] = { + val ctx = context() + ctx.map(_.finish) + clear + ctx + } + + // TODO: FIX + def newTraceContext(name: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace).api, tranid.getAndIncrement, name) + + def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = { + val start = Segments.Start(category, description, attributes) + SegmentCompletionHandle(start) + } + + def startSegment(start: Segments.Start): SegmentCompletionHandle = SegmentCompletionHandle(start) + + case class SegmentCompletionHandle(start: Segments.Start) { + def complete(): Unit = { + val end = Segments.End() + println(s"Completing the Segment: $start - $end") + } + def complete(end: Segments.End): Unit = { + println(s"Completing the Segment: $start - $end") + } + } +} + +class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val api: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace") +} + +class TraceManager extends Actor with ActorLogging { + var listeners: Seq[ActorRef] = Seq.empty + + def receive = { + case Register ⇒ + listeners = sender +: listeners + log.info("Registered [{}] as listener for Kamon traces", sender) + + case segment: UowSegment ⇒ + val tracerName = segment.id.toString + context.child(tracerName).getOrElse(newTracer(tracerName)) ! segment + + case trace: UowTrace ⇒ + listeners foreach (_ ! trace) + } + + def newTracer(name: String): ActorRef = { + context.actorOf(UowTraceAggregator.props(self, 30 seconds), name) + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala new file mode 100644 index 00000000..3e68a816 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -0,0 +1,51 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace + +import java.util.UUID +import akka.actor._ +import java.util.concurrent.atomic.AtomicLong +import scala.concurrent.duration._ +import kamon.Kamon +import kamon.trace.UowTracing.{ Finish, Start } + +// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. +case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) { + + def start(name: String) = { + collector ! Start(id, name) + } + + def finish: Unit = { + collector ! Finish(id) + } + +} + +trait ContextAware { + def traceContext: Option[TraceContext] +} + +object ContextAware { + def default: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Trace.context() + } +} + +trait TimedContextAware { + def timestamp: Long + def traceContext: Option[TraceContext] +} diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala new file mode 100644 index 00000000..20cce830 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala @@ -0,0 +1,82 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace + +import akka.actor._ +import scala.concurrent.duration.Duration +import kamon.trace.UowTracing._ + +sealed trait UowSegment { + def id: Long + def timestamp: Long +} + +trait AutoTimestamp extends UowSegment { + val timestamp = System.nanoTime +} + +object UowTracing { + case class Start(id: Long, name: String) extends AutoTimestamp + case class Finish(id: Long) extends AutoTimestamp + case class Rename(id: Long, name: String) extends AutoTimestamp + case class WebExternalStart(id: Long, host: String) extends AutoTimestamp + case class WebExternalFinish(id: Long) extends AutoTimestamp + case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp +} + +case class UowTrace(name: String, uow: String, start: Long, end: Long, segments: Seq[UowSegment]) { + def elapsed: Long = end - start +} + +class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { + context.setReceiveTimeout(aggregationTimeout) + + var name: String = "UNKNOWN" + var segments: Seq[UowSegment] = Nil + + var pendingExternal = List[WebExternalStart]() + + var start = 0L + var end = 0L + + def receive = { + case start: Start ⇒ + this.start = start.timestamp + segments = segments :+ start + name = start.name + case finish: Finish ⇒ + end = finish.timestamp + segments = segments :+ finish; finishTracing() + case wes: WebExternalStart ⇒ pendingExternal = pendingExternal :+ wes + case finish @ WebExternalFinish(id) ⇒ pendingExternal.find(_.id == id).map(start ⇒ { + segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host) + }) + case Rename(id, newName) ⇒ name = newName + case segment: UowSegment ⇒ segments = segments :+ segment + case ReceiveTimeout ⇒ + log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) + context.stop(self) + } + + def finishTracing(): Unit = { + reporting ! UowTrace(name, "", start, end, segments) + context.stop(self) + } +} + +object UowTraceAggregator { + def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout) +} diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala new file mode 100644 index 00000000..add47fdf --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala @@ -0,0 +1,24 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.logging + +import ch.qos.logback.classic.pattern.ClassicConverter +import ch.qos.logback.classic.spi.ILoggingEvent +import kamon.trace.Trace + +class LogbackUowConverter extends ClassicConverter { + def convert(event: ILoggingEvent): String = Trace.context().map(_.uow).getOrElse("undefined") +} -- cgit v1.2.3