diff options
-rw-r--r-- | project/Build.scala | 4 | ||||
-rw-r--r-- | project/Dependencies.scala | 33 | ||||
-rw-r--r-- | project/Settings.scala | 9 | ||||
-rw-r--r-- | project/plugins.sbt | 2 | ||||
-rw-r--r-- | src/main/resources/META-INF/aop.xml | 12 | ||||
-rw-r--r-- | src/main/scala/akka/ActorAspect.scala | 26 | ||||
-rw-r--r-- | src/main/scala/akka/Tracer.scala | 2 | ||||
-rw-r--r-- | src/main/scala/akka/instrumentation/ActorInstrumentation.scala | 23 | ||||
-rw-r--r-- | src/main/scala/kamon/TraceContext.scala | 42 | ||||
-rw-r--r-- | src/main/scala/kamon/actor/AskSupport.scala | 16 | ||||
-rw-r--r-- | src/main/scala/kamon/actor/TraceableActor.scala | 44 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 63 | ||||
-rw-r--r-- | src/main/scala/spraytest/ClientTest.scala | 56 | ||||
-rw-r--r-- | src/main/scala/spraytest/FutureTesting.scala | 81 |
14 files changed, 347 insertions, 66 deletions
diff --git a/project/Build.scala b/project/Build.scala index 49366bd5..13afe956 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -5,12 +5,12 @@ object Build extends Build { import Dependencies._ import Settings._ - lazy val root = Project("kamon", file(".")) .settings(basicSettings: _*) + .settings(revolverSettings: _*) .settings( libraryDependencies ++= - compile(akkaActor, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, metricsScala) ++ + compile(akkaActor, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, metricsScala, sprayJson) ++ test(scalatest, sprayTestkit)) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d64626f2..5474228f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -7,22 +7,23 @@ object Dependencies { "spray nightlies repo" at "http://nightlies.spray.io" ) - val sprayCan = "io.spray" % "spray-can" % "1.1-M7" - val sprayRouting = "io.spray" % "spray-routing" % "1.1-M7" - val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-M7" - val sprayClient = "io.spray" % "spray-client" % "1.1-M7" - val sprayServlet = "io.spray" % "spray-servlet" % "1.1-M7" - val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1" - val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2" - val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2" - val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2" - val scalatest = "org.scalatest" %% "scalatest" % "1.9.1" - val logback = "ch.qos.logback" % "logback-classic" % "1.0.10" - val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" - val metrics = "com.yammer.metrics" % "metrics-core" % "2.2.0" - val metricsScala = "com.yammer.metrics" % "metrics-scala_2.9.1" % "2.2.0" - val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.17.2" - + val sprayCan = "io.spray" % "spray-can" % "1.1-20130509" + val sprayRouting = "io.spray" % "spray-routing" % "1.1-20130509" + val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-20130509" + val sprayClient = "io.spray" % "spray-client" % "1.1-20130509" + val sprayServlet = "io.spray" % "spray-servlet" % "1.1-20130509" + val sprayJson = "io.spray" %% "spray-json" % "1.2.3" + val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1" + val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2" + val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2" + val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2" + val scalatest = "org.scalatest" %% "scalatest" % "1.9.1" + val logback = "ch.qos.logback" % "logback-classic" % "1.0.10" + val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" + val metrics = "com.yammer.metrics" % "metrics-core" % "2.2.0" + val metricsScala = "com.yammer.metrics" % "metrics-scala_2.9.1" % "2.2.0" + val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.17.2" + val playJson = "play" % "play-json" % "2.2-SNAPSHOT" def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") diff --git a/project/Settings.scala b/project/Settings.scala index e878e881..7eddda5f 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -1,5 +1,6 @@ import sbt._ import Keys._ +import spray.revolver.RevolverPlugin.Revolver object Settings { val VERSION = "0.1-SNAPSHOT" @@ -21,5 +22,13 @@ object Settings { "-Xlog-reflective-calls" ) ) + + + import spray.revolver.RevolverPlugin.Revolver._ + lazy val revolverSettings = Revolver.settings ++ seq( + reJRebelJar := "~/.jrebel/jrebel.jar" + ) + + } diff --git a/project/plugins.sbt b/project/plugins.sbt index 91cadf24..34921388 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -2,3 +2,5 @@ resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositori addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.4.0") +addSbtPlugin("io.spray" % "sbt-revolver" % "0.6.2") + diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index b5e78683..20df0b49 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -1,18 +1,18 @@ -<!DOCTYPE aspectj PUBLIC - "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> <aspectj> <weaver options="-verbose -showWeaveInfo"/> <aspects> - <aspect name="akka.ActorSystemAspect"/> - <!--<aspect name="akka.MailboxAspect"/>--> - <aspect name="akka.PoolMonitorAspect"/> - <!--<aspect name="akka.ActorAspect"/>--> + <!--<aspect name="akka.ActorSystemAspect"/> + <!–<aspect name="akka.MailboxAspect"/>–> + <aspect name="akka.PoolMonitorAspect"/>--> + <aspect name="akka.instrumentation.ActorInstrumentation"/> <include within="*"/> <exclude within="javax.*"/> + <exclude within="org.aspectj.*"/> <exclude within="scala.*"/> <exclude within="scalaz.*"/> diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala index 744b0aea..9d64f205 100644 --- a/src/main/scala/akka/ActorAspect.scala +++ b/src/main/scala/akka/ActorAspect.scala @@ -3,24 +3,24 @@ package akka import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint import kamon.metric.Metrics -import akka.actor.ActorCell @Aspect class ActorAspect extends Metrics { - println("Created ActorAspect") + println("Created ActorAspect") - @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") - protected def actorReceive:Unit = {} + @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") + protected def actorReceive:Unit = {} - @Around("actorReceive() && this(actor)") - def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = { + @Around("actorReceive() && this(actor)") + def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = { - //println("The path is: "+actor.self.path.) - val actorName:String = actor.self.path.toString + //println("The path is: "+actor.self.path.) + val actorName:String = actor.self.path.toString - markAndCountMeter(actorName){ - pjp.proceed - } - } -}
\ No newline at end of file + markAndCountMeter(actorName){ + pjp.proceed + } + + } + }
\ No newline at end of file diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala index bb290960..c58983e0 100644 --- a/src/main/scala/akka/Tracer.scala +++ b/src/main/scala/akka/Tracer.scala @@ -3,6 +3,7 @@ package akka import actor.{Props, ActorSystemImpl} import scala.concurrent.forkjoin.ForkJoinPool import scala.concurrent.duration._ +import com.newrelic.api.agent.NewRelic import akka.dispatch.Mailbox import scala._ import com.newrelic.api.agent.NewRelic @@ -28,6 +29,7 @@ object Tracer { val mbm = MailboxMetrics(mailboxes) mbm.mailboxes.map { case(actorName,mb) => { println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}") + NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages) NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput) diff --git a/src/main/scala/akka/instrumentation/ActorInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala new file mode 100644 index 00000000..ea599891 --- /dev/null +++ b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala @@ -0,0 +1,23 @@ +package akka.instrumentation + +import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} +import org.aspectj.lang.ProceedingJoinPoint +import kamon.metric.Metrics +import akka.actor.ActorCell + +@Aspect +class ActorInstrumentation { + println("Created ActorAspect") + + @Pointcut("execution(* kamon.executor.PingActor.receive(..))") + protected def actorReceive:Unit = {} + + @Before("actorReceive() && args(message)") + def around(message: Any) = { + println("Around the actor cell receive") + //pjp.proceed(Array(Wrapper(message))) + //pjp.proceed + } +} + +case class Wrapper(content: Any)
\ No newline at end of file diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala new file mode 100644 index 00000000..b137168c --- /dev/null +++ b/src/main/scala/kamon/TraceContext.scala @@ -0,0 +1,42 @@ +package kamon + +import java.util.UUID +import akka.actor.ActorPath + + +case class TraceContext(id: UUID, entries: List[TraceEntry]) { + def fork = this.copy(entries = Nil) + def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries) +} + +object TraceContext { + val current = new ThreadLocal[TraceContext] +} + +trait TraceEntry +case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long) + +case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry + + + + +trait TraceSupport { + import TraceContext.current + + + def trace[T](blockName: String)(f: => T): T = { + val before = System.currentTimeMillis + + val result = f + + val after = System.currentTimeMillis + swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after))) + + result + } + + def swapContext(newContext: TraceContext) { + current.set(newContext) + } +} diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala new file mode 100644 index 00000000..8a1ac2e8 --- /dev/null +++ b/src/main/scala/kamon/actor/AskSupport.scala @@ -0,0 +1,16 @@ +package kamon.actor + +import akka.actor.ActorRef +import akka.util.Timeout +import kamon.TraceContext + +trait TraceableAskSupport { + implicit def pimpWithTraceableAsk(actorRef: ActorRef) = new TraceableAskableActorRef(actorRef) +} + +// FIXME: This name sucks +class TraceableAskableActorRef(val actorRef: ActorRef) { + + def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get().fork, message)) + +} diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala new file mode 100644 index 00000000..a38b10c9 --- /dev/null +++ b/src/main/scala/kamon/actor/TraceableActor.scala @@ -0,0 +1,44 @@ +package kamon.actor + +import akka.actor.{ActorRef, Actor} +import kamon.TraceContext + +trait TraceableActor extends Actor with TracingImplicitConversions { + + final def receive = { + case a: Any => { + a match { + case TraceableMessage(ctx, message) => { + TraceContext.current.set(ctx) + + tracedReceive(message) + + TraceContext.current.remove() + + /** Publish the partial context information to the EventStream */ + context.system.eventStream.publish(ctx) + } + case message: Any => tracedReceive(message) + } + } + } + + def tracedReceive: Receive + +} + +class TraceableActorRef(val target: ActorRef) { + def !! (message: Any)(implicit sender: ActorRef) = { + val traceableMessage = TraceableMessage(TraceContext.current.get().fork, message) + target.tell(traceableMessage, sender) + } +} + + + +trait TracingImplicitConversions { + implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef) +} + +case class TraceableMessage(traceContext: TraceContext, message: Any) + diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index 09f28b69..faa076d9 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -8,8 +8,13 @@ import kamon.metric.NewRelicReporter import com.yammer.metrics.core.{MetricName, MetricsRegistry} import com.yammer.metrics.reporting.ConsoleReporter -import kamon.actor.{DeveloperComment, TransactionContext, ContextAwareMessage, EnhancedActor} +import kamon.actor._ import scala.concurrent.Future +import kamon.{TraceSupport, TraceContext} +import akka.util.Timeout + +//import kamon.executor.MessageEvent +import java.util.UUID trait Message @@ -33,6 +38,33 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{ subscriber ! event } } +case class Ping() +case class Pong() + +class PingActor(val target: ActorRef) extends Actor { + implicit def executionContext = context.dispatcher + implicit val timeout = Timeout(30, TimeUnit.SECONDS) + + def receive = { + case Pong() => { + println("pong") + Thread.sleep(1000) + target ! Ping() + } + case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000) + } +} + +class PongActor extends Actor { + def receive = { + case Ping() => { + println("ping") + sender ! Pong() + } + case a: Any => println(s"Got ${a} in PONG") + } +} + object TryAkka extends App{ val system = ActorSystem("MySystem") @@ -47,34 +79,7 @@ object TryAkka extends App{ - case class Ping() - case class Pong() - class PingActor(val target: ActorRef) extends EnhancedActor { - import akka.pattern.pipe - implicit def executionContext = context.dispatcher - - def wrappedReceive = { - case Pong() => { - transactionContext = transactionContext.append(DeveloperComment("In PONG")) - - - Future { - Thread.sleep(1000) // Doing something really expensive - ContextAwareMessage(transactionContext, Ping()) - } pipeTo target - - } - } - } - - class PongActor extends EnhancedActor { - def wrappedReceive = { - case Ping() => { - superTell(sender, Pong()) - } - } - } /* @@ -85,7 +90,7 @@ object TryAkka extends App{ /*for(i <- 1 to 8) {*/ val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong") - ping ! ContextAwareMessage(TransactionContext(1707, Nil), Pong()) + ping ! Pong() //} diff --git a/src/main/scala/spraytest/ClientTest.scala b/src/main/scala/spraytest/ClientTest.scala new file mode 100644 index 00000000..c3a6ba39 --- /dev/null +++ b/src/main/scala/spraytest/ClientTest.scala @@ -0,0 +1,56 @@ +package spraytest + +import akka.actor.ActorSystem +import spray.client.pipelining._ +import spray.httpx.SprayJsonSupport +import spray.json._ +import scala.concurrent.Future + +/** + * BEGIN JSON Infrastructure + */ +case class Container(data: List[PointOfInterest]) +case class Geolocation(latitude: Float, longitude: Float) +case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation) + +object GeoJsonProtocol extends DefaultJsonProtocol { + implicit val geolocationFormat = jsonFormat2(Geolocation) + implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest) + implicit val containerFormat = jsonFormat1(Container) +} +/** END-OF JSON Infrastructure */ + + + + + + +class ClientTest extends App { + implicit val actorSystem = ActorSystem("spray-client-test") + import actorSystem.dispatcher + + + import GeoJsonProtocol._ + import SprayJsonSupport._ + + + + + val pipeline = sendReceive ~> unmarshal[Container] + + val response = pipeline { + Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco") + + Post("http://www.") + + } onSuccess { + case a => { + println(a) + } + } +} + + + + + diff --git a/src/main/scala/spraytest/FutureTesting.scala b/src/main/scala/spraytest/FutureTesting.scala new file mode 100644 index 00000000..f592f6d7 --- /dev/null +++ b/src/main/scala/spraytest/FutureTesting.scala @@ -0,0 +1,81 @@ +package spraytest +/* +import akka.actor.ActorSystem +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Try, Success} +import kamon.actor.TransactionContext + +object FutureTesting extends App { + + val actorSystem = ActorSystem("future-testing") + implicit val ec = actorSystem.dispatcher + implicit val tctx = TransactionContext(11, Nil) + + threadPrintln("In the initial Thread") + + + val f = TraceableFuture { + threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}") + } + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}") + }) + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}") + }) + + + + + + + + + def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]") + +} + + + + +trait TransactionContextWrapper { + def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = { + TransactionContext.current.set(tranContext.fork) + println(s"SetContext to: ${tranContext}") + val result = f + + TransactionContext.current.remove() + result + } + +} + +class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper { + def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = { + future.onComplete(wrap(func, transactionContext)) + } +} + +object TraceableFuture { + + implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future + + def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = { + val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.id, Nil)) + + new TraceableFuture(Future { wrappedBody }) + } + + + + + def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = { + TransactionContext.current.set(transactionContext) + val result = body + TransactionContext.current.remove() + result + } +}*/ + |