diff options
47 files changed, 538 insertions, 401 deletions
diff --git a/kamon-core/src/main/java/kamon/util/GlobPathFilter.java b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java new file mode 100644 index 00000000..5b019bec --- /dev/null +++ b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java @@ -0,0 +1,137 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + + +// This file was copied from: https://github.com/jboss-modules/jboss-modules/blob/master/src/main/java/org/jboss/modules/filter/GlobPathFilter.java +package kamon.util; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** +* Default implementation of PathFilter. Uses glob based includes and excludes to determine whether to export. +* +* @author John E. Bailey +* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a> +*/ +public final class GlobPathFilter { + private static final Pattern GLOB_PATTERN = Pattern.compile("(\\*\\*?)|(\\?)|(\\\\.)|(/+)|([^*?]+)"); + + private final String glob; + private final Pattern pattern; + + /** +* Construct a new instance. +* +* @param glob the path glob to match +*/ + public GlobPathFilter(final String glob) { + pattern = getGlobPattern(glob); + this.glob = glob; + } + + /** +* Determine whether a path should be accepted. +* +* @param path the path to check +* @return true if the path should be accepted, false if not +*/ + public boolean accept(final String path) { + return pattern.matcher(path).matches(); + } + + /** + * Get a regular expression pattern which accept any path names which match the given glob. The glob patterns + * function similarly to {@code ant} file patterns. Valid metacharacters in the glob pattern include: + * <ul> + * <li><code>"\"</code> - escape the next character (treat it literally, even if it is itself a recognized metacharacter)</li> + * <li><code>"?"</code> - match any non-slash character</li> + * <li><code>"*"</code> - match zero or more non-slash characters</li> + * <li><code>"**"</code> - match zero or more characters, including slashes</li> + * <li><code>"/"</code> - match one or more slash characters. Consecutive {@code /} characters are collapsed down into one.</li> + * </ul> + * In addition, any glob pattern matches all subdirectories thereof. A glob pattern ending in {@code /} is equivalent + * to a glob pattern ending in <code>/**</code> in that the named directory is not itself included in the glob. + * <p/> + * <b>See also:</b> <a href="http://ant.apache.org/manual/dirtasks.html#patterns">"Patterns" in the Ant Manual</a> + * + * @param glob the glob to match + * + * @return the pattern + */ + private static Pattern getGlobPattern(final String glob) { + StringBuilder patternBuilder = new StringBuilder(); + final Matcher m = GLOB_PATTERN.matcher(glob); + boolean lastWasSlash = false; + while (m.find()) { + lastWasSlash = false; + String grp; + if ((grp = m.group(1)) != null) { + // match a * or ** + if (grp.length() == 2) { + // it's a ** + patternBuilder.append(".*"); + } else { + // it's a * + patternBuilder.append("[^/]*"); + } + } else if ((grp = m.group(2)) != null) { + // match a '?' glob pattern; any non-slash character + patternBuilder.append("[^/]"); + } else if ((grp = m.group(3)) != null) { + // backslash-escaped value + patternBuilder.append(Pattern.quote(m.group().substring(1))); + } else if ((grp = m.group(4)) != null) { + // match any number of / chars + patternBuilder.append("/+"); + lastWasSlash = true; + } else { + // some other string + patternBuilder.append(Pattern.quote(m.group())); + } + } + if (lastWasSlash) { + // ends in /, append ** + patternBuilder.append(".*"); + } else { + patternBuilder.append("(?:/.*)?"); + } + return Pattern.compile(patternBuilder.toString()); + } + + public int hashCode() { + return glob.hashCode() + 13; + } + + public boolean equals(final Object obj) { + return obj instanceof GlobPathFilter && equals((GlobPathFilter) obj); + } + + public boolean equals(final GlobPathFilter obj) { + return obj != null && obj.pattern.equals(pattern); + } + + public String toString() { + final StringBuilder b = new StringBuilder(); + b.append("match "); + if (glob != null) { + b.append('"').append(glob).append('"'); + } else { + b.append('/').append(pattern).append('/'); + } + return b.toString(); + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index f6951705..1448f22f 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -2,17 +2,30 @@ <aspectj> <weaver options="-verbose -showWeaveInfo"> - <!--<dump within="*"/>--> + <!-- In case you want to see the weaved classes --> + <!--<dump within="*"/>--> </weaver> <aspects> - <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/> + <!-- Actors --> + <aspect name="akka.instrumentation.RepointableActorRefTraceContextMixin"/> + <aspect name="akka.instrumentation.SystemMessageTraceContextMixin"/> + <aspect name="akka.instrumentation.ActorSystemMessagePassingTracing"/> + <aspect name="akka.instrumentation.EnvelopeTraceContextMixin"/> + <aspect name="akka.instrumentation.BehaviourInvokeTracing"/> + <aspect name="kamon.instrumentation.ActorLoggingTracing"/> + + <!-- Futures --> + <aspect name="kamon.instrumentation.FutureTracing"/> + + <!-- Patterns --> + <aspect name="akka.instrumentation.AskPatternTracing"/> + - <!--<exclude within="*"/>--> <include within="scala.concurrent..*"/> <include within="akka..*"/> <include within="spray..*"/> <include within="kamon..*"/> </aspects> -</aspectj> +</aspectj>
\ No newline at end of file diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf new file mode 100644 index 00000000..11e7cbb4 --- /dev/null +++ b/kamon-core/src/main/resources/reference.conf @@ -0,0 +1,30 @@ +kamon { + metrics { + tick-interval = 1 second + + actors { + tracked = [] + + excluded = [ "system/*", "user/IO-*" ] + + hdr-settings { + processing-time { + highest-trackable-value = 3600000000000 + significant-value-digits = 2 + } + time-in-mailbox { + highest-trackable-value = 3600000000000 + significant-value-digits = 2 + } + mailbox-size { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + } + } + } + + trace { + ask-pattern-tracing = off + } +}
\ No newline at end of file diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 399ddf61..6cede344 100644 --- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -13,32 +13,59 @@ * See the License for the specific language governing permissions and * limitations under the License. * ========================================================== */ -package kamon.trace.instrumentation +package akka.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{ Props, ActorSystem, ActorRef } +import akka.actor.{ Cell, Props, ActorSystem, ActorRef } import akka.dispatch.{ Envelope, MessageDispatcher } -import com.codahale.metrics.Timer -import kamon.trace.{ ContextAware, TraceContext, Trace } +import kamon.trace.{ TraceContext, ContextAware, Trace } +import kamon.metrics.{ HdrActorMetricsRecorder, ActorMetrics } +import kamon.Kamon -@Aspect +@Aspect("perthis(actorCellCreation(*, *, *, *, *))") class BehaviourInvokeTracing { + var path: Option[String] = None + var actorMetrics: Option[HdrActorMetricsRecorder] = None @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + @After("actorCellCreation(system, ref, props, dispatcher, parent)") + def afterCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val metricsExtension = Kamon(ActorMetrics)(system) + val simplePathString = ref.path.elements.mkString("/") + + if (metricsExtension.shouldTrackActor(simplePathString)) { + path = Some(ref.path.toString) + actorMetrics = Some(metricsExtension.registerActor(simplePathString)) + } + } + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @Around("invokingActorBehaviourAtActorCell(envelope)") def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - //safe cast - val ctxInMessage = envelope.asInstanceOf[ContextAware].traceContext + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[ContextAndTimestampAware] - Trace.withContext(ctxInMessage) { + Trace.withContext(contextAndTimestamp.traceContext) { pjp.proceed() } + + actorMetrics.map { am ⇒ + am.recordProcessingTime(System.nanoTime() - timestampBeforeProcessing) + am.recordTimeInMailbox(timestampBeforeProcessing - contextAndTimestamp.timestamp) + } + } + + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") + def actorStop(cell: Cell): Unit = {} + + @After("actorStop(cell)") + def afterStop(cell: Cell): Unit = { + path.map(p ⇒ Kamon(ActorMetrics)(cell.system).unregisterActor(p)) } } @@ -46,7 +73,10 @@ class BehaviourInvokeTracing { class EnvelopeTraceContextMixin { @DeclareMixin("akka.dispatch.Envelope") - def mixin: ContextAware = ContextAware.default + def mixin: ContextAndTimestampAware = new ContextAndTimestampAware { + val traceContext: Option[TraceContext] = Trace.context() + val timestamp: Long = System.nanoTime() + } @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") def envelopeCreation(ctx: ContextAware): Unit = {} @@ -57,3 +87,7 @@ class EnvelopeTraceContextMixin { ctx.traceContext } } + +trait ContextAndTimestampAware extends ContextAware { + def timestamp: Long +} diff --git a/kamon-trace/src/main/scala/akka/actor/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala index 6a82782c..7d26016e 100644 --- a/kamon-trace/src/main/scala/akka/actor/ActorSystemMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala @@ -1,4 +1,4 @@ -package akka.actor +package akka.instrumentation import org.aspectj.lang.annotation._ import kamon.trace.{ Trace, ContextAware } diff --git a/kamon-trace/src/main/scala/akka/pattern/AskPatternTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala index 970a4a51..b5b23e61 100644 --- a/kamon-trace/src/main/scala/akka/pattern/AskPatternTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala @@ -13,12 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. * ========================================================== */ -package akka.pattern +package akka.instrumentation import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect } import akka.event.Logging.Warning import scala.compat.Platform.EOL import akka.actor.ActorRefProvider +import akka.pattern.{ AskTimeoutException, PromiseActorRef } @Aspect class AskPatternTracing { diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index b5c3d552..b72e8fea 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -18,10 +18,8 @@ package kamon import akka.actor._ object Kamon { - trait Extension extends akka.actor.Extension { - def manager: ActorRef - } + trait Extension extends akka.actor.Extension - def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager + def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system) } diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala index 0d06eb2c..297017cf 100644 --- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. * ========================================================== */ -package kamon.trace.instrumentation +package kamon.instrumentation import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } import org.aspectj.lang.ProceedingJoinPoint diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala deleted file mode 100644 index 252e5220..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation - -import org.aspectj.lang.ProceedingJoinPoint - -trait ProceedingJoinPointPimp { - import language.implicitConversions - - implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) -} - -object ProceedingJoinPointPimp extends ProceedingJoinPointPimp - -case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { - def proceedWith(newUniqueArg: AnyRef) = { - val args = pjp.getArgs - args.update(0, newUniqueArg) - pjp.proceed(args) - } - - def proceedWithTarget(args: AnyRef*) = { - pjp.proceed(args.toArray) - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index a3da76f7..90d2b270 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -19,10 +19,8 @@ import org.aspectj.lang.annotation._ import java.util.concurrent._ import org.aspectj.lang.ProceedingJoinPoint import java.util -import kamon.metric.{ DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector } import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory } import com.typesafe.config.Config -import kamon.Kamon import scala.concurrent.forkjoin.ForkJoinPool import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool @@ -41,8 +39,8 @@ class ActorSystemInstrumentation { @Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") class ForkJoinPoolInstrumentation { - var activeThreadsHistogram: Histogram = _ - var poolSizeHistogram: Histogram = _ + /* var activeThreadsHistogram: Histogram = _ + var poolSizeHistogram: Histogram = _*/ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} @@ -71,8 +69,8 @@ class ForkJoinPoolInstrumentation { @After("forkJoinScan(fjp)") def updateMetrics(fjp: AkkaForkJoinPool): Unit = { - activeThreadsHistogram.update(fjp.getActiveThreadCount) - poolSizeHistogram.update(fjp.getPoolSize) + /*activeThreadsHistogram.update(fjp.getActiveThreadCount) + poolSizeHistogram.update(fjp.getPoolSize)*/ } } @@ -90,6 +88,7 @@ trait WatchedExecutorService { def collector: ExecutorServiceCollector } +/* trait ExecutorServiceMonitoring { def dispatcherMetrics: DispatcherMetricCollector } @@ -97,6 +96,7 @@ trait ExecutorServiceMonitoring { class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { @volatile var dispatcherMetrics: DispatcherMetricCollector = _ } +*/ case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = delegate.createExecutorService @@ -133,9 +133,9 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { @Around("factoryMethodCall(namedFactory)") def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { val delegate = pjp.proceed().asInstanceOf[ExecutorService] - val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + val executorFullName = "" //MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) - ExecutorServiceMetricCollector.register(executorFullName, delegate) + //ExecutorServiceMetricCollector.register(executorFullName, delegate) new NamedExecutorServiceDelegate(executorFullName, delegate) } @@ -143,7 +143,7 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { def shutdown() = { - ExecutorServiceMetricCollector.deregister(fullName) + //ExecutorServiceMetricCollector.deregister(fullName) delegate.shutdown() } def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala index 844f1d61..5600d582 100644 --- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/FutureTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. * ========================================================== */ -package kamon.trace.instrumentation +package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index da797fa1..44eb8c43 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -15,17 +15,16 @@ * ========================================================== */ package kamon.instrumentation -import com.codahale.metrics.{ ExponentiallyDecayingReservoir, Histogram } import akka.dispatch.{ UnboundedMessageQueueSemantics, Envelope, MessageQueue } import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } import akka.actor.{ ActorSystem, ActorRef } -import kamon.metric.{ Metrics, MetricDirectory } import org.aspectj.lang.ProceedingJoinPoint /** * For Mailboxes we would like to track the queue size and message latency. Currently the latency * will be gathered from the ActorCellMetrics. */ +/* @Aspect class MessageQueueInstrumentation { @@ -74,4 +73,5 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: def hasMessages: Boolean = delegate.hasMessages def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) } +*/ diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala deleted file mode 100644 index 4c4b93e9..00000000 --- a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.metric - -import java.util.concurrent.{ ThreadPoolExecutor, ExecutorService } -import scala.concurrent.forkjoin.ForkJoinPool -import com.codahale.metrics.{ Metric, MetricFilter } - -object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { - - def register(fullName: String, executorService: ExecutorService) = executorService match { - case fjp: ForkJoinPool ⇒ registerForkJoinPool(fullName, fjp) - case tpe: ThreadPoolExecutor ⇒ registerThreadPoolExecutor(fullName, tpe) - case _ ⇒ // If it is a unknown Executor then just do nothing. - } - - def deregister(fullName: String) = { - Metrics.registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) - }) - } -} - -trait ForkJoinPoolMetricCollector { - import GaugeGenerator._ - import BasicExecutorMetricNames._ - - def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { - val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ - - val allMetrics = Map( - fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), - fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), - fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount)) - - allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) } - } -} - -trait ThreadPoolExecutorMetricCollector { - import GaugeGenerator._ - import BasicExecutorMetricNames._ - - def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { - val tpeGauge = newNumericGaugeFor(tpe) _ - - val allMetrics = Map( - fullName + queueSize -> tpeGauge(_.getQueue.size()), - fullName + poolSize -> tpeGauge(_.getPoolSize), - fullName + activeThreads -> tpeGauge(_.getActiveCount)) - - allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) } - } -} - -object BasicExecutorMetricNames { - val queueSize = "queueSize" - val poolSize = "threads/poolSize" - val activeThreads = "threads/activeThreads" -} - diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala deleted file mode 100644 index 9eff2739..00000000 --- a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.metric - -import com.codahale.metrics.Gauge - -trait GaugeGenerator { - - def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T ⇒ V) = new Gauge[V] { - def getValue: V = generator(target) - } -} - -object GaugeGenerator extends GaugeGenerator diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala deleted file mode 100644 index b904ec56..00000000 --- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.metric - -import java.util.concurrent.TimeUnit -import akka.actor.ActorRef -import com.codahale.metrics -import com.codahale.metrics.{ MetricFilter, Metric, ConsoleReporter, MetricRegistry } -import scala.collection.concurrent.TrieMap - -object Metrics { - val registry: MetricRegistry = new MetricRegistry - - val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) - //consoleReporter.build().start(45, TimeUnit.SECONDS) - - //val newrelicReporter = NewRelicReporter(registry) - //newrelicReporter.start(5, TimeUnit.SECONDS) - - def include(name: String, metric: Metric) = { - //registry.register(name, metric) - } - - def exclude(name: String) = { - registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(name) - }) - } - - def deregister(fullName: String) = { - registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) - }) - } -} - -object Watched { - case object Actor - case object Dispatcher -} - -object MetricDirectory { - def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" - - def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox" - - def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/") - - def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon") - - def shouldInstrumentActor(actorPath: String): Boolean = { - !(actorPath.isEmpty || actorPath.startsWith("system")) - } - -} - -case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) - -trait Histogram { - def update(value: Long): Unit - def snapshot: HistogramSnapshot -} - -trait HistogramSnapshot { - def median: Double - def max: Double - def min: Double -} - -case class ActorSystemMetrics(actorSystemName: String) { - val dispatchers = TrieMap.empty[String, DispatcherMetricCollector] - - private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) - - def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { - val stats = createDispatcherCollector - dispatchers.put(dispatcherName, stats) - Some(stats) - } - -} - -case class CodahaleHistogram() extends Histogram { - private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) - - def update(value: Long) = histogram.update(value) - def snapshot: HistogramSnapshot = { - val snapshot = histogram.getSnapshot - - CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin) - } -} - -case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot - -/** - * Dispatcher Metrics that we care about currently with a histogram-like nature: - * - Work Queue Size - * - Total/Active Thread Count - */ - -import annotation.tailrec -import java.util.concurrent.atomic.AtomicReference - -object Atomic { - def apply[T](obj: T) = new Atomic(new AtomicReference(obj)) - implicit def toAtomic[T](ref: AtomicReference[T]): Atomic[T] = new Atomic(ref) -} - -class Atomic[T](val atomic: AtomicReference[T]) { - @tailrec - final def update(f: T ⇒ T): T = { - val oldValue = atomic.get() - val newValue = f(oldValue) - if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) - } - - def get() = atomic.get() -}
\ No newline at end of file diff --git a/kamon-metrics/src/main/scala/kamon/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala index c7d57f13..72e473e8 100644 --- a/kamon-metrics/src/main/scala/kamon/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala @@ -13,21 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. * ========================================================== */ -package kamon -import com.codahale.metrics.MetricRegistry -import com.typesafe.config.ConfigFactory -import akka.actor.{ ActorRef, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } -import kamon.Kamon.Extension +package kamon.metrics + +import akka.actor.{ Props, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } import akka.actor +import kamon.Kamon -object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) +object ActorMetrics extends ExtensionId[ActorMetricsExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = ActorMetrics + def createExtension(system: ExtendedActorSystem): ActorMetricsExtension = new ActorMetricsExtension(system) } -class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - def manager: ActorRef = ??? +class ActorMetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension with ActorMetricsOps { + lazy val metricsDispatcher = system.actorOf(Props[ActorMetricsDispatcher], "kamon-actor-metrics") } - diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala new file mode 100644 index 00000000..dc4abde0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala @@ -0,0 +1,148 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.HdrHistogram.{ AbstractHistogram, AtomicHistogram } +import kamon.util.GlobPathFilter +import scala.collection.concurrent.TrieMap +import scala.collection.JavaConversions.iterableAsScalaIterable +import akka.actor._ +import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, FlushMetrics } +import kamon.Kamon +import scala.concurrent.duration._ +import java.util.concurrent.TimeUnit +import kamon.metrics.ActorMetricsDispatcher.Subscribe + +trait ActorMetricsOps { + self: ActorMetricsExtension ⇒ + + val config = system.settings.config.getConfig("kamon.metrics.actors") + val actorMetrics = TrieMap[String, HdrActorMetricsRecorder]() + + val trackedActors: Vector[GlobPathFilter] = config.getStringList("tracked").map(glob ⇒ new GlobPathFilter(glob)).toVector + val excludedActors: Vector[GlobPathFilter] = config.getStringList("excluded").map(glob ⇒ new GlobPathFilter(glob)).toVector + + val actorMetricsFactory: () ⇒ HdrActorMetricsRecorder = { + val settings = config.getConfig("hdr-settings") + val processingTimeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("processing-time")) + val timeInMailboxHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("time-in-mailbox")) + val mailboxSizeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("mailbox-size")) + + () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig) + } + + import scala.concurrent.duration._ + system.scheduler.schedule(0.seconds, 10.seconds)( + actorMetrics.collect { + case (name, recorder: HdrActorMetricsRecorder) ⇒ + println(s"Actor: $name") + recorder.processingTimeHistogram.copy.getHistogramData.outputPercentileDistribution(System.out, 1000000D) + })(system.dispatcher) + + def shouldTrackActor(path: String): Boolean = + trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path)) + + def registerActor(path: String): HdrActorMetricsRecorder = actorMetrics.getOrElseUpdate(path, actorMetricsFactory()) + + def unregisterActor(path: String): Unit = actorMetrics.remove(path) +} + +class HdrActorMetricsRecorder(processingTimeHdrConfig: HdrConfiguration, timeInMailboxHdrConfig: HdrConfiguration, + mailboxSizeHdrConfig: HdrConfiguration) { + + val processingTimeHistogram = new AtomicHistogram(processingTimeHdrConfig.highestTrackableValue, processingTimeHdrConfig.significantValueDigits) + val timeInMailboxHistogram = new AtomicHistogram(timeInMailboxHdrConfig.highestTrackableValue, timeInMailboxHdrConfig.significantValueDigits) + val mailboxSizeHistogram = new AtomicHistogram(mailboxSizeHdrConfig.highestTrackableValue, mailboxSizeHdrConfig.significantValueDigits) + + def recordTimeInMailbox(waitTime: Long): Unit = timeInMailboxHistogram.recordValue(waitTime) + + def recordProcessingTime(processingTime: Long): Unit = processingTimeHistogram.recordValue(processingTime) + + def snapshot(): HdrActorMetricsSnapshot = { + HdrActorMetricsSnapshot(processingTimeHistogram.copy(), timeInMailboxHistogram.copy(), mailboxSizeHistogram.copy()) + } + + def reset(): Unit = { + processingTimeHistogram.reset() + timeInMailboxHistogram.reset() + mailboxSizeHistogram.reset() + } +} + +case class HdrActorMetricsSnapshot(processingTimeHistogram: AbstractHistogram, timeInMailboxHistogram: AbstractHistogram, + mailboxSizeHistogram: AbstractHistogram) + +class ActorMetricsDispatcher extends Actor { + val tickInterval = Duration(context.system.settings.config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var subscribedForever: Map[GlobPathFilter, List[ActorRef]] = Map.empty + var subscribedForOne: Map[GlobPathFilter, List[ActorRef]] = Map.empty + var lastTick = System.currentTimeMillis() + + def receive = { + case Subscribe(path, true) ⇒ subscribeForever(path, sender) + case Subscribe(path, false) ⇒ subscribeOneOff(path, sender) + case FlushMetrics ⇒ flushMetrics() + } + + def subscribeForever(path: String, receiver: ActorRef): Unit = subscribedForever = subscribe(receiver, path, subscribedForever) + + def subscribeOneOff(path: String, receiver: ActorRef): Unit = subscribedForOne = subscribe(receiver, path, subscribedForOne) + + def subscribe(receiver: ActorRef, path: String, target: Map[GlobPathFilter, List[ActorRef]]): Map[GlobPathFilter, List[ActorRef]] = { + val pathFilter = new GlobPathFilter(path) + val oldReceivers = target.get(pathFilter).getOrElse(Nil) + target.updated(pathFilter, receiver :: oldReceivers) + } + + def flushMetrics(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(ActorMetrics)(context.system).actorMetrics.map { + case (path, metrics) ⇒ + val snapshot = metrics.snapshot() + metrics.reset() + + (path, snapshot) + }.toMap + + dispatchMetricsTo(subscribedForOne, snapshots, currentTick) + dispatchMetricsTo(subscribedForever, snapshots, currentTick) + + subscribedForOne = Map.empty + lastTick = currentTick + } + + def dispatchMetricsTo(subscribers: Map[GlobPathFilter, List[ActorRef]], snapshots: Map[String, HdrActorMetricsSnapshot], + currentTick: Long): Unit = { + + for ((subscribedPath, receivers) ← subscribers) { + val metrics = snapshots.filterKeys(snapshotPath ⇒ subscribedPath.accept(snapshotPath)) + val actorMetrics = ActorMetricsSnapshot(lastTick, currentTick, metrics) + + receivers.foreach(ref ⇒ ref ! actorMetrics) + } + } +} + +object ActorMetricsDispatcher { + case class Subscribe(path: String, forever: Boolean = false) + case class UnSubscribe(path: String) + + case class ActorMetricsSnapshot(fromMillis: Long, toMillis: Long, metrics: Map[String, HdrActorMetricsSnapshot]) + case object FlushMetrics +} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala new file mode 100644 index 00000000..d6359ead --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/package.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon + +import scala.concurrent.duration._ +import com.typesafe.config.Config + +package object metrics { + val OneHour = 1.hour.toNanos + + case class HdrConfiguration(highestTrackableValue: Long, significantValueDigits: Int) + case object HdrConfiguration { + def fromConfig(config: Config): HdrConfiguration = { + HdrConfiguration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) + } + } +} diff --git a/kamon-trace/src/main/scala/kamon/trace/Segments.scala b/kamon-core/src/main/scala/kamon/trace/Segments.scala index 0bc68ee7..0bc68ee7 100644 --- a/kamon-trace/src/main/scala/kamon/trace/Segments.scala +++ b/kamon-core/src/main/scala/kamon/trace/Segments.scala diff --git a/kamon-trace/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala index 0985f547..31e8185a 100644 --- a/kamon-trace/src/main/scala/kamon/trace/Trace.scala +++ b/kamon-core/src/main/scala/kamon/trace/Trace.scala @@ -68,7 +68,7 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { } // TODO: FIX - def newTraceContext(name: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace), tranid.getAndIncrement, name) + def newTraceContext(name: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace).api, tranid.getAndIncrement, name) def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = { val start = Segments.Start(category, description, attributes) @@ -89,7 +89,7 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { } class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val manager: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace") + val api: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace") } class TraceManager extends Actor with ActorLogging { diff --git a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 3698cea1..3e68a816 100644 --- a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -25,7 +25,9 @@ import kamon.trace.UowTracing.{ Finish, Start } // TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) { - def start(name: String) = collector ! Start(id, name) + def start(name: String) = { + collector ! Start(id, name) + } def finish: Unit = { collector ! Finish(id) diff --git a/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala index 20cce830..20cce830 100644 --- a/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala +++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala diff --git a/kamon-trace/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala index add47fdf..add47fdf 100644 --- a/kamon-trace/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala diff --git a/kamon-trace/src/test/resources/application.conf b/kamon-core/src/test/resources/application.conf index e8217fc2..e8217fc2 100644 --- a/kamon-trace/src/test/resources/application.conf +++ b/kamon-core/src/test/resources/application.conf diff --git a/kamon-trace/src/test/resources/logback.xml b/kamon-core/src/test/resources/logback.xml index 2ae1e3bd..2ae1e3bd 100644 --- a/kamon-trace/src/test/resources/logback.xml +++ b/kamon-core/src/test/resources/logback.xml diff --git a/kamon-metrics/src/test/scala/kamon/MailboxSizeMetricsSpec.scala b/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala index 5108af25..5108af25 100644 --- a/kamon-metrics/src/test/scala/kamon/MailboxSizeMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/MailboxSizeMetricsSpec.scala diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala new file mode 100644 index 00000000..ff08ca0f --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala @@ -0,0 +1,51 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.scalatest.{ WordSpecLike, Matchers, WordSpec } +import akka.testkit.TestKitBase +import akka.actor.{ Actor, Props, ActorSystem } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, Subscribe } +import scala.concurrent.duration._ + +class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { + implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics.actors.tracked = ["user/test*"] + """.stripMargin)) + + implicit def self = testActor + + lazy val metricsExtension = Kamon(ActorMetrics).metricsDispatcher + + "the Kamon actor metrics" should { + "track configured actors" in { + system.actorOf(Props[Other], "test-tracked-actor") ! "nothing" + metricsExtension ! Subscribe("user/test-tracked-actor") + + within(5 seconds) { + expectMsgType[ActorMetricsDispatcher.ActorMetricsSnapshot] + } + } + } +} + +class Other extends Actor { + def receive = { case a ⇒ } +} diff --git a/kamon-trace/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala index 89742651..89742651 100644 --- a/kamon-trace/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala diff --git a/kamon-trace/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala index 89251bf4..89251bf4 100644 --- a/kamon-trace/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala diff --git a/kamon-trace/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala index 7d539370..7d539370 100644 --- a/kamon-trace/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala diff --git a/kamon-trace/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala index 9df67391..9df67391 100644 --- a/kamon-trace/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala diff --git a/kamon-trace/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala index a5554836..a5554836 100644 --- a/kamon-trace/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala diff --git a/kamon-trace/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala index 3b32f3ac..3b32f3ac 100644 --- a/kamon-trace/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala diff --git a/kamon-trace/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala index 62f7ec84..62f7ec84 100644 --- a/kamon-trace/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala +++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala diff --git a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala index d092a947..f02bd31f 100644 --- a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala +++ b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala @@ -21,10 +21,8 @@ import spray.routing.directives.LogEntry import akka.event.Logging import spray.http.MediaTypes._ import spray.httpx.SprayJsonSupport -import kamon.Kamon import spray.http.HttpRequest import akka.actor.OneForOneStrategy -import com.codahale.metrics.{ Metric, MetricFilter } class DashboardServiceActor extends Actor with DashboardService { diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index f933bee4..668e29f7 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -27,13 +27,13 @@ class NewRelic extends ExtensionId[NewRelicExtension] { } class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val manager: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic") + val api: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic") } class NewRelicManager extends Actor with ActorLogging { log.info("Registering the Kamon(NewRelic) extension") - Kamon(Trace)(context.system) ! Trace.Register + Kamon(Trace)(context.system).api ! Trace.Register val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics], "web-transaction-metrics") val agent = context.actorOf(Props[Agent], "agent") diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index c6d87769..ed787332 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -18,7 +18,7 @@ package kamon.newrelic import akka.actor.Actor import akka.event.Logging.Error import akka.event.Logging.{ LoggerInitialized, InitializeLogger } -import com.newrelic.api.agent.NewRelic +import com.newrelic.api.agent.{ NewRelic ⇒ NR } import kamon.trace.ContextAware class NewRelicErrorLogger extends Actor { @@ -36,6 +36,6 @@ class NewRelicErrorLogger extends Actor { params.put("UOW", c.uow) } - NewRelic.noticeError(error.cause, params) + NR.noticeError(error.cause, params) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala index 000e6286..4e3d0d8d 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala @@ -17,7 +17,7 @@ package kamon.newrelic import akka.actor.Actor import kamon.trace.UowTrace -import com.newrelic.api.agent.NewRelic +import com.newrelic.api.agent.{ NewRelic ⇒ NR } import kamon.trace.UowTracing.WebExternal class NewRelicReporting extends Actor { @@ -28,16 +28,16 @@ class NewRelicReporting extends Actor { def recordTransaction(uowTrace: UowTrace): Unit = { val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp) / 1E9) - NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat) - NewRelic.recordMetric("WebTransaction", time.toFloat) - NewRelic.recordMetric("HttpDispatcher", time.toFloat) + NR.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat) + NR.recordMetric("WebTransaction", time.toFloat) + NR.recordMetric("HttpDispatcher", time.toFloat) uowTrace.segments.collect { case we: WebExternal ⇒ we }.foreach { webExternalTrace ⇒ val external = ((webExternalTrace.finish - webExternalTrace.start) / 1E9).toFloat - NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) + NR.recordMetric(s"External/${webExternalTrace.host}/http", external) + NR.recordMetric(s"External/${webExternalTrace.host}/all", external) + NR.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) } val allExternals = uowTrace.segments.collect { case we: WebExternal ⇒ we } sortBy (_.timestamp) @@ -53,8 +53,8 @@ class NewRelicReporting extends Actor { val external = measureExternal(0, 0, allExternals) / 1E9 - NewRelic.recordMetric(s"External/all", external.toFloat) - NewRelic.recordMetric(s"External/allWeb", external.toFloat) + NR.recordMetric(s"External/all", external.toFloat) + NR.recordMetric(s"External/allWeb", external.toFloat) } } diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index 88fe8e8d..88a77566 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -1,9 +1,21 @@ akka { loggers = [ "akka.event.slf4j.Slf4jLogger" ] - loglevel = DEBUG + loglevel = INFO actor { debug { unhandled = on } } +} + + +kamon { + metrics { + actors { + tracked = [ + "user/simple-service-actor", + "other" + ] + } + } }
\ No newline at end of file diff --git a/kamon-playground/src/main/resources/logback.xml b/kamon-playground/src/main/resources/logback.xml index 2ae1e3bd..e948eaf5 100644 --- a/kamon-playground/src/main/resources/logback.xml +++ b/kamon-playground/src/main/resources/logback.xml @@ -1,7 +1,8 @@ <configuration scan="true"> + <conversionRule conversionWord="uow" converterClass="kamon.trace.logging.LogbackUowConverter" /> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> - <pattern>%date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n</pattern> + <pattern>%date{HH:mm:ss.SSS} %-5level [%uow][%X{akkaSource}] [%thread] %logger{55} - %msg%n</pattern> </encoder> </appender> diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 333f5031..868a1921 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -24,6 +24,7 @@ import kamon.spray.UowDirectives import kamon.trace.Trace import kamon.Kamon import scala.util.Random +import akka.routing.RoundRobinRouter object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives { import scala.concurrent.duration._ @@ -34,7 +35,6 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil import system.dispatcher val act = system.actorOf(Props(new Actor { - println("Initializing from: " + (new Throwable).getStackTraceString) def receive: Actor.Receive = { case any ⇒ sender ! any } }), "com") @@ -43,7 +43,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil implicit val timeout = Timeout(30 seconds) val pipeline = sendReceive - val replier = system.actorOf(Props[Replier]) + val replier = system.actorOf(Props[Replier].withRouter(RoundRobinRouter(nrOfInstances = 2)), "replier") val random = new Random() startServer(interface = "localhost", port = 9090) { get { @@ -83,6 +83,12 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil complete(Future { "OK" }) } } ~ + path("kill") { + dynamic { + replier ! PoisonPill + complete(Future { "OK" }) + } + } ~ path("error") { complete { throw new NullPointerException @@ -121,7 +127,7 @@ class Replier extends Actor with ActorLogging { if (Trace.context.isEmpty) log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") - log.info("Processing at the Replier") + log.info("Processing at the Replier, and self is: {}", self) sender ! anything } } diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala index cd6d105d..44ed3baf 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala @@ -70,7 +70,7 @@ class ServerRequestTracingSpec extends TestKit(ActorSystem("server-request-traci trait TestServer extends SimpleRoutingApp { self: TestKit ⇒ - Kamon(Trace).tell(Trace.Register, testActor) + Kamon(Trace).api.tell(Trace.Register, testActor) implicit val timeout = Timeout(20 seconds) val port: Int = Await.result( diff --git a/kamon-trace/src/main/resources/META-INF/aop.xml b/kamon-trace/src/main/resources/META-INF/aop.xml deleted file mode 100644 index d9916724..00000000 --- a/kamon-trace/src/main/resources/META-INF/aop.xml +++ /dev/null @@ -1,26 +0,0 @@ -<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> - -<aspectj> - <aspects> - <!-- Actors --> - <aspect name="akka.actor.RepointableActorRefTraceContextMixin"/> - <aspect name="akka.actor.SystemMessageTraceContextMixin"/> - <aspect name="akka.actor.ActorSystemMessagePassingTracing"/> - <aspect name="kamon.trace.instrumentation.EnvelopeTraceContextMixin"/> - <aspect name="kamon.trace.instrumentation.BehaviourInvokeTracing"/> - <aspect name="kamon.trace.instrumentation.ActorLoggingTracing"/> - - <!-- Futures --> - <aspect name="kamon.trace.instrumentation.FutureTracing"/> - - <!-- Patterns --> - <aspect name="akka.pattern.AskPatternTracing"/> - - - <include within="scala.concurrent..*"/> - <include within="akka..*"/> - <include within="spray..*"/> - <include within="kamon..*"/> - </aspects> - -</aspectj> diff --git a/kamon-trace/src/main/resources/reference.conf b/kamon-trace/src/main/resources/reference.conf deleted file mode 100644 index 03f13f01..00000000 --- a/kamon-trace/src/main/resources/reference.conf +++ /dev/null @@ -1,3 +0,0 @@ -kamon.trace { - ask-pattern-tracing = off -}
\ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index 2b022ba6..c7a0fd5f 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -9,7 +9,7 @@ object Build extends Build { import Dependencies._ lazy val root = Project("root", file(".")) - .aggregate(kamonCore, kamonTrace, kamonMetrics, kamonSpray, kamonNewrelic, kamonPlayground, kamonDashboard) + .aggregate(kamonCore, kamonSpray, kamonNewrelic, kamonPlayground, kamonDashboard) .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(noPublishing: _*) @@ -21,31 +21,9 @@ object Build extends Build { .settings(aspectJSettings: _*) .settings( libraryDependencies ++= - compile(akkaActor, aspectJ, metrics) ++ - test(scalatest, akkaTestKit)) - - - lazy val kamonTrace = Project("kamon-trace", file("kamon-trace")) - .settings(basicSettings: _*) - .settings(formatSettings: _*) - .settings(aspectJSettings: _*) - .settings( - libraryDependencies ++= - compile(akkaActor, aspectJ) ++ + compile(akkaActor, aspectJ, hdrHistogram) ++ provided(logback) ++ test(scalatest, akkaTestKit, sprayTestkit, akkaSlf4j, logback)) - .dependsOn(kamonCore) - - - lazy val kamonMetrics = Project("kamon-metrics", file("kamon-metrics")) - .settings(basicSettings: _*) - .settings(formatSettings: _*) - .settings(aspectJSettings: _*) - .settings( - libraryDependencies ++= - compile(hdrHistogram, akkaActor, aspectJ, newrelic) ++ - test(scalatest, akkaTestKit, sprayTestkit)) - .dependsOn(kamonCore) lazy val kamonSpray = Project("kamon-spray", file("kamon-spray")) @@ -56,7 +34,7 @@ object Build extends Build { libraryDependencies ++= compile(akkaActor, aspectJ, sprayCan, sprayClient, sprayRouting) ++ test(scalatest, akkaTestKit, sprayTestkit)) - .dependsOn(kamonTrace) + .dependsOn(kamonCore) lazy val kamonNewrelic = Project("kamon-newrelic", file("kamon-newrelic")) @@ -67,7 +45,7 @@ object Build extends Build { libraryDependencies ++= compile(aspectJ, sprayCan, sprayClient, sprayRouting, sprayJson, sprayJsonLenses, newrelic, snakeYaml) ++ test(scalatest, akkaTestKit, sprayTestkit)) - .dependsOn(kamonTrace) + .dependsOn(kamonCore) lazy val kamonPlayground = Project("kamon-playground", file("kamon-playground")) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index eb1f245a..85186ce6 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -12,20 +12,19 @@ object Dependencies { val sprayJson = "io.spray" %% "spray-json" % "1.2.5" val sprayJsonLenses = "net.virtual-void" %% "json-lenses" % "0.5.3" - val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.RC2" + val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0" val logback = "ch.qos.logback" % "logback-classic" % "1.0.13" val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" - val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.1" val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "3.1.0" - val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion - val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion - val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion + val snakeYaml = "org.yaml" % "snakeyaml" % "1.13" + val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.0.8" val sprayCan = "io.spray" % "spray-can" % sprayVersion val sprayRouting = "io.spray" % "spray-routing" % sprayVersion val sprayTestkit = "io.spray" % "spray-testkit" % sprayVersion val sprayClient = "io.spray" % "spray-client" % sprayVersion - val snakeYaml = "org.yaml" % "snakeyaml" % "1.13" - val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.0.8" + val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion + val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion + val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") diff --git a/site/src/jekyll/newrelic/index.md b/site/src/jekyll/newrelic/index.md index f320861e..ee91a520 100644 --- a/site/src/jekyll/newrelic/index.md +++ b/site/src/jekyll/newrelic/index.md @@ -17,7 +17,7 @@ Add the following sbt dependencies to your project settings: libraryDependencies += "org.kamon" % "kamon-newrelic" % "0.1.0" libraryDependencies += "com.newrelic.agent.java" % "newrelic-api" % "3.1.0" ``` -### Example Configuration +### Configuration ```scala -javaagent:/path-to-newrelic-agent.jar, -Dnewrelic.environment=production, -Dnewrelic.config.file=/path-to-newrelic.yml |