aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-01-12 01:45:27 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-01-24 23:19:01 +0100
commit01a34f67ff75419c440f2e69c0a0db888a670a34 (patch)
tree9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-spray
parent4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff)
downloadKamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz
Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2
Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-spray')
-rw-r--r--kamon-spray/src/main/resources/META-INF/aop.xml6
-rw-r--r--kamon-spray/src/main/resources/reference.conf7
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala4
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala (renamed from kamon-spray/src/main/scala/kamon/spray/Spray.scala)61
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala35
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala (renamed from kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala)29
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala (renamed from kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala)38
-rw-r--r--kamon-spray/src/test/resources/application.conf25
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala147
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala67
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala51
11 files changed, 212 insertions, 258 deletions
diff --git a/kamon-spray/src/main/resources/META-INF/aop.xml b/kamon-spray/src/main/resources/META-INF/aop.xml
index 0e5726c6..00e8763a 100644
--- a/kamon-spray/src/main/resources/META-INF/aop.xml
+++ b/kamon-spray/src/main/resources/META-INF/aop.xml
@@ -2,14 +2,16 @@
<aspectj>
<aspects>
+
<!-- Spray Server -->
- <aspect name="spray.can.server.ServerRequestInstrumentation"/>
+ <aspect name="spray.can.server.instrumentation.ServerRequestInstrumentation"/>
<!-- Spray Client -->
<aspect name="spray.can.client.ClientRequestInstrumentation"/>
+
</aspects>
<weaver>
- <include within="spray.can..*"/>
+ <include within="spray..*"/>
</weaver>
</aspectj>
diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf
index 5c5e9317..bdba21cb 100644
--- a/kamon-spray/src/main/resources/reference.conf
+++ b/kamon-spray/src/main/resources/reference.conf
@@ -4,6 +4,7 @@
kamon {
spray {
+
# Header name used when propagating the `TraceContext.token` value across applications.
trace-token-header-name = "X-Trace-Token"
@@ -23,16 +24,16 @@ kamon {
client {
# Strategy used for automatic trace segment generation when issue requests with spray-client. The possible values
# are:
- # - pipelining: measures the time during which the user application code is waiting for a spray-client request to
+ # - request-level: measures the time during which the user application code is waiting for a spray-client request to
# complete, by attaching a callback to the Future[HttpResponse] returned by `spray.client.pipelining.sendReceive`.
# If `spray.client.pipelining.sendReceive` is not used, the segment measurement wont be performed.
- # - internal: measures the internal time taken by spray-client to finish a request. Sometimes the user application
+ # - host-level: measures the internal time taken by spray-client to finish a request. Sometimes the user application
# code has a finite future timeout (like when using `spray.client.pipelining.sendReceive`) that doesn't match
# the actual amount of time spray might take internally to resolve a request, counting retries, redirects,
# connection timeouts and so on. If using the internal strategy, the measured time will include the entire time
# since the request has been received by the corresponding `HttpHostConnector` until a response is sent back
# to the requester.
- segment-collection-strategy = pipelining
+ instrumentation-level = request-level
}
}
} \ No newline at end of file
diff --git a/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala
index e98b63d9..4eefee95 100644
--- a/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala
@@ -17,11 +17,11 @@ package kamon.spray
import spray.routing.directives.BasicDirectives
import spray.routing._
-import kamon.trace.TraceRecorder
+import kamon.trace.TraceContext
trait KamonTraceDirectives extends BasicDirectives {
def traceName(name: String): Directive0 = mapRequest { req ⇒
- TraceRecorder.rename(name)
+ TraceContext.currentContext.rename(name)
req
}
}
diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala
index ab8d6a7d..3df8d972 100644
--- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala
@@ -18,47 +18,49 @@ package kamon.spray
import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
import akka.actor
+import akka.event.{ Logging, LoggingAdapter }
import kamon.Kamon
import kamon.http.HttpServerMetrics
-import kamon.metric.Metrics
+import kamon.metric.{ Entity, Metrics }
import spray.http.HttpHeaders.Host
import spray.http.HttpRequest
object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider {
def lookup(): ExtensionId[_ <: actor.Extension] = Spray
- def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtension(system)
+ def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtensionImpl(system)
val SegmentLibraryName = "spray-client"
}
-object ClientSegmentCollectionStrategy {
- sealed trait Strategy
- case object Pipelining extends Strategy
- case object Internal extends Strategy
+trait SprayExtension extends Kamon.Extension {
+ def settings: SprayExtensionSettings
+ def log: LoggingAdapter
+ def httpServerMetrics: HttpServerMetrics
+ def generateTraceName(request: HttpRequest): String
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String
+ def generateHostLevelApiSegmentName(request: HttpRequest): String
}
-class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
- private val config = system.settings.config.getConfig("kamon.spray")
+class SprayExtensionImpl(system: ExtendedActorSystem) extends SprayExtension {
+ val settings = SprayExtensionSettings(system)
+ val log = Logging(system, "SprayExtension")
- val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation")
- val traceTokenHeaderName: String = config.getString("trace-token-header-name")
- val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get
- // It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it.
+ val httpServerMetrics = {
+ val metricsExtension = Metrics.get(system)
+ val factory = metricsExtension.instrumentFactory(HttpServerMetrics.category)
+ val entity = Entity("spray-server", HttpServerMetrics.category)
- private val nameGeneratorFQN = config.getString("name-generator")
- private val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems.
+ Metrics.get(system).register(entity, new HttpServerMetrics(factory)).recorder
+ }
- val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy =
- config.getString("client.segment-collection-strategy") match {
- case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining
- case "internal" ⇒ ClientSegmentCollectionStrategy.Internal
- case other ⇒ throw new IllegalArgumentException(s"Configured segment-collection-strategy [$other] is invalid, " +
- s"only pipelining and internal are valid options.")
- }
+ def generateTraceName(request: HttpRequest): String =
+ settings.nameGenerator.generateTraceName(request)
- def generateTraceName(request: HttpRequest): String = nameGenerator.generateTraceName(request)
- def generateRequestLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateRequestLevelApiSegmentName(request)
- def generateHostLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateHostLevelApiSegmentName(request)
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String =
+ settings.nameGenerator.generateRequestLevelApiSegmentName(request)
+
+ def generateHostLevelApiSegmentName(request: HttpRequest): String =
+ settings.nameGenerator.generateHostLevelApiSegmentName(request)
}
trait SprayNameGenerator {
@@ -68,14 +70,19 @@ trait SprayNameGenerator {
}
class DefaultSprayNameGenerator extends SprayNameGenerator {
- def hostFromHeaders(request: HttpRequest): Option[String] = request.header[Host].map(_.host)
def generateRequestLevelApiSegmentName(request: HttpRequest): String = {
val uriAddress = request.uri.authority.host.address
if (uriAddress.equals("")) hostFromHeaders(request).getOrElse("unknown-host") else uriAddress
}
- def generateHostLevelApiSegmentName(request: HttpRequest): String = hostFromHeaders(request).getOrElse("unknown-host")
+ def generateHostLevelApiSegmentName(request: HttpRequest): String =
+ hostFromHeaders(request).getOrElse("unknown-host")
+
+ def generateTraceName(request: HttpRequest): String =
+ request.method.value + ": " + request.uri.path
+
+ private def hostFromHeaders(request: HttpRequest): Option[String] =
+ request.header[Host].map(_.host)
- def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path
}
diff --git a/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala b/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala
new file mode 100644
index 00000000..44c71eaf
--- /dev/null
+++ b/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala
@@ -0,0 +1,35 @@
+package kamon.spray
+
+import akka.actor.ExtendedActorSystem
+
+case class SprayExtensionSettings(
+ includeTraceTokenHeader: Boolean,
+ traceTokenHeaderName: String,
+ nameGenerator: SprayNameGenerator,
+ clientInstrumentationLevel: ClientInstrumentationLevel.Level)
+
+object SprayExtensionSettings {
+ def apply(system: ExtendedActorSystem): SprayExtensionSettings = {
+ val config = system.settings.config.getConfig("kamon.spray")
+
+ val includeTraceTokenHeader: Boolean = config.getBoolean("automatic-trace-token-propagation")
+ val traceTokenHeaderName: String = config.getString("trace-token-header-name")
+
+ val nameGeneratorFQN = config.getString("name-generator")
+ val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems.
+
+ val clientInstrumentationLevel: ClientInstrumentationLevel.Level = config.getString("client.instrumentation-level") match {
+ case "request-level" ⇒ ClientInstrumentationLevel.RequestLevelAPI
+ case "host-level" ⇒ ClientInstrumentationLevel.HostLevelAPI
+ case other ⇒ sys.error(s"Invalid client instrumentation level [$other] found in configuration.")
+ }
+
+ SprayExtensionSettings(includeTraceTokenHeader, traceTokenHeaderName, nameGenerator, clientInstrumentationLevel)
+ }
+}
+
+object ClientInstrumentationLevel {
+ sealed trait Level
+ case object RequestLevelAPI extends Level
+ case object HostLevelAPI extends Level
+}
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
index 813915c4..fa9063ad 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
@@ -21,8 +21,7 @@ import org.aspectj.lang.ProceedingJoinPoint
import spray.http._
import spray.http.HttpHeaders.RawHeader
import kamon.trace._
-import kamon.Kamon
-import kamon.spray.{ ClientSegmentCollectionStrategy, Spray }
+import kamon.spray.{ ClientInstrumentationLevel, Spray }
import akka.actor.ActorRef
import scala.concurrent.{ Future, ExecutionContext }
import akka.util.Timeout
@@ -47,10 +46,10 @@ class ClientRequestInstrumentation {
// This read to requestContext.traceContext takes care of initializing the aspect timely.
requestContext.traceContext
- TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
- val sprayExtension = Kamon(Spray)(system)
+ TraceContext.map { ctx ⇒
+ val sprayExtension = ctx.lookupExtension(Spray)
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
+ if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.HostLevelAPI) {
if (requestContext.segment.isEmpty) {
val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
val segment = ctx.startSegment(clientRequestName, SegmentCategory.HttpClient, Spray.SegmentLibraryName)
@@ -74,7 +73,7 @@ class ClientRequestInstrumentation {
@Around("copyingRequestContext(old)")
def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = {
- TraceRecorder.withInlineTraceContextReplacement(old.traceContext) {
+ TraceContext.withContext(old.traceContext) {
pjp.proceed()
}
}
@@ -85,7 +84,7 @@ class ClientRequestInstrumentation {
@Around("dispatchToCommander(requestContext, message)")
def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = {
if (requestContext.traceContext.nonEmpty) {
- TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) {
+ TraceContext.withContext(requestContext.traceContext) {
if (message.isInstanceOf[HttpMessageEnd])
requestContext.asInstanceOf[SegmentAware].segment.finish()
@@ -112,10 +111,10 @@ class ClientRequestInstrumentation {
val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]]
(request: HttpRequest) ⇒ {
- TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
- val sprayExtension = Kamon(Spray)(system)
+ TraceContext.map { ctx ⇒
+ val sprayExtension = ctx.lookupExtension(Spray)
val segment =
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining)
+ if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.RequestLevelAPI)
ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentCategory.HttpClient, Spray.SegmentLibraryName)
else
EmptyTraceContext.EmptySegment
@@ -139,10 +138,10 @@ class ClientRequestInstrumentation {
@Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)")
def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = {
- val modifiedHeaders = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
- val sprayExtension = Kamon(Spray)(system)
- if (sprayExtension.includeTraceToken)
- RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders
+ val modifiedHeaders = TraceContext.map { ctx ⇒
+ val sprayExtension = ctx.lookupExtension(Spray)
+ if (sprayExtension.settings.includeTraceTokenHeader)
+ RawHeader(sprayExtension.settings.traceTokenHeaderName, ctx.token) :: defaultHeaders
else
defaultHeaders
@@ -150,4 +149,4 @@ class ClientRequestInstrumentation {
pjp.proceed(Array[AnyRef](request, modifiedHeaders))
}
-}
+} \ No newline at end of file
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala
index 1ae4ad80..73287132 100644
--- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala
@@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
-package spray.can.server
+package spray.can.server.instrumentation
import kamon.trace.TraceLocal.{ HttpContext, HttpContextKey }
import org.aspectj.lang.annotation._
import kamon.trace._
import akka.actor.ActorSystem
+import spray.can.server.OpenRequest
import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest }
-import akka.event.Logging.Warning
import kamon.Kamon
import kamon.spray.{ SprayExtension, Spray }
import org.aspectj.lang.ProceedingJoinPoint
@@ -40,14 +40,16 @@ class ServerRequestInstrumentation {
@After("openRequestInit(openRequest, request)")
def afterInit(openRequest: TraceContextAware, request: HttpRequest): Unit = {
val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system
+ val tracer = Tracer.get(system)
val sprayExtension = Kamon(Spray)(system)
val defaultTraceName = sprayExtension.generateTraceName(request)
- val token = if (sprayExtension.includeTraceToken) {
- request.headers.find(_.name == sprayExtension.traceTokenHeaderName).map(_.value)
+ val token = if (sprayExtension.settings.includeTraceTokenHeader) {
+ request.headers.find(_.name == sprayExtension.settings.traceTokenHeaderName).map(_.value)
} else None
- TraceRecorder.start(defaultTraceName, token)(system)
+ val newContext = token.map(customToken ⇒ tracer.newContext(defaultTraceName, customToken)) getOrElse (tracer.newContext(defaultTraceName))
+ TraceContext.setCurrentContext(newContext)
// Necessary to force initialization of traceContext when initiating the request.
openRequest.traceContext
@@ -58,7 +60,7 @@ class ServerRequestInstrumentation {
@After("openNewRequest()")
def afterOpenNewRequest(): Unit = {
- TraceRecorder.clearContext
+ TraceContext.clearCurrentContext
}
@Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest) && args(response)")
@@ -66,26 +68,24 @@ class ServerRequestInstrumentation {
@Around("openRequestCreation(openRequest, response)")
def afterFinishingRequest(pjp: ProceedingJoinPoint, openRequest: TraceContextAware, response: HttpMessagePartWrapper): Any = {
- val incomingContext = TraceRecorder.currentContext
+ val incomingContext = TraceContext.currentContext
val storedContext = openRequest.traceContext
// The stored context is always a DefaultTraceContext if the instrumentation is running
- val system = storedContext.system
-
- verifyTraceContextConsistency(incomingContext, storedContext, system)
+ verifyTraceContextConsistency(incomingContext, storedContext)
if (incomingContext.isEmpty)
pjp.proceed()
else {
- val sprayExtension = Kamon(Spray)(system)
+ val sprayExtension = incomingContext.lookupExtension(Spray)
- val proceedResult = if (sprayExtension.includeTraceToken) {
- val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, incomingContext.token)
+ val proceedResult = if (sprayExtension.settings.includeTraceTokenHeader) {
+ val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.settings.traceTokenHeaderName, incomingContext.token)
pjp.proceed(Array(openRequest, responseWithHeader))
} else pjp.proceed
- TraceRecorder.finish()
+ TraceContext.currentContext.finish()
recordHttpServerMetrics(response, incomingContext.name, sprayExtension)
@@ -96,15 +96,15 @@ class ServerRequestInstrumentation {
}
}
- def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = {
- def publishWarning(text: String, system: ActorSystem): Unit =
- system.eventStream.publish(Warning("ServerRequestInstrumentation", classOf[ServerRequestInstrumentation], text))
+ def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext): Unit = {
+ def publishWarning(text: String): Unit =
+ storedTraceContext.lookupExtension(Spray).log.warning(text)
if (incomingTraceContext.nonEmpty) {
if (incomingTraceContext.token != storedTraceContext.token)
- publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system)
+ publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]")
} else
- publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system)
+ publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]")
}
def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit =
diff --git a/kamon-spray/src/test/resources/application.conf b/kamon-spray/src/test/resources/application.conf
index 4a9b2c67..8b137891 100644
--- a/kamon-spray/src/test/resources/application.conf
+++ b/kamon-spray/src/test/resources/application.conf
@@ -1,26 +1 @@
-kamon {
- metrics {
- tick-interval = 1 second
- filters = [
- {
- actor {
- includes = []
- excludes = [ "system/*", "user/IO-*" ]
- }
- },
- {
- trace {
- includes = [ "*" ]
- excludes = []
- }
- },
- {
- dispatcher {
- includes = [ "default-dispatcher" ]
- excludes = []
- }
- }
- ]
- }
-} \ No newline at end of file
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index b90b0f3b..c5d7d992 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -16,50 +16,36 @@
package kamon.spray
-import akka.testkit.{ TestKitBase, TestProbe }
-import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import kamon.testkit.BaseKamonSpec
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Millis, Seconds, Span }
-import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder }
+import kamon.trace.{ TraceContext, SegmentCategory }
import com.typesafe.config.ConfigFactory
import spray.can.Http
import spray.http.HttpHeaders.RawHeader
import kamon.Kamon
-import kamon.metric.{ TraceMetrics, Metrics }
+import kamon.metric.TraceMetricsSpec
import spray.client.pipelining.sendReceive
-import kamon.metric.Subscriptions.TickMetricSnapshot
import scala.concurrent.duration._
-import kamon.metric.TraceMetrics.TraceMetricsSnapshot
-
-class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ScalaFutures with RequestBuilding with TestServer {
- implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
- """
- |akka {
- | loglevel = ERROR
- |}
- |
- |kamon {
- | spray {
- | name-generator = kamon.spray.TestSprayNameGenerator
- | }
- |
- | metrics {
- | tick-interval = 1 hour
- |
- | filters = [
- | {
- | trace {
- | includes = [ "*" ]
- | excludes = []
- | }
- | }
- | ]
- | }
- |}
- """.stripMargin))
+
+class ClientRequestInstrumentationSpec extends BaseKamonSpec("client-request-instrumentation-spec") with ScalaFutures
+ with RequestBuilding with TestServer {
+
+ import TraceMetricsSpec.SegmentSyntax
+
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon {
+ | metric.tick-interval = 1 hour
+ | spray.name-generator = kamon.spray.TestSprayNameGenerator
+ |}
+ |
+ |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
+ """.stripMargin)
implicit def ec = system.dispatcher
implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis))
@@ -71,12 +57,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
val (_, server, bound) = buildSHostConnectorAndServer
// Initiate a request within the context of a trace
- val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("include-trace-token-header-at-request-level-api") {
+ val (testContext, responseFuture) = TraceContext.withContext(newContext("include-trace-token-header-at-request-level-api")) {
val rF = sendReceive(system, ec) {
Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path")
}
- (TraceRecorder.currentContext, rF)
+ (TraceContext.currentContext, rF)
}
// Accept the connection at the server side
@@ -85,7 +71,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+ request.headers should contain(traceTokenHeader(testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
@@ -98,12 +84,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
val (_, server, bound) = buildSHostConnectorAndServer
// Initiate a request within the context of a trace
- val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("do-not-include-trace-token-header-at-request-level-api") {
+ val (testContext, responseFuture) = TraceContext.withContext(newContext("do-not-include-trace-token-header-at-request-level-api")) {
val rF = sendReceive(system, ec) {
Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path")
}
- (TraceRecorder.currentContext, rF)
+ (TraceContext.currentContext, rF)
}
// Accept the connection at the server side
@@ -112,7 +98,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+ request.headers should not contain (traceTokenHeader(testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
@@ -128,12 +114,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
val (_, _, bound) = buildSHostConnectorAndServer
// Initiate a request within the context of a trace
- val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("assign-name-to-segment-with-request-level-api") {
+ val (testContext, responseFuture) = TraceContext.withContext(newContext("assign-name-to-segment-with-request-level-api")) {
val rF = sendReceive(transport.ref)(ec, 10.seconds) {
Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment")
}
- (TraceRecorder.currentContext, rF)
+ (TraceContext.currentContext, rF)
}
// Receive the request and reply back
@@ -142,10 +128,10 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
responseFuture.futureValue.entity.asString should be("ok")
testContext.finish()
- val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api")
- traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
- traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment",
- SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1)
+ val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api", "trace")
+ traceMetricsSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segment("request-level /request-level-api-segment", SegmentCategory.HttpClient, Spray.SegmentLibraryName)
+ .numberOfMeasurements should be(1)
}
"rename a request level api segment once it reaches the relevant host connector" in {
@@ -155,12 +141,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
val (_, server, bound) = buildSHostConnectorAndServer
// Initiate a request within the context of a trace
- val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("rename-segment-with-request-level-api") {
+ val (testContext, responseFuture) = TraceContext.withContext(newContext("rename-segment-with-request-level-api")) {
val rF = sendReceive(system, ec) {
Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment")
}
- (TraceRecorder.currentContext, rF)
+ (TraceContext.currentContext, rF)
}
// Accept the connection at the server side
@@ -173,10 +159,10 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
responseFuture.futureValue.entity.asString should be("ok")
testContext.finish()
- val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api")
- traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
- traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment",
- SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1)
+ val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api", "trace")
+ traceMetricsSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segment("host-level /request-level-api-segment", SegmentCategory.HttpClient, Spray.SegmentLibraryName)
+ .numberOfMeasurements should be(1)
}
}
@@ -189,9 +175,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
val client = TestProbe()
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
+ val testContext = TraceContext.withContext(newContext("include-trace-token-header-on-http-client-request")) {
client.send(hostConnector, Get("/dummy-path"))
- TraceRecorder.currentContext
+ TraceContext.currentContext
}
// Accept the connection at the server side
@@ -200,7 +186,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+ request.headers should contain(traceTokenHeader(testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
@@ -216,9 +202,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
val client = TestProbe()
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ val testContext = TraceContext.withContext(newContext("not-include-trace-token-header-on-http-client-request")) {
client.send(hostConnector, Get("/dummy-path"))
- TraceRecorder.currentContext
+ TraceContext.currentContext
}
// Accept the connection at the server side
@@ -227,7 +213,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+ request.headers should not contain (traceTokenHeader(testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
@@ -243,9 +229,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
val client = TestProbe()
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("create-segment-with-host-level-api") {
+ val testContext = TraceContext.withContext(newContext("create-segment-with-host-level-api")) {
client.send(hostConnector, Get("/host-level-api-segment"))
- TraceRecorder.currentContext
+ TraceContext.currentContext
}
// Accept the connection at the server side
@@ -254,52 +240,39 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+ request.headers should not contain (traceTokenHeader(testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
testContext.finish()
- val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api")
- traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
- traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment",
- SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1)
+ val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api", "trace")
+ traceMetricsSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segment("host-level /host-level-api-segment", SegmentCategory.HttpClient, Spray.SegmentLibraryName)
+ .numberOfMeasurements should be(1)
}
}
}
- def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricsSnapshot = {
- val tickSnapshot = within(timeout) {
- listener.expectMsgType[TickMetricSnapshot]
- }
-
- val metricsOption = tickSnapshot.metrics.get(TraceMetrics(traceName))
- metricsOption should not be empty
- metricsOption.get.asInstanceOf[TraceMetricsSnapshot]
- }
-
- def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
- val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory)
- val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
- recorder.get.collect(collectionContext)
- }
+ def traceTokenHeader(token: String): RawHeader =
+ RawHeader(Kamon(Spray).settings.traceTokenHeaderName, token)
- def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal)
- def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining)
+ def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientInstrumentationLevel.HostLevelAPI)
+ def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientInstrumentationLevel.RequestLevelAPI)
def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false)
- def setSegmentCollectionStrategy(strategy: ClientSegmentCollectionStrategy.Strategy): Unit = {
- val target = Kamon(Spray)(system)
- val field = target.getClass.getDeclaredField("clientSegmentCollectionStrategy")
+ def setSegmentCollectionStrategy(strategy: ClientInstrumentationLevel.Level): Unit = {
+ val target = Kamon(Spray)(system).settings
+ val field = target.getClass.getDeclaredField("clientInstrumentationLevel")
field.setAccessible(true)
field.set(target, strategy)
}
def setIncludeTraceToken(include: Boolean): Unit = {
- val target = Kamon(Spray)(system)
- val field = target.getClass.getDeclaredField("includeTraceToken")
+ val target = Kamon(Spray)(system).settings
+ val field = target.getClass.getDeclaredField("includeTraceTokenHeader")
field.setAccessible(true)
field.set(target, include)
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala b/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala
index c4b370d7..58bb2885 100644
--- a/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala
@@ -1,46 +1,27 @@
package kamon.spray
-import akka.actor.ActorSystem
-import akka.testkit.{ TestProbe, TestKitBase }
+import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
-import kamon.Kamon
-import kamon.http.HttpServerMetrics
-import kamon.metric._
+import kamon.testkit.BaseKamonSpec
import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
-import org.scalatest.{ Matchers, WordSpecLike }
import spray.http.{ StatusCodes, HttpResponse, HttpRequest }
import spray.httpx.RequestBuilding
-class SprayServerMetricsSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding
- with ScalaFutures with PatienceConfiguration with TestServer {
+class SprayServerMetricsSpec extends BaseKamonSpec("spray-server-metrics-spec") with RequestBuilding with ScalaFutures
+ with PatienceConfiguration with TestServer {
- val collectionContext = CollectionContext(100)
-
- implicit lazy val system: ActorSystem = ActorSystem("spray-server-metrics-spec", ConfigFactory.parseString(
- """
- |akka {
- | loglevel = ERROR
- |}
- |
- |kamon {
- | metrics {
- | tick-interval = 1 hour
- |
- | filters = [
- | {
- | trace {
- | includes = [ "*" ]
- | excludes = []
- | }
- | }
- | ]
- | }
- |}
- """.stripMargin))
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon.metric {
+ | tick-interval = 1 hour
+ |}
+ |
+ |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
+ """.stripMargin)
"the Spray Server metrics instrumentation" should {
- "record trace metrics for requests received" in {
- Kamon(Metrics)(system).register(TraceMetrics("GET: /record-trace-metrics"), TraceMetrics.Factory).get.collect(collectionContext)
+ "record trace metrics for processed requests" in {
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
@@ -58,15 +39,17 @@ class SprayServerMetricsSpec extends TestKitBase with WordSpecLike with Matchers
client.expectMsgType[HttpResponse]
}
- val snapshot = Kamon(Metrics)(system).register(TraceMetrics("GET: /record-trace-metrics"), TraceMetrics.Factory).get.collect(collectionContext)
- snapshot.elapsedTime.numberOfMeasurements should be(15)
+ val snapshot = takeSnapshotOf("GET: /record-trace-metrics", "trace")
+ snapshot.histogram("elapsed-time").get.numberOfMeasurements should be(15)
}
- "record http serve metrics for all the requests" in {
- Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext)
+ "record http server metrics for all the requests" in {
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
+ // Erase metrics recorder from previous tests.
+ takeSnapshotOf("spray-server", "http-server")
+
for (repetition ← 1 to 10) {
client.send(connection, Get("/record-http-metrics"))
server.expectMsgType[HttpRequest]
@@ -81,11 +64,11 @@ class SprayServerMetricsSpec extends TestKitBase with WordSpecLike with Matchers
client.expectMsgType[HttpResponse]
}
- val snapshot = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext)
- snapshot.countsPerTraceAndStatusCode("GET: /record-http-metrics")("200").count should be(10)
- snapshot.countsPerTraceAndStatusCode("GET: /record-http-metrics")("400").count should be(5)
- snapshot.countsPerStatusCode("200").count should be(10)
- snapshot.countsPerStatusCode("400").count should be(5)
+ val snapshot = takeSnapshotOf("spray-server", "http-server")
+ snapshot.counter("GET: /record-http-metrics_200").get.count should be(10)
+ snapshot.counter("GET: /record-http-metrics_400").get.count should be(5)
+ snapshot.counter("200").get.count should be(10)
+ snapshot.counter("400").get.count should be(5)
}
}
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala
index 30d42eea..1ae0cb98 100644
--- a/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala
@@ -17,39 +17,15 @@
package kamon.spray
import _root_.spray.httpx.RequestBuilding
-import akka.testkit.{ TestKitBase, TestProbe }
-import akka.actor.ActorSystem
-import org.scalatest.{ Matchers, WordSpecLike }
+import akka.testkit.TestProbe
+import kamon.testkit.BaseKamonSpec
import kamon.Kamon
import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
import spray.http.HttpHeaders.RawHeader
import spray.http.{ HttpResponse, HttpRequest }
-import com.typesafe.config.ConfigFactory
-
-class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding
- with ScalaFutures with PatienceConfiguration with TestServer {
-
- implicit lazy val system: ActorSystem = ActorSystem("spray-server-tracing-spec", ConfigFactory.parseString(
- """
- |akka {
- | loglevel = ERROR
- |}
- |
- |kamon {
- | metrics {
- | tick-interval = 2 seconds
- |
- | filters = [
- | {
- | trace {
- | includes = [ "*" ]
- | excludes = []
- | }
- | }
- | ]
- | }
- |}
- """.stripMargin))
+
+class SprayServerTracingSpec extends BaseKamonSpec("spray-server-tracing-spec") with RequestBuilding with ScalaFutures
+ with PatienceConfiguration with TestServer {
"the spray server request tracing instrumentation" should {
"include the trace-token header in responses when the automatic-trace-token-propagation is enabled" in {
@@ -58,12 +34,12 @@ class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
- client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled")))
+ client.send(connection, Get("/").withHeaders(traceTokenHeader("propagation-enabled")))
server.expectMsgType[HttpRequest]
server.reply(HttpResponse(entity = "ok"))
val response = client.expectMsgType[HttpResponse]
- response.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled"))
+ response.headers should contain(traceTokenHeader("propagation-enabled"))
}
"reply back with an automatically assigned trace token if none was provided with the request and automatic-trace-token-propagation is enabled" in {
@@ -77,7 +53,7 @@ class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers
server.reply(HttpResponse(entity = "ok"))
val response = client.expectMsgType[HttpResponse]
- response.headers.count(_.name == Kamon(Spray).traceTokenHeaderName) should be(1)
+ response.headers.count(_.name == Kamon(Spray).settings.traceTokenHeaderName) should be(1)
}
@@ -87,21 +63,24 @@ class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
- client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled")))
+ client.send(connection, Get("/").withHeaders(traceTokenHeader("propagation-disabled")))
server.expectMsgType[HttpRequest]
server.reply(HttpResponse(entity = "ok"))
val response = client.expectMsgType[HttpResponse]
- response.headers should not contain RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled")
+ response.headers should not contain traceTokenHeader("propagation-disabled")
}
}
+ def traceTokenHeader(token: String): RawHeader =
+ RawHeader(Kamon(Spray).settings.traceTokenHeaderName, token)
+
def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false)
def setIncludeTraceToken(include: Boolean): Unit = {
- val target = Kamon(Spray)(system)
- val field = target.getClass.getDeclaredField("includeTraceToken")
+ val target = Kamon(Spray)(system).settings
+ val field = target.getClass.getDeclaredField("includeTraceTokenHeader")
field.setAccessible(true)
field.set(target, include)
}