aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-05-15 18:47:22 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-05-15 18:47:22 -0300
commit83e51763db4da386fb22b670aab9b0c2beda20d2 (patch)
treeb6bd0f1a63fd02e75208e6a6cec21dc1724b9b55 /src
parentee1b12e22a502308a26208f87132f08d356d1e1e (diff)
downloadKamon-83e51763db4da386fb22b670aab9b0c2beda20d2.tar.gz
Kamon-83e51763db4da386fb22b670aab9b0c2beda20d2.tar.bz2
Kamon-83e51763db4da386fb22b670aab9b0c2beda20d2.zip
wip
Diffstat (limited to 'src')
-rw-r--r--src/main/resources/META-INF/aop.xml12
-rw-r--r--src/main/scala/akka/ActorAspect.scala26
-rw-r--r--src/main/scala/akka/Tracer.scala2
-rw-r--r--src/main/scala/akka/instrumentation/ActorInstrumentation.scala23
-rw-r--r--src/main/scala/kamon/TraceContext.scala42
-rw-r--r--src/main/scala/kamon/actor/AskSupport.scala16
-rw-r--r--src/main/scala/kamon/actor/TraceableActor.scala44
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala63
-rw-r--r--src/main/scala/spraytest/ClientTest.scala56
-rw-r--r--src/main/scala/spraytest/FutureTesting.scala81
10 files changed, 317 insertions, 48 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index b5e78683..20df0b49 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -1,18 +1,18 @@
-<!DOCTYPE aspectj PUBLIC
- "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
<aspectj>
<weaver options="-verbose -showWeaveInfo"/>
<aspects>
- <aspect name="akka.ActorSystemAspect"/>
- <!--<aspect name="akka.MailboxAspect"/>-->
- <aspect name="akka.PoolMonitorAspect"/>
- <!--<aspect name="akka.ActorAspect"/>-->
+ <!--<aspect name="akka.ActorSystemAspect"/>
+ &lt;!&ndash;<aspect name="akka.MailboxAspect"/>&ndash;&gt;
+ <aspect name="akka.PoolMonitorAspect"/>-->
+ <aspect name="akka.instrumentation.ActorInstrumentation"/>
<include within="*"/>
<exclude within="javax.*"/>
+
<exclude within="org.aspectj.*"/>
<exclude within="scala.*"/>
<exclude within="scalaz.*"/>
diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala
index 744b0aea..9d64f205 100644
--- a/src/main/scala/akka/ActorAspect.scala
+++ b/src/main/scala/akka/ActorAspect.scala
@@ -3,24 +3,24 @@ package akka
import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
import org.aspectj.lang.ProceedingJoinPoint
import kamon.metric.Metrics
-import akka.actor.ActorCell
@Aspect
class ActorAspect extends Metrics {
- println("Created ActorAspect")
+ println("Created ActorAspect")
- @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
- protected def actorReceive:Unit = {}
+ @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
+ protected def actorReceive:Unit = {}
- @Around("actorReceive() && this(actor)")
- def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = {
+ @Around("actorReceive() && this(actor)")
+ def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = {
- //println("The path is: "+actor.self.path.)
- val actorName:String = actor.self.path.toString
+ //println("The path is: "+actor.self.path.)
+ val actorName:String = actor.self.path.toString
- markAndCountMeter(actorName){
- pjp.proceed
- }
- }
-} \ No newline at end of file
+ markAndCountMeter(actorName){
+ pjp.proceed
+ }
+
+ }
+ } \ No newline at end of file
diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala
index bb290960..c58983e0 100644
--- a/src/main/scala/akka/Tracer.scala
+++ b/src/main/scala/akka/Tracer.scala
@@ -3,6 +3,7 @@ package akka
import actor.{Props, ActorSystemImpl}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.duration._
+import com.newrelic.api.agent.NewRelic
import akka.dispatch.Mailbox
import scala._
import com.newrelic.api.agent.NewRelic
@@ -28,6 +29,7 @@ object Tracer {
val mbm = MailboxMetrics(mailboxes)
mbm.mailboxes.map { case(actorName,mb) => {
println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}")
+
NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages)
NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput)
diff --git a/src/main/scala/akka/instrumentation/ActorInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala
new file mode 100644
index 00000000..ea599891
--- /dev/null
+++ b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala
@@ -0,0 +1,23 @@
+package akka.instrumentation
+
+import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect}
+import org.aspectj.lang.ProceedingJoinPoint
+import kamon.metric.Metrics
+import akka.actor.ActorCell
+
+@Aspect
+class ActorInstrumentation {
+ println("Created ActorAspect")
+
+ @Pointcut("execution(* kamon.executor.PingActor.receive(..))")
+ protected def actorReceive:Unit = {}
+
+ @Before("actorReceive() && args(message)")
+ def around(message: Any) = {
+ println("Around the actor cell receive")
+ //pjp.proceed(Array(Wrapper(message)))
+ //pjp.proceed
+ }
+}
+
+case class Wrapper(content: Any) \ No newline at end of file
diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala
new file mode 100644
index 00000000..b137168c
--- /dev/null
+++ b/src/main/scala/kamon/TraceContext.scala
@@ -0,0 +1,42 @@
+package kamon
+
+import java.util.UUID
+import akka.actor.ActorPath
+
+
+case class TraceContext(id: UUID, entries: List[TraceEntry]) {
+ def fork = this.copy(entries = Nil)
+ def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries)
+}
+
+object TraceContext {
+ val current = new ThreadLocal[TraceContext]
+}
+
+trait TraceEntry
+case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long)
+
+case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry
+
+
+
+
+trait TraceSupport {
+ import TraceContext.current
+
+
+ def trace[T](blockName: String)(f: => T): T = {
+ val before = System.currentTimeMillis
+
+ val result = f
+
+ val after = System.currentTimeMillis
+ swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after)))
+
+ result
+ }
+
+ def swapContext(newContext: TraceContext) {
+ current.set(newContext)
+ }
+}
diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala
new file mode 100644
index 00000000..8a1ac2e8
--- /dev/null
+++ b/src/main/scala/kamon/actor/AskSupport.scala
@@ -0,0 +1,16 @@
+package kamon.actor
+
+import akka.actor.ActorRef
+import akka.util.Timeout
+import kamon.TraceContext
+
+trait TraceableAskSupport {
+ implicit def pimpWithTraceableAsk(actorRef: ActorRef) = new TraceableAskableActorRef(actorRef)
+}
+
+// FIXME: This name sucks
+class TraceableAskableActorRef(val actorRef: ActorRef) {
+
+ def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get().fork, message))
+
+}
diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala
new file mode 100644
index 00000000..a38b10c9
--- /dev/null
+++ b/src/main/scala/kamon/actor/TraceableActor.scala
@@ -0,0 +1,44 @@
+package kamon.actor
+
+import akka.actor.{ActorRef, Actor}
+import kamon.TraceContext
+
+trait TraceableActor extends Actor with TracingImplicitConversions {
+
+ final def receive = {
+ case a: Any => {
+ a match {
+ case TraceableMessage(ctx, message) => {
+ TraceContext.current.set(ctx)
+
+ tracedReceive(message)
+
+ TraceContext.current.remove()
+
+ /** Publish the partial context information to the EventStream */
+ context.system.eventStream.publish(ctx)
+ }
+ case message: Any => tracedReceive(message)
+ }
+ }
+ }
+
+ def tracedReceive: Receive
+
+}
+
+class TraceableActorRef(val target: ActorRef) {
+ def !! (message: Any)(implicit sender: ActorRef) = {
+ val traceableMessage = TraceableMessage(TraceContext.current.get().fork, message)
+ target.tell(traceableMessage, sender)
+ }
+}
+
+
+
+trait TracingImplicitConversions {
+ implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef)
+}
+
+case class TraceableMessage(traceContext: TraceContext, message: Any)
+
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index 09f28b69..faa076d9 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -8,8 +8,13 @@ import kamon.metric.NewRelicReporter
import com.yammer.metrics.core.{MetricName, MetricsRegistry}
import com.yammer.metrics.reporting.ConsoleReporter
-import kamon.actor.{DeveloperComment, TransactionContext, ContextAwareMessage, EnhancedActor}
+import kamon.actor._
import scala.concurrent.Future
+import kamon.{TraceSupport, TraceContext}
+import akka.util.Timeout
+
+//import kamon.executor.MessageEvent
+import java.util.UUID
trait Message
@@ -33,6 +38,33 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{
subscriber ! event
}
}
+case class Ping()
+case class Pong()
+
+class PingActor(val target: ActorRef) extends Actor {
+ implicit def executionContext = context.dispatcher
+ implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+
+ def receive = {
+ case Pong() => {
+ println("pong")
+ Thread.sleep(1000)
+ target ! Ping()
+ }
+ case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000)
+ }
+}
+
+class PongActor extends Actor {
+ def receive = {
+ case Ping() => {
+ println("ping")
+ sender ! Pong()
+ }
+ case a: Any => println(s"Got ${a} in PONG")
+ }
+}
+
object TryAkka extends App{
val system = ActorSystem("MySystem")
@@ -47,34 +79,7 @@ object TryAkka extends App{
- case class Ping()
- case class Pong()
- class PingActor(val target: ActorRef) extends EnhancedActor {
- import akka.pattern.pipe
- implicit def executionContext = context.dispatcher
-
- def wrappedReceive = {
- case Pong() => {
- transactionContext = transactionContext.append(DeveloperComment("In PONG"))
-
-
- Future {
- Thread.sleep(1000) // Doing something really expensive
- ContextAwareMessage(transactionContext, Ping())
- } pipeTo target
-
- }
- }
- }
-
- class PongActor extends EnhancedActor {
- def wrappedReceive = {
- case Ping() => {
- superTell(sender, Pong())
- }
- }
- }
/*
@@ -85,7 +90,7 @@ object TryAkka extends App{
/*for(i <- 1 to 8) {*/
val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong")
- ping ! ContextAwareMessage(TransactionContext(1707, Nil), Pong())
+ ping ! Pong()
//}
diff --git a/src/main/scala/spraytest/ClientTest.scala b/src/main/scala/spraytest/ClientTest.scala
new file mode 100644
index 00000000..c3a6ba39
--- /dev/null
+++ b/src/main/scala/spraytest/ClientTest.scala
@@ -0,0 +1,56 @@
+package spraytest
+
+import akka.actor.ActorSystem
+import spray.client.pipelining._
+import spray.httpx.SprayJsonSupport
+import spray.json._
+import scala.concurrent.Future
+
+/**
+ * BEGIN JSON Infrastructure
+ */
+case class Container(data: List[PointOfInterest])
+case class Geolocation(latitude: Float, longitude: Float)
+case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation)
+
+object GeoJsonProtocol extends DefaultJsonProtocol {
+ implicit val geolocationFormat = jsonFormat2(Geolocation)
+ implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest)
+ implicit val containerFormat = jsonFormat1(Container)
+}
+/** END-OF JSON Infrastructure */
+
+
+
+
+
+
+class ClientTest extends App {
+ implicit val actorSystem = ActorSystem("spray-client-test")
+ import actorSystem.dispatcher
+
+
+ import GeoJsonProtocol._
+ import SprayJsonSupport._
+
+
+
+
+ val pipeline = sendReceive ~> unmarshal[Container]
+
+ val response = pipeline {
+ Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco")
+
+ Post("http://www.")
+
+ } onSuccess {
+ case a => {
+ println(a)
+ }
+ }
+}
+
+
+
+
+
diff --git a/src/main/scala/spraytest/FutureTesting.scala b/src/main/scala/spraytest/FutureTesting.scala
new file mode 100644
index 00000000..f592f6d7
--- /dev/null
+++ b/src/main/scala/spraytest/FutureTesting.scala
@@ -0,0 +1,81 @@
+package spraytest
+/*
+import akka.actor.ActorSystem
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Try, Success}
+import kamon.actor.TransactionContext
+
+object FutureTesting extends App {
+
+ val actorSystem = ActorSystem("future-testing")
+ implicit val ec = actorSystem.dispatcher
+ implicit val tctx = TransactionContext(11, Nil)
+
+ threadPrintln("In the initial Thread")
+
+
+ val f = TraceableFuture {
+ threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}")
+ }
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+
+
+
+
+
+
+
+ def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]")
+
+}
+
+
+
+
+trait TransactionContextWrapper {
+ def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = {
+ TransactionContext.current.set(tranContext.fork)
+ println(s"SetContext to: ${tranContext}")
+ val result = f
+
+ TransactionContext.current.remove()
+ result
+ }
+
+}
+
+class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper {
+ def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = {
+ future.onComplete(wrap(func, transactionContext))
+ }
+}
+
+object TraceableFuture {
+
+ implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future
+
+ def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = {
+ val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.id, Nil))
+
+ new TraceableFuture(Future { wrappedBody })
+ }
+
+
+
+
+ def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = {
+ TransactionContext.current.set(transactionContext)
+ val result = body
+ TransactionContext.current.remove()
+ result
+ }
+}*/
+