// Source: kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
// (blob 0b1db1b757b91a233f0d8cc0abda2dba943d615c)
package kamon.spray

import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
import org.scalatest.WordSpecLike
import spray.httpx.RequestBuilding
import kamon.Kamon
import kamon.metrics.{ TraceMetrics, Metrics }
import spray.http.{ HttpResponse, HttpRequest }
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.trace.TraceRecorder
import spray.can.client.ClientRequestInstrumentation
import com.typesafe.config.ConfigFactory
import spray.can.Http
import akka.pattern.pipe
import spray.client.pipelining
import scala.concurrent.duration._

/**
 * Checks that HTTP client requests issued inside a trace context end up in the trace metrics
 * published by Kamon, tagged according to how the request was sent:
 *
 *  - sent straight to the host connector: tagged `ClientRequestInstrumentation.SprayTime`
 *  - sent through `pipelining.sendReceive`: tagged `ClientRequestInstrumentation.UserTime`
 *
 * Each test plays both sides of the exchange: a `TestProbe` acts as the server (accepting the
 * connection and answering the request) and another one acts as the client.
 */
class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with RequestBuilding with TestServer {
  // Metrics are flushed every second and every trace is included, so the snapshot carrying the
  // finished trace reaches the subscribed listener shortly after the trace is finished.
  implicit lazy val system: ActorSystem = ActorSystem("server-request-tracing-spec", ConfigFactory.parseString(
    """
      |kamon {
      |  metrics {
      |    tick-interval = 1 second
      |
      |    filters = [
      |      {
      |        trace {
      |          includes = [ "*" ]
      |          excludes = []
      |        }
      |      }
      |    ]
      |  }
      |}
    """.stripMargin))

  implicit def ec = system.dispatcher

  "the client instrumentation" should {
    "record the elapsed time for a http request when using the Http manager directly and tag it as SprayTime" in {

      val metricListener = TestProbe()
      Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)

      // NOTE(review): buildSHostConnectorAndServer is not defined in this file; presumably it is
      // provided by the TestServer mix-in.
      val (hostConnector, server) = buildSHostConnectorAndServer
      val client = TestProbe()

      // Initiate a request within the context of a trace
      val testContext = TraceRecorder.withNewTraceContext("direct-to-http-manager-request") {
        client.send(hostConnector, Get("/direct-to-http-manager-request"))
        TraceRecorder.currentContext
      }

      // Accept the connection at the server side
      server.expectMsgType[Http.Connected]
      server.reply(Http.Register(server.ref))

      // Receive the request and reply back
      server.expectMsgType[HttpRequest]
      server.reply(HttpResponse(entity = "ok"))
      client.expectMsgType[HttpResponse]

      // Finish the trace; only the side effect matters, hence foreach rather than map.
      testContext.foreach(_.finish(Map.empty))

      // Keep reading ticks until one contains the trace with a segment tagged as SprayTime.
      metricListener.fishForMessage() {
        case TickMetricSnapshot(_, _, metrics) =>
          metrics.exists {
            case (group, groupSnapshot) =>
              group.name == "direct-to-http-manager-request" &&
                groupSnapshot.metrics.keys.exists { id =>
                  id.name == "" && id.tag == ClientRequestInstrumentation.SprayTime
                }
          }
        case _ => false
      }
    }

    "record the elapsed time for a http request when using the pipelining sendReceive and tag it as UserTime" in {

      val metricListener = TestProbe()
      Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)

      // NOTE(review): buildSHostConnectorAndServer is not defined in this file; presumably it is
      // provided by the TestServer mix-in.
      val (hostConnector, server) = buildSHostConnectorAndServer
      val client = TestProbe()
      val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 10.seconds)

      // Initiate a request within the context of a trace
      val testContext = TraceRecorder.withNewTraceContext("pipelining-helper-request") {
        // `to` comes from the akka.pattern.pipe enrichment: the response future is piped to the client probe.
        pipeline(Get("/pipelining-helper-request")) to client.ref
        TraceRecorder.currentContext
      }

      // Accept the connection at the server side
      server.expectMsgType[Http.Connected]
      server.reply(Http.Register(server.ref))

      // Receive the request and reply back
      server.expectMsgType[HttpRequest]
      server.reply(HttpResponse(entity = "ok"))
      client.expectMsgType[HttpResponse]

      // Finish the trace; only the side effect matters, hence foreach rather than map.
      testContext.foreach(_.finish(Map.empty))

      // Keep reading ticks until one contains the trace with a segment tagged as UserTime.
      metricListener.fishForMessage() {
        case TickMetricSnapshot(_, _, metrics) =>
          metrics.exists {
            case (group, groupSnapshot) =>
              group.name == "pipelining-helper-request" &&
                groupSnapshot.metrics.keys.exists { id =>
                  id.name == "" && id.tag == ClientRequestInstrumentation.UserTime
                }
          }
        case _ => false
      }
    }
  }

}