From 2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 5 Nov 2013 18:38:39 -0300 Subject: basic separation of concerns between sub-projects --- .../scala/kamon/ActorInstrumentationSpec.scala | 94 ++++++++++++++++++++++ .../scala/kamon/RunnableInstrumentationSpec.scala | 85 +++++++++++++++++++ .../src/test/scala/kamon/TraceAggregatorSpec.scala | 37 +++++++++ 3 files changed, 216 insertions(+) create mode 100644 kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala create mode 100644 kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala create mode 100644 kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala (limited to 'kamon-trace/src/test/scala') diff --git a/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala b/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala new file mode 100644 index 00000000..f5d88f06 --- /dev/null +++ b/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala @@ -0,0 +1,94 @@ +package kamon + +import org.scalatest.{WordSpecLike, Matchers} +import akka.actor.{ActorRef, Actor, Props, ActorSystem} + +import akka.testkit.{ImplicitSender, TestKit} +import kamon.trace.Trace +import akka.pattern.{pipe, ask} +import akka.util.Timeout +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import akka.routing.RoundRobinRouter +import kamon.trace.TraceContext + + +class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender { + implicit val executionContext = system.dispatcher + + "an instrumented actor ref" when { + "used inside the context of a transaction" should { + "propagate the trace context using bang" in new TraceContextEchoFixture { + echo ! "test" + + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context using tell" in new TraceContextEchoFixture { + echo.tell("test", testActor) + + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context using ask" in new TraceContextEchoFixture { + implicit val timeout = Timeout(1 seconds) + (echo ? "test") pipeTo(testActor) + + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context to actors behind a router" in new RoutedTraceContextEchoFixture { + val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test")) + + expectMsgAllOf(contexts: _*) + } + + /*"propagate with many asks" in { + val echo = system.actorOf(Props[TraceContextEcho]) + val iterations = 50000 + implicit val timeout = Timeout(10 seconds) + + val futures = for(_ <- 1 to iterations) yield { + Tracer.start + val result = (echo ? "test") + Tracer.clear + + result + } + + val allResults = Await.result(Future.sequence(futures), 10 seconds) + assert(iterations == allResults.collect { + case Some(_) => 1 + }.sum) + }*/ + } + } + + trait TraceContextEchoFixture { + val testTraceContext = Trace.newTraceContext() + val echo = system.actorOf(Props[TraceContextEcho]) + + Trace.set(testTraceContext) + } + + trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { + override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10))) + + def tellWithNewContext(target: ActorRef, message: Any): TraceContext = { + val context = Trace.newTraceContext() + Trace.set(context) + + target ! message + context + } + } + +} + +class TraceContextEcho extends Actor { + def receive = { + case msg: String ⇒ sender ! Trace.context() + } +} + + diff --git a/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala b/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala new file mode 100644 index 00000000..f968fa83 --- /dev/null +++ b/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala @@ -0,0 +1,85 @@ +package kamon + +import scala.concurrent.{Await, Promise, Future} +import org.scalatest.{Matchers, OptionValues, WordSpec} +import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} +import java.util.UUID +import scala.util.Success +import scala.concurrent.duration._ +import java.util.concurrent.TimeUnit +import akka.actor.{Actor, ActorSystem} +import kamon.trace.{Trace, TraceContext} + + +class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { + + "a instrumented runnable" when { + "created in a thread that does have a TraceContext" must { + "preserve the TraceContext" which { + "should be available during the run method execution" in new FutureWithContextFixture { + +/* whenReady(futureWithContext) { result => + result.value should equal(testContext) + }*/ + } + + "should be available during the execution of onComplete callbacks" in new FutureWithContextFixture { + + val onCompleteContext = Promise[Option[TraceContext]]() + +/* Tracer.traceContext.withValue(Some(testContext)) { + futureWithContext.onComplete({ + case _ => println("Completing second promise from: "+Thread.currentThread().getName + " With Context: " + Tracer.traceContext.value); onCompleteContext.complete(Success(Tracer.traceContext.value)) + }) + }*/ + + whenReady(onCompleteContext.future) { result => + result should equal(Some(testContext)) + } + } + } + } + + "created in a thread that doest have a TraceContext" must { + "not capture any TraceContext for the body execution" in new FutureWithoutContextFixture{ + whenReady(futureWithoutContext) { result => + result should equal(None) + } + } + + "not make any TraceContext available during the onComplete callback" in new FutureWithoutContextFixture { + val onCompleteContext = Promise[Option[TraceContext]]() + + futureWithoutContext.onComplete { + case _ => onCompleteContext.complete(Success(Trace.context())) + } + + whenReady(onCompleteContext.future) { result => + result should equal(None) + } + } + } + } + + + /** + * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have. + */ + implicit val testActorSystem = ActorSystem("test-actorsystem") + implicit val execContext = testActorSystem.dispatcher + + class FutureWithContextFixture { + val testContext = TraceContext(Actor.noSender, 1) + +/* var futureWithContext: Future[Option[TraceContext]] = _ + Tracer.context.withValue(Some(testContext)) { + futureWithContext = Future { Tracer.traceContext.value } + }*/ + } + + trait FutureWithoutContextFixture { + val futureWithoutContext = Future { Trace.context.value } + } +} + + diff --git a/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala new file mode 100644 index 00000000..a8e736ae --- /dev/null +++ b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala @@ -0,0 +1,37 @@ +package kamon + +import org.scalatest.{WordSpecLike, WordSpec} +import akka.testkit.{TestKitBase, TestKit} +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import kamon.trace.UowTracing.{Finish, Rename, Start} +import kamon.trace.{UowTrace, UowTraceAggregator} + +class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike { + + "a TraceAggregator" should { + "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start(1) + aggregator ! Finish(1) + + expectMsg(UowTrace("UNKNOWN", Seq(Start(1), Finish(1)))) + } + } + + "change the uow name after receiving a Rename message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start(1) + aggregator ! Rename(1, "test-uow") + aggregator ! Finish(1) + + expectMsg(UowTrace("test-uow", Seq(Start(1), Finish(1)))) + } + } + } + + + trait AggregatorFixture { + val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds)) + } +} -- cgit v1.2.3