diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-02-05 01:12:34 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-02-05 01:12:34 -0300 |
commit | 522a12c90788c402a364407e146d302e6519a3da (patch) | |
tree | 3603c2172a8048b5f13c923d85e14b4f8490ca53 | |
parent | 57e433c07a271b4e5e4159500cdc828cd7bb6a83 (diff) | |
download | Kamon-522a12c90788c402a364407e146d302e6519a3da.tar.gz Kamon-522a12c90788c402a364407e146d302e6519a3da.tar.bz2 Kamon-522a12c90788c402a364407e146d302e6519a3da.zip |
kamon-newrelic now uses the subscription protocol to report metrics to NewRelic
-rw-r--r-- | kamon-core/src/main/scala/kamon/metrics/Metrics.scala | 4 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala | 2 | ||||
-rw-r--r-- | kamon-newrelic/src/main/resources/reference.conf | 6 | ||||
-rw-r--r-- | kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala | 4 | ||||
-rw-r--r-- | kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala | 3 | ||||
-rw-r--r-- | kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala | 43 | ||||
-rw-r--r-- | kamon-playground/src/main/resources/application.conf | 2 | ||||
-rw-r--r-- | kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala | 21 | ||||
-rw-r--r-- | kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala (renamed from kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala) | 21 |
9 files changed, 70 insertions, 36 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala index ee32dbe8..81475d52 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala @@ -102,8 +102,8 @@ object MetricSnapshot { case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: Vector[MetricSnapshot.Measurement]) extends MetricSnapshot -object MetricGroupIdentity { - +object DefaultMetricSnapshot { + val empty = DefaultMetricSnapshot(0, Vector.empty) } trait MetricGroupFactory { diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 3e3bb19f..0e264cd2 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -54,6 +54,8 @@ object TraceRecorder { def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): Option[SegmentCompletionHandle] = currentContext.map(_.startSegment(identity, metadata)) + def rename(name: String): Unit = currentContext.map(_.rename(name)) + def withNewTraceContext[T](name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(thunk: ⇒ T)(implicit system: ActorSystem): T = withTraceContext(Some(newTraceContext(name, token, metadata, system)))(thunk) diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index dacabe41..ee0acdc3 100644 --- a/kamon-newrelic/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -1,9 +1,7 @@ -akka { - extensions = ["kamon.newrelic.NewRelic"] -} - kamon { newrelic { + apdexT = 1 second + app-name = "Kamon[Development]" license-key = 2e24765acb032cb9e7207013b5ba3e2ab7d2d75c } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala index 39177e30..c3438df5 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala @@ -24,9 +24,11 @@ class MetricTranslator(receiver: ActorRef) extends Actor with WebTransactionMetr def receive = { case TickMetricSnapshot(from, to, metrics) => + val scaledFrom = (from / 1E3).toInt + val scaledTo = (to / 1E3).toInt val allMetrics = collectWebTransactionMetrics(metrics) - receiver ! TimeSliceMetrics(from, to, allMetrics) + receiver ! TimeSliceMetrics(scaledFrom, scaledTo, allMetrics) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index ef2de343..92191842 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -24,7 +24,10 @@ import akka.actor class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val config = system.settings.config.getConfig("kamon.newrelic") + val manager: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic") + val apdexT: Double = config.getMilliseconds("apdexT") / 1E3 // scale to seconds. Kamon(Metrics)(system).subscribe(TraceMetrics, "*", manager, permanently = true) } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala index 7fe5793c..31a3669d 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala @@ -16,18 +16,53 @@ package kamon.newrelic -import kamon.metrics.{ TraceMetrics, MetricGroupSnapshot, MetricGroupIdentity } +import kamon.metrics._ import kamon.metrics.TraceMetrics.ElapsedTime +import akka.actor.Actor +import kamon.Kamon trait WebTransactionMetrics { - def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): List[NewRelic.Metric] = { + self: Actor => + + def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = { + val apdexBuilder = new ApdexBuilder("Apdex", None, (NewRelic)(context.system).apdexT) + var accumulatedHttpDispatcher: MetricSnapshot = DefaultMetricSnapshot.empty + val webTransactionMetrics = metrics.collect { case (TraceMetrics(name), groupSnapshot) ⇒ + groupSnapshot.metrics collect { - case (ElapsedTime, snapshot) => toNewRelicMetric("HttpDispatcher", None, snapshot) + case (ElapsedTime, snapshot) => + accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(snapshot) + snapshot.measurementLevels.foreach(level => apdexBuilder.record(level.value / 1E9D, level.count)) + + toNewRelicMetric(s"WebTransaction/Custom/$name", None, snapshot) } } - webTransactionMetrics.flatten.toList + val httpDispatcher = toNewRelicMetric("HttpDispatcher", None, accumulatedHttpDispatcher) + val webTransaction = toNewRelicMetric("WebTransaction", None, accumulatedHttpDispatcher) + + Seq(httpDispatcher, webTransaction, apdexBuilder.build) ++ webTransactionMetrics.flatten.toSeq } } + +class ApdexBuilder(name: String, scope: Option[String], apdexT: Double) { + val frustratingThreshold = 4 * apdexT + + var satisfying = 0L + var tolerating = 0L + var frustrating = 0L + + def record(duration: Double, count: Long): Unit = + if(duration <= apdexT) + satisfying += count + else + if(duration <= frustratingThreshold) + tolerating += count + else + frustrating += count + + // NewRelic reuses the same metric structure for recording the Apdex.. weird, but that's how it works. + def build: NewRelic.Metric = NewRelic.Metric(name, scope, satisfying, tolerating, frustrating, apdexT, apdexT, 0) +} diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index c490c6c2..f0698592 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -1,6 +1,6 @@ akka { loggers = [ "akka.event.slf4j.Slf4jLogger" ] - loglevel = DEBUG + loglevel = INFO extensions = ["kamon.newrelic.NewRelic"] diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 06d8795a..c518e32a 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -20,7 +20,7 @@ import spray.routing.SimpleRoutingApp import akka.util.Timeout import spray.httpx.RequestBuilding import scala.concurrent.{ Await, Future } -import kamon.spray.UowDirectives +import kamon.spray.KamonTraceDirectives import scala.util.Random import akka.routing.RoundRobinRouter import kamon.trace.TraceRecorder @@ -30,7 +30,7 @@ import spray.http.{ StatusCodes, Uri } import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.newrelic.WebTransactionMetrics -object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives { +object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with KamonTraceDirectives { import scala.concurrent.duration._ import spray.client.pipelining._ import akka.pattern.ask @@ -57,7 +57,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil startServer(interface = "localhost", port = 9090) { get { path("test") { - uow { + traceName("test") { complete { val futures = pipeline(Get("http://10.254.209.14:8000/")).map(r ⇒ "Ok") :: pipeline(Get("http://10.254.209.14:8000/")).map(r ⇒ "Ok") :: Nil @@ -75,21 +75,24 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } ~ path("reply" / Segment) { reqID ⇒ - uow { + traceName("reply") { complete { (replier ? reqID).mapTo[String] } } } ~ path("ok") { - complete { - //Thread.sleep(random.nextInt(1) + random.nextInt(5) + random.nextInt(2)) - "ok" + traceName("OK") { + complete { + "ok" + } } } ~ path("future") { - dynamic { - complete(Future { "OK" }) + traceName("OK-Future") { + dynamic { + complete(Future { "OK" }) + } } } ~ path("kill") { diff --git a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala index 56cc6d5e..e831af7b 100644 --- a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala +++ b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala @@ -21,21 +21,12 @@ import java.util.concurrent.atomic.AtomicLong import scala.util.Try import java.net.InetAddress import spray.http.HttpHeaders.RawHeader +import shapeless.HNil +import kamon.trace.TraceRecorder -trait UowDirectives extends BasicDirectives { - def uow: Directive0 = mapRequest { request ⇒ - val uowHeader = request.headers.find(_.name == "X-UOW") - - val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) - //Trace.transformContext(_.copy(token = generatedUow)) - request +trait KamonTraceDirectives extends BasicDirectives { + def traceName(name: String): Directive0 = mapRequest { req => + TraceRecorder.rename(name) + req } - //def respondWithUow = mapHttpResponseHeaders(headers ⇒ Trace.context().map(ctx ⇒ RawHeader("X-UOW", ctx.token) :: headers).getOrElse(headers)) -} - -object UowDirectives { - val uowCounter = new AtomicLong - val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) - } |