diff options
17 files changed, 164 insertions, 145 deletions
@@ -1,5 +1,6 @@ *.class *.log +.history # sbt specific dist/* diff --git a/project/Build.scala b/project/Build.scala index fe775462..37765ccf 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -3,6 +3,7 @@ import Keys._ object Build extends Build { import AspectJ._ + import NewRelic._ import Settings._ import Dependencies._ @@ -10,10 +11,11 @@ object Build extends Build { .settings(basicSettings: _*) .settings(revolverSettings: _*) .settings(aspectJSettings: _*) + .settings(newrelicSettings: _*) .settings( libraryDependencies ++= - compile(akkaActor, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, sprayJson) ++ + compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, sprayJson) ++ test(scalatest, sprayTestkit)) -} +}
\ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 782379da..a0d51a39 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,6 +15,7 @@ object Dependencies { val sprayJson = "io.spray" %% "spray-json" % "1.2.3" val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1" val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2" + val akkaAgent = "com.typesafe.akka" %% "akka-agent" % "2.1.2" val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2" val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2" val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M5b" diff --git a/project/NewRelic.scala b/project/NewRelic.scala new file mode 100644 index 00000000..766eb28d --- /dev/null +++ b/project/NewRelic.scala @@ -0,0 +1,13 @@ +import sbt.Keys._ +import com.ivantopo.sbt.newrelic.SbtNewrelic +import com.ivantopo.sbt.newrelic.SbtNewrelic.newrelic +import com.ivantopo.sbt.newrelic.SbtNewrelic.SbtNewrelicKeys._ + + +object NewRelic { + + lazy val newrelicSettings = SbtNewrelic.newrelicSettings ++ Seq( + javaOptions in run <++= jvmOptions in newrelic, + newrelicVersion in newrelic := "2.18.0" + ) +} diff --git a/project/Settings.scala b/project/Settings.scala index de8a3024..640a8013 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -8,8 +8,9 @@ object Settings { lazy val basicSettings = seq( version := VERSION, organization := "com.despegar", - scalaVersion := "2.10.1", + scalaVersion := "2.10.0", resolvers ++= Dependencies.resolutionRepos, + fork in run := true, scalacOptions := Seq( "-encoding", "utf8", diff --git a/project/plugins.sbt b/project/plugins.sbt index 92902e2b..f8ce9e3c 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,3 +6,5 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.6.2") addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.9.0") +addSbtPlugin("com.ivantopo.sbt" % "sbt-newrelic" % "0.0.1") + diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 61bc837e..1413f424 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -13,7 +13,7 @@ <aspect name="akka.ActorInstrumentation" /> <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/> <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/> - <aspect name="kamon.instrumentation.PromiseCompletingRunnableInstrumentation" /> + <aspect name="kamon.instrumentation.RunnableInstrumentation" /> <include within="*"/> <exclude within="javax..*"/> diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala index 9e816d11..f631b79a 100644 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala @@ -3,10 +3,12 @@ package akka.instrumentation import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{ActorRef} -import kamon.TraceContext -import kamon.actor.TraceableMessage +import kamon.{Kamon, TraceContext} import akka.dispatch.Envelope +case class TraceableMessage(traceContext: TraceContext, message: Any) + + @Aspect class ActorRefTellInstrumentation { println("Created ActorAspect") @@ -18,9 +20,9 @@ class ActorRefTellInstrumentation { def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { import pjp._ - TraceContext.current match { + Kamon.context() match { case Some(ctx) => { - val traceableMessage = TraceableMessage(ctx.fork, message) + val traceableMessage = TraceableMessage(ctx, message) proceed(getArgs.updated(0, traceableMessage)) } case None => proceed @@ -42,12 +44,12 @@ class ActorCellInvokeInstrumentation { envelope match { case Envelope(TraceableMessage(ctx, msg), sender) => { - TraceContext.set(ctx) + Kamon.set(ctx) val originalEnvelope = envelope.copy(message = msg) proceed(getArgs.updated(0, originalEnvelope)) - TraceContext.clear + Kamon.clear } case _ => proceed } diff --git a/src/main/scala/kamon/Aggregator.scala b/src/main/scala/kamon/Aggregator.scala deleted file mode 100644 index 441178df..00000000 --- a/src/main/scala/kamon/Aggregator.scala +++ /dev/null @@ -1,18 +0,0 @@ -package kamon - -import akka.actor.Actor -import scala.collection.mutable - -class Aggregator extends Actor { - - val parts = mutable.LinkedList[TraceEntry]() - - def receive = { - case ContextPart(ctx) => println("registering context information") - case FinishAggregation() => println("report to newrelic") - } - -} - -case class ContextPart(context: TraceContext) -case class FinishAggregation() diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala new file mode 100644 index 00000000..ef5f8044 --- /dev/null +++ b/src/main/scala/kamon/Kamon.scala @@ -0,0 +1,31 @@ +package kamon + +import akka.actor.{Props, ActorSystem} + +object Kamon { + + val ctx = new ThreadLocal[Option[TraceContext]] { + override def initialValue() = None + } + + implicit lazy val actorSystem = ActorSystem("kamon") + + + def context() = ctx.get() + def clear = ctx.remove() + def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) + + def start = set(newTraceContext) + def stop = ctx.get match { + case Some(context) => context.close + case None => + } + + def newTraceContext(): TraceContext = TraceContext() + + + val publisher = actorSystem.actorOf(Props[TransactionPublisher]) + + def publish(tx: FullTransaction) = publisher ! tx + +} diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala index e3582c60..19ebc578 100644 --- a/src/main/scala/kamon/TraceContext.scala +++ b/src/main/scala/kamon/TraceContext.scala @@ -1,29 +1,30 @@ package kamon import java.util.UUID -import akka.actor.ActorPath - - -case class TraceContext(id: UUID, entries: List[TraceEntry]) { - def fork = this.copy(entries = Nil) - def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries) +import akka.actor.{ActorSystem, ActorPath} +import akka.agent.Agent +import java.util.concurrent.TimeUnit +import scala.util.{Failure, Success} +import akka.util.Timeout + + +case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) { + implicit val timeout = Timeout(30, TimeUnit.SECONDS) + implicit val as = Kamon.actorSystem.dispatcher + + def append(entry: TraceEntry) = entries send (entry :: _) + def close = entries.future.onComplete({ + case Success(list) => Kamon.publish(FullTransaction(id, list)) + case Failure(t) => println("WTF!") + }) } object TraceContext { - private val context = new ThreadLocal[Option[TraceContext]] { - override def initialValue(): Option[TraceContext] = None - } - - def current = context.get() - - def clear = context.remove() + def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) +} - def set(ctx: TraceContext) = context.set(Some(ctx)) - def start = set(TraceContext(UUID.randomUUID(), Nil)) -} trait TraceEntry -case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long) -case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry +case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry diff --git a/src/main/scala/kamon/TraceContextSwap.scala b/src/main/scala/kamon/TraceContextSwap.scala new file mode 100644 index 00000000..68ee808b --- /dev/null +++ b/src/main/scala/kamon/TraceContextSwap.scala @@ -0,0 +1,26 @@ +package kamon + +/** + * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards. + */ +trait TraceContextSwap { + + def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body) + + def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { + ctx match { + case Some(context) => { + Kamon.set(context) + val bodyResult = primary + Kamon.clear + + bodyResult + } + case None => fallback + } + + } + +} + +object TraceContextSwap extends TraceContextSwap diff --git a/src/main/scala/kamon/TransactionPublisher.scala b/src/main/scala/kamon/TransactionPublisher.scala new file mode 100644 index 00000000..0626b91d --- /dev/null +++ b/src/main/scala/kamon/TransactionPublisher.scala @@ -0,0 +1,15 @@ +package kamon + +import akka.actor.Actor +import java.util.UUID + +class TransactionPublisher extends Actor { + + def receive = { + case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries") + } + +} + + +case class FullTransaction(id: UUID, entries: List[TraceEntry]) diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala deleted file mode 100644 index 3acbd293..00000000 --- a/src/main/scala/kamon/actor/TraceableActor.scala +++ /dev/null @@ -1,44 +0,0 @@ -package kamon.actor - -import akka.actor.{ActorRef, Actor} -import kamon.TraceContext - -trait TraceableActor extends Actor with TracingImplicitConversions { - - final def receive = { - case a: Any => { - a match { - case TraceableMessage(ctx, message) => { - //TraceContext.current.set(ctx) - - tracedReceive(message) - - //TraceContext.current.remove() - - /** Publish the partial context information to the EventStream */ - context.system.eventStream.publish(ctx) - } - case message: Any => tracedReceive(message) - } - } - } - - def tracedReceive: Receive - -} - -class TraceableActorRef(val target: ActorRef) { - def !! (message: Any)(implicit sender: ActorRef) = { - val traceableMessage = TraceableMessage(TraceContext.current.get.fork, message) - target.tell(traceableMessage, sender) - } -} - - - -trait TracingImplicitConversions { - implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef) -} - -case class TraceableMessage(traceContext: TraceContext, message: Any) - diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index ebaff7eb..ed76334f 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -5,7 +5,7 @@ import akka.event.LookupClassification import akka.actor._ import java.util.concurrent.TimeUnit -import kamon.{TraceContext} +import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} import akka.util.Timeout import scala.util.Success import scala.util.Failure @@ -41,7 +41,7 @@ class PingActor(val target: ActorRef) extends Actor with ActorLogging { def receive = { case Pong() => { - log.info(s"pong with context ${TraceContext.current}") + log.info(s"pong with context ${Kamon.context}") Thread.sleep(1000) sender ! Ping() } @@ -57,7 +57,7 @@ class PongActor extends Actor with ActorLogging { case Ping() => { Thread.sleep(3000) sender ! Pong() - log.info(s"ping with context ${TraceContext.current}") + log.info(s"ping with context ${Kamon.context}") } case a: Any => println(s"Got ${a} in PONG") } @@ -78,7 +78,7 @@ object TryAkka extends App{ - def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${TraceContext.current}] : $body") + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") /* val newRelicReporter = new NewRelicReporter(registry) @@ -88,32 +88,20 @@ object TryAkka extends App{ import akka.pattern.ask implicit val timeout = Timeout(10, TimeUnit.SECONDS) implicit def execContext = system.dispatcher - //for(i <- 1 to 8) { -/* val i = 1 - TraceContext.start - val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}") - val f = ping ? Pong() - - f.map({ - a => threadPrintln(s"In the map body, with the context: ${TraceContext.current}") - }) - .flatMap({ - (a: Any) => { - threadPrintln(s"Executing the flatMap, with the context: ${TraceContext.current}") - Future { s"In the flatMap body, with the context: ${TraceContext.current}" } - } - }) - .onComplete({ - case Success(p) => threadPrintln(s"On my main success, with String [$p] and the context: ${TraceContext.current}") - case Failure(t) => threadPrintln(s"Something went wrong in the main, with the context: ${TraceContext.current}") - })*/ - //} - - TraceContext.start + + + + Kamon.start + + Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) threadPrintln("Before doing it") val f = Future { threadPrintln("This is happening inside the future body") } + Kamon.stop + + Thread.sleep(3000) + system.shutdown() /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ diff --git a/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index ce19a7e6..ef908625 100644 --- a/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -1,7 +1,7 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ -import kamon.TraceContext +import kamon.{Kamon, TraceContext} import org.aspectj.lang.ProceedingJoinPoint import scala.Some @@ -12,7 +12,7 @@ trait TraceContextAwareRunnable extends Runnable {} @Aspect("perthis(instrumentedRunnableCreation())") -class PromiseCompletingRunnableInstrumentation { +class RunnableInstrumentation { /** * These are the Runnables that need to be instrumented and make the TraceContext available @@ -37,25 +37,19 @@ class PromiseCompletingRunnableInstrumentation { * Aspect members */ - private val traceContext = TraceContext.current + private val traceContext = Kamon.context /** * Advices */ + import kamon.TraceContextSwap.withContext @Around("runnableExecution()") def around(pjp: ProceedingJoinPoint) = { import pjp._ - traceContext match { - case Some(ctx) => { - TraceContext.set(ctx) - proceed() - TraceContext.clear - } - case None => proceed() - } + withContext(traceContext, proceed()) } } diff --git a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala index 44f92148..4fe9e617 100644 --- a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala +++ b/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala @@ -1,34 +1,34 @@ package kamon.instrumentation import scala.concurrent.{Await, Promise, Future} -import scala.concurrent.ExecutionContext.Implicits.global import org.scalatest.{OptionValues, WordSpec} import org.scalatest.matchers.MustMatchers import org.scalatest.concurrent.PatienceConfiguration -import kamon.TraceContext +import kamon.{Kamon, TraceContext} import java.util.UUID import scala.util.Success import scala.concurrent.duration._ import java.util.concurrent.TimeUnit +import akka.actor.ActorSystem -class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration with OptionValues { +class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration with OptionValues { - "a instrumented Future" when { + "a instrumented runnable" when { "created in a thread that does have a TraceContext" must { "preserve the TraceContext" which { - "should be available during the body's execution" in { new FutureWithContext { + "should be available during the run method execution" in { new FutureWithContextFixture { whenReady(futureWithContext) { result => result.value must be === testContext } }} - "should be available during the execution of onComplete callbacks" in { new FutureWithContext { + "should be available during the execution of onComplete callbacks" in { new FutureWithContextFixture { val onCompleteContext = Promise[TraceContext]() futureWithContext.onComplete({ - case _ => onCompleteContext.complete(Success(TraceContext.current.get)) + case _ => onCompleteContext.complete(Success(Kamon.context.get)) }) whenReady(onCompleteContext.future) { result => @@ -39,18 +39,18 @@ class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFut } "created in a thread that doest have a TraceContext" must { - "not capture any TraceContext for the body execution" in { new FutureWithoutContext{ + "not capture any TraceContext for the body execution" in { new FutureWithoutContextFixture{ whenReady(futureWithoutContext) { result => result must be === None } }} - "not make any TraceContext available during the onComplete callback" in { new FutureWithoutContext { + "not make any TraceContext available during the onComplete callback" in { new FutureWithoutContextFixture { val onCompleteContext = Promise[Option[TraceContext]]() futureWithoutContext.onComplete({ - case _ => onCompleteContext.complete(Success(TraceContext.current)) + case _ => onCompleteContext.complete(Success(Kamon.context)) }) whenReady(onCompleteContext.future) { result => @@ -61,18 +61,22 @@ class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFut } + /** + * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have. + */ + implicit val testActorSystem = ActorSystem("test-actorsystem") + implicit val execContext = testActorSystem.dispatcher + class FutureWithContextFixture { + val testContext = TraceContext() + Kamon.set(testContext) - trait FutureWithContext { - val testContext = TraceContext(UUID.randomUUID(), Nil) - TraceContext.set(testContext) - - val futureWithContext = Future { TraceContext.current } + val futureWithContext = Future { Kamon.context} } - trait FutureWithoutContext { - TraceContext.clear // Make sure no TraceContext is available - val futureWithoutContext = Future { TraceContext.current } + trait FutureWithoutContextFixture { + Kamon.clear // Make sure no TraceContext is available + val futureWithoutContext = Future { Kamon.context } } } |