aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray/src/main/scala/kamon/spray
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-spray/src/main/scala/kamon/spray')
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala4
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala (renamed from kamon-spray/src/main/scala/kamon/spray/Spray.scala)61
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala35
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala152
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala136
5 files changed, 359 insertions, 29 deletions
diff --git a/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala
index e98b63d9..4eefee95 100644
--- a/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala
@@ -17,11 +17,11 @@ package kamon.spray
import spray.routing.directives.BasicDirectives
import spray.routing._
-import kamon.trace.TraceRecorder
+import kamon.trace.TraceContext
trait KamonTraceDirectives extends BasicDirectives {
def traceName(name: String): Directive0 = mapRequest { req ⇒
- TraceRecorder.rename(name)
+ TraceContext.currentContext.rename(name)
req
}
}
diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala
index ab8d6a7d..3df8d972 100644
--- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala
@@ -18,47 +18,49 @@ package kamon.spray
import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
import akka.actor
+import akka.event.{ Logging, LoggingAdapter }
import kamon.Kamon
import kamon.http.HttpServerMetrics
-import kamon.metric.Metrics
+import kamon.metric.{ Entity, Metrics }
import spray.http.HttpHeaders.Host
import spray.http.HttpRequest
object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider {
def lookup(): ExtensionId[_ <: actor.Extension] = Spray
- def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtension(system)
+ def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtensionImpl(system)
val SegmentLibraryName = "spray-client"
}
-object ClientSegmentCollectionStrategy {
- sealed trait Strategy
- case object Pipelining extends Strategy
- case object Internal extends Strategy
+trait SprayExtension extends Kamon.Extension {
+ def settings: SprayExtensionSettings
+ def log: LoggingAdapter
+ def httpServerMetrics: HttpServerMetrics
+ def generateTraceName(request: HttpRequest): String
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String
+ def generateHostLevelApiSegmentName(request: HttpRequest): String
}
-class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
- private val config = system.settings.config.getConfig("kamon.spray")
+class SprayExtensionImpl(system: ExtendedActorSystem) extends SprayExtension {
+ val settings = SprayExtensionSettings(system)
+ val log = Logging(system, "SprayExtension")
- val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation")
- val traceTokenHeaderName: String = config.getString("trace-token-header-name")
- val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get
- // It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it.
+ val httpServerMetrics = {
+ val metricsExtension = Metrics.get(system)
+ val factory = metricsExtension.instrumentFactory(HttpServerMetrics.category)
+ val entity = Entity("spray-server", HttpServerMetrics.category)
- private val nameGeneratorFQN = config.getString("name-generator")
- private val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems.
+ Metrics.get(system).register(entity, new HttpServerMetrics(factory)).recorder
+ }
- val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy =
- config.getString("client.segment-collection-strategy") match {
- case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining
- case "internal" ⇒ ClientSegmentCollectionStrategy.Internal
- case other ⇒ throw new IllegalArgumentException(s"Configured segment-collection-strategy [$other] is invalid, " +
- s"only pipelining and internal are valid options.")
- }
+ def generateTraceName(request: HttpRequest): String =
+ settings.nameGenerator.generateTraceName(request)
- def generateTraceName(request: HttpRequest): String = nameGenerator.generateTraceName(request)
- def generateRequestLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateRequestLevelApiSegmentName(request)
- def generateHostLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateHostLevelApiSegmentName(request)
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String =
+ settings.nameGenerator.generateRequestLevelApiSegmentName(request)
+
+ def generateHostLevelApiSegmentName(request: HttpRequest): String =
+ settings.nameGenerator.generateHostLevelApiSegmentName(request)
}
trait SprayNameGenerator {
@@ -68,14 +70,19 @@ trait SprayNameGenerator {
}
class DefaultSprayNameGenerator extends SprayNameGenerator {
- def hostFromHeaders(request: HttpRequest): Option[String] = request.header[Host].map(_.host)
def generateRequestLevelApiSegmentName(request: HttpRequest): String = {
val uriAddress = request.uri.authority.host.address
if (uriAddress.equals("")) hostFromHeaders(request).getOrElse("unknown-host") else uriAddress
}
- def generateHostLevelApiSegmentName(request: HttpRequest): String = hostFromHeaders(request).getOrElse("unknown-host")
+ def generateHostLevelApiSegmentName(request: HttpRequest): String =
+ hostFromHeaders(request).getOrElse("unknown-host")
+
+ def generateTraceName(request: HttpRequest): String =
+ request.method.value + ": " + request.uri.path
+
+ private def hostFromHeaders(request: HttpRequest): Option[String] =
+ request.header[Host].map(_.host)
- def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path
}
diff --git a/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala b/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala
new file mode 100644
index 00000000..44c71eaf
--- /dev/null
+++ b/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala
@@ -0,0 +1,35 @@
+package kamon.spray
+
+import akka.actor.ExtendedActorSystem
+
+case class SprayExtensionSettings(
+ includeTraceTokenHeader: Boolean,
+ traceTokenHeaderName: String,
+ nameGenerator: SprayNameGenerator,
+ clientInstrumentationLevel: ClientInstrumentationLevel.Level)
+
+object SprayExtensionSettings {
+ def apply(system: ExtendedActorSystem): SprayExtensionSettings = {
+ val config = system.settings.config.getConfig("kamon.spray")
+
+ val includeTraceTokenHeader: Boolean = config.getBoolean("automatic-trace-token-propagation")
+ val traceTokenHeaderName: String = config.getString("trace-token-header-name")
+
+ val nameGeneratorFQN = config.getString("name-generator")
+ val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems.
+
+ val clientInstrumentationLevel: ClientInstrumentationLevel.Level = config.getString("client.instrumentation-level") match {
+ case "request-level" ⇒ ClientInstrumentationLevel.RequestLevelAPI
+ case "host-level" ⇒ ClientInstrumentationLevel.HostLevelAPI
+ case other ⇒ sys.error(s"Invalid client instrumentation level [$other] found in configuration.")
+ }
+
+ SprayExtensionSettings(includeTraceTokenHeader, traceTokenHeaderName, nameGenerator, clientInstrumentationLevel)
+ }
+}
+
+object ClientInstrumentationLevel {
+ sealed trait Level
+ case object RequestLevelAPI extends Level
+ case object HostLevelAPI extends Level
+}
diff --git a/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
new file mode 100644
index 00000000..fa9063ad
--- /dev/null
+++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
@@ -0,0 +1,152 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package spray.can.client
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import spray.http._
+import spray.http.HttpHeaders.RawHeader
+import kamon.trace._
+import kamon.spray.{ ClientInstrumentationLevel, Spray }
+import akka.actor.ActorRef
+import scala.concurrent.{ Future, ExecutionContext }
+import akka.util.Timeout
+
+@Aspect
+class ClientRequestInstrumentation {
+
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default
+
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default
+
+ @DeclareMixin("spray.http.HttpRequest")
+ def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default
+
+ @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)")
+ def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {}
+
+ @After("requestContextCreation(requestContext, request)")
+ def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {
+ // This read to requestContext.traceContext takes care of initializing the aspect timely.
+ requestContext.traceContext
+
+ TraceContext.map { ctx ⇒
+ val sprayExtension = ctx.lookupExtension(Spray)
+
+ if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.HostLevelAPI) {
+ if (requestContext.segment.isEmpty) {
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ val segment = ctx.startSegment(clientRequestName, SegmentCategory.HttpClient, Spray.SegmentLibraryName)
+ requestContext.segment = segment
+ }
+
+ } else {
+
+ // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a
+ // name again here is that when the request was initially sent it might not have the Host information available
+ // and it might be important to decide a proper segment name.
+
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ request.asInstanceOf[SegmentAware].segment.rename(clientRequestName)
+ }
+ }
+ }
+
+ @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
+ def copyingRequestContext(old: TraceContextAware): Unit = {}
+
+ @Around("copyingRequestContext(old)")
+ def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = {
+ TraceContext.withContext(old.traceContext) {
+ pjp.proceed()
+ }
+ }
+
+ @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)")
+ def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {}
+
+ @Around("dispatchToCommander(requestContext, message)")
+ def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = {
+ if (requestContext.traceContext.nonEmpty) {
+ TraceContext.withContext(requestContext.traceContext) {
+ if (message.isInstanceOf[HttpMessageEnd])
+ requestContext.asInstanceOf[SegmentAware].segment.finish()
+
+ pjp.proceed()
+ }
+ } else pjp.proceed()
+ }
+
+ @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)")
+ def copyingHttpRequest(old: SegmentAware): Unit = {}
+
+ @Around("copyingHttpRequest(old)")
+ def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = {
+ val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware]
+ copiedHttpRequest.segment = old.segment
+ copiedHttpRequest
+ }
+
+ @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)")
+ def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {}
+
+ @Around("requestLevelApiSendReceive(transport, ec, timeout)")
+ def aroundRequestLevelApiSendReceive(pjp: ProceedingJoinPoint, transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Any = {
+ val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]]
+
+ (request: HttpRequest) ⇒ {
+ TraceContext.map { ctx ⇒
+ val sprayExtension = ctx.lookupExtension(Spray)
+ val segment =
+ if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.RequestLevelAPI)
+ ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentCategory.HttpClient, Spray.SegmentLibraryName)
+ else
+ EmptyTraceContext.EmptySegment
+
+ request.asInstanceOf[SegmentAware].segment = segment
+
+ val responseFuture = originalSendReceive.apply(request)
+ responseFuture.onComplete { result ⇒
+ segment.finish()
+ }(ec)
+
+ responseFuture
+
+ } getOrElse (originalSendReceive.apply(request))
+ }
+ }
+
+ @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)")
+ def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {}
+
+ @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)")
+ def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = {
+
+ val modifiedHeaders = TraceContext.map { ctx ⇒
+ val sprayExtension = ctx.lookupExtension(Spray)
+ if (sprayExtension.settings.includeTraceTokenHeader)
+ RawHeader(sprayExtension.settings.traceTokenHeaderName, ctx.token) :: defaultHeaders
+ else
+ defaultHeaders
+
+ } getOrElse (defaultHeaders)
+
+ pjp.proceed(Array[AnyRef](request, modifiedHeaders))
+ }
+} \ No newline at end of file
diff --git a/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala
new file mode 100644
index 00000000..73287132
--- /dev/null
+++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala
@@ -0,0 +1,136 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package spray.can.server.instrumentation
+
+import kamon.trace.TraceLocal.{ HttpContext, HttpContextKey }
+import org.aspectj.lang.annotation._
+import kamon.trace._
+import akka.actor.ActorSystem
+import spray.can.server.OpenRequest
+import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest }
+import kamon.Kamon
+import kamon.spray.{ SprayExtension, Spray }
+import org.aspectj.lang.ProceedingJoinPoint
+import spray.http.HttpHeaders.RawHeader
+
+@Aspect
+class ServerRequestInstrumentation {
+
+ import ServerRequestInstrumentation._
+
+ @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest")
+ def mixinContextAwareToOpenRequest: TraceContextAware = TraceContextAware.default
+
+ @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)")
+ def openRequestInit(openRequest: TraceContextAware, request: HttpRequest): Unit = {}
+
+ @After("openRequestInit(openRequest, request)")
+ def afterInit(openRequest: TraceContextAware, request: HttpRequest): Unit = {
+ val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system
+ val tracer = Tracer.get(system)
+ val sprayExtension = Kamon(Spray)(system)
+
+ val defaultTraceName = sprayExtension.generateTraceName(request)
+ val token = if (sprayExtension.settings.includeTraceTokenHeader) {
+ request.headers.find(_.name == sprayExtension.settings.traceTokenHeaderName).map(_.value)
+ } else None
+
+ val newContext = token.map(customToken ⇒ tracer.newContext(defaultTraceName, customToken)) getOrElse (tracer.newContext(defaultTraceName))
+ TraceContext.setCurrentContext(newContext)
+
+ // Necessary to force initialization of traceContext when initiating the request.
+ openRequest.traceContext
+ }
+
+ @Pointcut("execution(* spray.can.server.ServerFrontend$$anon$2$$anon$1.spray$can$server$ServerFrontend$$anon$$anon$$openNewRequest(..))")
+ def openNewRequest(): Unit = {}
+
+ @After("openNewRequest()")
+ def afterOpenNewRequest(): Unit = {
+ TraceContext.clearCurrentContext
+ }
+
+ @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest) && args(response)")
+ def openRequestCreation(openRequest: TraceContextAware, response: HttpMessagePartWrapper): Unit = {}
+
+ @Around("openRequestCreation(openRequest, response)")
+ def afterFinishingRequest(pjp: ProceedingJoinPoint, openRequest: TraceContextAware, response: HttpMessagePartWrapper): Any = {
+ val incomingContext = TraceContext.currentContext
+ val storedContext = openRequest.traceContext
+
+ // The stored context is always a DefaultTraceContext if the instrumentation is running
+ verifyTraceContextConsistency(incomingContext, storedContext)
+
+ if (incomingContext.isEmpty)
+ pjp.proceed()
+ else {
+ val sprayExtension = incomingContext.lookupExtension(Spray)
+
+ val proceedResult = if (sprayExtension.settings.includeTraceTokenHeader) {
+ val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.settings.traceTokenHeaderName, incomingContext.token)
+ pjp.proceed(Array(openRequest, responseWithHeader))
+
+ } else pjp.proceed
+
+ TraceContext.currentContext.finish()
+
+ recordHttpServerMetrics(response, incomingContext.name, sprayExtension)
+
+ //store in TraceLocal useful data to diagnose errors
+ storeDiagnosticData(openRequest)
+
+ proceedResult
+ }
+ }
+
+ def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext): Unit = {
+ def publishWarning(text: String): Unit =
+ storedTraceContext.lookupExtension(Spray).log.warning(text)
+
+ if (incomingTraceContext.nonEmpty) {
+ if (incomingTraceContext.token != storedTraceContext.token)
+ publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]")
+ } else
+ publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]")
+ }
+
+ def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit =
+ response match {
+ case httpResponse: HttpResponse ⇒ sprayExtension.httpServerMetrics.recordResponse(traceName, httpResponse.status.intValue.toString)
+ case other ⇒ // Nothing to do then.
+ }
+
+ def includeTraceTokenIfPossible(response: HttpMessagePartWrapper, traceTokenHeaderName: String, token: String): HttpMessagePartWrapper =
+ response match {
+ case response: HttpResponse ⇒ response.withHeaders(response.headers ::: RawHeader(traceTokenHeaderName, token) :: Nil)
+ case other ⇒ other
+ }
+
+ def storeDiagnosticData(currentContext: TraceContextAware): Unit = {
+ val request = currentContext.asInstanceOf[OpenRequest].request
+ val headers = request.headers.map(header ⇒ header.name -> header.value).toMap
+ val agent = headers.getOrElse(UserAgent, Unknown)
+ val forwarded = headers.getOrElse(XForwardedFor, Unknown)
+
+ TraceLocal.store(HttpContextKey)(HttpContext(agent, request.uri.toRelative.toString(), forwarded))
+ }
+}
+
+object ServerRequestInstrumentation {
+ val UserAgent = "User-Agent"
+ val XForwardedFor = "X-Forwarded-For"
+ val Unknown = "unknown"
+}