aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-01-02 18:09:53 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-01-13 17:37:20 -0300
commit7a10c0ef2a6566229e8571f6d385ca2ff794cc20 (patch)
treececd7ce6eb7a71f967eaa1605615780fa94d346c /kamon-core
parent54143e4af6182b967736abc60a7fb20c88dd6587 (diff)
downloadKamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.tar.gz
Kamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.tar.bz2
Kamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.zip
integrate trace and metrics into the base project
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/java/kamon/util/GlobPathFilter.java137
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml21
-rw-r--r--kamon-core/src/main/resources/reference.conf30
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala93
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala65
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala49
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala37
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala47
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala74
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Metrics.scala132
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala148
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/package.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Segments.scala38
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Trace.scala114
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala51
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowTracing.scala82
-rw-r--r--kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala (renamed from kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala)15
-rw-r--r--kamon-core/src/test/resources/application.conf3
-rw-r--r--kamon-core/src/test/resources/logback.xml12
-rw-r--r--kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala (renamed from kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala)27
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala51
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala51
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala84
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala165
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala59
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala62
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala51
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala10
32 files changed, 1547 insertions, 251 deletions
diff --git a/kamon-core/src/main/java/kamon/util/GlobPathFilter.java b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java
new file mode 100644
index 00000000..5b019bec
--- /dev/null
+++ b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java
@@ -0,0 +1,137 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+
+// This file was copied from: https://github.com/jboss-modules/jboss-modules/blob/master/src/main/java/org/jboss/modules/filter/GlobPathFilter.java
+package kamon.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+* Default implementation of PathFilter. Uses glob based includes and excludes to determine whether to export.
+*
+* @author John E. Bailey
+* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
+*/
+public final class GlobPathFilter {
+ private static final Pattern GLOB_PATTERN = Pattern.compile("(\\*\\*?)|(\\?)|(\\\\.)|(/+)|([^*?]+)");
+
+ private final String glob;
+ private final Pattern pattern;
+
+ /**
+* Construct a new instance.
+*
+* @param glob the path glob to match
+*/
+ public GlobPathFilter(final String glob) {
+ pattern = getGlobPattern(glob);
+ this.glob = glob;
+ }
+
+ /**
+* Determine whether a path should be accepted.
+*
+* @param path the path to check
+* @return true if the path should be accepted, false if not
+*/
+ public boolean accept(final String path) {
+ return pattern.matcher(path).matches();
+ }
+
+ /**
+ * Get a regular expression pattern which accept any path names which match the given glob. The glob patterns
+ * function similarly to {@code ant} file patterns. Valid metacharacters in the glob pattern include:
+ * <ul>
+ * <li><code>"\"</code> - escape the next character (treat it literally, even if it is itself a recognized metacharacter)</li>
+ * <li><code>"?"</code> - match any non-slash character</li>
+ * <li><code>"*"</code> - match zero or more non-slash characters</li>
+ * <li><code>"**"</code> - match zero or more characters, including slashes</li>
+ * <li><code>"/"</code> - match one or more slash characters. Consecutive {@code /} characters are collapsed down into one.</li>
+ * </ul>
+ * In addition, any glob pattern matches all subdirectories thereof. A glob pattern ending in {@code /} is equivalent
+ * to a glob pattern ending in <code>/**</code> in that the named directory is not itself included in the glob.
+ * <p/>
+ * <b>See also:</b> <a href="http://ant.apache.org/manual/dirtasks.html#patterns">"Patterns" in the Ant Manual</a>
+ *
+ * @param glob the glob to match
+ *
+ * @return the pattern
+ */
+ private static Pattern getGlobPattern(final String glob) {
+ StringBuilder patternBuilder = new StringBuilder();
+ final Matcher m = GLOB_PATTERN.matcher(glob);
+ boolean lastWasSlash = false;
+ while (m.find()) {
+ lastWasSlash = false;
+ String grp;
+ if ((grp = m.group(1)) != null) {
+ // match a * or **
+ if (grp.length() == 2) {
+ // it's a **
+ patternBuilder.append(".*");
+ } else {
+ // it's a *
+ patternBuilder.append("[^/]*");
+ }
+ } else if ((grp = m.group(2)) != null) {
+ // match a '?' glob pattern; any non-slash character
+ patternBuilder.append("[^/]");
+ } else if ((grp = m.group(3)) != null) {
+ // backslash-escaped value
+ patternBuilder.append(Pattern.quote(m.group().substring(1)));
+ } else if ((grp = m.group(4)) != null) {
+ // match any number of / chars
+ patternBuilder.append("/+");
+ lastWasSlash = true;
+ } else {
+ // some other string
+ patternBuilder.append(Pattern.quote(m.group()));
+ }
+ }
+ if (lastWasSlash) {
+ // ends in /, append **
+ patternBuilder.append(".*");
+ } else {
+ patternBuilder.append("(?:/.*)?");
+ }
+ return Pattern.compile(patternBuilder.toString());
+ }
+
+ public int hashCode() {
+ return glob.hashCode() + 13;
+ }
+
+ public boolean equals(final Object obj) {
+ return obj instanceof GlobPathFilter && equals((GlobPathFilter) obj);
+ }
+
+ public boolean equals(final GlobPathFilter obj) {
+ return obj != null && obj.pattern.equals(pattern);
+ }
+
+ public String toString() {
+ final StringBuilder b = new StringBuilder();
+ b.append("match ");
+ if (glob != null) {
+ b.append('"').append(glob).append('"');
+ } else {
+ b.append('/').append(pattern).append('/');
+ }
+ return b.toString();
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index f6951705..1448f22f 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -2,17 +2,30 @@
<aspectj>
<weaver options="-verbose -showWeaveInfo">
- <!--<dump within="*"/>-->
+ <!-- In case you want to see the weaved classes -->
+ <!--<dump within="*"/>-->
</weaver>
<aspects>
- <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/>
+ <!-- Actors -->
+ <aspect name="akka.instrumentation.RepointableActorRefTraceContextMixin"/>
+ <aspect name="akka.instrumentation.SystemMessageTraceContextMixin"/>
+ <aspect name="akka.instrumentation.ActorSystemMessagePassingTracing"/>
+ <aspect name="akka.instrumentation.EnvelopeTraceContextMixin"/>
+ <aspect name="akka.instrumentation.BehaviourInvokeTracing"/>
+ <aspect name="kamon.instrumentation.ActorLoggingTracing"/>
+
+ <!-- Futures -->
+ <aspect name="kamon.instrumentation.FutureTracing"/>
+
+ <!-- Patterns -->
+ <aspect name="akka.instrumentation.AskPatternTracing"/>
+
- <!--<exclude within="*"/>-->
<include within="scala.concurrent..*"/>
<include within="akka..*"/>
<include within="spray..*"/>
<include within="kamon..*"/>
</aspects>
-</aspectj>
+</aspectj> \ No newline at end of file
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
new file mode 100644
index 00000000..11e7cbb4
--- /dev/null
+++ b/kamon-core/src/main/resources/reference.conf
@@ -0,0 +1,30 @@
+kamon {
+ metrics {
+ tick-interval = 1 second
+
+ actors {
+ tracked = []
+
+ excluded = [ "system/*", "user/IO-*" ]
+
+ hdr-settings {
+ processing-time {
+ highest-trackable-value = 3600000000000
+ significant-value-digits = 2
+ }
+ time-in-mailbox {
+ highest-trackable-value = 3600000000000
+ significant-value-digits = 2
+ }
+ mailbox-size {
+ highest-trackable-value = 999999999
+ significant-value-digits = 2
+ }
+ }
+ }
+ }
+
+ trace {
+ ask-pattern-tracing = off
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
new file mode 100644
index 00000000..6cede344
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -0,0 +1,93 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{ Cell, Props, ActorSystem, ActorRef }
+import akka.dispatch.{ Envelope, MessageDispatcher }
+import kamon.trace.{ TraceContext, ContextAware, Trace }
+import kamon.metrics.{ HdrActorMetricsRecorder, ActorMetrics }
+import kamon.Kamon
+
+@Aspect("perthis(actorCellCreation(*, *, *, *, *))")
+class BehaviourInvokeTracing {
+ var path: Option[String] = None
+ var actorMetrics: Option[HdrActorMetricsRecorder] = None
+
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @After("actorCellCreation(system, ref, props, dispatcher, parent)")
+ def afterCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+ val metricsExtension = Kamon(ActorMetrics)(system)
+ val simplePathString = ref.path.elements.mkString("/")
+
+ if (metricsExtension.shouldTrackActor(simplePathString)) {
+ path = Some(ref.path.toString)
+ actorMetrics = Some(metricsExtension.registerActor(simplePathString))
+ }
+ }
+
+ @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
+ val timestampBeforeProcessing = System.nanoTime()
+ val contextAndTimestamp = envelope.asInstanceOf[ContextAndTimestampAware]
+
+ Trace.withContext(contextAndTimestamp.traceContext) {
+ pjp.proceed()
+ }
+
+ actorMetrics.map { am ⇒
+ am.recordProcessingTime(System.nanoTime() - timestampBeforeProcessing)
+ am.recordTimeInMailbox(timestampBeforeProcessing - contextAndTimestamp.timestamp)
+ }
+ }
+
+ @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
+ def actorStop(cell: Cell): Unit = {}
+
+ @After("actorStop(cell)")
+ def afterStop(cell: Cell): Unit = {
+ path.map(p ⇒ Kamon(ActorMetrics)(cell.system).unregisterActor(p))
+ }
+}
+
+@Aspect
+class EnvelopeTraceContextMixin {
+
+ @DeclareMixin("akka.dispatch.Envelope")
+ def mixin: ContextAndTimestampAware = new ContextAndTimestampAware {
+ val traceContext: Option[TraceContext] = Trace.context()
+ val timestamp: Long = System.nanoTime()
+ }
+
+ @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
+ def envelopeCreation(ctx: ContextAware): Unit = {}
+
+ @After("envelopeCreation(ctx)")
+ def afterEnvelopeCreation(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of ContextAware at the moment of creation.
+ ctx.traceContext
+ }
+}
+
+trait ContextAndTimestampAware extends ContextAware {
+ def timestamp: Long
+}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
new file mode 100644
index 00000000..7d26016e
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
@@ -0,0 +1,65 @@
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import kamon.trace.{ Trace, ContextAware }
+import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
+import org.aspectj.lang.ProceedingJoinPoint
+
+@Aspect
+class SystemMessageTraceContextMixin {
+
+ @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)")
+ def envelopeCreation(ctx: ContextAware): Unit = {}
+
+ @After("envelopeCreation(ctx)")
+ def afterEnvelopeCreation(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of ContextAware at the moment of creation.
+ ctx.traceContext
+ }
+}
+
+@Aspect
+class RepointableActorRefTraceContextMixin {
+
+ @DeclareMixin("akka.actor.RepointableActorRef")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)")
+ def envelopeCreation(ctx: ContextAware): Unit = {}
+
+ @After("envelopeCreation(ctx)")
+ def afterEnvelopeCreation(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of ContextAware at the moment of creation.
+ ctx.traceContext
+ }
+
+ @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)")
+ def repointableActorRefCreation(repointableActorRef: ContextAware): Unit = {}
+
+ @Around("repointableActorRefCreation(repointableActorRef)")
+ def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: ContextAware): Any = {
+ Trace.withContext(repointableActorRef.traceContext) {
+ pjp.proceed()
+ }
+ }
+
+}
+
+@Aspect
+class ActorSystemMessagePassingTracing {
+
+ @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)")
+ def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {}
+
+ @Around("systemMessageProcessing(messages)")
+ def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = {
+ if (messages.nonEmpty) {
+ val ctx = messages.head.asInstanceOf[ContextAware].traceContext
+ Trace.withContext(ctx)(pjp.proceed())
+
+ } else pjp.proceed()
+ }
+}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala
new file mode 100644
index 00000000..b5b23e61
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala
@@ -0,0 +1,49 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package akka.instrumentation
+
+import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect }
+import akka.event.Logging.Warning
+import scala.compat.Platform.EOL
+import akka.actor.ActorRefProvider
+import akka.pattern.{ AskTimeoutException, PromiseActorRef }
+
+@Aspect
+class AskPatternTracing {
+
+ class StackTraceCaptureException extends Throwable
+
+ @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *)", argNames = "provider")
+ def promiseActorRefApply(provider: ActorRefProvider): Unit = {
+ provider.settings.config.getBoolean("kamon.trace.ask-pattern-tracing")
+ }
+
+ @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor")
+ def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = {
+ val future = promiseActor.result.future
+ val system = promiseActor.provider.guardian.underlying.system
+ implicit val ec = system.dispatcher
+ val stack = new StackTraceCaptureException
+
+ future onFailure {
+ case timeout: AskTimeoutException ⇒
+ val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL)
+
+ system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing],
+ "Timeout triggered for ask pattern registered at: " + stackString))
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index b5c3d552..b72e8fea 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -18,10 +18,8 @@ package kamon
import akka.actor._
object Kamon {
- trait Extension extends akka.actor.Extension {
- def manager: ActorRef
- }
+ trait Extension extends akka.actor.Extension
- def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager
+ def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system)
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
new file mode 100644
index 00000000..297017cf
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
@@ -0,0 +1,37 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect }
+import org.aspectj.lang.ProceedingJoinPoint
+import kamon.trace.{ ContextAware, Trace }
+
+@Aspect
+class ActorLoggingTracing {
+
+ @DeclareMixin("akka.event.Logging.LogEvent+")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)")
+ def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = {}
+
+ @Around("withMdcInvocation(logSource, logEvent, logStatement)")
+ def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = {
+ Trace.withContext(logEvent.traceContext) {
+ pjp.proceed()
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
index a3da76f7..90d2b270 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -19,10 +19,8 @@ import org.aspectj.lang.annotation._
import java.util.concurrent._
import org.aspectj.lang.ProceedingJoinPoint
import java.util
-import kamon.metric.{ DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector }
import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory }
import com.typesafe.config.Config
-import kamon.Kamon
import scala.concurrent.forkjoin.ForkJoinPool
import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
@@ -41,8 +39,8 @@ class ActorSystemInstrumentation {
@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
class ForkJoinPoolInstrumentation {
- var activeThreadsHistogram: Histogram = _
- var poolSizeHistogram: Histogram = _
+ /* var activeThreadsHistogram: Histogram = _
+ var poolSizeHistogram: Histogram = _*/
@Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
@@ -71,8 +69,8 @@ class ForkJoinPoolInstrumentation {
@After("forkJoinScan(fjp)")
def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
- activeThreadsHistogram.update(fjp.getActiveThreadCount)
- poolSizeHistogram.update(fjp.getPoolSize)
+ /*activeThreadsHistogram.update(fjp.getActiveThreadCount)
+ poolSizeHistogram.update(fjp.getPoolSize)*/
}
}
@@ -90,6 +88,7 @@ trait WatchedExecutorService {
def collector: ExecutorServiceCollector
}
+/*
trait ExecutorServiceMonitoring {
def dispatcherMetrics: DispatcherMetricCollector
}
@@ -97,6 +96,7 @@ trait ExecutorServiceMonitoring {
class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
@volatile var dispatcherMetrics: DispatcherMetricCollector = _
}
+*/
case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = delegate.createExecutorService
@@ -133,9 +133,9 @@ class NamedExecutorServiceFactoryDelegateInstrumentation {
@Around("factoryMethodCall(namedFactory)")
def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
val delegate = pjp.proceed().asInstanceOf[ExecutorService]
- val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
+ val executorFullName = "" //MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
- ExecutorServiceMetricCollector.register(executorFullName, delegate)
+ //ExecutorServiceMetricCollector.register(executorFullName, delegate)
new NamedExecutorServiceDelegate(executorFullName, delegate)
}
@@ -143,7 +143,7 @@ class NamedExecutorServiceFactoryDelegateInstrumentation {
case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
def shutdown() = {
- ExecutorServiceMetricCollector.deregister(fullName)
+ //ExecutorServiceMetricCollector.deregister(fullName)
delegate.shutdown()
}
def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
new file mode 100644
index 00000000..5600d582
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
@@ -0,0 +1,47 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import kamon.trace.{ ContextAware, TraceContext, Trace }
+
+@Aspect
+class FutureTracing {
+
+ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)")
+ def futureRelatedRunnableCreation(runnable: ContextAware): Unit = {}
+
+ @After("futureRelatedRunnableCreation(runnable)")
+ def afterCreation(runnable: ContextAware): Unit = {
+ // Force traceContext initialization.
+ runnable.traceContext
+ }
+
+ @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)")
+ def futureRelatedRunnableExecution(runnable: ContextAware) = {}
+
+ @Around("futureRelatedRunnableExecution(runnable)")
+ def aroundExecution(pjp: ProceedingJoinPoint, runnable: ContextAware): Any = {
+ Trace.withContext(runnable.traceContext) {
+ pjp.proceed()
+ }
+ }
+
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
index da797fa1..44eb8c43 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -15,17 +15,16 @@
* ========================================================== */
package kamon.instrumentation
-import com.codahale.metrics.{ ExponentiallyDecayingReservoir, Histogram }
import akka.dispatch.{ UnboundedMessageQueueSemantics, Envelope, MessageQueue }
import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect }
import akka.actor.{ ActorSystem, ActorRef }
-import kamon.metric.{ Metrics, MetricDirectory }
import org.aspectj.lang.ProceedingJoinPoint
/**
* For Mailboxes we would like to track the queue size and message latency. Currently the latency
* will be gathered from the ActorCellMetrics.
*/
+/*
@Aspect
class MessageQueueInstrumentation {
@@ -74,4 +73,5 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram:
def hasMessages: Boolean = delegate.hasMessages
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters)
}
+*/
diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
deleted file mode 100644
index 4c4b93e9..00000000
--- a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.metric
-
-import java.util.concurrent.{ ThreadPoolExecutor, ExecutorService }
-import scala.concurrent.forkjoin.ForkJoinPool
-import com.codahale.metrics.{ Metric, MetricFilter }
-
-object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector {
-
- def register(fullName: String, executorService: ExecutorService) = executorService match {
- case fjp: ForkJoinPool ⇒ registerForkJoinPool(fullName, fjp)
- case tpe: ThreadPoolExecutor ⇒ registerThreadPoolExecutor(fullName, tpe)
- case _ ⇒ // If it is a unknown Executor then just do nothing.
- }
-
- def deregister(fullName: String) = {
- Metrics.registry.removeMatching(new MetricFilter {
- def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
- })
- }
-}
-
-trait ForkJoinPoolMetricCollector {
- import GaugeGenerator._
- import BasicExecutorMetricNames._
-
- def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = {
- val forkJoinPoolGauge = newNumericGaugeFor(fjp) _
-
- val allMetrics = Map(
- fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt),
- fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize),
- fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount))
-
- allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) }
- }
-}
-
-trait ThreadPoolExecutorMetricCollector {
- import GaugeGenerator._
- import BasicExecutorMetricNames._
-
- def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = {
- val tpeGauge = newNumericGaugeFor(tpe) _
-
- val allMetrics = Map(
- fullName + queueSize -> tpeGauge(_.getQueue.size()),
- fullName + poolSize -> tpeGauge(_.getPoolSize),
- fullName + activeThreads -> tpeGauge(_.getActiveCount))
-
- allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) }
- }
-}
-
-object BasicExecutorMetricNames {
- val queueSize = "queueSize"
- val poolSize = "threads/poolSize"
- val activeThreads = "threads/activeThreads"
-}
-
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
deleted file mode 100644
index b904ec56..00000000
--- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.metric
-
-import java.util.concurrent.TimeUnit
-import akka.actor.ActorRef
-import com.codahale.metrics
-import com.codahale.metrics.{ MetricFilter, Metric, ConsoleReporter, MetricRegistry }
-import scala.collection.concurrent.TrieMap
-
-object Metrics {
- val registry: MetricRegistry = new MetricRegistry
-
- val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS)
- //consoleReporter.build().start(45, TimeUnit.SECONDS)
-
- //val newrelicReporter = NewRelicReporter(registry)
- //newrelicReporter.start(5, TimeUnit.SECONDS)
-
- def include(name: String, metric: Metric) = {
- //registry.register(name, metric)
- }
-
- def exclude(name: String) = {
- registry.removeMatching(new MetricFilter {
- def matches(name: String, metric: Metric): Boolean = name.startsWith(name)
- })
- }
-
- def deregister(fullName: String) = {
- registry.removeMatching(new MetricFilter {
- def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
- })
- }
-}
-
-object Watched {
- case object Actor
- case object Dispatcher
-}
-
-object MetricDirectory {
- def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/"
-
- def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox"
-
- def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/")
-
- def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon")
-
- def shouldInstrumentActor(actorPath: String): Boolean = {
- !(actorPath.isEmpty || actorPath.startsWith("system"))
- }
-
-}
-
-case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram)
-
-trait Histogram {
- def update(value: Long): Unit
- def snapshot: HistogramSnapshot
-}
-
-trait HistogramSnapshot {
- def median: Double
- def max: Double
- def min: Double
-}
-
-case class ActorSystemMetrics(actorSystemName: String) {
- val dispatchers = TrieMap.empty[String, DispatcherMetricCollector]
-
- private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())
-
- def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
- val stats = createDispatcherCollector
- dispatchers.put(dispatcherName, stats)
- Some(stats)
- }
-
-}
-
-case class CodahaleHistogram() extends Histogram {
- private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir())
-
- def update(value: Long) = histogram.update(value)
- def snapshot: HistogramSnapshot = {
- val snapshot = histogram.getSnapshot
-
- CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin)
- }
-}
-
-case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot
-
-/**
- * Dispatcher Metrics that we care about currently with a histogram-like nature:
- * - Work Queue Size
- * - Total/Active Thread Count
- */
-
-import annotation.tailrec
-import java.util.concurrent.atomic.AtomicReference
-
-object Atomic {
- def apply[T](obj: T) = new Atomic(new AtomicReference(obj))
- implicit def toAtomic[T](ref: AtomicReference[T]): Atomic[T] = new Atomic(ref)
-}
-
-class Atomic[T](val atomic: AtomicReference[T]) {
- @tailrec
- final def update(f: T ⇒ T): T = {
- val oldValue = atomic.get()
- val newValue = f(oldValue)
- if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f)
- }
-
- def get() = atomic.get()
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
new file mode 100644
index 00000000..72e473e8
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
@@ -0,0 +1,31 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+
+package kamon.metrics
+
+import akka.actor.{ Props, ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
+import akka.actor
+import kamon.Kamon
+
+object ActorMetrics extends ExtensionId[ActorMetricsExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: actor.Extension] = ActorMetrics
+ def createExtension(system: ExtendedActorSystem): ActorMetricsExtension = new ActorMetricsExtension(system)
+
+}
+
+class ActorMetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension with ActorMetricsOps {
+ lazy val metricsDispatcher = system.actorOf(Props[ActorMetricsDispatcher], "kamon-actor-metrics")
+}
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala
new file mode 100644
index 00000000..dc4abde0
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala
@@ -0,0 +1,148 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import org.HdrHistogram.{ AbstractHistogram, AtomicHistogram }
+import kamon.util.GlobPathFilter
+import scala.collection.concurrent.TrieMap
+import scala.collection.JavaConversions.iterableAsScalaIterable
+import akka.actor._
+import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, FlushMetrics }
+import kamon.Kamon
+import scala.concurrent.duration._
+import java.util.concurrent.TimeUnit
+import kamon.metrics.ActorMetricsDispatcher.Subscribe
+
+trait ActorMetricsOps {
+ self: ActorMetricsExtension ⇒
+
+ val config = system.settings.config.getConfig("kamon.metrics.actors")
+ val actorMetrics = TrieMap[String, HdrActorMetricsRecorder]()
+
+ val trackedActors: Vector[GlobPathFilter] = config.getStringList("tracked").map(glob ⇒ new GlobPathFilter(glob)).toVector
+ val excludedActors: Vector[GlobPathFilter] = config.getStringList("excluded").map(glob ⇒ new GlobPathFilter(glob)).toVector
+
+ val actorMetricsFactory: () ⇒ HdrActorMetricsRecorder = {
+ val settings = config.getConfig("hdr-settings")
+ val processingTimeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("processing-time"))
+ val timeInMailboxHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("time-in-mailbox"))
+ val mailboxSizeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("mailbox-size"))
+
+ () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig)
+ }
+
+ import scala.concurrent.duration._
+ system.scheduler.schedule(0.seconds, 10.seconds)(
+ actorMetrics.collect {
+ case (name, recorder: HdrActorMetricsRecorder) ⇒
+ println(s"Actor: $name")
+ recorder.processingTimeHistogram.copy.getHistogramData.outputPercentileDistribution(System.out, 1000000D)
+ })(system.dispatcher)
+
+ def shouldTrackActor(path: String): Boolean =
+ trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path))
+
+ def registerActor(path: String): HdrActorMetricsRecorder = actorMetrics.getOrElseUpdate(path, actorMetricsFactory())
+
+ def unregisterActor(path: String): Unit = actorMetrics.remove(path)
+}
+
+class HdrActorMetricsRecorder(processingTimeHdrConfig: HdrConfiguration, timeInMailboxHdrConfig: HdrConfiguration,
+ mailboxSizeHdrConfig: HdrConfiguration) {
+
+ val processingTimeHistogram = new AtomicHistogram(processingTimeHdrConfig.highestTrackableValue, processingTimeHdrConfig.significantValueDigits)
+ val timeInMailboxHistogram = new AtomicHistogram(timeInMailboxHdrConfig.highestTrackableValue, timeInMailboxHdrConfig.significantValueDigits)
+ val mailboxSizeHistogram = new AtomicHistogram(mailboxSizeHdrConfig.highestTrackableValue, mailboxSizeHdrConfig.significantValueDigits)
+
+ def recordTimeInMailbox(waitTime: Long): Unit = timeInMailboxHistogram.recordValue(waitTime)
+
+ def recordProcessingTime(processingTime: Long): Unit = processingTimeHistogram.recordValue(processingTime)
+
+ def snapshot(): HdrActorMetricsSnapshot = {
+ HdrActorMetricsSnapshot(processingTimeHistogram.copy(), timeInMailboxHistogram.copy(), mailboxSizeHistogram.copy())
+ }
+
+ def reset(): Unit = {
+ processingTimeHistogram.reset()
+ timeInMailboxHistogram.reset()
+ mailboxSizeHistogram.reset()
+ }
+}
+
+case class HdrActorMetricsSnapshot(processingTimeHistogram: AbstractHistogram, timeInMailboxHistogram: AbstractHistogram,
+ mailboxSizeHistogram: AbstractHistogram)
+
+class ActorMetricsDispatcher extends Actor {
+ val tickInterval = Duration(context.system.settings.config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS)
+ val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+
+ var subscribedForever: Map[GlobPathFilter, List[ActorRef]] = Map.empty
+ var subscribedForOne: Map[GlobPathFilter, List[ActorRef]] = Map.empty
+ var lastTick = System.currentTimeMillis()
+
+ def receive = {
+ case Subscribe(path, true) ⇒ subscribeForever(path, sender)
+ case Subscribe(path, false) ⇒ subscribeOneOff(path, sender)
+ case FlushMetrics ⇒ flushMetrics()
+ }
+
+ def subscribeForever(path: String, receiver: ActorRef): Unit = subscribedForever = subscribe(receiver, path, subscribedForever)
+
+ def subscribeOneOff(path: String, receiver: ActorRef): Unit = subscribedForOne = subscribe(receiver, path, subscribedForOne)
+
+ def subscribe(receiver: ActorRef, path: String, target: Map[GlobPathFilter, List[ActorRef]]): Map[GlobPathFilter, List[ActorRef]] = {
+ val pathFilter = new GlobPathFilter(path)
+ val oldReceivers = target.get(pathFilter).getOrElse(Nil)
+ target.updated(pathFilter, receiver :: oldReceivers)
+ }
+
+ def flushMetrics(): Unit = {
+ val currentTick = System.currentTimeMillis()
+ val snapshots = Kamon(ActorMetrics)(context.system).actorMetrics.map {
+ case (path, metrics) ⇒
+ val snapshot = metrics.snapshot()
+ metrics.reset()
+
+ (path, snapshot)
+ }.toMap
+
+ dispatchMetricsTo(subscribedForOne, snapshots, currentTick)
+ dispatchMetricsTo(subscribedForever, snapshots, currentTick)
+
+ subscribedForOne = Map.empty
+ lastTick = currentTick
+ }
+
+ def dispatchMetricsTo(subscribers: Map[GlobPathFilter, List[ActorRef]], snapshots: Map[String, HdrActorMetricsSnapshot],
+ currentTick: Long): Unit = {
+
+ for ((subscribedPath, receivers) ← subscribers) {
+ val metrics = snapshots.filterKeys(snapshotPath ⇒ subscribedPath.accept(snapshotPath))
+ val actorMetrics = ActorMetricsSnapshot(lastTick, currentTick, metrics)
+
+ receivers.foreach(ref ⇒ ref ! actorMetrics)
+ }
+ }
+}
+
+object ActorMetricsDispatcher {
+ case class Subscribe(path: String, forever: Boolean = false)
+ case class UnSubscribe(path: String)
+
+ case class ActorMetricsSnapshot(fromMillis: Long, toMillis: Long, metrics: Map[String, HdrActorMetricsSnapshot])
+ case object FlushMetrics
+}
diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala
new file mode 100644
index 00000000..d6359ead
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/package.scala
@@ -0,0 +1,31 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+
+import scala.concurrent.duration._
+import com.typesafe.config.Config
+
+package object metrics {
+ val OneHour = 1.hour.toNanos
+
+ case class HdrConfiguration(highestTrackableValue: Long, significantValueDigits: Int)
+ case object HdrConfiguration {
+ def fromConfig(config: Config): HdrConfiguration = {
+ HdrConfiguration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits"))
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Segments.scala b/kamon-core/src/main/scala/kamon/trace/Segments.scala
new file mode 100644
index 00000000..0bc68ee7
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Segments.scala
@@ -0,0 +1,38 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+
+package kamon.trace
+
+import kamon.trace.Trace.SegmentCompletionHandle
+
+object Segments {
+
+ trait Category
+ case object HttpClientRequest extends Category
+
+ case class Start(category: Category, description: String = "",
+ attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime())
+
+ case class End(attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime())
+
+ case class Segment(start: Start, end: End)
+
+ trait SegmentCompletionHandleAware {
+ var completionHandle: Option[SegmentCompletionHandle]
+ }
+
+ trait ContextAndSegmentCompletionAware extends ContextAware with SegmentCompletionHandleAware
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala
new file mode 100644
index 00000000..31e8185a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Trace.scala
@@ -0,0 +1,114 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace
+
+import kamon.Kamon
+import akka.actor._
+import scala.Some
+import kamon.trace.Trace.Register
+import scala.concurrent.duration._
+import java.util.concurrent.atomic.AtomicLong
+
+object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: Extension] = Trace
+ def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system)
+
+ /*** Protocol */
+ case object Register
+
+ /** User API */
+ //private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None)
+ private[trace] val traceContext = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue(): Option[TraceContext] = None
+ }
+ private[trace] val tranid = new AtomicLong()
+
+ def context() = traceContext.get
+ private def set(ctx: Option[TraceContext]) = traceContext.set(ctx)
+
+ def clear: Unit = traceContext.remove()
+ def start(name: String)(implicit system: ActorSystem): TraceContext = {
+ val ctx = newTraceContext(name)
+ ctx.start(name)
+ set(Some(ctx))
+
+ ctx
+ }
+
+ def withContext[T](ctx: Option[TraceContext])(thunk: ⇒ T): T = {
+ val oldval = context
+ set(ctx)
+
+ try thunk
+ finally set(oldval)
+ }
+
+ def transformContext(f: TraceContext ⇒ TraceContext): Unit = {
+ context.map(f).foreach(ctx ⇒ set(Some(ctx)))
+ }
+
+ def finish(): Option[TraceContext] = {
+ val ctx = context()
+ ctx.map(_.finish)
+ clear
+ ctx
+ }
+
+ // TODO: FIX
+ def newTraceContext(name: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace).api, tranid.getAndIncrement, name)
+
+ def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = {
+ val start = Segments.Start(category, description, attributes)
+ SegmentCompletionHandle(start)
+ }
+
+ def startSegment(start: Segments.Start): SegmentCompletionHandle = SegmentCompletionHandle(start)
+
+ case class SegmentCompletionHandle(start: Segments.Start) {
+ def complete(): Unit = {
+ val end = Segments.End()
+ println(s"Completing the Segment: $start - $end")
+ }
+ def complete(end: Segments.End): Unit = {
+ println(s"Completing the Segment: $start - $end")
+ }
+ }
+}
+
+class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ val api: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace")
+}
+
+class TraceManager extends Actor with ActorLogging {
+ var listeners: Seq[ActorRef] = Seq.empty
+
+ def receive = {
+ case Register ⇒
+ listeners = sender +: listeners
+ log.info("Registered [{}] as listener for Kamon traces", sender)
+
+ case segment: UowSegment ⇒
+ val tracerName = segment.id.toString
+ context.child(tracerName).getOrElse(newTracer(tracerName)) ! segment
+
+ case trace: UowTrace ⇒
+ listeners foreach (_ ! trace)
+ }
+
+ def newTracer(name: String): ActorRef = {
+ context.actorOf(UowTraceAggregator.props(self, 30 seconds), name)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
new file mode 100644
index 00000000..3e68a816
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -0,0 +1,51 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace
+
+import java.util.UUID
+import akka.actor._
+import java.util.concurrent.atomic.AtomicLong
+import scala.concurrent.duration._
+import kamon.Kamon
+import kamon.trace.UowTracing.{ Finish, Start }
+
+// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary.
+case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) {
+
+ def start(name: String) = {
+ collector ! Start(id, name)
+ }
+
+ def finish: Unit = {
+ collector ! Finish(id)
+ }
+
+}
+
+trait ContextAware {
+ def traceContext: Option[TraceContext]
+}
+
+object ContextAware {
+ def default: ContextAware = new ContextAware {
+ val traceContext: Option[TraceContext] = Trace.context()
+ }
+}
+
+trait TimedContextAware {
+ def timestamp: Long
+ def traceContext: Option[TraceContext]
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
new file mode 100644
index 00000000..20cce830
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
@@ -0,0 +1,82 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace
+
+import akka.actor._
+import scala.concurrent.duration.Duration
+import kamon.trace.UowTracing._
+
+sealed trait UowSegment {
+ def id: Long
+ def timestamp: Long
+}
+
+trait AutoTimestamp extends UowSegment {
+ val timestamp = System.nanoTime
+}
+
+object UowTracing {
+ case class Start(id: Long, name: String) extends AutoTimestamp
+ case class Finish(id: Long) extends AutoTimestamp
+ case class Rename(id: Long, name: String) extends AutoTimestamp
+ case class WebExternalStart(id: Long, host: String) extends AutoTimestamp
+ case class WebExternalFinish(id: Long) extends AutoTimestamp
+ case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp
+}
+
+case class UowTrace(name: String, uow: String, start: Long, end: Long, segments: Seq[UowSegment]) {
+ def elapsed: Long = end - start
+}
+
+class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging {
+ context.setReceiveTimeout(aggregationTimeout)
+
+ var name: String = "UNKNOWN"
+ var segments: Seq[UowSegment] = Nil
+
+ var pendingExternal = List[WebExternalStart]()
+
+ var start = 0L
+ var end = 0L
+
+ def receive = {
+ case start: Start ⇒
+ this.start = start.timestamp
+ segments = segments :+ start
+ name = start.name
+ case finish: Finish ⇒
+ end = finish.timestamp
+ segments = segments :+ finish; finishTracing()
+ case wes: WebExternalStart ⇒ pendingExternal = pendingExternal :+ wes
+ case finish @ WebExternalFinish(id) ⇒ pendingExternal.find(_.id == id).map(start ⇒ {
+ segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host)
+ })
+ case Rename(id, newName) ⇒ name = newName
+ case segment: UowSegment ⇒ segments = segments :+ segment
+ case ReceiveTimeout ⇒
+ log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments)
+ context.stop(self)
+ }
+
+ def finishTracing(): Unit = {
+ reporting ! UowTrace(name, "", start, end, segments)
+ context.stop(self)
+ }
+}
+
+object UowTraceAggregator {
+ def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout)
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala
index 9eff2739..add47fdf 100644
--- a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
+++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala
@@ -13,15 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
-package kamon.metric
+package kamon.trace.logging
-import com.codahale.metrics.Gauge
+import ch.qos.logback.classic.pattern.ClassicConverter
+import ch.qos.logback.classic.spi.ILoggingEvent
+import kamon.trace.Trace
-trait GaugeGenerator {
-
- def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T ⇒ V) = new Gauge[V] {
- def getValue: V = generator(target)
- }
+class LogbackUowConverter extends ClassicConverter {
+ def convert(event: ILoggingEvent): String = Trace.context().map(_.uow).getOrElse("undefined")
}
-
-object GaugeGenerator extends GaugeGenerator
diff --git a/kamon-core/src/test/resources/application.conf b/kamon-core/src/test/resources/application.conf
new file mode 100644
index 00000000..e8217fc2
--- /dev/null
+++ b/kamon-core/src/test/resources/application.conf
@@ -0,0 +1,3 @@
+akka {
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
+} \ No newline at end of file
diff --git a/kamon-core/src/test/resources/logback.xml b/kamon-core/src/test/resources/logback.xml
new file mode 100644
index 00000000..2ae1e3bd
--- /dev/null
+++ b/kamon-core/src/test/resources/logback.xml
@@ -0,0 +1,12 @@
+<configuration scan="true">
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="debug">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala
index 252e5220..5108af25 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
+++ b/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala
@@ -13,26 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
-package kamon.instrumentation
+package kamon
-import org.aspectj.lang.ProceedingJoinPoint
+import org.scalatest.{ WordSpecLike, WordSpec }
+import akka.testkit.TestKit
+import akka.actor.{ Props, ActorSystem }
-trait ProceedingJoinPointPimp {
- import language.implicitConversions
+class MailboxSizeMetricsSpec extends TestKit(ActorSystem("mailbox-size-metrics-spec")) with WordSpecLike {
- implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp)
-}
-
-object ProceedingJoinPointPimp extends ProceedingJoinPointPimp
-
-case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) {
- def proceedWith(newUniqueArg: AnyRef) = {
- val args = pjp.getArgs
- args.update(0, newUniqueArg)
- pjp.proceed(args)
- }
+ "the mailbox size metrics instrumentation" should {
+ "register a counter for mailbox size upon actor creation" in {
+ val target = system.actorOf(Props.empty, "sample")
- def proceedWithTarget(args: AnyRef*) = {
- pjp.proceed(args.toArray)
+ //Metrics.registry.getHistograms.get("akka://mailbox-size-metrics-spec/sample:MAILBOX")
+ }
}
}
diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
new file mode 100644
index 00000000..ff08ca0f
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
@@ -0,0 +1,51 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import org.scalatest.{ WordSpecLike, Matchers, WordSpec }
+import akka.testkit.TestKitBase
+import akka.actor.{ Actor, Props, ActorSystem }
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, Subscribe }
+import scala.concurrent.duration._
+
+class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
+ implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics.actors.tracked = ["user/test*"]
+ """.stripMargin))
+
+ implicit def self = testActor
+
+ lazy val metricsExtension = Kamon(ActorMetrics).metricsDispatcher
+
+ "the Kamon actor metrics" should {
+ "track configured actors" in {
+ system.actorOf(Props[Other], "test-tracked-actor") ! "nothing"
+ metricsExtension ! Subscribe("user/test-tracked-actor")
+
+ within(5 seconds) {
+ expectMsgType[ActorMetricsDispatcher.ActorMetricsSnapshot]
+ }
+ }
+ }
+}
+
+class Other extends Actor {
+ def receive = { case a ⇒ }
+}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala
new file mode 100644
index 00000000..89742651
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala
@@ -0,0 +1,51 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace.instrumentation
+
+import akka.testkit.TestKit
+import org.scalatest.{ Inspectors, Matchers, WordSpecLike }
+import akka.actor.{ Props, ActorLogging, Actor, ActorSystem }
+import akka.event.Logging.{ LogEvent }
+import kamon.trace.{ ContextAware, TraceContext, Trace }
+
+class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with WordSpecLike with Matchers with Inspectors {
+
+ "the ActorLogging instrumentation" should {
+ "attach the TraceContext (if available) to log events" in {
+ val testTraceContext = Some(TraceContext(Actor.noSender, 1))
+ val loggerActor = system.actorOf(Props[LoggerActor])
+ system.eventStream.subscribe(testActor, classOf[LogEvent])
+
+ Trace.withContext(testTraceContext) {
+ loggerActor ! "info"
+ }
+
+ fishForMessage() {
+ case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒
+ val ctxInEvent = event.asInstanceOf[ContextAware].traceContext
+ ctxInEvent === testTraceContext
+
+ case event: LogEvent ⇒ false
+ }
+ }
+ }
+}
+
+class LoggerActor extends Actor with ActorLogging {
+ def receive = {
+ case "info" ⇒ log.info("TraceContext => {}", Trace.context())
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala
new file mode 100644
index 00000000..89251bf4
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala
@@ -0,0 +1,84 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace.instrumentation
+
+import org.scalatest.{ WordSpecLike, Matchers }
+import akka.actor.{ ActorRef, Actor, Props, ActorSystem }
+
+import akka.testkit.{ ImplicitSender, TestKit }
+import kamon.trace.Trace
+import akka.pattern.{ pipe, ask }
+import akka.util.Timeout
+import scala.concurrent.duration._
+import scala.concurrent.{ Await, Future }
+import akka.routing.RoundRobinRouter
+import kamon.trace.TraceContext
+
+class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender {
+ implicit val executionContext = system.dispatcher
+
+ "the message passing instrumentation" should {
+ "propagate the TraceContext using bang" in new TraceContextEchoFixture {
+ Trace.withContext(testTraceContext) {
+ ctxEchoActor ! "test"
+ }
+
+ expectMsg(testTraceContext)
+ }
+
+ "propagate the TraceContext using tell" in new TraceContextEchoFixture {
+ Trace.withContext(testTraceContext) {
+ ctxEchoActor.tell("test", testActor)
+ }
+
+ expectMsg(testTraceContext)
+ }
+
+ "propagate the TraceContext using ask" in new TraceContextEchoFixture {
+ implicit val timeout = Timeout(1 seconds)
+ Trace.withContext(testTraceContext) {
+ // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it.
+ (ctxEchoActor ? "test") pipeTo (testActor)
+ }
+
+ expectMsg(testTraceContext)
+ }
+
+ "propagate the TraceContext to actors behind a router" in new RoutedTraceContextEchoFixture {
+ Trace.withContext(testTraceContext) {
+ ctxEchoActor ! "test"
+ }
+
+ expectMsg(testTraceContext)
+ }
+ }
+
+ trait TraceContextEchoFixture {
+ val testTraceContext = Some(Trace.newTraceContext(""))
+ val ctxEchoActor = system.actorOf(Props[TraceContextEcho])
+ }
+
+ trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture {
+ override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 1)))
+ }
+}
+
+class TraceContextEcho extends Actor {
+ def receive = {
+ case msg: String ⇒ sender ! Trace.context()
+ }
+}
+
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala
new file mode 100644
index 00000000..7d539370
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala
@@ -0,0 +1,165 @@
+package kamon.trace.instrumentation
+
+import akka.testkit.{ ImplicitSender, TestKit }
+import akka.actor._
+import org.scalatest.WordSpecLike
+import kamon.trace.Trace
+import scala.util.control.NonFatal
+import akka.actor.SupervisorStrategy.{ Escalate, Stop, Restart, Resume }
+import scala.concurrent.duration._
+
+class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender {
+ implicit val executionContext = system.dispatcher
+
+ "the system message passing instrumentation" should {
+ "keep the TraceContext while processing the Create message in top level actors" in new TraceContextFixture {
+ Trace.withContext(testTraceContext) {
+ system.actorOf(Props(new Actor {
+
+ testActor ! Trace.context()
+
+ def receive: Actor.Receive = { case any ⇒ }
+ }))
+ }
+
+ expectMsg(testTraceContext)
+ }
+
+ "keep the TraceContext while processing the Create message in non top level actors" in new TraceContextFixture {
+ Trace.withContext(testTraceContext) {
+ system.actorOf(Props(new Actor {
+ def receive: Actor.Receive = {
+ case any ⇒
+ context.actorOf(Props(new Actor {
+
+ testActor ! Trace.context()
+
+ def receive: Actor.Receive = { case any ⇒ }
+ }))
+ }
+ })) ! "any"
+ }
+
+ expectMsg(testTraceContext)
+ }
+
+ "keep the TraceContext in the supervision cycle" when {
+ "the actor is resumed" in new TraceContextFixture {
+ val supervisor = supervisorWithDirective(Resume)
+
+ Trace.withContext(testTraceContext) {
+ supervisor ! "fail"
+ }
+
+ expectMsg(testTraceContext) // From the parent executing the supervision strategy
+
+ // Ensure we didn't tie the actor with the context
+ supervisor ! "context"
+ expectMsg(None)
+ }
+
+ "the actor is restarted" in new TraceContextFixture {
+ val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true)
+
+ Trace.withContext(testTraceContext) {
+ supervisor ! "fail"
+ }
+
+ expectMsg(testTraceContext) // From the parent executing the supervision strategy
+ expectMsg(testTraceContext) // From the preRestart hook
+ expectMsg(testTraceContext) // From the postRestart hook
+
+ // Ensure we didn't tie the actor with the context
+ supervisor ! "context"
+ expectMsg(None)
+ }
+
+ "the actor is stopped" in new TraceContextFixture {
+ val supervisor = supervisorWithDirective(Stop, sendPostStop = true)
+
+ Trace.withContext(testTraceContext) {
+ supervisor ! "fail"
+ }
+
+ expectMsg(testTraceContext) // From the parent executing the supervision strategy
+ expectMsg(testTraceContext) // From the postStop hook
+ expectNoMsg(1 second)
+ }
+
+ "the failure is escalated" in new TraceContextFixture {
+ val supervisor = supervisorWithDirective(Escalate, sendPostStop = true)
+
+ Trace.withContext(testTraceContext) {
+ supervisor ! "fail"
+ }
+
+ expectMsg(testTraceContext) // From the parent executing the supervision strategy
+ expectMsg(testTraceContext) // From the grandparent executing the supervision strategy
+ expectMsg(testTraceContext) // From the postStop hook in the child
+ expectMsg(testTraceContext) // From the postStop hook in the parent
+ expectNoMsg(1 second)
+ }
+ }
+ }
+
+ def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false,
+ sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = {
+ class GrandParent extends Actor {
+ val child = context.actorOf(Props(new Parent))
+
+ override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
+ case NonFatal(throwable) ⇒ testActor ! Trace.context(); Stop
+ }
+
+ def receive = {
+ case any ⇒ child forward any
+ }
+ }
+
+ class Parent extends Actor {
+ val child = context.actorOf(Props(new Child))
+
+ override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
+ case NonFatal(throwable) ⇒ testActor ! Trace.context(); directive
+ }
+
+ def receive: Actor.Receive = {
+ case any ⇒ child forward any
+ }
+
+ override def postStop(): Unit = {
+ if (sendPostStop) testActor ! Trace.context()
+ super.postStop()
+ }
+ }
+
+ class Child extends Actor {
+ def receive = {
+ case "fail" ⇒ 1 / 0
+ case "context" ⇒ sender ! Trace.context()
+ }
+
+ override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
+ if (sendPreRestart) testActor ! Trace.context()
+ super.preRestart(reason, message)
+ }
+
+ override def postRestart(reason: Throwable): Unit = {
+ if (sendPostRestart) testActor ! Trace.context()
+ super.postRestart(reason)
+ }
+
+ override def postStop(): Unit = {
+ if (sendPostStop) testActor ! Trace.context()
+ super.postStop()
+ }
+
+ override def preStart(): Unit = {
+ if (sendPreStart) testActor ! Trace.context()
+ super.preStart()
+ }
+ }
+
+ system.actorOf(Props(new GrandParent))
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala
new file mode 100644
index 00000000..9df67391
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala
@@ -0,0 +1,59 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace.instrumentation
+
+import akka.testkit.TestKit
+import akka.actor.{ Props, Actor, ActorSystem }
+import org.scalatest.{ Matchers, WordSpecLike }
+import akka.event.Logging.Warning
+import scala.concurrent.duration._
+import akka.pattern.ask
+import akka.util.Timeout
+import kamon.trace.{ Trace, ContextAware }
+import org.scalatest.OptionValues._
+
+class AskPatternTracingSpec extends TestKit(ActorSystem("ask-pattern-tracing-spec")) with WordSpecLike with Matchers {
+
+ "the AskPatternTracing" should {
+ "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in new TraceContextFixture {
+ implicit val ec = system.dispatcher
+ implicit val timeout = Timeout(10 milliseconds)
+ val noReply = system.actorOf(Props[NoReply])
+ system.eventStream.subscribe(testActor, classOf[Warning])
+
+ within(500 milliseconds) {
+ val initialCtx = Trace.withContext(testTraceContext) {
+ noReply ? "hello"
+ Trace.context()
+ }
+
+ val warn = expectMsgPF() {
+ case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn
+ }
+ val capturedCtx = warn.asInstanceOf[ContextAware].traceContext
+
+ capturedCtx should be('defined)
+ capturedCtx should equal(initialCtx)
+ }
+ }
+ }
+}
+
+class NoReply extends Actor {
+ def receive = {
+ case any ⇒
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala
new file mode 100644
index 00000000..a5554836
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala
@@ -0,0 +1,62 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace.instrumentation
+
+import scala.concurrent.{ ExecutionContext, Await, Promise, Future }
+import org.scalatest.{ Matchers, OptionValues, WordSpec }
+import org.scalatest.concurrent.{ ScalaFutures, PatienceConfiguration }
+import java.util.UUID
+import scala.util.{ Random, Success }
+import scala.concurrent.duration._
+import java.util.concurrent.TimeUnit
+import akka.actor.{ Actor, ActorSystem }
+import kamon.trace.{ Trace, TraceContext }
+
+class FutureTracingSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues {
+
+ implicit val execContext = ExecutionContext.Implicits.global
+
+ "a Future created with FutureTracing" should {
+ "capture the TraceContext available when created" which {
+ "must be available when executing the future's body" in new TraceContextFixture {
+ var future: Future[Option[TraceContext]] = _
+
+ Trace.withContext(testTraceContext) {
+ future = Future(Trace.context)
+ }
+
+ whenReady(future)(ctxInFuture ⇒
+ ctxInFuture should equal(testTraceContext))
+ }
+
+ "must be available when executing callbacks on the future" in new TraceContextFixture {
+ var future: Future[Option[TraceContext]] = _
+
+ Trace.withContext(testTraceContext) {
+ future = Future("Hello Kamon!")
+ // The TraceContext is expected to be available during all intermediate processing.
+ .map(_.length)
+ .flatMap(len ⇒ Future(len.toString))
+ .map(s ⇒ Trace.context())
+ }
+
+ whenReady(future)(ctxInFuture ⇒
+ ctxInFuture should equal(testTraceContext))
+ }
+ }
+ }
+}
+
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala
new file mode 100644
index 00000000..3b32f3ac
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala
@@ -0,0 +1,51 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace.instrumentation
+
+import org.scalatest.{ WordSpecLike, WordSpec }
+import akka.testkit.{ TestKitBase, TestKit }
+import akka.actor.ActorSystem
+import scala.concurrent.duration._
+import kamon.trace.UowTracing.{ Finish, Rename, Start }
+import kamon.trace.{ UowTrace, UowTraceAggregator }
+
+class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike {
+
+ "a TraceAggregator" should {
+ "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture {
+ within(1 second) {
+ aggregator ! Start(1, "/accounts")
+ aggregator ! Finish(1)
+
+ //expectMsg(UowTrace("UNKNOWN", Seq(Start(1, "/accounts"), Finish(1))))
+ }
+ }
+
+ "change the uow name after receiving a Rename message" in new AggregatorFixture {
+ within(1 second) {
+ aggregator ! Start(1, "/accounts")
+ aggregator ! Rename(1, "test-uow")
+ aggregator ! Finish(1)
+
+ //expectMsg(UowTrace("test-uow", Seq(Start(1, "/accounts"), Finish(1))))
+ }
+ }
+ }
+
+ trait AggregatorFixture {
+ val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds))
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala
new file mode 100644
index 00000000..62f7ec84
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala
@@ -0,0 +1,10 @@
+package kamon.trace.instrumentation
+
+import scala.util.Random
+import kamon.trace.TraceContext
+import akka.actor.Actor
+
+trait TraceContextFixture {
+ val random = new Random(System.nanoTime)
+ val testTraceContext = Some(TraceContext(Actor.noSender, random.nextInt))
+} \ No newline at end of file