From 7a10c0ef2a6566229e8571f6d385ca2ff794cc20 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 2 Jan 2014 18:09:53 -0300 Subject: integrate trace and metrics into the base project --- .../src/main/java/kamon/util/GlobPathFilter.java | 137 +++++++++++++++++ kamon-core/src/main/resources/META-INF/aop.xml | 21 ++- kamon-core/src/main/resources/reference.conf | 30 ++++ .../ActorMessagePassingTracing.scala | 93 ++++++++++++ .../ActorSystemMessagePassingTracing.scala | 65 ++++++++ .../akka/instrumentation/AskPatternTracing.scala | 49 ++++++ kamon-core/src/main/scala/kamon/Kamon.scala | 6 +- .../instrumentation/ActorLoggingTracing.scala | 37 +++++ .../scala/kamon/instrumentation/AspectJPimps.scala | 38 ----- .../instrumentation/ExecutorServiceMetrics.scala | 18 +-- .../kamon/instrumentation/FutureTracing.scala | 47 ++++++ .../instrumentation/MessageQueueMetrics.scala | 4 +- .../metric/ExecutorServiceMetricCollector.scala | 74 --------- .../main/scala/kamon/metric/GaugeGenerator.scala | 27 ---- .../src/main/scala/kamon/metric/Metrics.scala | 132 ----------------- .../main/scala/kamon/metrics/ActorMetrics.scala | 31 ++++ .../main/scala/kamon/metrics/ActorMetricsOps.scala | 148 ++++++++++++++++++ .../src/main/scala/kamon/metrics/package.scala | 31 ++++ .../src/main/scala/kamon/trace/Segments.scala | 38 +++++ kamon-core/src/main/scala/kamon/trace/Trace.scala | 114 ++++++++++++++ .../src/main/scala/kamon/trace/TraceContext.scala | 51 +++++++ .../src/main/scala/kamon/trace/UowTracing.scala | 82 ++++++++++ .../kamon/trace/logging/LogbackUowConverter.scala | 24 +++ kamon-core/src/test/resources/application.conf | 3 + kamon-core/src/test/resources/logback.xml | 12 ++ .../test/scala/kamon/MailboxSizeMetricsSpec.scala | 31 ++++ .../scala/kamon/metrics/ActorMetricsSpec.scala | 51 +++++++ .../trace/instrumentation/ActorLoggingSpec.scala | 51 +++++++ .../ActorMessagePassingTracingSpec.scala | 84 +++++++++++ ...orSystemMessagePassingInstrumentationSpec.scala | 165 +++++++++++++++++++++ .../instrumentation/AskPatternTracingSpec.scala | 59 ++++++++ .../trace/instrumentation/FutureTracingSpec.scala | 62 ++++++++ .../instrumentation/TraceAggregatorSpec.scala | 51 +++++++ .../instrumentation/TraceContextFixture.scala | 10 ++ 34 files changed, 1586 insertions(+), 290 deletions(-) create mode 100644 kamon-core/src/main/java/kamon/util/GlobPathFilter.java create mode 100644 kamon-core/src/main/resources/reference.conf create mode 100644 kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala create mode 100644 kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala create mode 100644 kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/Metrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/package.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/Segments.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/Trace.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TraceContext.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/UowTracing.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala create mode 100644 kamon-core/src/test/resources/application.conf create mode 100644 kamon-core/src/test/resources/logback.xml create mode 100644 kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/java/kamon/util/GlobPathFilter.java b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java new file mode 100644 index 00000000..5b019bec --- /dev/null +++ b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java @@ -0,0 +1,137 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + + +// This file was copied from: https://github.com/jboss-modules/jboss-modules/blob/master/src/main/java/org/jboss/modules/filter/GlobPathFilter.java +package kamon.util; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** +* Default implementation of PathFilter. Uses glob based includes and excludes to determine whether to export. +* +* @author John E. Bailey +* @author David M. Lloyd +*/ +public final class GlobPathFilter { + private static final Pattern GLOB_PATTERN = Pattern.compile("(\\*\\*?)|(\\?)|(\\\\.)|(/+)|([^*?]+)"); + + private final String glob; + private final Pattern pattern; + + /** +* Construct a new instance. +* +* @param glob the path glob to match +*/ + public GlobPathFilter(final String glob) { + pattern = getGlobPattern(glob); + this.glob = glob; + } + + /** +* Determine whether a path should be accepted. +* +* @param path the path to check +* @return true if the path should be accepted, false if not +*/ + public boolean accept(final String path) { + return pattern.matcher(path).matches(); + } + + /** + * Get a regular expression pattern which accept any path names which match the given glob. The glob patterns + * function similarly to {@code ant} file patterns. Valid metacharacters in the glob pattern include: + * + * In addition, any glob pattern matches all subdirectories thereof. A glob pattern ending in {@code /} is equivalent + * to a glob pattern ending in /** in that the named directory is not itself included in the glob. + *

