diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-06-03 12:46:56 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-06-03 12:46:56 -0300 |
commit | 695b9b6d2bdf55afd7fe420d9a6fc36d3d45ed31 (patch) | |
tree | aff9e74f6b5838f186ba8ef9e6053d9ad4c84ea2 /src/main/scala | |
parent | cad83e95166d91225e126aa6a0fab493b3baca59 (diff) | |
parent | da47788738055e4fef1485f2721c6ee040c16fd8 (diff) | |
download | Kamon-695b9b6d2bdf55afd7fe420d9a6fc36d3d45ed31.tar.gz Kamon-695b9b6d2bdf55afd7fe420d9a6fc36d3d45ed31.tar.bz2 Kamon-695b9b6d2bdf55afd7fe420d9a6fc36d3d45ed31.zip |
Merged the aspects-refactor changes
Diffstat (limited to 'src/main/scala')
-rw-r--r-- | src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala | 14 | ||||
-rw-r--r-- | src/main/scala/kamon/Aggregator.scala | 18 | ||||
-rw-r--r-- | src/main/scala/kamon/Kamon.scala | 31 | ||||
-rw-r--r-- | src/main/scala/kamon/TraceContext.scala | 37 | ||||
-rw-r--r-- | src/main/scala/kamon/TraceContextSwap.scala | 26 | ||||
-rw-r--r-- | src/main/scala/kamon/TransactionPublisher.scala | 15 | ||||
-rw-r--r-- | src/main/scala/kamon/actor/TraceableActor.scala | 44 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 40 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala (renamed from src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala) | 16 |
9 files changed, 118 insertions, 123 deletions
diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala index 9e816d11..f631b79a 100644 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala @@ -3,10 +3,12 @@ package akka.instrumentation import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{ActorRef} -import kamon.TraceContext -import kamon.actor.TraceableMessage +import kamon.{Kamon, TraceContext} import akka.dispatch.Envelope +case class TraceableMessage(traceContext: TraceContext, message: Any) + + @Aspect class ActorRefTellInstrumentation { println("Created ActorAspect") @@ -18,9 +20,9 @@ class ActorRefTellInstrumentation { def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { import pjp._ - TraceContext.current match { + Kamon.context() match { case Some(ctx) => { - val traceableMessage = TraceableMessage(ctx.fork, message) + val traceableMessage = TraceableMessage(ctx, message) proceed(getArgs.updated(0, traceableMessage)) } case None => proceed @@ -42,12 +44,12 @@ class ActorCellInvokeInstrumentation { envelope match { case Envelope(TraceableMessage(ctx, msg), sender) => { - TraceContext.set(ctx) + Kamon.set(ctx) val originalEnvelope = envelope.copy(message = msg) proceed(getArgs.updated(0, originalEnvelope)) - TraceContext.clear + Kamon.clear } case _ => proceed } diff --git a/src/main/scala/kamon/Aggregator.scala b/src/main/scala/kamon/Aggregator.scala deleted file mode 100644 index 441178df..00000000 --- a/src/main/scala/kamon/Aggregator.scala +++ /dev/null @@ -1,18 +0,0 @@ -package kamon - -import akka.actor.Actor -import scala.collection.mutable - -class Aggregator extends Actor { - - val parts = mutable.LinkedList[TraceEntry]() - - def receive = { - case ContextPart(ctx) => println("registering context information") - case FinishAggregation() => println("report to newrelic") - } - -} - -case class ContextPart(context: TraceContext) -case class FinishAggregation() diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala new file mode 100644 index 00000000..ef5f8044 --- /dev/null +++ b/src/main/scala/kamon/Kamon.scala @@ -0,0 +1,31 @@ +package kamon + +import akka.actor.{Props, ActorSystem} + +object Kamon { + + val ctx = new ThreadLocal[Option[TraceContext]] { + override def initialValue() = None + } + + implicit lazy val actorSystem = ActorSystem("kamon") + + + def context() = ctx.get() + def clear = ctx.remove() + def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) + + def start = set(newTraceContext) + def stop = ctx.get match { + case Some(context) => context.close + case None => + } + + def newTraceContext(): TraceContext = TraceContext() + + + val publisher = actorSystem.actorOf(Props[TransactionPublisher]) + + def publish(tx: FullTransaction) = publisher ! tx + +} diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala index e3582c60..19ebc578 100644 --- a/src/main/scala/kamon/TraceContext.scala +++ b/src/main/scala/kamon/TraceContext.scala @@ -1,29 +1,30 @@ package kamon import java.util.UUID -import akka.actor.ActorPath - - -case class TraceContext(id: UUID, entries: List[TraceEntry]) { - def fork = this.copy(entries = Nil) - def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries) +import akka.actor.{ActorSystem, ActorPath} +import akka.agent.Agent +import java.util.concurrent.TimeUnit +import scala.util.{Failure, Success} +import akka.util.Timeout + + +case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) { + implicit val timeout = Timeout(30, TimeUnit.SECONDS) + implicit val as = Kamon.actorSystem.dispatcher + + def append(entry: TraceEntry) = entries send (entry :: _) + def close = entries.future.onComplete({ + case Success(list) => Kamon.publish(FullTransaction(id, list)) + case Failure(t) => println("WTF!") + }) } object TraceContext { - private val context = new ThreadLocal[Option[TraceContext]] { - override def initialValue(): Option[TraceContext] = None - } - - def current = context.get() - - def clear = context.remove() + def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) +} - def set(ctx: TraceContext) = context.set(Some(ctx)) - def start = set(TraceContext(UUID.randomUUID(), Nil)) -} trait TraceEntry -case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long) -case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry +case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry diff --git a/src/main/scala/kamon/TraceContextSwap.scala b/src/main/scala/kamon/TraceContextSwap.scala new file mode 100644 index 00000000..68ee808b --- /dev/null +++ b/src/main/scala/kamon/TraceContextSwap.scala @@ -0,0 +1,26 @@ +package kamon + +/** + * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards. + */ +trait TraceContextSwap { + + def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body) + + def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { + ctx match { + case Some(context) => { + Kamon.set(context) + val bodyResult = primary + Kamon.clear + + bodyResult + } + case None => fallback + } + + } + +} + +object TraceContextSwap extends TraceContextSwap diff --git a/src/main/scala/kamon/TransactionPublisher.scala b/src/main/scala/kamon/TransactionPublisher.scala new file mode 100644 index 00000000..0626b91d --- /dev/null +++ b/src/main/scala/kamon/TransactionPublisher.scala @@ -0,0 +1,15 @@ +package kamon + +import akka.actor.Actor +import java.util.UUID + +class TransactionPublisher extends Actor { + + def receive = { + case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries") + } + +} + + +case class FullTransaction(id: UUID, entries: List[TraceEntry]) diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala deleted file mode 100644 index 3acbd293..00000000 --- a/src/main/scala/kamon/actor/TraceableActor.scala +++ /dev/null @@ -1,44 +0,0 @@ -package kamon.actor - -import akka.actor.{ActorRef, Actor} -import kamon.TraceContext - -trait TraceableActor extends Actor with TracingImplicitConversions { - - final def receive = { - case a: Any => { - a match { - case TraceableMessage(ctx, message) => { - //TraceContext.current.set(ctx) - - tracedReceive(message) - - //TraceContext.current.remove() - - /** Publish the partial context information to the EventStream */ - context.system.eventStream.publish(ctx) - } - case message: Any => tracedReceive(message) - } - } - } - - def tracedReceive: Receive - -} - -class TraceableActorRef(val target: ActorRef) { - def !! (message: Any)(implicit sender: ActorRef) = { - val traceableMessage = TraceableMessage(TraceContext.current.get.fork, message) - target.tell(traceableMessage, sender) - } -} - - - -trait TracingImplicitConversions { - implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef) -} - -case class TraceableMessage(traceContext: TraceContext, message: Any) - diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index ebaff7eb..ed76334f 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -5,7 +5,7 @@ import akka.event.LookupClassification import akka.actor._ import java.util.concurrent.TimeUnit -import kamon.{TraceContext} +import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} import akka.util.Timeout import scala.util.Success import scala.util.Failure @@ -41,7 +41,7 @@ class PingActor(val target: ActorRef) extends Actor with ActorLogging { def receive = { case Pong() => { - log.info(s"pong with context ${TraceContext.current}") + log.info(s"pong with context ${Kamon.context}") Thread.sleep(1000) sender ! Ping() } @@ -57,7 +57,7 @@ class PongActor extends Actor with ActorLogging { case Ping() => { Thread.sleep(3000) sender ! Pong() - log.info(s"ping with context ${TraceContext.current}") + log.info(s"ping with context ${Kamon.context}") } case a: Any => println(s"Got ${a} in PONG") } @@ -78,7 +78,7 @@ object TryAkka extends App{ - def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${TraceContext.current}] : $body") + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") /* val newRelicReporter = new NewRelicReporter(registry) @@ -88,32 +88,20 @@ object TryAkka extends App{ import akka.pattern.ask implicit val timeout = Timeout(10, TimeUnit.SECONDS) implicit def execContext = system.dispatcher - //for(i <- 1 to 8) { -/* val i = 1 - TraceContext.start - val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}") - val f = ping ? Pong() - - f.map({ - a => threadPrintln(s"In the map body, with the context: ${TraceContext.current}") - }) - .flatMap({ - (a: Any) => { - threadPrintln(s"Executing the flatMap, with the context: ${TraceContext.current}") - Future { s"In the flatMap body, with the context: ${TraceContext.current}" } - } - }) - .onComplete({ - case Success(p) => threadPrintln(s"On my main success, with String [$p] and the context: ${TraceContext.current}") - case Failure(t) => threadPrintln(s"Something went wrong in the main, with the context: ${TraceContext.current}") - })*/ - //} - - TraceContext.start + + + + Kamon.start + + Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) threadPrintln("Before doing it") val f = Future { threadPrintln("This is happening inside the future body") } + Kamon.stop + + Thread.sleep(3000) + system.shutdown() /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ diff --git a/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index ce19a7e6..ef908625 100644 --- a/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -1,7 +1,7 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ -import kamon.TraceContext +import kamon.{Kamon, TraceContext} import org.aspectj.lang.ProceedingJoinPoint import scala.Some @@ -12,7 +12,7 @@ trait TraceContextAwareRunnable extends Runnable {} @Aspect("perthis(instrumentedRunnableCreation())") -class PromiseCompletingRunnableInstrumentation { +class RunnableInstrumentation { /** * These are the Runnables that need to be instrumented and make the TraceContext available @@ -37,25 +37,19 @@ class PromiseCompletingRunnableInstrumentation { * Aspect members */ - private val traceContext = TraceContext.current + private val traceContext = Kamon.context /** * Advices */ + import kamon.TraceContextSwap.withContext @Around("runnableExecution()") def around(pjp: ProceedingJoinPoint) = { import pjp._ - traceContext match { - case Some(ctx) => { - TraceContext.set(ctx) - proceed() - TraceContext.clear - } - case None => proceed() - } + withContext(traceContext, proceed()) } } |