From ea1a0d5d76988992227eb30b0baaf8e97678c946 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sat, 25 Oct 2014 04:45:21 +0200 Subject: ! core: replace Option[TraceContext] by empty object pattern and implement basic segments with renaming. --- .../src/main/scala/kamon/metric/TraceMetrics.scala | 2 +- .../src/main/scala/kamon/trace/TraceContext.scala | 192 +++++++++++---------- .../src/main/scala/kamon/trace/TraceLocal.scala | 14 +- .../src/main/scala/kamon/trace/TraceRecorder.scala | 53 +++--- .../trace/logging/LogbackTraceTokenConverter.scala | 8 +- .../ActorSystemMessageInstrumentationSpec.scala | 6 +- .../akka/AskPatternInstrumentationSpec.scala | 1 - .../test/scala/kamon/metric/TraceMetricsSpec.scala | 21 +-- .../kamon/trace/TraceContextManipulationSpec.scala | 24 +-- .../can/client/ClientRequestInstrumentation.scala | 18 +- 10 files changed, 179 insertions(+), 160 deletions(-) diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index 10dbcc01..54626b6c 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -35,7 +35,7 @@ object TraceMetrics extends MetricGroupCategory { case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) extends MetricGroupRecorder { - private val segments = TrieMap[MetricIdentity, Histogram]() + val segments = TrieMap[MetricIdentity, Histogram]() def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 64ee70be..a5855308 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -23,113 +23,67 @@ import kamon.Kamon import kamon.metric._ import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.trace.TraceContext.SegmentIdentity import kamon.metric.TraceMetrics.TraceMetricRecorder import scala.annotation.tailrec -trait TraceContext { +sealed trait TraceContext { def name: String def token: String - def system: ActorSystem def rename(name: String): Unit - def levelOfDetail: TracingLevelOfDetail - def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle - def finish(metadata: Map[String, String]) + def finish(): Unit def origin: TraceContextOrigin - def startMilliTime: Long def isOpen: Boolean - - private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage -} - -object TraceContext { - type SegmentIdentity = MetricIdentity -} - -trait SegmentCompletionHandle { - def finish(metadata: Map[String, String] = Map.empty) + def isEmpty: Boolean + def startSegment(segmentName: String, label: String): Segment } -case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String]) - -sealed trait TracingLevelOfDetail -case object OnlyMetrics extends TracingLevelOfDetail -case object SimpleTrace extends TracingLevelOfDetail -case object FullTrace extends TracingLevelOfDetail - -sealed trait TraceContextOrigin -object TraceContextOrigin { - case object Local extends TraceContextOrigin - case object Remote extends TraceContextOrigin -} - -trait TraceContextAware extends Serializable { - def captureNanoTime: Long - def traceContext: Option[TraceContext] +sealed trait Segment { + def name: String + def rename(newName: String): Unit + def label: String + def finish(): Unit } -object TraceContextAware { - def default: TraceContextAware = new DefaultTraceContextAware - - class DefaultTraceContextAware extends TraceContextAware { - @transient val captureNanoTime = System.nanoTime() - @transient val traceContext = TraceRecorder.currentContext - - // - // Beware of this hack, it might bite us in the future! - // - // When using remoting/cluster all messages carry the TraceContext in the envelope in which they - // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is - // available (if any) when the system messages are read and this will make sure that it is correctly - // captured and propagated. - @throws[ObjectStreamException] - private def readResolve: AnyRef = { - new DefaultTraceContextAware - } +case object EmptyTraceContext extends TraceContext { + def name: String = "empty-trace" + def token: String = "" + def rename(name: String): Unit = {} + def finish(): Unit = {} + def origin: TraceContextOrigin = TraceContextOrigin.Local + def isOpen: Boolean = false + def isEmpty: Boolean = true + def startSegment(segmentName: String, label: String): Segment = EmptySegment + + case object EmptySegment extends Segment { + val name: String = "empty-segment" + val label: String = "empty-label" + def rename(newName: String): Unit = {} + def finish: Unit = {} } } -trait SegmentCompletionHandleAware extends TraceContextAware { - @volatile var segmentCompletionHandle: Option[SegmentCompletionHandle] = None -} - -object SegmentCompletionHandleAware { - def default: SegmentCompletionHandleAware = new DefaultSegmentCompletionHandleAware - - class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {} -} - -class SimpleMetricCollectionContext(traceName: String, val token: String, metadata: Map[String, String], - val origin: TraceContextOrigin, val system: ActorSystem, val startMilliTime: Long = System.currentTimeMillis, - izOpen: Boolean = true) extends TraceContext { +class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, + val origin: TraceContextOrigin, startNanoTime: Long)(implicit system: ActorSystem) extends TraceContext { + val isEmpty: Boolean = false @volatile private var _name = traceName @volatile private var _isOpen = izOpen - val levelOfDetail = OnlyMetrics - val startNanoTime = System.nanoTime() - val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() - val metricsExtension = Kamon(Metrics)(system) + private val _startNanoTime = startNanoTime + private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() + private val metricsExtension = Kamon(Metrics)(system) + private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage def name: String = _name + def rename(newName: String): Unit = + if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace. - def rename(newName: String): Unit = _name = newName - - def isOpen(): Boolean = _isOpen + def isOpen: Boolean = _isOpen - def finish(metadata: Map[String, String]): Unit = { + def finish(): Unit = { _isOpen = false - - val elapsedNanoTime = - if (origin == TraceContextOrigin.Local) - // Everything is local, nanoTime is still the best resolution we can use. - System.nanoTime() - startNanoTime - else - // For a remote TraceContext we can only rely on the startMilliTime and we need to scale it to nanoseconds - // to be consistent with unit used for all latency measurements. - (System.currentTimeMillis() - startMilliTime) * 1000000L - + val elapsedNanoTime = System.nanoTime() - _startNanoTime val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) metricRecorder.map { traceMetrics ⇒ @@ -138,6 +92,8 @@ class SimpleMetricCollectionContext(traceName: String, val token: String, metada } } + def startSegment(segmentName: String, segmentLabel: String): Segment = new DefaultSegment(segmentName, segmentLabel) + @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { val segment = finishedSegments.poll() if (segment != null) { @@ -146,8 +102,8 @@ class SimpleMetricCollectionContext(traceName: String, val token: String, metada } } - private def finishSegment(identity: MetricIdentity, duration: Long, metadata: Map[String, String]): Unit = { - finishedSegments.add(SegmentData(identity, duration, metadata)) + private def finishSegment(segmentName: String, label: String, duration: Long): Unit = { + finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, label), duration)) if (!_isOpen) { metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ @@ -156,16 +112,72 @@ class SimpleMetricCollectionContext(traceName: String, val token: String, metada } } - def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle = - new SimpleMetricCollectionCompletionHandle(identity, metadata) + class DefaultSegment(segmentName: String, val label: String) extends Segment { + private val _segmentStartNanoTime = System.nanoTime() + @volatile private var _segmentName = segmentName + @volatile private var _isOpen = true - class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle { - val segmentStartNanoTime = System.nanoTime() + def name: String = _segmentName + def rename(newName: String): Unit = _segmentName = newName - def finish(metadata: Map[String, String] = Map.empty): Unit = { + def finish: Unit = { val segmentFinishNanoTime = System.nanoTime() - finishSegment(identity, (segmentFinishNanoTime - segmentStartNanoTime), startMetadata ++ metadata) + finishSegment(segmentName, label, (segmentFinishNanoTime - _segmentStartNanoTime)) } } } +case class SegmentMetricIdentity(name: String, label: String) extends MetricIdentity +case class SegmentData(identity: SegmentMetricIdentity, duration: Long) + +object SegmentMetricIdentityLabel { + val HttpClient = "http-client" +} + +sealed trait LevelOfDetail +object LevelOfDetail { + case object OnlyMetrics extends LevelOfDetail + case object SimpleTrace extends LevelOfDetail + case object FullTrace extends LevelOfDetail +} + +sealed trait TraceContextOrigin +object TraceContextOrigin { + case object Local extends TraceContextOrigin + case object Remote extends TraceContextOrigin +} + +trait TraceContextAware extends Serializable { + def captureNanoTime: Long + def traceContext: TraceContext +} + +object TraceContextAware { + def default: TraceContextAware = new DefaultTraceContextAware + + class DefaultTraceContextAware extends TraceContextAware { + @transient val captureNanoTime = System.nanoTime() + @transient val traceContext = TraceRecorder.currentContext + + // + // Beware of this hack, it might bite us in the future! + // + // When using remoting/cluster all messages carry the TraceContext in the envelope in which they + // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is + // available (if any) when the system messages are read and this will make sure that it is correctly + // captured and propagated. + @throws[ObjectStreamException] + private def readResolve: AnyRef = { + new DefaultTraceContextAware + } + } +} + +trait SegmentAware extends TraceContextAware { + @volatile var segment: Segment = EmptyTraceContext.EmptySegment +} + +object SegmentAware { + def default: SegmentAware = new DefaultSegmentAware + class DefaultSegmentAware extends DefaultTraceContextAware with SegmentAware {} +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala index 3ff074b6..0766af74 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala @@ -24,18 +24,20 @@ object TraceLocal { type ValueType } - def store(key: TraceLocalKey)(value: key.ValueType): Unit = - TraceRecorder.currentContext.map(_.traceLocalStorage.store(key)(value)) - - def retrieve(key: TraceLocalKey): Option[key.ValueType] = - TraceRecorder.currentContext.flatMap(_.traceLocalStorage.retrieve(key)) + def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value) + case EmptyTraceContext ⇒ // Can't store in the empty context. + } + def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key) + case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. + } } class TraceLocalStorage { val underlyingStorage = TrieMap[TraceLocal.TraceLocalKey, Any]() def store(key: TraceLocalKey)(value: key.ValueType): Unit = underlyingStorage.put(key, value) - def retrieve(key: TraceLocalKey): Option[key.ValueType] = underlyingStorage.get(key).map(_.asInstanceOf[key.ValueType]) } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 778edc42..9b0ba038 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -23,65 +23,64 @@ import kamon.macros.InlineTraceContextMacro import scala.util.Try import java.net.InetAddress import akka.actor.ActorSystem -import kamon.trace.TraceContext.SegmentIdentity object TraceRecorder { - private val traceContextStorage = new ThreadLocal[Option[TraceContext]] { - override def initialValue(): Option[TraceContext] = None + private val traceContextStorage = new ThreadLocal[TraceContext] { + override def initialValue(): TraceContext = EmptyTraceContext } private val tokenCounter = new AtomicLong private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - def newToken = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet()) + def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet()) - private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String], - system: ActorSystem): TraceContext = { - - // In the future this should select between implementations. - val finalToken = token.getOrElse(newToken) - new SimpleMetricCollectionContext(name, finalToken, metadata, TraceContextOrigin.Local, system) + private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = { + new DefaultTraceContext( + name, token.getOrElse(newToken), + izOpen = true, + LevelOfDetail.OnlyMetrics, + TraceContextOrigin.Local, + startNanoTime = System.nanoTime)(system) } def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { - new SimpleMetricCollectionContext( + /*new SimpleMetricCollectionContext( traceName, traceToken, Map.empty, TraceContextOrigin.Remote, system, startMilliTime, - isOpen) + isOpen)*/ + ??? } - def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context) + def setContext(context: TraceContext): Unit = traceContextStorage.set(context) - def clearContext: Unit = traceContextStorage.set(None) + def clearContext: Unit = traceContextStorage.set(EmptyTraceContext) - def currentContext: Option[TraceContext] = traceContextStorage.get() + def currentContext: TraceContext = traceContextStorage.get() - def start(name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(implicit system: ActorSystem) = { - val ctx = newTraceContext(name, token, metadata, system) - traceContextStorage.set(Some(ctx)) + // TODO: Remove this method. + def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = { + //val ctx = newTraceContext(name, token, metadata, system) + //traceContextStorage.set(Some(ctx)) } - def startSegment(identity: SegmentIdentity, metadata: Map[String, String] = Map.empty): Option[SegmentCompletionHandle] = - currentContext.map(_.startSegment(identity, metadata)) - - def rename(name: String): Unit = currentContext.map(_.rename(name)) + def rename(name: String): Unit = currentContext.rename(name) - def withNewTraceContext[T](name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(thunk: ⇒ T)(implicit system: ActorSystem): T = - withTraceContext(Some(newTraceContext(name, token, metadata, system)))(thunk) + def withNewTraceContext[T](name: String, token: Option[String] = None)(thunk: ⇒ T)(implicit system: ActorSystem): T = + withTraceContext(newTraceContext(name, token, system))(thunk) - def withTraceContext[T](context: Option[TraceContext])(thunk: ⇒ T): T = { + def withTraceContext[T](context: TraceContext)(thunk: ⇒ T): T = { val oldContext = currentContext setContext(context) try thunk finally setContext(oldContext) } - def withInlineTraceContextReplacement[T](traceCtx: Option[TraceContext])(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, Option[TraceContext]] + def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext] - def finish(metadata: Map[String, String] = Map.empty): Unit = currentContext.map(_.finish(metadata)) + def finish(): Unit = currentContext.finish() } diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala index 4b7dbb28..f052f009 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala @@ -20,5 +20,11 @@ import ch.qos.logback.classic.spi.ILoggingEvent import kamon.trace.TraceRecorder class LogbackTraceTokenConverter extends ClassicConverter { - def convert(event: ILoggingEvent): String = TraceRecorder.currentContext.map(_.token).getOrElse("undefined") + def convert(event: ILoggingEvent): String = { + val ctx = TraceRecorder.currentContext + if (ctx.isEmpty) + "undefined" + else + ctx.token + } } diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala index cb39f5e6..d79ccbe0 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala @@ -3,7 +3,7 @@ package kamon.instrumentation.akka import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop } import akka.actor._ import akka.testkit.{ ImplicitSender, TestKit } -import kamon.trace.TraceRecorder +import kamon.trace.{ EmptyTraceContext, TraceRecorder } import org.scalatest.WordSpecLike import scala.concurrent.duration._ @@ -59,7 +59,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-s // Ensure we didn't tie the actor with the context supervisor ! "context" - expectMsg(None) + expectMsg(EmptyTraceContext) } "the actor is restarted" in { @@ -76,7 +76,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-s // Ensure we didn't tie the actor with the context supervisor ! "context" - expectMsg(None) + expectMsg(EmptyTraceContext) } "the actor is stopped" in { diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala index d914ffe8..17312ba3 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala @@ -54,7 +54,6 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M } val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext - capturedCtx should be('defined) capturedCtx should equal(testTraceContext) } } diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala index 8a87408d..6453dd77 100644 --- a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala @@ -5,8 +5,7 @@ import akka.testkit.{ ImplicitSender, TestKitBase } import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.metric.TraceMetrics.TraceMetricsSnapshot -import kamon.trace.TraceContext.SegmentIdentity -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentity, TraceRecorder } import org.scalatest.{ Matchers, WordSpecLike } class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { @@ -54,39 +53,37 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with "record the elapsed time for segments that occur inside a given trace" in { TraceRecorder.withNewTraceContext("trace-with-segments") { - val segmentHandle = TraceRecorder.startSegment(TraceMetricsTestSegment("test-segment")) - segmentHandle.get.finish() + val segment = TraceRecorder.currentContext.startSegment("test-segment", "test-label") + segment.finish() TraceRecorder.finish() } val snapshot = takeSnapshotOf("trace-with-segments") snapshot.elapsedTime.numberOfMeasurements should be(1) snapshot.segments.size should be(1) - snapshot.segments(TraceMetricsTestSegment("test-segment")).numberOfMeasurements should be(1) + snapshot.segments(SegmentMetricIdentity("test-segment", "test-label")).numberOfMeasurements should be(1) } "record the elapsed time for segments that finish after their correspondent trace has finished" in { - val segmentHandle = TraceRecorder.withNewTraceContext("closing-segment-after-trace") { - val sh = TraceRecorder.startSegment(TraceMetricsTestSegment("test-segment")) + val segment = TraceRecorder.withNewTraceContext("closing-segment-after-trace") { + val s = TraceRecorder.currentContext.startSegment("test-segment", "test-label") TraceRecorder.finish() - sh + s } val beforeFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace") beforeFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(1) beforeFinishSegmentSnapshot.segments.size should be(0) - segmentHandle.get.finish() + segment.finish() val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace") afterFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(0) afterFinishSegmentSnapshot.segments.size should be(1) - afterFinishSegmentSnapshot.segments(TraceMetricsTestSegment("test-segment")).numberOfMeasurements should be(1) + afterFinishSegmentSnapshot.segments(SegmentMetricIdentity("test-segment", "test-label")).numberOfMeasurements should be(1) } } - case class TraceMetricsTestSegment(name: String) extends SegmentIdentity - def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory) val collectionContext = Kamon(Metrics).buildDefaultCollectionContext diff --git a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala index 4d0049f1..e2031a72 100644 --- a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala @@ -3,7 +3,6 @@ package kamon.trace import akka.actor.ActorSystem import akka.testkit.{ ImplicitSender, TestKitBase } import com.typesafe.config.ConfigFactory -import kamon.trace.TraceContext.SegmentIdentity import org.scalatest.{ Matchers, WordSpecLike } class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { @@ -38,7 +37,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma "allow starting a trace within a specified block of code, and only within that block of code" in { val createdContext = TraceRecorder.withNewTraceContext("start-context") { TraceRecorder.currentContext should not be empty - TraceRecorder.currentContext.get + TraceRecorder.currentContext } TraceRecorder.currentContext shouldBe empty @@ -48,7 +47,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma "allow starting a trace within a specified block of code, providing a trace-token and only within that block of code" in { val createdContext = TraceRecorder.withNewTraceContext("start-context-with-token", Some("token-1")) { TraceRecorder.currentContext should not be empty - TraceRecorder.currentContext.get + TraceRecorder.currentContext } TraceRecorder.currentContext shouldBe empty @@ -70,7 +69,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma "allow renaming a trace" in { val createdContext = TraceRecorder.withNewTraceContext("trace-before-rename") { TraceRecorder.rename("renamed-trace") - TraceRecorder.currentContext.get + TraceRecorder.currentContext } TraceRecorder.currentContext shouldBe empty @@ -79,17 +78,22 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma "allow creating a segment within a trace" in { val createdContext = TraceRecorder.withNewTraceContext("trace-with-segments") { - val segmentHandle = TraceRecorder.startSegment(TraceManipulationTestSegment("segment-1")) - - TraceRecorder.currentContext.get + val segment = TraceRecorder.currentContext.startSegment("segment-1", "segment-1-label") + TraceRecorder.currentContext } TraceRecorder.currentContext shouldBe empty createdContext.name shouldBe ("trace-with-segments") - } - } - case class TraceManipulationTestSegment(name: String) extends SegmentIdentity + "allow renaming a segment" in { + TraceRecorder.withNewTraceContext("trace-with-renamed-segment") { + val segment = TraceRecorder.currentContext.startSegment("original-segment-name", "segment-label") + segment.name should be("original-segment-name") + segment.rename("new-segment-name") + segment.name should be("new-segment-name") + } + } + } } diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index df1d2b59..d9cdde08 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -20,7 +20,7 @@ import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest } import spray.http.HttpHeaders.{ RawHeader, Host } -import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware } +import kamon.trace.{SegmentAware, TraceRecorder, SegmentCompletionHandleAware} import kamon.metric.TraceMetrics.HttpClientRequest import kamon.Kamon import kamon.spray.{ ClientSegmentCollectionStrategy, Spray } @@ -32,13 +32,13 @@ import akka.util.Timeout class ClientRequestInstrumentation { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixin: SegmentCompletionHandleAware = SegmentCompletionHandleAware.default + def mixin: SegmentAware = SegmentAware.default @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") - def requestContextCreation(ctx: SegmentCompletionHandleAware, request: HttpRequest): Unit = {} + def requestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = {} @After("requestContextCreation(ctx, request)") - def afterRequestContextCreation(ctx: SegmentCompletionHandleAware, request: HttpRequest): Unit = { + def afterRequestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = { // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the // completion handle the first time we create one. @@ -59,25 +59,25 @@ class ClientRequestInstrumentation { } @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") - def copyingRequestContext(old: SegmentCompletionHandleAware): Unit = {} + def copyingRequestContext(old: SegmentAware): Unit = {} @Around("copyingRequestContext(old)") - def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentCompletionHandleAware): Any = { + def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentAware): Any = { TraceRecorder.withInlineTraceContextReplacement(old.traceContext) { pjp.proceed() } } @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") - def dispatchToCommander(requestContext: SegmentCompletionHandleAware, message: Any): Unit = {} + def dispatchToCommander(requestContext: SegmentAware, message: Any): Unit = {} @Around("dispatchToCommander(requestContext, message)") - def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentCompletionHandleAware, message: Any) = { + def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = { requestContext.traceContext match { case ctx @ Some(_) ⇒ TraceRecorder.withInlineTraceContextReplacement(ctx) { if (message.isInstanceOf[HttpMessageEnd]) - requestContext.segmentCompletionHandle.map(_.finish(Map.empty)) + requestContext.segment.finish() pjp.proceed() } -- cgit v1.2.3 From cd8dce169b4231bf533445656bfb5a35034a6304 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 26 Oct 2014 02:21:11 +0200 Subject: = all: upgrade to be compatible with the latest code in core --- .../akka/RemotingInstrumentation.scala | 11 ++- .../akka/RemotingInstrumentationSpec.scala | 12 +-- .../src/main/scala/kamon/metric/TraceMetrics.scala | 1 - .../src/main/scala/kamon/trace/TraceContext.scala | 13 ++- .../src/main/scala/kamon/trace/TraceRecorder.scala | 28 +++--- .../scala/kamon/newrelic/NewRelicErrorLogger.scala | 5 +- .../LoggerLikeInstrumentation.scala | 21 +++-- .../instrumentation/RequestInstrumentation.scala | 23 +++-- .../play/instrumentation/WSInstrumentation.scala | 29 ++---- .../kamon/play/RequestInstrumentationSpec.scala | 2 +- .../scala/kamon/play/WSInstrumentationSpec.scala | 6 +- .../main/scala/test/SimpleRequestProcessor.scala | 7 +- .../can/client/ClientRequestInstrumentation.scala | 104 ++++++++++----------- .../can/server/ServerRequestInstrumentation.scala | 47 +++++----- .../spray/ClientRequestInstrumentationSpec.scala | 20 ++-- .../scala/testkit/TestProbeInstrumentation.scala | 4 +- 16 files changed, 164 insertions(+), 169 deletions(-) diff --git a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala index 3278ec67..560008cf 100644 --- a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala +++ b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala @@ -31,12 +31,15 @@ class RemotingInstrumentation { envelopeBuilder.setMessage(serializedMessage) // Attach the TraceContext info, if available. - TraceRecorder.currentContext.foreach { context ⇒ + if (!TraceRecorder.currentContext.isEmpty) { + val context = TraceRecorder.currentContext + val relativeStartMilliTime = System.currentTimeMillis - ((System.nanoTime - context.nanoTimestamp) / 1000000) + envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder() .setTraceName(context.name) .setTraceToken(context.token) .setIsOpen(context.isOpen) - .setStartMilliTime(context.startMilliTime) + .setStartMilliTime(relativeStartMilliTime) .build()) } @@ -81,14 +84,14 @@ class RemotingInstrumentation { if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext val system = provider.guardian.underlying.system - val tc = TraceRecorder.joinRemoteTraceContext( + val ctx = TraceRecorder.joinRemoteTraceContext( remoteTraceContext.getTraceName(), remoteTraceContext.getTraceToken(), remoteTraceContext.getStartMilliTime(), remoteTraceContext.getIsOpen(), system) - TraceRecorder.setContext(Some(tc)) + TraceRecorder.setContext(ctx) } pjp.proceed() diff --git a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala index 29276dd0..8a3973ca 100644 --- a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala +++ b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala @@ -127,14 +127,13 @@ class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends case "fail" ⇒ throw new ArithmeticException("Division by zero.") case "reply-trace-token" ⇒ - log.info("Sending back the TT: " + TraceRecorder.currentContext.map(_.token).getOrElse("unavailable")) + log.info("Sending back the TT: " + TraceRecorder.currentContext.token) sender ! currentTraceContextInfo } def currentTraceContextInfo: String = { - TraceRecorder.currentContext.map { context ⇒ - s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}" - }.getOrElse("unavailable") + val ctx = TraceRecorder.currentContext + s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" } } @@ -162,8 +161,7 @@ class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address) } def currentTraceContextInfo: String = { - TraceRecorder.currentContext.map { context ⇒ - s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}" - }.getOrElse("unavailable") + val ctx = TraceRecorder.currentContext + s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" } } diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index 54626b6c..7246ccb5 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -30,7 +30,6 @@ object TraceMetrics extends MetricGroupCategory { val name = "trace" case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } - case class HttpClientRequest(name: String) extends MetricIdentity case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) extends MetricGroupRecorder { diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index a5855308..08289acf 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -35,7 +35,9 @@ sealed trait TraceContext { def origin: TraceContextOrigin def isOpen: Boolean def isEmpty: Boolean + def nonEmpty: Boolean = !isEmpty def startSegment(segmentName: String, label: String): Segment + def nanoTimestamp: Long } sealed trait Segment { @@ -43,6 +45,7 @@ sealed trait Segment { def rename(newName: String): Unit def label: String def finish(): Unit + def isEmpty: Boolean } case object EmptyTraceContext extends TraceContext { @@ -54,23 +57,25 @@ case object EmptyTraceContext extends TraceContext { def isOpen: Boolean = false def isEmpty: Boolean = true def startSegment(segmentName: String, label: String): Segment = EmptySegment + def nanoTimestamp: Long = 0L case object EmptySegment extends Segment { val name: String = "empty-segment" val label: String = "empty-label" + def isEmpty: Boolean = true def rename(newName: String): Unit = {} def finish: Unit = {} } } class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, - val origin: TraceContextOrigin, startNanoTime: Long)(implicit system: ActorSystem) extends TraceContext { + val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext { val isEmpty: Boolean = false @volatile private var _name = traceName @volatile private var _isOpen = izOpen - private val _startNanoTime = startNanoTime + private val _nanoTimestamp = nanoTimeztamp private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() private val metricsExtension = Kamon(Metrics)(system) private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage @@ -80,10 +85,11 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace. def isOpen: Boolean = _isOpen + def nanoTimestamp: Long = _nanoTimestamp def finish(): Unit = { _isOpen = false - val elapsedNanoTime = System.nanoTime() - _startNanoTime + val elapsedNanoTime = System.nanoTime() - _nanoTimestamp val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) metricRecorder.map { traceMetrics ⇒ @@ -119,6 +125,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, def name: String = _segmentName def rename(newName: String): Unit = _segmentName = newName + def isEmpty: Boolean = false def finish: Unit = { val segmentFinishNanoTime = System.nanoTime() diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 9b0ba038..8da187cb 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -36,23 +36,25 @@ object TraceRecorder { private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = { new DefaultTraceContext( - name, token.getOrElse(newToken), + name, + token.getOrElse(newToken), izOpen = true, LevelOfDetail.OnlyMetrics, TraceContextOrigin.Local, - startNanoTime = System.nanoTime)(system) + nanoTimeztamp = System.nanoTime, + system) } def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { - /*new SimpleMetricCollectionContext( + val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) + new DefaultTraceContext( traceName, traceToken, - Map.empty, + isOpen, + LevelOfDetail.OnlyMetrics, TraceContextOrigin.Remote, - system, - startMilliTime, - isOpen)*/ - ??? + equivalentNanotime, + system) } def setContext(context: TraceContext): Unit = traceContextStorage.set(context) @@ -61,10 +63,9 @@ object TraceRecorder { def currentContext: TraceContext = traceContextStorage.get() - // TODO: Remove this method. def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = { - //val ctx = newTraceContext(name, token, metadata, system) - //traceContextStorage.set(Some(ctx)) + val ctx = newTraceContext(name, token, system) + traceContextStorage.set(ctx) } def rename(name: String): Unit = currentContext.rename(name) @@ -79,6 +80,11 @@ object TraceRecorder { try thunk finally setContext(oldContext) } + def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match { + case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system)) + case EmptyTraceContext ⇒ None + } + def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext] def finish(): Unit = currentContext.finish() diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 4bb0ad3a..08b5df99 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -34,10 +34,7 @@ class NewRelicErrorLogger extends Actor with ActorLogging { val params = new java.util.HashMap[String, String]() val ctx = error.asInstanceOf[TraceContextAware].traceContext - - for (c ← ctx) { - params.put("TraceToken", c.token) - } + params.put("TraceToken", ctx.token) if (error.cause == Error.NoCause) { NR.noticeError(error.message.toString, params) diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala index 92686ff0..e2ffd3f9 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala @@ -15,7 +15,7 @@ package kamon.play.instrumentation -import kamon.trace.{ TraceContext, TraceContextAware, TraceRecorder } +import kamon.trace._ import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ import org.slf4j.MDC @@ -52,21 +52,24 @@ class LoggerLikeInstrumentation { object LoggerLikeInstrumentation { @inline final def withMDC[A](block: ⇒ A): A = { - val keys = TraceRecorder.currentContext.map(extractProperties).map(putAndExtractKeys) + val keys = putAndExtractKeys(extractProperties(TraceRecorder.currentContext)) - try block finally keys.map(k ⇒ k.foreach(MDC.remove(_))) + try block finally keys.foreach(k ⇒ MDC.remove(k)) } def putAndExtractKeys(values: Iterable[Map[String, Any]]): Iterable[String] = values.map { value ⇒ value.map { case (key, value) ⇒ MDC.put(key, value.toString); key } }.flatten - def extractProperties(ctx: TraceContext): Iterable[Map[String, Any]] = ctx.traceLocalStorage.underlyingStorage.values.map { - case traceLocalValue @ (p: Product) ⇒ { - val properties = p.productIterator - traceLocalValue.getClass.getDeclaredFields.filter(field ⇒ field.getName != "$outer").map(_.getName -> properties.next).toMap - } - case anything ⇒ Map.empty[String, Any] + def extractProperties(traceContext: TraceContext): Iterable[Map[String, Any]] = traceContext match { + case ctx: DefaultTraceContext ⇒ + ctx.traceLocalStorage.underlyingStorage.values.collect { + case traceLocalValue @ (p: Product) ⇒ { + val properties = p.productIterator + traceLocalValue.getClass.getDeclaredFields.filter(field ⇒ field.getName != "$outer").map(_.getName -> properties.next).toMap + } + } + case EmptyTraceContext ⇒ Iterable.empty[Map[String, Any]] } } diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala index c761e72f..82a43926 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala @@ -54,16 +54,23 @@ class RequestInstrumentation { @Around("execution(* play.api.GlobalSettings+.doFilter(*)) && args(next)") def aroundDoFilter(pjp: ProceedingJoinPoint, next: EssentialAction): Any = { val essentialAction = (requestHeader: RequestHeader) ⇒ { + // TODO: Move to a Kamon-specific dispatcher. val executor = Kamon(Play)(Akka.system()).defaultDispatcher def onResult(result: Result): Result = { - TraceRecorder.finish() - TraceRecorder.currentContext.map { ctx ⇒ - val playExtension = Kamon(Play)(ctx.system) + + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + ctx.finish() + + val playExtension = Kamon(Play)(system) recordHttpServerMetrics(result.header, ctx.name, playExtension) - if (playExtension.includeTraceToken) result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token) - else result - }.getOrElse(result) + + if (playExtension.includeTraceToken) + result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token) + else + result + + } getOrElse (result) } //override the current trace name @@ -76,8 +83,8 @@ class RequestInstrumentation { } @Before("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)") - def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = request.traceContext.map { - ctx ⇒ recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(ctx.system)) + def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(system)) } private def recordHttpServerMetrics(header: ResponseHeader, traceName: String, playExtension: PlayExtension): Unit = diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala index 87467050..c2eafa2b 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala @@ -17,12 +17,10 @@ package kamon.play.instrumentation import kamon.Kamon -import kamon.metric.TraceMetrics.HttpClientRequest import kamon.play.Play -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } -import play.api.libs.ws.ning.NingWSRequest import play.api.libs.ws.{ WSRequest, WSResponse } import scala.concurrent.Future @@ -35,28 +33,13 @@ class WSInstrumentation { @Around("onExecuteRequest(request)") def aroundExecuteRequest(pjp: ProceedingJoinPoint, request: WSRequest): Any = { - - import kamon.play.instrumentation.WSInstrumentation._ - - TraceRecorder.currentContext.map { ctx ⇒ - val executor = Kamon(Play)(ctx.system).defaultDispatcher - val segmentHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request)) + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val executor = Kamon(Play)(system).defaultDispatcher + val segment = ctx.startSegment(request.url, SegmentMetricIdentityLabel.HttpClient) val response = pjp.proceed().asInstanceOf[Future[WSResponse]] - response.map(result ⇒ segmentHandle.map(_.finish()))(executor) + response.map(result ⇒ segment.finish())(executor) response - }.getOrElse(pjp.proceed()) - } -} - -object WSInstrumentation { - - def uri(request: WSRequest): java.net.URI = request.asInstanceOf[NingWSRequest].builder.build().getURI - - def basicRequestAttributes(request: WSRequest): Map[String, String] = { - Map[String, String]( - "host" -> uri(request).getHost, - "path" -> uri(request).getPath, - "method" -> request.method) + } getOrElse (pjp.proceed()) } } \ No newline at end of file diff --git a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala index 2afd31fd..3feb6246 100644 --- a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala @@ -117,7 +117,7 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { "respond to the Async Action with X-Trace-Token and the renamed trace" in { val result = Await.result(route(FakeRequest(GET, "/async-renamed").withHeaders(traceTokenHeader)).get, 10 seconds) - TraceRecorder.currentContext.map(_.name) must be(Some("renamed-trace")) + TraceRecorder.currentContext.name must be("renamed-trace") Some(result.header.headers(traceTokenHeaderName)) must be(expectedToken) } diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala index b72659d2..bf1ead05 100644 --- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala @@ -17,9 +17,9 @@ package kamon.play import kamon.Kamon -import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot } +import kamon.metric.TraceMetrics.TraceMetricsSnapshot import kamon.metric.{ Metrics, TraceMetrics } -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } import org.scalatest.{ Matchers, WordSpecLike } import org.scalatestplus.play.OneServerPerSuite import play.api.libs.ws.WS @@ -49,7 +49,7 @@ class WSInstrumentationSpec extends WordSpecLike with Matchers with OneServerPer val snapshot = takeSnapshotOf("GET: /inside") snapshot.elapsedTime.numberOfMeasurements should be(1) snapshot.segments.size should be(1) - snapshot.segments(HttpClientRequest("http://localhost:19001/async")).numberOfMeasurements should be(1) + snapshot.segments(SegmentMetricIdentity("http://localhost:19001/async", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) } "propagate the TraceContext outside an Action and complete the WS request" in { diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 3e6e982e..878c3c8c 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -21,10 +21,9 @@ import akka.routing.RoundRobinPool import akka.util.Timeout import kamon.Kamon import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.TraceMetrics.HttpClientRequest import kamon.metric._ import kamon.spray.KamonTraceDirectives -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentityLabel, TraceRecorder } import spray.http.{ StatusCodes, Uri } import spray.httpx.RequestBuilding import spray.routing.SimpleRoutingApp @@ -128,9 +127,9 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } ~ path("segment") { complete { - val segment = TraceRecorder.startSegment(HttpClientRequest("hello-world")) + val segment = TraceRecorder.currentContext.startSegment("hello-world", SegmentMetricIdentityLabel.HttpClient) (replier ? "hello").mapTo[String].onComplete { t ⇒ - segment.get.finish() + segment.finish() } "segment" diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index d9cdde08..cfd204df 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -19,9 +19,8 @@ package spray.can.client import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest } -import spray.http.HttpHeaders.{ RawHeader, Host } -import kamon.trace.{SegmentAware, TraceRecorder, SegmentCompletionHandleAware} -import kamon.metric.TraceMetrics.HttpClientRequest +import spray.http.HttpHeaders.RawHeader +import kamon.trace._ import kamon.Kamon import kamon.spray.{ ClientSegmentCollectionStrategy, Spray } import akka.actor.ActorRef @@ -34,26 +33,28 @@ class ClientRequestInstrumentation { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") def mixin: SegmentAware = SegmentAware.default - @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") - def requestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = {} + @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)") + def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {} - @After("requestContextCreation(ctx, request)") - def afterRequestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = { + @After("requestContextCreation(requestContext, request)") + def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = { // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the - // completion handle the first time we create one. + // segment the first time we create one. // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely. - if (ctx.segmentCompletionHandle.isEmpty) { - TraceRecorder.currentContext.map { traceContext ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) + if (requestContext.segment.isEmpty) { + TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ + val sprayExtension = Kamon(Spray)(ctx.system) - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { - val requestAttributes = basicRequestAttributes(request) - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { + val clientRequestName = sprayExtension.assignHttpClientRequestName(request) + val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) - ctx.segmentCompletionHandle = Some(completionHandle) - } + requestContext.segment = segment + } + + case EmptyTraceContext ⇒ // Nothing to do here. } } } @@ -73,17 +74,15 @@ class ClientRequestInstrumentation { @Around("dispatchToCommander(requestContext, message)") def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = { - requestContext.traceContext match { - case ctx @ Some(_) ⇒ - TraceRecorder.withInlineTraceContextReplacement(ctx) { - if (message.isInstanceOf[HttpMessageEnd]) - requestContext.segment.finish() + if (requestContext.traceContext.nonEmpty) { + TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) { + if (message.isInstanceOf[HttpMessageEnd]) + requestContext.segment.finish() - pjp.proceed() - } + pjp.proceed() + } - case None ⇒ pjp.proceed() - } + } else pjp.proceed() } @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)") @@ -95,18 +94,21 @@ class ClientRequestInstrumentation { (request: HttpRequest) ⇒ { val responseFuture = originalSendReceive.apply(request) - TraceRecorder.currentContext.map { traceContext ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) - - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { - val requestAttributes = basicRequestAttributes(request) - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) - - responseFuture.onComplete { result ⇒ - completionHandle.finish(Map.empty) - }(ec) - } + + TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ + val sprayExtension = Kamon(Spray)(ctx.system) + + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { + val clientRequestName = sprayExtension.assignHttpClientRequestName(request) + val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + + responseFuture.onComplete { result ⇒ + segment.finish() + }(ec) + } + + case EmptyTraceContext ⇒ // Nothing to do here. } responseFuture @@ -114,26 +116,22 @@ class ClientRequestInstrumentation { } - def basicRequestAttributes(request: HttpRequest): Map[String, String] = { - Map[String, String]( - "host" -> request.header[Host].map(_.value).getOrElse("unknown"), - "path" -> request.uri.path.toString(), - "method" -> request.method.toString()) - } - @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)") def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {} @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)") def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = { - val modifiedHeaders = TraceRecorder.currentContext map { traceContext ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) - - if (sprayExtension.includeTraceToken) - RawHeader(sprayExtension.traceTokenHeaderName, traceContext.token) :: defaultHeaders - else - defaultHeaders - } getOrElse defaultHeaders + val modifiedHeaders = TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ + val sprayExtension = Kamon(Spray)(ctx.system) + + if (sprayExtension.includeTraceToken) + RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders + else + defaultHeaders + + case EmptyTraceContext ⇒ defaultHeaders + } pjp.proceed(Array(modifiedHeaders)) } diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala index 69b0160e..74d98564 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala @@ -16,11 +16,10 @@ package spray.can.server import org.aspectj.lang.annotation._ -import kamon.trace.{ TraceContext, TraceRecorder, TraceContextAware } +import kamon.trace._ import akka.actor.ActorSystem import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest } import akka.event.Logging.Warning -import scala.Some import kamon.Kamon import kamon.spray.{ SprayExtension, Spray } import org.aspectj.lang.ProceedingJoinPoint @@ -67,40 +66,36 @@ class ServerRequestInstrumentation { val incomingContext = TraceRecorder.currentContext val storedContext = openRequest.traceContext - verifyTraceContextConsistency(incomingContext, storedContext) - incomingContext match { - case None ⇒ pjp.proceed() - case Some(traceContext) ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) + // The stored context is always a DefaultTraceContext if the instrumentation is running + val system = storedContext.asInstanceOf[DefaultTraceContext].system - val proceedResult = if (sprayExtension.includeTraceToken) { - val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token) - pjp.proceed(Array(openRequest, responseWithHeader)) + verifyTraceContextConsistency(incomingContext, storedContext, system) - } else pjp.proceed + if (incomingContext.isEmpty) + pjp.proceed() + else { + val sprayExtension = Kamon(Spray)(system) - TraceRecorder.finish() - recordHttpServerMetrics(response, traceContext.name, sprayExtension) - proceedResult - } - } + val proceedResult = if (sprayExtension.includeTraceToken) { + val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, incomingContext.token) + pjp.proceed(Array(openRequest, responseWithHeader)) - def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = { - for (original ← storedTraceContext) { - incomingTraceContext match { - case Some(incoming) if original.token != incoming.token ⇒ - publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]", incoming.system) + } else pjp.proceed - case Some(_) ⇒ // nothing to do here. - - case None ⇒ - publishWarning(s"Trace context not present while closing the Trace: [$original]", original.system) - } + TraceRecorder.finish() + recordHttpServerMetrics(response, incomingContext.name, sprayExtension) + proceedResult } + } + def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = { def publishWarning(text: String, system: ActorSystem): Unit = system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text)) + if (incomingTraceContext.nonEmpty && incomingTraceContext.token != storedTraceContext.token) + publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system) + else + publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system) } def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit = diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index 54329645..fbf69c8a 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -21,7 +21,7 @@ import akka.actor.ActorSystem import org.scalatest.{ Matchers, WordSpecLike } import spray.httpx.RequestBuilding import spray.http.{ HttpResponse, HttpRequest } -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentity, TraceRecorder } import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader @@ -31,7 +31,7 @@ import spray.client.pipelining import kamon.metric.Subscriptions.TickMetricSnapshot import scala.concurrent.duration._ import akka.pattern.pipe -import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot } +import kamon.metric.TraceMetrics.TraceMetricsSnapshot class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer { implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString( @@ -78,12 +78,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token)) + request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - testContext.map(_.finish(Map.empty)) + testContext.finish() } } @@ -106,12 +106,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token)) + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - testContext.map(_.finish(Map.empty)) + testContext.finish() } } @@ -143,12 +143,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit client.expectMsgType[HttpResponse] // Finish the trace - testContext.map(_.finish(Map.empty)) + testContext.finish() val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) @@ -184,12 +184,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit client.expectMsgType[HttpResponse] // Finish the trace - testContext.map(_.finish(Map.empty)) + testContext.finish() val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) diff --git a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala index de867035..825cc718 100644 --- a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala +++ b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala @@ -17,7 +17,7 @@ package akka.testkit import org.aspectj.lang.annotation._ -import kamon.trace.{ TraceContextAware, TraceRecorder } +import kamon.trace.{ EmptyTraceContext, TraceContextAware, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import akka.testkit.TestActor.RealMessage @@ -43,7 +43,7 @@ class TestProbeInstrumentation { def aroundTestProbeReply(pjp: ProceedingJoinPoint, testProbe: TestProbe): Any = { val traceContext = testProbe.lastMessage match { case msg: RealMessage ⇒ msg.asInstanceOf[TraceContextAware].traceContext - case _ ⇒ None + case _ ⇒ EmptyTraceContext } TraceRecorder.withTraceContext(traceContext) { -- cgit v1.2.3 From 89d3057f8025add4b94b32c142e220ffb79f6c33 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 31 Oct 2014 02:45:41 +0100 Subject: + spray: external naming for traces and segments, related to #65 --- .../akka/ActorCellInstrumentation.scala | 8 +- .../src/main/scala/kamon/trace/TraceContext.scala | 16 +- kamon-spray/src/main/resources/reference.conf | 3 + kamon-spray/src/main/scala/kamon/spray/Spray.scala | 20 +- .../can/client/ClientRequestInstrumentation.scala | 121 +++++++----- .../can/server/ServerRequestInstrumentation.scala | 2 +- .../spray/ClientRequestInstrumentationSpec.scala | 214 ++++++++++++++------- .../src/test/scala/kamon/spray/TestServer.scala | 4 +- 8 files changed, 255 insertions(+), 133 deletions(-) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala index 9b541a32..90928ba0 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -59,7 +59,7 @@ class ActorCellInstrumentation { def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] + val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] try { TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { @@ -154,13 +154,13 @@ class ActorCellMetricsIntoActorCellMixin { class TraceContextIntoEnvelopeMixin { @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default + def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} + def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + def afterEnvelopeCreation(ctx: TimestampedTraceContextAware): Unit = { // Necessary to force the initialization of ContextAware at the moment of creation. ctx.traceContext } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 08289acf..c4c28a68 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -129,7 +129,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, def finish: Unit = { val segmentFinishNanoTime = System.nanoTime() - finishSegment(segmentName, label, (segmentFinishNanoTime - _segmentStartNanoTime)) + finishSegment(name, label, (segmentFinishNanoTime - _segmentStartNanoTime)) } } } @@ -155,7 +155,6 @@ object TraceContextOrigin { } trait TraceContextAware extends Serializable { - def captureNanoTime: Long def traceContext: TraceContext } @@ -163,7 +162,6 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - @transient val captureNanoTime = System.nanoTime() @transient val traceContext = TraceRecorder.currentContext // @@ -180,7 +178,17 @@ object TraceContextAware { } } -trait SegmentAware extends TraceContextAware { +trait TimestampedTraceContextAware extends TraceContextAware { + def captureNanoTime: Long +} + +object TimestampedTraceContextAware { + def default: TimestampedTraceContextAware = new DefaultTraceContextAware with TimestampedTraceContextAware { + @transient val captureNanoTime = System.nanoTime() + } +} + +trait SegmentAware { @volatile var segment: Segment = EmptyTraceContext.EmptySegment } diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf index d497e681..9fed5a2b 100644 --- a/kamon-spray/src/main/resources/reference.conf +++ b/kamon-spray/src/main/resources/reference.conf @@ -16,6 +16,9 @@ kamon { # in the `HttpRequest` headers. automatic-trace-token-propagation = true + # Fully qualified name of the implementation of kamon.spray.SprayNameGenerator that will be used for assigning names + # to traces and client http segments. + name-generator = kamon.spray.SimpleSprayNameGenerator client { # Strategy used for automatic trace segment generation when issue requests with spray-client. The possible values diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala index 76adb214..4a0fd74e 100644 --- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala +++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala @@ -43,6 +43,9 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get // It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it. + private val nameGeneratorFQN = config.getString("name-generator") + private val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems. + val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy = config.getString("client.segment-collection-strategy") match { case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining @@ -51,6 +54,19 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte s"only pipelining and internal are valid options.") } - // Later we should expose a way for the user to customize this. - def assignHttpClientRequestName(request: HttpRequest): String = request.uri.authority.host.address + def generateTraceName(request: HttpRequest): String = nameGenerator.generateTraceName(request) + def generateRequestLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateRequestLevelApiSegmentName(request) + def generateHostLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateHostLevelApiSegmentName(request) +} + +trait SprayNameGenerator { + def generateTraceName(request: HttpRequest): String + def generateRequestLevelApiSegmentName(request: HttpRequest): String + def generateHostLevelApiSegmentName(request: HttpRequest): String +} + +class SimpleSprayNameGenerator extends SprayNameGenerator { + def generateRequestLevelApiSegmentName(request: HttpRequest): String = request.method.value + ": " + request.uri.path + def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path + def generateHostLevelApiSegmentName(request: HttpRequest): String = request.uri.authority.host.address } diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index cfd204df..94fc3572 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -18,7 +18,7 @@ package spray.can.client import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest } +import spray.http._ import spray.http.HttpHeaders.RawHeader import kamon.trace._ import kamon.Kamon @@ -31,60 +31,79 @@ import akka.util.Timeout class ClientRequestInstrumentation { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixin: SegmentAware = SegmentAware.default + def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default + + @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") + def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default + + @DeclareMixin("spray.http.HttpRequest") + def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)") - def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {} + def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {} @After("requestContextCreation(requestContext, request)") - def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = { - // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the - // segment the first time we create one. + def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = { + // This read to requestContext.traceContext takes care of initializing the aspect timely. + requestContext.traceContext - // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely. - if (requestContext.segment.isEmpty) { - TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ - val sprayExtension = Kamon(Spray)(ctx.system) + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val sprayExtension = Kamon(Spray)(system) - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { + if (requestContext.segment.isEmpty) { + val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) + val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + requestContext.segment = segment + } - requestContext.segment = segment - } + } else { - case EmptyTraceContext ⇒ // Nothing to do here. + // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a + // name again here is that when the request was initially sent it might not have the Host information available + // and it might be important to decide a proper segment name. + + val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) + request.asInstanceOf[SegmentAware].segment.rename(clientRequestName) } } } @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") - def copyingRequestContext(old: SegmentAware): Unit = {} + def copyingRequestContext(old: TraceContextAware): Unit = {} @Around("copyingRequestContext(old)") - def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentAware): Any = { + def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = { TraceRecorder.withInlineTraceContextReplacement(old.traceContext) { pjp.proceed() } } @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") - def dispatchToCommander(requestContext: SegmentAware, message: Any): Unit = {} + def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {} @Around("dispatchToCommander(requestContext, message)") - def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = { + def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = { if (requestContext.traceContext.nonEmpty) { TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) { if (message.isInstanceOf[HttpMessageEnd]) - requestContext.segment.finish() + requestContext.asInstanceOf[SegmentAware].segment.finish() pjp.proceed() } - } else pjp.proceed() } + @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)") + def copyingHttpRequest(old: SegmentAware): Unit = {} + + @Around("copyingHttpRequest(old)") + def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = { + val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware] + copiedHttpRequest.segment = old.segment + copiedHttpRequest + } + @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)") def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {} @@ -93,46 +112,42 @@ class ClientRequestInstrumentation { val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]] (request: HttpRequest) ⇒ { - val responseFuture = originalSendReceive.apply(request) - - TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ - val sprayExtension = Kamon(Spray)(ctx.system) + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val sprayExtension = Kamon(Spray)(system) + val segment = + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) + ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentMetricIdentityLabel.HttpClient) + else + EmptyTraceContext.EmptySegment - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + request.asInstanceOf[SegmentAware].segment = segment - responseFuture.onComplete { result ⇒ - segment.finish() - }(ec) - } + val responseFuture = originalSendReceive.apply(request) + responseFuture.onComplete { result ⇒ + segment.finish() + }(ec) - case EmptyTraceContext ⇒ // Nothing to do here. - } + responseFuture - responseFuture + } getOrElse (originalSendReceive.apply(request)) } - } - @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)") - def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {} + @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)") + def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {} - @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)") - def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = { - val modifiedHeaders = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ - val sprayExtension = Kamon(Spray)(ctx.system) + @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)") + def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = { - if (sprayExtension.includeTraceToken) - RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders - else - defaultHeaders + val modifiedHeaders = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val sprayExtension = Kamon(Spray)(system) + if (sprayExtension.includeTraceToken) + RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders + else + defaultHeaders - case EmptyTraceContext ⇒ defaultHeaders - } + } getOrElse (defaultHeaders) - pjp.proceed(Array(modifiedHeaders)) + pjp.proceed(Array[AnyRef](request, modifiedHeaders)) } } diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala index 74d98564..eb25412b 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala @@ -39,7 +39,7 @@ class ServerRequestInstrumentation { val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system val sprayExtension = Kamon(Spray)(system) - val defaultTraceName: String = request.method.value + ": " + request.uri.path + val defaultTraceName = sprayExtension.generateTraceName(request) val token = if (sprayExtension.includeTraceToken) { request.headers.find(_.name == sprayExtension.traceTokenHeaderName).map(_.value) } else None diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index fbf69c8a..57f9ebe1 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -18,22 +18,23 @@ package kamon.spray import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.ActorSystem +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.{ Millis, Seconds, Span } import org.scalatest.{ Matchers, WordSpecLike } import spray.httpx.RequestBuilding import spray.http.{ HttpResponse, HttpRequest } -import kamon.trace.{ SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader import kamon.Kamon import kamon.metric.{ TraceMetrics, Metrics } -import spray.client.pipelining +import spray.client.pipelining.sendReceive import kamon.metric.Subscriptions.TickMetricSnapshot import scala.concurrent.duration._ -import akka.pattern.pipe import kamon.metric.TraceMetrics.TraceMetricsSnapshot -class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer { +class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ScalaFutures with RequestBuilding with TestServer { implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString( """ |akka { @@ -41,8 +42,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit |} | |kamon { + | spray { + | name-generator = kamon.spray.TestSprayNameGenerator + | } + | | metrics { - | tick-interval = 2 seconds + | tick-interval = 1 hour | | filters = [ | { @@ -57,19 +62,21 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit """.stripMargin)) implicit def ec = system.dispatcher + implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis)) - "the client instrumentation" when { - "configured to do automatic-trace-token-propagation" should { - "include the trace token header on spray-client requests" in { + "the spray client instrumentation" when { + "using the request-level api" should { + "include the trace token header if automatic-trace-token-propagation is enabled" in { enableAutomaticTraceTokenPropagation() - - val (hostConnector, server) = buildSHostConnectorAndServer - val client = TestProbe() + val (_, server, bound) = buildSHostConnectorAndServer // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") { - client.send(hostConnector, Get("/dummy-path")) - TraceRecorder.currentContext + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("include-trace-token-header-at-request-level-api") { + val rF = sendReceive(system, ec) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path") + } + + (TraceRecorder.currentContext, rF) } // Accept the connection at the server side @@ -82,20 +89,105 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) - client.expectMsgType[HttpResponse] + responseFuture.futureValue.entity.asString should be("ok") testContext.finish() } - } - "not configured to do automatic-trace-token-propagation" should { - "not include the trace token header on spray-client requests" in { + "not include the trace token header if automatic-trace-token-propagation is disabled" in { disableAutomaticTraceTokenPropagation() + val (_, server, bound) = buildSHostConnectorAndServer + + // Initiate a request within the context of a trace + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("do-not-include-trace-token-header-at-request-level-api") { + val rF = sendReceive(system, ec) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path") + } + + (TraceRecorder.currentContext, rF) + } + + // Accept the connection at the server side + server.expectMsgType[Http.Connected] + server.reply(Http.Register(server.ref)) + + // Receive the request and reply back + val request = server.expectMsgType[HttpRequest] + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + + // Finish the request cycle, just to avoid error messages on the logs. + server.reply(HttpResponse(entity = "ok")) + responseFuture.futureValue.entity.asString should be("ok") + testContext.finish() + } + + "start and finish a segment that must be named using the request level api name assignation" in { + enableAutomaticTraceTokenPropagation() + enablePipeliningSegmentCollectionStrategy() + + val transport = TestProbe() + val (_, _, bound) = buildSHostConnectorAndServer + + // Initiate a request within the context of a trace + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("assign-name-to-segment-with-request-level-api") { + val rF = sendReceive(transport.ref)(ec, 10.seconds) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment") + } + + (TraceRecorder.currentContext, rF) + } + + // Receive the request and reply back + transport.expectMsgType[HttpRequest] + transport.reply(HttpResponse(entity = "ok")) + responseFuture.futureValue.entity.asString should be("ok") + testContext.finish() + + val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api") + traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + } + + "rename a request level api segment once it reaches the relevant host connector" in { + enableAutomaticTraceTokenPropagation() + enablePipeliningSegmentCollectionStrategy() + + val (_, server, bound) = buildSHostConnectorAndServer + + // Initiate a request within the context of a trace + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("rename-segment-with-request-level-api") { + val rF = sendReceive(system, ec) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment") + } + + (TraceRecorder.currentContext, rF) + } - val (hostConnector, server) = buildSHostConnectorAndServer + // Accept the connection at the server side + server.expectMsgType[Http.Connected] + server.reply(Http.Register(server.ref)) + + // Receive the request and reply back + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + responseFuture.futureValue.entity.asString should be("ok") + testContext.finish() + + val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api") + traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + } + } + + "using the host-level api" should { + "include the trace token header on spray-client requests if automatic-trace-token-propagation is enabled" in { + enableAutomaticTraceTokenPropagation() + enableInternalSegmentCollectionStrategy() + + val (hostConnector, server, _) = buildSHostConnectorAndServer val client = TestProbe() // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") { + val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") { client.send(hostConnector, Get("/dummy-path")) TraceRecorder.currentContext } @@ -106,30 +198,24 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] testContext.finish() } - } - "configured to use pipelining segment collection strategy" should { - "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in { - enablePipeliningSegmentCollectionStrategy() + "not include the trace token header on spray-client requests if automatic-trace-token-propagation is disabled" in { + disableAutomaticTraceTokenPropagation() + enableInternalSegmentCollectionStrategy() - val (hostConnector, server) = buildSHostConnectorAndServer + val (hostConnector, server, _) = buildSHostConnectorAndServer val client = TestProbe() - val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds) - - val metricListener = TestProbe() - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) - metricListener.expectMsgType[TickMetricSnapshot] // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") { - pipeline(Get("/dummy-path")) to client.ref + val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") { + client.send(hostConnector, Get("/dummy-path")) TraceRecorder.currentContext } @@ -138,39 +224,25 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit server.reply(Http.Register(server.ref)) // Receive the request and reply back - val req = server.expectMsgType[HttpRequest] + val request = server.expectMsgType[HttpRequest] + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + + // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - - // Finish the trace testContext.finish() - - val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds) - traceMetrics.elapsedTime.numberOfMeasurements should be(1L) - traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) - recordedSegment should not be empty - recordedSegment map { segmentMetrics ⇒ - segmentMetrics.numberOfMeasurements should be(1L) - } } - } - "configured to use internal segment collection strategy" should { - "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in { + "start and finish a segment that must be named using the host level api name assignation" in { + disableAutomaticTraceTokenPropagation() enableInternalSegmentCollectionStrategy() - val (hostConnector, server) = buildSHostConnectorAndServer + val (hostConnector, server, _) = buildSHostConnectorAndServer val client = TestProbe() - val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds) - - val metricListener = TestProbe() - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) - metricListener.expectMsgType[TickMetricSnapshot] // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") { - pipeline(Get("/dummy-path")) to client.ref + val testContext = TraceRecorder.withNewTraceContext("create-segment-with-host-level-api") { + client.send(hostConnector, Get("/host-level-api-segment")) TraceRecorder.currentContext } @@ -179,21 +251,17 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit server.reply(Http.Register(server.ref)) // Receive the request and reply back - server.expectMsgType[HttpRequest] + val request = server.expectMsgType[HttpRequest] + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + + // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - - // Finish the trace testContext.finish() - val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds) - traceMetrics.elapsedTime.numberOfMeasurements should be(1L) - traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) - recordedSegment should not be empty - recordedSegment map { segmentMetrics ⇒ - segmentMetrics.numberOfMeasurements should be(1L) - } + val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api") + traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) } } } @@ -208,6 +276,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit metricsOption.get.asInstanceOf[TraceMetricsSnapshot] } + def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { + val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory) + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext + recorder.get.collect(collectionContext) + } + def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal) def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining) def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true) @@ -227,3 +301,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit field.set(target, include) } } + +class TestSprayNameGenerator extends SprayNameGenerator { + def generateTraceName(request: HttpRequest): String = request.uri.path.toString() + def generateRequestLevelApiSegmentName(request: HttpRequest): String = "request-level " + request.uri.path.toString() + def generateHostLevelApiSegmentName(request: HttpRequest): String = "host-level " + request.uri.path.toString() +} diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala index 65506770..379b8fc8 100644 --- a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala +++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala @@ -45,13 +45,13 @@ trait TestServer { probe.sender } - def buildSHostConnectorAndServer: (ActorRef, TestProbe) = { + def buildSHostConnectorAndServer: (ActorRef, TestProbe, Http.Bound) = { val serverHandler = TestProbe() IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) val bound = serverHandler.expectMsgType[Bound](10 seconds) val client = httpHostConnector(bound) - (client, serverHandler) + (client, serverHandler, bound) } private def httpHostConnector(connectionInfo: Http.Bound): ActorRef = { -- cgit v1.2.3 From 2c958ea86ded8a047af6450026ca7ec64a28bea1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 31 Oct 2014 03:05:13 +0100 Subject: + play: external naming for traces and segments, related to #65 --- kamon-play/src/main/resources/reference.conf | 20 +++++++++++++++----- kamon-play/src/main/scala/kamon/play/Play.scala | 19 ++++++++++++++++++- .../instrumentation/RequestInstrumentation.scala | 2 +- .../play/instrumentation/WSInstrumentation.scala | 6 ++++-- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/kamon-play/src/main/resources/reference.conf b/kamon-play/src/main/resources/reference.conf index 72266a0c..5ad070ce 100644 --- a/kamon-play/src/main/resources/reference.conf +++ b/kamon-play/src/main/resources/reference.conf @@ -3,14 +3,24 @@ # ================================== # kamon { - metrics { - tick-interval = 1 hour - } - play { - include-trace-token-header = true + + # Header name used when propagating the `TraceContext.token` value across applications. trace-token-header-name = "X-Trace-Token" + # When set to true, Kamon will automatically set and propogate the `TraceContext.token` value under the following + # conditions: + # - When a server side request is received containing the trace token header, the new `TraceContext` will have that + # some token, and once the response to that request is ready, the trace token header is also included in the + # response. + # - When a WS client request is issued and a `TraceContext` is available, the trace token header will be included + # in the request headers. + automatic-trace-token-propagation = true + + # Fully qualified name of the implementation of kamon.play.PlayNameGenerator that will be used for assigning names + # to traces and client http segments. + name-generator = kamon.play.DefaultPlayNameGenerator + dispatcher = ${kamon.default-dispatcher} } } \ No newline at end of file diff --git a/kamon-play/src/main/scala/kamon/play/Play.scala b/kamon-play/src/main/scala/kamon/play/Play.scala index 7b8777e0..6e2de3c1 100644 --- a/kamon-play/src/main/scala/kamon/play/Play.scala +++ b/kamon-play/src/main/scala/kamon/play/Play.scala @@ -21,6 +21,8 @@ import akka.event.Logging import kamon.Kamon import kamon.http.HttpServerMetrics import kamon.metric.Metrics +import play.api.libs.ws.WSRequest +import play.api.mvc.RequestHeader object Play extends ExtensionId[PlayExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = Play @@ -35,7 +37,22 @@ class PlayExtension(private val system: ExtendedActorSystem) extends Kamon.Exten val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get val defaultDispatcher = system.dispatchers.lookup(config.getString("dispatcher")) - val includeTraceToken: Boolean = config.getBoolean("include-trace-token-header") + val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation") val traceTokenHeaderName: String = config.getString("trace-token-header-name") + + private val nameGeneratorFQN = config.getString("name-generator") + private val nameGenerator: PlayNameGenerator = system.dynamicAccess.createInstanceFor[PlayNameGenerator](nameGeneratorFQN, Nil).get + + def generateTraceName(requestHeader: RequestHeader): String = nameGenerator.generateTraceName(requestHeader) + def generateHttpClientSegmentName(request: WSRequest): String = nameGenerator.generateHttpClientSegmentName(request) } +trait PlayNameGenerator { + def generateTraceName(requestHeader: RequestHeader): String + def generateHttpClientSegmentName(request: WSRequest): String +} + +class DefaultPlayNameGenerator extends PlayNameGenerator { + def generateTraceName(requestHeader: RequestHeader): String = requestHeader.method + ": " + requestHeader.uri + def generateHttpClientSegmentName(request: WSRequest): String = request.url +} diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala index 82a43926..ca95781e 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala @@ -42,7 +42,7 @@ class RequestInstrumentation { def beforeRouteRequest(requestHeader: RequestHeader): Unit = { val system = Akka.system() val playExtension = Kamon(Play)(system) - val defaultTraceName: String = s"${requestHeader.method}: ${requestHeader.uri}" + val defaultTraceName = playExtension.generateTraceName(requestHeader) val token = if (playExtension.includeTraceToken) { requestHeader.headers.toSimpleMap.find(_._1 == playExtension.traceTokenHeaderName).map(_._2) diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala index c2eafa2b..125db85e 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala @@ -34,8 +34,10 @@ class WSInstrumentation { @Around("onExecuteRequest(request)") def aroundExecuteRequest(pjp: ProceedingJoinPoint, request: WSRequest): Any = { TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - val executor = Kamon(Play)(system).defaultDispatcher - val segment = ctx.startSegment(request.url, SegmentMetricIdentityLabel.HttpClient) + val playExtension = Kamon(Play)(system) + val executor = playExtension.defaultDispatcher + val segmentName = playExtension.generateHttpClientSegmentName(request) + val segment = ctx.startSegment(segmentName, SegmentMetricIdentityLabel.HttpClient) val response = pjp.proceed().asInstanceOf[Future[WSResponse]] response.map(result ⇒ segment.finish())(executor) -- cgit v1.2.3 From 508336438227fc89eb732565bd19cb254572e533 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 31 Oct 2014 03:07:52 +0100 Subject: ! spray: rename the default implementation of SprayNameGenerator --- kamon-spray/src/main/resources/reference.conf | 2 +- kamon-spray/src/main/scala/kamon/spray/Spray.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf index 9fed5a2b..5c5e9317 100644 --- a/kamon-spray/src/main/resources/reference.conf +++ b/kamon-spray/src/main/resources/reference.conf @@ -18,7 +18,7 @@ kamon { # Fully qualified name of the implementation of kamon.spray.SprayNameGenerator that will be used for assigning names # to traces and client http segments. - name-generator = kamon.spray.SimpleSprayNameGenerator + name-generator = kamon.spray.DefaultSprayNameGenerator client { # Strategy used for automatic trace segment generation when issue requests with spray-client. The possible values diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala index 4a0fd74e..c1c81116 100644 --- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala +++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala @@ -65,7 +65,7 @@ trait SprayNameGenerator { def generateHostLevelApiSegmentName(request: HttpRequest): String } -class SimpleSprayNameGenerator extends SprayNameGenerator { +class DefaultSprayNameGenerator extends SprayNameGenerator { def generateRequestLevelApiSegmentName(request: HttpRequest): String = request.method.value + ": " + request.uri.path def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path def generateHostLevelApiSegmentName(request: HttpRequest): String = request.uri.authority.host.address -- cgit v1.2.3