From a12e8579e09c5fd8fdf98ba4553f0a232ddfea6b Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 23 May 2013 18:27:58 -0300 Subject: included aspectj support for running tests, and simple test stub --- src/main/resources/META-INF/aop.xml | 2 +- .../instrumentation/PromiseInstrumentation.scala | 44 --------------- src/main/scala/kamon/actor/AskSupport.scala | 16 ------ .../InstrumentedExecutorServiceConfigurator.scala | 63 ---------------------- src/main/scala/kamon/executor/eventbus.scala | 26 ++++++--- .../instrumentation/PromiseInstrumentation.scala | 46 ++++++++++++++++ 6 files changed, 67 insertions(+), 130 deletions(-) delete mode 100644 src/main/scala/akka/instrumentation/PromiseInstrumentation.scala delete mode 100644 src/main/scala/kamon/actor/AskSupport.scala delete mode 100644 src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala create mode 100644 src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala (limited to 'src/main') diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 28f1c578..99eee806 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -10,7 +10,7 @@ --> - + diff --git a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala deleted file mode 100644 index 918b4aac..00000000 --- a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala +++ /dev/null @@ -1,44 +0,0 @@ -package akka.instrumentation - -import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect} -import kamon.TraceContext -import scala.util.Try -import scala.concurrent.ExecutionContext -import org.aspectj.lang.ProceedingJoinPoint - -@Aspect("perthis(promiseCreation())") -class PromiseInstrumentation { - - private var traceContext: Option[TraceContext] = None - - @Pointcut("execution(scala.concurrent.impl.Promise.DefaultPromise.new(..))") - def promiseCreation(): Unit = {} - - @Before("promiseCreation()") - def catchTheTraceContext = { - TraceContext.current match { - case Some(ctx) => traceContext = Some(ctx.fork) - case None => traceContext = None - } - } - - @Pointcut("execution(* scala.concurrent.Future.onComplete(..)) && args(func, executor)") - def registeringOnCompleteCallback(func: Try[Any] => Any, executor: ExecutionContext) = {} - - @Around("registeringOnCompleteCallback(func, executor)") - def around(pjp: ProceedingJoinPoint, func: Try[Any] => Any, executor: ExecutionContext) = { - - val wrappedFunction = traceContext match { - case Some(ctx) => (tryResult: Try[Any]) => { - TraceContext.set(ctx) - val result = func(tryResult) - TraceContext.clear - - result - } - case None => func - } - - pjp.proceed(pjp.getArgs.updated(0, wrappedFunction)) - } -} diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala deleted file mode 100644 index 0a8d27be..00000000 --- a/src/main/scala/kamon/actor/AskSupport.scala +++ /dev/null @@ -1,16 +0,0 @@ -package kamon.actor - -import akka.actor.ActorRef -import akka.util.Timeout -import kamon.TraceContext - -trait TraceableAskSupport { - implicit def pimpWithTraceableAsk(actorRef: ActorRef) = new TraceableAskableActorRef(actorRef) -} - -// FIXME: This name sucks -class TraceableAskableActorRef(val actorRef: ActorRef) { - - def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get.fork, message)) - -} diff --git a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala deleted file mode 100644 index 62f90da8..00000000 --- a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala +++ /dev/null @@ -1,63 +0,0 @@ -package kamon.executor - -import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites} -import com.typesafe.config.Config -import scala.concurrent.forkjoin.ForkJoinPool -import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService} -import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool -import java.util - -class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) - extends ForkJoinExecutorConfigurator(config, prerequisites) { - - println("Created the instrumented executor") - - - class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) - extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) { - - - override def createExecutorService: ExecutorService = { - super.createExecutorService match { - case fjp: AkkaForkJoinPool => new WrappedPool(fjp) - case other => other - } - } - } - -} - -case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long) - -class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService { - - - def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount) - - def shutdown = fjp.shutdown() - - def shutdownNow(): util.List[Runnable] = fjp.shutdownNow() - - def isShutdown: Boolean = fjp.isShutdown - - def isTerminated: Boolean = fjp.isTerminated - - def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit) - - def submit[T](task: Callable[T]): Future[T] = fjp.submit(task) - - def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result) - - def submit(task: Runnable): Future[_] = fjp.submit(task) - - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks) - - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit) - - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks) - - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks) - - def execute(command: Runnable) = fjp.execute(command) -} - diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index 9a54366f..ebaff7eb 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -78,7 +78,7 @@ object TryAkka extends App{ - + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${TraceContext.current}] : $body") /* val newRelicReporter = new NewRelicReporter(registry) @@ -89,17 +89,31 @@ object TryAkka extends App{ implicit val timeout = Timeout(10, TimeUnit.SECONDS) implicit def execContext = system.dispatcher //for(i <- 1 to 8) { - val i = 1 +/* val i = 1 TraceContext.start val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}") val f = ping ? Pong() - f.onComplete({ - case Success(p) => println(s"On my main success, with the context: ${TraceContext.current}") - case Failure(t) => println(s"Something went wrong in the main, with the context: ${TraceContext.current}") - }) + f.map({ + a => threadPrintln(s"In the map body, with the context: ${TraceContext.current}") + }) + .flatMap({ + (a: Any) => { + threadPrintln(s"Executing the flatMap, with the context: ${TraceContext.current}") + Future { s"In the flatMap body, with the context: ${TraceContext.current}" } + } + }) + .onComplete({ + case Success(p) => threadPrintln(s"On my main success, with String [$p] and the context: ${TraceContext.current}") + case Failure(t) => threadPrintln(s"Something went wrong in the main, with the context: ${TraceContext.current}") + })*/ //} + TraceContext.start + threadPrintln("Before doing it") + val f = Future { threadPrintln("This is happening inside the future body") } + + /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ diff --git a/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala b/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala new file mode 100644 index 00000000..45e9c414 --- /dev/null +++ b/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala @@ -0,0 +1,46 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect} +import kamon.TraceContext +import scala.util.Try +import scala.concurrent.ExecutionContext +import org.aspectj.lang.ProceedingJoinPoint + +@Aspect("perthis(promiseCreation())") +class PromiseInstrumentation { + + private var traceContext: Option[TraceContext] = None + + @Pointcut("execution(scala.concurrent.impl.Promise.DefaultPromise.new(..))") + def promiseCreation(): Unit = {} + + @Before("promiseCreation()") + def catchTheTraceContext = { + println(s"During promise creation the context is: ${TraceContext.current}") + TraceContext.current match { + case Some(ctx) => traceContext = Some(ctx.fork) + case None => traceContext = None + } + } + + @Pointcut("execution(* scala.concurrent.Future.onComplete(..)) && args(func, executor)") + def registeringOnCompleteCallback(func: Try[Any] => Any, executor: ExecutionContext) = {} + + @Around("registeringOnCompleteCallback(func, executor)") + def around(pjp: ProceedingJoinPoint, func: Try[Any] => Any, executor: ExecutionContext) = { + import pjp._ + + val wrappedFunction = traceContext match { + case Some(ctx) => (tryResult: Try[Any]) => { + TraceContext.set(ctx) + val result = func(tryResult) + TraceContext.clear + + result + } + case None => func + } + + proceed(getArgs.updated(0, wrappedFunction)) + } +} -- cgit v1.2.3