diff options
-rw-r--r-- | README.md | 7 | ||||
-rw-r--r-- | project/Build.scala | 2 | ||||
-rw-r--r-- | project/Dependencies.scala | 12 | ||||
-rw-r--r-- | project/NewRelic.scala | 2 | ||||
-rw-r--r-- | src/main/resources/META-INF/aop.xml | 14 | ||||
-rw-r--r-- | src/main/resources/newrelic.yml | 4 | ||||
-rw-r--r-- | src/main/scala/akka/ActorInstrumentation.scala | 2 | ||||
-rw-r--r-- | src/main/scala/akka/PoolMonitorInstrumentation.scala | 18 | ||||
-rw-r--r-- | src/main/scala/kamon/Kamon.scala | 2 | ||||
-rw-r--r-- | src/main/scala/kamon/TraceContext.scala | 6 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 32 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala | 71 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala | 6 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala | 67 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/GaugeGenerator.scala | 12 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/Metrics.scala | 14 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/NewRelicReporter.scala | 15 | ||||
-rw-r--r-- | src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala | 2 |
18 files changed, 246 insertions, 42 deletions
@@ -1,2 +1,9 @@ Kamon ===== + + + + +/metrics/actorsystem/{actorsystem-name}/dispatcher/{dispatcher-name}/ +For each dispatcher, show: + -
\ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index 37765ccf..b1ce638e 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -14,7 +14,7 @@ object Build extends Build { .settings(newrelicSettings: _*) .settings( libraryDependencies ++= - compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, sprayJson) ++ + compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, sprayJson) ++ test(scalatest, sprayTestkit)) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 62c86c84..2395acd3 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -7,11 +7,11 @@ object Dependencies { "spray nightlies repo" at "http://nightlies.spray.io" ) - val sprayCan = "io.spray" % "spray-can" % "1.1-20130509" - val sprayRouting = "io.spray" % "spray-routing" % "1.1-20130509" - val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-20130509" - val sprayClient = "io.spray" % "spray-client" % "1.1-20130509" - val sprayServlet = "io.spray" % "spray-servlet" % "1.1-20130509" + val sprayCan = "io.spray" % "spray-can" % "1.1-M8" + val sprayRouting = "io.spray" % "spray-routing" % "1.1-M8" + val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-M8" + val sprayClient = "io.spray" % "spray-client" % "1.1-M8" + val sprayServlet = "io.spray" % "spray-servlet" % "1.1-M8" val sprayJson = "io.spray" %% "spray-json" % "1.2.3" val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1" val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2" @@ -21,7 +21,7 @@ object Dependencies { val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M5b" val logback = "ch.qos.logback" % "logback-classic" % "1.0.10" val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" - val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0-BETA2" + val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0" val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.19.0" def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") diff --git a/project/NewRelic.scala b/project/NewRelic.scala index 766eb28d..74f8fc30 100644 --- a/project/NewRelic.scala +++ b/project/NewRelic.scala @@ -8,6 +8,6 @@ object NewRelic { lazy val newrelicSettings = SbtNewrelic.newrelicSettings ++ Seq( javaOptions in run <++= jvmOptions in newrelic, - newrelicVersion in newrelic := "2.18.0" + newrelicVersion in newrelic := "2.19.0" ) } diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 1413f424..33a70483 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -9,11 +9,23 @@ <!--<aspect name="akka.ActorSystemAspect"/> <!–<aspect name="akka.MailboxAspect"/>–>--> - <!--<aspect name="akka.PoolMonitorInstrumentation"/>--> + <!--<aspect name="akka.PoolMonitorAspect"/>--> <aspect name="akka.ActorInstrumentation" /> <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/> <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/> <aspect name="kamon.instrumentation.RunnableInstrumentation" /> + <!--<aspect name="kamon.instrumentation.DispatcherInstrumentation" />--> + <!--<aspect name ="akka.dispatch.FactoryInstrumentation" />--> + + + <!-- ExecutorService Instrumentation for Akka. --> + <aspect name="akka.dispatch.ExecutorServiceFactoryProviderInstrumentation"/> + <aspect name="akka.dispatch.NamedExecutorServiceFactoryDelegateInstrumentation"/> + + + + + <include within="*"/> <exclude within="javax..*"/> diff --git a/src/main/resources/newrelic.yml b/src/main/resources/newrelic.yml index e347635e..c395bd01 100644 --- a/src/main/resources/newrelic.yml +++ b/src/main/resources/newrelic.yml @@ -48,7 +48,7 @@ common: &default_settings # This setting is dynamic, so changes do not require restarting your application. # The levels in increasing order of verboseness are: off, severe, warning, info, fine, finer, finest # Default is info. - log_level: info + log_level: finer enable_custom_tracing: true # Log all data to and from New Relic in plain text. @@ -70,7 +70,7 @@ common: &default_settings # The log file directory. # Default is the logs directory in the newrelic.jar parent directory. - #log_file_path: + log_file_path: /home/ivantopo/Desktop/tmp # The agent communicates with New Relic via https by # default. If you want to communicate with newrelic via http, diff --git a/src/main/scala/akka/ActorInstrumentation.scala b/src/main/scala/akka/ActorInstrumentation.scala index afe0e459..aa14f237 100644 --- a/src/main/scala/akka/ActorInstrumentation.scala +++ b/src/main/scala/akka/ActorInstrumentation.scala @@ -3,7 +3,7 @@ package akka import actor.ActorCell import org.aspectj.lang.annotation.{After, Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint -import kamon.metric.Metrics.{ metricsRegistry => meterRegistry } +import kamon.metric.Metrics.{ registry => meterRegistry } import com.codahale.metrics.Meter import kamon.metric.MetricsUtils._ diff --git a/src/main/scala/akka/PoolMonitorInstrumentation.scala b/src/main/scala/akka/PoolMonitorInstrumentation.scala index 167083e8..e78e0d7e 100644 --- a/src/main/scala/akka/PoolMonitorInstrumentation.scala +++ b/src/main/scala/akka/PoolMonitorInstrumentation.scala @@ -1,16 +1,30 @@ package akka import org.aspectj.lang.annotation._ +import akka.dispatch.MonitorableThreadFactory +import kamon.metric.Metrics +import scala.concurrent.forkjoin.ForkJoinPool +import com.codahale.metrics.Gauge -@Aspect("perthis(poolMonitor(*))") +@Aspect("perthis(poolMonitor(scala.concurrent.forkjoin.ForkJoinPool))") class PoolMonitorAspect { println("Created PoolMonitorAspect") + @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && this(pool)") - protected def poolMonitor(pool:scala.concurrent.forkjoin.ForkJoinPool):Unit = {} + protected def poolMonitor(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = {} @After("poolMonitor(pool)") def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = { + pool.getFactory match { + case m: MonitorableThreadFactory => registerForMonitoring(pool, m.name) + } + } + def registerForMonitoring(fjp: ForkJoinPool, name: String) { + Metrics.registry.register(s"/metrics/actorsystem/{actorsystem-name}/dispatcher/$name", + new Gauge[Long] { + def getValue: Long = fjp.getPoolSize + }) } } diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala index c1b97722..c58f95e4 100644 --- a/src/main/scala/kamon/Kamon.scala +++ b/src/main/scala/kamon/Kamon.scala @@ -8,7 +8,7 @@ object Kamon { override def initialValue() = None } - implicit lazy val actorSystem = ActorSystem("kamon") + implicit lazy val actorSystem = ActorSystem("kamon-test") def context() = ctx.get() diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala index 351446f3..0bfcd74b 100644 --- a/src/main/scala/kamon/TraceContext.scala +++ b/src/main/scala/kamon/TraceContext.scala @@ -53,12 +53,14 @@ object ThreadLocalTraceEntryStorage extends TraceEntryStorage { private val storage = new ThreadLocal[List[TraceEntry]] { override def initialValue(): List[TraceEntry] = Nil - def update(f: List[TraceEntry] => List[TraceEntry]) = set(f(get())) } + def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get) + def store(entry: TraceEntry): Boolean = { - storage.update(entry :: _) + update(entry :: _) true } } + diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index ed76334f..41554410 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -7,8 +7,7 @@ import java.util.concurrent.TimeUnit import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} import akka.util.Timeout -import scala.util.Success -import scala.util.Failure +import scala.util.{Random, Success, Failure} import scala.concurrent.Future trait Message @@ -35,31 +34,24 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{ case class Ping() case class Pong() -class PingActor(val target: ActorRef) extends Actor with ActorLogging { - implicit def executionContext = context.dispatcher - implicit val timeout = Timeout(30, TimeUnit.SECONDS) +class PingActor extends Actor with ActorLogging { + val pong = context.actorOf(Props[PongActor]) + val random = new Random() def receive = { case Pong() => { - log.info(s"pong with context ${Kamon.context}") - Thread.sleep(1000) - sender ! Ping() + Thread.sleep(random.nextInt(2000)) + //log.info("Message from Ping") + pong ! Ping() } - case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000) } - - def withAny(): Any = {1} - def withAnyRef(): AnyRef = {new Object} } class PongActor extends Actor with ActorLogging { def receive = { case Ping() => { - Thread.sleep(3000) sender ! Pong() - log.info(s"ping with context ${Kamon.context}") } - case a: Any => println(s"Got ${a} in PONG") } } @@ -74,8 +66,10 @@ object TryAkka extends App{ } })) - - + for(i <- 1 to 4) { + val ping = system.actorOf(Props[PingActor]) + ping ! Pong() + } def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") @@ -100,8 +94,8 @@ object TryAkka extends App{ Kamon.stop - Thread.sleep(3000) - system.shutdown() + //Thread.sleep(3000) + //system.shutdown() /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ diff --git a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala b/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala new file mode 100644 index 00000000..35e06b5d --- /dev/null +++ b/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala @@ -0,0 +1,71 @@ +package akka.dispatch + +import org.aspectj.lang.annotation._ +import java.util.concurrent._ +import scala.concurrent.forkjoin.ForkJoinPool +import org.aspectj.lang.ProceedingJoinPoint +import java.util +import akka.dispatch.NamedExecutorServiceFactoryDelegate +import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector} + + +case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { + def createExecutorService: ExecutorService = delegate.createExecutorService +} + +@Aspect +class ExecutorServiceFactoryProviderInstrumentation { + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(id, threadFactory)") + def factoryMethodCall(id: String, threadFactory: ThreadFactory) = {} + + @Around("factoryMethodCall(id, threadFactory)") + def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val delegate = pjp.proceed(Array[AnyRef](id, threadFactory)).asInstanceOf[ExecutorServiceFactory] // Safe Cast + + val actorSystemName = threadFactory match { + case m: MonitorableThreadFactory => m.name + case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. + } + + new NamedExecutorServiceFactoryDelegate(actorSystemName, id, delegate) + } + +} + + +@Aspect +class NamedExecutorServiceFactoryDelegateInstrumentation { + + @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") + def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} + + @Around("factoryMethodCall(namedFactory)") + def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { + val delegate = pjp.proceed(Array[AnyRef](namedFactory)).asInstanceOf[ExecutorService] + val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + + ExecutorServiceMetricCollector.register(executorFullName, delegate) + + new NamedExecutorServiceDelegate(executorFullName, delegate) + } +} + +case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { + def shutdown() = { + ExecutorServiceMetricCollector.deregister(fullName) + delegate.shutdown() + } + def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() + def isShutdown: Boolean = delegate.isShutdown + def isTerminated: Boolean = delegate.isTerminated + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) + def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) + def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) + def submit(task: Runnable): Future[_] = delegate.submit(task) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) + def execute(command: Runnable) = delegate.execute(command) +}
\ No newline at end of file diff --git a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index ef908625..e75a638f 100644 --- a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -45,6 +45,12 @@ class RunnableInstrumentation { */ import kamon.TraceContextSwap.withContext + @Before("instrumentedRunnableCreation()") + def beforeCreation = { + //println((new Throwable).getStackTraceString) + } + + @Around("runnableExecution()") def around(pjp: ProceedingJoinPoint) = { import pjp._ diff --git a/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala new file mode 100644 index 00000000..78711267 --- /dev/null +++ b/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala @@ -0,0 +1,67 @@ +package kamon.metric + +import java.util.concurrent.{ThreadPoolExecutor, ExecutorService} +import scala.concurrent.forkjoin.ForkJoinPool +import com.codahale.metrics.{Metric, MetricFilter} + +object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { + + def register(fullName: String, executorService: ExecutorService) = executorService match { + case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp) + case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe) + case _ => // If it is a unknown Executor then just do nothing. + } + + def deregister(fullName: String) = { + Metrics.registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + + +trait ForkJoinPoolMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + + def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { + val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ + + val allMetrics = Map( + fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), + fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), + fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount) + ) + + allMetrics.foreach(kv => Metrics.registry.register(kv._1, kv._2)) + } +} + +trait ThreadPoolExecutorMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { + val tpeGauge = newNumericGaugeFor(tpe) _ + + val allMetrics = Map( + fullName + queueSize -> tpeGauge(_.getQueue.size()), + fullName + poolSize -> tpeGauge(_.getPoolSize), + fullName + activeThreads -> tpeGauge(_.getActiveCount) + ) + + allMetrics.foreach(kv => Metrics.registry.register(kv._1, kv._2)) + } +} + + +object BasicExecutorMetricNames { + val queueSize = "queueSize" + val poolSize = "poolSize" + val activeThreads = "activeThreads" +} + + + + diff --git a/src/main/scala/kamon/metric/GaugeGenerator.scala b/src/main/scala/kamon/metric/GaugeGenerator.scala new file mode 100644 index 00000000..30635432 --- /dev/null +++ b/src/main/scala/kamon/metric/GaugeGenerator.scala @@ -0,0 +1,12 @@ +package kamon.metric + +import com.codahale.metrics.Gauge + +trait GaugeGenerator { + + def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] { + def getValue: V = generator(target) + } +} + +object GaugeGenerator extends GaugeGenerator diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index cf04659b..25c9bd8e 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -4,11 +4,15 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics._ object Metrics { - val metricsRegistry: MetricRegistry = new MetricRegistry + val registry: MetricRegistry = new MetricRegistry - val consoleReporter = ConsoleReporter.forRegistry(metricsRegistry) - val newrelicReporter = NewRelicReporter(metricsRegistry) + val consoleReporter = ConsoleReporter.forRegistry(registry) + val newrelicReporter = NewRelicReporter(registry) newrelicReporter.start(5, TimeUnit.SECONDS) - consoleReporter.build().start(5, TimeUnit.SECONDS) -}
\ No newline at end of file + //consoleReporter.build().start(5, TimeUnit.SECONDS) +} + +object MetricDirectory { + def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" +} diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala index 56dce913..67ee1ba5 100644 --- a/src/main/scala/kamon/metric/NewRelicReporter.scala +++ b/src/main/scala/kamon/metric/NewRelicReporter.scala @@ -17,12 +17,27 @@ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilt println(s"Logging to NewRelic: ${counter.getCount}") } + + +/* def processGauge(name: String, gauge: Gauge[_]) = { + println(s"the value is: "+gauge.getValue) + NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) + }*/ + + def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { //Process Meters meters.asScala.map{case(name, meter) => processMeter(name, meter)} //Process Meters counters.asScala.map{case(name, counter) => processCounter(name, counter)} + + // Gauges + gauges.asScala.foreach{ case (name, gauge) => { + val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() + val fullMetricName = "Custom" + name + NewRelic.recordMetric(fullMetricName, measure) + }} } } diff --git a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala index 4cc15a2f..c3606d23 100644 --- a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ b/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala @@ -25,7 +25,7 @@ class ActorInstrumentationSpec extends WordSpec with MustMatchers with ShouldMat //to ensure that all messages was received Thread.sleep(1000) - val messages = metricsRegistry.getMeters.asScala.get(meterForEchoActor).get.getCount + val messages = registry.getMeters.asScala.get(meterForEchoActor).get.getCount messages should equal(totalMessages) } |