aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
commitcd1a9dd25fb550a515e7a7408b88233773268c38 (patch)
tree98c16e292c533cc9aa51bb0f073864b1f9e2b68a /kamon-core/src/main/scala
parent6566e1c41510e54dd987d3e34e40f1031169d592 (diff)
downloadKamon-cd1a9dd25fb550a515e7a7408b88233773268c38.tar.gz
Kamon-cd1a9dd25fb550a515e7a7408b88233773268c38.tar.bz2
Kamon-cd1a9dd25fb550a515e7a7408b88233773268c38.zip
upgrading to akka 2.2
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala132
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContext.scala67
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContextSwap.scala26
-rw-r--r--kamon-core/src/main/scala/kamon/TransactionPublisher.scala15
-rw-r--r--kamon-core/src/main/scala/kamon/executor/eventbus.scala103
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala89
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala23
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala245
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala73
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala61
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala49
-rw-r--r--kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala67
-rw-r--r--kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricFilter.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Metrics.scala146
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala51
-rw-r--r--kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala51
-rw-r--r--kamon-core/src/main/scala/spraytest/ClientTest.scala55
-rw-r--r--kamon-core/src/main/scala/spraytest/FutureTesting.scala81
-rw-r--r--kamon-core/src/main/scala/test/PingPong.scala34
20 files changed, 1386 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
new file mode 100644
index 00000000..c3080909
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -0,0 +1,132 @@
+package kamon
+
+import akka.actor.{Actor, Props, ActorSystem}
+import scala.collection.JavaConverters._
+import java.util.concurrent.ConcurrentHashMap
+import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics}
+import scala.concurrent.duration.{FiniteDuration, Duration}
+import com.newrelic.api.agent.NewRelic
+
+object Kamon {
+
+ val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ implicit lazy val actorSystem = ActorSystem("kamon")
+
+
+ def context() = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start = set(newTraceContext)
+ def stop = ctx.get match {
+ case Some(context) => context.close
+ case None =>
+ }
+
+ def newTraceContext(): TraceContext = TraceContext()
+
+
+ val publisher = actorSystem.actorOf(Props[TransactionPublisher])
+
+ def publish(tx: FullTransaction) = publisher ! tx
+
+
+
+ object Metric {
+ val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala
+
+ def actorSystemNames: List[String] = actorSystems.keys.toList
+ def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name))
+
+ def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
+ }
+
+
+
+ val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
+ val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
+
+}
+
+
+
+
+
+
+
+
+
+object Tracer {
+ val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ def context() = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start = ??? //set(newTraceContext)
+ def stop = ctx.get match {
+ case Some(context) => context.close
+ case None =>
+ }
+
+ //def newTraceContext(): TraceContext = TraceContext()
+}
+
+
+class MetricManager extends Actor {
+ implicit val ec = context.system.dispatcher
+
+ def receive = {
+ case RegisterForAllDispatchers(frequency) => {
+ val subscriber = sender
+ context.system.scheduler.schedule(frequency, frequency) {
+ Kamon.Metric.actorSystems.foreach {
+ case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach {
+ case (dispatcherName, dispatcherMetrics) => {
+ val activeThreads = dispatcherMetrics.activeThreadCount.snapshot
+ val poolSize = dispatcherMetrics.poolSize.snapshot
+ val queueSize = dispatcherMetrics.queueSize.snapshot
+
+ subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize)
+
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+case class RegisterForAllDispatchers(frequency: FiniteDuration)
+case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot)
+
+
+
+
+
+
+class NewrelicReporterActor extends Actor {
+ import scala.concurrent.duration._
+
+ Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)
+
+ def receive = {
+ case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => {
+ /*println("PUBLISHED DISPATCHER STATS")
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat)
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat))
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat)*/
+
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat)
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat))
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat)
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
new file mode 100644
index 00000000..6b32550f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -0,0 +1,67 @@
+package kamon
+
+import java.util.UUID
+import akka.actor.{ActorSystem, ActorPath}
+import akka.agent.Agent
+import java.util.concurrent.TimeUnit
+import scala.util.{Failure, Success}
+import akka.util.Timeout
+
+
+case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) {
+ implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+ implicit val as = Kamon.actorSystem.dispatcher
+
+ def append(entry: TraceEntry) = entries send (entry :: _)
+ def close = entries.future.onComplete({
+ case Success(list) => Kamon.publish(FullTransaction(id, list))
+ case Failure(t) => println("WTF!")
+ })
+}
+
+object TraceContext {
+ implicit val as2 = Kamon.actorSystem.dispatcher
+ def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil))
+}
+
+
+
+trait TraceEntry
+
+case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
+
+
+
+case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry])
+
+
+
+
+
+object Collector {
+
+}
+
+trait TraceEntryStorage {
+ def store(entry: TraceEntry): Boolean
+}
+
+class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) {
+ def store(entry: TraceEntry) = storage.store(entry)
+}
+
+object ThreadLocalTraceEntryStorage extends TraceEntryStorage {
+
+ private val storage = new ThreadLocal[List[TraceEntry]] {
+ override def initialValue(): List[TraceEntry] = Nil
+ }
+
+ def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get)
+
+ def store(entry: TraceEntry): Boolean = {
+ update(entry :: _)
+ true
+ }
+}
+
+
diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
new file mode 100644
index 00000000..68ee808b
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
@@ -0,0 +1,26 @@
+package kamon
+
+/**
+ * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards.
+ */
+trait TraceContextSwap {
+
+ def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body)
+
+ def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = {
+ ctx match {
+ case Some(context) => {
+ Kamon.set(context)
+ val bodyResult = primary
+ Kamon.clear
+
+ bodyResult
+ }
+ case None => fallback
+ }
+
+ }
+
+}
+
+object TraceContextSwap extends TraceContextSwap
diff --git a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala
new file mode 100644
index 00000000..0626b91d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala
@@ -0,0 +1,15 @@
+package kamon
+
+import akka.actor.Actor
+import java.util.UUID
+
+class TransactionPublisher extends Actor {
+
+ def receive = {
+ case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries")
+ }
+
+}
+
+
+case class FullTransaction(id: UUID, entries: List[TraceEntry])
diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
new file mode 100644
index 00000000..599f2a7a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
@@ -0,0 +1,103 @@
+package kamon.executor
+
+import akka.event.ActorEventBus
+import akka.event.LookupClassification
+import akka.actor._
+import java.util.concurrent.TimeUnit
+
+import kamon.{CodeBlockExecutionTime, Kamon, TraceContext}
+import akka.util.Timeout
+import scala.util.{Random, Success, Failure}
+import scala.concurrent.Future
+
+trait Message
+
+case class PostMessage(text:String) extends Message
+
+case class MessageEvent(val channel:String, val message:Message)
+
+class AppActorEventBus extends ActorEventBus with LookupClassification{
+ type Event = MessageEvent
+ type Classifier=String
+ protected def mapSize(): Int={
+ 10
+ }
+
+ protected def classify(event: Event): Classifier={
+ event.channel
+ }
+
+ protected def publish(event: Event, subscriber: Subscriber): Unit={
+ subscriber ! event
+ }
+}
+case class Ping()
+case class Pong()
+
+class PingActor extends Actor with ActorLogging {
+
+ val pong = context.actorOf(Props[PongActor])
+ val random = new Random()
+ def receive = {
+ case Pong() => {
+ //Thread.sleep(random.nextInt(2000))
+ //log.info("Message from Ping")
+ pong ! Ping()
+ }
+ }
+}
+
+class PongActor extends Actor with ActorLogging {
+ def receive = {
+ case Ping() => {
+ sender ! Pong()
+ }
+ }
+}
+
+
+object TryAkka extends App{
+ val system = ActorSystem("MySystem")
+ val appActorEventBus=new AppActorEventBus
+ val NEW_POST_CHANNEL="/posts/new"
+ val subscriber = system.actorOf(Props(new Actor {
+ def receive = {
+ case d: MessageEvent => println(d)
+ }
+ }))
+
+ Kamon.start
+ for(i <- 1 to 4) {
+ val ping = system.actorOf(Props[PingActor])
+ ping ! Pong()
+ }
+
+
+ def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
+
+ /*
+ val newRelicReporter = new NewRelicReporter(registry)
+ newRelicReporter.start(1, TimeUnit.SECONDS)
+
+*/
+ import akka.pattern.ask
+ implicit val timeout = Timeout(10, TimeUnit.SECONDS)
+ implicit def execContext = system.dispatcher
+
+
+
+ Kamon.start
+
+ Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
+ threadPrintln("Before doing it")
+ val f = Future { threadPrintln("This is happening inside the future body") }
+
+ Kamon.stop
+
+
+ //Thread.sleep(3000)
+ //system.shutdown()
+
+/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
+ appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
new file mode 100644
index 00000000..82915ce9
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -0,0 +1,89 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{Props, ActorSystem, ActorRef}
+import kamon.{Kamon, TraceContext}
+import akka.dispatch.{MessageDispatcher, Envelope}
+import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram}
+import kamon.metric.{MetricDirectory, Metrics}
+import com.codahale.metrics
+import kamon.instrumentation.TraceableMessage
+import scala.Some
+
+case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
+
+
+@Aspect
+class ActorRefTellInstrumentation {
+ import ProceedingJoinPointPimp._
+
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)")
+ def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
+
+ @Around("sendingMessageToActorRef(actor, message, sender)")
+ def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
+
+ val actorName = MetricDirectory.nameForActor(actor)
+ val t = Metrics.registry.timer(actorName + "LATENCY")
+ //println(s"About to proceed with: $actor $message $sender ${Kamon.context}")
+ pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender)
+ }
+}
+
+
+@Aspect("perthis(actorCellCreation(..))")
+class ActorCellInvokeInstrumentation {
+
+ var processingTimeTimer: Timer = _
+ var shouldTrack = false
+
+ // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
+
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @After("actorCellCreation(system, ref, props, dispatcher, parent)")
+ def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+ val actorName = MetricDirectory.nameForActor(ref)
+ val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
+
+ /** TODO: Find a better way to filter the things we don't want to measure. */
+ //if(system.name != "kamon" && actorName.startsWith("/user")) {
+ processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
+ shouldTrack = true
+ //}
+ }
+
+
+ @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
+ import ProceedingJoinPointPimp._
+ println("ENVELOPE --------------------->"+envelope)
+ envelope match {
+ case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
+ timer.stop()
+
+ val originalEnvelope = envelope.copy(message = msg)
+
+ //println("PROCESSING TIME TIMER: "+processingTimeTimer)
+ val pt = processingTimeTimer.time()
+ ctx match {
+ case Some(c) => {
+ Kamon.set(c)
+ println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
+ pjp.proceedWith(originalEnvelope)
+ Kamon.clear
+ }
+ case None => pjp.proceedWith(originalEnvelope)
+ }
+ pt.stop()
+ }
+ case _ => pjp.proceed
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
new file mode 100644
index 00000000..84c20c52
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
@@ -0,0 +1,23 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.ProceedingJoinPoint
+
+trait ProceedingJoinPointPimp {
+ import language.implicitConversions
+
+ implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp)
+}
+
+object ProceedingJoinPointPimp extends ProceedingJoinPointPimp
+
+case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) {
+ def proceedWith(newUniqueArg: AnyRef) = {
+ val args = pjp.getArgs
+ args.update(0, newUniqueArg)
+ pjp.proceed(args)
+ }
+
+ def proceedWithTarget(args: AnyRef*) = {
+ pjp.proceed(args.toArray)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
new file mode 100644
index 00000000..b4f8a475
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -0,0 +1,245 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import java.util.concurrent._
+import org.aspectj.lang.ProceedingJoinPoint
+import java.util
+import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector}
+import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory}
+import com.typesafe.config.Config
+import kamon.Kamon
+import scala.concurrent.forkjoin.ForkJoinPool
+import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
+
+
+@Aspect
+class ActorSystemInstrumentation {
+
+ @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)")
+ def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {}
+
+ @After("actorSystemInstantiation(name, applicationConfig, classLoader)")
+ def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
+
+ Kamon.Metric.registerActorSystem(name)
+ }
+}
+
+@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
+class ForkJoinPoolInstrumentation {
+ var activeThreadsHistogram: Histogram = _
+ var poolSizeHistogram: Histogram = _
+
+ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
+ def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
+
+ @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
+ def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
+ val (actorSystemName, dispatcherName) = threadFactory match {
+ case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
+ case _ => ("Unknown", "Unknown")
+ }
+
+ val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
+ for(m <- metrics) {
+ activeThreadsHistogram = m.activeThreadCount
+ poolSizeHistogram = m.poolSize
+ println(s"Registered $dispatcherName for actor system $actorSystemName")
+ }
+ }
+
+ def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = {
+ knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown"))
+ }
+
+
+
+
+ @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)")
+ def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {}
+
+ @After("forkJoinScan(fjp)")
+ def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
+ activeThreadsHistogram.update(fjp.getActiveThreadCount)
+ poolSizeHistogram.update(fjp.getPoolSize)
+ }
+
+
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+/**
+ * ExecutorService monitoring base:
+ */
+trait ExecutorServiceCollector {
+ def updateActiveThreadCount(diff: Int): Unit
+ def updateTotalThreadCount(diff: Int): Unit
+ def updateQueueSize(diff: Int): Unit
+}
+
+trait WatchedExecutorService {
+ def collector: ExecutorServiceCollector
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+trait ExecutorServiceMonitoring {
+ def dispatcherMetrics: DispatcherMetricCollector
+}
+
+class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
+ @volatile var dispatcherMetrics: DispatcherMetricCollector = _
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
+ def createExecutorService: ExecutorService = delegate.createExecutorService
+}
+
+@Aspect
+class ExecutorServiceFactoryProviderInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()")
+ def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = {
+ true
+ }
+
+ @Around("factoryMethodCall(dispatcherName, threadFactory)")
+ def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
+ val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast
+
+ val actorSystemName = threadFactory match {
+ case m: MonitorableThreadFactory => m.name
+ case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
+ }
+
+ new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate)
+ }
+
+}
+
+
+@Aspect
+class NamedExecutorServiceFactoryDelegateInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
+ def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {}
+
+ @Around("factoryMethodCall(namedFactory)")
+ def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
+ val delegate = pjp.proceed().asInstanceOf[ExecutorService]
+ val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
+
+ ExecutorServiceMetricCollector.register(executorFullName, delegate)
+
+ new NamedExecutorServiceDelegate(executorFullName, delegate)
+ }
+}
+
+case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
+ def shutdown() = {
+ ExecutorServiceMetricCollector.deregister(fullName)
+ delegate.shutdown()
+ }
+ def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
+ def isShutdown: Boolean = delegate.isShutdown
+ def isTerminated: Boolean = delegate.isTerminated
+ def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit)
+ def submit[T](task: Callable[T]): Future[T] = delegate.submit(task)
+ def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result)
+ def submit(task: Runnable): Future[_] = delegate.submit(task)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit)
+ def execute(command: Runnable) = delegate.execute(command)
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
new file mode 100644
index 00000000..c21502ac
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -0,0 +1,73 @@
+package kamon.instrumentation
+
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue}
+import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
+import akka.actor.{ActorSystem, ActorRef}
+import kamon.metric.{Metrics, MetricDirectory}
+import org.aspectj.lang.ProceedingJoinPoint
+
+
+/**
+ * For Mailboxes we would like to track the queue size and message latency. Currently the latency
+ * will be gathered from the ActorCellMetrics.
+ */
+
+
+@Aspect
+class MessageQueueInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)")
+ def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {}
+
+ @Around("messageQueueCreation(owner, system)")
+ def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
+ val delegate = pjp.proceed.asInstanceOf[MessageQueue]
+
+ // We are not interested in monitoring mailboxes if we don't know where they belong to.
+ val monitoredMailbox = for(own <- owner; sys <- system) yield {
+ val systemName = sys.name
+ val ownerName = MetricDirectory.nameForActor(own)
+ val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName)
+
+ val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir())
+ Metrics.include(mailBoxName, queueSizeHistogram)
+
+ new MonitoredMessageQueue(delegate, queueSizeHistogram)
+ }
+
+ monitoredMailbox match {
+ case None => delegate
+ case Some(mmb) => mmb
+ }
+ }
+}
+
+
+class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{
+
+ def enqueue(receiver: ActorRef, handle: Envelope) = {
+ delegate.enqueue(receiver, handle)
+ queueSizeHistogram.update(numberOfMessages)
+ }
+
+ def dequeue(): Envelope = {
+ val envelope = delegate.dequeue()
+ queueSizeHistogram.update(numberOfMessages)
+
+ envelope
+ }
+
+ def numberOfMessages: Int = delegate.numberOfMessages
+ def hasMessages: Boolean = delegate.hasMessages
+ def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters)
+}
+
+
+
+
+
+
+
+
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
new file mode 100644
index 00000000..e75a638f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -0,0 +1,61 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import kamon.{Kamon, TraceContext}
+import org.aspectj.lang.ProceedingJoinPoint
+import scala.Some
+
+/**
+ * Marker interface, just to make sure we don't instrument all the Runnables in the classpath.
+ */
+trait TraceContextAwareRunnable extends Runnable {}
+
+
+@Aspect("perthis(instrumentedRunnableCreation())")
+class RunnableInstrumentation {
+
+ /**
+ * These are the Runnables that need to be instrumented and make the TraceContext available
+ * while their run method is executed.
+ */
+ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
+ def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null
+
+
+ /**
+ * Pointcuts
+ */
+
+ @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))")
+ def instrumentedRunnableCreation(): Unit = {}
+
+ @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())")
+ def runnableExecution() = {}
+
+
+ /**
+ * Aspect members
+ */
+
+ private val traceContext = Kamon.context
+
+
+ /**
+ * Advices
+ */
+ import kamon.TraceContextSwap.withContext
+
+ @Before("instrumentedRunnableCreation()")
+ def beforeCreation = {
+ //println((new Throwable).getStackTraceString)
+ }
+
+
+ @Around("runnableExecution()")
+ def around(pjp: ProceedingJoinPoint) = {
+ import pjp._
+
+ withContext(traceContext, proceed())
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
new file mode 100644
index 00000000..74261403
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
@@ -0,0 +1,49 @@
+package kamon.instrumentation
+
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect}
+
+class ActorCage(val name: String, val size: Int) {
+
+ def doIt: Unit = println("name")
+}
+
+trait CageMonitoring {
+ def histogram: Histogram
+ def count(value: Int): Unit
+}
+
+class CageMonitoringImp extends CageMonitoring{
+ final val histogram = new Histogram(new ExponentiallyDecayingReservoir())
+
+ def count(value: Int) = histogram.update(value)
+
+}
+
+
+@Aspect
+class InceptionAspect {
+
+ @DeclareMixin("kamon.instrumentation.ActorCage")
+ def mixin: CageMonitoring = new CageMonitoringImp
+
+
+ @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)")
+ def theActorCageDidIt(actorCage: CageMonitoring) = {}
+
+ @After("theActorCageDidIt(actorCage)")
+ def afterDoingIt(actorCage: CageMonitoring) = {
+ actorCage.count(1)
+ actorCage.histogram.getSnapshot.dump(System.out)
+ }
+
+
+
+}
+
+
+object Runner extends App {
+ val cage = new ActorCage("ivan", 10)
+
+ cage.doIt
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
new file mode 100644
index 00000000..54a13f39
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
@@ -0,0 +1,67 @@
+package kamon.metric
+
+import java.util.concurrent.{ThreadPoolExecutor, ExecutorService}
+import scala.concurrent.forkjoin.ForkJoinPool
+import com.codahale.metrics.{Metric, MetricFilter}
+
+object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector {
+
+ def register(fullName: String, executorService: ExecutorService) = executorService match {
+ case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp)
+ case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe)
+ case _ => // If it is a unknown Executor then just do nothing.
+ }
+
+ def deregister(fullName: String) = {
+ Metrics.registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
+ })
+ }
+}
+
+
+trait ForkJoinPoolMetricCollector {
+ import GaugeGenerator._
+ import BasicExecutorMetricNames._
+
+
+ def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = {
+ val forkJoinPoolGauge = newNumericGaugeFor(fjp) _
+
+ val allMetrics = Map(
+ fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt),
+ fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize),
+ fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount)
+ )
+
+ allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) }
+ }
+}
+
+trait ThreadPoolExecutorMetricCollector {
+ import GaugeGenerator._
+ import BasicExecutorMetricNames._
+
+ def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = {
+ val tpeGauge = newNumericGaugeFor(tpe) _
+
+ val allMetrics = Map(
+ fullName + queueSize -> tpeGauge(_.getQueue.size()),
+ fullName + poolSize -> tpeGauge(_.getPoolSize),
+ fullName + activeThreads -> tpeGauge(_.getActiveCount)
+ )
+
+ allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) }
+ }
+}
+
+
+object BasicExecutorMetricNames {
+ val queueSize = "queueSize"
+ val poolSize = "threads/poolSize"
+ val activeThreads = "threads/activeThreads"
+}
+
+
+
+
diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
new file mode 100644
index 00000000..30635432
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
@@ -0,0 +1,12 @@
+package kamon.metric
+
+import com.codahale.metrics.Gauge
+
+trait GaugeGenerator {
+
+ def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] {
+ def getValue: V = generator(target)
+ }
+}
+
+object GaugeGenerator extends GaugeGenerator
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
new file mode 100644
index 00000000..fb117968
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
@@ -0,0 +1,6 @@
+package kamon.metric
+
+object MetricFilter {
+ def actorSystem(system: String): Boolean = !system.startsWith("kamon")
+ def actor(path: String, system: String): Boolean = true
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
new file mode 100644
index 00000000..cdc0a334
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
@@ -0,0 +1,146 @@
+package kamon.metric
+
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit}
+import akka.actor.ActorRef
+import com.codahale.metrics
+import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry}
+
+
+object Metrics {
+ val registry: MetricRegistry = new MetricRegistry
+
+ val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS)
+ //consoleReporter.build().start(45, TimeUnit.SECONDS)
+
+ //val newrelicReporter = NewRelicReporter(registry)
+ //newrelicReporter.start(5, TimeUnit.SECONDS)
+
+ def include(name: String, metric: Metric) = {
+ //registry.register(name, metric)
+ }
+
+ def exclude(name: String) = {
+ registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(name)
+ })
+ }
+
+
+
+ def deregister(fullName: String) = {
+ registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
+ })
+ }
+}
+
+object Watched {
+ case object Actor
+ case object Dispatcher
+}
+
+object MetricDirectory {
+ def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/"
+
+ def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox"
+
+ def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/")
+
+ def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon")
+
+
+ def shouldInstrumentActor(actorPath: String): Boolean = {
+ !(actorPath.isEmpty || actorPath.startsWith("system"))
+ }
+
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram)
+
+
+
+
+trait Histogram {
+ def update(value: Long): Unit
+ def snapshot: HistogramSnapshot
+}
+
+trait HistogramSnapshot {
+ def median: Double
+ def max: Double
+ def min: Double
+}
+
+
+case class ActorSystemMetrics(actorSystemName: String) {
+ import scala.collection.JavaConverters._
+ val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala
+
+ private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())
+
+ def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
+ val stats = createDispatcherCollector
+ dispatchers.put(dispatcherName, stats)
+ Some(stats)
+ }
+
+}
+
+
+case class CodahaleHistogram() extends Histogram {
+ private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir())
+
+ def update(value: Long) = histogram.update(value)
+ def snapshot: HistogramSnapshot = {
+ val snapshot = histogram.getSnapshot
+
+ CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin)
+ }
+}
+
+case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot
+
+
+
+
+
+
+
+/**
+ * Dispatcher Metrics that we care about currently with a histogram-like nature:
+ * - Work Queue Size
+ * - Total/Active Thread Count
+ */
+
+
+
+import annotation.tailrec
+import java.util.concurrent.atomic.AtomicReference
+
+object Atomic {
+ def apply[T]( obj : T) = new Atomic(new AtomicReference(obj))
+ implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref)
+}
+
+class Atomic[T](val atomic : AtomicReference[T]) {
+ @tailrec
+ final def update(f: T => T) : T = {
+ val oldValue = atomic.get()
+ val newValue = f(oldValue)
+ if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f)
+ }
+
+ def get() = atomic.get()
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
new file mode 100644
index 00000000..5b4ceaf4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
@@ -0,0 +1,51 @@
+package kamon.metric
+
+import com.codahale.metrics._
+
+object MetricsUtils {
+
+ def markMeter[T](meter:Meter)(f: => T): T = {
+ meter.mark()
+ f
+ }
+//
+// def incrementCounter(key: String) {
+// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count
+// }
+//
+// def markMeter(key: String) {
+// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark()
+// }
+//
+// def trace[T](key: String)(f: => T): T = {
+// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) )
+// timer.time(f)
+// }
+
+// def markAndCountMeter[T](key: String)(f: => T): T = {
+// markMeter(key)
+// f
+// }
+//
+// def traceAndCount[T](key: String)(f: => T): T = {
+// incrementCounter(key)
+// trace(key) {
+// f
+// }
+ //}
+
+// implicit def runnable(f: () => Unit): Runnable =
+// new Runnable() { def run() = f() }
+//
+//
+// import java.util.concurrent.Callable
+//
+// implicit def callable[T](f: () => T): Callable[T] =
+// new Callable[T]() { def call() = f() }
+
+// private val actorCounter:Counter = new Counter
+// private val actorTimer:Timer = new Timer
+//
+// metricsRegistry.register(s"counter-for-${actorName}", actorCounter)
+// metricsRegistry.register(s"timer-for-${actorName}", actorTimer)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
new file mode 100644
index 00000000..70f3e54a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
@@ -0,0 +1,51 @@
+package kamon.metric
+
+import com.codahale.metrics
+import metrics._
+import java.util.concurrent.TimeUnit
+import java.util
+import com.newrelic.api.agent.NewRelic
+import scala.collection.JavaConverters._
+
+
+class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) {
+
+
+
+ private[NewRelicReporter] def processMeter(name: String, meter: Meter) {
+ NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat)
+ }
+
+ private[NewRelicReporter] def processCounter(name:String, counter:Counter) {
+ println(s"Logging to NewRelic: ${counter.getCount}")
+
+ }
+
+
+/* def processGauge(name: String, gauge: Gauge[_]) = {
+ println(s"the value is: "+gauge.getValue)
+ NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float])
+ }*/
+
+
+ def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
+ //Process Meters
+ meters.asScala.map{case(name, meter) => processMeter(name, meter)}
+
+ //Process Meters
+ counters.asScala.map{case(name, counter) => processCounter(name, counter)}
+
+ // Gauges
+ gauges.asScala.foreach{ case (name, gauge) => {
+ val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue()
+ val fullMetricName = "Custom" + name
+ NewRelic.recordMetric(fullMetricName, measure)
+ }}
+ }
+
+
+}
+
+object NewRelicReporter {
+ def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/spraytest/ClientTest.scala b/kamon-core/src/main/scala/spraytest/ClientTest.scala
new file mode 100644
index 00000000..07532d0a
--- /dev/null
+++ b/kamon-core/src/main/scala/spraytest/ClientTest.scala
@@ -0,0 +1,55 @@
+package spraytest
+
+import akka.actor.ActorSystem
+import spray.client.pipelining._
+import spray.httpx.SprayJsonSupport
+import spray.json._
+import scala.concurrent.Future
+import spray.can.Http
+import akka.io.IO
+
+/**
+ * BEGIN JSON Infrastructure
+ */
+case class Container(data: List[PointOfInterest])
+case class Geolocation(latitude: Float, longitude: Float)
+case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation)
+
+object GeoJsonProtocol extends DefaultJsonProtocol {
+ implicit val geolocationFormat = jsonFormat2(Geolocation)
+ implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest)
+ implicit val containerFormat = jsonFormat1(Container)
+}
+/** END-OF JSON Infrastructure */
+
+
+
+
+
+
+class ClientTest extends App {
+ implicit val actorSystem = ActorSystem("spray-client-test")
+ import actorSystem.dispatcher
+
+
+ import GeoJsonProtocol._
+ import SprayJsonSupport._
+
+
+ val actor = IO(Http)
+
+ val pipeline = sendReceive ~> unmarshal[Container]
+
+ val response = pipeline {
+ Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco")
+ } onSuccess {
+ case a => {
+ println(a)
+ }
+ }
+}
+
+
+
+
+
diff --git a/kamon-core/src/main/scala/spraytest/FutureTesting.scala b/kamon-core/src/main/scala/spraytest/FutureTesting.scala
new file mode 100644
index 00000000..b864d6d6
--- /dev/null
+++ b/kamon-core/src/main/scala/spraytest/FutureTesting.scala
@@ -0,0 +1,81 @@
+package spraytest
+/*
+import akka.actor.ActorSystem
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Try, Success}
+import kamon.actor.TransactionContext
+
+object FutureTesting extends App {
+
+ val actorSystem = ActorSystem("future-testing")
+ implicit val ec = actorSystem.dispatcher
+ implicit val tctx = TransactionContext(11, Nil)
+
+ threadPrintln("In the initial Thread")
+
+
+ val f = TraceableFuture {
+ threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}")
+ }
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+
+
+
+
+
+
+
+ def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]")
+
+}
+
+
+
+
+trait TransactionContextWrapper {
+ def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = {
+ TransactionContext.current.set(tranContext.fork)
+ println(s"SetContext to: ${tranContext}")
+ val result = f
+
+ TransactionContext.current.remove()
+ result
+ }
+
+}
+
+class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper {
+ def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = {
+ future.onComplete(wrap(func, transactionContext))
+ }
+}
+
+object TraceableFuture {
+
+ implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future
+
+ def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = {
+ val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil))
+
+ new TraceableFuture(Future { wrappedBody })
+ }
+
+
+
+
+ def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = {
+ TransactionContext.current.set(transactionContext)
+ val result = body
+ TransactionContext.current.remove()
+ result
+ }
+}*/
+
diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala
new file mode 100644
index 00000000..f9d6869c
--- /dev/null
+++ b/kamon-core/src/main/scala/test/PingPong.scala
@@ -0,0 +1,34 @@
+package test
+
+import akka.actor.{Props, Actor, ActorSystem}
+
+object PingPong extends App {
+
+ val as = ActorSystem("ping-pong")
+
+ val pinger = as.actorOf(Props[Pinger])
+ val ponger = as.actorOf(Props[Ponger])
+
+ pinger.tell(Pong, ponger)
+
+
+ Thread.sleep(30000)
+ as.shutdown()
+
+
+}
+
+case object Ping
+case object Pong
+
+class Pinger extends Actor {
+ def receive = {
+ case Pong => sender ! Ping
+ }
+}
+
+class Ponger extends Actor {
+ def receive = {
+ case Ping => sender ! Pong
+ }
+}