aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-05-23 18:27:58 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-05-23 18:27:58 -0300
commita12e8579e09c5fd8fdf98ba4553f0a232ddfea6b (patch)
tree2388d67e5166687bf5b904c3ebcf7b54db679d1b
parentc56018c9a3bef9e99cc38f1804eafdfe5c8be45c (diff)
downloadKamon-a12e8579e09c5fd8fdf98ba4553f0a232ddfea6b.tar.gz
Kamon-a12e8579e09c5fd8fdf98ba4553f0a232ddfea6b.tar.bz2
Kamon-a12e8579e09c5fd8fdf98ba4553f0a232ddfea6b.zip
included aspectj support for running tests, and simple test stub
-rw-r--r--project/AspectJ.scala14
-rw-r--r--project/Build.scala6
-rw-r--r--project/Dependencies.scala2
-rw-r--r--project/plugins.sbt2
-rw-r--r--src/main/resources/META-INF/aop.xml2
-rw-r--r--src/main/scala/kamon/actor/AskSupport.scala16
-rw-r--r--src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala63
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala26
-rw-r--r--src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala (renamed from src/main/scala/akka/instrumentation/PromiseInstrumentation.scala)6
-rw-r--r--src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala25
10 files changed, 71 insertions, 91 deletions
diff --git a/project/AspectJ.scala b/project/AspectJ.scala
new file mode 100644
index 00000000..7ba359eb
--- /dev/null
+++ b/project/AspectJ.scala
@@ -0,0 +1,14 @@
+import sbt._
+import sbt.Keys._
+import com.typesafe.sbt.SbtAspectj
+import com.typesafe.sbt.SbtAspectj.Aspectj
+import com.typesafe.sbt.SbtAspectj.AspectjKeys._
+
+
+object AspectJ {
+
+ lazy val aspectJSettings = SbtAspectj.aspectjSettings ++ Seq(
+ fork in Test := true,
+ javaOptions in Test <++= weaverOptions in Aspectj
+ )
+} \ No newline at end of file
diff --git a/project/Build.scala b/project/Build.scala
index 97f23aa6..601d5089 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -2,16 +2,18 @@ import sbt._
import Keys._
object Build extends Build {
- import Dependencies._
+ import AspectJ._
import Settings._
+ import Dependencies._
lazy val root = Project("kamon", file("."))
.settings(basicSettings: _*)
.settings(revolverSettings: _*)
+ .settings(aspectJSettings: _*)
.settings(
libraryDependencies ++=
compile(akkaActor, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, metricsScala, sprayJson, guava) ++
- test(scalatest, sprayTestkit))
+ test(specs2, sprayTestkit))
} \ No newline at end of file
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 357b7d7d..5b8543c6 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -17,7 +17,7 @@ object Dependencies {
val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2"
val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2"
val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2"
- val scalatest = "org.scalatest" %% "scalatest" % "1.9.1"
+ val specs2 = "org.specs2" %% "specs2" % "1.14"
val logback = "ch.qos.logback" % "logback-classic" % "1.0.10"
val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2"
val metrics = "com.yammer.metrics" % "metrics-core" % "2.2.0"
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 34921388..92902e2b 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,3 +4,5 @@ addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.4.0")
addSbtPlugin("io.spray" % "sbt-revolver" % "0.6.2")
+addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.9.0")
+
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index 28f1c578..99eee806 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -10,7 +10,7 @@
<aspect name="akka.PoolMonitorAspect"/>-->
<aspect name="akka.instrumentation.ActorRefTellInstrumentation"/>
<aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/>
- <aspect name="akka.instrumentation.PromiseInstrumentation"/>
+ <aspect name="kamon.instrumentation.PromiseInstrumentation"/>
<include within="*"/>
<exclude within="javax.*"/>
diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala
deleted file mode 100644
index 0a8d27be..00000000
--- a/src/main/scala/kamon/actor/AskSupport.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package kamon.actor
-
-import akka.actor.ActorRef
-import akka.util.Timeout
-import kamon.TraceContext
-
-trait TraceableAskSupport {
- implicit def pimpWithTraceableAsk(actorRef: ActorRef) = new TraceableAskableActorRef(actorRef)
-}
-
-// FIXME: This name sucks
-class TraceableAskableActorRef(val actorRef: ActorRef) {
-
- def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get.fork, message))
-
-}
diff --git a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
deleted file mode 100644
index 62f90da8..00000000
--- a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-package kamon.executor
-
-import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites}
-import com.typesafe.config.Config
-import scala.concurrent.forkjoin.ForkJoinPool
-import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService}
-import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
-import java.util
-
-class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
- extends ForkJoinExecutorConfigurator(config, prerequisites) {
-
- println("Created the instrumented executor")
-
-
- class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int)
- extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) {
-
-
- override def createExecutorService: ExecutorService = {
- super.createExecutorService match {
- case fjp: AkkaForkJoinPool => new WrappedPool(fjp)
- case other => other
- }
- }
- }
-
-}
-
-case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long)
-
-class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService {
-
-
- def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount)
-
- def shutdown = fjp.shutdown()
-
- def shutdownNow(): util.List[Runnable] = fjp.shutdownNow()
-
- def isShutdown: Boolean = fjp.isShutdown
-
- def isTerminated: Boolean = fjp.isTerminated
-
- def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit)
-
- def submit[T](task: Callable[T]): Future[T] = fjp.submit(task)
-
- def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result)
-
- def submit(task: Runnable): Future[_] = fjp.submit(task)
-
- def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks)
-
- def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit)
-
- def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks)
-
- def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks)
-
- def execute(command: Runnable) = fjp.execute(command)
-}
-
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index 9a54366f..ebaff7eb 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -78,7 +78,7 @@ object TryAkka extends App{
-
+ def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${TraceContext.current}] : $body")
/*
val newRelicReporter = new NewRelicReporter(registry)
@@ -89,17 +89,31 @@ object TryAkka extends App{
implicit val timeout = Timeout(10, TimeUnit.SECONDS)
implicit def execContext = system.dispatcher
//for(i <- 1 to 8) {
- val i = 1
+/* val i = 1
TraceContext.start
val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}")
val f = ping ? Pong()
- f.onComplete({
- case Success(p) => println(s"On my main success, with the context: ${TraceContext.current}")
- case Failure(t) => println(s"Something went wrong in the main, with the context: ${TraceContext.current}")
- })
+ f.map({
+ a => threadPrintln(s"In the map body, with the context: ${TraceContext.current}")
+ })
+ .flatMap({
+ (a: Any) => {
+ threadPrintln(s"Executing the flatMap, with the context: ${TraceContext.current}")
+ Future { s"In the flatMap body, with the context: ${TraceContext.current}" }
+ }
+ })
+ .onComplete({
+ case Success(p) => threadPrintln(s"On my main success, with String [$p] and the context: ${TraceContext.current}")
+ case Failure(t) => threadPrintln(s"Something went wrong in the main, with the context: ${TraceContext.current}")
+ })*/
//}
+ TraceContext.start
+ threadPrintln("Before doing it")
+ val f = Future { threadPrintln("This is happening inside the future body") }
+
+
/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
diff --git a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala b/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala
index 918b4aac..45e9c414 100644
--- a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala
+++ b/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala
@@ -1,4 +1,4 @@
-package akka.instrumentation
+package kamon.instrumentation
import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect}
import kamon.TraceContext
@@ -16,6 +16,7 @@ class PromiseInstrumentation {
@Before("promiseCreation()")
def catchTheTraceContext = {
+ println(s"During promise creation the context is: ${TraceContext.current}")
TraceContext.current match {
case Some(ctx) => traceContext = Some(ctx.fork)
case None => traceContext = None
@@ -27,6 +28,7 @@ class PromiseInstrumentation {
@Around("registeringOnCompleteCallback(func, executor)")
def around(pjp: ProceedingJoinPoint, func: Try[Any] => Any, executor: ExecutionContext) = {
+ import pjp._
val wrappedFunction = traceContext match {
case Some(ctx) => (tryResult: Try[Any]) => {
@@ -39,6 +41,6 @@ class PromiseInstrumentation {
case None => func
}
- pjp.proceed(pjp.getArgs.updated(0, wrappedFunction))
+ proceed(getArgs.updated(0, wrappedFunction))
}
}
diff --git a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala
new file mode 100644
index 00000000..2eb8d07a
--- /dev/null
+++ b/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala
@@ -0,0 +1,25 @@
+package kamon.instrumentation
+
+import scala.concurrent.{Await, Future}
+import org.specs2.mutable.Specification
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.{FiniteDuration, DurationLong}
+import org.specs2.time.{ Duration => SpecsDuration }
+
+
+class FutureInstrumentationSpec extends Specification {
+ import Await.result
+ implicit def specsDuration2Akka(duration: SpecsDuration): FiniteDuration = new DurationLong(duration.inMillis).millis
+
+ "a instrumented Future" should {
+ "preserve the transaction context available during the future creation" in {
+
+ }
+
+ "use the same context available at creation when executing the onComplete callback" in {
+ val future = Future { "hello" }
+
+ result(future, 100 millis) === "hello"
+ }
+ }
+}