From f498749274bc9f25ede7221d6bd8b3f0c3822dda Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 6 Nov 2014 16:29:54 +0100 Subject: ! newrelic: major refactor of the newrelic reporter Most notable changes: - The agent connection setup is separated from the actual metrics reporting, this will be important in the near future when we start sending errors too. - The metrics subscriptions are delayed until the connection to the agent is established. - The Tick metrics buffer is only created if necessary. - Introduced the kamon.newrelic.max-initialize-retries and initialize-retry-delay settings. - External service calls via HTTP clients are reported as external services. --- kamon-playground/src/main/resources/application.conf | 2 +- kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'kamon-playground') diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index 32f0269d..86a87439 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -1,5 +1,5 @@ akka { - loglevel = INFO + loglevel = DEBUG extensions = ["kamon.newrelic.NewRelic"] actor { diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 878c3c8c..506e0bff 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -106,7 +106,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } } ~ path("future") { - traceName("OK-Future") { + traceName("OKFuture") { dynamic { counter.increment() complete(Future { "OK" }) -- cgit v1.2.3 From 447605e612d5bbea9765ede288ba19b1a69af48c Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 9 Nov 2014 22:55:43 +0100 Subject: + core,play,spray: allow segments to identify the library they belong to. --- .../src/main/scala/kamon/trace/TraceContext.scala | 24 ++++++++++++---------- .../test/scala/kamon/metric/TraceMetricsSpec.scala | 8 ++++---- .../kamon/trace/TraceContextManipulationSpec.scala | 4 ++-- .../newrelic/WebTransactionMetricExtractor.scala | 4 ++-- kamon-play/src/main/scala/kamon/play/Play.scala | 2 ++ .../play/instrumentation/WSInstrumentation.scala | 4 ++-- .../scala/kamon/play/WSInstrumentationSpec.scala | 4 ++-- .../main/scala/test/SimpleRequestProcessor.scala | 4 ++-- kamon-spray/src/main/scala/kamon/spray/Spray.scala | 1 + .../can/client/ClientRequestInstrumentation.scala | 4 ++-- .../spray/ClientRequestInstrumentationSpec.scala | 11 ++++++---- 11 files changed, 39 insertions(+), 31 deletions(-) (limited to 'kamon-playground') diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 9555daba..5b74e6b2 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -37,14 +37,15 @@ sealed trait TraceContext { def isClosed: Boolean = !isOpen def isEmpty: Boolean def nonEmpty: Boolean = !isEmpty - def startSegment(segmentName: String, label: String): Segment + def startSegment(segmentName: String, category: String, library: String): Segment def nanoTimestamp: Long } sealed trait Segment { def name: String def rename(newName: String): Unit - def label: String + def category: String + def library: String def finish(): Unit def isEmpty: Boolean } @@ -57,12 +58,13 @@ case object EmptyTraceContext extends TraceContext { def origin: TraceContextOrigin = TraceContextOrigin.Local def isOpen: Boolean = false def isEmpty: Boolean = true - def startSegment(segmentName: String, label: String): Segment = EmptySegment + def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment def nanoTimestamp: Long = 0L case object EmptySegment extends Segment { val name: String = "empty-segment" - val label: String = "empty-label" + val category: String = "empty-category" + val library: String = "empty-library" def isEmpty: Boolean = true def rename(newName: String): Unit = {} def finish: Unit = {} @@ -99,7 +101,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, } } - def startSegment(segmentName: String, segmentLabel: String): Segment = new DefaultSegment(segmentName, segmentLabel) + def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library) @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { val segment = finishedSegments.poll() @@ -109,8 +111,8 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, } } - private def finishSegment(segmentName: String, label: String, duration: Long): Unit = { - finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, label), duration)) + private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { + finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration)) if (isClosed) { metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ @@ -119,7 +121,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, } } - class DefaultSegment(segmentName: String, val label: String) extends Segment { + class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment { private val _segmentStartNanoTime = System.nanoTime() @volatile private var _segmentName = segmentName @volatile private var _isOpen = true @@ -130,15 +132,15 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, def finish: Unit = { val segmentFinishNanoTime = System.nanoTime() - finishSegment(name, label, (segmentFinishNanoTime - _segmentStartNanoTime)) + finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime)) } } } -case class SegmentMetricIdentity(name: String, label: String) extends MetricIdentity +case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity case class SegmentData(identity: SegmentMetricIdentity, duration: Long) -object SegmentMetricIdentityLabel { +object SegmentCategory { val HttpClient = "http-client" } diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala index 6453dd77..cd10f2d3 100644 --- a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala @@ -53,7 +53,7 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with "record the elapsed time for segments that occur inside a given trace" in { TraceRecorder.withNewTraceContext("trace-with-segments") { - val segment = TraceRecorder.currentContext.startSegment("test-segment", "test-label") + val segment = TraceRecorder.currentContext.startSegment("test-segment", "test-category", "test-library") segment.finish() TraceRecorder.finish() } @@ -61,12 +61,12 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val snapshot = takeSnapshotOf("trace-with-segments") snapshot.elapsedTime.numberOfMeasurements should be(1) snapshot.segments.size should be(1) - snapshot.segments(SegmentMetricIdentity("test-segment", "test-label")).numberOfMeasurements should be(1) + snapshot.segments(SegmentMetricIdentity("test-segment", "test-category", "test-library")).numberOfMeasurements should be(1) } "record the elapsed time for segments that finish after their correspondent trace has finished" in { val segment = TraceRecorder.withNewTraceContext("closing-segment-after-trace") { - val s = TraceRecorder.currentContext.startSegment("test-segment", "test-label") + val s = TraceRecorder.currentContext.startSegment("test-segment", "test-category", "test-library") TraceRecorder.finish() s } @@ -80,7 +80,7 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace") afterFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(0) afterFinishSegmentSnapshot.segments.size should be(1) - afterFinishSegmentSnapshot.segments(SegmentMetricIdentity("test-segment", "test-label")).numberOfMeasurements should be(1) + afterFinishSegmentSnapshot.segments(SegmentMetricIdentity("test-segment", "test-category", "test-library")).numberOfMeasurements should be(1) } } diff --git a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala index e2031a72..0875deff 100644 --- a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala @@ -78,7 +78,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma "allow creating a segment within a trace" in { val createdContext = TraceRecorder.withNewTraceContext("trace-with-segments") { - val segment = TraceRecorder.currentContext.startSegment("segment-1", "segment-1-label") + val segment = TraceRecorder.currentContext.startSegment("segment-1", "segment-1-category", "segment-library") TraceRecorder.currentContext } @@ -88,7 +88,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma "allow renaming a segment" in { TraceRecorder.withNewTraceContext("trace-with-renamed-segment") { - val segment = TraceRecorder.currentContext.startSegment("original-segment-name", "segment-label") + val segment = TraceRecorder.currentContext.startSegment("original-segment-name", "segment-label", "segment-library") segment.name should be("original-segment-name") segment.rename("new-segment-name") diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index 3710595c..15eba982 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -19,7 +19,7 @@ package kamon.newrelic import kamon.metric._ import kamon.metric.TraceMetrics.ElapsedTime import kamon.metric.instrument.Histogram -import kamon.trace.SegmentMetricIdentityLabel.HttpClient +import kamon.trace.SegmentCategory.HttpClient import kamon.trace.SegmentMetricIdentity object WebTransactionMetricExtractor extends MetricExtractor { @@ -45,7 +45,7 @@ object WebTransactionMetricExtractor extends MetricExtractor { Metric.fromKamonMetricSnapshot(snapshot, s"WebTransaction/Custom/$name", None, Scale.Unit) // Extract all external services. - case (SegmentMetricIdentity(segmentName, label), snapshot: Histogram.Snapshot) if label.equals(HttpClient) ⇒ + case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒ accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext) Metric.fromKamonMetricSnapshot(snapshot, s"External/$segmentName/all", None, Scale.Unit) diff --git a/kamon-play/src/main/scala/kamon/play/Play.scala b/kamon-play/src/main/scala/kamon/play/Play.scala index 6e2de3c1..9b69f9a3 100644 --- a/kamon-play/src/main/scala/kamon/play/Play.scala +++ b/kamon-play/src/main/scala/kamon/play/Play.scala @@ -27,6 +27,8 @@ import play.api.mvc.RequestHeader object Play extends ExtensionId[PlayExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = Play override def createExtension(system: ExtendedActorSystem): PlayExtension = new PlayExtension(system) + + val SegmentLibraryName = "WS-client" } class PlayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala index 125db85e..f16c76c8 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala @@ -18,7 +18,7 @@ package kamon.play.instrumentation import kamon.Kamon import kamon.play.Play -import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } import play.api.libs.ws.{ WSRequest, WSResponse } @@ -37,7 +37,7 @@ class WSInstrumentation { val playExtension = Kamon(Play)(system) val executor = playExtension.defaultDispatcher val segmentName = playExtension.generateHttpClientSegmentName(request) - val segment = ctx.startSegment(segmentName, SegmentMetricIdentityLabel.HttpClient) + val segment = ctx.startSegment(segmentName, SegmentCategory.HttpClient, Play.SegmentLibraryName) val response = pjp.proceed().asInstanceOf[Future[WSResponse]] response.map(result ⇒ segment.finish())(executor) diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala index bf1ead05..bda8281b 100644 --- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala @@ -19,7 +19,7 @@ package kamon.play import kamon.Kamon import kamon.metric.TraceMetrics.TraceMetricsSnapshot import kamon.metric.{ Metrics, TraceMetrics } -import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder } import org.scalatest.{ Matchers, WordSpecLike } import org.scalatestplus.play.OneServerPerSuite import play.api.libs.ws.WS @@ -49,7 +49,7 @@ class WSInstrumentationSpec extends WordSpecLike with Matchers with OneServerPer val snapshot = takeSnapshotOf("GET: /inside") snapshot.elapsedTime.numberOfMeasurements should be(1) snapshot.segments.size should be(1) - snapshot.segments(SegmentMetricIdentity("http://localhost:19001/async", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + snapshot.segments(SegmentMetricIdentity("http://localhost:19001/async", SegmentCategory.HttpClient, Play.SegmentLibraryName)).numberOfMeasurements should be(1) } "propagate the TraceContext outside an Action and complete the WS request" in { diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 506e0bff..da6a3cbe 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -23,7 +23,7 @@ import kamon.Kamon import kamon.metric.Subscriptions.TickMetricSnapshot import kamon.metric._ import kamon.spray.KamonTraceDirectives -import kamon.trace.{ SegmentMetricIdentityLabel, TraceRecorder } +import kamon.trace.{ SegmentCategory, TraceRecorder } import spray.http.{ StatusCodes, Uri } import spray.httpx.RequestBuilding import spray.routing.SimpleRoutingApp @@ -127,7 +127,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } ~ path("segment") { complete { - val segment = TraceRecorder.currentContext.startSegment("hello-world", SegmentMetricIdentityLabel.HttpClient) + val segment = TraceRecorder.currentContext.startSegment("hello-world", SegmentCategory.HttpClient, "none") (replier ? "hello").mapTo[String].onComplete { t ⇒ segment.finish() } diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala index 72b997d7..ab8d6a7d 100644 --- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala +++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala @@ -28,6 +28,7 @@ object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Spray def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtension(system) + val SegmentLibraryName = "spray-client" } object ClientSegmentCollectionStrategy { diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index 94fc3572..813915c4 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -53,7 +53,7 @@ class ClientRequestInstrumentation { if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { if (requestContext.segment.isEmpty) { val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) - val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + val segment = ctx.startSegment(clientRequestName, SegmentCategory.HttpClient, Spray.SegmentLibraryName) requestContext.segment = segment } @@ -116,7 +116,7 @@ class ClientRequestInstrumentation { val sprayExtension = Kamon(Spray)(system) val segment = if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) - ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentMetricIdentityLabel.HttpClient) + ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentCategory.HttpClient, Spray.SegmentLibraryName) else EmptyTraceContext.EmptySegment diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index 57f9ebe1..b90b0f3b 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -23,7 +23,7 @@ import org.scalatest.time.{ Millis, Seconds, Span } import org.scalatest.{ Matchers, WordSpecLike } import spray.httpx.RequestBuilding import spray.http.{ HttpResponse, HttpRequest } -import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder } import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader @@ -144,7 +144,8 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api") traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) - traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", + SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1) } "rename a request level api segment once it reaches the relevant host connector" in { @@ -174,7 +175,8 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api") traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) - traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", + SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1) } } @@ -261,7 +263,8 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api") traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) - traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", + SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1) } } } -- cgit v1.2.3 From 5b9bbb196734c47e67d69d48e378e196b205fd57 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 10 Nov 2014 04:22:05 +0100 Subject: + newrelic: report additional and scoped external service metrics, improves #63. --- .../main/scala/kamon/newrelic/JsonProtocol.scala | 53 +++++++++++++-- .../main/scala/kamon/newrelic/MetricReporter.scala | 4 +- .../scala/kamon/newrelic/NewRelicJsonPrinter.scala | 59 +++++++++++++++++ .../newrelic/WebTransactionMetricExtractor.scala | 67 ++++++++++++++----- .../scala/kamon/newrelic/MetricReporterSpec.scala | 77 ++++++++++------------ .../main/scala/test/SimpleRequestProcessor.scala | 9 ++- 6 files changed, 198 insertions(+), 71 deletions(-) create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala (limited to 'kamon-playground') diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala index c573d04d..26e8839e 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -32,18 +32,46 @@ object JsonProtocol extends DefaultJsonProtocol { "pid" -> JsNumber(obj.pid))) } - implicit def seqWriter[T: JsonWriter] = new JsonWriter[Seq[T]] { + implicit def seqWriter[T: JsonFormat] = new JsonFormat[Seq[T]] { + def read(value: JsValue): Seq[T] = value match { + case JsArray(elements) ⇒ elements.map(_.convertTo[T])(collection.breakOut) + case x ⇒ deserializationError("Expected Seq as JsArray, but got " + x) + } + def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toVector) } - implicit object MetricDetailWriter extends JsonWriter[Metric] { + implicit object MetricDetailWriter extends JsonFormat[Metric] { + def read(json: JsValue): (MetricID, MetricData) = json match { + case JsArray(elements) ⇒ + val metricID = elements(0) match { + case JsObject(fields) ⇒ MetricID(fields("name").convertTo[String], fields.get("scope").map(_.convertTo[String])) + case x ⇒ deserializationError("Expected MetricID as JsObject, but got " + x) + } + + val metricData = elements(1) match { + case JsArray(dataElements) ⇒ + MetricData( + dataElements(0).convertTo[Long], + dataElements(1).convertTo[Double], + dataElements(2).convertTo[Double], + dataElements(3).convertTo[Double], + dataElements(4).convertTo[Double], + dataElements(5).convertTo[Double]) + case x ⇒ deserializationError("Expected MetricData as JsArray, but got " + x) + } + + (metricID, metricData) + + case x ⇒ deserializationError("Expected Metric as JsArray, but got " + x) + } + def write(obj: Metric): JsValue = { val (metricID, metricData) = obj + val nameAndScope = metricID.scope.foldLeft(Map("name" -> JsString(metricID.name)))((m, scope) ⇒ m + ("scope" -> JsString(scope))) JsArray( - JsObject( - "name" -> JsString(metricID.name) // TODO Include scope - ), + JsObject(nameAndScope), JsArray( JsNumber(metricData.callCount), JsNumber(metricData.total), @@ -54,7 +82,20 @@ object JsonProtocol extends DefaultJsonProtocol { } } - implicit object MetricBatchWriter extends RootJsonWriter[MetricBatch] { + implicit object MetricBatchWriter extends RootJsonFormat[MetricBatch] { + + def read(json: JsValue): MetricBatch = json match { + case JsArray(elements) ⇒ + val runID = elements(0).convertTo[Long] + val timeSliceFrom = elements(1).convertTo[Long] + val timeSliceTo = elements(2).convertTo[Long] + val metrics = elements(3).convertTo[Seq[Metric]] + + MetricBatch(runID, TimeSliceMetrics(timeSliceFrom, timeSliceTo, metrics.toMap)) + + case x ⇒ deserializationError("Expected Array as JsArray, but got " + x) + } + def write(obj: MetricBatch): JsValue = JsArray( JsNumber(obj.runID), diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala index b09973ef..0aa078f5 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala @@ -14,8 +14,6 @@ import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed import spray.can.Http import spray.http.Uri import spray.httpx.SprayJsonSupport -import spray.json.CompactPrinter - import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -76,7 +74,7 @@ class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extend log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to) collectorClient { - Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, CompactPrinter)) + Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)) } map { response ⇒ if (response.status.isSuccess) diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala new file mode 100644 index 00000000..713a5586 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala @@ -0,0 +1,59 @@ +package kamon.newrelic + +import java.lang.{ StringBuilder ⇒ JStringBuilder } +import spray.json._ +import scala.annotation.tailrec + +// We need a special treatment of / that needs to be escaped as \/ for New Relic to work properly with all metrics. +// Without this custom json printer the scoped metrics are not displayed in the site. + +// format: OFF +trait NewRelicJsonPrinter extends CompactPrinter { + + override def printString(s: String, sb: JStringBuilder) { + import NewRelicJsonPrinter._ + @tailrec def firstToBeEncoded(ix: Int = 0): Int = + if (ix == s.length) -1 else if (requiresEncoding(s.charAt(ix))) ix else firstToBeEncoded(ix + 1) + + sb.append('"') + firstToBeEncoded() match { + case -1 ⇒ sb.append(s) + case first ⇒ + sb.append(s, 0, first) + @tailrec def append(ix: Int): Unit = + if (ix < s.length) { + s.charAt(ix) match { + case c if !requiresEncoding(c) => sb.append(c) + case '"' => sb.append("\\\"") + case '\\' => sb.append("\\\\") + case '/' => sb.append("\\/") + case '\b' => sb.append("\\b") + case '\f' => sb.append("\\f") + case '\n' => sb.append("\\n") + case '\r' => sb.append("\\r") + case '\t' => sb.append("\\t") + case x if x <= 0xF => sb.append("\\u000").append(Integer.toHexString(x)) + case x if x <= 0xFF => sb.append("\\u00").append(Integer.toHexString(x)) + case x if x <= 0xFFF => sb.append("\\u0").append(Integer.toHexString(x)) + case x => sb.append("\\u").append(Integer.toHexString(x)) + } + append(ix + 1) + } + append(first) + } + sb.append('"') + } +} + +object NewRelicJsonPrinter extends NewRelicJsonPrinter { + + def requiresEncoding(c: Char): Boolean = + // from RFC 4627 + // unescaped = %x20-21 / %x23-5B / %x5D-10FFFF + c match { + case '"' ⇒ true + case '\\' ⇒ true + case '/' ⇒ true + case c ⇒ c < 0x20 + } +} \ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index 15eba982..0a4a516b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -16,8 +16,9 @@ package kamon.newrelic +import scala.collection.mutable; import kamon.metric._ -import kamon.metric.TraceMetrics.ElapsedTime +import kamon.metric.TraceMetrics.{ TraceMetricsSnapshot, ElapsedTime } import kamon.metric.instrument.Histogram import kamon.trace.SegmentCategory.HttpClient import kamon.trace.SegmentMetricIdentity @@ -31,33 +32,65 @@ object WebTransactionMetricExtractor extends MetricExtractor { var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) - val transactionMetrics = metrics.collect { - case (TraceMetrics(name), groupSnapshot) ⇒ - - groupSnapshot.metrics collect { - // Extract WebTransaction metrics and accumulate HttpDispatcher - case (ElapsedTime, snapshot: Histogram.Snapshot) ⇒ - accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(snapshot, collectionContext) - snapshot.recordsIterator.foreach { record ⇒ - apdexBuilder.record(Scale.convert(snapshot.scale, Scale.Unit, record.level), record.count) - } + val externalByHostSnapshots = mutable.Map.empty[String, List[Histogram.Snapshot]] + val externalByHostAndLibrarySnapshots = mutable.Map.empty[(String, String), List[Histogram.Snapshot]] + val externalScopedByHostAndLibrarySnapshots = mutable.Map.empty[(String, String, String), List[Histogram.Snapshot]] - Metric.fromKamonMetricSnapshot(snapshot, s"WebTransaction/Custom/$name", None, Scale.Unit) + val transactionMetrics = metrics.collect { + case (TraceMetrics(traceName), tms: TraceMetricsSnapshot) ⇒ - // Extract all external services. + tms.segments.foreach { case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒ accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext) - Metric.fromKamonMetricSnapshot(snapshot, s"External/$segmentName/all", None, Scale.Unit) + // Accumulate externals by host + externalByHostSnapshots.update(segmentName, snapshot :: externalByHostSnapshots.getOrElse(segmentName, Nil)) + + // Accumulate externals by host and library + externalByHostAndLibrarySnapshots.update((segmentName, library), + snapshot :: externalByHostAndLibrarySnapshots.getOrElse((segmentName, library), Nil)) + + // Accumulate externals by host and library, including the transaction as scope. + externalScopedByHostAndLibrarySnapshots.update((segmentName, library, traceName), + snapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((segmentName, library, traceName), Nil)) + + } + + accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(tms.elapsedTime, collectionContext) + tms.elapsedTime.recordsIterator.foreach { record ⇒ + apdexBuilder.record(Scale.convert(tms.elapsedTime.scale, Scale.Unit, record.level), record.count) } + + Metric.fromKamonMetricSnapshot(tms.elapsedTime, "WebTransaction/Custom/" + traceName, None, Scale.Unit) } val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit) - val webTransaction = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "WebTransaction", None, Scale.Unit) - val external = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External", None, Scale.Unit) + val webTransaction = httpDispatcher.copy(MetricID("WebTransaction", None)) + val webTransactionTotal = httpDispatcher.copy(MetricID("WebTransactionTotalTime", None)) + val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit) + val externalAll = externalAllWeb.copy(MetricID("External/all", None)) + + val externalByHost = externalByHostSnapshots.map { + case (host, snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/all", None, Scale.Unit) + } + + val externalByHostAndLibrary = externalByHostAndLibrarySnapshots.map { + case ((host, library), snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", None, Scale.Unit) + } + + val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map { + case ((host, library, traceName), snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName), Scale.Unit) + } - Map(httpDispatcher, webTransaction, external, externalAllWeb, apdexBuilder.build) ++ transactionMetrics.flatten.toMap + Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++ + transactionMetrics ++ externalByHost ++ externalByHostAndLibrary ++ externalScopedByHostAndLibrary } } diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala index 3cf4bbd0..0001072e 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala @@ -23,15 +23,16 @@ import com.typesafe.config.ConfigFactory import kamon.metric.{ TraceMetrics, Metrics } import kamon.{ Kamon, AkkaExtensionSwap } import kamon.metric.Subscriptions.TickMetricSnapshot -import org.scalatest.WordSpecLike +import org.scalatest.{ Matchers, WordSpecLike } import spray.can.Http import spray.http.Uri.Query import spray.http._ import spray.httpx.encoding.Deflate import spray.httpx.{ RequestBuilding, SprayJsonSupport } import scala.concurrent.duration._ +import spray.json._ -class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuilding with SprayJsonSupport { +class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with SprayJsonSupport { import kamon.newrelic.JsonProtocol._ implicit lazy val system: ActorSystem = ActorSystem("metric-reporter-spec", ConfigFactory.parseString( @@ -61,21 +62,21 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuild val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri)) metricReporter ! firstSnapshot - httpManager.expectMsg(Deflate.encode { - HttpRequest(method = HttpMethods.POST, uri = rawMethodUri("collector-1.newrelic.com", "metric_data"), entity = compactJsonEntity( - s""" - |[9999,0,0, - |[ - | [{"name":"Apdex"},[3,0.0,0.0,1.0,1.0,0.0]], - | [{"name":"WebTransaction"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]], - | [{"name":"External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]], - | [{"name":"WebTransaction/Custom/example-trace"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]], - | [{"name":"HttpDispatcher"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]], - | [{"name":"External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]] - |] - |] - """.stripMargin)) - }) + val metricPost = httpManager.expectMsgType[HttpRequest] + + metricPost.method should be(HttpMethods.POST) + metricPost.uri should be(rawMethodUri("collector-1.newrelic.com", "metric_data")) + metricPost.encoding should be(HttpEncodings.deflate) + + val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch] + postedBatch.runID should be(9999) + postedBatch.timeSliceMetrics.from should be(1415587618) + postedBatch.timeSliceMetrics.to should be(1415587678) + + val metrics = postedBatch.timeSliceMetrics.metrics + metrics(MetricID("Apdex", None)).callCount should be(3) + metrics(MetricID("WebTransaction", None)).callCount should be(3) + metrics(MetricID("HttpDispatcher", None)).callCount should be(3) } "accumulate metrics if posting fails" in new FakeTickSnapshotsFixture { @@ -87,31 +88,23 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuild httpManager.reply(Timedout(request)) metricReporter ! secondSnapshot - httpManager.expectMsg(Deflate.encode { - HttpRequest(method = HttpMethods.POST, uri = rawMethodUri("collector-1.newrelic.com", "metric_data"), entity = compactJsonEntity( - s""" - |[9999,0,0, - |[ - | [{"name":"Apdex"},[6,0.0,0.0,1.0,1.0,0.0]], - | [{"name":"WebTransaction"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]], - | [{"name": "External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]], - | [{"name":"WebTransaction/Custom/example-trace"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]], - | [{"name":"HttpDispatcher"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]], - | [{"name": "External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]] - |] - |] - """.stripMargin)) - }) + val metricPost = httpManager.expectMsgType[HttpRequest] + + metricPost.method should be(HttpMethods.POST) + metricPost.uri should be(rawMethodUri("collector-1.newrelic.com", "metric_data")) + metricPost.encoding should be(HttpEncodings.deflate) + + val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch] + postedBatch.runID should be(9999) + postedBatch.timeSliceMetrics.from should be(1415587618) + postedBatch.timeSliceMetrics.to should be(1415587738) + + val metrics = postedBatch.timeSliceMetrics.metrics + metrics(MetricID("Apdex", None)).callCount should be(6) + metrics(MetricID("WebTransaction", None)).callCount should be(6) + metrics(MetricID("HttpDispatcher", None)).callCount should be(6) } } - /* - [9999, 0, 0, [ - [{"name": "Apdex"}, [6, 0.0, 0.0, 1.0, 1.0, 0.0]], - [{"name": "WebTransaction"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]], - [{"name": "External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]], - [{"name": "WebTransaction/Custom/example-trace"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]], - [{"name": "HttpDispatcher"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]], - [{"name": "External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]]]]*/ def setHttpManager(probe: TestProbe): TestProbe = { AkkaExtensionSwap.swap(system, Http, new IO.Extension { @@ -146,11 +139,11 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuild recorder.elapsedTime.record(1000000) recorder.elapsedTime.record(2000000) recorder.elapsedTime.record(3000000) - val firstSnapshot = TickMetricSnapshot(1, 100, Map(testTraceID -> collectRecorder)) + val firstSnapshot = TickMetricSnapshot(1415587618000L, 1415587678000L, Map(testTraceID -> collectRecorder)) recorder.elapsedTime.record(6000000) recorder.elapsedTime.record(5000000) recorder.elapsedTime.record(4000000) - val secondSnapshot = TickMetricSnapshot(100, 200, Map(testTraceID -> collectRecorder)) + val secondSnapshot = TickMetricSnapshot(1415587678000L, 1415587738000L, Map(testTraceID -> collectRecorder)) } } \ No newline at end of file diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index da6a3cbe..7d7494eb 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -83,8 +83,11 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } } ~ path("site") { - complete { - pipeline(Get("http://localhost:9090/site-redirect")) + traceName("FinalGetSite-3") { + complete { + for (f1 <- pipeline(Get("http://127.0.0.1:9090/ok")); + f2 <- pipeline(Get("http://www.google.com/search?q=mkyong"))) yield "Ok Double Future" + } } } ~ path("site-redirect") { @@ -99,7 +102,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } } ~ path("ok") { - traceName("OK") { + traceName("RespondWithOK-3") { complete { "ok" } -- cgit v1.2.3