From 29068fc70a3e5a17a630c2c7fff951572bb5fa21 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 3 Jul 2014 14:36:42 -0300 Subject: ! all: refactor the core metric recording instruments and accomodate UserMetrics This PR is including several changes to the kamon-core, most notably: - Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure that the interfaces are as clean as possible. - Move away from the all Vector[Measurement] based Histogram snapshot to a new approach in which we use a single long to store both the index in the counts array and the frequency on that bucket. The leftmost 2 bytes of each long are used for storing the counts array index and the remaining 6 bytes are used for the actual count, and everything is put into a simple long array. This way only the buckets that actually have values will be included in the snapshot with the smallest possible memory footprint. - Introduce Gauges. - Reorganize the instrumentation for Akka and Scala and rewrite most of the tests of this components to avoid going through the subscription protocol to test. - Introduce trace tests and fixes on various tests. - Necessary changes on new relic, datadog and statsd modules to compile with the new codebase. Pending: - Finish the upgrade of the new relic to the current model. - Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes limits. - More testing, more testing, more testing. - Create the KamonStandalone module. --- .../kamon/trace/TraceContextManipulationSpec.scala | 95 ++++++++++++ .../trace/instrumentation/ActorLoggingSpec.scala | 51 ------- .../ActorMessagePassingTracingSpec.scala | 85 ----------- ...orSystemMessagePassingInstrumentationSpec.scala | 169 --------------------- .../instrumentation/AskPatternTracingSpec.scala | 66 -------- .../trace/instrumentation/FutureTracingSpec.scala | 62 -------- 6 files changed, 95 insertions(+), 433 deletions(-) create mode 100644 kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala (limited to 'kamon-core/src/test/scala/kamon/trace') diff --git a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala new file mode 100644 index 00000000..4d0049f1 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala @@ -0,0 +1,95 @@ +package kamon.trace + +import akka.actor.ActorSystem +import akka.testkit.{ ImplicitSender, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.trace.TraceContext.SegmentIdentity +import org.scalatest.{ Matchers, WordSpecLike } + +class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 hour + | filters = [ + | { + | trace { + | includes = [ "*" ] + | excludes = [ "non-tracked-trace"] + | } + | } + | ] + | precision { + | default-histogram-precision { + | highest-trackable-value = 3600000000000 + | significant-value-digits = 2 + | } + | + | default-min-max-counter-precision { + | refresh-interval = 1 second + | highest-trackable-value = 999999999 + | significant-value-digits = 2 + | } + | } + |} + """.stripMargin)) + + "the TraceRecorder api" should { + "allow starting a trace within a specified block of code, and only within that block of code" in { + val createdContext = TraceRecorder.withNewTraceContext("start-context") { + TraceRecorder.currentContext should not be empty + TraceRecorder.currentContext.get + } + + TraceRecorder.currentContext shouldBe empty + createdContext.name shouldBe ("start-context") + } + + "allow starting a trace within a specified block of code, providing a trace-token and only within that block of code" in { + val createdContext = TraceRecorder.withNewTraceContext("start-context-with-token", Some("token-1")) { + TraceRecorder.currentContext should not be empty + TraceRecorder.currentContext.get + } + + TraceRecorder.currentContext shouldBe empty + createdContext.name shouldBe ("start-context-with-token") + createdContext.token should be("token-1") + } + + "allow providing a TraceContext and make it available within a block of code" in { + val createdContext = TraceRecorder.withNewTraceContext("manually-provided-trace-context") { TraceRecorder.currentContext } + + TraceRecorder.currentContext shouldBe empty + TraceRecorder.withTraceContext(createdContext) { + TraceRecorder.currentContext should be(createdContext) + } + + TraceRecorder.currentContext shouldBe empty + } + + "allow renaming a trace" in { + val createdContext = TraceRecorder.withNewTraceContext("trace-before-rename") { + TraceRecorder.rename("renamed-trace") + TraceRecorder.currentContext.get + } + + TraceRecorder.currentContext shouldBe empty + createdContext.name shouldBe ("renamed-trace") + } + + "allow creating a segment within a trace" in { + val createdContext = TraceRecorder.withNewTraceContext("trace-with-segments") { + val segmentHandle = TraceRecorder.startSegment(TraceManipulationTestSegment("segment-1")) + + TraceRecorder.currentContext.get + } + + TraceRecorder.currentContext shouldBe empty + createdContext.name shouldBe ("trace-with-segments") + + } + } + + case class TraceManipulationTestSegment(name: String) extends SegmentIdentity + +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala deleted file mode 100644 index 81fd9cbc..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace.instrumentation - -import akka.testkit.TestKit -import org.scalatest.{ Inspectors, Matchers, WordSpecLike } -import akka.actor.{ Props, ActorLogging, Actor, ActorSystem } -import akka.event.Logging.LogEvent -import kamon.trace.{ TraceContextAware, TraceRecorder } - -class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with WordSpecLike with Matchers with Inspectors { - - "the ActorLogging instrumentation" should { - "attach the TraceContext (if available) to log events" in { - val loggerActor = system.actorOf(Props[LoggerActor]) - system.eventStream.subscribe(testActor, classOf[LogEvent]) - - val testTraceContext = TraceRecorder.withNewTraceContext("logging") { - loggerActor ! "info" - TraceRecorder.currentContext - } - - fishForMessage() { - case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒ - val ctxInEvent = event.asInstanceOf[TraceContextAware].traceContext - ctxInEvent === testTraceContext - - case event: LogEvent ⇒ false - } - } - } -} - -class LoggerActor extends Actor with ActorLogging { - def receive = { - case "info" ⇒ log.info("TraceContext => {}", TraceRecorder.currentContext) - } -} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala deleted file mode 100644 index 4e62c9f7..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace.instrumentation - -import org.scalatest.WordSpecLike -import akka.actor.{ Actor, Props, ActorSystem } - -import akka.testkit.{ ImplicitSender, TestKit } -import kamon.trace.TraceRecorder -import akka.pattern.{ pipe, ask } -import akka.util.Timeout -import scala.concurrent.duration._ -import akka.routing.{ RoundRobinPool } - -class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { - implicit val executionContext = system.dispatcher - - "the message passing instrumentation" should { - "propagate the TraceContext using bang" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") { - ctxEchoActor ! "test" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext using tell" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") { - ctxEchoActor.tell("test", testActor) - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext using ask" in new EchoActorFixture { - implicit val timeout = Timeout(1 seconds) - val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") { - // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. - (ctxEchoActor ? "test") pipeTo (testActor) - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext to actors behind a router" in new RoutedEchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { - ctxEchoActor ! "test" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - } - - trait EchoActorFixture { - val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) - } - - trait RoutedEchoActorFixture extends EchoActorFixture { - override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinPool(nrOfInstances = 1))) - } -} - -class TraceContextEcho extends Actor { - def receive = { - case msg: String ⇒ sender ! TraceRecorder.currentContext - } -} - diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala deleted file mode 100644 index ed239b38..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala +++ /dev/null @@ -1,169 +0,0 @@ -package kamon.trace.instrumentation - -import akka.testkit.{ ImplicitSender, TestKit } -import akka.actor._ -import org.scalatest.WordSpecLike -import kamon.trace.TraceRecorder -import scala.util.control.NonFatal -import akka.actor.SupervisorStrategy.{ Escalate, Stop, Restart, Resume } -import scala.concurrent.duration._ - -class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { - implicit val executionContext = system.dispatcher - - "the system message passing instrumentation" should { - "keep the TraceContext while processing the Create message in top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") { - system.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext - def receive: Actor.Receive = { case any ⇒ } - })) - - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "keep the TraceContext while processing the Create message in non top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") { - system.actorOf(Props(new Actor { - def receive: Actor.Receive = { - case any ⇒ - context.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext - def receive: Actor.Receive = { case any ⇒ } - })) - } - })) ! "any" - - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "keep the TraceContext in the supervision cycle" when { - "the actor is resumed" in { - val supervisor = supervisorWithDirective(Resume) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - - // Ensure we didn't tie the actor with the context - supervisor ! "context" - expectMsg(None) - } - - "the actor is restarted" in { - val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the preRestart hook - expectMsg(testTraceContext) // From the postRestart hook - - // Ensure we didn't tie the actor with the context - supervisor ! "context" - expectMsg(None) - } - - "the actor is stopped" in { - val supervisor = supervisorWithDirective(Stop, sendPostStop = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the postStop hook - expectNoMsg(1 second) - } - - "the failure is escalated" in { - val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the grandparent executing the supervision strategy - expectMsg(testTraceContext) // From the postStop hook in the child - expectMsg(testTraceContext) // From the postStop hook in the parent - expectNoMsg(1 second) - } - } - } - - def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false, - sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = { - class GrandParent extends Actor { - val child = context.actorOf(Props(new Parent)) - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop - } - - def receive = { - case any ⇒ child forward any - } - } - - class Parent extends Actor { - val child = context.actorOf(Props(new Child)) - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive - } - - def receive: Actor.Receive = { - case any ⇒ child forward any - } - - override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext - super.postStop() - } - } - - class Child extends Actor { - def receive = { - case "fail" ⇒ 1 / 0 - case "context" ⇒ sender ! TraceRecorder.currentContext - } - - override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - if (sendPreRestart) testActor ! TraceRecorder.currentContext - super.preRestart(reason, message) - } - - override def postRestart(reason: Throwable): Unit = { - if (sendPostRestart) testActor ! TraceRecorder.currentContext - super.postRestart(reason) - } - - override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext - super.postStop() - } - - override def preStart(): Unit = { - if (sendPreStart) testActor ! TraceRecorder.currentContext - super.preStart() - } - } - - system.actorOf(Props(new GrandParent)) - } -} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala deleted file mode 100644 index fb886de6..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace.instrumentation - -import akka.testkit.TestKitBase -import akka.actor.{ Props, Actor, ActorSystem } -import org.scalatest.{ Matchers, WordSpecLike } -import akka.event.Logging.Warning -import scala.concurrent.duration._ -import akka.pattern.ask -import akka.util.Timeout -import kamon.trace.{ TraceContextAware, TraceRecorder } -import com.typesafe.config.ConfigFactory - -class AskPatternTracingSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", ConfigFactory.parseString( - """ - |kamon { - | trace { - | ask-pattern-tracing = on - | } - |} - """.stripMargin)) - - "the AskPatternTracing" should { - "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in { - implicit val ec = system.dispatcher - implicit val timeout = Timeout(10 milliseconds) - val noReply = system.actorOf(Props[NoReply]) - system.eventStream.subscribe(testActor, classOf[Warning]) - - val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") { - noReply ? "hello" - TraceRecorder.currentContext - } - - val warn = expectMsgPF() { - case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn - } - val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext - - capturedCtx should be('defined) - capturedCtx should equal(testTraceContext) - } - } -} - -class NoReply extends Actor { - def receive = { - case any ⇒ - } -} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala deleted file mode 100644 index b1765fd8..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace.instrumentation - -import scala.concurrent.{ ExecutionContext, Future } -import org.scalatest.{ Matchers, OptionValues, WordSpecLike } -import org.scalatest.concurrent.{ ScalaFutures, PatienceConfiguration } -import kamon.trace.TraceRecorder -import akka.testkit.TestKit -import akka.actor.ActorSystem - -class FutureTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with Matchers - with ScalaFutures with PatienceConfiguration with OptionValues { - - implicit val execContext = system.dispatcher - - "a Future created with FutureTracing" should { - "capture the TraceContext available when created" which { - "must be available when executing the future's body" in { - - val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { - val future = Future(TraceRecorder.currentContext) - - (future, TraceRecorder.currentContext) - } - - whenReady(future)(ctxInFuture ⇒ - ctxInFuture should equal(testTraceContext)) - } - - "must be available when executing callbacks on the future" in { - - val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { - val future = Future("Hello Kamon!") - // The TraceContext is expected to be available during all intermediate processing. - .map(_.length) - .flatMap(len ⇒ Future(len.toString)) - .map(s ⇒ TraceRecorder.currentContext) - - (future, TraceRecorder.currentContext) - } - - whenReady(future)(ctxInFuture ⇒ - ctxInFuture should equal(testTraceContext)) - } - } - } -} - -- cgit v1.2.3