aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--project/Build.scala4
-rw-r--r--project/Dependencies.scala33
-rw-r--r--project/Settings.scala9
-rw-r--r--project/plugins.sbt2
-rw-r--r--src/main/resources/META-INF/aop.xml12
-rw-r--r--src/main/scala/akka/ActorAspect.scala26
-rw-r--r--src/main/scala/akka/Tracer.scala2
-rw-r--r--src/main/scala/akka/instrumentation/ActorInstrumentation.scala23
-rw-r--r--src/main/scala/kamon/TraceContext.scala42
-rw-r--r--src/main/scala/kamon/actor/AskSupport.scala16
-rw-r--r--src/main/scala/kamon/actor/TraceableActor.scala44
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala63
-rw-r--r--src/main/scala/spraytest/ClientTest.scala56
-rw-r--r--src/main/scala/spraytest/FutureTesting.scala81
14 files changed, 347 insertions, 66 deletions
diff --git a/project/Build.scala b/project/Build.scala
index 49366bd5..13afe956 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -5,12 +5,12 @@ object Build extends Build {
import Dependencies._
import Settings._
-
lazy val root = Project("kamon", file("."))
.settings(basicSettings: _*)
+ .settings(revolverSettings: _*)
.settings(
libraryDependencies ++=
- compile(akkaActor, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, metricsScala) ++
+ compile(akkaActor, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, metricsScala, sprayJson) ++
test(scalatest, sprayTestkit))
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index d64626f2..5474228f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -7,22 +7,23 @@ object Dependencies {
"spray nightlies repo" at "http://nightlies.spray.io"
)
- val sprayCan = "io.spray" % "spray-can" % "1.1-M7"
- val sprayRouting = "io.spray" % "spray-routing" % "1.1-M7"
- val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-M7"
- val sprayClient = "io.spray" % "spray-client" % "1.1-M7"
- val sprayServlet = "io.spray" % "spray-servlet" % "1.1-M7"
- val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1"
- val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2"
- val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2"
- val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2"
- val scalatest = "org.scalatest" %% "scalatest" % "1.9.1"
- val logback = "ch.qos.logback" % "logback-classic" % "1.0.10"
- val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2"
- val metrics = "com.yammer.metrics" % "metrics-core" % "2.2.0"
- val metricsScala = "com.yammer.metrics" % "metrics-scala_2.9.1" % "2.2.0"
- val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.17.2"
-
+ val sprayCan = "io.spray" % "spray-can" % "1.1-20130509"
+ val sprayRouting = "io.spray" % "spray-routing" % "1.1-20130509"
+ val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-20130509"
+ val sprayClient = "io.spray" % "spray-client" % "1.1-20130509"
+ val sprayServlet = "io.spray" % "spray-servlet" % "1.1-20130509"
+ val sprayJson = "io.spray" %% "spray-json" % "1.2.3"
+ val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1"
+ val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2"
+ val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2"
+ val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2"
+ val scalatest = "org.scalatest" %% "scalatest" % "1.9.1"
+ val logback = "ch.qos.logback" % "logback-classic" % "1.0.10"
+ val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2"
+ val metrics = "com.yammer.metrics" % "metrics-core" % "2.2.0"
+ val metricsScala = "com.yammer.metrics" % "metrics-scala_2.9.1" % "2.2.0"
+ val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.17.2"
+ val playJson = "play" % "play-json" % "2.2-SNAPSHOT"
def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile")
diff --git a/project/Settings.scala b/project/Settings.scala
index e878e881..7eddda5f 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -1,5 +1,6 @@
import sbt._
import Keys._
+import spray.revolver.RevolverPlugin.Revolver
object Settings {
val VERSION = "0.1-SNAPSHOT"
@@ -21,5 +22,13 @@ object Settings {
"-Xlog-reflective-calls"
)
)
+
+
+ import spray.revolver.RevolverPlugin.Revolver._
+ lazy val revolverSettings = Revolver.settings ++ seq(
+ reJRebelJar := "~/.jrebel/jrebel.jar"
+ )
+
+
}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 91cadf24..34921388 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -2,3 +2,5 @@ resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositori
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.4.0")
+addSbtPlugin("io.spray" % "sbt-revolver" % "0.6.2")
+
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index b5e78683..20df0b49 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -1,18 +1,18 @@
-<!DOCTYPE aspectj PUBLIC
- "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
<aspectj>
<weaver options="-verbose -showWeaveInfo"/>
<aspects>
- <aspect name="akka.ActorSystemAspect"/>
- <!--<aspect name="akka.MailboxAspect"/>-->
- <aspect name="akka.PoolMonitorAspect"/>
- <!--<aspect name="akka.ActorAspect"/>-->
+ <!--<aspect name="akka.ActorSystemAspect"/>
+ &lt;!&ndash;<aspect name="akka.MailboxAspect"/>&ndash;&gt;
+ <aspect name="akka.PoolMonitorAspect"/>-->
+ <aspect name="akka.instrumentation.ActorInstrumentation"/>
<include within="*"/>
<exclude within="javax.*"/>
+
<exclude within="org.aspectj.*"/>
<exclude within="scala.*"/>
<exclude within="scalaz.*"/>
diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala
index 744b0aea..9d64f205 100644
--- a/src/main/scala/akka/ActorAspect.scala
+++ b/src/main/scala/akka/ActorAspect.scala
@@ -3,24 +3,24 @@ package akka
import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
import org.aspectj.lang.ProceedingJoinPoint
import kamon.metric.Metrics
-import akka.actor.ActorCell
@Aspect
class ActorAspect extends Metrics {
- println("Created ActorAspect")
+ println("Created ActorAspect")
- @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
- protected def actorReceive:Unit = {}
+ @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
+ protected def actorReceive:Unit = {}
- @Around("actorReceive() && this(actor)")
- def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = {
+ @Around("actorReceive() && this(actor)")
+ def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = {
- //println("The path is: "+actor.self.path.)
- val actorName:String = actor.self.path.toString
+ //println("The path is: "+actor.self.path.)
+ val actorName:String = actor.self.path.toString
- markAndCountMeter(actorName){
- pjp.proceed
- }
- }
-} \ No newline at end of file
+ markAndCountMeter(actorName){
+ pjp.proceed
+ }
+
+ }
+ } \ No newline at end of file
diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala
index bb290960..c58983e0 100644
--- a/src/main/scala/akka/Tracer.scala
+++ b/src/main/scala/akka/Tracer.scala
@@ -3,6 +3,7 @@ package akka
import actor.{Props, ActorSystemImpl}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.duration._
+import com.newrelic.api.agent.NewRelic
import akka.dispatch.Mailbox
import scala._
import com.newrelic.api.agent.NewRelic
@@ -28,6 +29,7 @@ object Tracer {
val mbm = MailboxMetrics(mailboxes)
mbm.mailboxes.map { case(actorName,mb) => {
println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}")
+
NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages)
NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput)
diff --git a/src/main/scala/akka/instrumentation/ActorInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala
new file mode 100644
index 00000000..ea599891
--- /dev/null
+++ b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala
@@ -0,0 +1,23 @@
+package akka.instrumentation
+
+import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect}
+import org.aspectj.lang.ProceedingJoinPoint
+import kamon.metric.Metrics
+import akka.actor.ActorCell
+
+@Aspect
+class ActorInstrumentation {
+ println("Created ActorAspect")
+
+ @Pointcut("execution(* kamon.executor.PingActor.receive(..))")
+ protected def actorReceive:Unit = {}
+
+ @Before("actorReceive() && args(message)")
+ def around(message: Any) = {
+ println("Around the actor cell receive")
+ //pjp.proceed(Array(Wrapper(message)))
+ //pjp.proceed
+ }
+}
+
+case class Wrapper(content: Any) \ No newline at end of file
diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala
new file mode 100644
index 00000000..b137168c
--- /dev/null
+++ b/src/main/scala/kamon/TraceContext.scala
@@ -0,0 +1,42 @@
+package kamon
+
+import java.util.UUID
+import akka.actor.ActorPath
+
+
+case class TraceContext(id: UUID, entries: List[TraceEntry]) {
+ def fork = this.copy(entries = Nil)
+ def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries)
+}
+
+object TraceContext {
+ val current = new ThreadLocal[TraceContext]
+}
+
+trait TraceEntry
+case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long)
+
+case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry
+
+
+
+
+trait TraceSupport {
+ import TraceContext.current
+
+
+ def trace[T](blockName: String)(f: => T): T = {
+ val before = System.currentTimeMillis
+
+ val result = f
+
+ val after = System.currentTimeMillis
+ swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after)))
+
+ result
+ }
+
+ def swapContext(newContext: TraceContext) {
+ current.set(newContext)
+ }
+}
diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala
new file mode 100644
index 00000000..8a1ac2e8
--- /dev/null
+++ b/src/main/scala/kamon/actor/AskSupport.scala
@@ -0,0 +1,16 @@
+package kamon.actor
+
+import akka.actor.ActorRef
+import akka.util.Timeout
+import kamon.TraceContext
+
+trait TraceableAskSupport {
+ implicit def pimpWithTraceableAsk(actorRef: ActorRef) = new TraceableAskableActorRef(actorRef)
+}
+
+// FIXME: This name sucks
+class TraceableAskableActorRef(val actorRef: ActorRef) {
+
+ def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get().fork, message))
+
+}
diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala
new file mode 100644
index 00000000..a38b10c9
--- /dev/null
+++ b/src/main/scala/kamon/actor/TraceableActor.scala
@@ -0,0 +1,44 @@
+package kamon.actor
+
+import akka.actor.{ActorRef, Actor}
+import kamon.TraceContext
+
+trait TraceableActor extends Actor with TracingImplicitConversions {
+
+ final def receive = {
+ case a: Any => {
+ a match {
+ case TraceableMessage(ctx, message) => {
+ TraceContext.current.set(ctx)
+
+ tracedReceive(message)
+
+ TraceContext.current.remove()
+
+ /** Publish the partial context information to the EventStream */
+ context.system.eventStream.publish(ctx)
+ }
+ case message: Any => tracedReceive(message)
+ }
+ }
+ }
+
+ def tracedReceive: Receive
+
+}
+
+class TraceableActorRef(val target: ActorRef) {
+ def !! (message: Any)(implicit sender: ActorRef) = {
+ val traceableMessage = TraceableMessage(TraceContext.current.get().fork, message)
+ target.tell(traceableMessage, sender)
+ }
+}
+
+
+
+trait TracingImplicitConversions {
+ implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef)
+}
+
+case class TraceableMessage(traceContext: TraceContext, message: Any)
+
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index 09f28b69..faa076d9 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -8,8 +8,13 @@ import kamon.metric.NewRelicReporter
import com.yammer.metrics.core.{MetricName, MetricsRegistry}
import com.yammer.metrics.reporting.ConsoleReporter
-import kamon.actor.{DeveloperComment, TransactionContext, ContextAwareMessage, EnhancedActor}
+import kamon.actor._
import scala.concurrent.Future
+import kamon.{TraceSupport, TraceContext}
+import akka.util.Timeout
+
+//import kamon.executor.MessageEvent
+import java.util.UUID
trait Message
@@ -33,6 +38,33 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{
subscriber ! event
}
}
+case class Ping()
+case class Pong()
+
+class PingActor(val target: ActorRef) extends Actor {
+ implicit def executionContext = context.dispatcher
+ implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+
+ def receive = {
+ case Pong() => {
+ println("pong")
+ Thread.sleep(1000)
+ target ! Ping()
+ }
+ case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000)
+ }
+}
+
+class PongActor extends Actor {
+ def receive = {
+ case Ping() => {
+ println("ping")
+ sender ! Pong()
+ }
+ case a: Any => println(s"Got ${a} in PONG")
+ }
+}
+
object TryAkka extends App{
val system = ActorSystem("MySystem")
@@ -47,34 +79,7 @@ object TryAkka extends App{
- case class Ping()
- case class Pong()
- class PingActor(val target: ActorRef) extends EnhancedActor {
- import akka.pattern.pipe
- implicit def executionContext = context.dispatcher
-
- def wrappedReceive = {
- case Pong() => {
- transactionContext = transactionContext.append(DeveloperComment("In PONG"))
-
-
- Future {
- Thread.sleep(1000) // Doing something really expensive
- ContextAwareMessage(transactionContext, Ping())
- } pipeTo target
-
- }
- }
- }
-
- class PongActor extends EnhancedActor {
- def wrappedReceive = {
- case Ping() => {
- superTell(sender, Pong())
- }
- }
- }
/*
@@ -85,7 +90,7 @@ object TryAkka extends App{
/*for(i <- 1 to 8) {*/
val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong")
- ping ! ContextAwareMessage(TransactionContext(1707, Nil), Pong())
+ ping ! Pong()
//}
diff --git a/src/main/scala/spraytest/ClientTest.scala b/src/main/scala/spraytest/ClientTest.scala
new file mode 100644
index 00000000..c3a6ba39
--- /dev/null
+++ b/src/main/scala/spraytest/ClientTest.scala
@@ -0,0 +1,56 @@
+package spraytest
+
+import akka.actor.ActorSystem
+import spray.client.pipelining._
+import spray.httpx.SprayJsonSupport
+import spray.json._
+import scala.concurrent.Future
+
+/**
+ * BEGIN JSON Infrastructure
+ */
+case class Container(data: List[PointOfInterest])
+case class Geolocation(latitude: Float, longitude: Float)
+case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation)
+
+object GeoJsonProtocol extends DefaultJsonProtocol {
+ implicit val geolocationFormat = jsonFormat2(Geolocation)
+ implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest)
+ implicit val containerFormat = jsonFormat1(Container)
+}
+/** END-OF JSON Infrastructure */
+
+
+
+
+
+
+class ClientTest extends App {
+ implicit val actorSystem = ActorSystem("spray-client-test")
+ import actorSystem.dispatcher
+
+
+ import GeoJsonProtocol._
+ import SprayJsonSupport._
+
+
+
+
+ val pipeline = sendReceive ~> unmarshal[Container]
+
+ val response = pipeline {
+ Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco")
+
+ Post("http://www.")
+
+ } onSuccess {
+ case a => {
+ println(a)
+ }
+ }
+}
+
+
+
+
+
diff --git a/src/main/scala/spraytest/FutureTesting.scala b/src/main/scala/spraytest/FutureTesting.scala
new file mode 100644
index 00000000..f592f6d7
--- /dev/null
+++ b/src/main/scala/spraytest/FutureTesting.scala
@@ -0,0 +1,81 @@
+package spraytest
+/*
+import akka.actor.ActorSystem
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Try, Success}
+import kamon.actor.TransactionContext
+
+object FutureTesting extends App {
+
+ val actorSystem = ActorSystem("future-testing")
+ implicit val ec = actorSystem.dispatcher
+ implicit val tctx = TransactionContext(11, Nil)
+
+ threadPrintln("In the initial Thread")
+
+
+ val f = TraceableFuture {
+ threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}")
+ }
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+
+
+
+
+
+
+
+ def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]")
+
+}
+
+
+
+
+trait TransactionContextWrapper {
+ def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = {
+ TransactionContext.current.set(tranContext.fork)
+ println(s"SetContext to: ${tranContext}")
+ val result = f
+
+ TransactionContext.current.remove()
+ result
+ }
+
+}
+
+class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper {
+ def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = {
+ future.onComplete(wrap(func, transactionContext))
+ }
+}
+
+object TraceableFuture {
+
+ implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future
+
+ def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = {
+ val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.id, Nil))
+
+ new TraceableFuture(Future { wrappedBody })
+ }
+
+
+
+
+ def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = {
+ TransactionContext.current.set(transactionContext)
+ val result = body
+ TransactionContext.current.remove()
+ result
+ }
+}*/
+