aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-10-31 03:14:32 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2014-10-31 03:14:32 +0100
commit992dd3007a6ddd24bf2aaf952aeacab7d8d9fb1a (patch)
tree87d874030e28a19638b9b4e4a80a658976f4043d
parent7b3928c881c85152bee88019da9d76a0cfa5359f (diff)
parent508336438227fc89eb732565bd19cb254572e533 (diff)
downloadKamon-992dd3007a6ddd24bf2aaf952aeacab7d8d9fb1a.tar.gz
Kamon-992dd3007a6ddd24bf2aaf952aeacab7d8d9fb1a.tar.bz2
Kamon-992dd3007a6ddd24bf2aaf952aeacab7d8d9fb1a.zip
Merge branch 'wip/improve-trace-context-and-segments-api', closes #65
-rw-r--r--kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala11
-rw-r--r--kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala8
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala3
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala207
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceLocal.scala14
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala67
-rw-r--r--kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala8
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala6
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala1
-rw-r--r--kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala21
-rw-r--r--kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala24
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala5
-rw-r--r--kamon-play/src/main/resources/reference.conf20
-rw-r--r--kamon-play/src/main/scala/kamon/play/Play.scala19
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala21
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala25
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala31
-rw-r--r--kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala2
-rw-r--r--kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala6
-rw-r--r--kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala7
-rw-r--r--kamon-spray/src/main/resources/reference.conf3
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/Spray.scala20
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala143
-rw-r--r--kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala49
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala224
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/TestServer.scala4
-rw-r--r--kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala4
28 files changed, 565 insertions, 400 deletions
diff --git a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
index 3278ec67..560008cf 100644
--- a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
+++ b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
@@ -31,12 +31,15 @@ class RemotingInstrumentation {
envelopeBuilder.setMessage(serializedMessage)
// Attach the TraceContext info, if available.
- TraceRecorder.currentContext.foreach { context ⇒
+ if (!TraceRecorder.currentContext.isEmpty) {
+ val context = TraceRecorder.currentContext
+ val relativeStartMilliTime = System.currentTimeMillis - ((System.nanoTime - context.nanoTimestamp) / 1000000)
+
envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder()
.setTraceName(context.name)
.setTraceToken(context.token)
.setIsOpen(context.isOpen)
- .setStartMilliTime(context.startMilliTime)
+ .setStartMilliTime(relativeStartMilliTime)
.build())
}
@@ -81,14 +84,14 @@ class RemotingInstrumentation {
if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) {
val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext
val system = provider.guardian.underlying.system
- val tc = TraceRecorder.joinRemoteTraceContext(
+ val ctx = TraceRecorder.joinRemoteTraceContext(
remoteTraceContext.getTraceName(),
remoteTraceContext.getTraceToken(),
remoteTraceContext.getStartMilliTime(),
remoteTraceContext.getIsOpen(),
system)
- TraceRecorder.setContext(Some(tc))
+ TraceRecorder.setContext(ctx)
}
pjp.proceed()
diff --git a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
index 29276dd0..8a3973ca 100644
--- a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
+++ b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
@@ -127,14 +127,13 @@ class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends
case "fail" ⇒
throw new ArithmeticException("Division by zero.")
case "reply-trace-token" ⇒
- log.info("Sending back the TT: " + TraceRecorder.currentContext.map(_.token).getOrElse("unavailable"))
+ log.info("Sending back the TT: " + TraceRecorder.currentContext.token)
sender ! currentTraceContextInfo
}
def currentTraceContextInfo: String = {
- TraceRecorder.currentContext.map { context ⇒
- s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}"
- }.getOrElse("unavailable")
+ val ctx = TraceRecorder.currentContext
+ s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}"
}
}
@@ -162,8 +161,7 @@ class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address)
}
def currentTraceContextInfo: String = {
- TraceRecorder.currentContext.map { context ⇒
- s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}"
- }.getOrElse("unavailable")
+ val ctx = TraceRecorder.currentContext
+ s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}"
}
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
index 9b541a32..90928ba0 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
@@ -59,7 +59,7 @@ class ActorCellInstrumentation {
def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
val timestampBeforeProcessing = System.nanoTime()
- val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware]
+ val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware]
try {
TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) {
@@ -154,13 +154,13 @@ class ActorCellMetricsIntoActorCellMixin {
class TraceContextIntoEnvelopeMixin {
@DeclareMixin("akka.dispatch.Envelope")
- def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default
+ def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default
@Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
- def envelopeCreation(ctx: TraceContextAware): Unit = {}
+ def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {}
@After("envelopeCreation(ctx)")
- def afterEnvelopeCreation(ctx: TraceContextAware): Unit = {
+ def afterEnvelopeCreation(ctx: TimestampedTraceContextAware): Unit = {
// Necessary to force the initialization of ContextAware at the moment of creation.
ctx.traceContext
}
diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
index 10dbcc01..7246ccb5 100644
--- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
@@ -30,12 +30,11 @@ object TraceMetrics extends MetricGroupCategory {
val name = "trace"
case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" }
- case class HttpClientRequest(name: String) extends MetricIdentity
case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram)
extends MetricGroupRecorder {
- private val segments = TrieMap[MetricIdentity, Histogram]()
+ val segments = TrieMap[MetricIdentity, Histogram]()
def segmentRecorder(segmentIdentity: MetricIdentity): Histogram =
segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply())
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 64ee70be..c4c28a68 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -23,113 +23,73 @@ import kamon.Kamon
import kamon.metric._
import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
-import kamon.trace.TraceContext.SegmentIdentity
import kamon.metric.TraceMetrics.TraceMetricRecorder
import scala.annotation.tailrec
-trait TraceContext {
+sealed trait TraceContext {
def name: String
def token: String
- def system: ActorSystem
def rename(name: String): Unit
- def levelOfDetail: TracingLevelOfDetail
- def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle
- def finish(metadata: Map[String, String])
+ def finish(): Unit
def origin: TraceContextOrigin
- def startMilliTime: Long
def isOpen: Boolean
-
- private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
-}
-
-object TraceContext {
- type SegmentIdentity = MetricIdentity
-}
-
-trait SegmentCompletionHandle {
- def finish(metadata: Map[String, String] = Map.empty)
-}
-
-case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String])
-
-sealed trait TracingLevelOfDetail
-case object OnlyMetrics extends TracingLevelOfDetail
-case object SimpleTrace extends TracingLevelOfDetail
-case object FullTrace extends TracingLevelOfDetail
-
-sealed trait TraceContextOrigin
-object TraceContextOrigin {
- case object Local extends TraceContextOrigin
- case object Remote extends TraceContextOrigin
+ def isEmpty: Boolean
+ def nonEmpty: Boolean = !isEmpty
+ def startSegment(segmentName: String, label: String): Segment
+ def nanoTimestamp: Long
}
-trait TraceContextAware extends Serializable {
- def captureNanoTime: Long
- def traceContext: Option[TraceContext]
+sealed trait Segment {
+ def name: String
+ def rename(newName: String): Unit
+ def label: String
+ def finish(): Unit
+ def isEmpty: Boolean
}
-object TraceContextAware {
- def default: TraceContextAware = new DefaultTraceContextAware
-
- class DefaultTraceContextAware extends TraceContextAware {
- @transient val captureNanoTime = System.nanoTime()
- @transient val traceContext = TraceRecorder.currentContext
-
- //
- // Beware of this hack, it might bite us in the future!
- //
- // When using remoting/cluster all messages carry the TraceContext in the envelope in which they
- // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is
- // available (if any) when the system messages are read and this will make sure that it is correctly
- // captured and propagated.
- @throws[ObjectStreamException]
- private def readResolve: AnyRef = {
- new DefaultTraceContextAware
- }
+case object EmptyTraceContext extends TraceContext {
+ def name: String = "empty-trace"
+ def token: String = ""
+ def rename(name: String): Unit = {}
+ def finish(): Unit = {}
+ def origin: TraceContextOrigin = TraceContextOrigin.Local
+ def isOpen: Boolean = false
+ def isEmpty: Boolean = true
+ def startSegment(segmentName: String, label: String): Segment = EmptySegment
+ def nanoTimestamp: Long = 0L
+
+ case object EmptySegment extends Segment {
+ val name: String = "empty-segment"
+ val label: String = "empty-label"
+ def isEmpty: Boolean = true
+ def rename(newName: String): Unit = {}
+ def finish: Unit = {}
}
}
-trait SegmentCompletionHandleAware extends TraceContextAware {
- @volatile var segmentCompletionHandle: Option[SegmentCompletionHandle] = None
-}
-
-object SegmentCompletionHandleAware {
- def default: SegmentCompletionHandleAware = new DefaultSegmentCompletionHandleAware
-
- class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {}
-}
-
-class SimpleMetricCollectionContext(traceName: String, val token: String, metadata: Map[String, String],
- val origin: TraceContextOrigin, val system: ActorSystem, val startMilliTime: Long = System.currentTimeMillis,
- izOpen: Boolean = true) extends TraceContext {
+class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
+ val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext {
+ val isEmpty: Boolean = false
@volatile private var _name = traceName
@volatile private var _isOpen = izOpen
- val levelOfDetail = OnlyMetrics
- val startNanoTime = System.nanoTime()
- val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
- val metricsExtension = Kamon(Metrics)(system)
+ private val _nanoTimestamp = nanoTimeztamp
+ private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
+ private val metricsExtension = Kamon(Metrics)(system)
+ private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
def name: String = _name
+ def rename(newName: String): Unit =
+ if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace.
- def rename(newName: String): Unit = _name = newName
-
- def isOpen(): Boolean = _isOpen
+ def isOpen: Boolean = _isOpen
+ def nanoTimestamp: Long = _nanoTimestamp
- def finish(metadata: Map[String, String]): Unit = {
+ def finish(): Unit = {
_isOpen = false
-
- val elapsedNanoTime =
- if (origin == TraceContextOrigin.Local)
- // Everything is local, nanoTime is still the best resolution we can use.
- System.nanoTime() - startNanoTime
- else
- // For a remote TraceContext we can only rely on the startMilliTime and we need to scale it to nanoseconds
- // to be consistent with unit used for all latency measurements.
- (System.currentTimeMillis() - startMilliTime) * 1000000L
-
+ val elapsedNanoTime = System.nanoTime() - _nanoTimestamp
val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
metricRecorder.map { traceMetrics ⇒
@@ -138,6 +98,8 @@ class SimpleMetricCollectionContext(traceName: String, val token: String, metada
}
}
+ def startSegment(segmentName: String, segmentLabel: String): Segment = new DefaultSegment(segmentName, segmentLabel)
+
@tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
val segment = finishedSegments.poll()
if (segment != null) {
@@ -146,8 +108,8 @@ class SimpleMetricCollectionContext(traceName: String, val token: String, metada
}
}
- private def finishSegment(identity: MetricIdentity, duration: Long, metadata: Map[String, String]): Unit = {
- finishedSegments.add(SegmentData(identity, duration, metadata))
+ private def finishSegment(segmentName: String, label: String, duration: Long): Unit = {
+ finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, label), duration))
if (!_isOpen) {
metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
@@ -156,16 +118,81 @@ class SimpleMetricCollectionContext(traceName: String, val token: String, metada
}
}
- def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle =
- new SimpleMetricCollectionCompletionHandle(identity, metadata)
+ class DefaultSegment(segmentName: String, val label: String) extends Segment {
+ private val _segmentStartNanoTime = System.nanoTime()
+ @volatile private var _segmentName = segmentName
+ @volatile private var _isOpen = true
- class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle {
- val segmentStartNanoTime = System.nanoTime()
+ def name: String = _segmentName
+ def rename(newName: String): Unit = _segmentName = newName
+ def isEmpty: Boolean = false
- def finish(metadata: Map[String, String] = Map.empty): Unit = {
+ def finish: Unit = {
val segmentFinishNanoTime = System.nanoTime()
- finishSegment(identity, (segmentFinishNanoTime - segmentStartNanoTime), startMetadata ++ metadata)
+ finishSegment(name, label, (segmentFinishNanoTime - _segmentStartNanoTime))
+ }
+ }
+}
+
+case class SegmentMetricIdentity(name: String, label: String) extends MetricIdentity
+case class SegmentData(identity: SegmentMetricIdentity, duration: Long)
+
+object SegmentMetricIdentityLabel {
+ val HttpClient = "http-client"
+}
+
+sealed trait LevelOfDetail
+object LevelOfDetail {
+ case object OnlyMetrics extends LevelOfDetail
+ case object SimpleTrace extends LevelOfDetail
+ case object FullTrace extends LevelOfDetail
+}
+
+sealed trait TraceContextOrigin
+object TraceContextOrigin {
+ case object Local extends TraceContextOrigin
+ case object Remote extends TraceContextOrigin
+}
+
+trait TraceContextAware extends Serializable {
+ def traceContext: TraceContext
+}
+
+object TraceContextAware {
+ def default: TraceContextAware = new DefaultTraceContextAware
+
+ class DefaultTraceContextAware extends TraceContextAware {
+ @transient val traceContext = TraceRecorder.currentContext
+
+ //
+ // Beware of this hack, it might bite us in the future!
+ //
+ // When using remoting/cluster all messages carry the TraceContext in the envelope in which they
+ // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is
+ // available (if any) when the system messages are read and this will make sure that it is correctly
+ // captured and propagated.
+ @throws[ObjectStreamException]
+ private def readResolve: AnyRef = {
+ new DefaultTraceContextAware
}
}
}
+trait TimestampedTraceContextAware extends TraceContextAware {
+ def captureNanoTime: Long
+}
+
+object TimestampedTraceContextAware {
+ def default: TimestampedTraceContextAware = new DefaultTraceContextAware with TimestampedTraceContextAware {
+ @transient val captureNanoTime = System.nanoTime()
+ }
+}
+
+trait SegmentAware {
+ @volatile var segment: Segment = EmptyTraceContext.EmptySegment
+}
+
+object SegmentAware {
+ def default: SegmentAware = new DefaultSegmentAware
+ class DefaultSegmentAware extends DefaultTraceContextAware with SegmentAware {}
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
index 3ff074b6..0766af74 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
@@ -24,18 +24,20 @@ object TraceLocal {
type ValueType
}
- def store(key: TraceLocalKey)(value: key.ValueType): Unit =
- TraceRecorder.currentContext.map(_.traceLocalStorage.store(key)(value))
-
- def retrieve(key: TraceLocalKey): Option[key.ValueType] =
- TraceRecorder.currentContext.flatMap(_.traceLocalStorage.retrieve(key))
+ def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match {
+ case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value)
+ case EmptyTraceContext ⇒ // Can't store in the empty context.
+ }
+ def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match {
+ case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key)
+ case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context.
+ }
}
class TraceLocalStorage {
val underlyingStorage = TrieMap[TraceLocal.TraceLocalKey, Any]()
def store(key: TraceLocalKey)(value: key.ValueType): Unit = underlyingStorage.put(key, value)
-
def retrieve(key: TraceLocalKey): Option[key.ValueType] = underlyingStorage.get(key).map(_.asInstanceOf[key.ValueType])
}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
index 778edc42..8da187cb 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
@@ -23,65 +23,70 @@ import kamon.macros.InlineTraceContextMacro
import scala.util.Try
import java.net.InetAddress
import akka.actor.ActorSystem
-import kamon.trace.TraceContext.SegmentIdentity
object TraceRecorder {
- private val traceContextStorage = new ThreadLocal[Option[TraceContext]] {
- override def initialValue(): Option[TraceContext] = None
+ private val traceContextStorage = new ThreadLocal[TraceContext] {
+ override def initialValue(): TraceContext = EmptyTraceContext
}
private val tokenCounter = new AtomicLong
private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
- def newToken = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet())
-
- private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String],
- system: ActorSystem): TraceContext = {
-
- // In the future this should select between implementations.
- val finalToken = token.getOrElse(newToken)
- new SimpleMetricCollectionContext(name, finalToken, metadata, TraceContextOrigin.Local, system)
+ def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet())
+
+ private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = {
+ new DefaultTraceContext(
+ name,
+ token.getOrElse(newToken),
+ izOpen = true,
+ LevelOfDetail.OnlyMetrics,
+ TraceContextOrigin.Local,
+ nanoTimeztamp = System.nanoTime,
+ system)
}
def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = {
- new SimpleMetricCollectionContext(
+ val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000)
+ new DefaultTraceContext(
traceName,
traceToken,
- Map.empty,
+ isOpen,
+ LevelOfDetail.OnlyMetrics,
TraceContextOrigin.Remote,
- system,
- startMilliTime,
- isOpen)
+ equivalentNanotime,
+ system)
}
- def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context)
+ def setContext(context: TraceContext): Unit = traceContextStorage.set(context)
- def clearContext: Unit = traceContextStorage.set(None)
+ def clearContext: Unit = traceContextStorage.set(EmptyTraceContext)
- def currentContext: Option[TraceContext] = traceContextStorage.get()
+ def currentContext: TraceContext = traceContextStorage.get()
- def start(name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(implicit system: ActorSystem) = {
- val ctx = newTraceContext(name, token, metadata, system)
- traceContextStorage.set(Some(ctx))
+ def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = {
+ val ctx = newTraceContext(name, token, system)
+ traceContextStorage.set(ctx)
}
- def startSegment(identity: SegmentIdentity, metadata: Map[String, String] = Map.empty): Option[SegmentCompletionHandle] =
- currentContext.map(_.startSegment(identity, metadata))
-
- def rename(name: String): Unit = currentContext.map(_.rename(name))
+ def rename(name: String): Unit = currentContext.rename(name)
- def withNewTraceContext[T](name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(thunk: ⇒ T)(implicit system: ActorSystem): T =
- withTraceContext(Some(newTraceContext(name, token, metadata, system)))(thunk)
+ def withNewTraceContext[T](name: String, token: Option[String] = None)(thunk: ⇒ T)(implicit system: ActorSystem): T =
+ withTraceContext(newTraceContext(name, token, system))(thunk)
- def withTraceContext[T](context: Option[TraceContext])(thunk: ⇒ T): T = {
+ def withTraceContext[T](context: TraceContext)(thunk: ⇒ T): T = {
val oldContext = currentContext
setContext(context)
try thunk finally setContext(oldContext)
}
- def withInlineTraceContextReplacement[T](traceCtx: Option[TraceContext])(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, Option[TraceContext]]
+ def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match {
+ case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system))
+ case EmptyTraceContext ⇒ None
+ }
+
+ def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext]
- def finish(metadata: Map[String, String] = Map.empty): Unit = currentContext.map(_.finish(metadata))
+ def finish(): Unit = currentContext.finish()
}
diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala
index 4b7dbb28..f052f009 100644
--- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala
+++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala
@@ -20,5 +20,11 @@ import ch.qos.logback.classic.spi.ILoggingEvent
import kamon.trace.TraceRecorder
class LogbackTraceTokenConverter extends ClassicConverter {
- def convert(event: ILoggingEvent): String = TraceRecorder.currentContext.map(_.token).getOrElse("undefined")
+ def convert(event: ILoggingEvent): String = {
+ val ctx = TraceRecorder.currentContext
+ if (ctx.isEmpty)
+ "undefined"
+ else
+ ctx.token
+ }
}
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala
index cb39f5e6..d79ccbe0 100644
--- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala
@@ -3,7 +3,7 @@ package kamon.instrumentation.akka
import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop }
import akka.actor._
import akka.testkit.{ ImplicitSender, TestKit }
-import kamon.trace.TraceRecorder
+import kamon.trace.{ EmptyTraceContext, TraceRecorder }
import org.scalatest.WordSpecLike
import scala.concurrent.duration._
@@ -59,7 +59,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-s
// Ensure we didn't tie the actor with the context
supervisor ! "context"
- expectMsg(None)
+ expectMsg(EmptyTraceContext)
}
"the actor is restarted" in {
@@ -76,7 +76,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-s
// Ensure we didn't tie the actor with the context
supervisor ! "context"
- expectMsg(None)
+ expectMsg(EmptyTraceContext)
}
"the actor is stopped" in {
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala
index d914ffe8..17312ba3 100644
--- a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala
@@ -54,7 +54,6 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M
}
val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext
- capturedCtx should be('defined)
capturedCtx should equal(testTraceContext)
}
}
diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
index 8a87408d..6453dd77 100644
--- a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
@@ -5,8 +5,7 @@ import akka.testkit.{ ImplicitSender, TestKitBase }
import com.typesafe.config.ConfigFactory
import kamon.Kamon
import kamon.metric.TraceMetrics.TraceMetricsSnapshot
-import kamon.trace.TraceContext.SegmentIdentity
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentity, TraceRecorder }
import org.scalatest.{ Matchers, WordSpecLike }
class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
@@ -54,39 +53,37 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with
"record the elapsed time for segments that occur inside a given trace" in {
TraceRecorder.withNewTraceContext("trace-with-segments") {
- val segmentHandle = TraceRecorder.startSegment(TraceMetricsTestSegment("test-segment"))
- segmentHandle.get.finish()
+ val segment = TraceRecorder.currentContext.startSegment("test-segment", "test-label")
+ segment.finish()
TraceRecorder.finish()
}
val snapshot = takeSnapshotOf("trace-with-segments")
snapshot.elapsedTime.numberOfMeasurements should be(1)
snapshot.segments.size should be(1)
- snapshot.segments(TraceMetricsTestSegment("test-segment")).numberOfMeasurements should be(1)
+ snapshot.segments(SegmentMetricIdentity("test-segment", "test-label")).numberOfMeasurements should be(1)
}
"record the elapsed time for segments that finish after their correspondent trace has finished" in {
- val segmentHandle = TraceRecorder.withNewTraceContext("closing-segment-after-trace") {
- val sh = TraceRecorder.startSegment(TraceMetricsTestSegment("test-segment"))
+ val segment = TraceRecorder.withNewTraceContext("closing-segment-after-trace") {
+ val s = TraceRecorder.currentContext.startSegment("test-segment", "test-label")
TraceRecorder.finish()
- sh
+ s
}
val beforeFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace")
beforeFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(1)
beforeFinishSegmentSnapshot.segments.size should be(0)
- segmentHandle.get.finish()
+ segment.finish()
val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace")
afterFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(0)
afterFinishSegmentSnapshot.segments.size should be(1)
- afterFinishSegmentSnapshot.segments(TraceMetricsTestSegment("test-segment")).numberOfMeasurements should be(1)
+ afterFinishSegmentSnapshot.segments(SegmentMetricIdentity("test-segment", "test-label")).numberOfMeasurements should be(1)
}
}
- case class TraceMetricsTestSegment(name: String) extends SegmentIdentity
-
def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory)
val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
diff --git a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala
index 4d0049f1..e2031a72 100644
--- a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala
@@ -3,7 +3,6 @@ package kamon.trace
import akka.actor.ActorSystem
import akka.testkit.{ ImplicitSender, TestKitBase }
import com.typesafe.config.ConfigFactory
-import kamon.trace.TraceContext.SegmentIdentity
import org.scalatest.{ Matchers, WordSpecLike }
class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
@@ -38,7 +37,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma
"allow starting a trace within a specified block of code, and only within that block of code" in {
val createdContext = TraceRecorder.withNewTraceContext("start-context") {
TraceRecorder.currentContext should not be empty
- TraceRecorder.currentContext.get
+ TraceRecorder.currentContext
}
TraceRecorder.currentContext shouldBe empty
@@ -48,7 +47,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma
"allow starting a trace within a specified block of code, providing a trace-token and only within that block of code" in {
val createdContext = TraceRecorder.withNewTraceContext("start-context-with-token", Some("token-1")) {
TraceRecorder.currentContext should not be empty
- TraceRecorder.currentContext.get
+ TraceRecorder.currentContext
}
TraceRecorder.currentContext shouldBe empty
@@ -70,7 +69,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma
"allow renaming a trace" in {
val createdContext = TraceRecorder.withNewTraceContext("trace-before-rename") {
TraceRecorder.rename("renamed-trace")
- TraceRecorder.currentContext.get
+ TraceRecorder.currentContext
}
TraceRecorder.currentContext shouldBe empty
@@ -79,17 +78,22 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma
"allow creating a segment within a trace" in {
val createdContext = TraceRecorder.withNewTraceContext("trace-with-segments") {
- val segmentHandle = TraceRecorder.startSegment(TraceManipulationTestSegment("segment-1"))
-
- TraceRecorder.currentContext.get
+ val segment = TraceRecorder.currentContext.startSegment("segment-1", "segment-1-label")
+ TraceRecorder.currentContext
}
TraceRecorder.currentContext shouldBe empty
createdContext.name shouldBe ("trace-with-segments")
-
}
- }
- case class TraceManipulationTestSegment(name: String) extends SegmentIdentity
+ "allow renaming a segment" in {
+ TraceRecorder.withNewTraceContext("trace-with-renamed-segment") {
+ val segment = TraceRecorder.currentContext.startSegment("original-segment-name", "segment-label")
+ segment.name should be("original-segment-name")
+ segment.rename("new-segment-name")
+ segment.name should be("new-segment-name")
+ }
+ }
+ }
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
index 4bb0ad3a..08b5df99 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
@@ -34,10 +34,7 @@ class NewRelicErrorLogger extends Actor with ActorLogging {
val params = new java.util.HashMap[String, String]()
val ctx = error.asInstanceOf[TraceContextAware].traceContext
-
- for (c ← ctx) {
- params.put("TraceToken", c.token)
- }
+ params.put("TraceToken", ctx.token)
if (error.cause == Error.NoCause) {
NR.noticeError(error.message.toString, params)
diff --git a/kamon-play/src/main/resources/reference.conf b/kamon-play/src/main/resources/reference.conf
index 72266a0c..5ad070ce 100644
--- a/kamon-play/src/main/resources/reference.conf
+++ b/kamon-play/src/main/resources/reference.conf
@@ -3,14 +3,24 @@
# ================================== #
kamon {
- metrics {
- tick-interval = 1 hour
- }
-
play {
- include-trace-token-header = true
+
+ # Header name used when propagating the `TraceContext.token` value across applications.
trace-token-header-name = "X-Trace-Token"
+ # When set to true, Kamon will automatically set and propogate the `TraceContext.token` value under the following
+ # conditions:
+ # - When a server side request is received containing the trace token header, the new `TraceContext` will have that
+ # some token, and once the response to that request is ready, the trace token header is also included in the
+ # response.
+ # - When a WS client request is issued and a `TraceContext` is available, the trace token header will be included
+ # in the request headers.
+ automatic-trace-token-propagation = true
+
+ # Fully qualified name of the implementation of kamon.play.PlayNameGenerator that will be used for assigning names
+ # to traces and client http segments.
+ name-generator = kamon.play.DefaultPlayNameGenerator
+
dispatcher = ${kamon.default-dispatcher}
}
} \ No newline at end of file
diff --git a/kamon-play/src/main/scala/kamon/play/Play.scala b/kamon-play/src/main/scala/kamon/play/Play.scala
index 7b8777e0..6e2de3c1 100644
--- a/kamon-play/src/main/scala/kamon/play/Play.scala
+++ b/kamon-play/src/main/scala/kamon/play/Play.scala
@@ -21,6 +21,8 @@ import akka.event.Logging
import kamon.Kamon
import kamon.http.HttpServerMetrics
import kamon.metric.Metrics
+import play.api.libs.ws.WSRequest
+import play.api.mvc.RequestHeader
object Play extends ExtensionId[PlayExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = Play
@@ -35,7 +37,22 @@ class PlayExtension(private val system: ExtendedActorSystem) extends Kamon.Exten
val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get
val defaultDispatcher = system.dispatchers.lookup(config.getString("dispatcher"))
- val includeTraceToken: Boolean = config.getBoolean("include-trace-token-header")
+ val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation")
val traceTokenHeaderName: String = config.getString("trace-token-header-name")
+
+ private val nameGeneratorFQN = config.getString("name-generator")
+ private val nameGenerator: PlayNameGenerator = system.dynamicAccess.createInstanceFor[PlayNameGenerator](nameGeneratorFQN, Nil).get
+
+ def generateTraceName(requestHeader: RequestHeader): String = nameGenerator.generateTraceName(requestHeader)
+ def generateHttpClientSegmentName(request: WSRequest): String = nameGenerator.generateHttpClientSegmentName(request)
}
+trait PlayNameGenerator {
+ def generateTraceName(requestHeader: RequestHeader): String
+ def generateHttpClientSegmentName(request: WSRequest): String
+}
+
+class DefaultPlayNameGenerator extends PlayNameGenerator {
+ def generateTraceName(requestHeader: RequestHeader): String = requestHeader.method + ": " + requestHeader.uri
+ def generateHttpClientSegmentName(request: WSRequest): String = request.url
+}
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala
index 92686ff0..e2ffd3f9 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala
@@ -15,7 +15,7 @@
package kamon.play.instrumentation
-import kamon.trace.{ TraceContext, TraceContextAware, TraceRecorder }
+import kamon.trace._
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
import org.slf4j.MDC
@@ -52,21 +52,24 @@ class LoggerLikeInstrumentation {
object LoggerLikeInstrumentation {
@inline final def withMDC[A](block: ⇒ A): A = {
- val keys = TraceRecorder.currentContext.map(extractProperties).map(putAndExtractKeys)
+ val keys = putAndExtractKeys(extractProperties(TraceRecorder.currentContext))
- try block finally keys.map(k ⇒ k.foreach(MDC.remove(_)))
+ try block finally keys.foreach(k ⇒ MDC.remove(k))
}
def putAndExtractKeys(values: Iterable[Map[String, Any]]): Iterable[String] = values.map {
value ⇒ value.map { case (key, value) ⇒ MDC.put(key, value.toString); key }
}.flatten
- def extractProperties(ctx: TraceContext): Iterable[Map[String, Any]] = ctx.traceLocalStorage.underlyingStorage.values.map {
- case traceLocalValue @ (p: Product) ⇒ {
- val properties = p.productIterator
- traceLocalValue.getClass.getDeclaredFields.filter(field ⇒ field.getName != "$outer").map(_.getName -> properties.next).toMap
- }
- case anything ⇒ Map.empty[String, Any]
+ def extractProperties(traceContext: TraceContext): Iterable[Map[String, Any]] = traceContext match {
+ case ctx: DefaultTraceContext ⇒
+ ctx.traceLocalStorage.underlyingStorage.values.collect {
+ case traceLocalValue @ (p: Product) ⇒ {
+ val properties = p.productIterator
+ traceLocalValue.getClass.getDeclaredFields.filter(field ⇒ field.getName != "$outer").map(_.getName -> properties.next).toMap
+ }
+ }
+ case EmptyTraceContext ⇒ Iterable.empty[Map[String, Any]]
}
}
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
index c761e72f..ca95781e 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
@@ -42,7 +42,7 @@ class RequestInstrumentation {
def beforeRouteRequest(requestHeader: RequestHeader): Unit = {
val system = Akka.system()
val playExtension = Kamon(Play)(system)
- val defaultTraceName: String = s"${requestHeader.method}: ${requestHeader.uri}"
+ val defaultTraceName = playExtension.generateTraceName(requestHeader)
val token = if (playExtension.includeTraceToken) {
requestHeader.headers.toSimpleMap.find(_._1 == playExtension.traceTokenHeaderName).map(_._2)
@@ -54,16 +54,23 @@ class RequestInstrumentation {
@Around("execution(* play.api.GlobalSettings+.doFilter(*)) && args(next)")
def aroundDoFilter(pjp: ProceedingJoinPoint, next: EssentialAction): Any = {
val essentialAction = (requestHeader: RequestHeader) ⇒ {
+ // TODO: Move to a Kamon-specific dispatcher.
val executor = Kamon(Play)(Akka.system()).defaultDispatcher
def onResult(result: Result): Result = {
- TraceRecorder.finish()
- TraceRecorder.currentContext.map { ctx ⇒
- val playExtension = Kamon(Play)(ctx.system)
+
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ ctx.finish()
+
+ val playExtension = Kamon(Play)(system)
recordHttpServerMetrics(result.header, ctx.name, playExtension)
- if (playExtension.includeTraceToken) result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token)
- else result
- }.getOrElse(result)
+
+ if (playExtension.includeTraceToken)
+ result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token)
+ else
+ result
+
+ } getOrElse (result)
}
//override the current trace name
@@ -76,8 +83,8 @@ class RequestInstrumentation {
}
@Before("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)")
- def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = request.traceContext.map {
- ctx ⇒ recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(ctx.system))
+ def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(system))
}
private def recordHttpServerMetrics(header: ResponseHeader, traceName: String, playExtension: PlayExtension): Unit =
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
index 87467050..125db85e 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
@@ -17,12 +17,10 @@
package kamon.play.instrumentation
import kamon.Kamon
-import kamon.metric.TraceMetrics.HttpClientRequest
import kamon.play.Play
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder }
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut }
-import play.api.libs.ws.ning.NingWSRequest
import play.api.libs.ws.{ WSRequest, WSResponse }
import scala.concurrent.Future
@@ -35,28 +33,15 @@ class WSInstrumentation {
@Around("onExecuteRequest(request)")
def aroundExecuteRequest(pjp: ProceedingJoinPoint, request: WSRequest): Any = {
-
- import kamon.play.instrumentation.WSInstrumentation._
-
- TraceRecorder.currentContext.map { ctx ⇒
- val executor = Kamon(Play)(ctx.system).defaultDispatcher
- val segmentHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request))
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val playExtension = Kamon(Play)(system)
+ val executor = playExtension.defaultDispatcher
+ val segmentName = playExtension.generateHttpClientSegmentName(request)
+ val segment = ctx.startSegment(segmentName, SegmentMetricIdentityLabel.HttpClient)
val response = pjp.proceed().asInstanceOf[Future[WSResponse]]
- response.map(result ⇒ segmentHandle.map(_.finish()))(executor)
+ response.map(result ⇒ segment.finish())(executor)
response
- }.getOrElse(pjp.proceed())
- }
-}
-
-object WSInstrumentation {
-
- def uri(request: WSRequest): java.net.URI = request.asInstanceOf[NingWSRequest].builder.build().getURI
-
- def basicRequestAttributes(request: WSRequest): Map[String, String] = {
- Map[String, String](
- "host" -> uri(request).getHost,
- "path" -> uri(request).getPath,
- "method" -> request.method)
+ } getOrElse (pjp.proceed())
}
} \ No newline at end of file
diff --git a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
index 2afd31fd..3feb6246 100644
--- a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
+++ b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
@@ -117,7 +117,7 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
"respond to the Async Action with X-Trace-Token and the renamed trace" in {
val result = Await.result(route(FakeRequest(GET, "/async-renamed").withHeaders(traceTokenHeader)).get, 10 seconds)
- TraceRecorder.currentContext.map(_.name) must be(Some("renamed-trace"))
+ TraceRecorder.currentContext.name must be("renamed-trace")
Some(result.header.headers(traceTokenHeaderName)) must be(expectedToken)
}
diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
index b72659d2..bf1ead05 100644
--- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
+++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
@@ -17,9 +17,9 @@
package kamon.play
import kamon.Kamon
-import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot }
+import kamon.metric.TraceMetrics.TraceMetricsSnapshot
import kamon.metric.{ Metrics, TraceMetrics }
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder }
import org.scalatest.{ Matchers, WordSpecLike }
import org.scalatestplus.play.OneServerPerSuite
import play.api.libs.ws.WS
@@ -49,7 +49,7 @@ class WSInstrumentationSpec extends WordSpecLike with Matchers with OneServerPer
val snapshot = takeSnapshotOf("GET: /inside")
snapshot.elapsedTime.numberOfMeasurements should be(1)
snapshot.segments.size should be(1)
- snapshot.segments(HttpClientRequest("http://localhost:19001/async")).numberOfMeasurements should be(1)
+ snapshot.segments(SegmentMetricIdentity("http://localhost:19001/async", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
}
"propagate the TraceContext outside an Action and complete the WS request" in {
diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
index 3e6e982e..878c3c8c 100644
--- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
+++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
@@ -21,10 +21,9 @@ import akka.routing.RoundRobinPool
import akka.util.Timeout
import kamon.Kamon
import kamon.metric.Subscriptions.TickMetricSnapshot
-import kamon.metric.TraceMetrics.HttpClientRequest
import kamon.metric._
import kamon.spray.KamonTraceDirectives
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentityLabel, TraceRecorder }
import spray.http.{ StatusCodes, Uri }
import spray.httpx.RequestBuilding
import spray.routing.SimpleRoutingApp
@@ -128,9 +127,9 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
} ~
path("segment") {
complete {
- val segment = TraceRecorder.startSegment(HttpClientRequest("hello-world"))
+ val segment = TraceRecorder.currentContext.startSegment("hello-world", SegmentMetricIdentityLabel.HttpClient)
(replier ? "hello").mapTo[String].onComplete { t ⇒
- segment.get.finish()
+ segment.finish()
}
"segment"
diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf
index d497e681..5c5e9317 100644
--- a/kamon-spray/src/main/resources/reference.conf
+++ b/kamon-spray/src/main/resources/reference.conf
@@ -16,6 +16,9 @@ kamon {
# in the `HttpRequest` headers.
automatic-trace-token-propagation = true
+ # Fully qualified name of the implementation of kamon.spray.SprayNameGenerator that will be used for assigning names
+ # to traces and client http segments.
+ name-generator = kamon.spray.DefaultSprayNameGenerator
client {
# Strategy used for automatic trace segment generation when issue requests with spray-client. The possible values
diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
index 76adb214..c1c81116 100644
--- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
@@ -43,6 +43,9 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte
val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get
// It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it.
+ private val nameGeneratorFQN = config.getString("name-generator")
+ private val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems.
+
val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy =
config.getString("client.segment-collection-strategy") match {
case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining
@@ -51,6 +54,19 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte
s"only pipelining and internal are valid options.")
}
- // Later we should expose a way for the user to customize this.
- def assignHttpClientRequestName(request: HttpRequest): String = request.uri.authority.host.address
+ def generateTraceName(request: HttpRequest): String = nameGenerator.generateTraceName(request)
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateRequestLevelApiSegmentName(request)
+ def generateHostLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateHostLevelApiSegmentName(request)
+}
+
+trait SprayNameGenerator {
+ def generateTraceName(request: HttpRequest): String
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String
+ def generateHostLevelApiSegmentName(request: HttpRequest): String
+}
+
+class DefaultSprayNameGenerator extends SprayNameGenerator {
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String = request.method.value + ": " + request.uri.path
+ def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path
+ def generateHostLevelApiSegmentName(request: HttpRequest): String = request.uri.authority.host.address
}
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index df1d2b59..94fc3572 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -18,10 +18,9 @@ package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest }
-import spray.http.HttpHeaders.{ RawHeader, Host }
-import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware }
-import kamon.metric.TraceMetrics.HttpClientRequest
+import spray.http._
+import spray.http.HttpHeaders.RawHeader
+import kamon.trace._
import kamon.Kamon
import kamon.spray.{ ClientSegmentCollectionStrategy, Spray }
import akka.actor.ActorRef
@@ -32,58 +31,77 @@ import akka.util.Timeout
class ClientRequestInstrumentation {
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
- def mixin: SegmentCompletionHandleAware = SegmentCompletionHandleAware.default
+ def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default
- @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)")
- def requestContextCreation(ctx: SegmentCompletionHandleAware, request: HttpRequest): Unit = {}
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default
+
+ @DeclareMixin("spray.http.HttpRequest")
+ def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default
- @After("requestContextCreation(ctx, request)")
- def afterRequestContextCreation(ctx: SegmentCompletionHandleAware, request: HttpRequest): Unit = {
- // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the
- // completion handle the first time we create one.
+ @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)")
+ def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {}
- // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely.
- if (ctx.segmentCompletionHandle.isEmpty) {
- TraceRecorder.currentContext.map { traceContext ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
+ @After("requestContextCreation(requestContext, request)")
+ def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {
+ // This read to requestContext.traceContext takes care of initializing the aspect timely.
+ requestContext.traceContext
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes)
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val sprayExtension = Kamon(Spray)(system)
- ctx.segmentCompletionHandle = Some(completionHandle)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
+ if (requestContext.segment.isEmpty) {
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
+ requestContext.segment = segment
}
+
+ } else {
+
+ // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a
+ // name again here is that when the request was initially sent it might not have the Host information available
+ // and it might be important to decide a proper segment name.
+
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ request.asInstanceOf[SegmentAware].segment.rename(clientRequestName)
}
}
}
@Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
- def copyingRequestContext(old: SegmentCompletionHandleAware): Unit = {}
+ def copyingRequestContext(old: TraceContextAware): Unit = {}
@Around("copyingRequestContext(old)")
- def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentCompletionHandleAware): Any = {
+ def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = {
TraceRecorder.withInlineTraceContextReplacement(old.traceContext) {
pjp.proceed()
}
}
@Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)")
- def dispatchToCommander(requestContext: SegmentCompletionHandleAware, message: Any): Unit = {}
+ def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {}
@Around("dispatchToCommander(requestContext, message)")
- def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentCompletionHandleAware, message: Any) = {
- requestContext.traceContext match {
- case ctx @ Some(_) ⇒
- TraceRecorder.withInlineTraceContextReplacement(ctx) {
- if (message.isInstanceOf[HttpMessageEnd])
- requestContext.segmentCompletionHandle.map(_.finish(Map.empty))
-
- pjp.proceed()
- }
+ def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = {
+ if (requestContext.traceContext.nonEmpty) {
+ TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) {
+ if (message.isInstanceOf[HttpMessageEnd])
+ requestContext.asInstanceOf[SegmentAware].segment.finish()
- case None ⇒ pjp.proceed()
- }
+ pjp.proceed()
+ }
+ } else pjp.proceed()
+ }
+
+ @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)")
+ def copyingHttpRequest(old: SegmentAware): Unit = {}
+
+ @Around("copyingHttpRequest(old)")
+ def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = {
+ val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware]
+ copiedHttpRequest.segment = old.segment
+ copiedHttpRequest
}
@Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)")
@@ -94,47 +112,42 @@ class ClientRequestInstrumentation {
val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]]
(request: HttpRequest) ⇒ {
- val responseFuture = originalSendReceive.apply(request)
- TraceRecorder.currentContext.map { traceContext ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
-
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes)
-
- responseFuture.onComplete { result ⇒
- completionHandle.finish(Map.empty)
- }(ec)
- }
- }
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val sprayExtension = Kamon(Spray)(system)
+ val segment =
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining)
+ ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentMetricIdentityLabel.HttpClient)
+ else
+ EmptyTraceContext.EmptySegment
- responseFuture
- }
+ request.asInstanceOf[SegmentAware].segment = segment
- }
+ val responseFuture = originalSendReceive.apply(request)
+ responseFuture.onComplete { result ⇒
+ segment.finish()
+ }(ec)
+
+ responseFuture
- def basicRequestAttributes(request: HttpRequest): Map[String, String] = {
- Map[String, String](
- "host" -> request.header[Host].map(_.value).getOrElse("unknown"),
- "path" -> request.uri.path.toString(),
- "method" -> request.method.toString())
+ } getOrElse (originalSendReceive.apply(request))
+ }
}
- @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)")
- def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {}
+ @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)")
+ def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {}
- @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)")
- def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = {
- val modifiedHeaders = TraceRecorder.currentContext map { traceContext ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
+ @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)")
+ def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = {
+ val modifiedHeaders = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val sprayExtension = Kamon(Spray)(system)
if (sprayExtension.includeTraceToken)
- RawHeader(sprayExtension.traceTokenHeaderName, traceContext.token) :: defaultHeaders
+ RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders
else
defaultHeaders
- } getOrElse defaultHeaders
- pjp.proceed(Array(modifiedHeaders))
+ } getOrElse (defaultHeaders)
+
+ pjp.proceed(Array[AnyRef](request, modifiedHeaders))
}
}
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
index 69b0160e..eb25412b 100644
--- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
@@ -16,11 +16,10 @@
package spray.can.server
import org.aspectj.lang.annotation._
-import kamon.trace.{ TraceContext, TraceRecorder, TraceContextAware }
+import kamon.trace._
import akka.actor.ActorSystem
import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest }
import akka.event.Logging.Warning
-import scala.Some
import kamon.Kamon
import kamon.spray.{ SprayExtension, Spray }
import org.aspectj.lang.ProceedingJoinPoint
@@ -40,7 +39,7 @@ class ServerRequestInstrumentation {
val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system
val sprayExtension = Kamon(Spray)(system)
- val defaultTraceName: String = request.method.value + ": " + request.uri.path
+ val defaultTraceName = sprayExtension.generateTraceName(request)
val token = if (sprayExtension.includeTraceToken) {
request.headers.find(_.name == sprayExtension.traceTokenHeaderName).map(_.value)
} else None
@@ -67,40 +66,36 @@ class ServerRequestInstrumentation {
val incomingContext = TraceRecorder.currentContext
val storedContext = openRequest.traceContext
- verifyTraceContextConsistency(incomingContext, storedContext)
- incomingContext match {
- case None ⇒ pjp.proceed()
- case Some(traceContext) ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
+ // The stored context is always a DefaultTraceContext if the instrumentation is running
+ val system = storedContext.asInstanceOf[DefaultTraceContext].system
- val proceedResult = if (sprayExtension.includeTraceToken) {
- val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token)
- pjp.proceed(Array(openRequest, responseWithHeader))
+ verifyTraceContextConsistency(incomingContext, storedContext, system)
- } else pjp.proceed
+ if (incomingContext.isEmpty)
+ pjp.proceed()
+ else {
+ val sprayExtension = Kamon(Spray)(system)
- TraceRecorder.finish()
- recordHttpServerMetrics(response, traceContext.name, sprayExtension)
- proceedResult
- }
- }
+ val proceedResult = if (sprayExtension.includeTraceToken) {
+ val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, incomingContext.token)
+ pjp.proceed(Array(openRequest, responseWithHeader))
- def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = {
- for (original ← storedTraceContext) {
- incomingTraceContext match {
- case Some(incoming) if original.token != incoming.token ⇒
- publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]", incoming.system)
+ } else pjp.proceed
- case Some(_) ⇒ // nothing to do here.
-
- case None ⇒
- publishWarning(s"Trace context not present while closing the Trace: [$original]", original.system)
- }
+ TraceRecorder.finish()
+ recordHttpServerMetrics(response, incomingContext.name, sprayExtension)
+ proceedResult
}
+ }
+ def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = {
def publishWarning(text: String, system: ActorSystem): Unit =
system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text))
+ if (incomingTraceContext.nonEmpty && incomingTraceContext.token != storedTraceContext.token)
+ publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system)
+ else
+ publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system)
}
def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit =
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index 54329645..57f9ebe1 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -18,22 +18,23 @@ package kamon.spray
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder }
import com.typesafe.config.ConfigFactory
import spray.can.Http
import spray.http.HttpHeaders.RawHeader
import kamon.Kamon
import kamon.metric.{ TraceMetrics, Metrics }
-import spray.client.pipelining
+import spray.client.pipelining.sendReceive
import kamon.metric.Subscriptions.TickMetricSnapshot
import scala.concurrent.duration._
-import akka.pattern.pipe
-import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot }
+import kamon.metric.TraceMetrics.TraceMetricsSnapshot
-class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {
+class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ScalaFutures with RequestBuilding with TestServer {
implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
"""
|akka {
@@ -41,8 +42,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
|}
|
|kamon {
+ | spray {
+ | name-generator = kamon.spray.TestSprayNameGenerator
+ | }
+ |
| metrics {
- | tick-interval = 2 seconds
+ | tick-interval = 1 hour
|
| filters = [
| {
@@ -57,19 +62,48 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
""".stripMargin))
implicit def ec = system.dispatcher
+ implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis))
- "the client instrumentation" when {
- "configured to do automatic-trace-token-propagation" should {
- "include the trace token header on spray-client requests" in {
+ "the spray client instrumentation" when {
+ "using the request-level api" should {
+ "include the trace token header if automatic-trace-token-propagation is enabled" in {
enableAutomaticTraceTokenPropagation()
+ val (_, server, bound) = buildSHostConnectorAndServer
- val (hostConnector, server) = buildSHostConnectorAndServer
- val client = TestProbe()
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("include-trace-token-header-at-request-level-api") {
+ val rF = sendReceive(system, ec) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path")
+ }
+
+ (TraceRecorder.currentContext, rF)
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
+ server.reply(HttpResponse(entity = "ok"))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+ }
+
+ "not include the trace token header if automatic-trace-token-propagation is disabled" in {
+ disableAutomaticTraceTokenPropagation()
+ val (_, server, bound) = buildSHostConnectorAndServer
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
- client.send(hostConnector, Get("/dummy-path"))
- TraceRecorder.currentContext
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("do-not-include-trace-token-header-at-request-level-api") {
+ val rF = sendReceive(system, ec) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path")
+ }
+
+ (TraceRecorder.currentContext, rF)
}
// Accept the connection at the server side
@@ -78,24 +112,82 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
- testContext.map(_.finish(Map.empty))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+ }
+
+ "start and finish a segment that must be named using the request level api name assignation" in {
+ enableAutomaticTraceTokenPropagation()
+ enablePipeliningSegmentCollectionStrategy()
+
+ val transport = TestProbe()
+ val (_, _, bound) = buildSHostConnectorAndServer
+
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("assign-name-to-segment-with-request-level-api") {
+ val rF = sendReceive(transport.ref)(ec, 10.seconds) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment")
+ }
+
+ (TraceRecorder.currentContext, rF)
+ }
+
+ // Receive the request and reply back
+ transport.expectMsgType[HttpRequest]
+ transport.reply(HttpResponse(entity = "ok"))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+
+ val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api")
+ traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
+ }
+
+ "rename a request level api segment once it reaches the relevant host connector" in {
+ enableAutomaticTraceTokenPropagation()
+ enablePipeliningSegmentCollectionStrategy()
+
+ val (_, server, bound) = buildSHostConnectorAndServer
+
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("rename-segment-with-request-level-api") {
+ val rF = sendReceive(system, ec) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment")
+ }
+
+ (TraceRecorder.currentContext, rF)
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+
+ val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api")
+ traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
}
}
- "not configured to do automatic-trace-token-propagation" should {
- "not include the trace token header on spray-client requests" in {
- disableAutomaticTraceTokenPropagation()
+ "using the host-level api" should {
+ "include the trace token header on spray-client requests if automatic-trace-token-propagation is enabled" in {
+ enableAutomaticTraceTokenPropagation()
+ enableInternalSegmentCollectionStrategy()
- val (hostConnector, server) = buildSHostConnectorAndServer
+ val (hostConnector, server, _) = buildSHostConnectorAndServer
val client = TestProbe()
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
client.send(hostConnector, Get("/dummy-path"))
TraceRecorder.currentContext
}
@@ -106,30 +198,24 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
}
- }
- "configured to use pipelining segment collection strategy" should {
- "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in {
- enablePipeliningSegmentCollectionStrategy()
+ "not include the trace token header on spray-client requests if automatic-trace-token-propagation is disabled" in {
+ disableAutomaticTraceTokenPropagation()
+ enableInternalSegmentCollectionStrategy()
- val (hostConnector, server) = buildSHostConnectorAndServer
+ val (hostConnector, server, _) = buildSHostConnectorAndServer
val client = TestProbe()
- val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
-
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
- metricListener.expectMsgType[TickMetricSnapshot]
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") {
- pipeline(Get("/dummy-path")) to client.ref
+ val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ client.send(hostConnector, Get("/dummy-path"))
TraceRecorder.currentContext
}
@@ -138,39 +224,25 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
server.reply(Http.Register(server.ref))
// Receive the request and reply back
- val req = server.expectMsgType[HttpRequest]
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
-
- // Finish the trace
- testContext.map(_.finish(Map.empty))
-
- val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
- traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
- traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
- recordedSegment should not be empty
- recordedSegment map { segmentMetrics ⇒
- segmentMetrics.numberOfMeasurements should be(1L)
- }
+ testContext.finish()
}
- }
- "configured to use internal segment collection strategy" should {
- "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in {
+ "start and finish a segment that must be named using the host level api name assignation" in {
+ disableAutomaticTraceTokenPropagation()
enableInternalSegmentCollectionStrategy()
- val (hostConnector, server) = buildSHostConnectorAndServer
+ val (hostConnector, server, _) = buildSHostConnectorAndServer
val client = TestProbe()
- val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
-
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
- metricListener.expectMsgType[TickMetricSnapshot]
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") {
- pipeline(Get("/dummy-path")) to client.ref
+ val testContext = TraceRecorder.withNewTraceContext("create-segment-with-host-level-api") {
+ client.send(hostConnector, Get("/host-level-api-segment"))
TraceRecorder.currentContext
}
@@ -179,21 +251,17 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
server.reply(Http.Register(server.ref))
// Receive the request and reply back
- server.expectMsgType[HttpRequest]
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
+ testContext.finish()
- // Finish the trace
- testContext.map(_.finish(Map.empty))
-
- val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
- traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
- traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
- recordedSegment should not be empty
- recordedSegment map { segmentMetrics ⇒
- segmentMetrics.numberOfMeasurements should be(1L)
- }
+ val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api")
+ traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
}
}
}
@@ -208,6 +276,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
metricsOption.get.asInstanceOf[TraceMetricsSnapshot]
}
+ def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
+ val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory)
+ val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
+ recorder.get.collect(collectionContext)
+ }
+
def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal)
def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining)
def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
@@ -227,3 +301,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
field.set(target, include)
}
}
+
+class TestSprayNameGenerator extends SprayNameGenerator {
+ def generateTraceName(request: HttpRequest): String = request.uri.path.toString()
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String = "request-level " + request.uri.path.toString()
+ def generateHostLevelApiSegmentName(request: HttpRequest): String = "host-level " + request.uri.path.toString()
+}
diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
index 65506770..379b8fc8 100644
--- a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
@@ -45,13 +45,13 @@ trait TestServer {
probe.sender
}
- def buildSHostConnectorAndServer: (ActorRef, TestProbe) = {
+ def buildSHostConnectorAndServer: (ActorRef, TestProbe, Http.Bound) = {
val serverHandler = TestProbe()
IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref)
val bound = serverHandler.expectMsgType[Bound](10 seconds)
val client = httpHostConnector(bound)
- (client, serverHandler)
+ (client, serverHandler, bound)
}
private def httpHostConnector(connectionInfo: Http.Bound): ActorRef = {
diff --git a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala
index de867035..825cc718 100644
--- a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala
+++ b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala
@@ -17,7 +17,7 @@
package akka.testkit
import org.aspectj.lang.annotation._
-import kamon.trace.{ TraceContextAware, TraceRecorder }
+import kamon.trace.{ EmptyTraceContext, TraceContextAware, TraceRecorder }
import org.aspectj.lang.ProceedingJoinPoint
import akka.testkit.TestActor.RealMessage
@@ -43,7 +43,7 @@ class TestProbeInstrumentation {
def aroundTestProbeReply(pjp: ProceedingJoinPoint, testProbe: TestProbe): Any = {
val traceContext = testProbe.lastMessage match {
case msg: RealMessage ⇒ msg.asInstanceOf[TraceContextAware].traceContext
- case _ ⇒ None
+ case _ ⇒ EmptyTraceContext
}
TraceRecorder.withTraceContext(traceContext) {