aboutsummaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/resources/META-INF/aop.xml6
-rw-r--r--src/main/scala/akka/ActorAspect.scala26
-rw-r--r--src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala54
-rw-r--r--src/main/scala/kamon/TraceContext.scala56
-rw-r--r--src/main/scala/kamon/actor/AskSupport.scala16
-rw-r--r--src/main/scala/kamon/actor/EnhancedActor.scala45
-rw-r--r--src/main/scala/kamon/actor/TraceableActor.scala44
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala78
-rw-r--r--src/main/scala/spraytest/ClientTest.scala56
-rw-r--r--src/main/scala/spraytest/FutureTesting.scala81
10 files changed, 368 insertions, 94 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index b5e78683..aa4f7f4a 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -1,5 +1,4 @@
-<!DOCTYPE aspectj PUBLIC
- "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
<aspectj>
@@ -9,7 +8,8 @@
<aspect name="akka.ActorSystemAspect"/>
<!--<aspect name="akka.MailboxAspect"/>-->
<aspect name="akka.PoolMonitorAspect"/>
- <!--<aspect name="akka.ActorAspect"/>-->
+ <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/>
+ <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/>
<include within="*"/>
<exclude within="javax.*"/>
diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala
index 744b0aea..05a7bc0a 100644
--- a/src/main/scala/akka/ActorAspect.scala
+++ b/src/main/scala/akka/ActorAspect.scala
@@ -3,24 +3,24 @@ package akka
import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
import org.aspectj.lang.ProceedingJoinPoint
import kamon.metric.Metrics
-import akka.actor.ActorCell
@Aspect
class ActorAspect extends Metrics {
- println("Created ActorAspect")
+ println("Created ActorAspect")
- @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
- protected def actorReceive:Unit = {}
+ @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
+ protected def actorReceive:Unit = {}
- @Around("actorReceive() && this(actor)")
- def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = {
+ @Around("sendingMessageToActorRef() && this(actor)")
+ def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = {
- //println("The path is: "+actor.self.path.)
- val actorName:String = actor.self.path.toString
+ //println("The path is: "+actor.self.path.)
+ val actorName:String = actor.self.path.toString
- markAndCountMeter(actorName){
- pjp.proceed
- }
- }
-} \ No newline at end of file
+ markAndCountMeter(actorName){
+ pjp.proceed
+ }
+
+ }
+ } \ No newline at end of file
diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
new file mode 100644
index 00000000..783a6c45
--- /dev/null
+++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
@@ -0,0 +1,54 @@
+package akka.instrumentation
+
+import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{ActorRef, ActorCell}
+import kamon.TraceContext
+import kamon.actor.TraceableMessage
+import akka.dispatch.Envelope
+
+@Aspect
+class ActorRefTellInstrumentation {
+ println("Created ActorAspect")
+
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && args(message, sender)")
+ def sendingMessageToActorRef(message: Any, sender: ActorRef) = {}
+
+ @Around("sendingMessageToActorRef(message, sender)")
+ def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = {
+ import pjp._
+
+ TraceContext.current match {
+ case Some(ctx) => {
+ val traceableMessage = TraceableMessage(ctx.fork, message)
+ proceed(getArgs.updated(0, traceableMessage))
+ }
+ case None => proceed
+ }
+ }
+}
+
+@Aspect
+class ActorCellInvokeInstrumentation {
+
+ @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def around(pjp: ProceedingJoinPoint, envelope: Envelope) = {
+ import pjp._
+
+ envelope match {
+ case Envelope(TraceableMessage(ctx, msg), sender) => {
+ TraceContext.set(ctx)
+
+ val originalEnvelope = envelope.copy(message = msg)
+ proceed(getArgs.updated(0, originalEnvelope))
+
+ TraceContext.clear
+ }
+ case _ => proceed
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala
new file mode 100644
index 00000000..1fbedf86
--- /dev/null
+++ b/src/main/scala/kamon/TraceContext.scala
@@ -0,0 +1,56 @@
+package kamon
+
+import java.util.UUID
+import akka.actor.ActorPath
+
+
+case class TraceContext(id: UUID, entries: List[TraceEntry]) {
+ def fork = this.copy(entries = Nil)
+ def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries)
+}
+
+object TraceContext {
+ private val context = new ThreadLocal[TraceContext]
+
+ def current = {
+ val ctx = context.get()
+ if(ctx ne null)
+ Some(ctx)
+ else
+ None
+ }
+
+ def clear = context.remove()
+
+ def set(ctx: TraceContext) = context.set(ctx)
+
+ def start = set(TraceContext(UUID.randomUUID(), Nil))
+}
+
+trait TraceEntry
+case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long)
+
+case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry
+
+
+
+
+trait TraceSupport {
+ import TraceContext.current
+
+
+ def trace[T](blockName: String)(f: => T): T = {
+ val before = System.currentTimeMillis
+
+ val result = f
+
+ val after = System.currentTimeMillis
+ //swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after)))
+
+ result
+ }
+
+ def swapContext(newContext: TraceContext) {
+ //current.set(newContext)
+ }
+}
diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala
new file mode 100644
index 00000000..0a8d27be
--- /dev/null
+++ b/src/main/scala/kamon/actor/AskSupport.scala
@@ -0,0 +1,16 @@
+package kamon.actor
+
+import akka.actor.ActorRef
+import akka.util.Timeout
+import kamon.TraceContext
+
+trait TraceableAskSupport {
+ implicit def pimpWithTraceableAsk(actorRef: ActorRef) = new TraceableAskableActorRef(actorRef)
+}
+
+// FIXME: This name sucks
+class TraceableAskableActorRef(val actorRef: ActorRef) {
+
+ def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get.fork, message))
+
+}
diff --git a/src/main/scala/kamon/actor/EnhancedActor.scala b/src/main/scala/kamon/actor/EnhancedActor.scala
deleted file mode 100644
index ad879505..00000000
--- a/src/main/scala/kamon/actor/EnhancedActor.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-package kamon.actor
-
-import akka.actor.{ActorRef, Actor}
-
-trait EnhancedActor extends Actor {
- protected[this] var transactionContext: TransactionContext = _
-
- final def receive = {
- case a: Any => {
- a match {
- case ContextAwareMessage(ctx, message) => {
- transactionContext = ctx
- println(s"Actor ${self.path.toString}. Current context: ${transactionContext}")
- wrappedReceive(message)
- }
- case message: Any => wrappedReceive(message)
- }
- }
- }
-
-
-
-
- def wrappedReceive: Receive
-
-
- def superTell(target: ActorRef, message: Any) = {
- target.tell(ContextAwareMessage(transactionContext, message), self)
- }
-
-}
-
-
-case class ContextAwareMessage(context: TransactionContext, message: Any)
-
-
-case class TransactionContext(id: Long, entries: List[ContextEntry]) {
- def append(entry: ContextEntry) = this.copy(entries = entry :: this.entries)
-}
-sealed trait ContextEntry
-
-case class DeveloperComment(comment: String) extends ContextEntry
-
-case class MessageExecutionTime(actorPath: String, begin: Long, end: Long) extends ContextEntry
-
diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala
new file mode 100644
index 00000000..3acbd293
--- /dev/null
+++ b/src/main/scala/kamon/actor/TraceableActor.scala
@@ -0,0 +1,44 @@
+package kamon.actor
+
+import akka.actor.{ActorRef, Actor}
+import kamon.TraceContext
+
+trait TraceableActor extends Actor with TracingImplicitConversions {
+
+ final def receive = {
+ case a: Any => {
+ a match {
+ case TraceableMessage(ctx, message) => {
+ //TraceContext.current.set(ctx)
+
+ tracedReceive(message)
+
+ //TraceContext.current.remove()
+
+ /** Publish the partial context information to the EventStream */
+ context.system.eventStream.publish(ctx)
+ }
+ case message: Any => tracedReceive(message)
+ }
+ }
+ }
+
+ def tracedReceive: Receive
+
+}
+
+class TraceableActorRef(val target: ActorRef) {
+ def !! (message: Any)(implicit sender: ActorRef) = {
+ val traceableMessage = TraceableMessage(TraceContext.current.get.fork, message)
+ target.tell(traceableMessage, sender)
+ }
+}
+
+
+
+trait TracingImplicitConversions {
+ implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef)
+}
+
+case class TraceableMessage(traceContext: TraceContext, message: Any)
+
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index 09f28b69..84420373 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -2,14 +2,22 @@ package kamon.executor
import akka.event.ActorEventBus
import akka.event.LookupClassification
-import akka.actor.{ActorRef, ActorSystem, Props, Actor}
+import akka.actor._
import java.util.concurrent.TimeUnit
import kamon.metric.NewRelicReporter
import com.yammer.metrics.core.{MetricName, MetricsRegistry}
import com.yammer.metrics.reporting.ConsoleReporter
-import kamon.actor.{DeveloperComment, TransactionContext, ContextAwareMessage, EnhancedActor}
+import kamon.actor._
import scala.concurrent.Future
+import kamon.{TraceSupport, TraceContext}
+import akka.util.Timeout
+import kamon.executor.Ping
+import kamon.executor.MessageEvent
+import kamon.executor.Pong
+
+//import kamon.executor.MessageEvent
+import java.util.UUID
trait Message
@@ -33,6 +41,36 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{
subscriber ! event
}
}
+case class Ping()
+case class Pong()
+
+class PingActor(val target: ActorRef) extends Actor with ActorLogging {
+ implicit def executionContext = context.dispatcher
+ implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+
+ def receive = {
+ case Pong() => {
+ log.info(s"pong with context ${TraceContext.current}")
+ Thread.sleep(1000)
+ target ! Ping()
+ }
+ case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000)
+ }
+
+ def withAny(): Any = {1}
+ def withAnyRef(): AnyRef = {new Object}
+}
+
+class PongActor extends Actor with ActorLogging {
+ def receive = {
+ case Ping() => {
+ log.info(s"ping with context ${TraceContext.current}")
+ sender ! Pong()
+ }
+ case a: Any => println(s"Got ${a} in PONG")
+ }
+}
+
object TryAkka extends App{
val system = ActorSystem("MySystem")
@@ -47,34 +85,7 @@ object TryAkka extends App{
- case class Ping()
- case class Pong()
- class PingActor(val target: ActorRef) extends EnhancedActor {
- import akka.pattern.pipe
- implicit def executionContext = context.dispatcher
-
- def wrappedReceive = {
- case Pong() => {
- transactionContext = transactionContext.append(DeveloperComment("In PONG"))
-
-
- Future {
- Thread.sleep(1000) // Doing something really expensive
- ContextAwareMessage(transactionContext, Ping())
- } pipeTo target
-
- }
- }
- }
-
- class PongActor extends EnhancedActor {
- def wrappedReceive = {
- case Ping() => {
- superTell(sender, Pong())
- }
- }
- }
/*
@@ -83,10 +94,11 @@ object TryAkka extends App{
*/
- /*for(i <- 1 to 8) {*/
- val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong")
- ping ! ContextAwareMessage(TransactionContext(1707, Nil), Pong())
- //}
+ for(i <- 1 to 8) {
+ TraceContext.start
+ val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}")
+ ping ! Pong()
+ }
/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
diff --git a/src/main/scala/spraytest/ClientTest.scala b/src/main/scala/spraytest/ClientTest.scala
new file mode 100644
index 00000000..c3a6ba39
--- /dev/null
+++ b/src/main/scala/spraytest/ClientTest.scala
@@ -0,0 +1,56 @@
+package spraytest
+
+import akka.actor.ActorSystem
+import spray.client.pipelining._
+import spray.httpx.SprayJsonSupport
+import spray.json._
+import scala.concurrent.Future
+
+/**
+ * BEGIN JSON Infrastructure
+ */
+case class Container(data: List[PointOfInterest])
+case class Geolocation(latitude: Float, longitude: Float)
+case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation)
+
+object GeoJsonProtocol extends DefaultJsonProtocol {
+ implicit val geolocationFormat = jsonFormat2(Geolocation)
+ implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest)
+ implicit val containerFormat = jsonFormat1(Container)
+}
+/** END-OF JSON Infrastructure */
+
+
+
+
+
+
+class ClientTest extends App {
+ implicit val actorSystem = ActorSystem("spray-client-test")
+ import actorSystem.dispatcher
+
+
+ import GeoJsonProtocol._
+ import SprayJsonSupport._
+
+
+
+
+ val pipeline = sendReceive ~> unmarshal[Container]
+
+ val response = pipeline {
+ Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco")
+
+ Post("http://www.")
+
+ } onSuccess {
+ case a => {
+ println(a)
+ }
+ }
+}
+
+
+
+
+
diff --git a/src/main/scala/spraytest/FutureTesting.scala b/src/main/scala/spraytest/FutureTesting.scala
new file mode 100644
index 00000000..f592f6d7
--- /dev/null
+++ b/src/main/scala/spraytest/FutureTesting.scala
@@ -0,0 +1,81 @@
+package spraytest
+/*
+import akka.actor.ActorSystem
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Try, Success}
+import kamon.actor.TransactionContext
+
+object FutureTesting extends App {
+
+ val actorSystem = ActorSystem("future-testing")
+ implicit val ec = actorSystem.dispatcher
+ implicit val tctx = TransactionContext(11, Nil)
+
+ threadPrintln("In the initial Thread")
+
+
+ val f = TraceableFuture {
+ threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}")
+ }
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+
+
+
+
+
+
+
+ def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]")
+
+}
+
+
+
+
+trait TransactionContextWrapper {
+ def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = {
+ TransactionContext.current.set(tranContext.fork)
+ println(s"SetContext to: ${tranContext}")
+ val result = f
+
+ TransactionContext.current.remove()
+ result
+ }
+
+}
+
+class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper {
+ def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = {
+ future.onComplete(wrap(func, transactionContext))
+ }
+}
+
+object TraceableFuture {
+
+ implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future
+
+ def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = {
+ val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.id, Nil))
+
+ new TraceableFuture(Future { wrappedBody })
+ }
+
+
+
+
+ def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = {
+ TransactionContext.current.set(transactionContext)
+ val result = body
+ TransactionContext.current.remove()
+ result
+ }
+}*/
+