diff options
30 files changed, 926 insertions, 748 deletions
diff --git a/kamon-akka/src/main/resources/META-INF/aop.xml b/kamon-akka/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..46e63f91 --- /dev/null +++ b/kamon-akka/src/main/resources/META-INF/aop.xml @@ -0,0 +1,34 @@ +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> + +<aspectj> + <aspects> + + <!-- Actors --> + <aspect name="akka.kamon.instrumentation.TraceContextIntoRepointableActorRefMixin"/> + <aspect name="akka.kamon.instrumentation.TraceContextIntoSystemMessageMixin"/> + <aspect name="akka.kamon.instrumentation.ActorSystemMessageInstrumentation"/> + <aspect name="akka.kamon.instrumentation.TraceContextIntoEnvelopeMixin"/> + <aspect name="akka.kamon.instrumentation.MetricsIntoActorCellsMixin"/> + <aspect name="akka.kamon.instrumentation.ActorCellInstrumentation"/> + <aspect name="akka.kamon.instrumentation.RoutedActorCellInstrumentation"/> + <aspect name="akka.kamon.instrumentation.ActorLoggingInstrumentation"/> + + <!-- Dispatchers --> + <aspect name="akka.kamon.instrumentation.DispatcherInstrumentation"/> + <aspect name="akka.kamon.instrumentation.DispatcherMetricCollectionInfoIntoDispatcherMixin"/> + + <!-- Patterns --> + <aspect name="akka.kamon.instrumentation.AskPatternInstrumentation"/> + </aspects> + + <weaver> + <include within="akka..*"/> + + <!-- For some weird reason ByteString produces a java.lang.VerifyError after going through the weaver. --> + <exclude within="akka.util.ByteString"/> + + <!-- Exclude CallingThreadDispatcher, is only for test purposes --> + <exclude within="akka.testkit.CallingThreadDispatcher"/> + </weaver> + +</aspectj>
\ No newline at end of file diff --git a/kamon-akka/src/main/resources/reference.conf b/kamon-akka/src/main/resources/reference.conf new file mode 100644 index 00000000..4f742ee6 --- /dev/null +++ b/kamon-akka/src/main/resources/reference.conf @@ -0,0 +1,51 @@ +# ================================== # +# Kamon-Akka Reference Configuration # +# ================================== # + +kamon { + akka { + # If ask-pattern-timeout-warning is enabled, a WARN level log message will be generated if a future generated by the `ask` + # pattern fails with a `AskTimeoutException` and the log message will contain information depending of the strategy selected. + # strategies: + # - off: nothing to do. + # - lightweight: logs the warning when a timeout is reached using org.aspectj.lang.reflect.SourceLocation. + # - heavyweight: logs the warning when a timeout is reached using a stack trace captured at the moment the future was created. + ask-pattern-timeout-warning = off + + # Default dispatcher for all akka module operations + dispatcher = ${kamon.default-dispatcher} + } + + metrics.precision { + actor { + processing-time = ${kamon.metrics.precision.default-histogram-precision} + time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} + mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} + } + + router { + routing-time = ${kamon.metrics.precision.default-histogram-precision} + processing-time = ${kamon.metrics.precision.default-histogram-precision} + time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} + } + + dispatcher { + maximum-pool-size { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + running-thread-count { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + queued-task-count { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + pool-size { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala index d2cb4e38..b22f7fa9 100644 --- a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala @@ -14,11 +14,12 @@ * ========================================================================================= */ -package kamon.metric +package kamon.akka import akka.actor.ActorSystem import com.typesafe.config.Config -import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram } +import kamon.metric._ +import kamon.metric.instrument.{ Counter, Histogram, MinMaxCounter } case class ActorMetrics(name: String) extends MetricGroupIdentity { val category = ActorMetrics @@ -73,7 +74,7 @@ object ActorMetrics extends MetricGroupCategory { } case object ActorMetricGroupFactory extends MetricGroupFactory { - import ActorMetrics._ + import kamon.akka.ActorMetrics._ type GroupRecorder = ActorMetricsRecorder diff --git a/kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala index 2fc7395e..bc013b63 100644 --- a/kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -13,8 +13,8 @@ * and limitations under the License. * ========================================================================================= */ -// -package kamon.extension.akka + +package kamon.akka import akka.actor import akka.actor._ diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala index 126f6333..64e16f96 100644 --- a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala @@ -14,11 +14,12 @@ * ========================================================================================= */ -package kamon.metric +package kamon.akka import akka.actor.ActorSystem import com.typesafe.config.Config -import kamon.metric.instrument.{ Histogram, HdrHistogram } +import kamon.metric._ +import kamon.metric.instrument.Histogram case class DispatcherMetrics(name: String) extends MetricGroupIdentity { val category = DispatcherMetrics @@ -71,7 +72,7 @@ object DispatcherMetrics extends MetricGroupCategory { case object DispatcherMetricGroupFactory extends MetricGroupFactory { - import DispatcherMetrics._ + import kamon.akka.DispatcherMetrics._ type GroupRecorder = DispatcherMetricRecorder diff --git a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala index ddfef416..2eedf764 100644 --- a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala @@ -13,10 +13,11 @@ * and limitations under the License. * ========================================================================================= */ -package kamon.metric +package kamon.akka import akka.actor.ActorSystem import com.typesafe.config.Config +import kamon.metric._ import kamon.metric.instrument.{ Counter, Histogram } case class RouterMetrics(name: String) extends MetricGroupIdentity { @@ -26,33 +27,37 @@ case class RouterMetrics(name: String) extends MetricGroupIdentity { object RouterMetrics extends MetricGroupCategory { val name = "router" + case object RoutingTime extends MetricIdentity { val name = "routing-time" } case object ProcessingTime extends MetricIdentity { val name = "processing-time" } case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } case object Errors extends MetricIdentity { val name = "errors" } - case class RouterMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { + case class RouterMetricsRecorder(routingTime: Histogram, processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { def collect(context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot(processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) + RouterMetricSnapshot(routingTime.collect(context), processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) def cleanup: Unit = { + routingTime.cleanup processingTime.cleanup timeInMailbox.cleanup errors.cleanup } } - case class RouterMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { + case class RouterMetricSnapshot(routingTime: Histogram.Snapshot, processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { type GroupSnapshotType = RouterMetricSnapshot def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = RouterMetricSnapshot( + routingTime.merge(that.routingTime, context), processingTime.merge(that.processingTime, context), timeInMailbox.merge(that.timeInMailbox, context), errors.merge(that.errors, context)) lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + RoutingTime -> routingTime, ProcessingTime -> processingTime, TimeInMailbox -> timeInMailbox, Errors -> errors) @@ -63,17 +68,19 @@ object RouterMetrics extends MetricGroupCategory { case object RouterMetricGroupFactory extends MetricGroupFactory { - import RouterMetrics._ + import kamon.akka.RouterMetrics._ type GroupRecorder = RouterMetricsRecorder def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { val settings = config.getConfig("precision.router") + val routingTimeConfig = settings.getConfig("routing-time") val processingTimeConfig = settings.getConfig("processing-time") val timeInMailboxConfig = settings.getConfig("time-in-mailbox") new RouterMetricsRecorder( + Histogram.fromConfig(routingTimeConfig), Histogram.fromConfig(processingTimeConfig), Histogram.fromConfig(timeInMailboxConfig), Counter()) diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala new file mode 100644 index 00000000..78d88583 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -0,0 +1,213 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import akka.actor._ +import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.routing.RoutedActorCell +import kamon.Kamon +import kamon.akka.{ RouterMetrics, ActorMetrics } +import ActorMetrics.ActorMetricsRecorder +import RouterMetrics.RouterMetricsRecorder +import kamon.metric.Metrics +import kamon.trace._ +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorCellInstrumentation { + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") + def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val metricsExtension = Kamon(Metrics)(system) + val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellMetrics.actorMetricIdentity = metricIdentity + cellMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) + } + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)") + def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(cell, envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] + + try { + TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { + pjp.proceed() + } + } finally { + cellMetrics.actorMetricsRecorder.map { am ⇒ + val processingTime = System.nanoTime() - timestampBeforeProcessing + val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime + + am.processingTime.record(processingTime) + am.timeInMailbox.record(timeInMailbox) + am.mailboxSize.decrement() + + // In case that this actor is behind a router, record the metrics for the router. + envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.map { rm ⇒ + rm.processingTime.record(processingTime) + rm.timeInMailbox.record(timeInMailbox) + } + } + } + } + + @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(envelope)") + def sendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = {} + + @After("sendMessageInActorCell(cell, envelope)") + def afterSendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = { + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + cellMetrics.actorMetricsRecorder.map(_.mailboxSize.increment()) + } + + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") + def actorStop(cell: ActorCell): Unit = {} + + @After("actorStop(cell)") + def afterStop(cell: ActorCell): Unit = { + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + cellMetrics.actorMetricsRecorder.map { _ ⇒ + Kamon(Metrics)(cell.system).unregister(cellMetrics.actorMetricIdentity) + } + + // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here. + if (cell.isInstanceOf[RoutedActorCell]) { + val routedCellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + routedCellMetrics.routerMetricsRecorder.map { _ ⇒ + Kamon(Metrics)(cell.system).unregister(routedCellMetrics.routerMetricIdentity) + } + } + } + + @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") + def actorInvokeFailure(cell: ActorCell): Unit = {} + + @Before("actorInvokeFailure(cell)") + def beforeInvokeFailure(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + cellWithMetrics.actorMetricsRecorder.map(_.errors.increment()) + + // In case that this actor is behind a router, count the errors for the router as well. + val envelope = cell.currentMessage.asInstanceOf[RouterAwareEnvelope] + envelope.routerMetricsRecorder.map(_.errors.increment()) + } +} + +@Aspect +class RoutedActorCellInstrumentation { + + @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, routeeProps, supervisor)") + def routedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {} + + @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)") + def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = { + val metricsExtension = Kamon(Metrics)(system) + val metricIdentity = RouterMetrics(ref.path.elements.mkString("/")) + val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + + cellMetrics.routerMetricIdentity = metricIdentity + cellMetrics.routerMetricsRecorder = metricsExtension.register(metricIdentity, RouterMetrics.Factory) + } + + @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)") + def sendMessageInRouterActorCell(cell: RoutedActorCell, envelope: Envelope) = {} + + @Around("sendMessageInRouterActorCell(cell, envelope)") + def aroundSendMessageInRouterActorCell(pjp: ProceedingJoinPoint, cell: RoutedActorCell, envelope: Envelope): Any = { + val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] + + try { + TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { + + // The router metrics recorder will only be picked up if the message is sent from a tracked router. + RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerMetricsRecorder) { + pjp.proceed() + } + } + } finally { + cellMetrics.routerMetricsRecorder map { routerRecorder ⇒ + routerRecorder.routingTime.record(System.nanoTime() - timestampBeforeProcessing) + } + } + } +} + +trait ActorCellMetrics { + var actorMetricIdentity: ActorMetrics = _ + var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ +} + +trait RoutedActorCellMetrics { + var routerMetricIdentity: RouterMetrics = _ + var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ +} + +trait RouterAwareEnvelope { + def routerMetricsRecorder: Option[RouterMetricsRecorder] +} + +object RouterAwareEnvelope { + import scala.util.DynamicVariable + private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetricsRecorder]](None) + + def default: RouterAwareEnvelope = new RouterAwareEnvelope { + val routerMetricsRecorder: Option[RouterMetricsRecorder] = dynamicRouterMetricsRecorder.value + } +} + +@Aspect +class MetricsIntoActorCellsMixin { + + @DeclareMixin("akka.actor.ActorCell") + def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} + + @DeclareMixin("akka.routing.RoutedActorCell") + def mixinActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {} + +} + +@Aspect +class TraceContextIntoEnvelopeMixin { + + @DeclareMixin("akka.dispatch.Envelope") + def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default + + @DeclareMixin("akka.dispatch.Envelope") + def mixinRouterAwareToEnvelope: RouterAwareEnvelope = RouterAwareEnvelope.default + + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TimestampedTraceContextAware with RouterAwareEnvelope): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + ctx.routerMetricsRecorder + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala index e0e5d316..e0e5d316 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala index 48016876..48016876 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala index 5b4fbbc8..ebddbfc8 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala @@ -17,8 +17,8 @@ package akka.kamon.instrumentation import kamon.Kamon -import kamon.extension.akka.Akka -import kamon.trace.{ TraceContext, EmptyTraceContext, TraceContextAware } +import kamon.akka.Akka +import kamon.trace.{ TraceRecorder, TraceContext, EmptyTraceContext, TraceContextAware } import akka.actor.{ ActorSystem, ActorRef } import akka.event.Logging.Warning import akka.pattern.AskTimeoutException @@ -33,30 +33,25 @@ class AskPatternInstrumentation { import AskPatternInstrumentation._ - @DeclareMixin("akka.pattern.AskableActorRef$") - def mixinContextAwareToAskableActorRef: TraceContextAware = TraceContextAware.default + @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && args(actor, *, *)") + def askableActorRefAsk(actor: ActorRef): Unit = {} - @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && target(ctx) && args(actor, *, *)") - def askableActorRefAsk(ctx: TraceContextAware, actor: ActorRef): Unit = {} - - @Around("askableActorRefAsk(ctx, actor)") - def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, ctx: TraceContextAware, actor: ActorRef): AnyRef = ctx.traceContext match { - case EmptyTraceContext ⇒ pjp.proceed() - case ctx: TraceContext ⇒ - implicit val system = ctx.system + @Around("askableActorRefAsk(actor)") + def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef): AnyRef = + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ val akkaExtension = Kamon(Akka)(system) - val future = pjp.proceed().asInstanceOf[Future[AnyRef]] val handler = akkaExtension.askPatternTimeoutWarning match { case "off" ⇒ None - case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))) - case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))) + case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system)) + case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system)) } handler.map(future.onFailure(_)(akkaExtension.dispatcher)) future - } + + } getOrElse (pjp.proceed()) def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = { case e: AskTimeoutException ⇒ diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala index 4b1bbc4d..8280edca 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala @@ -23,8 +23,9 @@ import akka.actor.{ ActorSystemImpl, Cancellable } import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher } import akka.kamon.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement import kamon.Kamon -import kamon.metric.DispatcherMetrics.DispatcherMetricRecorder -import kamon.metric.{ DispatcherMetrics, Metrics } +import kamon.akka.DispatcherMetrics +import DispatcherMetrics.DispatcherMetricRecorder +import kamon.metric.Metrics import org.aspectj.lang.annotation._ import scala.concurrent.forkjoin.ForkJoinPool diff --git a/kamon-akka/src/test/resources/logback.xml b/kamon-akka/src/test/resources/logback.xml new file mode 100644 index 00000000..10c9aa35 --- /dev/null +++ b/kamon-akka/src/test/resources/logback.xml @@ -0,0 +1,14 @@ +<configuration scan="true"> + <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter"/> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n</pattern> + </encoder> + </appender> + + <root level="error"> + <appender-ref ref="STDOUT" /> + </root> + +</configuration> diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala index 0f682500..06a232bd 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala @@ -17,17 +17,17 @@ package kamon.instrumentation.akka import akka.actor.{ Actor, ActorSystem, Props } import akka.pattern.{ ask, pipe } -import akka.routing.RoundRobinPool -import akka.testkit.{ ImplicitSender, TestKit } +import akka.routing._ +import akka.testkit.{ TestKitBase, ImplicitSender, TestKit } import akka.util.Timeout +import com.typesafe.config.ConfigFactory import kamon.trace.TraceRecorder -import org.scalatest.WordSpecLike +import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } import scala.concurrent.duration._ -class ActorCellInstrumentationSpec extends TestKit(ActorSystem("actor-cell-instrumentation-spec")) with WordSpecLike - with ImplicitSender { - +class ActorCellInstrumentationSpec extends TestKitBase with WordSpecLike with ImplicitSender with BeforeAndAfterAll { + implicit lazy val system: ActorSystem = ActorSystem("actor-cell-instrumentation-spec") implicit val executionContext = system.dispatcher "the message passing instrumentation" should { @@ -60,9 +60,27 @@ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("actor-cell-instr expectMsg(testTraceContext) } - "propagate the TraceContext to actors behind a router" in new RoutedEchoActorFixture { + "propagate the TraceContext to actors behind a simple router" in new EchoSimpleRouterFixture { val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { - ctxEchoActor ! "test" + router.route("test", testActor) + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext to actors behind a pool router" in new EchoPoolRouterFixture { + val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { + pool ! "test" + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext to actors behind a group router" in new EchoGroupRouterFixture { + val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { + group ! "test" TraceRecorder.currentContext } @@ -70,12 +88,32 @@ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("actor-cell-instr } } + override protected def afterAll(): Unit = shutdown() + trait EchoActorFixture { val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) } - trait RoutedEchoActorFixture extends EchoActorFixture { - override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinPool(nrOfInstances = 1))) + trait EchoSimpleRouterFixture { + val router = { + val routees = Vector.fill(5) { + val r = system.actorOf(Props[TraceContextEcho]) + ActorRefRoutee(r) + } + Router(RoundRobinRoutingLogic(), routees) + } + } + + trait EchoPoolRouterFixture { + val pool = system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[TraceContextEcho]), "pool-router") + } + + trait EchoGroupRouterFixture { + val routees = Vector.fill(5) { + system.actorOf(Props[TraceContextEcho]) + } + + val group = system.actorOf(RoundRobinGroup(routees.map(_.path.toStringWithoutAddress)).props(), "group-router") } } diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala index 4b114d4f..598e9327 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala @@ -17,15 +17,17 @@ package kamon.instrumentation.akka import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } import akka.event.Logging.LogEvent -import akka.testkit.TestKit +import akka.testkit.TestKitBase +import com.typesafe.config.ConfigFactory import kamon.trace.TraceLocal.AvailableToMdc import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceLocal, TraceContextAware, TraceRecorder } -import org.scalatest.{ Inspectors, Matchers, WordSpecLike } +import kamon.trace.{ TraceContextAware, TraceLocal, TraceRecorder } +import org.scalatest.{ BeforeAndAfterAll, Inspectors, Matchers, WordSpecLike } import org.slf4j.MDC -class ActorLoggingInstrumentationSpec extends TestKit(ActorSystem("actor-logging-instrumentation-spec")) with WordSpecLike - with Matchers with Inspectors with MdcKeysSupport { +class ActorLoggingInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with Inspectors with MdcKeysSupport with BeforeAndAfterAll { + implicit lazy val system: ActorSystem = ActorSystem("actor-logging-instrumentation-spec", + ConfigFactory.parseString("""akka.loggers = ["akka.event.slf4j.Slf4jLogger"]""")) "the ActorLogging instrumentation" should { "attach the TraceContext (if available) to log events" in { @@ -38,7 +40,7 @@ class ActorLoggingInstrumentationSpec extends TestKit(ActorSystem("actor-logging } fishForMessage() { - case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒ + case event: LogEvent if event.message.toString startsWith "TraceContext" ⇒ val ctxInEvent = event.asInstanceOf[TraceContextAware].traceContext ctxInEvent === testTraceContext @@ -48,26 +50,22 @@ class ActorLoggingInstrumentationSpec extends TestKit(ActorSystem("actor-logging "allow retrieve a value from the MDC when was created a key of type AvailableToMdc" in { val testString = "Hello World" - val SampleTraceLocalKeyAvailableToMDC = AvailableToMdc("some-cool-key") - - val loggerActor = system.actorOf(Props[LoggerActor]) - system.eventStream.subscribe(testActor, classOf[LogEvent]) - TraceRecorder.withNewTraceContext("logging-with-mdc") { - TraceLocal.store(SampleTraceLocalKeyAvailableToMDC)(testString) - - loggerActor ! "info" + TraceLocal.store(AvailableToMdc("some-cool-key"))(testString) withMdc { + MDC.get("other-key") shouldBe (null) MDC.get("some-cool-key") should equal(testString) } } } } + + override protected def afterAll(): Unit = shutdown() } class LoggerActor extends Actor with ActorLogging { def receive = { - case "info" ⇒ log.info("TraceContext => {}", TraceRecorder.currentContext) + case "info" ⇒ log.info("TraceContext(name = {}, token = {})", TraceRecorder.currentContext.name, TraceRecorder.currentContext.token) } } diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala index d79ccbe0..0e9025af 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala @@ -2,15 +2,19 @@ package kamon.instrumentation.akka import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop } import akka.actor._ -import akka.testkit.{ ImplicitSender, TestKit } +import akka.testkit.{ TestKitBase, ImplicitSender } +import com.typesafe.config.ConfigFactory import kamon.trace.{ EmptyTraceContext, TraceRecorder } import org.scalatest.WordSpecLike import scala.concurrent.duration._ import scala.util.control.NonFatal -class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-system-message-instrumentation-spec")) - with WordSpecLike with ImplicitSender { +class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLike with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("actor-system-message-instrumentation-spec", ConfigFactory.parseString( + """ + |akka.loglevel = OFF + """.stripMargin)) implicit val executionContext = system.dispatcher diff --git a/kamon-akka/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala new file mode 100644 index 00000000..5c9905ba --- /dev/null +++ b/kamon-akka/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala @@ -0,0 +1,134 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.instrumentation.akka + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor._ +import akka.event.Logging.Warning +import akka.pattern.ask +import akka.testkit.{ TestProbe, TestKitBase } +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.akka.Akka +import kamon.trace.{ TraceContext, TraceContextAware, TraceRecorder } +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } + +import scala.concurrent.duration._ + +class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll { + implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", + ConfigFactory.parseString("""akka.loggers = ["akka.event.slf4j.Slf4jLogger"]""")) + + implicit val ec = system.dispatcher + implicit val askTimeout = Timeout(10 millis) + + // TODO: Make this work with ActorSelections + + "the AskPatternInstrumentation" when { + "configured in heavyweight mode" should { + "log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture { + setAskPatternTimeoutWarningMode("heavyweight") + + expectTimeoutWarning() { + TraceRecorder.withNewTraceContext("ask-timeout-warning") { + noReplyActorRef ? "hello" + TraceRecorder.currentContext + } + } + } + } + + "configured in lightweight mode" should { + "log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture { + setAskPatternTimeoutWarningMode("lightweight") + + expectTimeoutWarning(messageSizeLimit = Some(1)) { + TraceRecorder.withNewTraceContext("ask-timeout-warning") { + noReplyActorRef ? "hello" + TraceRecorder.currentContext + } + } + } + } + + "configured in off mode" should { + "should not log any warning messages" in new NoReplyFixture { + setAskPatternTimeoutWarningMode("off") + + expectTimeoutWarning(expectWarning = false) { + TraceRecorder.withNewTraceContext("ask-timeout-warning") { + noReplyActorRef ? "hello" + TraceRecorder.currentContext + } + } + } + } + } + + override protected def afterAll(): Unit = shutdown() + + def expectTimeoutWarning(messageSizeLimit: Option[Int] = None, expectWarning: Boolean = true)(thunk: ⇒ TraceContext): Unit = { + val listener = warningListener() + val testTraceContext = thunk + + if (expectWarning) { + val warning = listener.fishForMessage() { + case Warning(_, _, msg) if msg.toString.startsWith("Timeout triggered for ask pattern registered at") ⇒ true + case others ⇒ false + }.asInstanceOf[Warning] + + warning.asInstanceOf[TraceContextAware].traceContext should equal(testTraceContext) + messageSizeLimit.map { messageLimit ⇒ + warning.message.toString.lines.size should be(messageLimit) + } + } else { + listener.expectNoMsg() + } + } + + def warningListener(): TestProbe = { + val listener = TestProbe() + system.eventStream.subscribe(listener.ref, classOf[Warning]) + listener + } + + def setAskPatternTimeoutWarningMode(mode: String): Unit = { + val target = Kamon(Akka)(system) + val field = target.getClass.getDeclaredField("askPatternTimeoutWarning") + field.setAccessible(true) + field.set(target, mode) + } + + val fixtureCounter = new AtomicInteger(0) + + trait NoReplyFixture { + def noReplyActorRef: ActorRef = system.actorOf(Props[NoReply], "no-reply-" + fixtureCounter.incrementAndGet()) + + def noReplyActorSelection: ActorSelection = { + val target = noReplyActorRef + system.actorSelection(target.path) + } + } +} + +class NoReply extends Actor { + def receive = { + case any ⇒ + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala index 97bcb0cf..6d16386b 100644 --- a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala @@ -17,19 +17,17 @@ package kamon.metric import java.nio.LongBuffer -import akka.kamon.instrumentation.ActorCellMetrics import kamon.Kamon +import kamon.akka.ActorMetrics import kamon.metric.ActorMetricsTestActor._ -import kamon.metric.instrument.Histogram.MutableRecord -import org.scalatest.{ WordSpecLike, Matchers } +import org.scalatest.{ BeforeAndAfterAll, WordSpecLike, Matchers } import akka.testkit.{ ImplicitSender, TestProbe, TestKitBase } import akka.actor._ import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.ActorMetrics.{ ActorMetricsRecorder, ActorMetricSnapshot } +import ActorMetrics.{ ActorMetricsRecorder, ActorMetricSnapshot } -class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { +class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender with BeforeAndAfterAll { implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( """ |kamon.metrics { @@ -62,6 +60,9 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with | } | } |} + | + |akka.loglevel = OFF + | """.stripMargin)) "the Kamon actor metrics" should { @@ -78,35 +79,35 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with "reset all recording instruments after taking a snapshot" in new ActorMetricsFixtures { val trackedActor = createTestActor("clean-after-collect") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - for (i ← 1 to 100) { - trackedActor ! Discard + + for (_ ← 1 to 100) { + for (i ← 1 to 100) { + trackedActor ! Discard + } + trackedActor ! Fail + trackedActor ! Ping + expectMsg(Pong) + + val firstSnapshot = collectMetricsOf(trackedActor).get + firstSnapshot.errors.count should be(1L) + firstSnapshot.mailboxSize.numberOfMeasurements should be > 0L + firstSnapshot.processingTime.numberOfMeasurements should be(102L) // 102 examples + firstSnapshot.timeInMailbox.numberOfMeasurements should be(102L) // 102 examples + + val secondSnapshot = collectMetricsOf(trackedActor).get // Ensure that the recorders are clean + secondSnapshot.errors.count should be(0L) + secondSnapshot.mailboxSize.numberOfMeasurements should be(3L) // min, max and current + secondSnapshot.processingTime.numberOfMeasurements should be(0L) + secondSnapshot.timeInMailbox.numberOfMeasurements should be(0L) } - trackedActor ! Fail - trackedActor ! TrackTimings(sleep = Some(1 second)) - expectMsgType[TrackedTimings] - - val firstSnapshot = takeSnapshotOf(trackedActorMetrics) - firstSnapshot.errors.count should be(1L) - firstSnapshot.mailboxSize.numberOfMeasurements should be > 0L - firstSnapshot.processingTime.numberOfMeasurements should be(103L) // 102 examples + Initialize message - firstSnapshot.timeInMailbox.numberOfMeasurements should be(103L) // 102 examples + Initialize message - - val secondSnapshot = takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - secondSnapshot.errors.count should be(0L) - secondSnapshot.mailboxSize.numberOfMeasurements should be(3L) // min, max and current - secondSnapshot.processingTime.numberOfMeasurements should be(0L) - secondSnapshot.timeInMailbox.numberOfMeasurements should be(0L) } "record the processing-time of the receive function" in new ActorMetricsFixtures { val trackedActor = createTestActor("measuring-processing-time") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - trackedActor ! TrackTimings(sleep = Some(1 second)) + trackedActor ! TrackTimings(sleep = Some(100 millis)) val timings = expectMsgType[TrackedTimings] - val snapshot = takeSnapshotOf(trackedActorMetrics) + val snapshot = collectMetricsOf(trackedActor).get snapshot.processingTime.numberOfMeasurements should be(1L) snapshot.processingTime.recordsIterator.next().count should be(1L) @@ -115,23 +116,19 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with "record the number of errors" in new ActorMetricsFixtures { val trackedActor = createTestActor("measuring-errors") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean for (i ← 1 to 10) { trackedActor ! Fail } trackedActor ! Ping expectMsg(Pong) - val snapshot = takeSnapshotOf(trackedActorMetrics) + val snapshot = collectMetricsOf(trackedActor).get snapshot.errors.count should be(10) } "record the mailbox-size" in new ActorMetricsFixtures { val trackedActor = createTestActor("measuring-mailbox-size") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - trackedActor ! TrackTimings(sleep = Some(1 second)) + trackedActor ! TrackTimings(sleep = Some(100 millis)) for (i ← 1 to 10) { trackedActor ! Discard } @@ -139,7 +136,7 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = expectMsgType[TrackedTimings] expectMsg(Pong) - val snapshot = takeSnapshotOf(trackedActorMetrics) + val snapshot = collectMetricsOf(trackedActor).get snapshot.mailboxSize.min should be(0L) snapshot.mailboxSize.max should be(11L +- 1L) @@ -147,12 +144,10 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with "record the time-in-mailbox" in new ActorMetricsFixtures { val trackedActor = createTestActor("measuring-time-in-mailbox") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - trackedActor ! TrackTimings(sleep = Some(1 second)) + trackedActor ! TrackTimings(sleep = Some(100 millis)) val timings = expectMsgType[TrackedTimings] - val snapshot = takeSnapshotOf(trackedActorMetrics) + val snapshot = collectMetricsOf(trackedActor).get snapshot.timeInMailbox.numberOfMeasurements should be(1L) snapshot.timeInMailbox.recordsIterator.next().count should be(1L) @@ -161,37 +156,46 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with "clean up the associated recorder when the actor is stopped" in new ActorMetricsFixtures { val trackedActor = createTestActor("stop") - actorMetricsRecorderOf(trackedActor).get // force the actor to be initialized - Kamon(Metrics).storage.get(ActorMetrics("user/stop")) should not be empty val deathWatcher = TestProbe() deathWatcher.watch(trackedActor) trackedActor ! PoisonPill deathWatcher.expectTerminated(trackedActor) - Kamon(Metrics).storage.get(ActorMetrics("user/stop")) shouldBe empty + actorMetricsRecorderOf(trackedActor) shouldBe empty } } + override protected def afterAll(): Unit = shutdown() + trait ActorMetricsFixtures { val collectionContext = new CollectionContext { val buffer: LongBuffer = LongBuffer.allocate(10000) } - def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetricsRecorder] = { - val initialisationListener = TestProbe() - ref.tell(Ping, initialisationListener.ref) - initialisationListener.expectMsg(Pong) + def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") - val underlyingCellField = ref.getClass.getDeclaredMethod("underlying") - val cell = underlyingCellField.invoke(ref).asInstanceOf[ActorCellMetrics] + def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetricsRecorder] = + Kamon(Metrics)(system).storage.get(ActorMetrics(actorRecorderName(ref))).map(_.asInstanceOf[ActorMetricsRecorder]) - cell.actorMetricsRecorder + def collectMetricsOf(ref: ActorRef): Option[ActorMetricSnapshot] = { + Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. + actorMetricsRecorderOf(ref).map(_.collect(collectionContext)) } - def createTestActor(name: String): ActorRef = system.actorOf(Props[ActorMetricsTestActor], name) + def createTestActor(name: String): ActorRef = { + val actor = system.actorOf(Props[ActorMetricsTestActor], name) + val initialiseListener = TestProbe() + + // Ensure that the router has been created before returning. + actor.tell(Ping, initialiseListener.ref) + initialiseListener.expectMsg(Pong) - def takeSnapshotOf(amr: ActorMetricsRecorder): ActorMetricSnapshot = amr.collect(collectionContext) + // Cleanup all the metric recording instruments: + collectMetricsOf(actor) + + actor + } } } diff --git a/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala index ae324b73..55af3f2e 100644 --- a/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala @@ -15,14 +15,16 @@ package kamon.metric -import org.scalatest.{ WordSpecLike, Matchers } -import akka.testkit.{ TestProbe, TestKitBase } -import akka.actor.{ ActorRef, Props, ActorSystem } +import akka.actor.{ ActorRef, ActorSystem, Props } +import akka.testkit.{ TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ import kamon.Kamon +import kamon.akka.DispatcherMetrics +import DispatcherMetrics.DispatcherMetricSnapshot import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.DispatcherMetrics.DispatcherMetricSnapshot +import org.scalatest.{ Matchers, WordSpecLike } + +import scala.concurrent.duration._ class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers { implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString( diff --git a/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala new file mode 100644 index 00000000..abc195ba --- /dev/null +++ b/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala @@ -0,0 +1,296 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.LongBuffer + +import akka.actor._ +import akka.kamon.instrumentation.ActorCellMetrics +import akka.routing._ +import akka.testkit.{ ImplicitSender, TestKitBase, TestProbe } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.akka.{ RouterMetrics, ActorMetrics } +import ActorMetrics.{ ActorMetricSnapshot, ActorMetricsRecorder } +import RouterMetrics._ +import kamon.metric.RouterMetricsTestActor._ +import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.instrument.{ Counter, Histogram } +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } + +import scala.concurrent.duration._ + +class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender with BeforeAndAfterAll { + implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 hour + | default-collection-context-buffer-size = 10 + | + | filters = [ + | { + | router { + | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ] + | excludes = [ "user/tracked-explicitly-excluded-*"] + | } + | } + | ] + | precision { + | default-histogram-precision { + | highest-trackable-value = 3600000000000 + | significant-value-digits = 2 + | } + | } + |} + | + |akka.loglevel = OFF + | + """.stripMargin)) + + "the Kamon router metrics" should { + "respect the configured include and exclude filters" in new RouterMetricsFixtures { + createTestPoolRouter("tracked-pool-router") + createTestGroupRouter("tracked-group-router") + createTestPoolRouter("non-tracked-pool-router") + createTestGroupRouter("non-tracked-group-router") + createTestPoolRouter("tracked-explicitly-excluded-pool-router") + createTestGroupRouter("tracked-explicitly-excluded-group-router") + + routerMetricsRecorderOf("user/tracked-pool-router") should not be empty + routerMetricsRecorderOf("user/tracked-group-router") should not be empty + routerMetricsRecorderOf("user/non-tracked-pool-router") shouldBe empty + routerMetricsRecorderOf("user/non-tracked-group-router") shouldBe empty + routerMetricsRecorderOf("user/tracked-explicitly-excluded-pool-router") shouldBe empty + routerMetricsRecorderOf("user/tracked-explicitly-excluded-group-router") shouldBe empty + } + + "record the routing-time of the receive function for pool routers" in new RouterMetricsFixtures { + val listener = TestProbe() + val router = createTestPoolRouter("measuring-routing-time-in-pool-router") + + router.tell(Ping, listener.ref) + listener.expectMsg(Pong) + val routerSnapshot = collectMetricsOf("user/measuring-routing-time-in-pool-router").get + + routerSnapshot.routingTime.numberOfMeasurements should be(1L) + } + + "record the routing-time of the receive function for group routers" in new RouterMetricsFixtures { + val listener = TestProbe() + val router = createTestGroupRouter("measuring-routing-time-in-group-router") + + router.tell(Ping, listener.ref) + listener.expectMsg(Pong) + val routerSnapshot = collectMetricsOf("user/measuring-routing-time-in-group-router").get + + routerSnapshot.routingTime.numberOfMeasurements should be(1L) + } + + "record the processing-time of the receive function for pool routers" in new RouterMetricsFixtures { + val timingsListener = TestProbe() + val router = createTestPoolRouter("measuring-processing-time-in-pool-router") + + router.tell(RouterTrackTimings(sleep = Some(1 second)), timingsListener.ref) + val timings = timingsListener.expectMsgType[RouterTrackedTimings] + val routerSnapshot = collectMetricsOf("user/measuring-processing-time-in-pool-router").get + + routerSnapshot.processingTime.numberOfMeasurements should be(1L) + routerSnapshot.processingTime.recordsIterator.next().count should be(1L) + routerSnapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + } + + "record the processing-time of the receive function for group routers" in new RouterMetricsFixtures { + val timingsListener = TestProbe() + val router = createTestGroupRouter("measuring-processing-time-in-group-router") + + router.tell(RouterTrackTimings(sleep = Some(1 second)), timingsListener.ref) + val timings = timingsListener.expectMsgType[RouterTrackedTimings] + val routerSnapshot = collectMetricsOf("user/measuring-processing-time-in-group-router").get + + routerSnapshot.processingTime.numberOfMeasurements should be(1L) + routerSnapshot.processingTime.recordsIterator.next().count should be(1L) + routerSnapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + } + + "record the number of errors for pool routers" in new RouterMetricsFixtures { + val listener = TestProbe() + val router = createTestPoolRouter("measuring-errors-in-pool-router") + + for (i ← 1 to 10) { + router.tell(Fail, listener.ref) + } + + router.tell(Ping, listener.ref) + listener.expectMsg(Pong) + + val routerSnapshot = collectMetricsOf("user/measuring-errors-in-pool-router").get + routerSnapshot.errors.count should be(10L) + } + + "record the number of errors for group routers" in new RouterMetricsFixtures { + val listener = TestProbe() + val router = createTestGroupRouter("measuring-errors-in-group-router") + + for (i ← 1 to 10) { + router.tell(Fail, listener.ref) + } + + router.tell(Ping, listener.ref) + listener.expectMsg(Pong) + + val routerSnapshot = collectMetricsOf("user/measuring-errors-in-group-router").get + routerSnapshot.errors.count should be(10L) + } + + "record the time-in-mailbox for pool routers" in new RouterMetricsFixtures { + val timingsListener = TestProbe() + val router = createTestPoolRouter("measuring-time-in-mailbox-in-pool-router") + + router.tell(RouterTrackTimings(sleep = Some(1 second)), timingsListener.ref) + val timings = timingsListener.expectMsgType[RouterTrackedTimings] + val routerSnapshot = collectMetricsOf("user/measuring-time-in-mailbox-in-pool-router").get + + routerSnapshot.timeInMailbox.numberOfMeasurements should be(1L) + routerSnapshot.timeInMailbox.recordsIterator.next().count should be(1L) + routerSnapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + } + + "record the time-in-mailbox for group routers" in new RouterMetricsFixtures { + val timingsListener = TestProbe() + val router = createTestGroupRouter("measuring-time-in-mailbox-in-group-router") + + router.tell(RouterTrackTimings(sleep = Some(1 second)), timingsListener.ref) + val timings = timingsListener.expectMsgType[RouterTrackedTimings] + val routerSnapshot = collectMetricsOf("user/measuring-time-in-mailbox-in-group-router").get + + routerSnapshot.timeInMailbox.numberOfMeasurements should be(1L) + routerSnapshot.timeInMailbox.recordsIterator.next().count should be(1L) + routerSnapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + } + + "clean up the associated recorder when the pool router is stopped" in new RouterMetricsFixtures { + val trackedRouter = createTestPoolRouter("stop-in-pool-router") + routerMetricsRecorderOf("user/stop-in-pool-router") should not be empty + + val deathWatcher = TestProbe() + deathWatcher.watch(trackedRouter) + trackedRouter ! PoisonPill + deathWatcher.expectTerminated(trackedRouter) + + routerMetricsRecorderOf("user/stop-in-pool-router") shouldBe empty + } + + "clean up the associated recorder when the group router is stopped" in new RouterMetricsFixtures { + val trackedRouter = createTestPoolRouter("stop-in-group-router") + routerMetricsRecorderOf("user/stop-in-group-router") should not be empty + + val deathWatcher = TestProbe() + deathWatcher.watch(trackedRouter) + trackedRouter ! PoisonPill + deathWatcher.expectTerminated(trackedRouter) + + routerMetricsRecorderOf("user/stop-in-group-router") shouldBe empty + } + } + + override protected def afterAll(): Unit = shutdown() + + trait RouterMetricsFixtures { + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } + + def routerMetricsRecorderOf(routerName: String): Option[RouterMetricsRecorder] = + Kamon(Metrics)(system).storage.get(RouterMetrics(routerName)).map(_.asInstanceOf[RouterMetricsRecorder]) + + def collectMetricsOf(routerName: String): Option[RouterMetricSnapshot] = { + Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. + routerMetricsRecorderOf(routerName).map(_.collect(collectionContext)) + } + + def createTestGroupRouter(routerName: String): ActorRef = { + val routees = Vector.fill(5) { + system.actorOf(Props[RouterMetricsTestActor]) + } + + val group = system.actorOf(RoundRobinGroup(routees.map(_.path.toStringWithoutAddress)).props(), routerName) + + //val router = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), routerName) + val initialiseListener = TestProbe() + + // Ensure that the router has been created before returning. + group.tell(Ping, initialiseListener.ref) + initialiseListener.expectMsg(Pong) + + // Cleanup all the metric recording instruments: + collectMetricsOf("user/" + routerName) + + group + } + + def createTestPoolRouter(routerName: String): ActorRef = { + val router = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), routerName) + val initialiseListener = TestProbe() + + // Ensure that the router has been created before returning. + router.tell(Ping, initialiseListener.ref) + initialiseListener.expectMsg(Pong) + + // Cleanup all the metric recording instruments: + collectMetricsOf("user/" + routerName) + + router + } + } + + trait ActorMetricsFixtures { + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } + + def createTestActor(name: String): ActorRef = system.actorOf(Props[ActorMetricsTestActor], name) + + def takeSnapshotOf(amr: ActorMetricsRecorder): ActorMetricSnapshot = amr.collect(collectionContext) + } +} + +class RouterMetricsTestActor extends Actor { + def receive = { + case Discard ⇒ + case Fail ⇒ throw new ArithmeticException("Division by zero.") + case Ping ⇒ sender ! Pong + case RouterTrackTimings(sendTimestamp, sleep) ⇒ { + val dequeueTimestamp = System.nanoTime() + sleep.map(s ⇒ Thread.sleep(s.toMillis)) + val afterReceiveTimestamp = System.nanoTime() + + sender ! RouterTrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) + } + } +} + +object RouterMetricsTestActor { + case object Ping + case object Pong + case object Fail + case object Discard + + case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) + case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { + def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp + def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp + } +} diff --git a/kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto b/kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto deleted file mode 100644 index d4ee21b5..00000000 --- a/kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto +++ /dev/null @@ -1,31 +0,0 @@ -import "WireFormats.proto"; - -option java_package = "akka.remote.instrumentation"; -option optimize_for = SPEED; - - -/************************************************ - * Kamon-specific additions to the protocol - ************************************************/ - -message AckAndTraceContextAwareEnvelopeContainer { - optional AcknowledgementInfo ack = 1; - optional TraceContextAwareRemoteEnvelope envelope = 2; -} - -message TraceContextAwareRemoteEnvelope { - required ActorRefData recipient = 1; - required SerializedMessage message = 2; - optional ActorRefData sender = 4; - optional fixed64 seq = 5; - - optional RemoteTraceContext traceContext = 15; -} - -message RemoteTraceContext { - required string traceName = 1; - required string traceToken = 2; - required bool isOpen = 3; - required fixed64 startMilliTime = 4; -} - diff --git a/kamon-core/src/main/protobuf/WireFormats.proto b/kamon-core/src/main/protobuf/WireFormats.proto deleted file mode 100644 index 98a954cc..00000000 --- a/kamon-core/src/main/protobuf/WireFormats.proto +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> - */ - -// Extracted from https://github.com/akka/akka/blob/master/akka-remote/src/main/protobuf/WireFormats.proto - - -option java_package = "akka.remote"; -option optimize_for = SPEED; - -/****************************************** - * Remoting message formats - ******************************************/ - - -message AckAndEnvelopeContainer { - optional AcknowledgementInfo ack = 1; - optional RemoteEnvelope envelope = 2; -} - -/** - * Defines a remote message. - */ -message RemoteEnvelope { - required ActorRefData recipient = 1; - required SerializedMessage message = 2; - optional ActorRefData sender = 4; - optional fixed64 seq = 5; -} - -message AcknowledgementInfo { - required fixed64 cumulativeAck = 1; - repeated fixed64 nacks = 2; -} - -/** - * Defines a remote ActorRef that "remembers" and uses its original Actor instance - * on the original node. - */ -message ActorRefData { - required string path = 1; -} - -/** - * Defines a message. - */ -message SerializedMessage { - required bytes message = 1; - required int32 serializerId = 2; - optional bytes messageManifest = 3; -} - -/** - * Defines akka.remote.DaemonMsgCreate - */ -message DaemonMsgCreateData { - required PropsData props = 1; - required DeployData deploy = 2; - required string path = 3; - required ActorRefData supervisor = 4; -} - -/** - * Serialization of akka.actor.Props - */ -message PropsData { - required DeployData deploy = 2; - required string clazz = 3; - repeated bytes args = 4; - repeated string classes = 5; -} - -/** - * Serialization of akka.actor.Deploy - */ -message DeployData { - required string path = 1; - optional bytes config = 2; - optional bytes routerConfig = 3; - optional bytes scope = 4; - optional string dispatcher = 5; -} - - -/****************************************** - * Akka Protocol message formats - ******************************************/ - -/** - * Message format of Akka Protocol. - * Message contains either a payload or an instruction. - */ -message AkkaProtocolMessage { - optional bytes payload = 1; - optional AkkaControlMessage instruction = 2; -} - -/** - * Defines some control messages for the remoting - */ -message AkkaControlMessage { - required CommandType commandType = 1; - optional AkkaHandshakeInfo handshakeInfo = 2; -} - -message AkkaHandshakeInfo { - required AddressData origin = 1; - required fixed64 uid = 2; - optional string cookie = 3; - -} - -/** - * Defines the type of the AkkaControlMessage command type - */ -enum CommandType { - ASSOCIATE = 1; - DISASSOCIATE = 2; - HEARTBEAT = 3; - DISASSOCIATE_SHUTTING_DOWN = 4; // Remote system is going down and will not accepts new connections - DISASSOCIATE_QUARANTINED = 5; // Remote system refused the association since the current system is quarantined -} - -/** - * Defines a remote address. - */ -message AddressData { - required string system = 1; - required string hostname = 2; - required uint32 port = 3; - optional string protocol = 4; -} diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 8c7eb235..47ce11d8 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -5,38 +5,17 @@ <!-- Disable AspectJ Weaver not present error --> <aspect name="kamon.instrumentation.AspectJWeaverMissingWarning"/> - <!-- Actors --> - <aspect name="akka.kamon.instrumentation.TraceContextIntoRepointableActorRefMixin"/> - <aspect name="akka.kamon.instrumentation.TraceContextIntoSystemMessageMixin"/> - <aspect name="akka.kamon.instrumentation.ActorSystemMessageInstrumentation"/> - <aspect name="akka.kamon.instrumentation.TraceContextIntoEnvelopeMixin"/> - <aspect name="akka.kamon.instrumentation.ActorCellMetricsIntoActorCellMixin"/> - <aspect name="akka.kamon.instrumentation.ActorCellInstrumentation"/> - <aspect name="akka.kamon.instrumentation.ActorLoggingInstrumentation"/> - - <!-- Dispatchers --> - <aspect name="akka.kamon.instrumentation.DispatcherInstrumentation"/> - <aspect name="akka.kamon.instrumentation.DispatcherMetricCollectionInfoIntoDispatcherMixin"/> - <!-- Futures --> <aspect name="kamon.instrumentation.scala.FutureInstrumentation"/> <aspect name="kamon.instrumentation.scalaz.FutureInstrumentation"/> - <!-- Patterns --> - <aspect name="akka.kamon.instrumentation.AskPatternInstrumentation"/> </aspects> <weaver> <include within="scala.concurrent..*"/> <include within="scalaz.concurrent..*"/> - <include within="akka..*"/> <include within="spray..*"/> <include within="kamon..*"/> - - <!-- For some weird reason ByteString produces a java.lang.VerifyError after going through the weaver. --> - <exclude within="akka.util.ByteString"/> - <!-- Exclude CallingThreadDispatcher, is only for test purposes --> - <exclude within="akka.testkit.CallingThreadDispatcher"/> </weaver> </aspectj>
\ No newline at end of file diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 2d8b3f2e..8f5a8b45 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -87,41 +87,10 @@ kamon { significant-value-digits = 2 } - - actor { - processing-time = ${kamon.metrics.precision.default-histogram-precision} - time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} - mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} - } - - router { - processing-time = ${kamon.metrics.precision.default-histogram-precision} - time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} - } - trace { elapsed-time = ${kamon.metrics.precision.default-histogram-precision} segment = ${kamon.metrics.precision.default-histogram-precision} } - - dispatcher { - maximum-pool-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - running-thread-count { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - queued-task-count { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - pool-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - } } } @@ -176,19 +145,6 @@ kamon { dispatcher = ${kamon.default-dispatcher} } - akka { - # If ask-pattern-timeout-warning is enabled, a WARN level log message will be generated if a future generated by the `ask` - # pattern fails with a `AskTimeoutException` and the log message will contain information depending of the strategy selected. - # strategies: - # - off: nothing to do. - # - lightweight: logs the warning when a timeout is reached using org.aspectj.lang.reflect.SourceLocation. - # - heavyweight: logs the warning when a timeout is reached using a stack trace captured at the moment the future was created. - ask-pattern-timeout-warning = off - - # Default dispatcher for all akka module operations - dispatcher = ${kamon.default-dispatcher} - } - kamon-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala deleted file mode 100644 index a340566d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import akka.actor._ -import akka.dispatch.{ Envelope, MessageDispatcher } -import akka.routing.RoutedActorCell -import kamon.Kamon -import kamon.metric.ActorMetrics.ActorMetricsRecorder -import kamon.metric.RouterMetrics.RouterMetricsRecorder -import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics } -import kamon.trace._ -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorCellInstrumentation { - - import ActorCellInstrumentation.PimpedActorCellMetrics - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") - def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricIdentity = metricIdentity - cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}") - routedActorCell.routerMetricIdentity = routerMetricIdentity - routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory) - } - } - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") - def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(cell, envelope)") - def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] - - try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { - pjp.proceed() - } - } finally { - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ - val processingTime = System.nanoTime() - timestampBeforeProcessing - val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime - - am.processingTime.record(processingTime) - am.timeInMailbox.record(timeInMailbox) - am.mailboxSize.decrement() - - (processingTime, timeInMailbox) - } map { - case (processingTime, timeInMailbox) ⇒ - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { - rm ⇒ - rm.processingTime.record(processingTime) - rm.timeInMailbox.record(timeInMailbox) - } - } - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") - def sendingMessageToActorCell(cell: ActorCell): Unit = {} - - @After("sendingMessageToActorCell(cell)") - def afterSendMessageToActorCell(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment()) - } - - @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") - def actorStop(cell: ActorCell): Unit = {} - - @After("actorStop(cell)") - def afterStop(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { p ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity) - } - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { rm ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity) - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") - def actorInvokeFailure(cell: ActorCell): Unit = {} - - @Before("actorInvokeFailure(cell)") - def beforeInvokeFailure(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ am.errors.increment() - } - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { - rm ⇒ rm.errors.increment() - } - } - } - -} - -trait ActorCellMetrics { - var actorMetricIdentity: ActorMetrics = _ - var routerMetricIdentity: RouterMetrics = _ - var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ - var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ -} - -@Aspect -class ActorCellMetricsIntoActorCellMixin { - - @DeclareMixin("akka.actor.ActorCell") - def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} -} - -@Aspect -class TraceContextIntoEnvelopeMixin { - - @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TimestampedTraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} - -object ActorCellInstrumentation { - implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) { - def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit) = cell match { - case routedActorCell: RoutedActorCell ⇒ block(cell) - case everythingElse ⇒ - } - } -}
\ No newline at end of file diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala deleted file mode 100644 index 471cbd4d..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.instrumentation.akka - -import akka.actor.{ Actor, ActorSystem, Props } -import akka.event.Logging.Warning -import akka.pattern.ask -import akka.testkit.TestKitBase -import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.scalatest.{ Matchers, WordSpecLike } - -import scala.concurrent.duration._ - -class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", ConfigFactory.parseString( - """ - |kamon { - | akka { - | ask-pattern-timeout-warning = heavyweight - | } - |} - """.stripMargin)) - - "the AskPatternTracing" should { - "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in { - implicit val ec = system.dispatcher - implicit val timeout = Timeout(10 milliseconds) - val noReply = system.actorOf(Props[NoReply], "NoReply") - system.eventStream.subscribe(testActor, classOf[Warning]) - - val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") { - noReply ? "hello" - TraceRecorder.currentContext - } - - val warn = expectMsgPF() { - case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn - } - val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext - - capturedCtx should equal(testTraceContext) - } - } -} - -class NoReply extends Actor { - def receive = { - case any ⇒ - } -} diff --git a/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala deleted file mode 100644 index ebc43091..00000000 --- a/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import java.nio.LongBuffer - -import akka.actor._ -import akka.routing.RoundRobinPool -import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } -import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metric.RouterMetrics._ -import kamon.metric.RouterMetricsTestActor._ -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.instrument.{ Counter, Histogram } -import org.scalatest.{ Matchers, WordSpecLike } - -import scala.concurrent.duration._ - -class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 second - | default-collection-context-buffer-size = 10 - | - | filters = [ - | { - | router { - | includes = [ "user/tracked-*", "user/measuring-*", "user/stop" ] - | excludes = [ "user/tracked-explicitly-excluded"] - | } - | } - | ] - | precision { - | default-histogram-precision { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | } - |} - """.stripMargin)) - - "the Kamon router metrics" should { - "respect the configured include and exclude filters" in new RouterMetricsFixtures { - createTestRouter("tracked-router") - createTestRouter("non-tracked-router") - createTestRouter("tracked-explicitly-excluded") - - Kamon(Metrics).subscribe(RouterMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] - - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(RouterMetrics("user/tracked-router")) - tickSnapshot.metrics.keys should not contain (RouterMetrics("user/non-tracked-router")) - tickSnapshot.metrics.keys should not contain (RouterMetrics("user/tracked-explicitly-excluded")) - } - } - - "record the processing-time of the receive function" in new RouterMetricsFixtures { - val metricsListener = TestProbe() - val trackedRouter = createTestRouter("measuring-processing-time") - - trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) - val timings = metricsListener.expectMsgType[RouterTrackedTimings] - - val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics - tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) - tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) - // tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) - } - - "record the number of errors" in new RouterMetricsFixtures { - val metricsListener = TestProbe() - val trackedRouter = createTestRouter("measuring-errors") - - for (i ← 1 to 10) { - trackedRouter.tell(Fail, metricsListener.ref) - } - val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics - tickSnapshot(RouterMetrics("user/measuring-errors")).metrics(Errors).asInstanceOf[Counter.Snapshot].count should be(10L) - } - - "record the time-in-mailbox" in new RouterMetricsFixtures { - val metricsListener = TestProbe() - val trackedRouter = createTestRouter("measuring-time-in-mailbox") - - trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) - val timings = metricsListener.expectMsgType[RouterTrackedTimings] - val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics - - tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) - tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) - tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) - } - - "clean up the associated recorder when the actor is stopped" in new RouterMetricsFixtures { - val trackedRouter = createTestRouter("stop") - trackedRouter ! Ping - Kamon(Metrics).storage.toString() // force to be initialized - Kamon(Metrics).storage.get(RouterMetrics("user/stop")) should not be empty - - val deathWatcher = TestProbe() - deathWatcher.watch(trackedRouter) - trackedRouter ! PoisonPill - deathWatcher.expectTerminated(trackedRouter) - - Kamon(Metrics).storage.get(RouterMetrics("user/stop")) shouldBe empty - } - } - - trait RouterMetricsFixtures { - val collectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(10000) - } - - def createTestRouter(name: String): ActorRef = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), name) - } -} - -class RouterMetricsTestActor extends Actor { - def receive = { - case Discard ⇒ - case Fail ⇒ throw new ArithmeticException("Division by zero.") - case Ping ⇒ sender ! Pong - case RouterTrackTimings(sendTimestamp, sleep) ⇒ { - val dequeueTimestamp = System.nanoTime() - sleep.map(s ⇒ Thread.sleep(s.toMillis)) - val afterReceiveTimestamp = System.nanoTime() - - sender ! RouterTrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) - } - } -} - -object RouterMetricsTestActor { - case object Ping - case object Pong - case object Fail - case object Discard - - case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) - case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { - def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp - def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp - } -} diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala index 5e5e08a1..596a6765 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor._ import akka.event.Logging import kamon.Kamon +import kamon.akka.{RouterMetrics, DispatcherMetrics, ActorMetrics} import kamon.http.HttpServerMetrics import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } import kamon.metric._ diff --git a/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala b/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala index fd76f50c..61d87793 100644 --- a/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala +++ b/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala @@ -19,7 +19,8 @@ package kamon.logreporter import akka.actor._ import akka.event.Logging import kamon.Kamon -import kamon.metric.ActorMetrics.ActorMetricSnapshot +import kamon.akka.ActorMetrics +import ActorMetrics.ActorMetricSnapshot import kamon.metric.Subscriptions.TickMetricSnapshot import kamon.metric.TraceMetrics.TraceMetricsSnapshot import kamon.metric.UserMetrics._ diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 08053338..2505f06a 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -18,6 +18,7 @@ package kamon.statsd import akka.actor._ import kamon.Kamon +import kamon.akka.{RouterMetrics, DispatcherMetrics, ActorMetrics} import kamon.http.HttpServerMetrics import kamon.metric.UserMetrics._ import kamon.metric._ diff --git a/project/Projects.scala b/project/Projects.scala index 6da9b95c..a8d4b526 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -45,8 +45,21 @@ object Projects extends Build { test(scalatest, akkaTestKit, akkaSlf4j, slf4Jul, slf4Log4j, logback)) - lazy val kamonAkkaRemote = Project("kamon-akka-remote", file("kamon-akka-remote")) + lazy val kamonAkka = Project("kamon-akka", file("kamon-akka")) .dependsOn(kamonCore) + .dependsOn(kamonMacros % "compile-internal, test-internal") + .settings(basicSettings: _* ) + .settings(formatSettings: _*) + .settings(aspectJSettings: _*) + .settings( + libraryDependencies ++= + compile(akkaActor) ++ + provided(aspectJ) ++ + optional(logback) ++ + test(scalatest, akkaTestKit, akkaSlf4j, slf4Jul, slf4Log4j, logback)) + + lazy val kamonAkkaRemote = Project("kamon-akka-remote", file("kamon-akka-remote")) + .dependsOn(kamonAkka) .settings(basicSettings: _* ) .settings(formatSettings: _*) .settings(aspectJSettings: _*) |