+ * See also: "Patterns" in the Ant Manual + * + * @param glob the glob to match + * + * @return the pattern + */ + private static Pattern getGlobPattern(final String glob) { + StringBuilder patternBuilder = new StringBuilder(); + final Matcher m = GLOB_PATTERN.matcher(glob); + boolean lastWasSlash = false; + while (m.find()) { + lastWasSlash = false; + String grp; + if ((grp = m.group(1)) != null) { + // match a * or ** + if (grp.length() == 2) { + // it's a ** + patternBuilder.append(".*"); + } else { + // it's a * + patternBuilder.append("[^/]*"); + } + } else if ((grp = m.group(2)) != null) { + // match a '?' glob pattern; any non-slash character + patternBuilder.append("[^/]"); + } else if ((grp = m.group(3)) != null) { + // backslash-escaped value + patternBuilder.append(Pattern.quote(m.group().substring(1))); + } else if ((grp = m.group(4)) != null) { + // match any number of / chars + patternBuilder.append("/+"); + lastWasSlash = true; + } else { + // some other string + patternBuilder.append(Pattern.quote(m.group())); + } + } + if (lastWasSlash) { + // ends in /, append ** + patternBuilder.append(".*"); + } else { + patternBuilder.append("(?:/.*)?"); + } + return Pattern.compile(patternBuilder.toString()); + } + + public int hashCode() { + return glob.hashCode() + 13; + } + + public boolean equals(final Object obj) { + return obj instanceof GlobPathFilter && equals((GlobPathFilter) obj); + } + + public boolean equals(final GlobPathFilter obj) { + return obj != null && obj.pattern.equals(pattern); + } + + public String toString() { + final StringBuilder b = new StringBuilder(); + b.append("match "); + if (glob != null) { + b.append('"').append(glob).append('"'); + } else { + b.append('/').append(pattern).append('/'); + } + return b.toString(); + } +} \ No newline at end of file diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index f6951705..1448f22f 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -2,17 +2,30 @@ - + + - + + + + + + + + + + + + + + - - + \ No newline at end of file diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf new file mode 100644 index 00000000..11e7cbb4 --- /dev/null +++ b/kamon-core/src/main/resources/reference.conf @@ -0,0 +1,30 @@ +kamon { + metrics { + tick-interval = 1 second + + actors { + tracked = [] + + excluded = [ "system/*", "user/IO-*" ] + + hdr-settings { + processing-time { + highest-trackable-value = 3600000000000 + significant-value-digits = 2 + } + time-in-mailbox { + highest-trackable-value = 3600000000000 + significant-value-digits = 2 + } + mailbox-size { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + } + } + } + + trace { + ask-pattern-tracing = off + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala new file mode 100644 index 00000000..6cede344 --- /dev/null +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -0,0 +1,93 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package akka.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{ Cell, Props, ActorSystem, ActorRef } +import akka.dispatch.{ Envelope, MessageDispatcher } +import kamon.trace.{ TraceContext, ContextAware, Trace } +import kamon.metrics.{ HdrActorMetricsRecorder, ActorMetrics } +import kamon.Kamon + +@Aspect("perthis(actorCellCreation(*, *, *, *, *))") +class BehaviourInvokeTracing { + var path: Option[String] = None + var actorMetrics: Option[HdrActorMetricsRecorder] = None + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(system, ref, props, dispatcher, parent)") + def afterCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val metricsExtension = Kamon(ActorMetrics)(system) + val simplePathString = ref.path.elements.mkString("/") + + if (metricsExtension.shouldTrackActor(simplePathString)) { + path = Some(ref.path.toString) + actorMetrics = Some(metricsExtension.registerActor(simplePathString)) + } + } + + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[ContextAndTimestampAware] + + Trace.withContext(contextAndTimestamp.traceContext) { + pjp.proceed() + } + + actorMetrics.map { am ⇒ + am.recordProcessingTime(System.nanoTime() - timestampBeforeProcessing) + am.recordTimeInMailbox(timestampBeforeProcessing - contextAndTimestamp.timestamp) + } + } + + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") + def actorStop(cell: Cell): Unit = {} + + @After("actorStop(cell)") + def afterStop(cell: Cell): Unit = { + path.map(p ⇒ Kamon(ActorMetrics)(cell.system).unregisterActor(p)) + } +} + +@Aspect +class EnvelopeTraceContextMixin { + + @DeclareMixin("akka.dispatch.Envelope") + def mixin: ContextAndTimestampAware = new ContextAndTimestampAware { + val traceContext: Option[TraceContext] = Trace.context() + val timestamp: Long = System.nanoTime() + } + + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def envelopeCreation(ctx: ContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: ContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} + +trait ContextAndTimestampAware extends ContextAware { + def timestamp: Long +} diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala new file mode 100644 index 00000000..7d26016e --- /dev/null +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala @@ -0,0 +1,65 @@ +package akka.instrumentation + +import org.aspectj.lang.annotation._ +import kamon.trace.{ Trace, ContextAware } +import akka.dispatch.sysmsg.EarliestFirstSystemMessageList +import org.aspectj.lang.ProceedingJoinPoint + +@Aspect +class SystemMessageTraceContextMixin { + + @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") + def envelopeCreation(ctx: ContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: ContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} + +@Aspect +class RepointableActorRefTraceContextMixin { + + @DeclareMixin("akka.actor.RepointableActorRef") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") + def envelopeCreation(ctx: ContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: ContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } + + @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") + def repointableActorRefCreation(repointableActorRef: ContextAware): Unit = {} + + @Around("repointableActorRefCreation(repointableActorRef)") + def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: ContextAware): Any = { + Trace.withContext(repointableActorRef.traceContext) { + pjp.proceed() + } + } + +} + +@Aspect +class ActorSystemMessagePassingTracing { + + @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") + def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} + + @Around("systemMessageProcessing(messages)") + def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { + if (messages.nonEmpty) { + val ctx = messages.head.asInstanceOf[ContextAware].traceContext + Trace.withContext(ctx)(pjp.proceed()) + + } else pjp.proceed() + } +} diff --git a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala new file mode 100644 index 00000000..b5b23e61 --- /dev/null +++ b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala @@ -0,0 +1,49 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package akka.instrumentation + +import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect } +import akka.event.Logging.Warning +import scala.compat.Platform.EOL +import akka.actor.ActorRefProvider +import akka.pattern.{ AskTimeoutException, PromiseActorRef } + +@Aspect +class AskPatternTracing { + + class StackTraceCaptureException extends Throwable + + @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *)", argNames = "provider") + def promiseActorRefApply(provider: ActorRefProvider): Unit = { + provider.settings.config.getBoolean("kamon.trace.ask-pattern-tracing") + } + + @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor") + def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = { + val future = promiseActor.result.future + val system = promiseActor.provider.guardian.underlying.system + implicit val ec = system.dispatcher + val stack = new StackTraceCaptureException + + future onFailure { + case timeout: AskTimeoutException ⇒ + val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) + + system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing], + "Timeout triggered for ask pattern registered at: " + stackString)) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index b5c3d552..b72e8fea 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -18,10 +18,8 @@ package kamon import akka.actor._ object Kamon { - trait Extension extends akka.actor.Extension { - def manager: ActorRef - } + trait Extension extends akka.actor.Extension - def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager + def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system) } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala new file mode 100644 index 00000000..297017cf --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala @@ -0,0 +1,37 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } +import org.aspectj.lang.ProceedingJoinPoint +import kamon.trace.{ ContextAware, Trace } + +@Aspect +class ActorLoggingTracing { + + @DeclareMixin("akka.event.Logging.LogEvent+") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = {} + + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = { + Trace.withContext(logEvent.traceContext) { + pjp.proceed() + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala deleted file mode 100644 index 252e5220..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation - -import org.aspectj.lang.ProceedingJoinPoint - -trait ProceedingJoinPointPimp { - import language.implicitConversions - - implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) -} - -object ProceedingJoinPointPimp extends ProceedingJoinPointPimp - -case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { - def proceedWith(newUniqueArg: AnyRef) = { - val args = pjp.getArgs - args.update(0, newUniqueArg) - pjp.proceed(args) - } - - def proceedWithTarget(args: AnyRef*) = { - pjp.proceed(args.toArray) - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index a3da76f7..90d2b270 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -19,10 +19,8 @@ import org.aspectj.lang.annotation._ import java.util.concurrent._ import org.aspectj.lang.ProceedingJoinPoint import java.util -import kamon.metric.{ DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector } import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory } import com.typesafe.config.Config -import kamon.Kamon import scala.concurrent.forkjoin.ForkJoinPool import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool @@ -41,8 +39,8 @@ class ActorSystemInstrumentation { @Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") class ForkJoinPoolInstrumentation { - var activeThreadsHistogram: Histogram = _ - var poolSizeHistogram: Histogram = _ + /* var activeThreadsHistogram: Histogram = _ + var poolSizeHistogram: Histogram = _*/ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} @@ -71,8 +69,8 @@ class ForkJoinPoolInstrumentation { @After("forkJoinScan(fjp)") def updateMetrics(fjp: AkkaForkJoinPool): Unit = { - activeThreadsHistogram.update(fjp.getActiveThreadCount) - poolSizeHistogram.update(fjp.getPoolSize) + /*activeThreadsHistogram.update(fjp.getActiveThreadCount) + poolSizeHistogram.update(fjp.getPoolSize)*/ } } @@ -90,6 +88,7 @@ trait WatchedExecutorService { def collector: ExecutorServiceCollector } +/* trait ExecutorServiceMonitoring { def dispatcherMetrics: DispatcherMetricCollector } @@ -97,6 +96,7 @@ trait ExecutorServiceMonitoring { class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { @volatile var dispatcherMetrics: DispatcherMetricCollector = _ } +*/ case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = delegate.createExecutorService @@ -133,9 +133,9 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { @Around("factoryMethodCall(namedFactory)") def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { val delegate = pjp.proceed().asInstanceOf[ExecutorService] - val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + val executorFullName = "" //MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) - ExecutorServiceMetricCollector.register(executorFullName, delegate) + //ExecutorServiceMetricCollector.register(executorFullName, delegate) new NamedExecutorServiceDelegate(executorFullName, delegate) } @@ -143,7 +143,7 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { def shutdown() = { - ExecutorServiceMetricCollector.deregister(fullName) + //ExecutorServiceMetricCollector.deregister(fullName) delegate.shutdown() } def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala new file mode 100644 index 00000000..5600d582 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala @@ -0,0 +1,47 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import kamon.trace.{ ContextAware, TraceContext, Trace } + +@Aspect +class FutureTracing { + + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") + def futureRelatedRunnableCreation(runnable: ContextAware): Unit = {} + + @After("futureRelatedRunnableCreation(runnable)") + def afterCreation(runnable: ContextAware): Unit = { + // Force traceContext initialization. + runnable.traceContext + } + + @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") + def futureRelatedRunnableExecution(runnable: ContextAware) = {} + + @Around("futureRelatedRunnableExecution(runnable)") + def aroundExecution(pjp: ProceedingJoinPoint, runnable: ContextAware): Any = { + Trace.withContext(runnable.traceContext) { + pjp.proceed() + } + } + +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index da797fa1..44eb8c43 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -15,17 +15,16 @@ * ========================================================== */ package kamon.instrumentation -import com.codahale.metrics.{ ExponentiallyDecayingReservoir, Histogram } import akka.dispatch.{ UnboundedMessageQueueSemantics, Envelope, MessageQueue } import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } import akka.actor.{ ActorSystem, ActorRef } -import kamon.metric.{ Metrics, MetricDirectory } import org.aspectj.lang.ProceedingJoinPoint /** * For Mailboxes we would like to track the queue size and message latency. Currently the latency * will be gathered from the ActorCellMetrics. */ +/* @Aspect class MessageQueueInstrumentation { @@ -74,4 +73,5 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: def hasMessages: Boolean = delegate.hasMessages def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) } +*/ diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala deleted file mode 100644 index 4c4b93e9..00000000 --- a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.metric - -import java.util.concurrent.{ ThreadPoolExecutor, ExecutorService } -import scala.concurrent.forkjoin.ForkJoinPool -import com.codahale.metrics.{ Metric, MetricFilter } - -object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { - - def register(fullName: String, executorService: ExecutorService) = executorService match { - case fjp: ForkJoinPool ⇒ registerForkJoinPool(fullName, fjp) - case tpe: ThreadPoolExecutor ⇒ registerThreadPoolExecutor(fullName, tpe) - case _ ⇒ // If it is a unknown Executor then just do nothing. - } - - def deregister(fullName: String) = { - Metrics.registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) - }) - } -} - -trait ForkJoinPoolMetricCollector { - import GaugeGenerator._ - import BasicExecutorMetricNames._ - - def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { - val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ - - val allMetrics = Map( - fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), - fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), - fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount)) - - allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) } - } -} - -trait ThreadPoolExecutorMetricCollector { - import GaugeGenerator._ - import BasicExecutorMetricNames._ - - def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { - val tpeGauge = newNumericGaugeFor(tpe) _ - - val allMetrics = Map( - fullName + queueSize -> tpeGauge(_.getQueue.size()), - fullName + poolSize -> tpeGauge(_.getPoolSize), - fullName + activeThreads -> tpeGauge(_.getActiveCount)) - - allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) } - } -} - -object BasicExecutorMetricNames { - val queueSize = "queueSize" - val poolSize = "threads/poolSize" - val activeThreads = "threads/activeThreads" -} - diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala deleted file mode 100644 index 9eff2739..00000000 --- a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.metric - -import com.codahale.metrics.Gauge - -trait GaugeGenerator { - - def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T ⇒ V) = new Gauge[V] { - def getValue: V = generator(target) - } -} - -object GaugeGenerator extends GaugeGenerator diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala deleted file mode 100644 index b904ec56..00000000 --- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.metric - -import java.util.concurrent.TimeUnit -import akka.actor.ActorRef -import com.codahale.metrics -import com.codahale.metrics.{ MetricFilter, Metric, ConsoleReporter, MetricRegistry } -import scala.collection.concurrent.TrieMap - -object Metrics { - val registry: MetricRegistry = new MetricRegistry - - val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) - //consoleReporter.build().start(45, TimeUnit.SECONDS) - - //val newrelicReporter = NewRelicReporter(registry) - //newrelicReporter.start(5, TimeUnit.SECONDS) - - def include(name: String, metric: Metric) = { - //registry.register(name, metric) - } - - def exclude(name: String) = { - registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(name) - }) - } - - def deregister(fullName: String) = { - registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) - }) - } -} - -object Watched { - case object Actor - case object Dispatcher -} - -object MetricDirectory { - def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" - - def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox" - - def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/") - - def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon") - - def shouldInstrumentActor(actorPath: String): Boolean = { - !(actorPath.isEmpty || actorPath.startsWith("system")) - } - -} - -case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) - -trait Histogram { - def update(value: Long): Unit - def snapshot: HistogramSnapshot -} - -trait HistogramSnapshot { - def median: Double - def max: Double - def min: Double -} - -case class ActorSystemMetrics(actorSystemName: String) { - val dispatchers = TrieMap.empty[String, DispatcherMetricCollector] - - private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) - - def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { - val stats = createDispatcherCollector - dispatchers.put(dispatcherName, stats) - Some(stats) - } - -} - -case class CodahaleHistogram() extends Histogram { - private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) - - def update(value: Long) = histogram.update(value) - def snapshot: HistogramSnapshot = { - val snapshot = histogram.getSnapshot - - CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin) - } -} - -case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot - -/** - * Dispatcher Metrics that we care about currently with a histogram-like nature: - * - Work Queue Size - * - Total/Active Thread Count - */ - -import annotation.tailrec -import java.util.concurrent.atomic.AtomicReference - -object Atomic { - def apply[T](obj: T) = new Atomic(new AtomicReference(obj)) - implicit def toAtomic[T](ref: AtomicReference[T]): Atomic[T] = new Atomic(ref) -} - -class Atomic[T](val atomic: AtomicReference[T]) { - @tailrec - final def update(f: T ⇒ T): T = { - val oldValue = atomic.get() - val newValue = f(oldValue) - if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) - } - - def get() = atomic.get() -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala new file mode 100644 index 00000000..72e473e8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala @@ -0,0 +1,31 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ + +package kamon.metrics + +import akka.actor.{ Props, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import akka.actor +import kamon.Kamon + +object ActorMetrics extends ExtensionId[ActorMetricsExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = ActorMetrics + def createExtension(system: ExtendedActorSystem): ActorMetricsExtension = new ActorMetricsExtension(system) + +} + +class ActorMetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension with ActorMetricsOps { + lazy val metricsDispatcher = system.actorOf(Props[ActorMetricsDispatcher], "kamon-actor-metrics") +} diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala new file mode 100644 index 00000000..dc4abde0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala @@ -0,0 +1,148 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.HdrHistogram.{ AbstractHistogram, AtomicHistogram } +import kamon.util.GlobPathFilter +import scala.collection.concurrent.TrieMap +import scala.collection.JavaConversions.iterableAsScalaIterable +import akka.actor._ +import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, FlushMetrics } +import kamon.Kamon +import scala.concurrent.duration._ +import java.util.concurrent.TimeUnit +import kamon.metrics.ActorMetricsDispatcher.Subscribe + +trait ActorMetricsOps { + self: ActorMetricsExtension ⇒ + + val config = system.settings.config.getConfig("kamon.metrics.actors") + val actorMetrics = TrieMap[String, HdrActorMetricsRecorder]() + + val trackedActors: Vector[GlobPathFilter] = config.getStringList("tracked").map(glob ⇒ new GlobPathFilter(glob)).toVector + val excludedActors: Vector[GlobPathFilter] = config.getStringList("excluded").map(glob ⇒ new GlobPathFilter(glob)).toVector + + val actorMetricsFactory: () ⇒ HdrActorMetricsRecorder = { + val settings = config.getConfig("hdr-settings") + val processingTimeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("processing-time")) + val timeInMailboxHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("time-in-mailbox")) + val mailboxSizeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("mailbox-size")) + + () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig) + } + + import scala.concurrent.duration._ + system.scheduler.schedule(0.seconds, 10.seconds)( + actorMetrics.collect { + case (name, recorder: HdrActorMetricsRecorder) ⇒ + println(s"Actor: $name") + recorder.processingTimeHistogram.copy.getHistogramData.outputPercentileDistribution(System.out, 1000000D) + })(system.dispatcher) + + def shouldTrackActor(path: String): Boolean = + trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path)) + + def registerActor(path: String): HdrActorMetricsRecorder = actorMetrics.getOrElseUpdate(path, actorMetricsFactory()) + + def unregisterActor(path: String): Unit = actorMetrics.remove(path) +} + +class HdrActorMetricsRecorder(processingTimeHdrConfig: HdrConfiguration, timeInMailboxHdrConfig: HdrConfiguration, + mailboxSizeHdrConfig: HdrConfiguration) { + + val processingTimeHistogram = new AtomicHistogram(processingTimeHdrConfig.highestTrackableValue, processingTimeHdrConfig.significantValueDigits) + val timeInMailboxHistogram = new AtomicHistogram(timeInMailboxHdrConfig.highestTrackableValue, timeInMailboxHdrConfig.significantValueDigits) + val mailboxSizeHistogram = new AtomicHistogram(mailboxSizeHdrConfig.highestTrackableValue, mailboxSizeHdrConfig.significantValueDigits) + + def recordTimeInMailbox(waitTime: Long): Unit = timeInMailboxHistogram.recordValue(waitTime) + + def recordProcessingTime(processingTime: Long): Unit = processingTimeHistogram.recordValue(processingTime) + + def snapshot(): HdrActorMetricsSnapshot = { + HdrActorMetricsSnapshot(processingTimeHistogram.copy(), timeInMailboxHistogram.copy(), mailboxSizeHistogram.copy()) + } + + def reset(): Unit = { + processingTimeHistogram.reset() + timeInMailboxHistogram.reset() + mailboxSizeHistogram.reset() + } +} + +case class HdrActorMetricsSnapshot(processingTimeHistogram: AbstractHistogram, timeInMailboxHistogram: AbstractHistogram, + mailboxSizeHistogram: AbstractHistogram) + +class ActorMetricsDispatcher extends Actor { + val tickInterval = Duration(context.system.settings.config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var subscribedForever: Map[GlobPathFilter, List[ActorRef]] = Map.empty + var subscribedForOne: Map[GlobPathFilter, List[ActorRef]] = Map.empty + var lastTick = System.currentTimeMillis() + + def receive = { + case Subscribe(path, true) ⇒ subscribeForever(path, sender) + case Subscribe(path, false) ⇒ subscribeOneOff(path, sender) + case FlushMetrics ⇒ flushMetrics() + } + + def subscribeForever(path: String, receiver: ActorRef): Unit = subscribedForever = subscribe(receiver, path, subscribedForever) + + def subscribeOneOff(path: String, receiver: ActorRef): Unit = subscribedForOne = subscribe(receiver, path, subscribedForOne) + + def subscribe(receiver: ActorRef, path: String, target: Map[GlobPathFilter, List[ActorRef]]): Map[GlobPathFilter, List[ActorRef]] = { + val pathFilter = new GlobPathFilter(path) + val oldReceivers = target.get(pathFilter).getOrElse(Nil) + target.updated(pathFilter, receiver :: oldReceivers) + } + + def flushMetrics(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(ActorMetrics)(context.system).actorMetrics.map { + case (path, metrics) ⇒ + val snapshot = metrics.snapshot() + metrics.reset() + + (path, snapshot) + }.toMap + + dispatchMetricsTo(subscribedForOne, snapshots, currentTick) + dispatchMetricsTo(subscribedForever, snapshots, currentTick) + + subscribedForOne = Map.empty + lastTick = currentTick + } + + def dispatchMetricsTo(subscribers: Map[GlobPathFilter, List[ActorRef]], snapshots: Map[String, HdrActorMetricsSnapshot], + currentTick: Long): Unit = { + + for ((subscribedPath, receivers) ← subscribers) { + val metrics = snapshots.filterKeys(snapshotPath ⇒ subscribedPath.accept(snapshotPath)) + val actorMetrics = ActorMetricsSnapshot(lastTick, currentTick, metrics) + + receivers.foreach(ref ⇒ ref ! actorMetrics) + } + } +} + +object ActorMetricsDispatcher { + case class Subscribe(path: String, forever: Boolean = false) + case class UnSubscribe(path: String) + + case class ActorMetricsSnapshot(fromMillis: Long, toMillis: Long, metrics: Map[String, HdrActorMetricsSnapshot]) + case object FlushMetrics +} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala new file mode 100644 index 00000000..d6359ead --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/package.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon + +import scala.concurrent.duration._ +import com.typesafe.config.Config + +package object metrics { + val OneHour = 1.hour.toNanos + + case class HdrConfiguration(highestTrackableValue: Long, significantValueDigits: Int) + case object HdrConfiguration { + def fromConfig(config: Config): HdrConfiguration = { + HdrConfiguration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/Segments.scala b/kamon-core/src/main/scala/kamon/trace/Segments.scala new file mode 100644 index 00000000..0bc68ee7 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Segments.scala @@ -0,0 +1,38 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ + +package kamon.trace + +import kamon.trace.Trace.SegmentCompletionHandle + +object Segments { + + trait Category + case object HttpClientRequest extends Category + + case class Start(category: Category, description: String = "", + attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime()) + + case class End(attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime()) + + case class Segment(start: Start, end: End) + + trait SegmentCompletionHandleAware { + var completionHandle: Option[SegmentCompletionHandle] + } + + trait ContextAndSegmentCompletionAware extends ContextAware with SegmentCompletionHandleAware +} diff --git a/kamon-core/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala new file mode 100644 index 00000000..31e8185a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Trace.scala @@ -0,0 +1,114 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace + +import kamon.Kamon +import akka.actor._ +import scala.Some +import kamon.trace.Trace.Register +import scala.concurrent.duration._ +import java.util.concurrent.atomic.AtomicLong + +object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: Extension] = Trace + def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) + + /*** Protocol */ + case object Register + + /** User API */ + //private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None) + private[trace] val traceContext = new ThreadLocal[Option[TraceContext]] { + override def initialValue(): Option[TraceContext] = None + } + private[trace] val tranid = new AtomicLong() + + def context() = traceContext.get + private def set(ctx: Option[TraceContext]) = traceContext.set(ctx) + + def clear: Unit = traceContext.remove() + def start(name: String)(implicit system: ActorSystem): TraceContext = { + val ctx = newTraceContext(name) + ctx.start(name) + set(Some(ctx)) + + ctx + } + + def withContext[T](ctx: Option[TraceContext])(thunk: ⇒ T): T = { + val oldval = context + set(ctx) + + try thunk + finally set(oldval) + } + + def transformContext(f: TraceContext ⇒ TraceContext): Unit = { + context.map(f).foreach(ctx ⇒ set(Some(ctx))) + } + + def finish(): Option[TraceContext] = { + val ctx = context() + ctx.map(_.finish) + clear + ctx + } + + // TODO: FIX + def newTraceContext(name: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace).api, tranid.getAndIncrement, name) + + def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = { + val start = Segments.Start(category, description, attributes) + SegmentCompletionHandle(start) + } + + def startSegment(start: Segments.Start): SegmentCompletionHandle = SegmentCompletionHandle(start) + + case class SegmentCompletionHandle(start: Segments.Start) { + def complete(): Unit = { + val end = Segments.End() + println(s"Completing the Segment: $start - $end") + } + def complete(end: Segments.End): Unit = { + println(s"Completing the Segment: $start - $end") + } + } +} + +class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val api: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace") +} + +class TraceManager extends Actor with ActorLogging { + var listeners: Seq[ActorRef] = Seq.empty + + def receive = { + case Register ⇒ + listeners = sender +: listeners + log.info("Registered [{}] as listener for Kamon traces", sender) + + case segment: UowSegment ⇒ + val tracerName = segment.id.toString + context.child(tracerName).getOrElse(newTracer(tracerName)) ! segment + + case trace: UowTrace ⇒ + listeners foreach (_ ! trace) + } + + def newTracer(name: String): ActorRef = { + context.actorOf(UowTraceAggregator.props(self, 30 seconds), name) + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala new file mode 100644 index 00000000..3e68a816 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -0,0 +1,51 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace + +import java.util.UUID +import akka.actor._ +import java.util.concurrent.atomic.AtomicLong +import scala.concurrent.duration._ +import kamon.Kamon +import kamon.trace.UowTracing.{ Finish, Start } + +// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. +case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) { + + def start(name: String) = { + collector ! Start(id, name) + } + + def finish: Unit = { + collector ! Finish(id) + } + +} + +trait ContextAware { + def traceContext: Option[TraceContext] +} + +object ContextAware { + def default: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Trace.context() + } +} + +trait TimedContextAware { + def timestamp: Long + def traceContext: Option[TraceContext] +} diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala new file mode 100644 index 00000000..20cce830 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala @@ -0,0 +1,82 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace + +import akka.actor._ +import scala.concurrent.duration.Duration +import kamon.trace.UowTracing._ + +sealed trait UowSegment { + def id: Long + def timestamp: Long +} + +trait AutoTimestamp extends UowSegment { + val timestamp = System.nanoTime +} + +object UowTracing { + case class Start(id: Long, name: String) extends AutoTimestamp + case class Finish(id: Long) extends AutoTimestamp + case class Rename(id: Long, name: String) extends AutoTimestamp + case class WebExternalStart(id: Long, host: String) extends AutoTimestamp + case class WebExternalFinish(id: Long) extends AutoTimestamp + case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp +} + +case class UowTrace(name: String, uow: String, start: Long, end: Long, segments: Seq[UowSegment]) { + def elapsed: Long = end - start +} + +class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { + context.setReceiveTimeout(aggregationTimeout) + + var name: String = "UNKNOWN" + var segments: Seq[UowSegment] = Nil + + var pendingExternal = List[WebExternalStart]() + + var start = 0L + var end = 0L + + def receive = { + case start: Start ⇒ + this.start = start.timestamp + segments = segments :+ start + name = start.name + case finish: Finish ⇒ + end = finish.timestamp + segments = segments :+ finish; finishTracing() + case wes: WebExternalStart ⇒ pendingExternal = pendingExternal :+ wes + case finish @ WebExternalFinish(id) ⇒ pendingExternal.find(_.id == id).map(start ⇒ { + segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host) + }) + case Rename(id, newName) ⇒ name = newName + case segment: UowSegment ⇒ segments = segments :+ segment + case ReceiveTimeout ⇒ + log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) + context.stop(self) + } + + def finishTracing(): Unit = { + reporting ! UowTrace(name, "", start, end, segments) + context.stop(self) + } +} + +object UowTraceAggregator { + def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout) +} diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala new file mode 100644 index 00000000..add47fdf --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala @@ -0,0 +1,24 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.logging + +import ch.qos.logback.classic.pattern.ClassicConverter +import ch.qos.logback.classic.spi.ILoggingEvent +import kamon.trace.Trace + +class LogbackUowConverter extends ClassicConverter { + def convert(event: ILoggingEvent): String = Trace.context().map(_.uow).getOrElse("undefined") +} diff --git a/kamon-core/src/test/resources/application.conf b/kamon-core/src/test/resources/application.conf new file mode 100644 index 00000000..e8217fc2 --- /dev/null +++ b/kamon-core/src/test/resources/application.conf @@ -0,0 +1,3 @@ +akka { + loggers = ["akka.event.slf4j.Slf4jLogger"] +} \ No newline at end of file diff --git a/kamon-core/src/test/resources/logback.xml b/kamon-core/src/test/resources/logback.xml new file mode 100644 index 00000000..2ae1e3bd --- /dev/null +++ b/kamon-core/src/test/resources/logback.xml @@ -0,0 +1,12 @@ + + + + %date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n + + + + + + + + diff --git a/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala b/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala new file mode 100644 index 00000000..5108af25 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala @@ -0,0 +1,31 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon + +import org.scalatest.{ WordSpecLike, WordSpec } +import akka.testkit.TestKit +import akka.actor.{ Props, ActorSystem } + +class MailboxSizeMetricsSpec extends TestKit(ActorSystem("mailbox-size-metrics-spec")) with WordSpecLike { + + "the mailbox size metrics instrumentation" should { + "register a counter for mailbox size upon actor creation" in { + val target = system.actorOf(Props.empty, "sample") + + //Metrics.registry.getHistograms.get("akka://mailbox-size-metrics-spec/sample:MAILBOX") + } + } +} diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala new file mode 100644 index 00000000..ff08ca0f --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala @@ -0,0 +1,51 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.scalatest.{ WordSpecLike, Matchers, WordSpec } +import akka.testkit.TestKitBase +import akka.actor.{ Actor, Props, ActorSystem } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, Subscribe } +import scala.concurrent.duration._ + +class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { + implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics.actors.tracked = ["user/test*"] + """.stripMargin)) + + implicit def self = testActor + + lazy val metricsExtension = Kamon(ActorMetrics).metricsDispatcher + + "the Kamon actor metrics" should { + "track configured actors" in { + system.actorOf(Props[Other], "test-tracked-actor") ! "nothing" + metricsExtension ! Subscribe("user/test-tracked-actor") + + within(5 seconds) { + expectMsgType[ActorMetricsDispatcher.ActorMetricsSnapshot] + } + } + } +} + +class Other extends Actor { + def receive = { case a ⇒ } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala new file mode 100644 index 00000000..89742651 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala @@ -0,0 +1,51 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import akka.testkit.TestKit +import org.scalatest.{ Inspectors, Matchers, WordSpecLike } +import akka.actor.{ Props, ActorLogging, Actor, ActorSystem } +import akka.event.Logging.{ LogEvent } +import kamon.trace.{ ContextAware, TraceContext, Trace } + +class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with WordSpecLike with Matchers with Inspectors { + + "the ActorLogging instrumentation" should { + "attach the TraceContext (if available) to log events" in { + val testTraceContext = Some(TraceContext(Actor.noSender, 1)) + val loggerActor = system.actorOf(Props[LoggerActor]) + system.eventStream.subscribe(testActor, classOf[LogEvent]) + + Trace.withContext(testTraceContext) { + loggerActor ! "info" + } + + fishForMessage() { + case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒ + val ctxInEvent = event.asInstanceOf[ContextAware].traceContext + ctxInEvent === testTraceContext + + case event: LogEvent ⇒ false + } + } + } +} + +class LoggerActor extends Actor with ActorLogging { + def receive = { + case "info" ⇒ log.info("TraceContext => {}", Trace.context()) + } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala new file mode 100644 index 00000000..89251bf4 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala @@ -0,0 +1,84 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import org.scalatest.{ WordSpecLike, Matchers } +import akka.actor.{ ActorRef, Actor, Props, ActorSystem } + +import akka.testkit.{ ImplicitSender, TestKit } +import kamon.trace.Trace +import akka.pattern.{ pipe, ask } +import akka.util.Timeout +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } +import akka.routing.RoundRobinRouter +import kamon.trace.TraceContext + +class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { + implicit val executionContext = system.dispatcher + + "the message passing instrumentation" should { + "propagate the TraceContext using bang" in new TraceContextEchoFixture { + Trace.withContext(testTraceContext) { + ctxEchoActor ! "test" + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext using tell" in new TraceContextEchoFixture { + Trace.withContext(testTraceContext) { + ctxEchoActor.tell("test", testActor) + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext using ask" in new TraceContextEchoFixture { + implicit val timeout = Timeout(1 seconds) + Trace.withContext(testTraceContext) { + // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. + (ctxEchoActor ? "test") pipeTo (testActor) + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext to actors behind a router" in new RoutedTraceContextEchoFixture { + Trace.withContext(testTraceContext) { + ctxEchoActor ! "test" + } + + expectMsg(testTraceContext) + } + } + + trait TraceContextEchoFixture { + val testTraceContext = Some(Trace.newTraceContext("")) + val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) + } + + trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { + override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 1))) + } +} + +class TraceContextEcho extends Actor { + def receive = { + case msg: String ⇒ sender ! Trace.context() + } +} + diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala new file mode 100644 index 00000000..7d539370 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala @@ -0,0 +1,165 @@ +package kamon.trace.instrumentation + +import akka.testkit.{ ImplicitSender, TestKit } +import akka.actor._ +import org.scalatest.WordSpecLike +import kamon.trace.Trace +import scala.util.control.NonFatal +import akka.actor.SupervisorStrategy.{ Escalate, Stop, Restart, Resume } +import scala.concurrent.duration._ + +class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { + implicit val executionContext = system.dispatcher + + "the system message passing instrumentation" should { + "keep the TraceContext while processing the Create message in top level actors" in new TraceContextFixture { + Trace.withContext(testTraceContext) { + system.actorOf(Props(new Actor { + + testActor ! Trace.context() + + def receive: Actor.Receive = { case any ⇒ } + })) + } + + expectMsg(testTraceContext) + } + + "keep the TraceContext while processing the Create message in non top level actors" in new TraceContextFixture { + Trace.withContext(testTraceContext) { + system.actorOf(Props(new Actor { + def receive: Actor.Receive = { + case any ⇒ + context.actorOf(Props(new Actor { + + testActor ! Trace.context() + + def receive: Actor.Receive = { case any ⇒ } + })) + } + })) ! "any" + } + + expectMsg(testTraceContext) + } + + "keep the TraceContext in the supervision cycle" when { + "the actor is resumed" in new TraceContextFixture { + val supervisor = supervisorWithDirective(Resume) + + Trace.withContext(testTraceContext) { + supervisor ! "fail" + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + + // Ensure we didn't tie the actor with the context + supervisor ! "context" + expectMsg(None) + } + + "the actor is restarted" in new TraceContextFixture { + val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) + + Trace.withContext(testTraceContext) { + supervisor ! "fail" + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + expectMsg(testTraceContext) // From the preRestart hook + expectMsg(testTraceContext) // From the postRestart hook + + // Ensure we didn't tie the actor with the context + supervisor ! "context" + expectMsg(None) + } + + "the actor is stopped" in new TraceContextFixture { + val supervisor = supervisorWithDirective(Stop, sendPostStop = true) + + Trace.withContext(testTraceContext) { + supervisor ! "fail" + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + expectMsg(testTraceContext) // From the postStop hook + expectNoMsg(1 second) + } + + "the failure is escalated" in new TraceContextFixture { + val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) + + Trace.withContext(testTraceContext) { + supervisor ! "fail" + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + expectMsg(testTraceContext) // From the grandparent executing the supervision strategy + expectMsg(testTraceContext) // From the postStop hook in the child + expectMsg(testTraceContext) // From the postStop hook in the parent + expectNoMsg(1 second) + } + } + } + + def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false, + sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = { + class GrandParent extends Actor { + val child = context.actorOf(Props(new Parent)) + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case NonFatal(throwable) ⇒ testActor ! Trace.context(); Stop + } + + def receive = { + case any ⇒ child forward any + } + } + + class Parent extends Actor { + val child = context.actorOf(Props(new Child)) + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case NonFatal(throwable) ⇒ testActor ! Trace.context(); directive + } + + def receive: Actor.Receive = { + case any ⇒ child forward any + } + + override def postStop(): Unit = { + if (sendPostStop) testActor ! Trace.context() + super.postStop() + } + } + + class Child extends Actor { + def receive = { + case "fail" ⇒ 1 / 0 + case "context" ⇒ sender ! Trace.context() + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + if (sendPreRestart) testActor ! Trace.context() + super.preRestart(reason, message) + } + + override def postRestart(reason: Throwable): Unit = { + if (sendPostRestart) testActor ! Trace.context() + super.postRestart(reason) + } + + override def postStop(): Unit = { + if (sendPostStop) testActor ! Trace.context() + super.postStop() + } + + override def preStart(): Unit = { + if (sendPreStart) testActor ! Trace.context() + super.preStart() + } + } + + system.actorOf(Props(new GrandParent)) + } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala new file mode 100644 index 00000000..9df67391 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala @@ -0,0 +1,59 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import akka.testkit.TestKit +import akka.actor.{ Props, Actor, ActorSystem } +import org.scalatest.{ Matchers, WordSpecLike } +import akka.event.Logging.Warning +import scala.concurrent.duration._ +import akka.pattern.ask +import akka.util.Timeout +import kamon.trace.{ Trace, ContextAware } +import org.scalatest.OptionValues._ + +class AskPatternTracingSpec extends TestKit(ActorSystem("ask-pattern-tracing-spec")) with WordSpecLike with Matchers { + + "the AskPatternTracing" should { + "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in new TraceContextFixture { + implicit val ec = system.dispatcher + implicit val timeout = Timeout(10 milliseconds) + val noReply = system.actorOf(Props[NoReply]) + system.eventStream.subscribe(testActor, classOf[Warning]) + + within(500 milliseconds) { + val initialCtx = Trace.withContext(testTraceContext) { + noReply ? "hello" + Trace.context() + } + + val warn = expectMsgPF() { + case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn + } + val capturedCtx = warn.asInstanceOf[ContextAware].traceContext + + capturedCtx should be('defined) + capturedCtx should equal(initialCtx) + } + } + } +} + +class NoReply extends Actor { + def receive = { + case any ⇒ + } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala new file mode 100644 index 00000000..a5554836 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala @@ -0,0 +1,62 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import scala.concurrent.{ ExecutionContext, Await, Promise, Future } +import org.scalatest.{ Matchers, OptionValues, WordSpec } +import org.scalatest.concurrent.{ ScalaFutures, PatienceConfiguration } +import java.util.UUID +import scala.util.{ Random, Success } +import scala.concurrent.duration._ +import java.util.concurrent.TimeUnit +import akka.actor.{ Actor, ActorSystem } +import kamon.trace.{ Trace, TraceContext } + +class FutureTracingSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { + + implicit val execContext = ExecutionContext.Implicits.global + + "a Future created with FutureTracing" should { + "capture the TraceContext available when created" which { + "must be available when executing the future's body" in new TraceContextFixture { + var future: Future[Option[TraceContext]] = _ + + Trace.withContext(testTraceContext) { + future = Future(Trace.context) + } + + whenReady(future)(ctxInFuture ⇒ + ctxInFuture should equal(testTraceContext)) + } + + "must be available when executing callbacks on the future" in new TraceContextFixture { + var future: Future[Option[TraceContext]] = _ + + Trace.withContext(testTraceContext) { + future = Future("Hello Kamon!") + // The TraceContext is expected to be available during all intermediate processing. + .map(_.length) + .flatMap(len ⇒ Future(len.toString)) + .map(s ⇒ Trace.context()) + } + + whenReady(future)(ctxInFuture ⇒ + ctxInFuture should equal(testTraceContext)) + } + } + } +} + diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala new file mode 100644 index 00000000..3b32f3ac --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala @@ -0,0 +1,51 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.instrumentation + +import org.scalatest.{ WordSpecLike, WordSpec } +import akka.testkit.{ TestKitBase, TestKit } +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import kamon.trace.UowTracing.{ Finish, Rename, Start } +import kamon.trace.{ UowTrace, UowTraceAggregator } + +class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike { + + "a TraceAggregator" should { + "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start(1, "/accounts") + aggregator ! Finish(1) + + //expectMsg(UowTrace("UNKNOWN", Seq(Start(1, "/accounts"), Finish(1)))) + } + } + + "change the uow name after receiving a Rename message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start(1, "/accounts") + aggregator ! Rename(1, "test-uow") + aggregator ! Finish(1) + + //expectMsg(UowTrace("test-uow", Seq(Start(1, "/accounts"), Finish(1)))) + } + } + } + + trait AggregatorFixture { + val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds)) + } +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala new file mode 100644 index 00000000..62f7ec84 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala @@ -0,0 +1,10 @@ +package kamon.trace.instrumentation + +import scala.util.Random +import kamon.trace.TraceContext +import akka.actor.Actor + +trait TraceContextFixture { + val random = new Random(System.nanoTime) + val testTraceContext = Some(TraceContext(Actor.noSender, random.nextInt)) +} \ No newline at end of file -- cgit v1.2.3