From 83e51763db4da386fb22b670aab9b0c2beda20d2 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 15 May 2013 18:47:22 -0300 Subject: wip --- src/main/resources/META-INF/aop.xml | 12 ++-- src/main/scala/akka/ActorAspect.scala | 26 +++---- src/main/scala/akka/Tracer.scala | 2 + .../instrumentation/ActorInstrumentation.scala | 23 ++++++ src/main/scala/kamon/TraceContext.scala | 42 +++++++++++ src/main/scala/kamon/actor/AskSupport.scala | 16 +++++ src/main/scala/kamon/actor/TraceableActor.scala | 44 ++++++++++++ src/main/scala/kamon/executor/eventbus.scala | 63 +++++++++-------- src/main/scala/spraytest/ClientTest.scala | 56 +++++++++++++++ src/main/scala/spraytest/FutureTesting.scala | 81 ++++++++++++++++++++++ 10 files changed, 317 insertions(+), 48 deletions(-) create mode 100644 src/main/scala/akka/instrumentation/ActorInstrumentation.scala create mode 100644 src/main/scala/kamon/TraceContext.scala create mode 100644 src/main/scala/kamon/actor/AskSupport.scala create mode 100644 src/main/scala/kamon/actor/TraceableActor.scala create mode 100644 src/main/scala/spraytest/ClientTest.scala create mode 100644 src/main/scala/spraytest/FutureTesting.scala (limited to 'src') diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index b5e78683..20df0b49 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -1,18 +1,18 @@ - + - - - - + + + diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala index 744b0aea..9d64f205 100644 --- a/src/main/scala/akka/ActorAspect.scala +++ b/src/main/scala/akka/ActorAspect.scala @@ -3,24 +3,24 @@ package akka import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint import kamon.metric.Metrics -import akka.actor.ActorCell @Aspect class ActorAspect extends Metrics { - println("Created ActorAspect") + println("Created ActorAspect") - @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") - protected def actorReceive:Unit = {} + @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") + protected def actorReceive:Unit = {} - @Around("actorReceive() && this(actor)") - def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = { + @Around("actorReceive() && this(actor)") + def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = { - //println("The path is: "+actor.self.path.) - val actorName:String = actor.self.path.toString + //println("The path is: "+actor.self.path.) + val actorName:String = actor.self.path.toString - markAndCountMeter(actorName){ - pjp.proceed - } - } -} \ No newline at end of file + markAndCountMeter(actorName){ + pjp.proceed + } + + } + } \ No newline at end of file diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala index bb290960..c58983e0 100644 --- a/src/main/scala/akka/Tracer.scala +++ b/src/main/scala/akka/Tracer.scala @@ -3,6 +3,7 @@ package akka import actor.{Props, ActorSystemImpl} import scala.concurrent.forkjoin.ForkJoinPool import scala.concurrent.duration._ +import com.newrelic.api.agent.NewRelic import akka.dispatch.Mailbox import scala._ import com.newrelic.api.agent.NewRelic @@ -28,6 +29,7 @@ object Tracer { val mbm = MailboxMetrics(mailboxes) mbm.mailboxes.map { case(actorName,mb) => { println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}") + NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages) NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput) diff --git a/src/main/scala/akka/instrumentation/ActorInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala new file mode 100644 index 00000000..ea599891 --- /dev/null +++ b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala @@ -0,0 +1,23 @@ +package akka.instrumentation + +import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} +import org.aspectj.lang.ProceedingJoinPoint +import kamon.metric.Metrics +import akka.actor.ActorCell + +@Aspect +class ActorInstrumentation { + println("Created ActorAspect") + + @Pointcut("execution(* kamon.executor.PingActor.receive(..))") + protected def actorReceive:Unit = {} + + @Before("actorReceive() && args(message)") + def around(message: Any) = { + println("Around the actor cell receive") + //pjp.proceed(Array(Wrapper(message))) + //pjp.proceed + } +} + +case class Wrapper(content: Any) \ No newline at end of file diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala new file mode 100644 index 00000000..b137168c --- /dev/null +++ b/src/main/scala/kamon/TraceContext.scala @@ -0,0 +1,42 @@ +package kamon + +import java.util.UUID +import akka.actor.ActorPath + + +case class TraceContext(id: UUID, entries: List[TraceEntry]) { + def fork = this.copy(entries = Nil) + def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries) +} + +object TraceContext { + val current = new ThreadLocal[TraceContext] +} + +trait TraceEntry +case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long) + +case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry + + + + +trait TraceSupport { + import TraceContext.current + + + def trace[T](blockName: String)(f: => T): T = { + val before = System.currentTimeMillis + + val result = f + + val after = System.currentTimeMillis + swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after))) + + result + } + + def swapContext(newContext: TraceContext) { + current.set(newContext) + } +} diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala new file mode 100644 index 00000000..8a1ac2e8 --- /dev/null +++ b/src/main/scala/kamon/actor/AskSupport.scala @@ -0,0 +1,16 @@ +package kamon.actor + +import akka.actor.ActorRef +import akka.util.Timeout +import kamon.TraceContext + +trait TraceableAskSupport { + implicit def pimpWithTraceableAsk(actorRef: ActorRef) = new TraceableAskableActorRef(actorRef) +} + +// FIXME: This name sucks +class TraceableAskableActorRef(val actorRef: ActorRef) { + + def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get().fork, message)) + +} diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala new file mode 100644 index 00000000..a38b10c9 --- /dev/null +++ b/src/main/scala/kamon/actor/TraceableActor.scala @@ -0,0 +1,44 @@ +package kamon.actor + +import akka.actor.{ActorRef, Actor} +import kamon.TraceContext + +trait TraceableActor extends Actor with TracingImplicitConversions { + + final def receive = { + case a: Any => { + a match { + case TraceableMessage(ctx, message) => { + TraceContext.current.set(ctx) + + tracedReceive(message) + + TraceContext.current.remove() + + /** Publish the partial context information to the EventStream */ + context.system.eventStream.publish(ctx) + } + case message: Any => tracedReceive(message) + } + } + } + + def tracedReceive: Receive + +} + +class TraceableActorRef(val target: ActorRef) { + def !! (message: Any)(implicit sender: ActorRef) = { + val traceableMessage = TraceableMessage(TraceContext.current.get().fork, message) + target.tell(traceableMessage, sender) + } +} + + + +trait TracingImplicitConversions { + implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef) +} + +case class TraceableMessage(traceContext: TraceContext, message: Any) + diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index 09f28b69..faa076d9 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -8,8 +8,13 @@ import kamon.metric.NewRelicReporter import com.yammer.metrics.core.{MetricName, MetricsRegistry} import com.yammer.metrics.reporting.ConsoleReporter -import kamon.actor.{DeveloperComment, TransactionContext, ContextAwareMessage, EnhancedActor} +import kamon.actor._ import scala.concurrent.Future +import kamon.{TraceSupport, TraceContext} +import akka.util.Timeout + +//import kamon.executor.MessageEvent +import java.util.UUID trait Message @@ -33,6 +38,33 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{ subscriber ! event } } +case class Ping() +case class Pong() + +class PingActor(val target: ActorRef) extends Actor { + implicit def executionContext = context.dispatcher + implicit val timeout = Timeout(30, TimeUnit.SECONDS) + + def receive = { + case Pong() => { + println("pong") + Thread.sleep(1000) + target ! Ping() + } + case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000) + } +} + +class PongActor extends Actor { + def receive = { + case Ping() => { + println("ping") + sender ! Pong() + } + case a: Any => println(s"Got ${a} in PONG") + } +} + object TryAkka extends App{ val system = ActorSystem("MySystem") @@ -47,34 +79,7 @@ object TryAkka extends App{ - case class Ping() - case class Pong() - class PingActor(val target: ActorRef) extends EnhancedActor { - import akka.pattern.pipe - implicit def executionContext = context.dispatcher - - def wrappedReceive = { - case Pong() => { - transactionContext = transactionContext.append(DeveloperComment("In PONG")) - - - Future { - Thread.sleep(1000) // Doing something really expensive - ContextAwareMessage(transactionContext, Ping()) - } pipeTo target - - } - } - } - - class PongActor extends EnhancedActor { - def wrappedReceive = { - case Ping() => { - superTell(sender, Pong()) - } - } - } /* @@ -85,7 +90,7 @@ object TryAkka extends App{ /*for(i <- 1 to 8) {*/ val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong") - ping ! ContextAwareMessage(TransactionContext(1707, Nil), Pong()) + ping ! Pong() //} diff --git a/src/main/scala/spraytest/ClientTest.scala b/src/main/scala/spraytest/ClientTest.scala new file mode 100644 index 00000000..c3a6ba39 --- /dev/null +++ b/src/main/scala/spraytest/ClientTest.scala @@ -0,0 +1,56 @@ +package spraytest + +import akka.actor.ActorSystem +import spray.client.pipelining._ +import spray.httpx.SprayJsonSupport +import spray.json._ +import scala.concurrent.Future + +/** + * BEGIN JSON Infrastructure + */ +case class Container(data: List[PointOfInterest]) +case class Geolocation(latitude: Float, longitude: Float) +case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation) + +object GeoJsonProtocol extends DefaultJsonProtocol { + implicit val geolocationFormat = jsonFormat2(Geolocation) + implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest) + implicit val containerFormat = jsonFormat1(Container) +} +/** END-OF JSON Infrastructure */ + + + + + + +class ClientTest extends App { + implicit val actorSystem = ActorSystem("spray-client-test") + import actorSystem.dispatcher + + + import GeoJsonProtocol._ + import SprayJsonSupport._ + + + + + val pipeline = sendReceive ~> unmarshal[Container] + + val response = pipeline { + Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco") + + Post("http://www.") + + } onSuccess { + case a => { + println(a) + } + } +} + + + + + diff --git a/src/main/scala/spraytest/FutureTesting.scala b/src/main/scala/spraytest/FutureTesting.scala new file mode 100644 index 00000000..f592f6d7 --- /dev/null +++ b/src/main/scala/spraytest/FutureTesting.scala @@ -0,0 +1,81 @@ +package spraytest +/* +import akka.actor.ActorSystem +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Try, Success} +import kamon.actor.TransactionContext + +object FutureTesting extends App { + + val actorSystem = ActorSystem("future-testing") + implicit val ec = actorSystem.dispatcher + implicit val tctx = TransactionContext(11, Nil) + + threadPrintln("In the initial Thread") + + + val f = TraceableFuture { + threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}") + } + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}") + }) + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}") + }) + + + + + + + + + def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]") + +} + + + + +trait TransactionContextWrapper { + def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = { + TransactionContext.current.set(tranContext.fork) + println(s"SetContext to: ${tranContext}") + val result = f + + TransactionContext.current.remove() + result + } + +} + +class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper { + def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = { + future.onComplete(wrap(func, transactionContext)) + } +} + +object TraceableFuture { + + implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future + + def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = { + val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.id, Nil)) + + new TraceableFuture(Future { wrappedBody }) + } + + + + + def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = { + TransactionContext.current.set(transactionContext) + val result = body + TransactionContext.current.remove() + result + } +}*/ + -- cgit v1.2.3