diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-07-03 14:36:42 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-07-03 14:36:18 -0300 |
commit | 29068fc70a3e5a17a630c2c7fff951572bb5fa21 (patch) | |
tree | 7ec2632f36e9493cb559f510fa3cc3ead7443511 /kamon-spray/src | |
parent | 4d5803e579e223c4f4f5cb37ab79ca069a007949 (diff) | |
download | Kamon-29068fc70a3e5a17a630c2c7fff951572bb5fa21.tar.gz Kamon-29068fc70a3e5a17a630c2c7fff951572bb5fa21.tar.bz2 Kamon-29068fc70a3e5a17a630c2c7fff951572bb5fa21.zip |
! all: refactor the core metric recording instruments and accomodate UserMetrics
This PR is including several changes to the kamon-core, most notably:
- Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure
that the interfaces are as clean as possible.
- Move away from the all Vector[Measurement] based Histogram snapshot to a new approach
in which we use a single long to store both the index in the counts array and the
frequency on that bucket. The leftmost 2 bytes of each long are used for storing the
counts array index and the remaining 6 bytes are used for the actual count, and
everything is put into a simple long array. This way only the buckets that actually
have values will be included in the snapshot with the smallest possible memory
footprint.
- Introduce Gauges.
- Reorganize the instrumentation for Akka and Scala and rewrite most of the tests
of this components to avoid going through the subscription protocol to test.
- Introduce trace tests and fixes on various tests.
- Necessary changes on new relic, datadog and statsd modules to compile with the new
codebase.
Pending:
- Finish the upgrade of the new relic to the current model.
- Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes
limits.
- More testing, more testing, more testing.
- Create the KamonStandalone module.
Diffstat (limited to 'kamon-spray/src')
4 files changed, 18 insertions, 29 deletions
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index d7d9cf09..d787bda4 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -21,7 +21,7 @@ import org.aspectj.lang.ProceedingJoinPoint import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest } import spray.http.HttpHeaders.{ RawHeader, Host } import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware } -import kamon.metrics.TraceMetrics.HttpClientRequest +import kamon.metric.TraceMetrics.HttpClientRequest import kamon.Kamon import kamon.spray.{ ClientSegmentCollectionStrategy, Spray } import akka.actor.ActorRef @@ -30,7 +30,6 @@ import akka.util.Timeout @Aspect class ClientRequestInstrumentation { - import ClientRequestInstrumentation._ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") def mixin: SegmentCompletionHandleAware = SegmentCompletionHandleAware.default @@ -51,7 +50,7 @@ class ClientRequestInstrumentation { if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { val requestAttributes = basicRequestAttributes(request) val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes) + val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) ctx.segmentCompletionHandle = Some(completionHandle) } @@ -102,7 +101,7 @@ class ClientRequestInstrumentation { if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { val requestAttributes = basicRequestAttributes(request) val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes) + val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) responseFuture.onComplete { result ⇒ completionHandle.finish(Map.empty) @@ -139,8 +138,3 @@ class ClientRequestInstrumentation { pjp.proceed(Array(modifiedHeaders)) } } - -object ClientRequestInstrumentation { - val SprayTime = "SprayTime" - val UserTime = "UserTime" -} diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index 9469924a..54329645 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -26,13 +26,12 @@ import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader import kamon.Kamon -import kamon.metrics.{ TraceMetrics, Metrics } +import kamon.metric.{ TraceMetrics, Metrics } import spray.client.pipelining -import kamon.metrics.Subscriptions.TickMetricSnapshot -import spray.can.client.ClientRequestInstrumentation +import kamon.metric.Subscriptions.TickMetricSnapshot import scala.concurrent.duration._ import akka.pattern.pipe -import kamon.metrics.TraceMetrics.TraceMetricSnapshot +import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot } class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer { implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString( @@ -149,7 +148,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.UserTime } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) @@ -190,7 +189,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.SprayTime } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) @@ -199,14 +198,14 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit } } - def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricSnapshot = { + def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricsSnapshot = { val tickSnapshot = within(timeout) { listener.expectMsgType[TickMetricSnapshot] } val metricsOption = tickSnapshot.metrics.get(TraceMetrics(traceName)) metricsOption should not be empty - metricsOption.get.asInstanceOf[TraceMetricSnapshot] + metricsOption.get.asInstanceOf[TraceMetricsSnapshot] } def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal) diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala index 7edbbe11..ab9116fd 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala @@ -24,10 +24,11 @@ import kamon.Kamon import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } import spray.http.HttpHeaders.RawHeader import spray.http.{ HttpResponse, HttpRequest } -import kamon.metrics.{ TraceMetrics, Metrics } -import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metric.{ TraceMetrics, Metrics } +import kamon.metric.Subscriptions.TickMetricSnapshot import com.typesafe.config.ConfigFactory -import kamon.metrics.TraceMetrics.ElapsedTime +import kamon.metric.TraceMetrics.ElapsedTime +import kamon.metric.instrument.Histogram class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with ScalaFutures with PatienceConfiguration with TestServer { @@ -122,7 +123,7 @@ class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit traceMetrics should not be empty traceMetrics map { metrics ⇒ - metrics(ElapsedTime).numberOfMeasurements should be(1L) + metrics(ElapsedTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) } } diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala index 81242133..65506770 100644 --- a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala +++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala @@ -29,11 +29,9 @@ trait TestServer { def buildClientConnectionAndServer: (ActorRef, TestProbe) = { val serverHandler = TestProbe() IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) - val bound = within(10 seconds) { - serverHandler.expectMsgType[Bound] - } - + val bound = serverHandler.expectMsgType[Bound](10 seconds) val client = clientConnection(bound) + serverHandler.expectMsgType[Http.Connected] serverHandler.reply(Http.Register(serverHandler.ref)) @@ -50,10 +48,7 @@ trait TestServer { def buildSHostConnectorAndServer: (ActorRef, TestProbe) = { val serverHandler = TestProbe() IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) - val bound = within(10 seconds) { - serverHandler.expectMsgType[Bound] - } - + val bound = serverHandler.expectMsgType[Bound](10 seconds) val client = httpHostConnector(bound) (client, serverHandler) |