From b8f61d74ff71eb4e210368a9a8eee33bd504adbb Mon Sep 17 00:00:00 2001 From: Michael Donaghy Date: Wed, 20 Aug 2014 12:36:44 +0100 Subject: + core: add support for trace context propagation on Scalaz futures --- kamon-core/src/main/resources/META-INF/aop.xml | 2 + .../scalaz/FutureInstrumentation.scala | 47 ++++++++++++++++ .../scalaz/FutureInstrumentationSpec.scala | 63 ++++++++++++++++++++++ 3 files changed, 112 insertions(+) create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 07f5cfd1..99360bb4 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -20,6 +20,7 @@ + @@ -27,6 +28,7 @@ + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala new file mode 100644 index 00000000..65caaa8f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala @@ -0,0 +1,47 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.instrumentation.scalaz + +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class FutureInstrumentation { + + @DeclareMixin("scalaz.concurrent..* && java.util.concurrent.Callable+") + def mixinTraceContextAwareToFutureRelatedCallable: TraceContextAware = + TraceContextAware.default + + @Pointcut("execution((scalaz.concurrent..* && java.util.concurrent.Callable+).new(..)) && this(callable)") + def futureRelatedCallableCreation(callable: TraceContextAware): Unit = {} + + @After("futureRelatedCallableCreation(callable)") + def afterCreation(callable: TraceContextAware): Unit = + // Force traceContext initialization. + callable.traceContext + + @Pointcut("execution(* (scalaz.concurrent..* && java.util.concurrent.Callable+).call()) && this(callable)") + def futureRelatedCallableExecution(callable: TraceContextAware): Unit = {} + + @Around("futureRelatedCallableExecution(callable)") + def aroundExecution(pjp: ProceedingJoinPoint, callable: TraceContextAware): Any = + TraceRecorder.withInlineTraceContextReplacement(callable.traceContext) { + pjp.proceed() + } + +} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala new file mode 100644 index 00000000..29bf96f8 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala @@ -0,0 +1,63 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation.scalaz + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import kamon.trace.TraceRecorder +import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } +import org.scalatest.{ Matchers, OptionValues, WordSpecLike } +import scalaz.concurrent.Future +import java.util.concurrent.Executors + +class FutureInstrumentationSpec extends TestKit(ActorSystem("future-instrumentation-spec")) with WordSpecLike with Matchers + with ScalaFutures with PatienceConfiguration with OptionValues { + + implicit val execContext = Executors.newCachedThreadPool() + + "a Future created with FutureTracing" should { + "capture the TraceContext available when created" which { + "must be available when executing the future's body" in { + + val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { + val future = Future(TraceRecorder.currentContext).start + + (future, TraceRecorder.currentContext) + } + + val ctxInFuture = future.run + ctxInFuture should equal(testTraceContext) + } + + "must be available when executing callbacks on the future" in { + + val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { + val future = Future("Hello Kamon!") + // The TraceContext is expected to be available during all intermediate processing. + .map(_.length) + .flatMap(len ⇒ Future(len.toString)) + .map(s ⇒ TraceRecorder.currentContext) + + (future.start, TraceRecorder.currentContext) + } + + val ctxInFuture = future.run + ctxInFuture should equal(testTraceContext) + } + } + } +} + -- cgit v1.2.3