aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-06-03 12:46:56 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-06-03 12:46:56 -0300
commit695b9b6d2bdf55afd7fe420d9a6fc36d3d45ed31 (patch)
treeaff9e74f6b5838f186ba8ef9e6053d9ad4c84ea2
parentcad83e95166d91225e126aa6a0fab493b3baca59 (diff)
parentda47788738055e4fef1485f2721c6ee040c16fd8 (diff)
downloadKamon-695b9b6d2bdf55afd7fe420d9a6fc36d3d45ed31.tar.gz
Kamon-695b9b6d2bdf55afd7fe420d9a6fc36d3d45ed31.tar.bz2
Kamon-695b9b6d2bdf55afd7fe420d9a6fc36d3d45ed31.zip
Merged the aspects-refactor changes
-rw-r--r--.gitignore1
-rw-r--r--project/Build.scala6
-rw-r--r--project/Dependencies.scala1
-rw-r--r--project/NewRelic.scala13
-rw-r--r--project/Settings.scala3
-rw-r--r--project/plugins.sbt2
-rw-r--r--src/main/resources/META-INF/aop.xml2
-rw-r--r--src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala14
-rw-r--r--src/main/scala/kamon/Aggregator.scala18
-rw-r--r--src/main/scala/kamon/Kamon.scala31
-rw-r--r--src/main/scala/kamon/TraceContext.scala37
-rw-r--r--src/main/scala/kamon/TraceContextSwap.scala26
-rw-r--r--src/main/scala/kamon/TransactionPublisher.scala15
-rw-r--r--src/main/scala/kamon/actor/TraceableActor.scala44
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala40
-rw-r--r--src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala (renamed from src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala)16
-rw-r--r--src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala (renamed from src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala)40
17 files changed, 164 insertions, 145 deletions
diff --git a/.gitignore b/.gitignore
index a008d6b1..ffb18ce4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
*.class
*.log
+.history
# sbt specific
dist/*
diff --git a/project/Build.scala b/project/Build.scala
index fe775462..37765ccf 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -3,6 +3,7 @@ import Keys._
object Build extends Build {
import AspectJ._
+ import NewRelic._
import Settings._
import Dependencies._
@@ -10,10 +11,11 @@ object Build extends Build {
.settings(basicSettings: _*)
.settings(revolverSettings: _*)
.settings(aspectJSettings: _*)
+ .settings(newrelicSettings: _*)
.settings(
libraryDependencies ++=
- compile(akkaActor, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, sprayJson) ++
+ compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, sprayJson) ++
test(scalatest, sprayTestkit))
-}
+} \ No newline at end of file
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 782379da..a0d51a39 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -15,6 +15,7 @@ object Dependencies {
val sprayJson = "io.spray" %% "spray-json" % "1.2.3"
val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1"
val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2"
+ val akkaAgent = "com.typesafe.akka" %% "akka-agent" % "2.1.2"
val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2"
val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2"
val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M5b"
diff --git a/project/NewRelic.scala b/project/NewRelic.scala
new file mode 100644
index 00000000..766eb28d
--- /dev/null
+++ b/project/NewRelic.scala
@@ -0,0 +1,13 @@
+import sbt.Keys._
+import com.ivantopo.sbt.newrelic.SbtNewrelic
+import com.ivantopo.sbt.newrelic.SbtNewrelic.newrelic
+import com.ivantopo.sbt.newrelic.SbtNewrelic.SbtNewrelicKeys._
+
+
+object NewRelic {
+
+ lazy val newrelicSettings = SbtNewrelic.newrelicSettings ++ Seq(
+ javaOptions in run <++= jvmOptions in newrelic,
+ newrelicVersion in newrelic := "2.18.0"
+ )
+}
diff --git a/project/Settings.scala b/project/Settings.scala
index de8a3024..640a8013 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -8,8 +8,9 @@ object Settings {
lazy val basicSettings = seq(
version := VERSION,
organization := "com.despegar",
- scalaVersion := "2.10.1",
+ scalaVersion := "2.10.0",
resolvers ++= Dependencies.resolutionRepos,
+ fork in run := true,
scalacOptions := Seq(
"-encoding",
"utf8",
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 92902e2b..f8ce9e3c 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -6,3 +6,5 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.6.2")
addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.9.0")
+addSbtPlugin("com.ivantopo.sbt" % "sbt-newrelic" % "0.0.1")
+
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index 61bc837e..1413f424 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -13,7 +13,7 @@
<aspect name="akka.ActorInstrumentation" />
<aspect name="akka.instrumentation.ActorRefTellInstrumentation"/>
<aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/>
- <aspect name="kamon.instrumentation.PromiseCompletingRunnableInstrumentation" />
+ <aspect name="kamon.instrumentation.RunnableInstrumentation" />
<include within="*"/>
<exclude within="javax..*"/>
diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
index 9e816d11..f631b79a 100644
--- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
+++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
@@ -3,10 +3,12 @@ package akka.instrumentation
import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
import org.aspectj.lang.ProceedingJoinPoint
import akka.actor.{ActorRef}
-import kamon.TraceContext
-import kamon.actor.TraceableMessage
+import kamon.{Kamon, TraceContext}
import akka.dispatch.Envelope
+case class TraceableMessage(traceContext: TraceContext, message: Any)
+
+
@Aspect
class ActorRefTellInstrumentation {
println("Created ActorAspect")
@@ -18,9 +20,9 @@ class ActorRefTellInstrumentation {
def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = {
import pjp._
- TraceContext.current match {
+ Kamon.context() match {
case Some(ctx) => {
- val traceableMessage = TraceableMessage(ctx.fork, message)
+ val traceableMessage = TraceableMessage(ctx, message)
proceed(getArgs.updated(0, traceableMessage))
}
case None => proceed
@@ -42,12 +44,12 @@ class ActorCellInvokeInstrumentation {
envelope match {
case Envelope(TraceableMessage(ctx, msg), sender) => {
- TraceContext.set(ctx)
+ Kamon.set(ctx)
val originalEnvelope = envelope.copy(message = msg)
proceed(getArgs.updated(0, originalEnvelope))
- TraceContext.clear
+ Kamon.clear
}
case _ => proceed
}
diff --git a/src/main/scala/kamon/Aggregator.scala b/src/main/scala/kamon/Aggregator.scala
deleted file mode 100644
index 441178df..00000000
--- a/src/main/scala/kamon/Aggregator.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package kamon
-
-import akka.actor.Actor
-import scala.collection.mutable
-
-class Aggregator extends Actor {
-
- val parts = mutable.LinkedList[TraceEntry]()
-
- def receive = {
- case ContextPart(ctx) => println("registering context information")
- case FinishAggregation() => println("report to newrelic")
- }
-
-}
-
-case class ContextPart(context: TraceContext)
-case class FinishAggregation()
diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala
new file mode 100644
index 00000000..ef5f8044
--- /dev/null
+++ b/src/main/scala/kamon/Kamon.scala
@@ -0,0 +1,31 @@
+package kamon
+
+import akka.actor.{Props, ActorSystem}
+
+object Kamon {
+
+ val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ implicit lazy val actorSystem = ActorSystem("kamon")
+
+
+ def context() = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start = set(newTraceContext)
+ def stop = ctx.get match {
+ case Some(context) => context.close
+ case None =>
+ }
+
+ def newTraceContext(): TraceContext = TraceContext()
+
+
+ val publisher = actorSystem.actorOf(Props[TransactionPublisher])
+
+ def publish(tx: FullTransaction) = publisher ! tx
+
+}
diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala
index e3582c60..19ebc578 100644
--- a/src/main/scala/kamon/TraceContext.scala
+++ b/src/main/scala/kamon/TraceContext.scala
@@ -1,29 +1,30 @@
package kamon
import java.util.UUID
-import akka.actor.ActorPath
-
-
-case class TraceContext(id: UUID, entries: List[TraceEntry]) {
- def fork = this.copy(entries = Nil)
- def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries)
+import akka.actor.{ActorSystem, ActorPath}
+import akka.agent.Agent
+import java.util.concurrent.TimeUnit
+import scala.util.{Failure, Success}
+import akka.util.Timeout
+
+
+case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) {
+ implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+ implicit val as = Kamon.actorSystem.dispatcher
+
+ def append(entry: TraceEntry) = entries send (entry :: _)
+ def close = entries.future.onComplete({
+ case Success(list) => Kamon.publish(FullTransaction(id, list))
+ case Failure(t) => println("WTF!")
+ })
}
object TraceContext {
- private val context = new ThreadLocal[Option[TraceContext]] {
- override def initialValue(): Option[TraceContext] = None
- }
-
- def current = context.get()
-
- def clear = context.remove()
+ def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil))
+}
- def set(ctx: TraceContext) = context.set(Some(ctx))
- def start = set(TraceContext(UUID.randomUUID(), Nil))
-}
trait TraceEntry
-case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long)
-case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry
+case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
diff --git a/src/main/scala/kamon/TraceContextSwap.scala b/src/main/scala/kamon/TraceContextSwap.scala
new file mode 100644
index 00000000..68ee808b
--- /dev/null
+++ b/src/main/scala/kamon/TraceContextSwap.scala
@@ -0,0 +1,26 @@
+package kamon
+
+/**
+ * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards.
+ */
+trait TraceContextSwap {
+
+ def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body)
+
+ def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = {
+ ctx match {
+ case Some(context) => {
+ Kamon.set(context)
+ val bodyResult = primary
+ Kamon.clear
+
+ bodyResult
+ }
+ case None => fallback
+ }
+
+ }
+
+}
+
+object TraceContextSwap extends TraceContextSwap
diff --git a/src/main/scala/kamon/TransactionPublisher.scala b/src/main/scala/kamon/TransactionPublisher.scala
new file mode 100644
index 00000000..0626b91d
--- /dev/null
+++ b/src/main/scala/kamon/TransactionPublisher.scala
@@ -0,0 +1,15 @@
+package kamon
+
+import akka.actor.Actor
+import java.util.UUID
+
+class TransactionPublisher extends Actor {
+
+ def receive = {
+ case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries")
+ }
+
+}
+
+
+case class FullTransaction(id: UUID, entries: List[TraceEntry])
diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala
deleted file mode 100644
index 3acbd293..00000000
--- a/src/main/scala/kamon/actor/TraceableActor.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-package kamon.actor
-
-import akka.actor.{ActorRef, Actor}
-import kamon.TraceContext
-
-trait TraceableActor extends Actor with TracingImplicitConversions {
-
- final def receive = {
- case a: Any => {
- a match {
- case TraceableMessage(ctx, message) => {
- //TraceContext.current.set(ctx)
-
- tracedReceive(message)
-
- //TraceContext.current.remove()
-
- /** Publish the partial context information to the EventStream */
- context.system.eventStream.publish(ctx)
- }
- case message: Any => tracedReceive(message)
- }
- }
- }
-
- def tracedReceive: Receive
-
-}
-
-class TraceableActorRef(val target: ActorRef) {
- def !! (message: Any)(implicit sender: ActorRef) = {
- val traceableMessage = TraceableMessage(TraceContext.current.get.fork, message)
- target.tell(traceableMessage, sender)
- }
-}
-
-
-
-trait TracingImplicitConversions {
- implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef)
-}
-
-case class TraceableMessage(traceContext: TraceContext, message: Any)
-
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index ebaff7eb..ed76334f 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -5,7 +5,7 @@ import akka.event.LookupClassification
import akka.actor._
import java.util.concurrent.TimeUnit
-import kamon.{TraceContext}
+import kamon.{CodeBlockExecutionTime, Kamon, TraceContext}
import akka.util.Timeout
import scala.util.Success
import scala.util.Failure
@@ -41,7 +41,7 @@ class PingActor(val target: ActorRef) extends Actor with ActorLogging {
def receive = {
case Pong() => {
- log.info(s"pong with context ${TraceContext.current}")
+ log.info(s"pong with context ${Kamon.context}")
Thread.sleep(1000)
sender ! Ping()
}
@@ -57,7 +57,7 @@ class PongActor extends Actor with ActorLogging {
case Ping() => {
Thread.sleep(3000)
sender ! Pong()
- log.info(s"ping with context ${TraceContext.current}")
+ log.info(s"ping with context ${Kamon.context}")
}
case a: Any => println(s"Got ${a} in PONG")
}
@@ -78,7 +78,7 @@ object TryAkka extends App{
- def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${TraceContext.current}] : $body")
+ def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
/*
val newRelicReporter = new NewRelicReporter(registry)
@@ -88,32 +88,20 @@ object TryAkka extends App{
import akka.pattern.ask
implicit val timeout = Timeout(10, TimeUnit.SECONDS)
implicit def execContext = system.dispatcher
- //for(i <- 1 to 8) {
-/* val i = 1
- TraceContext.start
- val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}")
- val f = ping ? Pong()
-
- f.map({
- a => threadPrintln(s"In the map body, with the context: ${TraceContext.current}")
- })
- .flatMap({
- (a: Any) => {
- threadPrintln(s"Executing the flatMap, with the context: ${TraceContext.current}")
- Future { s"In the flatMap body, with the context: ${TraceContext.current}" }
- }
- })
- .onComplete({
- case Success(p) => threadPrintln(s"On my main success, with String [$p] and the context: ${TraceContext.current}")
- case Failure(t) => threadPrintln(s"Something went wrong in the main, with the context: ${TraceContext.current}")
- })*/
- //}
-
- TraceContext.start
+
+
+
+ Kamon.start
+
+ Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
threadPrintln("Before doing it")
val f = Future { threadPrintln("This is happening inside the future body") }
+ Kamon.stop
+
+ Thread.sleep(3000)
+ system.shutdown()
/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
diff --git a/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
index ce19a7e6..ef908625 100644
--- a/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala
+++ b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -1,7 +1,7 @@
package kamon.instrumentation
import org.aspectj.lang.annotation._
-import kamon.TraceContext
+import kamon.{Kamon, TraceContext}
import org.aspectj.lang.ProceedingJoinPoint
import scala.Some
@@ -12,7 +12,7 @@ trait TraceContextAwareRunnable extends Runnable {}
@Aspect("perthis(instrumentedRunnableCreation())")
-class PromiseCompletingRunnableInstrumentation {
+class RunnableInstrumentation {
/**
* These are the Runnables that need to be instrumented and make the TraceContext available
@@ -37,25 +37,19 @@ class PromiseCompletingRunnableInstrumentation {
* Aspect members
*/
- private val traceContext = TraceContext.current
+ private val traceContext = Kamon.context
/**
* Advices
*/
+ import kamon.TraceContextSwap.withContext
@Around("runnableExecution()")
def around(pjp: ProceedingJoinPoint) = {
import pjp._
- traceContext match {
- case Some(ctx) => {
- TraceContext.set(ctx)
- proceed()
- TraceContext.clear
- }
- case None => proceed()
- }
+ withContext(traceContext, proceed())
}
}
diff --git a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
index 44f92148..4fe9e617 100644
--- a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala
+++ b/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
@@ -1,34 +1,34 @@
package kamon.instrumentation
import scala.concurrent.{Await, Promise, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
import org.scalatest.{OptionValues, WordSpec}
import org.scalatest.matchers.MustMatchers
import org.scalatest.concurrent.PatienceConfiguration
-import kamon.TraceContext
+import kamon.{Kamon, TraceContext}
import java.util.UUID
import scala.util.Success
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit
+import akka.actor.ActorSystem
-class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration with OptionValues {
+class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration with OptionValues {
- "a instrumented Future" when {
+ "a instrumented runnable" when {
"created in a thread that does have a TraceContext" must {
"preserve the TraceContext" which {
- "should be available during the body's execution" in { new FutureWithContext {
+ "should be available during the run method execution" in { new FutureWithContextFixture {
whenReady(futureWithContext) { result =>
result.value must be === testContext
}
}}
- "should be available during the execution of onComplete callbacks" in { new FutureWithContext {
+ "should be available during the execution of onComplete callbacks" in { new FutureWithContextFixture {
val onCompleteContext = Promise[TraceContext]()
futureWithContext.onComplete({
- case _ => onCompleteContext.complete(Success(TraceContext.current.get))
+ case _ => onCompleteContext.complete(Success(Kamon.context.get))
})
whenReady(onCompleteContext.future) { result =>
@@ -39,18 +39,18 @@ class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFut
}
"created in a thread that doest have a TraceContext" must {
- "not capture any TraceContext for the body execution" in { new FutureWithoutContext{
+ "not capture any TraceContext for the body execution" in { new FutureWithoutContextFixture{
whenReady(futureWithoutContext) { result =>
result must be === None
}
}}
- "not make any TraceContext available during the onComplete callback" in { new FutureWithoutContext {
+ "not make any TraceContext available during the onComplete callback" in { new FutureWithoutContextFixture {
val onCompleteContext = Promise[Option[TraceContext]]()
futureWithoutContext.onComplete({
- case _ => onCompleteContext.complete(Success(TraceContext.current))
+ case _ => onCompleteContext.complete(Success(Kamon.context))
})
whenReady(onCompleteContext.future) { result =>
@@ -61,18 +61,22 @@ class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFut
}
+ /**
+ * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have.
+ */
+ implicit val testActorSystem = ActorSystem("test-actorsystem")
+ implicit val execContext = testActorSystem.dispatcher
+ class FutureWithContextFixture {
+ val testContext = TraceContext()
+ Kamon.set(testContext)
- trait FutureWithContext {
- val testContext = TraceContext(UUID.randomUUID(), Nil)
- TraceContext.set(testContext)
-
- val futureWithContext = Future { TraceContext.current }
+ val futureWithContext = Future { Kamon.context}
}
- trait FutureWithoutContext {
- TraceContext.clear // Make sure no TraceContext is available
- val futureWithoutContext = Future { TraceContext.current }
+ trait FutureWithoutContextFixture {
+ Kamon.clear // Make sure no TraceContext is available
+ val futureWithoutContext = Future { Kamon.context }
}
}