From 481a50b253fee53ec92901731f152ce9ba63e6a4 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Mon, 3 Feb 2014 14:05:07 -0300 Subject: fix spray client instrumentation tests --- .../spray/ClientRequestInstrumentationSpec.scala | 64 +++++++++++++++++++--- .../spray/ServerRequestInstrumentationSpec.scala | 40 ++------------ .../src/test/scala/kamon/spray/TestServer.scala | 63 +++++++++++++++++++++ 3 files changed, 124 insertions(+), 43 deletions(-) create mode 100644 kamon-spray/src/test/scala/kamon/spray/TestServer.scala (limited to 'kamon-spray/src/test/scala/kamon') diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index 339628d2..8163e25c 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -1,6 +1,6 @@ package kamon.spray -import akka.testkit.{TestKitBase, TestProbe, TestKit} +import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.ActorSystem import org.scalatest.WordSpecLike import spray.httpx.RequestBuilding @@ -11,6 +11,10 @@ import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.trace.TraceRecorder import spray.can.client.ClientRequestInstrumentation import com.typesafe.config.ConfigFactory +import spray.can.Http +import akka.pattern.pipe +import spray.client.pipelining +import scala.concurrent.duration._ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with RequestBuilding with TestServer { implicit lazy val system: ActorSystem = ActorSystem("server-request-tracing-spec", ConfigFactory.parseString( @@ -29,28 +33,35 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit | ] | } |} - """.stripMargin - )) + """.stripMargin)) implicit def ec = system.dispatcher "the client instrumentation" should { - "record record the elapsed time for a http request when using the Http manager directly" in { - - val (hostConnector, server) = buildServer(httpHostConnector) - val client = TestProbe() + "record the elapsed time for a http request when using the Http manager directly and tag it as SprayTime" in { val metricListener = TestProbe() Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) + val (hostConnector, server) = buildSHostConnectorAndServer + val client = TestProbe() + + // Initiate a request within the context of a trace val testContext = TraceRecorder.withNewTraceContext("direct-to-http-manager-request") { client.send(hostConnector, Get("/direct-to-http-manager-request")) TraceRecorder.currentContext } + + // Accept the connection at the server side + server.expectMsgType[Http.Connected] + server.reply(Http.Register(server.ref)) + + // Receive the request and reply back server.expectMsgType[HttpRequest] server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] + // Finish the trace testContext.map(_.finish(Map.empty)) metricListener.fishForMessage() { @@ -62,6 +73,45 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit case other ⇒ false } } + + + "record the elapsed time for a http request when using the pipelining sendReceive and tag it as UserTime" in { + + val metricListener = TestProbe() + Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) + + + val (hostConnector, server) = buildSHostConnectorAndServer + val client = TestProbe() + val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 10 seconds) + + // Initiate a request within the context of a trace + val testContext = TraceRecorder.withNewTraceContext("pipelining-helper-request") { + pipeline(Get("/pipelining-helper-request")) to client.ref + TraceRecorder.currentContext + } + + // Accept the connection at the server side + server.expectMsgType[Http.Connected] + server.reply(Http.Register(server.ref)) + + // Receive the request and reply back + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + client.expectMsgType[HttpResponse] + + // Finish the trace + testContext.map(_.finish(Map.empty)) + + metricListener.fishForMessage() { + case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒ + metrics.filterKeys(_.name == "pipelining-helper-request").exists { + case (group, snapshot) ⇒ + snapshot.metrics.filterKeys(id ⇒ id.name == "" && id.tag == ClientRequestInstrumentation.UserTime).nonEmpty + } + case other ⇒ false + } + } } } diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala index 08b1a917..8fd84bfb 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala @@ -17,7 +17,7 @@ package kamon.spray import _root_.spray.httpx.RequestBuilding import _root_.spray.routing.SimpleRoutingApp -import akka.testkit.{TestKitBase, TestProbe, TestKit} +import akka.testkit.{ TestKitBase, TestProbe, TestKit } import akka.actor.{ ActorRef, ActorSystem } import org.scalatest.{ Matchers, WordSpecLike } import scala.concurrent.Await @@ -41,7 +41,7 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with "the spray server request tracing instrumentation" should { "reply back with the same trace token header provided in the request" in { - val (connection, server) = buildServer(clientConnection) + val (connection, server) = buildClientConnectionAndServer val client = TestProbe() client.send(connection, Get("/").withHeaders(RawHeader("X-Trace-Token", "reply-trace-token"))) @@ -54,7 +54,7 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with } "reply back with a automatically assigned trace token if none was provided with the request" in { - val (connection, server) = buildServer(clientConnection) + val (connection, server) = buildClientConnectionAndServer val client = TestProbe() client.send(connection, Get("/")) @@ -67,7 +67,7 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with } "open and finish a trace during the lifetime of a request" in { - val (connection, server) = buildServer(clientConnection) + val (connection, server) = buildClientConnectionAndServer val client = TestProbe() val metricListener = TestProbe() @@ -87,35 +87,3 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with } } - -trait TestServer { - self: TestKitBase ⇒ - - def buildServer(clientBuilder: Http.Bound => ActorRef): (ActorRef, TestProbe) = { - val serverHandler = TestProbe() - IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) - val bound = serverHandler.expectMsgType[Bound] - - val client = clientBuilder(bound) - serverHandler.expectMsgType[Http.Connected] - serverHandler.reply(Http.Register(serverHandler.ref)) - - (client, serverHandler) - } - - def clientConnection(connectionInfo: Http.Bound): ActorRef = { - val probe = TestProbe() - probe.send(IO(Http), Http.Connect(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort)) - probe.expectMsgType[Http.Connected] - probe.sender - } - - def httpHostConnector(connectionInfo: Http.Bound): ActorRef = { - val probe = TestProbe() - probe.send(IO(Http), Http.HostConnectorSetup(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort)) - probe.expectMsgType[Http.HostConnectorInfo].hostConnector - } - - - -} diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala new file mode 100644 index 00000000..e2d012ab --- /dev/null +++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala @@ -0,0 +1,63 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.spray + +import akka.testkit.{ TestProbe, TestKitBase } +import spray.can.Http +import akka.actor.ActorRef +import akka.io.IO +import akka.io.Tcp.Bound + +trait TestServer { + self: TestKitBase ⇒ + + def buildClientConnectionAndServer: (ActorRef, TestProbe) = { + val serverHandler = TestProbe() + IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) + val bound = serverHandler.expectMsgType[Bound] + + val client = clientConnection(bound) + serverHandler.expectMsgType[Http.Connected] + serverHandler.reply(Http.Register(serverHandler.ref)) + + (client, serverHandler) + } + + private def clientConnection(connectionInfo: Http.Bound): ActorRef = { + val probe = TestProbe() + probe.send(IO(Http), Http.Connect(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort)) + probe.expectMsgType[Http.Connected] + probe.sender + } + + def buildSHostConnectorAndServer: (ActorRef, TestProbe) = { + val serverHandler = TestProbe() + IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) + val bound = serverHandler.expectMsgType[Bound] + + val client = httpHostConnector(bound) + + (client, serverHandler) + } + + private def httpHostConnector(connectionInfo: Http.Bound): ActorRef = { + val probe = TestProbe() + probe.send(IO(Http), Http.HostConnectorSetup(connectionInfo.localAddress.getHostName, connectionInfo.localAddress.getPort)) + probe.expectMsgType[Http.HostConnectorInfo].hostConnector + } + +} -- cgit v1.2.3