diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-06-11 18:29:17 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-06-11 18:29:17 -0300 |
commit | 84c9ae342ea4a280b0033d9d78239b19b01b728f (patch) | |
tree | 7cc9730bc8f0c89410f4655988922e2b679549b5 /src/main/scala/kamon | |
parent | 197746563e47783ed4b5f43e94c9aa63734081f6 (diff) | |
download | Kamon-84c9ae342ea4a280b0033d9d78239b19b01b728f.tar.gz Kamon-84c9ae342ea4a280b0033d9d78239b19b01b728f.tar.bz2 Kamon-84c9ae342ea4a280b0033d9d78239b19b01b728f.zip |
wip
Diffstat (limited to 'src/main/scala/kamon')
-rw-r--r-- | src/main/scala/kamon/Kamon.scala | 2 | ||||
-rw-r--r-- | src/main/scala/kamon/TraceContext.scala | 6 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 32 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala | 71 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala | 6 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala | 67 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/GaugeGenerator.scala | 12 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/Metrics.scala | 14 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/NewRelicReporter.scala | 15 |
9 files changed, 198 insertions, 27 deletions
diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala index c1b97722..c58f95e4 100644 --- a/src/main/scala/kamon/Kamon.scala +++ b/src/main/scala/kamon/Kamon.scala @@ -8,7 +8,7 @@ object Kamon { override def initialValue() = None } - implicit lazy val actorSystem = ActorSystem("kamon") + implicit lazy val actorSystem = ActorSystem("kamon-test") def context() = ctx.get() diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala index 351446f3..0bfcd74b 100644 --- a/src/main/scala/kamon/TraceContext.scala +++ b/src/main/scala/kamon/TraceContext.scala @@ -53,12 +53,14 @@ object ThreadLocalTraceEntryStorage extends TraceEntryStorage { private val storage = new ThreadLocal[List[TraceEntry]] { override def initialValue(): List[TraceEntry] = Nil - def update(f: List[TraceEntry] => List[TraceEntry]) = set(f(get())) } + def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get) + def store(entry: TraceEntry): Boolean = { - storage.update(entry :: _) + update(entry :: _) true } } + diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index ed76334f..41554410 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -7,8 +7,7 @@ import java.util.concurrent.TimeUnit import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} import akka.util.Timeout -import scala.util.Success -import scala.util.Failure +import scala.util.{Random, Success, Failure} import scala.concurrent.Future trait Message @@ -35,31 +34,24 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{ case class Ping() case class Pong() -class PingActor(val target: ActorRef) extends Actor with ActorLogging { - implicit def executionContext = context.dispatcher - implicit val timeout = Timeout(30, TimeUnit.SECONDS) +class PingActor extends Actor with ActorLogging { + val pong = context.actorOf(Props[PongActor]) + val random = new Random() def receive = { case Pong() => { - log.info(s"pong with context ${Kamon.context}") - Thread.sleep(1000) - sender ! Ping() + Thread.sleep(random.nextInt(2000)) + //log.info("Message from Ping") + pong ! Ping() } - case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000) } - - def withAny(): Any = {1} - def withAnyRef(): AnyRef = {new Object} } class PongActor extends Actor with ActorLogging { def receive = { case Ping() => { - Thread.sleep(3000) sender ! Pong() - log.info(s"ping with context ${Kamon.context}") } - case a: Any => println(s"Got ${a} in PONG") } } @@ -74,8 +66,10 @@ object TryAkka extends App{ } })) - - + for(i <- 1 to 4) { + val ping = system.actorOf(Props[PingActor]) + ping ! Pong() + } def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") @@ -100,8 +94,8 @@ object TryAkka extends App{ Kamon.stop - Thread.sleep(3000) - system.shutdown() + //Thread.sleep(3000) + //system.shutdown() /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ diff --git a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala b/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala new file mode 100644 index 00000000..35e06b5d --- /dev/null +++ b/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala @@ -0,0 +1,71 @@ +package akka.dispatch + +import org.aspectj.lang.annotation._ +import java.util.concurrent._ +import scala.concurrent.forkjoin.ForkJoinPool +import org.aspectj.lang.ProceedingJoinPoint +import java.util +import akka.dispatch.NamedExecutorServiceFactoryDelegate +import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector} + + +case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { + def createExecutorService: ExecutorService = delegate.createExecutorService +} + +@Aspect +class ExecutorServiceFactoryProviderInstrumentation { + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(id, threadFactory)") + def factoryMethodCall(id: String, threadFactory: ThreadFactory) = {} + + @Around("factoryMethodCall(id, threadFactory)") + def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val delegate = pjp.proceed(Array[AnyRef](id, threadFactory)).asInstanceOf[ExecutorServiceFactory] // Safe Cast + + val actorSystemName = threadFactory match { + case m: MonitorableThreadFactory => m.name + case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. + } + + new NamedExecutorServiceFactoryDelegate(actorSystemName, id, delegate) + } + +} + + +@Aspect +class NamedExecutorServiceFactoryDelegateInstrumentation { + + @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") + def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} + + @Around("factoryMethodCall(namedFactory)") + def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { + val delegate = pjp.proceed(Array[AnyRef](namedFactory)).asInstanceOf[ExecutorService] + val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + + ExecutorServiceMetricCollector.register(executorFullName, delegate) + + new NamedExecutorServiceDelegate(executorFullName, delegate) + } +} + +case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { + def shutdown() = { + ExecutorServiceMetricCollector.deregister(fullName) + delegate.shutdown() + } + def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() + def isShutdown: Boolean = delegate.isShutdown + def isTerminated: Boolean = delegate.isTerminated + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) + def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) + def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) + def submit(task: Runnable): Future[_] = delegate.submit(task) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) + def execute(command: Runnable) = delegate.execute(command) +}
\ No newline at end of file diff --git a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index ef908625..e75a638f 100644 --- a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -45,6 +45,12 @@ class RunnableInstrumentation { */ import kamon.TraceContextSwap.withContext + @Before("instrumentedRunnableCreation()") + def beforeCreation = { + //println((new Throwable).getStackTraceString) + } + + @Around("runnableExecution()") def around(pjp: ProceedingJoinPoint) = { import pjp._ diff --git a/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala new file mode 100644 index 00000000..78711267 --- /dev/null +++ b/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala @@ -0,0 +1,67 @@ +package kamon.metric + +import java.util.concurrent.{ThreadPoolExecutor, ExecutorService} +import scala.concurrent.forkjoin.ForkJoinPool +import com.codahale.metrics.{Metric, MetricFilter} + +object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { + + def register(fullName: String, executorService: ExecutorService) = executorService match { + case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp) + case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe) + case _ => // If it is a unknown Executor then just do nothing. + } + + def deregister(fullName: String) = { + Metrics.registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + + +trait ForkJoinPoolMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + + def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { + val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ + + val allMetrics = Map( + fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), + fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), + fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount) + ) + + allMetrics.foreach(kv => Metrics.registry.register(kv._1, kv._2)) + } +} + +trait ThreadPoolExecutorMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { + val tpeGauge = newNumericGaugeFor(tpe) _ + + val allMetrics = Map( + fullName + queueSize -> tpeGauge(_.getQueue.size()), + fullName + poolSize -> tpeGauge(_.getPoolSize), + fullName + activeThreads -> tpeGauge(_.getActiveCount) + ) + + allMetrics.foreach(kv => Metrics.registry.register(kv._1, kv._2)) + } +} + + +object BasicExecutorMetricNames { + val queueSize = "queueSize" + val poolSize = "poolSize" + val activeThreads = "activeThreads" +} + + + + diff --git a/src/main/scala/kamon/metric/GaugeGenerator.scala b/src/main/scala/kamon/metric/GaugeGenerator.scala new file mode 100644 index 00000000..30635432 --- /dev/null +++ b/src/main/scala/kamon/metric/GaugeGenerator.scala @@ -0,0 +1,12 @@ +package kamon.metric + +import com.codahale.metrics.Gauge + +trait GaugeGenerator { + + def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] { + def getValue: V = generator(target) + } +} + +object GaugeGenerator extends GaugeGenerator diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index cf04659b..25c9bd8e 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -4,11 +4,15 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics._ object Metrics { - val metricsRegistry: MetricRegistry = new MetricRegistry + val registry: MetricRegistry = new MetricRegistry - val consoleReporter = ConsoleReporter.forRegistry(metricsRegistry) - val newrelicReporter = NewRelicReporter(metricsRegistry) + val consoleReporter = ConsoleReporter.forRegistry(registry) + val newrelicReporter = NewRelicReporter(registry) newrelicReporter.start(5, TimeUnit.SECONDS) - consoleReporter.build().start(5, TimeUnit.SECONDS) -}
\ No newline at end of file + //consoleReporter.build().start(5, TimeUnit.SECONDS) +} + +object MetricDirectory { + def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" +} diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala index 56dce913..67ee1ba5 100644 --- a/src/main/scala/kamon/metric/NewRelicReporter.scala +++ b/src/main/scala/kamon/metric/NewRelicReporter.scala @@ -17,12 +17,27 @@ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilt println(s"Logging to NewRelic: ${counter.getCount}") } + + +/* def processGauge(name: String, gauge: Gauge[_]) = { + println(s"the value is: "+gauge.getValue) + NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) + }*/ + + def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { //Process Meters meters.asScala.map{case(name, meter) => processMeter(name, meter)} //Process Meters counters.asScala.map{case(name, counter) => processCounter(name, counter)} + + // Gauges + gauges.asScala.foreach{ case (name, gauge) => { + val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() + val fullMetricName = "Custom" + name + NewRelic.recordMetric(fullMetricName, measure) + }} } } |