From 7a10c0ef2a6566229e8571f6d385ca2ff794cc20 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 2 Jan 2014 18:09:53 -0300 Subject: integrate trace and metrics into the base project --- .../test/scala/kamon/MailboxSizeMetricsSpec.scala | 31 ++++ .../scala/kamon/metrics/ActorMetricsSpec.scala | 51 +++++++ .../trace/instrumentation/ActorLoggingSpec.scala | 51 +++++++ .../ActorMessagePassingTracingSpec.scala | 84 +++++++++++ ...orSystemMessagePassingInstrumentationSpec.scala | 165 +++++++++++++++++++++ .../instrumentation/AskPatternTracingSpec.scala | 59 ++++++++ .../trace/instrumentation/FutureTracingSpec.scala | 62 ++++++++ .../instrumentation/TraceAggregatorSpec.scala | 51 +++++++ .../instrumentation/TraceContextFixture.scala | 10 ++ 9 files changed, 564 insertions(+) create mode 100644 kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala (limited to 'kamon-core/src/test/scala') diff --git a/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala b/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala new file mode 100644 index 00000000..5108af25 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala @@ -0,0 +1,31 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon + +import org.scalatest.{ WordSpecLike, WordSpec } +import akka.testkit.TestKit +import akka.actor.{ Props, ActorSystem } + +class MailboxSizeMetricsSpec extends TestKit(ActorSystem("mailbox-size-metrics-spec")) with WordSpecLike { + + "the mailbox size metrics instrumentation" should { + "register a counter for mailbox size upon actor creation" in { + val target = system.actorOf(Props.empty, "sample") + + //Metrics.registry.getHistograms.get("akka://mailbox-size-metrics-spec/sample:MAILBOX") + } + } +} diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala new file mode 100644 index 00000000..ff08ca0f --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala @@ -0,0 +1,51 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.scalatest.{ WordSpecLike, Matchers, WordSpec } +import akka.testkit.TestKitBase +import akka.actor.{ Actor, Props, ActorSystem } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, Subscribe } +import scala.concurrent.duration._ + +class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { + implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics.actors.tracked = ["user/test*"] + """.stripMargin)) + + implicit def self = testActor + + lazy val metricsExtension = Kamon(ActorMetrics).metricsDispatcher + + "the Kamon actor metrics" should { + "track configured actors" in { + system.actorOf(Props[Other], "test-tracked-actor") ! "nothing" + metricsExtension ! Subscribe("user/test-tracked-actor") + + within(5 seconds) { + expectMsgType[ActorMetricsDispatcher.ActorMetricsSnapshot] + } + } + } +} + +class Other extends Actor { + def receive = { case a ⇒ } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala new file mode 100644 index 00000000..89742651 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala @@ -0,0 +1,51 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import akka.testkit.TestKit +import org.scalatest.{ Inspectors, Matchers, WordSpecLike } +import akka.actor.{ Props, ActorLogging, Actor, ActorSystem } +import akka.event.Logging.{ LogEvent } +import kamon.trace.{ ContextAware, TraceContext, Trace } + +class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with WordSpecLike with Matchers with Inspectors { + + "the ActorLogging instrumentation" should { + "attach the TraceContext (if available) to log events" in { + val testTraceContext = Some(TraceContext(Actor.noSender, 1)) + val loggerActor = system.actorOf(Props[LoggerActor]) + system.eventStream.subscribe(testActor, classOf[LogEvent]) + + Trace.withContext(testTraceContext) { + loggerActor ! "info" + } + + fishForMessage() { + case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒ + val ctxInEvent = event.asInstanceOf[ContextAware].traceContext + ctxInEvent === testTraceContext + + case event: LogEvent ⇒ false + } + } + } +} + +class LoggerActor extends Actor with ActorLogging { + def receive = { + case "info" ⇒ log.info("TraceContext => {}", Trace.context()) + } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala new file mode 100644 index 00000000..89251bf4 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala @@ -0,0 +1,84 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import org.scalatest.{ WordSpecLike, Matchers } +import akka.actor.{ ActorRef, Actor, Props, ActorSystem } + +import akka.testkit.{ ImplicitSender, TestKit } +import kamon.trace.Trace +import akka.pattern.{ pipe, ask } +import akka.util.Timeout +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } +import akka.routing.RoundRobinRouter +import kamon.trace.TraceContext + +class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { + implicit val executionContext = system.dispatcher + + "the message passing instrumentation" should { + "propagate the TraceContext using bang" in new TraceContextEchoFixture { + Trace.withContext(testTraceContext) { + ctxEchoActor ! "test" + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext using tell" in new TraceContextEchoFixture { + Trace.withContext(testTraceContext) { + ctxEchoActor.tell("test", testActor) + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext using ask" in new TraceContextEchoFixture { + implicit val timeout = Timeout(1 seconds) + Trace.withContext(testTraceContext) { + // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. + (ctxEchoActor ? "test") pipeTo (testActor) + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext to actors behind a router" in new RoutedTraceContextEchoFixture { + Trace.withContext(testTraceContext) { + ctxEchoActor ! "test" + } + + expectMsg(testTraceContext) + } + } + + trait TraceContextEchoFixture { + val testTraceContext = Some(Trace.newTraceContext("")) + val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) + } + + trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { + override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 1))) + } +} + +class TraceContextEcho extends Actor { + def receive = { + case msg: String ⇒ sender ! Trace.context() + } +} + diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala new file mode 100644 index 00000000..7d539370 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala @@ -0,0 +1,165 @@ +package kamon.trace.instrumentation + +import akka.testkit.{ ImplicitSender, TestKit } +import akka.actor._ +import org.scalatest.WordSpecLike +import kamon.trace.Trace +import scala.util.control.NonFatal +import akka.actor.SupervisorStrategy.{ Escalate, Stop, Restart, Resume } +import scala.concurrent.duration._ + +class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { + implicit val executionContext = system.dispatcher + + "the system message passing instrumentation" should { + "keep the TraceContext while processing the Create message in top level actors" in new TraceContextFixture { + Trace.withContext(testTraceContext) { + system.actorOf(Props(new Actor { + + testActor ! Trace.context() + + def receive: Actor.Receive = { case any ⇒ } + })) + } + + expectMsg(testTraceContext) + } + + "keep the TraceContext while processing the Create message in non top level actors" in new TraceContextFixture { + Trace.withContext(testTraceContext) { + system.actorOf(Props(new Actor { + def receive: Actor.Receive = { + case any ⇒ + context.actorOf(Props(new Actor { + + testActor ! Trace.context() + + def receive: Actor.Receive = { case any ⇒ } + })) + } + })) ! "any" + } + + expectMsg(testTraceContext) + } + + "keep the TraceContext in the supervision cycle" when { + "the actor is resumed" in new TraceContextFixture { + val supervisor = supervisorWithDirective(Resume) + + Trace.withContext(testTraceContext) { + supervisor ! "fail" + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + + // Ensure we didn't tie the actor with the context + supervisor ! "context" + expectMsg(None) + } + + "the actor is restarted" in new TraceContextFixture { + val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) + + Trace.withContext(testTraceContext) { + supervisor ! "fail" + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + expectMsg(testTraceContext) // From the preRestart hook + expectMsg(testTraceContext) // From the postRestart hook + + // Ensure we didn't tie the actor with the context + supervisor ! "context" + expectMsg(None) + } + + "the actor is stopped" in new TraceContextFixture { + val supervisor = supervisorWithDirective(Stop, sendPostStop = true) + + Trace.withContext(testTraceContext) { + supervisor ! "fail" + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + expectMsg(testTraceContext) // From the postStop hook + expectNoMsg(1 second) + } + + "the failure is escalated" in new TraceContextFixture { + val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) + + Trace.withContext(testTraceContext) { + supervisor ! "fail" + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + expectMsg(testTraceContext) // From the grandparent executing the supervision strategy + expectMsg(testTraceContext) // From the postStop hook in the child + expectMsg(testTraceContext) // From the postStop hook in the parent + expectNoMsg(1 second) + } + } + } + + def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false, + sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = { + class GrandParent extends Actor { + val child = context.actorOf(Props(new Parent)) + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case NonFatal(throwable) ⇒ testActor ! Trace.context(); Stop + } + + def receive = { + case any ⇒ child forward any + } + } + + class Parent extends Actor { + val child = context.actorOf(Props(new Child)) + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case NonFatal(throwable) ⇒ testActor ! Trace.context(); directive + } + + def receive: Actor.Receive = { + case any ⇒ child forward any + } + + override def postStop(): Unit = { + if (sendPostStop) testActor ! Trace.context() + super.postStop() + } + } + + class Child extends Actor { + def receive = { + case "fail" ⇒ 1 / 0 + case "context" ⇒ sender ! Trace.context() + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + if (sendPreRestart) testActor ! Trace.context() + super.preRestart(reason, message) + } + + override def postRestart(reason: Throwable): Unit = { + if (sendPostRestart) testActor ! Trace.context() + super.postRestart(reason) + } + + override def postStop(): Unit = { + if (sendPostStop) testActor ! Trace.context() + super.postStop() + } + + override def preStart(): Unit = { + if (sendPreStart) testActor ! Trace.context() + super.preStart() + } + } + + system.actorOf(Props(new GrandParent)) + } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala new file mode 100644 index 00000000..9df67391 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala @@ -0,0 +1,59 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import akka.testkit.TestKit +import akka.actor.{ Props, Actor, ActorSystem } +import org.scalatest.{ Matchers, WordSpecLike } +import akka.event.Logging.Warning +import scala.concurrent.duration._ +import akka.pattern.ask +import akka.util.Timeout +import kamon.trace.{ Trace, ContextAware } +import org.scalatest.OptionValues._ + +class AskPatternTracingSpec extends TestKit(ActorSystem("ask-pattern-tracing-spec")) with WordSpecLike with Matchers { + + "the AskPatternTracing" should { + "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in new TraceContextFixture { + implicit val ec = system.dispatcher + implicit val timeout = Timeout(10 milliseconds) + val noReply = system.actorOf(Props[NoReply]) + system.eventStream.subscribe(testActor, classOf[Warning]) + + within(500 milliseconds) { + val initialCtx = Trace.withContext(testTraceContext) { + noReply ? "hello" + Trace.context() + } + + val warn = expectMsgPF() { + case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn + } + val capturedCtx = warn.asInstanceOf[ContextAware].traceContext + + capturedCtx should be('defined) + capturedCtx should equal(initialCtx) + } + } + } +} + +class NoReply extends Actor { + def receive = { + case any ⇒ + } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala new file mode 100644 index 00000000..a5554836 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala @@ -0,0 +1,62 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import scala.concurrent.{ ExecutionContext, Await, Promise, Future } +import org.scalatest.{ Matchers, OptionValues, WordSpec } +import org.scalatest.concurrent.{ ScalaFutures, PatienceConfiguration } +import java.util.UUID +import scala.util.{ Random, Success } +import scala.concurrent.duration._ +import java.util.concurrent.TimeUnit +import akka.actor.{ Actor, ActorSystem } +import kamon.trace.{ Trace, TraceContext } + +class FutureTracingSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { + + implicit val execContext = ExecutionContext.Implicits.global + + "a Future created with FutureTracing" should { + "capture the TraceContext available when created" which { + "must be available when executing the future's body" in new TraceContextFixture { + var future: Future[Option[TraceContext]] = _ + + Trace.withContext(testTraceContext) { + future = Future(Trace.context) + } + + whenReady(future)(ctxInFuture ⇒ + ctxInFuture should equal(testTraceContext)) + } + + "must be available when executing callbacks on the future" in new TraceContextFixture { + var future: Future[Option[TraceContext]] = _ + + Trace.withContext(testTraceContext) { + future = Future("Hello Kamon!") + // The TraceContext is expected to be available during all intermediate processing. + .map(_.length) + .flatMap(len ⇒ Future(len.toString)) + .map(s ⇒ Trace.context()) + } + + whenReady(future)(ctxInFuture ⇒ + ctxInFuture should equal(testTraceContext)) + } + } + } +} + diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala new file mode 100644 index 00000000..3b32f3ac --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala @@ -0,0 +1,51 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import org.scalatest.{ WordSpecLike, WordSpec } +import akka.testkit.{ TestKitBase, TestKit } +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import kamon.trace.UowTracing.{ Finish, Rename, Start } +import kamon.trace.{ UowTrace, UowTraceAggregator } + +class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike { + + "a TraceAggregator" should { + "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start(1, "/accounts") + aggregator ! Finish(1) + + //expectMsg(UowTrace("UNKNOWN", Seq(Start(1, "/accounts"), Finish(1)))) + } + } + + "change the uow name after receiving a Rename message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start(1, "/accounts") + aggregator ! Rename(1, "test-uow") + aggregator ! Finish(1) + + //expectMsg(UowTrace("test-uow", Seq(Start(1, "/accounts"), Finish(1)))) + } + } + } + + trait AggregatorFixture { + val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds)) + } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala new file mode 100644 index 00000000..62f7ec84 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala @@ -0,0 +1,10 @@ +package kamon.trace.instrumentation + +import scala.util.Random +import kamon.trace.TraceContext +import akka.actor.Actor + +trait TraceContextFixture { + val random = new Random(System.nanoTime) + val testTraceContext = Some(TraceContext(Actor.noSender, random.nextInt)) +} \ No newline at end of file -- cgit v1.2.3