From 49c426a635d10182e8a628353dfdf5510c4d9df2 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 31 Jan 2014 09:01:18 -0300 Subject: remake of trace context and allow different tracing levels --- kamon-core/src/main/resources/reference.conf | 4 +- .../ActorMessagePassingTracing.scala | 4 +- .../ActorSystemMessagePassingTracing.scala | 10 +- .../instrumentation/ActorLoggingTracing.scala | 4 +- .../kamon/instrumentation/FutureTracing.scala | 4 +- .../main/scala/kamon/metrics/TraceMetrics.scala | 4 +- .../src/main/scala/kamon/trace/Segments.scala | 39 ------ kamon-core/src/main/scala/kamon/trace/Trace.scala | 123 ------------------ .../src/main/scala/kamon/trace/TraceContext.scala | 137 +++++++++++++++++---- .../src/main/scala/kamon/trace/TraceCtx.scala | 125 ------------------- .../src/main/scala/kamon/trace/TraceRecorder.scala | 69 +++++++++++ .../test/scala/akka/testkit/TestProbeTracing.scala | 2 +- .../trace/instrumentation/ActorLoggingSpec.scala | 12 +- .../ActorMessagePassingTracingSpec.scala | 35 +++--- ...orSystemMessagePassingInstrumentationSpec.scala | 58 +++++---- .../instrumentation/AskPatternTracingSpec.scala | 26 ++-- .../trace/instrumentation/FutureTracingSpec.scala | 38 +++--- .../instrumentation/TraceContextFixture.scala | 10 -- 18 files changed, 285 insertions(+), 419 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/trace/Segments.scala delete mode 100644 kamon-core/src/main/scala/kamon/trace/Trace.scala delete mode 100644 kamon-core/src/main/scala/kamon/trace/TraceCtx.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index e6124365..a6dd6c15 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -11,7 +11,7 @@ kamon { }, { trace { - includes = [ "**" ] + includes = [ "*" ] excludes = [] } }, @@ -46,7 +46,7 @@ kamon { highest-trackable-value = 3600000000000 significant-value-digits = 2 } - segments { + segment { highest-trackable-value = 3600000000000 significant-value-digits = 2 } diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 98700974..199b2bb2 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -49,7 +49,7 @@ class BehaviourInvokeTracing { val timestampBeforeProcessing = System.nanoTime() val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] - TraceRecorder.withContext(contextAndTimestamp.traceContext) { + TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) { pjp.proceed() } @@ -73,7 +73,7 @@ class BehaviourInvokeTracing { class EnvelopeTraceContextMixin { @DeclareMixin("akka.dispatch.Envelope") - def mixin: TraceContextAware = new TraceContextAware {} + def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") def envelopeCreation(ctx: TraceContextAware): Unit = {} diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala index d4f8f769..7d03d946 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala @@ -3,13 +3,13 @@ package akka.instrumentation import org.aspectj.lang.annotation._ import akka.dispatch.sysmsg.EarliestFirstSystemMessageList import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{TraceRecorder, TraceContextAware} +import kamon.trace.{ TraceRecorder, TraceContextAware } @Aspect class SystemMessageTraceContextMixin { @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") - def mixin: TraceContextAware = new TraceContextAware {} + def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") def envelopeCreation(ctx: TraceContextAware): Unit = {} @@ -25,7 +25,7 @@ class SystemMessageTraceContextMixin { class RepointableActorRefTraceContextMixin { @DeclareMixin("akka.actor.RepointableActorRef") - def mixin: TraceContextAware = new TraceContextAware {} + def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") def envelopeCreation(ctx: TraceContextAware): Unit = {} @@ -41,7 +41,7 @@ class RepointableActorRefTraceContextMixin { @Around("repointableActorRefCreation(repointableActorRef)") def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { - TraceRecorder.withContext(repointableActorRef.traceContext) { + TraceRecorder.withTraceContext(repointableActorRef.traceContext) { pjp.proceed() } } @@ -58,7 +58,7 @@ class ActorSystemMessagePassingTracing { def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { if (messages.nonEmpty) { val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withContext(ctx)(pjp.proceed()) + TraceRecorder.withTraceContext(ctx)(pjp.proceed()) } else pjp.proceed() } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala index abd3514e..954f351a 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala @@ -23,14 +23,14 @@ import kamon.trace.{ TraceContextAware, TraceRecorder } class ActorLoggingTracing { @DeclareMixin("akka.event.Logging.LogEvent+") - def mixin: TraceContextAware = new TraceContextAware {} + def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} @Around("withMdcInvocation(logSource, logEvent, logStatement)") def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withContext(logEvent.traceContext) { + TraceRecorder.withTraceContext(logEvent.traceContext) { pjp.proceed() } } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala index b8725dd7..634c94a1 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala @@ -23,7 +23,7 @@ import kamon.trace.{ TraceContextAware, TraceRecorder } class FutureTracing { @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") - def mixin: TraceContextAware = new TraceContextAware {} + def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {} @@ -39,7 +39,7 @@ class FutureTracing { @Around("futureRelatedRunnableExecution(runnable)") def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = { - TraceRecorder.withContext(runnable.traceContext) { + TraceRecorder.withTraceContext(runnable.traceContext) { pjp.proceed() } } diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala index 25ebce00..57a79653 100644 --- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala @@ -57,9 +57,7 @@ object TraceMetrics extends MetricGroupIdentity.Category with MetricGroupFactory val elapsedTimeHdrConfig = Configuration.fromConfig(settings.getConfig("elapsed-time")) val segmentHdrConfig = Configuration.fromConfig(settings.getConfig("segment")) - new TraceMetricRecorder( - HighDynamicRangeRecorder(elapsedTimeHdrConfig), - () ⇒ HighDynamicRangeRecorder(segmentHdrConfig)) + new TraceMetricRecorder(HighDynamicRangeRecorder(elapsedTimeHdrConfig), () ⇒ HighDynamicRangeRecorder(segmentHdrConfig)) } } diff --git a/kamon-core/src/main/scala/kamon/trace/Segments.scala b/kamon-core/src/main/scala/kamon/trace/Segments.scala deleted file mode 100644 index e6d9745b..00000000 --- a/kamon-core/src/main/scala/kamon/trace/Segments.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ - -package kamon.trace - - -import kamon.trace.TraceOld.SegmentCompletionHandle - -object Segments { - - trait Category - case object HttpClientRequest extends Category - - case class Start(category: Category, description: String = "", - attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime()) - - case class End(attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime()) - - case class Segment(start: Start, end: End) - - trait SegmentCompletionHandleAware { - var completionHandle: Option[SegmentCompletionHandle] - } - - trait ContextAndSegmentCompletionAware extends TraceContextAware with SegmentCompletionHandleAware -} diff --git a/kamon-core/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala deleted file mode 100644 index bdfd6aa3..00000000 --- a/kamon-core/src/main/scala/kamon/trace/Trace.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace - -import kamon.Kamon -import akka.actor._ -import scala.Some -import kamon.trace.TraceOld.Register -import scala.concurrent.duration._ -import java.util.concurrent.atomic.AtomicLong -import scala.util.Try -import java.net.InetAddress - -object TraceOld extends ExtensionId[TraceExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: Extension] = TraceOld - def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) - - /*** Protocol */ - case object Register - - /** User API */ - //private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None) - private[trace] val traceContext = new ThreadLocal[Option[TraceContextOld]] { - override def initialValue(): Option[TraceContextOld] = None - } - private[trace] val tranid = new AtomicLong() - - def context() = traceContext.get - private def set(ctx: Option[TraceContextOld]) = traceContext.set(ctx) - - def clear: Unit = traceContext.remove() - def start(name: String, token: Option[String])(implicit system: ActorSystem): TraceContextOld = { - val ctx = newTraceContext(name, token.getOrElse(TraceToken.generate())) - ctx.start(name) - set(Some(ctx)) - - ctx - } - - def withContext[T](ctx: Option[TraceContextOld])(thunk: ⇒ T): T = { - val oldval = context - set(ctx) - - try thunk - finally set(oldval) - } - - def transformContext(f: TraceContextOld ⇒ TraceContextOld): Unit = { - context.map(f).foreach(ctx ⇒ set(Some(ctx))) - } - - def finish(): Option[TraceContextOld] = { - val ctx = context() - ctx.map(_.finish) - clear - ctx - } - - // TODO: FIX - def newTraceContext(name: String, token: String)(implicit system: ActorSystem): TraceContextOld = TraceContextOld(Kamon(TraceOld).api, tranid.getAndIncrement, name, token) - - def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = { - val start = Segments.Start(category, description, attributes) - SegmentCompletionHandle(start) - } - - def startSegment(start: Segments.Start): SegmentCompletionHandle = SegmentCompletionHandle(start) - - case class SegmentCompletionHandle(start: Segments.Start) { - def complete(): Unit = { - val end = Segments.End() - //println(s"Completing the Segment: $start - $end") - } - def complete(end: Segments.End): Unit = { - //println(s"Completing the Segment: $start - $end") - } - } -} - -object TraceToken { - val tokenCounter = new AtomicLong - val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - - def generate(): String = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet()) -} - -class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val api: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace") -} - -class TraceManager extends Actor with ActorLogging { - var listeners: Seq[ActorRef] = Seq.empty - - def receive = { - case Register ⇒ - listeners = sender +: listeners - log.info("Registered [{}] as listener for Kamon traces", sender) - - case segment: UowSegment ⇒ - val tracerName = segment.id.toString - context.child(tracerName).getOrElse(newTracer(tracerName)) ! segment - - case trace: UowTrace ⇒ - listeners foreach (_ ! trace) - } - - def newTracer(name: String): ActorRef = { - context.actorOf(UowTraceAggregator.props(self, 30 seconds), name) - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 95a3a8b2..d3759a26 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -1,36 +1,129 @@ -/* =================================================== +/* + * ========================================================================================= * Copyright © 2013 the kamon project * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.trace -import java.util.UUID -import akka.actor._ -import java.util.concurrent.atomic.AtomicLong -import scala.concurrent.duration._ +import akka.actor.ActorSystem import kamon.Kamon -import kamon.trace.UowTracing.{ Finish, Start } +import kamon.metrics.{ MetricGroupRecorder, MetricIdentity, TraceMetrics, Metrics } +import java.util.concurrent.ConcurrentLinkedQueue +import kamon.trace.TraceContextAware.DefaultTraceContextAware +import kamon.trace.TraceContext.SegmentIdentity + +trait TraceContext { + def name: String + def token: String + def system: ActorSystem + def rename(name: String): Unit + def levelOfDetail: TracingLevelOfDetail + def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle + def finish(metadata: Map[String, String]) +} + +object TraceContext { + type SegmentIdentity = MetricIdentity +} + +trait SegmentCompletionHandle { + def finish(metadata: Map[String, String]) +} + +case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String]) -// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. -case class TraceContextOld(private val collector: ActorRef, id: Long, name: String, token: String, userContext: Option[Any] = None) { +sealed trait TracingLevelOfDetail +case object OnlyMetrics extends TracingLevelOfDetail +case object SimpleTrace extends TracingLevelOfDetail +case object FullTrace extends TracingLevelOfDetail - def start(name: String) = { - collector ! Start(id, name) +trait TraceContextAware { + def captureMark: Long + def traceContext: Option[TraceContext] +} + +object TraceContextAware { + def default: TraceContextAware = new DefaultTraceContextAware + + class DefaultTraceContextAware extends TraceContextAware { + val captureMark = System.nanoTime() + val traceContext = TraceRecorder.currentContext } +} - def finish: Unit = { - collector ! Finish(id) +trait SegmentCompletionHandleAware extends TraceContextAware { + @volatile var segmentCompletionHandle: Option[SegmentCompletionHandle] = None +} + +object SegmentCompletionHandleAware { + def default: SegmentCompletionHandleAware = new DefaultSegmentCompletionHandleAware + + class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {} +} + +class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String], + val system: ActorSystem) extends TraceContext { + @volatile private var _isOpen = true + val levelOfDetail = OnlyMetrics + val startMark = System.nanoTime() + val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() + val metricsExtension = Kamon(Metrics)(system) + + def name: String = _name + + def rename(newName: String): Unit = _name = newName + + def isOpen(): Boolean = _isOpen + + def finish(metadata: Map[String, String]): Unit = { + _isOpen = false + val finishMark = System.nanoTime() + val metricRecorder = metricsExtension.register(name, TraceMetrics) + + metricRecorder.map { traceMetrics ⇒ + traceMetrics.elapsedTime.record(finishMark - startMark) + drainFinishedSegments(traceMetrics) + } + } + + private def drainFinishedSegments(metricRecorder: MetricGroupRecorder): Unit = { + while (!finishedSegments.isEmpty) { + val segmentData = finishedSegments.poll() + metricRecorder.record(segmentData.identity, segmentData.duration) + } } + private def finishSegment(identity: MetricIdentity, duration: Long, metadata: Map[String, String]): Unit = { + finishedSegments.add(SegmentData(identity, duration, metadata)) + + if (!_isOpen) { + metricsExtension.register(name, TraceMetrics).map { traceMetrics ⇒ + drainFinishedSegments(traceMetrics) + } + } + } + + def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle = + new SimpleMetricCollectionCompletionHandle(identity, metadata) + + class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle { + val segmentStartMark = System.nanoTime() + + def finish(metadata: Map[String, String] = Map.empty): Unit = { + val segmentFinishMark = System.nanoTime() + finishSegment(identity, (segmentFinishMark - segmentStartMark), startMetadata ++ metadata) + } + } } + diff --git a/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala b/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala deleted file mode 100644 index 1e552563..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import akka.actor.{ExtendedActorSystem, ActorSystem} -import akka.dispatch.AbstractNodeQueue -import kamon.Kamon -import kamon.metrics.{TraceMetrics, Metrics} -import java.util.concurrent.atomic.AtomicLong - -sealed trait TracingLevelOfDetail -case object OnlyMetrics extends TracingLevelOfDetail -case object SimpleTrace extends TracingLevelOfDetail -case object FullTrace extends TracingLevelOfDetail - -trait TraceContext { - def token: String - def name: String - def rename(newName: String): Unit - def levelOfDetail: TracingLevelOfDetail - def startSegment - def finish(metadata: Map[String, String]) - -} - -trait TraceContextAware { - def captureMark: Long - def traceContext: Option[TraceContext] -} - -object TraceContextAware { - def default: TraceContextAware = new TraceContextAware { - val captureMark = System.nanoTime() - val traceContext = TraceRecorder.currentContext - } -} - -object TraceContext { - -} - -class SimpleMetricCollectionContext(val token: String, @volatile private var _name: String, val system: ActorSystem, - metadata: Map[String, String]) extends TraceContext { - val levelOfDetail = OnlyMetrics - - @volatile private var _isOpen = true - - val startMark = System.nanoTime() - - def name: String = _name - - def rename(newName: String): Unit = _name = newName - - def isOpen(): Boolean = _isOpen - - def finish(metadata: Map[String, String]): Unit = { - val finishMark = System.nanoTime() - - // Store all metrics! - val metricsExtension = Kamon(Metrics)(system) - val metricRecorder = metricsExtension.register(name, TraceMetrics) - - metricRecorder.map { traceMetrics => - traceMetrics.elapsedTime.record(finishMark - startMark) - } - - } - - override def startSegment: Unit = ??? - - -} - -private[kamon] class SegmentRecordingQueue extends AbstractNodeQueue[String] - - - - -class TraceRecorder(system: ExtendedActorSystem) { - -} - -object TraceRecorder { - private val tokenCounter = new AtomicLong - private val traceContextStorage = new ThreadLocal[Option[TraceContext]] { - override def initialValue(): Option[TraceContext] = None - } - - private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String], system: ActorSystem): TraceContext = ??? - - def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context) - - def clearContext: Unit = traceContextStorage.set(None) - - def currentContext: Option[TraceContext] = traceContextStorage.get() - - def start(name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(implicit system: ActorSystem) = { - val ctx = newTraceContext(name, token, metadata, system) - traceContextStorage.set(Some(ctx)) - } - - def withContext[T](context: Option[TraceContext])(thunk: => T): T = { - val oldContext = currentContext - setContext(context) - - try thunk finally setContext(oldContext) - } - - def finish(metadata: Map[String, String] = Map.empty): Unit = currentContext.map(_.finish(metadata)) - -} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala new file mode 100644 index 00000000..3e3bb19f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -0,0 +1,69 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util.concurrent.atomic.AtomicLong +import scala.util.Try +import java.net.InetAddress +import akka.actor.ActorSystem +import kamon.trace.TraceContext.SegmentIdentity + +object TraceRecorder { + private val traceContextStorage = new ThreadLocal[Option[TraceContext]] { + override def initialValue(): Option[TraceContext] = None + } + + private val tokenCounter = new AtomicLong + private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + + def newToken = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet()) + + private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String], + system: ActorSystem): TraceContext = { + + // In the future this should select between implementations. + val finalToken = token.getOrElse(newToken) + new SimpleMetricCollectionContext(name, finalToken, metadata, system) + } + + def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context) + + def clearContext: Unit = traceContextStorage.set(None) + + def currentContext: Option[TraceContext] = traceContextStorage.get() + + def start(name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(implicit system: ActorSystem) = { + val ctx = newTraceContext(name, token, metadata, system) + traceContextStorage.set(Some(ctx)) + } + + def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): Option[SegmentCompletionHandle] = + currentContext.map(_.startSegment(identity, metadata)) + + def withNewTraceContext[T](name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(thunk: ⇒ T)(implicit system: ActorSystem): T = + withTraceContext(Some(newTraceContext(name, token, metadata, system)))(thunk) + + def withTraceContext[T](context: Option[TraceContext])(thunk: ⇒ T): T = { + val oldContext = currentContext + setContext(context) + + try thunk finally setContext(oldContext) + } + + def finish(metadata: Map[String, String] = Map.empty): Unit = currentContext.map(_.finish(metadata)) + +} diff --git a/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala b/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala index c681e921..a050145a 100644 --- a/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala +++ b/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala @@ -46,7 +46,7 @@ class TestProbeTracing { case _ ⇒ None } - TraceRecorder.withContext(traceContext) { + TraceRecorder.withTraceContext(traceContext) { pjp.proceed } } diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala index d6648cef..a0d8e933 100644 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala @@ -18,24 +18,24 @@ package kamon.trace.instrumentation import akka.testkit.TestKit import org.scalatest.{ Inspectors, Matchers, WordSpecLike } import akka.actor.{ Props, ActorLogging, Actor, ActorSystem } -import akka.event.Logging.{ LogEvent } -import kamon.trace.{ ContextAware, TraceContext, Trace } +import akka.event.Logging.LogEvent +import kamon.trace.{TraceContextAware, TraceRecorder} class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with WordSpecLike with Matchers with Inspectors { "the ActorLogging instrumentation" should { "attach the TraceContext (if available) to log events" in { - val testTraceContext = Some(TraceContext(Actor.noSender, 1, "test", "test-1")) val loggerActor = system.actorOf(Props[LoggerActor]) system.eventStream.subscribe(testActor, classOf[LogEvent]) - Trace.withContext(testTraceContext) { + val testTraceContext = TraceRecorder.withNewTraceContext("logging") { loggerActor ! "info" + TraceRecorder.currentContext } fishForMessage() { case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒ - val ctxInEvent = event.asInstanceOf[ContextAware].traceContext + val ctxInEvent = event.asInstanceOf[TraceContextAware].traceContext ctxInEvent === testTraceContext case event: LogEvent ⇒ false @@ -46,6 +46,6 @@ class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with W class LoggerActor extends Actor with ActorLogging { def receive = { - case "info" ⇒ log.info("TraceContext => {}", Trace.context()) + case "info" ⇒ log.info("TraceContext => {}", TraceRecorder.currentContext) } } diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala index f32623b9..acc939fb 100644 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala @@ -15,70 +15,71 @@ * ========================================================== */ package kamon.trace.instrumentation -import org.scalatest.{ WordSpecLike, Matchers } -import akka.actor.{ ActorRef, Actor, Props, ActorSystem } +import org.scalatest.WordSpecLike +import akka.actor.{ Actor, Props, ActorSystem } import akka.testkit.{ ImplicitSender, TestKit } -import kamon.trace.Trace +import kamon.trace.TraceRecorder import akka.pattern.{ pipe, ask } import akka.util.Timeout import scala.concurrent.duration._ -import scala.concurrent.{ Await, Future } import akka.routing.RoundRobinRouter -import kamon.trace.TraceContext class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { implicit val executionContext = system.dispatcher "the message passing instrumentation" should { - "propagate the TraceContext using bang" in new TraceContextEchoFixture { - Trace.withContext(testTraceContext) { + "propagate the TraceContext using bang" in new EchoActorFixture { + val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") { ctxEchoActor ! "test" + TraceRecorder.currentContext } expectMsg(testTraceContext) } - "propagate the TraceContext using tell" in new TraceContextEchoFixture { - Trace.withContext(testTraceContext) { + "propagate the TraceContext using tell" in new EchoActorFixture { + val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") { ctxEchoActor.tell("test", testActor) + TraceRecorder.currentContext } expectMsg(testTraceContext) } - "propagate the TraceContext using ask" in new TraceContextEchoFixture { + "propagate the TraceContext using ask" in new EchoActorFixture { implicit val timeout = Timeout(1 seconds) - Trace.withContext(testTraceContext) { + val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") { // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. (ctxEchoActor ? "test") pipeTo (testActor) + TraceRecorder.currentContext } expectMsg(testTraceContext) } - "propagate the TraceContext to actors behind a router" in new RoutedTraceContextEchoFixture { - Trace.withContext(testTraceContext) { + "propagate the TraceContext to actors behind a router" in new RoutedEchoActorFixture { + val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { ctxEchoActor ! "test" + TraceRecorder.currentContext } expectMsg(testTraceContext) } } - trait TraceContextEchoFixture { - val testTraceContext = Some(Trace.newTraceContext("test", "test-1")) + trait EchoActorFixture { val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) } - trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { + trait RoutedEchoActorFixture extends EchoActorFixture { override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 1))) } } class TraceContextEcho extends Actor { def receive = { - case msg: String ⇒ sender ! Trace.context() + case msg: String ⇒ sender ! TraceRecorder.currentContext } } diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala index 7d539370..00ecae79 100644 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala @@ -3,7 +3,7 @@ package kamon.trace.instrumentation import akka.testkit.{ ImplicitSender, TestKit } import akka.actor._ import org.scalatest.WordSpecLike -import kamon.trace.Trace +import kamon.trace.TraceRecorder import scala.util.control.NonFatal import akka.actor.SupervisorStrategy.{ Escalate, Stop, Restart, Resume } import scala.concurrent.duration._ @@ -12,43 +12,44 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem(" implicit val executionContext = system.dispatcher "the system message passing instrumentation" should { - "keep the TraceContext while processing the Create message in top level actors" in new TraceContextFixture { - Trace.withContext(testTraceContext) { + "keep the TraceContext while processing the Create message in top level actors" in { + val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") { system.actorOf(Props(new Actor { - - testActor ! Trace.context() - + testActor ! TraceRecorder.currentContext def receive: Actor.Receive = { case any ⇒ } })) + + TraceRecorder.currentContext } expectMsg(testTraceContext) } - "keep the TraceContext while processing the Create message in non top level actors" in new TraceContextFixture { - Trace.withContext(testTraceContext) { + "keep the TraceContext while processing the Create message in non top level actors" in { + val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") { system.actorOf(Props(new Actor { def receive: Actor.Receive = { case any ⇒ context.actorOf(Props(new Actor { - - testActor ! Trace.context() - + testActor ! TraceRecorder.currentContext def receive: Actor.Receive = { case any ⇒ } })) } })) ! "any" + + TraceRecorder.currentContext } expectMsg(testTraceContext) } "keep the TraceContext in the supervision cycle" when { - "the actor is resumed" in new TraceContextFixture { + "the actor is resumed" in { val supervisor = supervisorWithDirective(Resume) - Trace.withContext(testTraceContext) { + val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") { supervisor ! "fail" + TraceRecorder.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -58,11 +59,12 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem(" expectMsg(None) } - "the actor is restarted" in new TraceContextFixture { + "the actor is restarted" in { val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) - Trace.withContext(testTraceContext) { + val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") { supervisor ! "fail" + TraceRecorder.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -74,11 +76,12 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem(" expectMsg(None) } - "the actor is stopped" in new TraceContextFixture { + "the actor is stopped" in { val supervisor = supervisorWithDirective(Stop, sendPostStop = true) - Trace.withContext(testTraceContext) { + val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") { supervisor ! "fail" + TraceRecorder.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -86,11 +89,12 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem(" expectNoMsg(1 second) } - "the failure is escalated" in new TraceContextFixture { + "the failure is escalated" in { val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) - Trace.withContext(testTraceContext) { + val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") { supervisor ! "fail" + TraceRecorder.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -108,7 +112,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem(" val child = context.actorOf(Props(new Parent)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! Trace.context(); Stop + case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop } def receive = { @@ -120,7 +124,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem(" val child = context.actorOf(Props(new Child)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! Trace.context(); directive + case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive } def receive: Actor.Receive = { @@ -128,7 +132,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem(" } override def postStop(): Unit = { - if (sendPostStop) testActor ! Trace.context() + if (sendPostStop) testActor ! TraceRecorder.currentContext super.postStop() } } @@ -136,26 +140,26 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem(" class Child extends Actor { def receive = { case "fail" ⇒ 1 / 0 - case "context" ⇒ sender ! Trace.context() + case "context" ⇒ sender ! TraceRecorder.currentContext } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - if (sendPreRestart) testActor ! Trace.context() + if (sendPreRestart) testActor ! TraceRecorder.currentContext super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { - if (sendPostRestart) testActor ! Trace.context() + if (sendPostRestart) testActor ! TraceRecorder.currentContext super.postRestart(reason) } override def postStop(): Unit = { - if (sendPostStop) testActor ! Trace.context() + if (sendPostStop) testActor ! TraceRecorder.currentContext super.postStop() } override def preStart(): Unit = { - if (sendPreStart) testActor ! Trace.context() + if (sendPreStart) testActor ! TraceRecorder.currentContext super.preStart() } } diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala index 9df67391..0387386c 100644 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala @@ -22,32 +22,30 @@ import akka.event.Logging.Warning import scala.concurrent.duration._ import akka.pattern.ask import akka.util.Timeout -import kamon.trace.{ Trace, ContextAware } +import kamon.trace.{TraceContextAware, TraceRecorder} import org.scalatest.OptionValues._ class AskPatternTracingSpec extends TestKit(ActorSystem("ask-pattern-tracing-spec")) with WordSpecLike with Matchers { "the AskPatternTracing" should { - "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in new TraceContextFixture { + "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in { implicit val ec = system.dispatcher implicit val timeout = Timeout(10 milliseconds) val noReply = system.actorOf(Props[NoReply]) system.eventStream.subscribe(testActor, classOf[Warning]) - within(500 milliseconds) { - val initialCtx = Trace.withContext(testTraceContext) { - noReply ? "hello" - Trace.context() - } - - val warn = expectMsgPF() { - case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn - } - val capturedCtx = warn.asInstanceOf[ContextAware].traceContext + val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") { + noReply ? "hello" + TraceRecorder.currentContext + } - capturedCtx should be('defined) - capturedCtx should equal(initialCtx) + val warn = expectMsgPF() { + case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn } + val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext + + capturedCtx should be('defined) + capturedCtx should equal(testTraceContext) } } } diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala index a5554836..e6797148 100644 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala @@ -15,42 +15,42 @@ * ========================================================== */ package kamon.trace.instrumentation -import scala.concurrent.{ ExecutionContext, Await, Promise, Future } -import org.scalatest.{ Matchers, OptionValues, WordSpec } +import scala.concurrent.{ ExecutionContext, Future } +import org.scalatest.{ Matchers, OptionValues, WordSpecLike } import org.scalatest.concurrent.{ ScalaFutures, PatienceConfiguration } -import java.util.UUID -import scala.util.{ Random, Success } -import scala.concurrent.duration._ -import java.util.concurrent.TimeUnit -import akka.actor.{ Actor, ActorSystem } -import kamon.trace.{ Trace, TraceContext } +import kamon.trace.TraceRecorder +import akka.testkit.TestKit +import akka.actor.ActorSystem -class FutureTracingSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { +class FutureTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with Matchers + with ScalaFutures with PatienceConfiguration with OptionValues { - implicit val execContext = ExecutionContext.Implicits.global + implicit val execContext = system.dispatcher "a Future created with FutureTracing" should { "capture the TraceContext available when created" which { - "must be available when executing the future's body" in new TraceContextFixture { - var future: Future[Option[TraceContext]] = _ + "must be available when executing the future's body" in { - Trace.withContext(testTraceContext) { - future = Future(Trace.context) + val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { + val future = Future(TraceRecorder.currentContext) + + (future, TraceRecorder.currentContext) } whenReady(future)(ctxInFuture ⇒ ctxInFuture should equal(testTraceContext)) } - "must be available when executing callbacks on the future" in new TraceContextFixture { - var future: Future[Option[TraceContext]] = _ + "must be available when executing callbacks on the future" in { - Trace.withContext(testTraceContext) { - future = Future("Hello Kamon!") + val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { + val future = Future("Hello Kamon!") // The TraceContext is expected to be available during all intermediate processing. .map(_.length) .flatMap(len ⇒ Future(len.toString)) - .map(s ⇒ Trace.context()) + .map(s ⇒ TraceRecorder.currentContext) + + (future, TraceRecorder.currentContext) } whenReady(future)(ctxInFuture ⇒ diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala deleted file mode 100644 index 2df95d09..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala +++ /dev/null @@ -1,10 +0,0 @@ -package kamon.trace.instrumentation - -import scala.util.Random -import kamon.trace.TraceContext -import akka.actor.Actor - -trait TraceContextFixture { - val random = new Random(System.nanoTime) - val testTraceContext = Some(TraceContext(Actor.noSender, random.nextInt, "test", "test-1")) -} \ No newline at end of file -- cgit v1.2.3