diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-05-23 18:27:58 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-05-23 18:27:58 -0300 |
commit | a12e8579e09c5fd8fdf98ba4553f0a232ddfea6b (patch) | |
tree | 2388d67e5166687bf5b904c3ebcf7b54db679d1b | |
parent | c56018c9a3bef9e99cc38f1804eafdfe5c8be45c (diff) | |
download | Kamon-a12e8579e09c5fd8fdf98ba4553f0a232ddfea6b.tar.gz Kamon-a12e8579e09c5fd8fdf98ba4553f0a232ddfea6b.tar.bz2 Kamon-a12e8579e09c5fd8fdf98ba4553f0a232ddfea6b.zip |
included aspectj support for running tests, and simple test stub
-rw-r--r-- | project/AspectJ.scala | 14 | ||||
-rw-r--r-- | project/Build.scala | 6 | ||||
-rw-r--r-- | project/Dependencies.scala | 2 | ||||
-rw-r--r-- | project/plugins.sbt | 2 | ||||
-rw-r--r-- | src/main/resources/META-INF/aop.xml | 2 | ||||
-rw-r--r-- | src/main/scala/kamon/actor/AskSupport.scala | 16 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala | 63 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 26 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala (renamed from src/main/scala/akka/instrumentation/PromiseInstrumentation.scala) | 6 | ||||
-rw-r--r-- | src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala | 25 |
10 files changed, 71 insertions, 91 deletions
diff --git a/project/AspectJ.scala b/project/AspectJ.scala new file mode 100644 index 00000000..7ba359eb --- /dev/null +++ b/project/AspectJ.scala @@ -0,0 +1,14 @@ +import sbt._ +import sbt.Keys._ +import com.typesafe.sbt.SbtAspectj +import com.typesafe.sbt.SbtAspectj.Aspectj +import com.typesafe.sbt.SbtAspectj.AspectjKeys._ + + +object AspectJ { + + lazy val aspectJSettings = SbtAspectj.aspectjSettings ++ Seq( + fork in Test := true, + javaOptions in Test <++= weaverOptions in Aspectj + ) +}
\ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index 97f23aa6..601d5089 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -2,16 +2,18 @@ import sbt._ import Keys._ object Build extends Build { - import Dependencies._ + import AspectJ._ import Settings._ + import Dependencies._ lazy val root = Project("kamon", file(".")) .settings(basicSettings: _*) .settings(revolverSettings: _*) + .settings(aspectJSettings: _*) .settings( libraryDependencies ++= compile(akkaActor, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, metricsScala, sprayJson, guava) ++ - test(scalatest, sprayTestkit)) + test(specs2, sprayTestkit)) }
\ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 357b7d7d..5b8543c6 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2" val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2" val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2" - val scalatest = "org.scalatest" %% "scalatest" % "1.9.1" + val specs2 = "org.specs2" %% "specs2" % "1.14" val logback = "ch.qos.logback" % "logback-classic" % "1.0.10" val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" val metrics = "com.yammer.metrics" % "metrics-core" % "2.2.0" diff --git a/project/plugins.sbt b/project/plugins.sbt index 34921388..92902e2b 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,3 +4,5 @@ addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.4.0") addSbtPlugin("io.spray" % "sbt-revolver" % "0.6.2") +addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.9.0") + diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 28f1c578..99eee806 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -10,7 +10,7 @@ <aspect name="akka.PoolMonitorAspect"/>--> <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/> <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/> - <aspect name="akka.instrumentation.PromiseInstrumentation"/> + <aspect name="kamon.instrumentation.PromiseInstrumentation"/> <include within="*"/> <exclude within="javax.*"/> diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala deleted file mode 100644 index 0a8d27be..00000000 --- a/src/main/scala/kamon/actor/AskSupport.scala +++ /dev/null @@ -1,16 +0,0 @@ -package kamon.actor - -import akka.actor.ActorRef -import akka.util.Timeout -import kamon.TraceContext - -trait TraceableAskSupport { - implicit def pimpWithTraceableAsk(actorRef: ActorRef) = new TraceableAskableActorRef(actorRef) -} - -// FIXME: This name sucks -class TraceableAskableActorRef(val actorRef: ActorRef) { - - def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get.fork, message)) - -} diff --git a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala deleted file mode 100644 index 62f90da8..00000000 --- a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala +++ /dev/null @@ -1,63 +0,0 @@ -package kamon.executor - -import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites} -import com.typesafe.config.Config -import scala.concurrent.forkjoin.ForkJoinPool -import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService} -import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool -import java.util - -class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) - extends ForkJoinExecutorConfigurator(config, prerequisites) { - - println("Created the instrumented executor") - - - class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) - extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) { - - - override def createExecutorService: ExecutorService = { - super.createExecutorService match { - case fjp: AkkaForkJoinPool => new WrappedPool(fjp) - case other => other - } - } - } - -} - -case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long) - -class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService { - - - def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount) - - def shutdown = fjp.shutdown() - - def shutdownNow(): util.List[Runnable] = fjp.shutdownNow() - - def isShutdown: Boolean = fjp.isShutdown - - def isTerminated: Boolean = fjp.isTerminated - - def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit) - - def submit[T](task: Callable[T]): Future[T] = fjp.submit(task) - - def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result) - - def submit(task: Runnable): Future[_] = fjp.submit(task) - - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks) - - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit) - - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks) - - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks) - - def execute(command: Runnable) = fjp.execute(command) -} - diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index 9a54366f..ebaff7eb 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -78,7 +78,7 @@ object TryAkka extends App{ - + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${TraceContext.current}] : $body") /* val newRelicReporter = new NewRelicReporter(registry) @@ -89,17 +89,31 @@ object TryAkka extends App{ implicit val timeout = Timeout(10, TimeUnit.SECONDS) implicit def execContext = system.dispatcher //for(i <- 1 to 8) { - val i = 1 +/* val i = 1 TraceContext.start val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}") val f = ping ? Pong() - f.onComplete({ - case Success(p) => println(s"On my main success, with the context: ${TraceContext.current}") - case Failure(t) => println(s"Something went wrong in the main, with the context: ${TraceContext.current}") - }) + f.map({ + a => threadPrintln(s"In the map body, with the context: ${TraceContext.current}") + }) + .flatMap({ + (a: Any) => { + threadPrintln(s"Executing the flatMap, with the context: ${TraceContext.current}") + Future { s"In the flatMap body, with the context: ${TraceContext.current}" } + } + }) + .onComplete({ + case Success(p) => threadPrintln(s"On my main success, with String [$p] and the context: ${TraceContext.current}") + case Failure(t) => threadPrintln(s"Something went wrong in the main, with the context: ${TraceContext.current}") + })*/ //} + TraceContext.start + threadPrintln("Before doing it") + val f = Future { threadPrintln("This is happening inside the future body") } + + /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ diff --git a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala b/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala index 918b4aac..45e9c414 100644 --- a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala @@ -1,4 +1,4 @@ -package akka.instrumentation +package kamon.instrumentation import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect} import kamon.TraceContext @@ -16,6 +16,7 @@ class PromiseInstrumentation { @Before("promiseCreation()") def catchTheTraceContext = { + println(s"During promise creation the context is: ${TraceContext.current}") TraceContext.current match { case Some(ctx) => traceContext = Some(ctx.fork) case None => traceContext = None @@ -27,6 +28,7 @@ class PromiseInstrumentation { @Around("registeringOnCompleteCallback(func, executor)") def around(pjp: ProceedingJoinPoint, func: Try[Any] => Any, executor: ExecutionContext) = { + import pjp._ val wrappedFunction = traceContext match { case Some(ctx) => (tryResult: Try[Any]) => { @@ -39,6 +41,6 @@ class PromiseInstrumentation { case None => func } - pjp.proceed(pjp.getArgs.updated(0, wrappedFunction)) + proceed(getArgs.updated(0, wrappedFunction)) } } diff --git a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala new file mode 100644 index 00000000..2eb8d07a --- /dev/null +++ b/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala @@ -0,0 +1,25 @@ +package kamon.instrumentation + +import scala.concurrent.{Await, Future} +import org.specs2.mutable.Specification +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.{FiniteDuration, DurationLong} +import org.specs2.time.{ Duration => SpecsDuration } + + +class FutureInstrumentationSpec extends Specification { + import Await.result + implicit def specsDuration2Akka(duration: SpecsDuration): FiniteDuration = new DurationLong(duration.inMillis).millis + + "a instrumented Future" should { + "preserve the transaction context available during the future creation" in { + + } + + "use the same context available at creation when executing the onComplete callback" in { + val future = Future { "hello" } + + result(future, 100 millis) === "hello" + } + } +} |