aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-10-26 02:21:11 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2014-10-26 02:21:11 +0200
commitcd8dce169b4231bf533445656bfb5a35034a6304 (patch)
tree4561bf3b8b7890891990a0d1b500155975d54277 /kamon-spray
parentea1a0d5d76988992227eb30b0baaf8e97678c946 (diff)
downloadKamon-cd8dce169b4231bf533445656bfb5a35034a6304.tar.gz
Kamon-cd8dce169b4231bf533445656bfb5a35034a6304.tar.bz2
Kamon-cd8dce169b4231bf533445656bfb5a35034a6304.zip
= all: upgrade to be compatible with the latest code in core
Diffstat (limited to 'kamon-spray')
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala104
-rw-r--r--kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala47
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala20
3 files changed, 82 insertions, 89 deletions
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index d9cdde08..cfd204df 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -19,9 +19,8 @@ package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest }
-import spray.http.HttpHeaders.{ RawHeader, Host }
-import kamon.trace.{SegmentAware, TraceRecorder, SegmentCompletionHandleAware}
-import kamon.metric.TraceMetrics.HttpClientRequest
+import spray.http.HttpHeaders.RawHeader
+import kamon.trace._
import kamon.Kamon
import kamon.spray.{ ClientSegmentCollectionStrategy, Spray }
import akka.actor.ActorRef
@@ -34,26 +33,28 @@ class ClientRequestInstrumentation {
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
def mixin: SegmentAware = SegmentAware.default
- @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)")
- def requestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = {}
+ @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)")
+ def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {}
- @After("requestContextCreation(ctx, request)")
- def afterRequestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = {
+ @After("requestContextCreation(requestContext, request)")
+ def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {
// The RequestContext will be copied when a request needs to be retried but we are only interested in creating the
- // completion handle the first time we create one.
+ // segment the first time we create one.
// The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely.
- if (ctx.segmentCompletionHandle.isEmpty) {
- TraceRecorder.currentContext.map { traceContext ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
+ if (requestContext.segment.isEmpty) {
+ TraceRecorder.currentContext match {
+ case ctx: DefaultTraceContext ⇒
+ val sprayExtension = Kamon(Spray)(ctx.system)
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
- ctx.segmentCompletionHandle = Some(completionHandle)
- }
+ requestContext.segment = segment
+ }
+
+ case EmptyTraceContext ⇒ // Nothing to do here.
}
}
}
@@ -73,17 +74,15 @@ class ClientRequestInstrumentation {
@Around("dispatchToCommander(requestContext, message)")
def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = {
- requestContext.traceContext match {
- case ctx @ Some(_) ⇒
- TraceRecorder.withInlineTraceContextReplacement(ctx) {
- if (message.isInstanceOf[HttpMessageEnd])
- requestContext.segment.finish()
+ if (requestContext.traceContext.nonEmpty) {
+ TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) {
+ if (message.isInstanceOf[HttpMessageEnd])
+ requestContext.segment.finish()
- pjp.proceed()
- }
+ pjp.proceed()
+ }
- case None ⇒ pjp.proceed()
- }
+ } else pjp.proceed()
}
@Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)")
@@ -95,18 +94,21 @@ class ClientRequestInstrumentation {
(request: HttpRequest) ⇒ {
val responseFuture = originalSendReceive.apply(request)
- TraceRecorder.currentContext.map { traceContext ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
-
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes)
-
- responseFuture.onComplete { result ⇒
- completionHandle.finish(Map.empty)
- }(ec)
- }
+
+ TraceRecorder.currentContext match {
+ case ctx: DefaultTraceContext ⇒
+ val sprayExtension = Kamon(Spray)(ctx.system)
+
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
+
+ responseFuture.onComplete { result ⇒
+ segment.finish()
+ }(ec)
+ }
+
+ case EmptyTraceContext ⇒ // Nothing to do here.
}
responseFuture
@@ -114,26 +116,22 @@ class ClientRequestInstrumentation {
}
- def basicRequestAttributes(request: HttpRequest): Map[String, String] = {
- Map[String, String](
- "host" -> request.header[Host].map(_.value).getOrElse("unknown"),
- "path" -> request.uri.path.toString(),
- "method" -> request.method.toString())
- }
-
@Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)")
def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {}
@Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)")
def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = {
- val modifiedHeaders = TraceRecorder.currentContext map { traceContext ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
-
- if (sprayExtension.includeTraceToken)
- RawHeader(sprayExtension.traceTokenHeaderName, traceContext.token) :: defaultHeaders
- else
- defaultHeaders
- } getOrElse defaultHeaders
+ val modifiedHeaders = TraceRecorder.currentContext match {
+ case ctx: DefaultTraceContext ⇒
+ val sprayExtension = Kamon(Spray)(ctx.system)
+
+ if (sprayExtension.includeTraceToken)
+ RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders
+ else
+ defaultHeaders
+
+ case EmptyTraceContext ⇒ defaultHeaders
+ }
pjp.proceed(Array(modifiedHeaders))
}
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
index 69b0160e..74d98564 100644
--- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
@@ -16,11 +16,10 @@
package spray.can.server
import org.aspectj.lang.annotation._
-import kamon.trace.{ TraceContext, TraceRecorder, TraceContextAware }
+import kamon.trace._
import akka.actor.ActorSystem
import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest }
import akka.event.Logging.Warning
-import scala.Some
import kamon.Kamon
import kamon.spray.{ SprayExtension, Spray }
import org.aspectj.lang.ProceedingJoinPoint
@@ -67,40 +66,36 @@ class ServerRequestInstrumentation {
val incomingContext = TraceRecorder.currentContext
val storedContext = openRequest.traceContext
- verifyTraceContextConsistency(incomingContext, storedContext)
- incomingContext match {
- case None ⇒ pjp.proceed()
- case Some(traceContext) ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
+ // The stored context is always a DefaultTraceContext if the instrumentation is running
+ val system = storedContext.asInstanceOf[DefaultTraceContext].system
- val proceedResult = if (sprayExtension.includeTraceToken) {
- val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token)
- pjp.proceed(Array(openRequest, responseWithHeader))
+ verifyTraceContextConsistency(incomingContext, storedContext, system)
- } else pjp.proceed
+ if (incomingContext.isEmpty)
+ pjp.proceed()
+ else {
+ val sprayExtension = Kamon(Spray)(system)
- TraceRecorder.finish()
- recordHttpServerMetrics(response, traceContext.name, sprayExtension)
- proceedResult
- }
- }
+ val proceedResult = if (sprayExtension.includeTraceToken) {
+ val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, incomingContext.token)
+ pjp.proceed(Array(openRequest, responseWithHeader))
- def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = {
- for (original ← storedTraceContext) {
- incomingTraceContext match {
- case Some(incoming) if original.token != incoming.token ⇒
- publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]", incoming.system)
+ } else pjp.proceed
- case Some(_) ⇒ // nothing to do here.
-
- case None ⇒
- publishWarning(s"Trace context not present while closing the Trace: [$original]", original.system)
- }
+ TraceRecorder.finish()
+ recordHttpServerMetrics(response, incomingContext.name, sprayExtension)
+ proceedResult
}
+ }
+ def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = {
def publishWarning(text: String, system: ActorSystem): Unit =
system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text))
+ if (incomingTraceContext.nonEmpty && incomingTraceContext.token != storedTraceContext.token)
+ publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system)
+ else
+ publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system)
}
def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit =
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index 54329645..fbf69c8a 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -21,7 +21,7 @@ import akka.actor.ActorSystem
import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentity, TraceRecorder }
import com.typesafe.config.ConfigFactory
import spray.can.Http
import spray.http.HttpHeaders.RawHeader
@@ -31,7 +31,7 @@ import spray.client.pipelining
import kamon.metric.Subscriptions.TickMetricSnapshot
import scala.concurrent.duration._
import akka.pattern.pipe
-import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot }
+import kamon.metric.TraceMetrics.TraceMetricsSnapshot
class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {
implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
@@ -78,12 +78,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
}
}
@@ -106,12 +106,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
}
}
@@ -143,12 +143,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
client.expectMsgType[HttpResponse]
// Finish the trace
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2)
recordedSegment should not be empty
recordedSegment map { segmentMetrics ⇒
segmentMetrics.numberOfMeasurements should be(1L)
@@ -184,12 +184,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
client.expectMsgType[HttpResponse]
// Finish the trace
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2)
recordedSegment should not be empty
recordedSegment map { segmentMetrics ⇒
segmentMetrics.numberOfMeasurements should be(1L)