aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorMichael Donaghy <michael.donaghy@visualdna.com>2014-08-20 12:36:44 +0100
committerMichael Donaghy <michael.donaghy@visualdna.com>2014-08-21 09:32:42 +0100
commit318939af8d86af56878c4cab60eb46c21c2dc66c (patch)
tree2b46bd06eebb411626ec3ad8cb5321af09c92a88 /kamon-core
parentbd038442903e9070626ab21b64a4016d2ad5ae6a (diff)
downloadKamon-318939af8d86af56878c4cab60eb46c21c2dc66c.tar.gz
Kamon-318939af8d86af56878c4cab60eb46c21c2dc66c.tar.bz2
Kamon-318939af8d86af56878c4cab60eb46c21c2dc66c.zip
+ core: add support for trace context propagation on Scalaz futures
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml2
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala47
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala63
3 files changed, 112 insertions, 0 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index 07f5cfd1..99360bb4 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -20,6 +20,7 @@
<!-- Futures -->
<aspect name="kamon.instrumentation.scala.FutureInstrumentation"/>
+ <aspect name="kamon.instrumentation.scalaz.FutureInstrumentation"/>
<!-- Patterns -->
<aspect name="akka.instrumentation.AskPatternInstrumentation"/>
@@ -27,6 +28,7 @@
<weaver options="-XmessageHandlerClass:kamon.weaver.logging.KamonWeaverMessageHandler">
<include within="scala.concurrent..*"/>
+ <include within="scalaz.concurrent..*"/>
<include within="akka..*"/>
<include within="spray..*"/>
<include within="kamon..*"/>
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala
new file mode 100644
index 00000000..65caaa8f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala
@@ -0,0 +1,47 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.instrumentation.scalaz
+
+import kamon.trace.{ TraceContextAware, TraceRecorder }
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation._
+
+@Aspect
+class FutureInstrumentation {
+
+ @DeclareMixin("scalaz.concurrent..* && java.util.concurrent.Callable+")
+ def mixinTraceContextAwareToFutureRelatedCallable: TraceContextAware =
+ TraceContextAware.default
+
+ @Pointcut("execution((scalaz.concurrent..* && java.util.concurrent.Callable+).new(..)) && this(callable)")
+ def futureRelatedCallableCreation(callable: TraceContextAware): Unit = {}
+
+ @After("futureRelatedCallableCreation(callable)")
+ def afterCreation(callable: TraceContextAware): Unit =
+ // Force traceContext initialization.
+ callable.traceContext
+
+ @Pointcut("execution(* (scalaz.concurrent..* && java.util.concurrent.Callable+).call()) && this(callable)")
+ def futureRelatedCallableExecution(callable: TraceContextAware): Unit = {}
+
+ @Around("futureRelatedCallableExecution(callable)")
+ def aroundExecution(pjp: ProceedingJoinPoint, callable: TraceContextAware): Any =
+ TraceRecorder.withInlineTraceContextReplacement(callable.traceContext) {
+ pjp.proceed()
+ }
+
+}
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala
new file mode 100644
index 00000000..29bf96f8
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala
@@ -0,0 +1,63 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.instrumentation.scalaz
+
+import akka.actor.ActorSystem
+import akka.testkit.TestKit
+import kamon.trace.TraceRecorder
+import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
+import org.scalatest.{ Matchers, OptionValues, WordSpecLike }
+import scalaz.concurrent.Future
+import java.util.concurrent.Executors
+
+class FutureInstrumentationSpec extends TestKit(ActorSystem("future-instrumentation-spec")) with WordSpecLike with Matchers
+ with ScalaFutures with PatienceConfiguration with OptionValues {
+
+ implicit val execContext = Executors.newCachedThreadPool()
+
+ "a Future created with FutureTracing" should {
+ "capture the TraceContext available when created" which {
+ "must be available when executing the future's body" in {
+
+ val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
+ val future = Future(TraceRecorder.currentContext).start
+
+ (future, TraceRecorder.currentContext)
+ }
+
+ val ctxInFuture = future.run
+ ctxInFuture should equal(testTraceContext)
+ }
+
+ "must be available when executing callbacks on the future" in {
+
+ val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
+ val future = Future("Hello Kamon!")
+ // The TraceContext is expected to be available during all intermediate processing.
+ .map(_.length)
+ .flatMap(len ⇒ Future(len.toString))
+ .map(s ⇒ TraceRecorder.currentContext)
+
+ (future.start, TraceRecorder.currentContext)
+ }
+
+ val ctxInFuture = future.run
+ ctxInFuture should equal(testTraceContext)
+ }
+ }
+ }
+}
+