aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-01-31 09:01:18 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-01-31 09:01:18 -0300
commit59c01d880379dfc48c6d82da13ef628a587a9bbb (patch)
treedd323caa93133a98da5f76be332dfdbf76280ea5 /kamon-core/src
parenta15e17d2462105ad8b72054be58dc9e8f9dc64ed (diff)
downloadKamon-59c01d880379dfc48c6d82da13ef628a587a9bbb.tar.gz
Kamon-59c01d880379dfc48c6d82da13ef628a587a9bbb.tar.bz2
Kamon-59c01d880379dfc48c6d82da13ef628a587a9bbb.zip
remake of trace context and allow different tracing levels
Diffstat (limited to 'kamon-core/src')
-rw-r--r--kamon-core/src/main/resources/reference.conf4
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala4
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Segments.scala39
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Trace.scala123
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala137
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceCtx.scala125
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala69
-rw-r--r--kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala2
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala12
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala35
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala58
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala26
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala38
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala10
18 files changed, 285 insertions, 419 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index e6124365..a6dd6c15 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -11,7 +11,7 @@ kamon {
},
{
trace {
- includes = [ "**" ]
+ includes = [ "*" ]
excludes = []
}
},
@@ -46,7 +46,7 @@ kamon {
highest-trackable-value = 3600000000000
significant-value-digits = 2
}
- segments {
+ segment {
highest-trackable-value = 3600000000000
significant-value-digits = 2
}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index 98700974..199b2bb2 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -49,7 +49,7 @@ class BehaviourInvokeTracing {
val timestampBeforeProcessing = System.nanoTime()
val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware]
- TraceRecorder.withContext(contextAndTimestamp.traceContext) {
+ TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) {
pjp.proceed()
}
@@ -73,7 +73,7 @@ class BehaviourInvokeTracing {
class EnvelopeTraceContextMixin {
@DeclareMixin("akka.dispatch.Envelope")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default
@Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
def envelopeCreation(ctx: TraceContextAware): Unit = {}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
index d4f8f769..7d03d946 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
@@ -3,13 +3,13 @@ package akka.instrumentation
import org.aspectj.lang.annotation._
import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
import org.aspectj.lang.ProceedingJoinPoint
-import kamon.trace.{TraceRecorder, TraceContextAware}
+import kamon.trace.{ TraceRecorder, TraceContextAware }
@Aspect
class SystemMessageTraceContextMixin {
@DeclareMixin("akka.dispatch.sysmsg.SystemMessage+")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default
@Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)")
def envelopeCreation(ctx: TraceContextAware): Unit = {}
@@ -25,7 +25,7 @@ class SystemMessageTraceContextMixin {
class RepointableActorRefTraceContextMixin {
@DeclareMixin("akka.actor.RepointableActorRef")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default
@Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)")
def envelopeCreation(ctx: TraceContextAware): Unit = {}
@@ -41,7 +41,7 @@ class RepointableActorRefTraceContextMixin {
@Around("repointableActorRefCreation(repointableActorRef)")
def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = {
- TraceRecorder.withContext(repointableActorRef.traceContext) {
+ TraceRecorder.withTraceContext(repointableActorRef.traceContext) {
pjp.proceed()
}
}
@@ -58,7 +58,7 @@ class ActorSystemMessagePassingTracing {
def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = {
if (messages.nonEmpty) {
val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext
- TraceRecorder.withContext(ctx)(pjp.proceed())
+ TraceRecorder.withTraceContext(ctx)(pjp.proceed())
} else pjp.proceed()
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
index abd3514e..954f351a 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
@@ -23,14 +23,14 @@ import kamon.trace.{ TraceContextAware, TraceRecorder }
class ActorLoggingTracing {
@DeclareMixin("akka.event.Logging.LogEvent+")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default
@Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)")
def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {}
@Around("withMdcInvocation(logSource, logEvent, logStatement)")
def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {
- TraceRecorder.withContext(logEvent.traceContext) {
+ TraceRecorder.withTraceContext(logEvent.traceContext) {
pjp.proceed()
}
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
index b8725dd7..634c94a1 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
@@ -23,7 +23,7 @@ import kamon.trace.{ TraceContextAware, TraceRecorder }
class FutureTracing {
@DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default
@Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)")
def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {}
@@ -39,7 +39,7 @@ class FutureTracing {
@Around("futureRelatedRunnableExecution(runnable)")
def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = {
- TraceRecorder.withContext(runnable.traceContext) {
+ TraceRecorder.withTraceContext(runnable.traceContext) {
pjp.proceed()
}
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
index 25ebce00..57a79653 100644
--- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
@@ -57,9 +57,7 @@ object TraceMetrics extends MetricGroupIdentity.Category with MetricGroupFactory
val elapsedTimeHdrConfig = Configuration.fromConfig(settings.getConfig("elapsed-time"))
val segmentHdrConfig = Configuration.fromConfig(settings.getConfig("segment"))
- new TraceMetricRecorder(
- HighDynamicRangeRecorder(elapsedTimeHdrConfig),
- () ⇒ HighDynamicRangeRecorder(segmentHdrConfig))
+ new TraceMetricRecorder(HighDynamicRangeRecorder(elapsedTimeHdrConfig), () ⇒ HighDynamicRangeRecorder(segmentHdrConfig))
}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/Segments.scala b/kamon-core/src/main/scala/kamon/trace/Segments.scala
deleted file mode 100644
index e6d9745b..00000000
--- a/kamon-core/src/main/scala/kamon/trace/Segments.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-
-package kamon.trace
-
-
-import kamon.trace.TraceOld.SegmentCompletionHandle
-
-object Segments {
-
- trait Category
- case object HttpClientRequest extends Category
-
- case class Start(category: Category, description: String = "",
- attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime())
-
- case class End(attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime())
-
- case class Segment(start: Start, end: End)
-
- trait SegmentCompletionHandleAware {
- var completionHandle: Option[SegmentCompletionHandle]
- }
-
- trait ContextAndSegmentCompletionAware extends TraceContextAware with SegmentCompletionHandleAware
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala
deleted file mode 100644
index bdfd6aa3..00000000
--- a/kamon-core/src/main/scala/kamon/trace/Trace.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.trace
-
-import kamon.Kamon
-import akka.actor._
-import scala.Some
-import kamon.trace.TraceOld.Register
-import scala.concurrent.duration._
-import java.util.concurrent.atomic.AtomicLong
-import scala.util.Try
-import java.net.InetAddress
-
-object TraceOld extends ExtensionId[TraceExtension] with ExtensionIdProvider {
- def lookup(): ExtensionId[_ <: Extension] = TraceOld
- def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system)
-
- /*** Protocol */
- case object Register
-
- /** User API */
- //private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None)
- private[trace] val traceContext = new ThreadLocal[Option[TraceContextOld]] {
- override def initialValue(): Option[TraceContextOld] = None
- }
- private[trace] val tranid = new AtomicLong()
-
- def context() = traceContext.get
- private def set(ctx: Option[TraceContextOld]) = traceContext.set(ctx)
-
- def clear: Unit = traceContext.remove()
- def start(name: String, token: Option[String])(implicit system: ActorSystem): TraceContextOld = {
- val ctx = newTraceContext(name, token.getOrElse(TraceToken.generate()))
- ctx.start(name)
- set(Some(ctx))
-
- ctx
- }
-
- def withContext[T](ctx: Option[TraceContextOld])(thunk: ⇒ T): T = {
- val oldval = context
- set(ctx)
-
- try thunk
- finally set(oldval)
- }
-
- def transformContext(f: TraceContextOld ⇒ TraceContextOld): Unit = {
- context.map(f).foreach(ctx ⇒ set(Some(ctx)))
- }
-
- def finish(): Option[TraceContextOld] = {
- val ctx = context()
- ctx.map(_.finish)
- clear
- ctx
- }
-
- // TODO: FIX
- def newTraceContext(name: String, token: String)(implicit system: ActorSystem): TraceContextOld = TraceContextOld(Kamon(TraceOld).api, tranid.getAndIncrement, name, token)
-
- def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = {
- val start = Segments.Start(category, description, attributes)
- SegmentCompletionHandle(start)
- }
-
- def startSegment(start: Segments.Start): SegmentCompletionHandle = SegmentCompletionHandle(start)
-
- case class SegmentCompletionHandle(start: Segments.Start) {
- def complete(): Unit = {
- val end = Segments.End()
- //println(s"Completing the Segment: $start - $end")
- }
- def complete(end: Segments.End): Unit = {
- //println(s"Completing the Segment: $start - $end")
- }
- }
-}
-
-object TraceToken {
- val tokenCounter = new AtomicLong
- val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
-
- def generate(): String = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet())
-}
-
-class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val api: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace")
-}
-
-class TraceManager extends Actor with ActorLogging {
- var listeners: Seq[ActorRef] = Seq.empty
-
- def receive = {
- case Register ⇒
- listeners = sender +: listeners
- log.info("Registered [{}] as listener for Kamon traces", sender)
-
- case segment: UowSegment ⇒
- val tracerName = segment.id.toString
- context.child(tracerName).getOrElse(newTracer(tracerName)) ! segment
-
- case trace: UowTrace ⇒
- listeners foreach (_ ! trace)
- }
-
- def newTracer(name: String): ActorRef = {
- context.actorOf(UowTraceAggregator.props(self, 30 seconds), name)
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 95a3a8b2..d3759a26 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -1,36 +1,129 @@
-/* ===================================================
+/*
+ * =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.trace
-import java.util.UUID
-import akka.actor._
-import java.util.concurrent.atomic.AtomicLong
-import scala.concurrent.duration._
+import akka.actor.ActorSystem
import kamon.Kamon
-import kamon.trace.UowTracing.{ Finish, Start }
+import kamon.metrics.{ MetricGroupRecorder, MetricIdentity, TraceMetrics, Metrics }
+import java.util.concurrent.ConcurrentLinkedQueue
+import kamon.trace.TraceContextAware.DefaultTraceContextAware
+import kamon.trace.TraceContext.SegmentIdentity
+
+trait TraceContext {
+ def name: String
+ def token: String
+ def system: ActorSystem
+ def rename(name: String): Unit
+ def levelOfDetail: TracingLevelOfDetail
+ def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle
+ def finish(metadata: Map[String, String])
+}
+
+object TraceContext {
+ type SegmentIdentity = MetricIdentity
+}
+
+trait SegmentCompletionHandle {
+ def finish(metadata: Map[String, String])
+}
+
+case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String])
-// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary.
-case class TraceContextOld(private val collector: ActorRef, id: Long, name: String, token: String, userContext: Option[Any] = None) {
+sealed trait TracingLevelOfDetail
+case object OnlyMetrics extends TracingLevelOfDetail
+case object SimpleTrace extends TracingLevelOfDetail
+case object FullTrace extends TracingLevelOfDetail
- def start(name: String) = {
- collector ! Start(id, name)
+trait TraceContextAware {
+ def captureMark: Long
+ def traceContext: Option[TraceContext]
+}
+
+object TraceContextAware {
+ def default: TraceContextAware = new DefaultTraceContextAware
+
+ class DefaultTraceContextAware extends TraceContextAware {
+ val captureMark = System.nanoTime()
+ val traceContext = TraceRecorder.currentContext
}
+}
- def finish: Unit = {
- collector ! Finish(id)
+trait SegmentCompletionHandleAware extends TraceContextAware {
+ @volatile var segmentCompletionHandle: Option[SegmentCompletionHandle] = None
+}
+
+object SegmentCompletionHandleAware {
+ def default: SegmentCompletionHandleAware = new DefaultSegmentCompletionHandleAware
+
+ class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {}
+}
+
+class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String],
+ val system: ActorSystem) extends TraceContext {
+ @volatile private var _isOpen = true
+ val levelOfDetail = OnlyMetrics
+ val startMark = System.nanoTime()
+ val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
+ val metricsExtension = Kamon(Metrics)(system)
+
+ def name: String = _name
+
+ def rename(newName: String): Unit = _name = newName
+
+ def isOpen(): Boolean = _isOpen
+
+ def finish(metadata: Map[String, String]): Unit = {
+ _isOpen = false
+ val finishMark = System.nanoTime()
+ val metricRecorder = metricsExtension.register(name, TraceMetrics)
+
+ metricRecorder.map { traceMetrics ⇒
+ traceMetrics.elapsedTime.record(finishMark - startMark)
+ drainFinishedSegments(traceMetrics)
+ }
+ }
+
+ private def drainFinishedSegments(metricRecorder: MetricGroupRecorder): Unit = {
+ while (!finishedSegments.isEmpty) {
+ val segmentData = finishedSegments.poll()
+ metricRecorder.record(segmentData.identity, segmentData.duration)
+ }
}
+ private def finishSegment(identity: MetricIdentity, duration: Long, metadata: Map[String, String]): Unit = {
+ finishedSegments.add(SegmentData(identity, duration, metadata))
+
+ if (!_isOpen) {
+ metricsExtension.register(name, TraceMetrics).map { traceMetrics ⇒
+ drainFinishedSegments(traceMetrics)
+ }
+ }
+ }
+
+ def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle =
+ new SimpleMetricCollectionCompletionHandle(identity, metadata)
+
+ class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle {
+ val segmentStartMark = System.nanoTime()
+
+ def finish(metadata: Map[String, String] = Map.empty): Unit = {
+ val segmentFinishMark = System.nanoTime()
+ finishSegment(identity, (segmentFinishMark - segmentStartMark), startMetadata ++ metadata)
+ }
+ }
}
+
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala b/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala
deleted file mode 100644
index 1e552563..00000000
--- a/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.trace
-
-import akka.actor.{ExtendedActorSystem, ActorSystem}
-import akka.dispatch.AbstractNodeQueue
-import kamon.Kamon
-import kamon.metrics.{TraceMetrics, Metrics}
-import java.util.concurrent.atomic.AtomicLong
-
-sealed trait TracingLevelOfDetail
-case object OnlyMetrics extends TracingLevelOfDetail
-case object SimpleTrace extends TracingLevelOfDetail
-case object FullTrace extends TracingLevelOfDetail
-
-trait TraceContext {
- def token: String
- def name: String
- def rename(newName: String): Unit
- def levelOfDetail: TracingLevelOfDetail
- def startSegment
- def finish(metadata: Map[String, String])
-
-}
-
-trait TraceContextAware {
- def captureMark: Long
- def traceContext: Option[TraceContext]
-}
-
-object TraceContextAware {
- def default: TraceContextAware = new TraceContextAware {
- val captureMark = System.nanoTime()
- val traceContext = TraceRecorder.currentContext
- }
-}
-
-object TraceContext {
-
-}
-
-class SimpleMetricCollectionContext(val token: String, @volatile private var _name: String, val system: ActorSystem,
- metadata: Map[String, String]) extends TraceContext {
- val levelOfDetail = OnlyMetrics
-
- @volatile private var _isOpen = true
-
- val startMark = System.nanoTime()
-
- def name: String = _name
-
- def rename(newName: String): Unit = _name = newName
-
- def isOpen(): Boolean = _isOpen
-
- def finish(metadata: Map[String, String]): Unit = {
- val finishMark = System.nanoTime()
-
- // Store all metrics!
- val metricsExtension = Kamon(Metrics)(system)
- val metricRecorder = metricsExtension.register(name, TraceMetrics)
-
- metricRecorder.map { traceMetrics =>
- traceMetrics.elapsedTime.record(finishMark - startMark)
- }
-
- }
-
- override def startSegment: Unit = ???
-
-
-}
-
-private[kamon] class SegmentRecordingQueue extends AbstractNodeQueue[String]
-
-
-
-
-class TraceRecorder(system: ExtendedActorSystem) {
-
-}
-
-object TraceRecorder {
- private val tokenCounter = new AtomicLong
- private val traceContextStorage = new ThreadLocal[Option[TraceContext]] {
- override def initialValue(): Option[TraceContext] = None
- }
-
- private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String], system: ActorSystem): TraceContext = ???
-
- def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context)
-
- def clearContext: Unit = traceContextStorage.set(None)
-
- def currentContext: Option[TraceContext] = traceContextStorage.get()
-
- def start(name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(implicit system: ActorSystem) = {
- val ctx = newTraceContext(name, token, metadata, system)
- traceContextStorage.set(Some(ctx))
- }
-
- def withContext[T](context: Option[TraceContext])(thunk: => T): T = {
- val oldContext = currentContext
- setContext(context)
-
- try thunk finally setContext(oldContext)
- }
-
- def finish(metadata: Map[String, String] = Map.empty): Unit = currentContext.map(_.finish(metadata))
-
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
new file mode 100644
index 00000000..3e3bb19f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
@@ -0,0 +1,69 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import java.util.concurrent.atomic.AtomicLong
+import scala.util.Try
+import java.net.InetAddress
+import akka.actor.ActorSystem
+import kamon.trace.TraceContext.SegmentIdentity
+
+object TraceRecorder {
+ private val traceContextStorage = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue(): Option[TraceContext] = None
+ }
+
+ private val tokenCounter = new AtomicLong
+ private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
+
+ def newToken = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet())
+
+ private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String],
+ system: ActorSystem): TraceContext = {
+
+ // In the future this should select between implementations.
+ val finalToken = token.getOrElse(newToken)
+ new SimpleMetricCollectionContext(name, finalToken, metadata, system)
+ }
+
+ def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context)
+
+ def clearContext: Unit = traceContextStorage.set(None)
+
+ def currentContext: Option[TraceContext] = traceContextStorage.get()
+
+ def start(name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(implicit system: ActorSystem) = {
+ val ctx = newTraceContext(name, token, metadata, system)
+ traceContextStorage.set(Some(ctx))
+ }
+
+ def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): Option[SegmentCompletionHandle] =
+ currentContext.map(_.startSegment(identity, metadata))
+
+ def withNewTraceContext[T](name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(thunk: ⇒ T)(implicit system: ActorSystem): T =
+ withTraceContext(Some(newTraceContext(name, token, metadata, system)))(thunk)
+
+ def withTraceContext[T](context: Option[TraceContext])(thunk: ⇒ T): T = {
+ val oldContext = currentContext
+ setContext(context)
+
+ try thunk finally setContext(oldContext)
+ }
+
+ def finish(metadata: Map[String, String] = Map.empty): Unit = currentContext.map(_.finish(metadata))
+
+}
diff --git a/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala b/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala
index c681e921..a050145a 100644
--- a/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala
+++ b/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala
@@ -46,7 +46,7 @@ class TestProbeTracing {
case _ ⇒ None
}
- TraceRecorder.withContext(traceContext) {
+ TraceRecorder.withTraceContext(traceContext) {
pjp.proceed
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala
index d6648cef..a0d8e933 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala
@@ -18,24 +18,24 @@ package kamon.trace.instrumentation
import akka.testkit.TestKit
import org.scalatest.{ Inspectors, Matchers, WordSpecLike }
import akka.actor.{ Props, ActorLogging, Actor, ActorSystem }
-import akka.event.Logging.{ LogEvent }
-import kamon.trace.{ ContextAware, TraceContext, Trace }
+import akka.event.Logging.LogEvent
+import kamon.trace.{TraceContextAware, TraceRecorder}
class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with WordSpecLike with Matchers with Inspectors {
"the ActorLogging instrumentation" should {
"attach the TraceContext (if available) to log events" in {
- val testTraceContext = Some(TraceContext(Actor.noSender, 1, "test", "test-1"))
val loggerActor = system.actorOf(Props[LoggerActor])
system.eventStream.subscribe(testActor, classOf[LogEvent])
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("logging") {
loggerActor ! "info"
+ TraceRecorder.currentContext
}
fishForMessage() {
case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒
- val ctxInEvent = event.asInstanceOf[ContextAware].traceContext
+ val ctxInEvent = event.asInstanceOf[TraceContextAware].traceContext
ctxInEvent === testTraceContext
case event: LogEvent ⇒ false
@@ -46,6 +46,6 @@ class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with W
class LoggerActor extends Actor with ActorLogging {
def receive = {
- case "info" ⇒ log.info("TraceContext => {}", Trace.context())
+ case "info" ⇒ log.info("TraceContext => {}", TraceRecorder.currentContext)
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala
index f32623b9..acc939fb 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala
@@ -15,70 +15,71 @@
* ========================================================== */
package kamon.trace.instrumentation
-import org.scalatest.{ WordSpecLike, Matchers }
-import akka.actor.{ ActorRef, Actor, Props, ActorSystem }
+import org.scalatest.WordSpecLike
+import akka.actor.{ Actor, Props, ActorSystem }
import akka.testkit.{ ImplicitSender, TestKit }
-import kamon.trace.Trace
+import kamon.trace.TraceRecorder
import akka.pattern.{ pipe, ask }
import akka.util.Timeout
import scala.concurrent.duration._
-import scala.concurrent.{ Await, Future }
import akka.routing.RoundRobinRouter
-import kamon.trace.TraceContext
class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender {
implicit val executionContext = system.dispatcher
"the message passing instrumentation" should {
- "propagate the TraceContext using bang" in new TraceContextEchoFixture {
- Trace.withContext(testTraceContext) {
+ "propagate the TraceContext using bang" in new EchoActorFixture {
+ val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") {
ctxEchoActor ! "test"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
- "propagate the TraceContext using tell" in new TraceContextEchoFixture {
- Trace.withContext(testTraceContext) {
+ "propagate the TraceContext using tell" in new EchoActorFixture {
+ val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") {
ctxEchoActor.tell("test", testActor)
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
- "propagate the TraceContext using ask" in new TraceContextEchoFixture {
+ "propagate the TraceContext using ask" in new EchoActorFixture {
implicit val timeout = Timeout(1 seconds)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") {
// The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it.
(ctxEchoActor ? "test") pipeTo (testActor)
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
- "propagate the TraceContext to actors behind a router" in new RoutedTraceContextEchoFixture {
- Trace.withContext(testTraceContext) {
+ "propagate the TraceContext to actors behind a router" in new RoutedEchoActorFixture {
+ val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") {
ctxEchoActor ! "test"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
}
- trait TraceContextEchoFixture {
- val testTraceContext = Some(Trace.newTraceContext("test", "test-1"))
+ trait EchoActorFixture {
val ctxEchoActor = system.actorOf(Props[TraceContextEcho])
}
- trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture {
+ trait RoutedEchoActorFixture extends EchoActorFixture {
override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 1)))
}
}
class TraceContextEcho extends Actor {
def receive = {
- case msg: String ⇒ sender ! Trace.context()
+ case msg: String ⇒ sender ! TraceRecorder.currentContext
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala
index 7d539370..00ecae79 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala
@@ -3,7 +3,7 @@ package kamon.trace.instrumentation
import akka.testkit.{ ImplicitSender, TestKit }
import akka.actor._
import org.scalatest.WordSpecLike
-import kamon.trace.Trace
+import kamon.trace.TraceRecorder
import scala.util.control.NonFatal
import akka.actor.SupervisorStrategy.{ Escalate, Stop, Restart, Resume }
import scala.concurrent.duration._
@@ -12,43 +12,44 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
implicit val executionContext = system.dispatcher
"the system message passing instrumentation" should {
- "keep the TraceContext while processing the Create message in top level actors" in new TraceContextFixture {
- Trace.withContext(testTraceContext) {
+ "keep the TraceContext while processing the Create message in top level actors" in {
+ val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") {
system.actorOf(Props(new Actor {
-
- testActor ! Trace.context()
-
+ testActor ! TraceRecorder.currentContext
def receive: Actor.Receive = { case any ⇒ }
}))
+
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
- "keep the TraceContext while processing the Create message in non top level actors" in new TraceContextFixture {
- Trace.withContext(testTraceContext) {
+ "keep the TraceContext while processing the Create message in non top level actors" in {
+ val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") {
system.actorOf(Props(new Actor {
def receive: Actor.Receive = {
case any ⇒
context.actorOf(Props(new Actor {
-
- testActor ! Trace.context()
-
+ testActor ! TraceRecorder.currentContext
def receive: Actor.Receive = { case any ⇒ }
}))
}
})) ! "any"
+
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
"keep the TraceContext in the supervision cycle" when {
- "the actor is resumed" in new TraceContextFixture {
+ "the actor is resumed" in {
val supervisor = supervisorWithDirective(Resume)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") {
supervisor ! "fail"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -58,11 +59,12 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
expectMsg(None)
}
- "the actor is restarted" in new TraceContextFixture {
+ "the actor is restarted" in {
val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") {
supervisor ! "fail"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -74,11 +76,12 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
expectMsg(None)
}
- "the actor is stopped" in new TraceContextFixture {
+ "the actor is stopped" in {
val supervisor = supervisorWithDirective(Stop, sendPostStop = true)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") {
supervisor ! "fail"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -86,11 +89,12 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
expectNoMsg(1 second)
}
- "the failure is escalated" in new TraceContextFixture {
+ "the failure is escalated" in {
val supervisor = supervisorWithDirective(Escalate, sendPostStop = true)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") {
supervisor ! "fail"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -108,7 +112,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
val child = context.actorOf(Props(new Parent))
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
- case NonFatal(throwable) ⇒ testActor ! Trace.context(); Stop
+ case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop
}
def receive = {
@@ -120,7 +124,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
val child = context.actorOf(Props(new Child))
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
- case NonFatal(throwable) ⇒ testActor ! Trace.context(); directive
+ case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive
}
def receive: Actor.Receive = {
@@ -128,7 +132,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
}
override def postStop(): Unit = {
- if (sendPostStop) testActor ! Trace.context()
+ if (sendPostStop) testActor ! TraceRecorder.currentContext
super.postStop()
}
}
@@ -136,26 +140,26 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
class Child extends Actor {
def receive = {
case "fail" ⇒ 1 / 0
- case "context" ⇒ sender ! Trace.context()
+ case "context" ⇒ sender ! TraceRecorder.currentContext
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
- if (sendPreRestart) testActor ! Trace.context()
+ if (sendPreRestart) testActor ! TraceRecorder.currentContext
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = {
- if (sendPostRestart) testActor ! Trace.context()
+ if (sendPostRestart) testActor ! TraceRecorder.currentContext
super.postRestart(reason)
}
override def postStop(): Unit = {
- if (sendPostStop) testActor ! Trace.context()
+ if (sendPostStop) testActor ! TraceRecorder.currentContext
super.postStop()
}
override def preStart(): Unit = {
- if (sendPreStart) testActor ! Trace.context()
+ if (sendPreStart) testActor ! TraceRecorder.currentContext
super.preStart()
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala
index 9df67391..0387386c 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala
@@ -22,32 +22,30 @@ import akka.event.Logging.Warning
import scala.concurrent.duration._
import akka.pattern.ask
import akka.util.Timeout
-import kamon.trace.{ Trace, ContextAware }
+import kamon.trace.{TraceContextAware, TraceRecorder}
import org.scalatest.OptionValues._
class AskPatternTracingSpec extends TestKit(ActorSystem("ask-pattern-tracing-spec")) with WordSpecLike with Matchers {
"the AskPatternTracing" should {
- "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in new TraceContextFixture {
+ "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in {
implicit val ec = system.dispatcher
implicit val timeout = Timeout(10 milliseconds)
val noReply = system.actorOf(Props[NoReply])
system.eventStream.subscribe(testActor, classOf[Warning])
- within(500 milliseconds) {
- val initialCtx = Trace.withContext(testTraceContext) {
- noReply ? "hello"
- Trace.context()
- }
-
- val warn = expectMsgPF() {
- case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn
- }
- val capturedCtx = warn.asInstanceOf[ContextAware].traceContext
+ val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") {
+ noReply ? "hello"
+ TraceRecorder.currentContext
+ }
- capturedCtx should be('defined)
- capturedCtx should equal(initialCtx)
+ val warn = expectMsgPF() {
+ case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn
}
+ val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext
+
+ capturedCtx should be('defined)
+ capturedCtx should equal(testTraceContext)
}
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala
index a5554836..e6797148 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala
@@ -15,42 +15,42 @@
* ========================================================== */
package kamon.trace.instrumentation
-import scala.concurrent.{ ExecutionContext, Await, Promise, Future }
-import org.scalatest.{ Matchers, OptionValues, WordSpec }
+import scala.concurrent.{ ExecutionContext, Future }
+import org.scalatest.{ Matchers, OptionValues, WordSpecLike }
import org.scalatest.concurrent.{ ScalaFutures, PatienceConfiguration }
-import java.util.UUID
-import scala.util.{ Random, Success }
-import scala.concurrent.duration._
-import java.util.concurrent.TimeUnit
-import akka.actor.{ Actor, ActorSystem }
-import kamon.trace.{ Trace, TraceContext }
+import kamon.trace.TraceRecorder
+import akka.testkit.TestKit
+import akka.actor.ActorSystem
-class FutureTracingSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues {
+class FutureTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with Matchers
+ with ScalaFutures with PatienceConfiguration with OptionValues {
- implicit val execContext = ExecutionContext.Implicits.global
+ implicit val execContext = system.dispatcher
"a Future created with FutureTracing" should {
"capture the TraceContext available when created" which {
- "must be available when executing the future's body" in new TraceContextFixture {
- var future: Future[Option[TraceContext]] = _
+ "must be available when executing the future's body" in {
- Trace.withContext(testTraceContext) {
- future = Future(Trace.context)
+ val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
+ val future = Future(TraceRecorder.currentContext)
+
+ (future, TraceRecorder.currentContext)
}
whenReady(future)(ctxInFuture ⇒
ctxInFuture should equal(testTraceContext))
}
- "must be available when executing callbacks on the future" in new TraceContextFixture {
- var future: Future[Option[TraceContext]] = _
+ "must be available when executing callbacks on the future" in {
- Trace.withContext(testTraceContext) {
- future = Future("Hello Kamon!")
+ val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
+ val future = Future("Hello Kamon!")
// The TraceContext is expected to be available during all intermediate processing.
.map(_.length)
.flatMap(len ⇒ Future(len.toString))
- .map(s ⇒ Trace.context())
+ .map(s ⇒ TraceRecorder.currentContext)
+
+ (future, TraceRecorder.currentContext)
}
whenReady(future)(ctxInFuture ⇒
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala
deleted file mode 100644
index 2df95d09..00000000
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package kamon.trace.instrumentation
-
-import scala.util.Random
-import kamon.trace.TraceContext
-import akka.actor.Actor
-
-trait TraceContextFixture {
- val random = new Random(System.nanoTime)
- val testTraceContext = Some(TraceContext(Actor.noSender, random.nextInt, "test", "test-1"))
-} \ No newline at end of file