From 32746f4fca3ebf2edf4a15ada327ae2733676f0b Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 24 May 2013 02:19:19 -0300 Subject: switched back to scalatest and created simple instrumentation for futures body --- src/main/resources/META-INF/aop.xml | 1 + .../PromiseCompletingRunnableInstrumentation.scala | 41 ++++++++++++++++++++++ .../instrumentation/PromiseInstrumentation.scala | 1 - .../FutureInstrumentationSpec.scala | 29 ++++++++++----- .../scala/kamon/instrumentation/ScalaFutures.scala | 32 +++++++++++++++++ 5 files changed, 94 insertions(+), 10 deletions(-) create mode 100644 src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala create mode 100644 src/test/scala/kamon/instrumentation/ScalaFutures.scala (limited to 'src') diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 99eee806..ae3914cd 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -11,6 +11,7 @@ + diff --git a/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala new file mode 100644 index 00000000..dde0d857 --- /dev/null +++ b/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala @@ -0,0 +1,41 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect} +import kamon.TraceContext +import org.aspectj.lang.ProceedingJoinPoint + +@Aspect("perthis(promiseCompletingRunnableCreation())") +class PromiseCompletingRunnableInstrumentation { + + private var traceContext: Option[TraceContext] = None + + @Pointcut("execution(scala.concurrent.impl.Future.PromiseCompletingRunnable.new(..))") + def promiseCompletingRunnableCreation(): Unit = {} + + @Before("promiseCompletingRunnableCreation()") + def catchTheTraceContext = { + TraceContext.current match { + case Some(ctx) => traceContext = Some(ctx.fork) + case None => traceContext = None + } + } + + + @Pointcut("execution(* scala.concurrent.impl.Future.PromiseCompletingRunnable.run())") + def runnableExecution() = {} + + @Around("runnableExecution()") + def around(pjp: ProceedingJoinPoint) = { + import pjp._ + + traceContext match { + case Some(ctx) => { + TraceContext.set(ctx) + proceed() + TraceContext.clear + } + case None => proceed() + } + } + +} diff --git a/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala b/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala index 45e9c414..b87abe44 100644 --- a/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/PromiseInstrumentation.scala @@ -16,7 +16,6 @@ class PromiseInstrumentation { @Before("promiseCreation()") def catchTheTraceContext = { - println(s"During promise creation the context is: ${TraceContext.current}") TraceContext.current match { case Some(ctx) => traceContext = Some(ctx.fork) case None => traceContext = None diff --git a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala index 2eb8d07a..97e81b44 100644 --- a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala +++ b/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala @@ -1,25 +1,36 @@ package kamon.instrumentation -import scala.concurrent.{Await, Future} -import org.specs2.mutable.Specification +import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration.{FiniteDuration, DurationLong} -import org.specs2.time.{ Duration => SpecsDuration } +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.concurrent.PatienceConfiguration +import kamon.TraceContext +import java.util.UUID -class FutureInstrumentationSpec extends Specification { - import Await.result - implicit def specsDuration2Akka(duration: SpecsDuration): FiniteDuration = new DurationLong(duration.inMillis).millis +class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration { "a instrumented Future" should { "preserve the transaction context available during the future creation" in { + new ContextAwareTest { + val future = Future { TraceContext.current.get } + whenReady(future) { result => + result must be === context + } + } } "use the same context available at creation when executing the onComplete callback" in { - val future = Future { "hello" } - result(future, 100 millis) === "hello" } } + + trait ContextAwareTest { + val context = TraceContext(UUID.randomUUID(), Nil) + TraceContext.set(context) + } } + + diff --git a/src/test/scala/kamon/instrumentation/ScalaFutures.scala b/src/test/scala/kamon/instrumentation/ScalaFutures.scala new file mode 100644 index 00000000..169b709c --- /dev/null +++ b/src/test/scala/kamon/instrumentation/ScalaFutures.scala @@ -0,0 +1,32 @@ +package kamon.instrumentation + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success} +import org.scalatest.concurrent.Futures +import java.util.concurrent.TimeUnit + +trait ScalaFutures extends Futures { + implicit def scalaFutureToFutureConcept[T](future: Future[T]): FutureConcept[T] = new FutureConcept[T] { + def eitherValue: Option[Either[Throwable, T]] = { + if(!future.isCompleted) + None + else + future.value match { + case None => None + case Some(t) => t match { + case Success(v) => Some(Right(v)) + case Failure(e) => Some(Left(e)) + } + } + } + + def isExpired: Boolean = false // Scala futures cant expire + + def isCanceled: Boolean = false // Scala futures cannot be cancelled + + override def futureValue(implicit config: PatienceConfig): T = { + Await.result(future, Duration(config.timeout.totalNanos, TimeUnit.NANOSECONDS)) + } + } +} -- cgit v1.2.3