diff options
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/resources/META-INF/aop.xml | 6 | ||||
-rw-r--r-- | src/main/scala/akka/ActorAspect.scala | 26 | ||||
-rw-r--r-- | src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala | 54 | ||||
-rw-r--r-- | src/main/scala/kamon/TraceContext.scala | 56 | ||||
-rw-r--r-- | src/main/scala/kamon/actor/AskSupport.scala | 16 | ||||
-rw-r--r-- | src/main/scala/kamon/actor/EnhancedActor.scala | 45 | ||||
-rw-r--r-- | src/main/scala/kamon/actor/TraceableActor.scala | 44 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 78 | ||||
-rw-r--r-- | src/main/scala/spraytest/ClientTest.scala | 56 | ||||
-rw-r--r-- | src/main/scala/spraytest/FutureTesting.scala | 81 |
10 files changed, 368 insertions, 94 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index b5e78683..aa4f7f4a 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -1,5 +1,4 @@ -<!DOCTYPE aspectj PUBLIC - "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> <aspectj> @@ -9,7 +8,8 @@ <aspect name="akka.ActorSystemAspect"/> <!--<aspect name="akka.MailboxAspect"/>--> <aspect name="akka.PoolMonitorAspect"/> - <!--<aspect name="akka.ActorAspect"/>--> + <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/> + <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/> <include within="*"/> <exclude within="javax.*"/> diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala index 744b0aea..05a7bc0a 100644 --- a/src/main/scala/akka/ActorAspect.scala +++ b/src/main/scala/akka/ActorAspect.scala @@ -3,24 +3,24 @@ package akka import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint import kamon.metric.Metrics -import akka.actor.ActorCell @Aspect class ActorAspect extends Metrics { - println("Created ActorAspect") + println("Created ActorAspect") - @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") - protected def actorReceive:Unit = {} + @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") + protected def actorReceive:Unit = {} - @Around("actorReceive() && this(actor)") - def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = { + @Around("sendingMessageToActorRef() && this(actor)") + def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = { - //println("The path is: "+actor.self.path.) - val actorName:String = actor.self.path.toString + //println("The path is: "+actor.self.path.) + val actorName:String = actor.self.path.toString - markAndCountMeter(actorName){ - pjp.proceed - } - } -}
\ No newline at end of file + markAndCountMeter(actorName){ + pjp.proceed + } + + } + }
\ No newline at end of file diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..783a6c45 --- /dev/null +++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,54 @@ +package akka.instrumentation + +import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{ActorRef, ActorCell} +import kamon.TraceContext +import kamon.actor.TraceableMessage +import akka.dispatch.Envelope + +@Aspect +class ActorRefTellInstrumentation { + println("Created ActorAspect") + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && args(message, sender)") + def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} + + @Around("sendingMessageToActorRef(message, sender)") + def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { + import pjp._ + + TraceContext.current match { + case Some(ctx) => { + val traceableMessage = TraceableMessage(ctx.fork, message) + proceed(getArgs.updated(0, traceableMessage)) + } + case None => proceed + } + } +} + +@Aspect +class ActorCellInvokeInstrumentation { + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope) = { + import pjp._ + + envelope match { + case Envelope(TraceableMessage(ctx, msg), sender) => { + TraceContext.set(ctx) + + val originalEnvelope = envelope.copy(message = msg) + proceed(getArgs.updated(0, originalEnvelope)) + + TraceContext.clear + } + case _ => proceed + } + } +}
\ No newline at end of file diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala new file mode 100644 index 00000000..1fbedf86 --- /dev/null +++ b/src/main/scala/kamon/TraceContext.scala @@ -0,0 +1,56 @@ +package kamon + +import java.util.UUID +import akka.actor.ActorPath + + +case class TraceContext(id: UUID, entries: List[TraceEntry]) { + def fork = this.copy(entries = Nil) + def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries) +} + +object TraceContext { + private val context = new ThreadLocal[TraceContext] + + def current = { + val ctx = context.get() + if(ctx ne null) + Some(ctx) + else + None + } + + def clear = context.remove() + + def set(ctx: TraceContext) = context.set(ctx) + + def start = set(TraceContext(UUID.randomUUID(), Nil)) +} + +trait TraceEntry +case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long) + +case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry + + + + +trait TraceSupport { + import TraceContext.current + + + def trace[T](blockName: String)(f: => T): T = { + val before = System.currentTimeMillis + + val result = f + + val after = System.currentTimeMillis + //swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after))) + + result + } + + def swapContext(newContext: TraceContext) { + //current.set(newContext) + } +} diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala new file mode 100644 index 00000000..0a8d27be --- /dev/null +++ b/src/main/scala/kamon/actor/AskSupport.scala @@ -0,0 +1,16 @@ +package kamon.actor + +import akka.actor.ActorRef +import akka.util.Timeout +import kamon.TraceContext + +trait TraceableAskSupport { + implicit def pimpWithTraceableAsk(actorRef: ActorRef) = new TraceableAskableActorRef(actorRef) +} + +// FIXME: This name sucks +class TraceableAskableActorRef(val actorRef: ActorRef) { + + def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get.fork, message)) + +} diff --git a/src/main/scala/kamon/actor/EnhancedActor.scala b/src/main/scala/kamon/actor/EnhancedActor.scala deleted file mode 100644 index ad879505..00000000 --- a/src/main/scala/kamon/actor/EnhancedActor.scala +++ /dev/null @@ -1,45 +0,0 @@ -package kamon.actor - -import akka.actor.{ActorRef, Actor} - -trait EnhancedActor extends Actor { - protected[this] var transactionContext: TransactionContext = _ - - final def receive = { - case a: Any => { - a match { - case ContextAwareMessage(ctx, message) => { - transactionContext = ctx - println(s"Actor ${self.path.toString}. Current context: ${transactionContext}") - wrappedReceive(message) - } - case message: Any => wrappedReceive(message) - } - } - } - - - - - def wrappedReceive: Receive - - - def superTell(target: ActorRef, message: Any) = { - target.tell(ContextAwareMessage(transactionContext, message), self) - } - -} - - -case class ContextAwareMessage(context: TransactionContext, message: Any) - - -case class TransactionContext(id: Long, entries: List[ContextEntry]) { - def append(entry: ContextEntry) = this.copy(entries = entry :: this.entries) -} -sealed trait ContextEntry - -case class DeveloperComment(comment: String) extends ContextEntry - -case class MessageExecutionTime(actorPath: String, begin: Long, end: Long) extends ContextEntry - diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala new file mode 100644 index 00000000..3acbd293 --- /dev/null +++ b/src/main/scala/kamon/actor/TraceableActor.scala @@ -0,0 +1,44 @@ +package kamon.actor + +import akka.actor.{ActorRef, Actor} +import kamon.TraceContext + +trait TraceableActor extends Actor with TracingImplicitConversions { + + final def receive = { + case a: Any => { + a match { + case TraceableMessage(ctx, message) => { + //TraceContext.current.set(ctx) + + tracedReceive(message) + + //TraceContext.current.remove() + + /** Publish the partial context information to the EventStream */ + context.system.eventStream.publish(ctx) + } + case message: Any => tracedReceive(message) + } + } + } + + def tracedReceive: Receive + +} + +class TraceableActorRef(val target: ActorRef) { + def !! (message: Any)(implicit sender: ActorRef) = { + val traceableMessage = TraceableMessage(TraceContext.current.get.fork, message) + target.tell(traceableMessage, sender) + } +} + + + +trait TracingImplicitConversions { + implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef) +} + +case class TraceableMessage(traceContext: TraceContext, message: Any) + diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index 09f28b69..84420373 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -2,14 +2,22 @@ package kamon.executor import akka.event.ActorEventBus import akka.event.LookupClassification -import akka.actor.{ActorRef, ActorSystem, Props, Actor} +import akka.actor._ import java.util.concurrent.TimeUnit import kamon.metric.NewRelicReporter import com.yammer.metrics.core.{MetricName, MetricsRegistry} import com.yammer.metrics.reporting.ConsoleReporter -import kamon.actor.{DeveloperComment, TransactionContext, ContextAwareMessage, EnhancedActor} +import kamon.actor._ import scala.concurrent.Future +import kamon.{TraceSupport, TraceContext} +import akka.util.Timeout +import kamon.executor.Ping +import kamon.executor.MessageEvent +import kamon.executor.Pong + +//import kamon.executor.MessageEvent +import java.util.UUID trait Message @@ -33,6 +41,36 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{ subscriber ! event } } +case class Ping() +case class Pong() + +class PingActor(val target: ActorRef) extends Actor with ActorLogging { + implicit def executionContext = context.dispatcher + implicit val timeout = Timeout(30, TimeUnit.SECONDS) + + def receive = { + case Pong() => { + log.info(s"pong with context ${TraceContext.current}") + Thread.sleep(1000) + target ! Ping() + } + case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000) + } + + def withAny(): Any = {1} + def withAnyRef(): AnyRef = {new Object} +} + +class PongActor extends Actor with ActorLogging { + def receive = { + case Ping() => { + log.info(s"ping with context ${TraceContext.current}") + sender ! Pong() + } + case a: Any => println(s"Got ${a} in PONG") + } +} + object TryAkka extends App{ val system = ActorSystem("MySystem") @@ -47,34 +85,7 @@ object TryAkka extends App{ - case class Ping() - case class Pong() - class PingActor(val target: ActorRef) extends EnhancedActor { - import akka.pattern.pipe - implicit def executionContext = context.dispatcher - - def wrappedReceive = { - case Pong() => { - transactionContext = transactionContext.append(DeveloperComment("In PONG")) - - - Future { - Thread.sleep(1000) // Doing something really expensive - ContextAwareMessage(transactionContext, Ping()) - } pipeTo target - - } - } - } - - class PongActor extends EnhancedActor { - def wrappedReceive = { - case Ping() => { - superTell(sender, Pong()) - } - } - } /* @@ -83,10 +94,11 @@ object TryAkka extends App{ */ - /*for(i <- 1 to 8) {*/ - val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong") - ping ! ContextAwareMessage(TransactionContext(1707, Nil), Pong()) - //} + for(i <- 1 to 8) { + TraceContext.start + val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}") + ping ! Pong() + } /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) diff --git a/src/main/scala/spraytest/ClientTest.scala b/src/main/scala/spraytest/ClientTest.scala new file mode 100644 index 00000000..c3a6ba39 --- /dev/null +++ b/src/main/scala/spraytest/ClientTest.scala @@ -0,0 +1,56 @@ +package spraytest + +import akka.actor.ActorSystem +import spray.client.pipelining._ +import spray.httpx.SprayJsonSupport +import spray.json._ +import scala.concurrent.Future + +/** + * BEGIN JSON Infrastructure + */ +case class Container(data: List[PointOfInterest]) +case class Geolocation(latitude: Float, longitude: Float) +case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation) + +object GeoJsonProtocol extends DefaultJsonProtocol { + implicit val geolocationFormat = jsonFormat2(Geolocation) + implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest) + implicit val containerFormat = jsonFormat1(Container) +} +/** END-OF JSON Infrastructure */ + + + + + + +class ClientTest extends App { + implicit val actorSystem = ActorSystem("spray-client-test") + import actorSystem.dispatcher + + + import GeoJsonProtocol._ + import SprayJsonSupport._ + + + + + val pipeline = sendReceive ~> unmarshal[Container] + + val response = pipeline { + Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco") + + Post("http://www.") + + } onSuccess { + case a => { + println(a) + } + } +} + + + + + diff --git a/src/main/scala/spraytest/FutureTesting.scala b/src/main/scala/spraytest/FutureTesting.scala new file mode 100644 index 00000000..f592f6d7 --- /dev/null +++ b/src/main/scala/spraytest/FutureTesting.scala @@ -0,0 +1,81 @@ +package spraytest +/* +import akka.actor.ActorSystem +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Try, Success} +import kamon.actor.TransactionContext + +object FutureTesting extends App { + + val actorSystem = ActorSystem("future-testing") + implicit val ec = actorSystem.dispatcher + implicit val tctx = TransactionContext(11, Nil) + + threadPrintln("In the initial Thread") + + + val f = TraceableFuture { + threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}") + } + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}") + }) + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}") + }) + + + + + + + + + def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]") + +} + + + + +trait TransactionContextWrapper { + def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = { + TransactionContext.current.set(tranContext.fork) + println(s"SetContext to: ${tranContext}") + val result = f + + TransactionContext.current.remove() + result + } + +} + +class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper { + def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = { + future.onComplete(wrap(func, transactionContext)) + } +} + +object TraceableFuture { + + implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future + + def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = { + val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.id, Nil)) + + new TraceableFuture(Future { wrappedBody }) + } + + + + + def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = { + TransactionContext.current.set(transactionContext) + val result = body + TransactionContext.current.remove() + result + } +}*/ + |