aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-11-10 04:22:05 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2014-11-10 04:53:56 +0100
commit5b9bbb196734c47e67d69d48e378e196b205fd57 (patch)
treed51796eb32c61c84e23da83780af1ef0854bbc1e
parent447605e612d5bbea9765ede288ba19b1a69af48c (diff)
downloadKamon-5b9bbb196734c47e67d69d48e378e196b205fd57.tar.gz
Kamon-5b9bbb196734c47e67d69d48e378e196b205fd57.tar.bz2
Kamon-5b9bbb196734c47e67d69d48e378e196b205fd57.zip
+ newrelic: report additional and scoped external service metrics, improves #63.
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala53
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala4
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala59
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala67
-rw-r--r--kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala77
-rw-r--r--kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala9
6 files changed, 198 insertions, 71 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
index c573d04d..26e8839e 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
@@ -32,18 +32,46 @@ object JsonProtocol extends DefaultJsonProtocol {
"pid" -> JsNumber(obj.pid)))
}
- implicit def seqWriter[T: JsonWriter] = new JsonWriter[Seq[T]] {
+ implicit def seqWriter[T: JsonFormat] = new JsonFormat[Seq[T]] {
+ def read(value: JsValue): Seq[T] = value match {
+ case JsArray(elements) ⇒ elements.map(_.convertTo[T])(collection.breakOut)
+ case x ⇒ deserializationError("Expected Seq as JsArray, but got " + x)
+ }
+
def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toVector)
}
- implicit object MetricDetailWriter extends JsonWriter[Metric] {
+ implicit object MetricDetailWriter extends JsonFormat[Metric] {
+ def read(json: JsValue): (MetricID, MetricData) = json match {
+ case JsArray(elements) ⇒
+ val metricID = elements(0) match {
+ case JsObject(fields) ⇒ MetricID(fields("name").convertTo[String], fields.get("scope").map(_.convertTo[String]))
+ case x ⇒ deserializationError("Expected MetricID as JsObject, but got " + x)
+ }
+
+ val metricData = elements(1) match {
+ case JsArray(dataElements) ⇒
+ MetricData(
+ dataElements(0).convertTo[Long],
+ dataElements(1).convertTo[Double],
+ dataElements(2).convertTo[Double],
+ dataElements(3).convertTo[Double],
+ dataElements(4).convertTo[Double],
+ dataElements(5).convertTo[Double])
+ case x ⇒ deserializationError("Expected MetricData as JsArray, but got " + x)
+ }
+
+ (metricID, metricData)
+
+ case x ⇒ deserializationError("Expected Metric as JsArray, but got " + x)
+ }
+
def write(obj: Metric): JsValue = {
val (metricID, metricData) = obj
+ val nameAndScope = metricID.scope.foldLeft(Map("name" -> JsString(metricID.name)))((m, scope) ⇒ m + ("scope" -> JsString(scope)))
JsArray(
- JsObject(
- "name" -> JsString(metricID.name) // TODO Include scope
- ),
+ JsObject(nameAndScope),
JsArray(
JsNumber(metricData.callCount),
JsNumber(metricData.total),
@@ -54,7 +82,20 @@ object JsonProtocol extends DefaultJsonProtocol {
}
}
- implicit object MetricBatchWriter extends RootJsonWriter[MetricBatch] {
+ implicit object MetricBatchWriter extends RootJsonFormat[MetricBatch] {
+
+ def read(json: JsValue): MetricBatch = json match {
+ case JsArray(elements) ⇒
+ val runID = elements(0).convertTo[Long]
+ val timeSliceFrom = elements(1).convertTo[Long]
+ val timeSliceTo = elements(2).convertTo[Long]
+ val metrics = elements(3).convertTo[Seq[Metric]]
+
+ MetricBatch(runID, TimeSliceMetrics(timeSliceFrom, timeSliceTo, metrics.toMap))
+
+ case x ⇒ deserializationError("Expected Array as JsArray, but got " + x)
+ }
+
def write(obj: MetricBatch): JsValue =
JsArray(
JsNumber(obj.runID),
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
index b09973ef..0aa078f5 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
@@ -14,8 +14,6 @@ import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed
import spray.can.Http
import spray.http.Uri
import spray.httpx.SprayJsonSupport
-import spray.json.CompactPrinter
-
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
@@ -76,7 +74,7 @@ class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extend
log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to)
collectorClient {
- Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, CompactPrinter))
+ Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter))
} map { response ⇒
if (response.status.isSuccess)
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala
new file mode 100644
index 00000000..713a5586
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala
@@ -0,0 +1,59 @@
+package kamon.newrelic
+
+import java.lang.{ StringBuilder ⇒ JStringBuilder }
+import spray.json._
+import scala.annotation.tailrec
+
+// We need a special treatment of / that needs to be escaped as \/ for New Relic to work properly with all metrics.
+// Without this custom json printer the scoped metrics are not displayed in the site.
+
+// format: OFF
+trait NewRelicJsonPrinter extends CompactPrinter {
+
+ override def printString(s: String, sb: JStringBuilder) {
+ import NewRelicJsonPrinter._
+ @tailrec def firstToBeEncoded(ix: Int = 0): Int =
+ if (ix == s.length) -1 else if (requiresEncoding(s.charAt(ix))) ix else firstToBeEncoded(ix + 1)
+
+ sb.append('"')
+ firstToBeEncoded() match {
+ case -1 ⇒ sb.append(s)
+ case first ⇒
+ sb.append(s, 0, first)
+ @tailrec def append(ix: Int): Unit =
+ if (ix < s.length) {
+ s.charAt(ix) match {
+ case c if !requiresEncoding(c) => sb.append(c)
+ case '"' => sb.append("\\\"")
+ case '\\' => sb.append("\\\\")
+ case '/' => sb.append("\\/")
+ case '\b' => sb.append("\\b")
+ case '\f' => sb.append("\\f")
+ case '\n' => sb.append("\\n")
+ case '\r' => sb.append("\\r")
+ case '\t' => sb.append("\\t")
+ case x if x <= 0xF => sb.append("\\u000").append(Integer.toHexString(x))
+ case x if x <= 0xFF => sb.append("\\u00").append(Integer.toHexString(x))
+ case x if x <= 0xFFF => sb.append("\\u0").append(Integer.toHexString(x))
+ case x => sb.append("\\u").append(Integer.toHexString(x))
+ }
+ append(ix + 1)
+ }
+ append(first)
+ }
+ sb.append('"')
+ }
+}
+
+object NewRelicJsonPrinter extends NewRelicJsonPrinter {
+
+ def requiresEncoding(c: Char): Boolean =
+ // from RFC 4627
+ // unescaped = %x20-21 / %x23-5B / %x5D-10FFFF
+ c match {
+ case '"' ⇒ true
+ case '\\' ⇒ true
+ case '/' ⇒ true
+ case c ⇒ c < 0x20
+ }
+} \ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
index 15eba982..0a4a516b 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
@@ -16,8 +16,9 @@
package kamon.newrelic
+import scala.collection.mutable;
import kamon.metric._
-import kamon.metric.TraceMetrics.ElapsedTime
+import kamon.metric.TraceMetrics.{ TraceMetricsSnapshot, ElapsedTime }
import kamon.metric.instrument.Histogram
import kamon.trace.SegmentCategory.HttpClient
import kamon.trace.SegmentMetricIdentity
@@ -31,33 +32,65 @@ object WebTransactionMetricExtractor extends MetricExtractor {
var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano)
var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano)
- val transactionMetrics = metrics.collect {
- case (TraceMetrics(name), groupSnapshot) ⇒
-
- groupSnapshot.metrics collect {
- // Extract WebTransaction metrics and accumulate HttpDispatcher
- case (ElapsedTime, snapshot: Histogram.Snapshot) ⇒
- accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(snapshot, collectionContext)
- snapshot.recordsIterator.foreach { record ⇒
- apdexBuilder.record(Scale.convert(snapshot.scale, Scale.Unit, record.level), record.count)
- }
+ val externalByHostSnapshots = mutable.Map.empty[String, List[Histogram.Snapshot]]
+ val externalByHostAndLibrarySnapshots = mutable.Map.empty[(String, String), List[Histogram.Snapshot]]
+ val externalScopedByHostAndLibrarySnapshots = mutable.Map.empty[(String, String, String), List[Histogram.Snapshot]]
- Metric.fromKamonMetricSnapshot(snapshot, s"WebTransaction/Custom/$name", None, Scale.Unit)
+ val transactionMetrics = metrics.collect {
+ case (TraceMetrics(traceName), tms: TraceMetricsSnapshot) ⇒
- // Extract all external services.
+ tms.segments.foreach {
case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒
accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext)
- Metric.fromKamonMetricSnapshot(snapshot, s"External/$segmentName/all", None, Scale.Unit)
+ // Accumulate externals by host
+ externalByHostSnapshots.update(segmentName, snapshot :: externalByHostSnapshots.getOrElse(segmentName, Nil))
+
+ // Accumulate externals by host and library
+ externalByHostAndLibrarySnapshots.update((segmentName, library),
+ snapshot :: externalByHostAndLibrarySnapshots.getOrElse((segmentName, library), Nil))
+
+ // Accumulate externals by host and library, including the transaction as scope.
+ externalScopedByHostAndLibrarySnapshots.update((segmentName, library, traceName),
+ snapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((segmentName, library, traceName), Nil))
+
+ }
+
+ accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(tms.elapsedTime, collectionContext)
+ tms.elapsedTime.recordsIterator.foreach { record ⇒
+ apdexBuilder.record(Scale.convert(tms.elapsedTime.scale, Scale.Unit, record.level), record.count)
}
+
+ Metric.fromKamonMetricSnapshot(tms.elapsedTime, "WebTransaction/Custom/" + traceName, None, Scale.Unit)
}
val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit)
- val webTransaction = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "WebTransaction", None, Scale.Unit)
- val external = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External", None, Scale.Unit)
+ val webTransaction = httpDispatcher.copy(MetricID("WebTransaction", None))
+ val webTransactionTotal = httpDispatcher.copy(MetricID("WebTransactionTotalTime", None))
+
val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit)
+ val externalAll = externalAllWeb.copy(MetricID("External/all", None))
+
+ val externalByHost = externalByHostSnapshots.map {
+ case (host, snapshots) ⇒
+ val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext))
+ Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/all", None, Scale.Unit)
+ }
+
+ val externalByHostAndLibrary = externalByHostAndLibrarySnapshots.map {
+ case ((host, library), snapshots) ⇒
+ val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext))
+ Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", None, Scale.Unit)
+ }
+
+ val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map {
+ case ((host, library, traceName), snapshots) ⇒
+ val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext))
+ Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName), Scale.Unit)
+ }
- Map(httpDispatcher, webTransaction, external, externalAllWeb, apdexBuilder.build) ++ transactionMetrics.flatten.toMap
+ Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++
+ transactionMetrics ++ externalByHost ++ externalByHostAndLibrary ++ externalScopedByHostAndLibrary
}
}
diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala
index 3cf4bbd0..0001072e 100644
--- a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala
+++ b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala
@@ -23,15 +23,16 @@ import com.typesafe.config.ConfigFactory
import kamon.metric.{ TraceMetrics, Metrics }
import kamon.{ Kamon, AkkaExtensionSwap }
import kamon.metric.Subscriptions.TickMetricSnapshot
-import org.scalatest.WordSpecLike
+import org.scalatest.{ Matchers, WordSpecLike }
import spray.can.Http
import spray.http.Uri.Query
import spray.http._
import spray.httpx.encoding.Deflate
import spray.httpx.{ RequestBuilding, SprayJsonSupport }
import scala.concurrent.duration._
+import spray.json._
-class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuilding with SprayJsonSupport {
+class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with SprayJsonSupport {
import kamon.newrelic.JsonProtocol._
implicit lazy val system: ActorSystem = ActorSystem("metric-reporter-spec", ConfigFactory.parseString(
@@ -61,21 +62,21 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuild
val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri))
metricReporter ! firstSnapshot
- httpManager.expectMsg(Deflate.encode {
- HttpRequest(method = HttpMethods.POST, uri = rawMethodUri("collector-1.newrelic.com", "metric_data"), entity = compactJsonEntity(
- s"""
- |[9999,0,0,
- |[
- | [{"name":"Apdex"},[3,0.0,0.0,1.0,1.0,0.0]],
- | [{"name":"WebTransaction"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]],
- | [{"name":"External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]],
- | [{"name":"WebTransaction/Custom/example-trace"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]],
- | [{"name":"HttpDispatcher"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]],
- | [{"name":"External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]]
- |]
- |]
- """.stripMargin))
- })
+ val metricPost = httpManager.expectMsgType[HttpRequest]
+
+ metricPost.method should be(HttpMethods.POST)
+ metricPost.uri should be(rawMethodUri("collector-1.newrelic.com", "metric_data"))
+ metricPost.encoding should be(HttpEncodings.deflate)
+
+ val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch]
+ postedBatch.runID should be(9999)
+ postedBatch.timeSliceMetrics.from should be(1415587618)
+ postedBatch.timeSliceMetrics.to should be(1415587678)
+
+ val metrics = postedBatch.timeSliceMetrics.metrics
+ metrics(MetricID("Apdex", None)).callCount should be(3)
+ metrics(MetricID("WebTransaction", None)).callCount should be(3)
+ metrics(MetricID("HttpDispatcher", None)).callCount should be(3)
}
"accumulate metrics if posting fails" in new FakeTickSnapshotsFixture {
@@ -87,31 +88,23 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuild
httpManager.reply(Timedout(request))
metricReporter ! secondSnapshot
- httpManager.expectMsg(Deflate.encode {
- HttpRequest(method = HttpMethods.POST, uri = rawMethodUri("collector-1.newrelic.com", "metric_data"), entity = compactJsonEntity(
- s"""
- |[9999,0,0,
- |[
- | [{"name":"Apdex"},[6,0.0,0.0,1.0,1.0,0.0]],
- | [{"name":"WebTransaction"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]],
- | [{"name": "External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]],
- | [{"name":"WebTransaction/Custom/example-trace"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]],
- | [{"name":"HttpDispatcher"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]],
- | [{"name": "External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]]
- |]
- |]
- """.stripMargin))
- })
+ val metricPost = httpManager.expectMsgType[HttpRequest]
+
+ metricPost.method should be(HttpMethods.POST)
+ metricPost.uri should be(rawMethodUri("collector-1.newrelic.com", "metric_data"))
+ metricPost.encoding should be(HttpEncodings.deflate)
+
+ val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch]
+ postedBatch.runID should be(9999)
+ postedBatch.timeSliceMetrics.from should be(1415587618)
+ postedBatch.timeSliceMetrics.to should be(1415587738)
+
+ val metrics = postedBatch.timeSliceMetrics.metrics
+ metrics(MetricID("Apdex", None)).callCount should be(6)
+ metrics(MetricID("WebTransaction", None)).callCount should be(6)
+ metrics(MetricID("HttpDispatcher", None)).callCount should be(6)
}
}
- /*
- [9999, 0, 0, [
- [{"name": "Apdex"}, [6, 0.0, 0.0, 1.0, 1.0, 0.0]],
- [{"name": "WebTransaction"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]],
- [{"name": "External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]],
- [{"name": "WebTransaction/Custom/example-trace"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]],
- [{"name": "HttpDispatcher"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]],
- [{"name": "External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]]]]*/
def setHttpManager(probe: TestProbe): TestProbe = {
AkkaExtensionSwap.swap(system, Http, new IO.Extension {
@@ -146,11 +139,11 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuild
recorder.elapsedTime.record(1000000)
recorder.elapsedTime.record(2000000)
recorder.elapsedTime.record(3000000)
- val firstSnapshot = TickMetricSnapshot(1, 100, Map(testTraceID -> collectRecorder))
+ val firstSnapshot = TickMetricSnapshot(1415587618000L, 1415587678000L, Map(testTraceID -> collectRecorder))
recorder.elapsedTime.record(6000000)
recorder.elapsedTime.record(5000000)
recorder.elapsedTime.record(4000000)
- val secondSnapshot = TickMetricSnapshot(100, 200, Map(testTraceID -> collectRecorder))
+ val secondSnapshot = TickMetricSnapshot(1415587678000L, 1415587738000L, Map(testTraceID -> collectRecorder))
}
} \ No newline at end of file
diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
index da6a3cbe..7d7494eb 100644
--- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
+++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
@@ -83,8 +83,11 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
}
} ~
path("site") {
- complete {
- pipeline(Get("http://localhost:9090/site-redirect"))
+ traceName("FinalGetSite-3") {
+ complete {
+ for (f1 <- pipeline(Get("http://127.0.0.1:9090/ok"));
+ f2 <- pipeline(Get("http://www.google.com/search?q=mkyong"))) yield "Ok Double Future"
+ }
}
} ~
path("site-redirect") {
@@ -99,7 +102,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
}
} ~
path("ok") {
- traceName("OK") {
+ traceName("RespondWithOK-3") {
complete {
"ok"
}