diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-01-27 09:11:32 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-01-27 09:11:32 -0300 |
commit | 003af1dd8ef1e3b4a8f1a9eff56a3c15c01cb2a9 (patch) | |
tree | e0afcb7d2358787f16c5b19e8caceaab5d6ea1ca | |
parent | fc13a7e4abf87c92d63cb7b13561119b6414651e (diff) | |
download | Kamon-003af1dd8ef1e3b4a8f1a9eff56a3c15c01cb2a9.tar.gz Kamon-003af1dd8ef1e3b4a8f1a9eff56a3c15c01cb2a9.tar.bz2 Kamon-003af1dd8ef1e3b4a8f1a9eff56a3c15c01cb2a9.zip |
rename UOW to TraceToken and generate it by default when a Trace starts
17 files changed, 84 insertions, 48 deletions
diff --git a/kamon-core/src/main/java/kamon/util/GlobPathFilter.java b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java index 5b019bec..a000e2a0 100644 --- a/kamon-core/src/main/java/kamon/util/GlobPathFilter.java +++ b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> + * Copyright 2013 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -14,7 +14,6 @@ * ========================================================================================= */ - // This file was copied from: https://github.com/jboss-modules/jboss-modules/blob/master/src/main/java/org/jboss/modules/filter/GlobPathFilter.java package kamon.util; diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala index dc4abde0..4a6c98f4 100644 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala @@ -45,14 +45,6 @@ trait ActorMetricsOps { () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig) } - import scala.concurrent.duration._ - system.scheduler.schedule(0.seconds, 10.seconds)( - actorMetrics.collect { - case (name, recorder: HdrActorMetricsRecorder) ⇒ - println(s"Actor: $name") - recorder.processingTimeHistogram.copy.getHistogramData.outputPercentileDistribution(System.out, 1000000D) - })(system.dispatcher) - def shouldTrackActor(path: String): Boolean = trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path)) diff --git a/kamon-core/src/main/scala/kamon/metrics/AtomicSnapshotableHistogram.scala b/kamon-core/src/main/scala/kamon/metrics/AtomicSnapshotableHistogram.scala index 39d4ac0b..c9e47792 100644 --- a/kamon-core/src/main/scala/kamon/metrics/AtomicSnapshotableHistogram.scala +++ b/kamon-core/src/main/scala/kamon/metrics/AtomicSnapshotableHistogram.scala @@ -18,8 +18,7 @@ package org.HdrHistogram import java.util.concurrent.atomic.AtomicLongFieldUpdater import scala.annotation.tailrec -import org.HdrHistogram.AtomicSnapshotableHistogram.{Value, Snapshot} - +import org.HdrHistogram.AtomicSnapshotableHistogram.{ Value, Snapshot } /** * This implementation aims to be used for real time data collection where data snapshots are taken often over time. @@ -27,25 +26,23 @@ import org.HdrHistogram.AtomicSnapshotableHistogram.{Value, Snapshot} * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. */ class AtomicSnapshotableHistogram(highestTrackableValue: Long, numberOfSignificantValueDigits: Int) - extends AtomicHistogram(1L, highestTrackableValue, numberOfSignificantValueDigits) { + extends AtomicHistogram(1L, highestTrackableValue, numberOfSignificantValueDigits) { import AtomicSnapshotableHistogram.totalCountUpdater - def snapshotAndReset(): Snapshot = { val entries = Vector.newBuilder[Value] val countsLength = counts.length() @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { - if(index < countsLength) { + if (index < countsLength) { val currentValue = previousValue + increment val countAtValue = counts.getAndSet(index, 0) - if(countAtValue > 0) + if (countAtValue > 0) entries += Value(currentValue, countAtValue) - - if(currentValue == bucketLimit) + if (currentValue == bucketLimit) iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) else iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) diff --git a/kamon-core/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala index 31e8185a..7dd3a6f8 100644 --- a/kamon-core/src/main/scala/kamon/trace/Trace.scala +++ b/kamon-core/src/main/scala/kamon/trace/Trace.scala @@ -21,6 +21,8 @@ import scala.Some import kamon.trace.Trace.Register import scala.concurrent.duration._ import java.util.concurrent.atomic.AtomicLong +import scala.util.Try +import java.net.InetAddress object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: Extension] = Trace @@ -40,8 +42,8 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { private def set(ctx: Option[TraceContext]) = traceContext.set(ctx) def clear: Unit = traceContext.remove() - def start(name: String)(implicit system: ActorSystem): TraceContext = { - val ctx = newTraceContext(name) + def start(name: String, token: Option[String])(implicit system: ActorSystem): TraceContext = { + val ctx = newTraceContext(name, token.getOrElse(TraceToken.generate())) ctx.start(name) set(Some(ctx)) @@ -68,7 +70,7 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { } // TODO: FIX - def newTraceContext(name: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace).api, tranid.getAndIncrement, name) + def newTraceContext(name: String, token: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace).api, tranid.getAndIncrement, name, token) def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = { val start = Segments.Start(category, description, attributes) @@ -80,14 +82,21 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { case class SegmentCompletionHandle(start: Segments.Start) { def complete(): Unit = { val end = Segments.End() - println(s"Completing the Segment: $start - $end") + //println(s"Completing the Segment: $start - $end") } def complete(end: Segments.End): Unit = { - println(s"Completing the Segment: $start - $end") + //println(s"Completing the Segment: $start - $end") } } } +object TraceToken { + val tokenCounter = new AtomicLong + val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + + def generate(): String = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet()) +} + class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val api: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace") } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 3e68a816..5780b749 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -23,7 +23,7 @@ import kamon.Kamon import kamon.trace.UowTracing.{ Finish, Start } // TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. -case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) { +case class TraceContext(private val collector: ActorRef, id: Long, name: String, token: String, userContext: Option[Any] = None) { def start(name: String) = { collector ! Start(id, name) diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala index add47fdf..403e4ee7 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala @@ -19,6 +19,6 @@ import ch.qos.logback.classic.pattern.ClassicConverter import ch.qos.logback.classic.spi.ILoggingEvent import kamon.trace.Trace -class LogbackUowConverter extends ClassicConverter { - def convert(event: ILoggingEvent): String = Trace.context().map(_.uow).getOrElse("undefined") +class LogbackTraceTokenConverter extends ClassicConverter { + def convert(event: ILoggingEvent): String = Trace.context().map(_.token).getOrElse("undefined") } diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala index 89742651..d6648cef 100644 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala @@ -25,7 +25,7 @@ class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with W "the ActorLogging instrumentation" should { "attach the TraceContext (if available) to log events" in { - val testTraceContext = Some(TraceContext(Actor.noSender, 1)) + val testTraceContext = Some(TraceContext(Actor.noSender, 1, "test", "test-1")) val loggerActor = system.actorOf(Props[LoggerActor]) system.eventStream.subscribe(testActor, classOf[LogEvent]) diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala index 89251bf4..f32623b9 100644 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala @@ -67,7 +67,7 @@ class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message- } trait TraceContextEchoFixture { - val testTraceContext = Some(Trace.newTraceContext("")) + val testTraceContext = Some(Trace.newTraceContext("test", "test-1")) val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) } diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala index 62f7ec84..2df95d09 100644 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala @@ -6,5 +6,5 @@ import akka.actor.Actor trait TraceContextFixture { val random = new Random(System.nanoTime) - val testTraceContext = Some(TraceContext(Actor.noSender, random.nextInt)) + val testTraceContext = Some(TraceContext(Actor.noSender, random.nextInt, "test", "test-1")) }
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 248aa03e..f3a08755 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -33,9 +33,9 @@ class NewRelicErrorLogger extends Actor { val ctx = error.asInstanceOf[ContextAware].traceContext for (c ← ctx) { - params.put("UOW", c.uow) + params.put("TraceToken", c.token) } - + if (error.cause == Error.NoCause) { NewRelic.noticeError(error.message.toString, params) } else { diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index 88a77566..c0793fea 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -1,6 +1,9 @@ akka { loggers = [ "akka.event.slf4j.Slf4jLogger" ] loglevel = INFO + + extensions = ["kamon.newrelic.NewRelic"] + actor { debug { unhandled = on @@ -8,6 +11,14 @@ akka { } } +kamon { + newrelic { + app-name = "SimpleRequestProcessor" + license-key = e7d350b14228f3d28f35bc3140df2c3e565ea5d5 + } +} + + kamon { metrics { diff --git a/kamon-playground/src/main/resources/logback.xml b/kamon-playground/src/main/resources/logback.xml index e948eaf5..ae342ca2 100644 --- a/kamon-playground/src/main/resources/logback.xml +++ b/kamon-playground/src/main/resources/logback.xml @@ -1,8 +1,8 @@ <configuration scan="true"> - <conversionRule conversionWord="uow" converterClass="kamon.trace.logging.LogbackUowConverter" /> + <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter" /> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> - <pattern>%date{HH:mm:ss.SSS} %-5level [%uow][%X{akkaSource}] [%thread] %logger{55} - %msg%n</pattern> + <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%X{akkaSource}] [%thread] %logger{55} - %msg%n</pattern> </encoder> </appender> diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf new file mode 100644 index 00000000..88bd8fb8 --- /dev/null +++ b/kamon-spray/src/main/resources/reference.conf @@ -0,0 +1,6 @@ +kamon { + spray { + include-trace-token-header = true + trace-token-header-name = "X-Trace-Token" + } +}
\ No newline at end of file diff --git a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala index 44db55e7..78a5b336 100644 --- a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala +++ b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala @@ -28,10 +28,10 @@ trait UowDirectives extends BasicDirectives { val uowHeader = request.headers.find(_.name == "X-UOW") val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) - Trace.transformContext(_.copy(uow = generatedUow)) + Trace.transformContext(_.copy(token = generatedUow)) request } - def respondWithUow = mapHttpResponseHeaders(headers ⇒ Trace.context().map(ctx ⇒ RawHeader("X-UOW", ctx.uow) :: headers).getOrElse(headers)) + def respondWithUow = mapHttpResponseHeaders(headers ⇒ Trace.context().map(ctx ⇒ RawHeader("X-UOW", ctx.token) :: headers).getOrElse(headers)) } object UowDirectives { diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala index 91dba87b..a7d48ac8 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala @@ -37,9 +37,16 @@ class ServerRequestTracing { @After("openRequestInit(openRequest, request)") def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system + val config = system.settings.config.getConfig("kamon.spray") + + val token = if(config.getBoolean("include-trace-token-header")) { + val traceTokenHeader = config.getString("trace-token-header-name") + request.headers.find(_.name == traceTokenHeader).map(_.value) + } else None + val defaultTraceName: String = request.method.value + ": " + request.uri.path - Trace.start(defaultTraceName)(system) + Trace.start(defaultTraceName, token)(system) // Necessary to force initialization of traceContext when initiating the request. openRequest.traceContext diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala index c7ec2a5f..eae0c992 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala @@ -14,7 +14,7 @@ class ClientRequestTracingSpec extends TestKit(ActorSystem("server-request-traci "the client instrumentation" should { "record segments for a client http request" in { - Trace.start("record-segments")(system) + Trace.start("record-segments", None)(system) send { Get(s"http://127.0.0.1:$port/ok") diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala index 44ed3baf..2a2865e1 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala @@ -19,25 +19,33 @@ import _root_.spray.httpx.RequestBuilding import _root_.spray.routing.SimpleRoutingApp import akka.testkit.TestKit import akka.actor.ActorSystem -import org.scalatest.WordSpecLike +import org.scalatest.{ Matchers, WordSpecLike } import scala.concurrent.Await import scala.concurrent.duration._ import _root_.spray.client.pipelining._ import akka.util.Timeout import kamon.trace.{ UowTrace, Trace } import kamon.Kamon +import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } +import spray.http.HttpHeaders.RawHeader +import spray.http.HttpRequest +import spray.http.HttpHeaders.Host -class ServerRequestTracingSpec extends TestKit(ActorSystem("server-request-tracing-spec")) with WordSpecLike with RequestBuilding with TestServer { +class ServerRequestTracingSpec extends TestKit(ActorSystem("server-request-tracing-spec")) with WordSpecLike with Matchers with RequestBuilding with ScalaFutures with PatienceConfiguration with TestServer { "the spray server request tracing instrumentation" should { "trace a request start/finish sequence when proper TraceContext is received" in { - send { - Get(s"http://127.0.0.1:$port/ok") + val response = send { + Get("/ok") } within(5 seconds) { fishForNamedTrace("ok") } + + whenReady(response) { rsp ⇒ + rsp.headers should contain(RawHeader("X-Trace-Token", "")) + } } "finish a request even if no TraceContext is received in the response" in { @@ -60,6 +68,10 @@ class ServerRequestTracingSpec extends TestKit(ActorSystem("server-request-traci } } } + /* + - si no llega uow, crear una + - si llega con uow hay que propagarla + */ def fishForNamedTrace(traceName: String) = fishForMessage() { case trace: UowTrace if trace.name.contains(traceName) ⇒ true @@ -79,15 +91,18 @@ trait TestServer extends SimpleRoutingApp { path("ok") { complete("ok") } ~ - path("clearcontext") { - complete { - println("The Context in the route is: " + Trace.context) - Trace.clear - "ok" - } + path("clearcontext") { + complete { + Trace.clear + "ok" } + } }), timeout.duration).localAddress.getPort - val send = sendReceive(system, system.dispatcher, timeout) + val send = includeHost("127.0.0.1", port) ~> sendReceive(system, system.dispatcher, timeout) + + def includeHost(host: String, port: Int) = { request: HttpRequest ⇒ + request.withEffectiveUri(port == 443, Host(host, port)) + } } |