aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala')
-rw-r--r--kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala110
1 files changed, 110 insertions, 0 deletions
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
new file mode 100644
index 00000000..244d66ed
--- /dev/null
+++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
@@ -0,0 +1,110 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package spray.can.server
+
+import org.aspectj.lang.annotation._
+import kamon.trace.{ TraceContext, TraceRecorder, TraceContextAware }
+import akka.actor.ActorSystem
+import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest }
+import akka.event.Logging.Warning
+import scala.Some
+import kamon.Kamon
+import kamon.spray.Spray
+import org.aspectj.lang.ProceedingJoinPoint
+import spray.http.HttpHeaders.RawHeader
+
+@Aspect
+class ServerRequestInstrumentation {
+
+ @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest")
+ def mixinContextAwareToOpenRequest: TraceContextAware = TraceContextAware.default
+
+ @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)")
+ def openRequestInit(openRequest: TraceContextAware, request: HttpRequest): Unit = {}
+
+ @After("openRequestInit(openRequest, request)")
+ def afterInit(openRequest: TraceContextAware, request: HttpRequest): Unit = {
+ val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system
+ val sprayExtension = Kamon(Spray)(system)
+
+ val defaultTraceName: String = request.method.value + ": " + request.uri.path
+ val token = if (sprayExtension.includeTraceToken) {
+ request.headers.find(_.name == sprayExtension.traceTokenHeaderName).map(_.value)
+ } else None
+
+ TraceRecorder.start(defaultTraceName, token)(system)
+
+ // Necessary to force initialization of traceContext when initiating the request.
+ openRequest.traceContext
+ }
+
+ @Pointcut("execution(* spray.can.server.ServerFrontend$$anon$2$$anon$1.spray$can$server$ServerFrontend$$anon$$anon$$openNewRequest(..))")
+ def openNewRequest(): Unit = {}
+
+ @After("openNewRequest()")
+ def afterOpenNewRequest(): Unit = {
+ TraceRecorder.clearContext
+ }
+
+ @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest) && args(response)")
+ def openRequestCreation(openRequest: TraceContextAware, response: HttpMessagePartWrapper): Unit = {}
+
+ @Around("openRequestCreation(openRequest, response)")
+ def afterFinishingRequest(pjp: ProceedingJoinPoint, openRequest: TraceContextAware, response: HttpMessagePartWrapper): Any = {
+ val incomingContext = TraceRecorder.currentContext
+ val storedContext = openRequest.traceContext
+
+ verifyTraceContextConsistency(incomingContext, storedContext)
+ val proceedResult = incomingContext match {
+ case None ⇒ pjp.proceed()
+ case Some(traceContext) ⇒
+ val sprayExtension = Kamon(Spray)(traceContext.system)
+
+ if (sprayExtension.includeTraceToken) {
+ val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token)
+ pjp.proceed(Array(openRequest, responseWithHeader))
+
+ } else pjp.proceed
+ }
+
+ TraceRecorder.finish()
+ proceedResult
+ }
+
+ def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = {
+ for (original ← storedTraceContext) {
+ incomingTraceContext match {
+ case Some(incoming) if original.token != incoming.token ⇒
+ publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]", incoming.system)
+
+ case Some(_) ⇒ // nothing to do here.
+
+ case None ⇒
+ publishWarning(s"Trace context not present while closing the Trace: [$original]", original.system)
+ }
+ }
+
+ def publishWarning(text: String, system: ActorSystem): Unit =
+ system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text))
+
+ }
+
+ def includeTraceTokenIfPossible(response: HttpMessagePartWrapper, traceTokenHeaderName: String, token: String): HttpMessagePartWrapper =
+ response match {
+ case response: HttpResponse ⇒ response.withHeaders(RawHeader(traceTokenHeaderName, token))
+ case other ⇒ other
+ }
+}