diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-08-26 13:00:51 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-08-26 13:00:51 -0300 |
commit | f8692b49b722c81a590ebc331aa55eb093985add (patch) | |
tree | 20200d40eab7ebde99002e09ff316ac4a1323526 | |
parent | cd1e2519140211253120d49c8457e34833c6a3e9 (diff) | |
parent | b8f61d74ff71eb4e210368a9a8eee33bd504adbb (diff) | |
download | Kamon-f8692b49b722c81a590ebc331aa55eb093985add.tar.gz Kamon-f8692b49b722c81a590ebc331aa55eb093985add.tar.bz2 Kamon-f8692b49b722c81a590ebc331aa55eb093985add.zip |
Merge pull request #76 from m50d/master
Add support for scalaz-concurrent Futures
3 files changed, 112 insertions, 0 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 07f5cfd1..99360bb4 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -20,6 +20,7 @@ <!-- Futures --> <aspect name="kamon.instrumentation.scala.FutureInstrumentation"/> + <aspect name="kamon.instrumentation.scalaz.FutureInstrumentation"/> <!-- Patterns --> <aspect name="akka.instrumentation.AskPatternInstrumentation"/> @@ -27,6 +28,7 @@ <weaver options="-XmessageHandlerClass:kamon.weaver.logging.KamonWeaverMessageHandler"> <include within="scala.concurrent..*"/> + <include within="scalaz.concurrent..*"/> <include within="akka..*"/> <include within="spray..*"/> <include within="kamon..*"/> diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala new file mode 100644 index 00000000..65caaa8f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala @@ -0,0 +1,47 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.instrumentation.scalaz + +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class FutureInstrumentation { + + @DeclareMixin("scalaz.concurrent..* && java.util.concurrent.Callable+") + def mixinTraceContextAwareToFutureRelatedCallable: TraceContextAware = + TraceContextAware.default + + @Pointcut("execution((scalaz.concurrent..* && java.util.concurrent.Callable+).new(..)) && this(callable)") + def futureRelatedCallableCreation(callable: TraceContextAware): Unit = {} + + @After("futureRelatedCallableCreation(callable)") + def afterCreation(callable: TraceContextAware): Unit = + // Force traceContext initialization. + callable.traceContext + + @Pointcut("execution(* (scalaz.concurrent..* && java.util.concurrent.Callable+).call()) && this(callable)") + def futureRelatedCallableExecution(callable: TraceContextAware): Unit = {} + + @Around("futureRelatedCallableExecution(callable)") + def aroundExecution(pjp: ProceedingJoinPoint, callable: TraceContextAware): Any = + TraceRecorder.withInlineTraceContextReplacement(callable.traceContext) { + pjp.proceed() + } + +} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala new file mode 100644 index 00000000..29bf96f8 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala @@ -0,0 +1,63 @@ +/* =================================================== + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation.scalaz + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import kamon.trace.TraceRecorder +import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } +import org.scalatest.{ Matchers, OptionValues, WordSpecLike } +import scalaz.concurrent.Future +import java.util.concurrent.Executors + +class FutureInstrumentationSpec extends TestKit(ActorSystem("future-instrumentation-spec")) with WordSpecLike with Matchers + with ScalaFutures with PatienceConfiguration with OptionValues { + + implicit val execContext = Executors.newCachedThreadPool() + + "a Future created with FutureTracing" should { + "capture the TraceContext available when created" which { + "must be available when executing the future's body" in { + + val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { + val future = Future(TraceRecorder.currentContext).start + + (future, TraceRecorder.currentContext) + } + + val ctxInFuture = future.run + ctxInFuture should equal(testTraceContext) + } + + "must be available when executing callbacks on the future" in { + + val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { + val future = Future("Hello Kamon!") + // The TraceContext is expected to be available during all intermediate processing. + .map(_.length) + .flatMap(len ⇒ Future(len.toString)) + .map(s ⇒ TraceRecorder.currentContext) + + (future.start, TraceRecorder.currentContext) + } + + val ctxInFuture = future.run + ctxInFuture should equal(testTraceContext) + } + } + } +} + |