aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-02-03 09:22:58 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-02-03 09:22:58 -0300
commit819406d3ba117b886fedb133959bd7c362a8f726 (patch)
tree9b5cd0cd3e89837a578192942eb521faf40dc54f /kamon-spray
parent6f036514edb6dd406803acdffcff437dd92efe9b (diff)
downloadKamon-819406d3ba117b886fedb133959bd7c362a8f726.tar.gz
Kamon-819406d3ba117b886fedb133959bd7c362a8f726.tar.bz2
Kamon-819406d3ba117b886fedb133959bd7c362a8f726.zip
initial tests for spray-client instrumentation
Diffstat (limited to 'kamon-spray')
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala44
-rw-r--r--kamon-spray/src/test/resources/application.conf26
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala67
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala29
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala24
5 files changed, 147 insertions, 43 deletions
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index 868cbaca..4e2a352c 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -18,12 +18,15 @@ package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import spray.http.{ HttpMessageEnd, HttpRequest }
+import spray.http.{ HttpResponse, HttpMessageEnd, HttpRequest }
import spray.http.HttpHeaders.Host
import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware, TraceContextAware }
import kamon.metrics.TraceMetrics.HttpClientRequest
import kamon.Kamon
import kamon.spray.Spray
+import akka.actor.ActorRef
+import scala.concurrent.{ Future, ExecutionContext }
+import akka.util.Timeout
@Aspect
class ClientRequestInstrumentation {
@@ -43,13 +46,10 @@ class ClientRequestInstrumentation {
// The read to ctx.completionHandle should take care of initializing the aspect timely.
if (ctx.segmentCompletionHandle.isEmpty) {
TraceRecorder.currentContext.map { traceContext ⇒
- val requestAttributes = Map[String, String](
- "host" -> request.header[Host].map(_.value).getOrElse("unknown"),
- "path" -> request.uri.path.toString(),
- "method" -> request.method.toString())
-
+ val requestAttributes = basicRequestAttributes(request)
val clientRequestName = Kamon(Spray)(traceContext.system).assignHttpClientRequestName(request)
val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes)
+
ctx.segmentCompletionHandle = Some(completionHandle)
}
}
@@ -82,6 +82,38 @@ class ClientRequestInstrumentation {
case None ⇒ pjp.proceed()
}
}
+
+ @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)")
+ def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {}
+
+ @Around("requestLevelApiSendReceive(transport, ec, timeout)")
+ def aroundRequestLevelApiSendReceive(pjp: ProceedingJoinPoint, transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Any = {
+ val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]]
+
+ (request: HttpRequest) ⇒ {
+ val responseFuture = originalSendReceive.apply(request)
+
+ TraceRecorder.currentContext.map { traceContext ⇒
+ val requestAttributes = basicRequestAttributes(request)
+ val clientRequestName = Kamon(Spray)(traceContext.system).assignHttpClientRequestName(request)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes)
+
+ responseFuture.onComplete { result ⇒
+ completionHandle.finish(Map.empty)
+ }(ec)
+ }
+
+ responseFuture
+ }
+
+ }
+
+ def basicRequestAttributes(request: HttpRequest): Map[String, String] = {
+ Map[String, String](
+ "host" -> request.header[Host].map(_.value).getOrElse("unknown"),
+ "path" -> request.uri.path.toString(),
+ "method" -> request.method.toString())
+ }
}
object ClientRequestInstrumentation {
diff --git a/kamon-spray/src/test/resources/application.conf b/kamon-spray/src/test/resources/application.conf
new file mode 100644
index 00000000..4a9b2c67
--- /dev/null
+++ b/kamon-spray/src/test/resources/application.conf
@@ -0,0 +1,26 @@
+kamon {
+ metrics {
+ tick-interval = 1 second
+
+ filters = [
+ {
+ actor {
+ includes = []
+ excludes = [ "system/*", "user/IO-*" ]
+ }
+ },
+ {
+ trace {
+ includes = [ "*" ]
+ excludes = []
+ }
+ },
+ {
+ dispatcher {
+ includes = [ "default-dispatcher" ]
+ excludes = []
+ }
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
new file mode 100644
index 00000000..339628d2
--- /dev/null
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -0,0 +1,67 @@
+package kamon.spray
+
+import akka.testkit.{TestKitBase, TestProbe, TestKit}
+import akka.actor.ActorSystem
+import org.scalatest.WordSpecLike
+import spray.httpx.RequestBuilding
+import kamon.Kamon
+import kamon.metrics.{ TraceMetrics, Metrics }
+import spray.http.{ HttpResponse, HttpRequest }
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.trace.TraceRecorder
+import spray.can.client.ClientRequestInstrumentation
+import com.typesafe.config.ConfigFactory
+
+class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with RequestBuilding with TestServer {
+ implicit lazy val system: ActorSystem = ActorSystem("server-request-tracing-spec", ConfigFactory.parseString(
+ """
+ |kamon {
+ | metrics {
+ | tick-interval = 1 second
+ |
+ | filters = [
+ | {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = []
+ | }
+ | }
+ | ]
+ | }
+ |}
+ """.stripMargin
+ ))
+
+ implicit def ec = system.dispatcher
+
+ "the client instrumentation" should {
+ "record record the elapsed time for a http request when using the Http manager directly" in {
+
+ val (hostConnector, server) = buildServer(httpHostConnector)
+ val client = TestProbe()
+
+ val metricListener = TestProbe()
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+
+ val testContext = TraceRecorder.withNewTraceContext("direct-to-http-manager-request") {
+ client.send(hostConnector, Get("/direct-to-http-manager-request"))
+ TraceRecorder.currentContext
+ }
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+
+ testContext.map(_.finish(Map.empty))
+
+ metricListener.fishForMessage() {
+ case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒
+ metrics.filterKeys(_.name == "direct-to-http-manager-request").exists {
+ case (group, snapshot) ⇒
+ snapshot.metrics.filterKeys(id ⇒ id.name == "" && id.tag == ClientRequestInstrumentation.SprayTime).nonEmpty
+ }
+ case other ⇒ false
+ }
+ }
+ }
+
+}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala
deleted file mode 100644
index 1ed3a787..00000000
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package kamon.spray
-
-import akka.testkit.TestKit
-import akka.actor.ActorSystem
-import org.scalatest.WordSpecLike
-import spray.httpx.RequestBuilding
-import spray.client.pipelining._
-import scala.concurrent.Await
-
-class ClientRequestTracingSpec extends TestKit(ActorSystem("server-request-tracing-spec")) with WordSpecLike with RequestBuilding with TestServer {
- implicit val ec = system.dispatcher
-
- "the client instrumentation" should {
- "record segments for a client http request" in {
-
- /* Trace.start("record-segments", None)(system)
-
- send {
- Get(s"http://127.0.0.1:$port/ok")
-
- // We don't care about the response, just make sure we finish the Trace after the response has been received.
- } map (rsp ⇒ Trace.finish())
-
- val trace = expectMsgType[UowTrace]
- println(trace.segments)*/
- }
- }
-
-}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
index 0edf75e0..08b1a917 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
@@ -17,7 +17,7 @@ package kamon.spray
import _root_.spray.httpx.RequestBuilding
import _root_.spray.routing.SimpleRoutingApp
-import akka.testkit.{ TestProbe, TestKit }
+import akka.testkit.{TestKitBase, TestProbe, TestKit}
import akka.actor.{ ActorRef, ActorSystem }
import org.scalatest.{ Matchers, WordSpecLike }
import scala.concurrent.Await
@@ -41,7 +41,7 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
"the spray server request tracing instrumentation" should {
"reply back with the same trace token header provided in the request" in {
- val (connection, server) = buildServer()
+ val (connection, server) = buildServer(clientConnection)
val client = TestProbe()
client.send(connection, Get("/").withHeaders(RawHeader("X-Trace-Token", "reply-trace-token")))
@@ -54,7 +54,7 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
}
"reply back with a automatically assigned trace token if none was provided with the request" in {
- val (connection, server) = buildServer()
+ val (connection, server) = buildServer(clientConnection)
val client = TestProbe()
client.send(connection, Get("/"))
@@ -67,7 +67,7 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
}
"open and finish a trace during the lifetime of a request" in {
- val (connection, server) = buildServer()
+ val (connection, server) = buildServer(clientConnection)
val client = TestProbe()
val metricListener = TestProbe()
@@ -89,25 +89,33 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
}
trait TestServer {
- self: TestKit ⇒
+ self: TestKitBase ⇒
- def buildServer(): (ActorRef, TestProbe) = {
+ def buildServer(clientBuilder: Http.Bound => ActorRef): (ActorRef, TestProbe) = {
val serverHandler = TestProbe()
IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref)
val bound = serverHandler.expectMsgType[Bound]
- val client = buildClient(bound)
+ val client = clientBuilder(bound)
serverHandler.expectMsgType[Http.Connected]
serverHandler.reply(Http.Register(serverHandler.ref))
(client, serverHandler)
}
- def buildClient(connectionInfo: Http.Bound): ActorRef = {
+ def clientConnection(connectionInfo: Http.Bound): ActorRef = {
val probe = TestProbe()
probe.send(IO(Http), Http.Connect(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort))
probe.expectMsgType[Http.Connected]
probe.sender
}
+ def httpHostConnector(connectionInfo: Http.Bound): ActorRef = {
+ val probe = TestProbe()
+ probe.send(IO(Http), Http.HostConnectorSetup(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort))
+ probe.expectMsgType[Http.HostConnectorInfo].hostConnector
+ }
+
+
+
}