diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main/resources/META-INF/aop.xml | 5 | ||||
-rw-r--r-- | src/main/resources/newrelic.yml | 2 | ||||
-rw-r--r-- | src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala | 47 | ||||
-rw-r--r-- | src/main/scala/akka/instrumentation/MessageQueueMetrics.scala | 71 | ||||
-rw-r--r-- | src/main/scala/kamon/Kamon.scala | 2 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 3 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala | 8 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/Metrics.scala | 16 |
8 files changed, 134 insertions, 20 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 33a70483..b4b3d879 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -1,9 +1,9 @@ <!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> <aspectj> - <!--<weaver options="-verbose -showWeaveInfo"> + <weaver options="-verbose -showWeaveInfo"> <dump within="*"/> - </weaver>--> + </weaver> <aspects> @@ -17,6 +17,7 @@ <!--<aspect name="kamon.instrumentation.DispatcherInstrumentation" />--> <!--<aspect name ="akka.dispatch.FactoryInstrumentation" />--> + <aspect name="akka.instrumentation.MessageQueueInstrumentation" /> <!-- ExecutorService Instrumentation for Akka. --> <aspect name="akka.dispatch.ExecutorServiceFactoryProviderInstrumentation"/> diff --git a/src/main/resources/newrelic.yml b/src/main/resources/newrelic.yml index c395bd01..1b1ad53b 100644 --- a/src/main/resources/newrelic.yml +++ b/src/main/resources/newrelic.yml @@ -48,7 +48,7 @@ common: &default_settings # This setting is dynamic, so changes do not require restarting your application. # The levels in increasing order of verboseness are: off, severe, warning, info, fine, finer, finest # Default is info. - log_level: finer + log_level: finest enable_custom_tracing: true # Log all data to and from New Relic in plain text. diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala index f631b79a..218c09cc 100644 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala @@ -1,12 +1,14 @@ package akka.instrumentation -import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} +import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{ActorRef} +import akka.actor.{Props, ActorSystem, ActorRef} import kamon.{Kamon, TraceContext} import akka.dispatch.Envelope +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import kamon.metric.{MetricDirectory, Metrics} -case class TraceableMessage(traceContext: TraceContext, message: Any) +case class TraceableEnvelope(traceContext: TraceContext, message: Any, timeStamp: Long = System.nanoTime()) @Aspect @@ -22,8 +24,12 @@ class ActorRefTellInstrumentation { Kamon.context() match { case Some(ctx) => { - val traceableMessage = TraceableMessage(ctx, message) - proceed(getArgs.updated(0, traceableMessage)) + val traceableMessage = TraceableEnvelope(ctx, message) + + // update the args with the new message + val args = getArgs + args.update(0, traceableMessage) + proceed(args) } case None => proceed } @@ -31,19 +37,42 @@ class ActorRefTellInstrumentation { } -@Aspect +@Aspect("perthis(actorCellCreation(..))") class ActorCellInvokeInstrumentation { + val latencyHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) + val messagesPer + @volatile var shouldTrack = false + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {} + + @Before("actorCellCreation(system, ref, props, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = { + val actorName = MetricDirectory.nameForActor(ref) + val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + + // TODO: Find a better way to filter the thins we don't want to measure. + if(system.name != "kamon" && actorName.startsWith("/user")) { + Metrics.registry.register(histogramName + "/cell", latencyHistogram) + shouldTrack = true + } + } + + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @Around("invokingActorBehaviourAtActorCell(envelope)") - def around(pjp: ProceedingJoinPoint, envelope: Envelope) = { + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import pjp._ envelope match { - case Envelope(TraceableMessage(ctx, msg), sender) => { + case Envelope(TraceableEnvelope(ctx, msg, timeStamp), sender) => { + latencyHistogram.update(System.nanoTime() - timeStamp) + Kamon.set(ctx) val originalEnvelope = envelope.copy(message = msg) @@ -54,4 +83,4 @@ class ActorCellInvokeInstrumentation { case _ => proceed } } -}
\ No newline at end of file +} diff --git a/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala b/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala new file mode 100644 index 00000000..a7f5cdc8 --- /dev/null +++ b/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala @@ -0,0 +1,71 @@ +package akka.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import akka.dispatch.{Envelope, MessageQueue} +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import akka.actor.{ActorSystem, ActorRef} +import kamon.metric.{Metrics, MetricDirectory} +import org.aspectj.lang.ProceedingJoinPoint + + +/** + * For Mailboxes we would like to track the queue size and message latency. Currently the latency + * will be gathered from the ActorCellMetrics. + */ + + +@Aspect +class MessageQueueInstrumentation { + + @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") + def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} + + @Around("messageQueueCreation(owner, system)") + def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { + val delegate = pjp.proceed.asInstanceOf[MessageQueue] + + // We are not interested in monitoring mailboxes if we don't know where they belong to. + val monitoredMailbox = for(own <- owner; sys <- system) yield new MonitoredMessageQueue(delegate, own, sys) + + monitoredMailbox match { + case None => delegate + case Some(mmb) => mmb + } + + } +} + + +class MonitoredMessageQueue(val delegate: MessageQueue, owner: ActorRef, system: ActorSystem) extends MessageQueue { + val queueSizeHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) + + val fullName = MetricDirectory.nameForMailbox(system.name, MetricDirectory.nameForActor(owner)) + Metrics.registry.register(fullName, queueSizeHistogram) + + def enqueue(receiver: ActorRef, handle: Envelope) = { + queueSizeHistogram.update(numberOfMessages) + delegate.enqueue(receiver, handle) + } + + def dequeue(): Envelope = { + queueSizeHistogram.update(numberOfMessages) + delegate.dequeue() + } + + def numberOfMessages: Int = delegate.numberOfMessages + def hasMessages: Boolean = delegate.hasMessages + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = { + Metrics.deregister(fullName) + + delegate.cleanUp(owner, deadLetters) + } +} + + + + + + + + + diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala index c58f95e4..c1b97722 100644 --- a/src/main/scala/kamon/Kamon.scala +++ b/src/main/scala/kamon/Kamon.scala @@ -8,7 +8,7 @@ object Kamon { override def initialValue() = None } - implicit lazy val actorSystem = ActorSystem("kamon-test") + implicit lazy val actorSystem = ActorSystem("kamon") def context() = ctx.get() diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index 41554410..599f2a7a 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -40,7 +40,7 @@ class PingActor extends Actor with ActorLogging { val random = new Random() def receive = { case Pong() => { - Thread.sleep(random.nextInt(2000)) + //Thread.sleep(random.nextInt(2000)) //log.info("Message from Ping") pong ! Ping() } @@ -66,6 +66,7 @@ object TryAkka extends App{ } })) + Kamon.start for(i <- 1 to 4) { val ping = system.actorOf(Props[PingActor]) ping ! Pong() diff --git a/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala index 78711267..54a13f39 100644 --- a/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala +++ b/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala @@ -34,7 +34,7 @@ trait ForkJoinPoolMetricCollector { fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount) ) - allMetrics.foreach(kv => Metrics.registry.register(kv._1, kv._2)) + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } } } @@ -51,15 +51,15 @@ trait ThreadPoolExecutorMetricCollector { fullName + activeThreads -> tpeGauge(_.getActiveCount) ) - allMetrics.foreach(kv => Metrics.registry.register(kv._1, kv._2)) + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } } } object BasicExecutorMetricNames { val queueSize = "queueSize" - val poolSize = "poolSize" - val activeThreads = "activeThreads" + val poolSize = "threads/poolSize" + val activeThreads = "threads/activeThreads" } diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index 25c9bd8e..ebf4fd2b 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -2,6 +2,7 @@ package kamon.metric import java.util.concurrent.TimeUnit import com.codahale.metrics._ +import akka.actor.ActorRef object Metrics { val registry: MetricRegistry = new MetricRegistry @@ -9,10 +10,21 @@ object Metrics { val consoleReporter = ConsoleReporter.forRegistry(registry) val newrelicReporter = NewRelicReporter(registry) - newrelicReporter.start(5, TimeUnit.SECONDS) - //consoleReporter.build().start(5, TimeUnit.SECONDS) + //newrelicReporter.start(5, TimeUnit.SECONDS) + consoleReporter.build().start(60, TimeUnit.SECONDS) + + + def deregister(fullName: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } } object MetricDirectory { def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" + + def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox" + + def nameForActor(actorRef: ActorRef) = actorRef.path.elements.fold("")(_ + "/" + _) } |