aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-02-03 14:05:07 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-02-03 14:10:38 -0300
commit481a50b253fee53ec92901731f152ce9ba63e6a4 (patch)
tree8a04bd57c7c07aff502f3e40c03a204177d240f5
parent819406d3ba117b886fedb133959bd7c362a8f726 (diff)
downloadKamon-481a50b253fee53ec92901731f152ce9ba63e6a4.tar.gz
Kamon-481a50b253fee53ec92901731f152ce9ba63e6a4.tar.bz2
Kamon-481a50b253fee53ec92901731f152ce9ba63e6a4.zip
fix spray client instrumentation tests
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala64
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala40
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/TestServer.scala63
3 files changed, 124 insertions, 43 deletions
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index 339628d2..8163e25c 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -1,6 +1,6 @@
package kamon.spray
-import akka.testkit.{TestKitBase, TestProbe, TestKit}
+import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
import org.scalatest.WordSpecLike
import spray.httpx.RequestBuilding
@@ -11,6 +11,10 @@ import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.trace.TraceRecorder
import spray.can.client.ClientRequestInstrumentation
import com.typesafe.config.ConfigFactory
+import spray.can.Http
+import akka.pattern.pipe
+import spray.client.pipelining
+import scala.concurrent.duration._
class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with RequestBuilding with TestServer {
implicit lazy val system: ActorSystem = ActorSystem("server-request-tracing-spec", ConfigFactory.parseString(
@@ -29,28 +33,35 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
| ]
| }
|}
- """.stripMargin
- ))
+ """.stripMargin))
implicit def ec = system.dispatcher
"the client instrumentation" should {
- "record record the elapsed time for a http request when using the Http manager directly" in {
-
- val (hostConnector, server) = buildServer(httpHostConnector)
- val client = TestProbe()
+ "record the elapsed time for a http request when using the Http manager directly and tag it as SprayTime" in {
val metricListener = TestProbe()
Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
+
+ // Initiate a request within the context of a trace
val testContext = TraceRecorder.withNewTraceContext("direct-to-http-manager-request") {
client.send(hostConnector, Get("/direct-to-http-manager-request"))
TraceRecorder.currentContext
}
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
server.expectMsgType[HttpRequest]
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
+ // Finish the trace
testContext.map(_.finish(Map.empty))
metricListener.fishForMessage() {
@@ -62,6 +73,45 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
case other ⇒ false
}
}
+
+
+ "record the elapsed time for a http request when using the pipelining sendReceive and tag it as UserTime" in {
+
+ val metricListener = TestProbe()
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+
+
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
+ val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 10 seconds)
+
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("pipelining-helper-request") {
+ pipeline(Get("/pipelining-helper-request")) to client.ref
+ TraceRecorder.currentContext
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+
+ // Finish the trace
+ testContext.map(_.finish(Map.empty))
+
+ metricListener.fishForMessage() {
+ case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒
+ metrics.filterKeys(_.name == "pipelining-helper-request").exists {
+ case (group, snapshot) ⇒
+ snapshot.metrics.filterKeys(id ⇒ id.name == "" && id.tag == ClientRequestInstrumentation.UserTime).nonEmpty
+ }
+ case other ⇒ false
+ }
+ }
}
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
index 08b1a917..8fd84bfb 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
@@ -17,7 +17,7 @@ package kamon.spray
import _root_.spray.httpx.RequestBuilding
import _root_.spray.routing.SimpleRoutingApp
-import akka.testkit.{TestKitBase, TestProbe, TestKit}
+import akka.testkit.{ TestKitBase, TestProbe, TestKit }
import akka.actor.{ ActorRef, ActorSystem }
import org.scalatest.{ Matchers, WordSpecLike }
import scala.concurrent.Await
@@ -41,7 +41,7 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
"the spray server request tracing instrumentation" should {
"reply back with the same trace token header provided in the request" in {
- val (connection, server) = buildServer(clientConnection)
+ val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
client.send(connection, Get("/").withHeaders(RawHeader("X-Trace-Token", "reply-trace-token")))
@@ -54,7 +54,7 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
}
"reply back with a automatically assigned trace token if none was provided with the request" in {
- val (connection, server) = buildServer(clientConnection)
+ val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
client.send(connection, Get("/"))
@@ -67,7 +67,7 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
}
"open and finish a trace during the lifetime of a request" in {
- val (connection, server) = buildServer(clientConnection)
+ val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
val metricListener = TestProbe()
@@ -87,35 +87,3 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
}
}
-
-trait TestServer {
- self: TestKitBase ⇒
-
- def buildServer(clientBuilder: Http.Bound => ActorRef): (ActorRef, TestProbe) = {
- val serverHandler = TestProbe()
- IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref)
- val bound = serverHandler.expectMsgType[Bound]
-
- val client = clientBuilder(bound)
- serverHandler.expectMsgType[Http.Connected]
- serverHandler.reply(Http.Register(serverHandler.ref))
-
- (client, serverHandler)
- }
-
- def clientConnection(connectionInfo: Http.Bound): ActorRef = {
- val probe = TestProbe()
- probe.send(IO(Http), Http.Connect(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort))
- probe.expectMsgType[Http.Connected]
- probe.sender
- }
-
- def httpHostConnector(connectionInfo: Http.Bound): ActorRef = {
- val probe = TestProbe()
- probe.send(IO(Http), Http.HostConnectorSetup(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort))
- probe.expectMsgType[Http.HostConnectorInfo].hostConnector
- }
-
-
-
-}
diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
new file mode 100644
index 00000000..e2d012ab
--- /dev/null
+++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
@@ -0,0 +1,63 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.spray
+
+import akka.testkit.{ TestProbe, TestKitBase }
+import spray.can.Http
+import akka.actor.ActorRef
+import akka.io.IO
+import akka.io.Tcp.Bound
+
+trait TestServer {
+ self: TestKitBase ⇒
+
+ def buildClientConnectionAndServer: (ActorRef, TestProbe) = {
+ val serverHandler = TestProbe()
+ IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref)
+ val bound = serverHandler.expectMsgType[Bound]
+
+ val client = clientConnection(bound)
+ serverHandler.expectMsgType[Http.Connected]
+ serverHandler.reply(Http.Register(serverHandler.ref))
+
+ (client, serverHandler)
+ }
+
+ private def clientConnection(connectionInfo: Http.Bound): ActorRef = {
+ val probe = TestProbe()
+ probe.send(IO(Http), Http.Connect(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort))
+ probe.expectMsgType[Http.Connected]
+ probe.sender
+ }
+
+ def buildSHostConnectorAndServer: (ActorRef, TestProbe) = {
+ val serverHandler = TestProbe()
+ IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref)
+ val bound = serverHandler.expectMsgType[Bound]
+
+ val client = httpHostConnector(bound)
+
+ (client, serverHandler)
+ }
+
+ private def httpHostConnector(connectionInfo: Http.Bound): ActorRef = {
+ val probe = TestProbe()
+ probe.send(IO(Http), Http.HostConnectorSetup(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort))
+ probe.expectMsgType[Http.HostConnectorInfo].hostConnector
+ }
+
+}