aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/Kamon.scala
blob: fb1b2393764126266bed12735bcfd180554dcff9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package kamon

import akka.actor.{Actor, Props, ActorSystem}
import kamon.metric.{HistogramSnapshot, ActorSystemMetrics}
import scala.concurrent.duration.FiniteDuration
import com.newrelic.api.agent.NewRelic
import scala.collection.concurrent.TrieMap
import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration}
import scala.util.DynamicVariable


/** Holds the actor instrumentation configuration instance used by this module. */
object Instrument {
  // Exposed under the ActorInstrumentationConfiguration type; the concrete
  // instance is a SimpleContextPassingInstrumentation.
  val instrumentation: ActorInstrumentationConfiguration =
    new SimpleContextPassingInstrumentation
}

/**
 * Holds the actor system named "kamon" and a registry of the metrics
 * recorded for each registered actor system.
 */
object Kamon {
  /** Actor system named "kamon"; created lazily on first access. */
  implicit lazy val actorSystem: ActorSystem = ActorSystem("kamon")

  /** Registry of [[ActorSystemMetrics]] instances keyed by actor system name. */
  object Metric {

    /** Backing concurrent map from actor system name to its metrics. */
    val actorSystems: TrieMap[String, ActorSystemMetrics] = TrieMap.empty[String, ActorSystemMetrics]

    /** Names of every actor system registered so far. */
    def actorSystemNames: List[String] = actorSystems.keys.toList

    /**
     * Returns the metrics registered under `name`, registering a fresh
     * instance first when none exists yet.
     *
     * Registration goes through `putIfAbsent` so that concurrent callers all
     * end up with the same instance; on older Scala releases
     * `TrieMap.getOrElseUpdate` is a non-atomic get-then-put and a later
     * writer could replace an instance another caller already received.
     *
     * @param name actor system name used as the registry key
     * @return the single registered metrics instance for `name`
     */
    def registerActorSystem(name: String): ActorSystemMetrics =
      actorSystems.get(name).getOrElse {
        val created = ActorSystemMetrics(name)
        // putIfAbsent yields the existing value when another caller won the
        // race; in that case `created` is simply discarded.
        actorSystems.putIfAbsent(name, created).getOrElse(created)
      }

    /** Looks up the metrics registered under `name`, if any. */
    def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
  }

  //val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
  //val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")

}


/** Accessors for the current [[TraceContext]], which is kept in a `DynamicVariable`. */
object Tracer {
  // Holds None until a context has been set.
  val traceContext = new DynamicVariable[Option[TraceContext]](None)

  /** The trace context currently stored in `traceContext`, if any. */
  def context(): Option[TraceContext] = traceContext.value

  /** Stores `ctx` as the current trace context. */
  def set(ctx: TraceContext): Unit = {
    traceContext.value = Some(ctx)
  }

  /** Creates a new trace context and makes it the current one. */
  def start: Unit = set(newTraceContext())

  /** Builds a new [[TraceContext]], passing `Kamon.actorSystem` explicitly. */
  def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem)
}


/**
 * Actor that, on request, periodically sends dispatcher metrics snapshots to
 * the requesting actor.
 *
 * Each [[RegisterForAllDispatchers]] message starts a periodic task that
 * walks every dispatcher of every actor system in `Kamon.Metric.actorSystems`
 * and sends one [[DispatcherMetrics]] message per dispatcher to the sender of
 * the registration. All periodic tasks are cancelled when this actor stops.
 */
class MetricManager extends Actor {
  implicit val ec = context.system.dispatcher

  // Handles of the periodic tasks started by this actor, kept so postStop can
  // cancel them. Only touched from receive and postStop.
  private var schedules = List.empty[akka.actor.Cancellable]

  def receive = {
    case RegisterForAllDispatchers(frequency) => {
      // Capture the sender up front: the scheduled block runs later, when
      // `sender` no longer refers to this registration's sender.
      val subscriber = sender
      val schedule = context.system.scheduler.schedule(frequency, frequency) {
        Kamon.Metric.actorSystems.foreach {
          case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach {
            case (dispatcherName, dispatcherMetrics) => {
              val activeThreads = dispatcherMetrics.activeThreadCount.snapshot
              val poolSize = dispatcherMetrics.poolSize.snapshot
              val queueSize = dispatcherMetrics.queueSize.snapshot

              subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize)

            }
          }
        }
      }
      schedules = schedule :: schedules
    }
  }

  /** Cancels every periodic task so none of them outlives this actor. */
  override def postStop(): Unit = {
    schedules.foreach(_.cancel())
    schedules = Nil
  }
}

/**
 * Message asking for dispatcher metrics to be sent to the sender periodically.
 *
 * @param frequency used both as the initial delay and as the interval between reports
 */
case class RegisterForAllDispatchers(
  frequency: FiniteDuration
)
/**
 * Snapshot of one dispatcher's metrics.
 *
 * @param actorSystem   name of the actor system the dispatcher belongs to
 * @param dispatcher    name of the dispatcher
 * @param activeThreads snapshot of the active thread count histogram
 * @param poolSize      snapshot of the pool size histogram
 * @param queueSize     snapshot of the queue size histogram
 */
case class DispatcherMetrics(
  actorSystem: String,
  dispatcher: String,
  activeThreads: HistogramSnapshot,
  poolSize: HistogramSnapshot,
  queueSize: HistogramSnapshot
)






/**
 * Actor that forwards each received [[DispatcherMetrics]] message to New Relic
 * as three custom metrics: active threads, inactive threads and queue size,
 * each taken from the median of the corresponding snapshot.
 */
class NewrelicReporterActor extends Actor {
  import scala.concurrent.duration._

  //Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)

  def receive = {
    case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => {
      // Every metric name shares this prefix.
      val prefix = s"Custom/$actorSystem/Dispatcher/$dispatcher"

      val active = activeThreads.median.toFloat
      NewRelic.recordMetric(s"$prefix/Threads/active", active)

      // Inactive threads = median pool size minus median active threads.
      val inactive = poolSize.median.toFloat - active
      NewRelic.recordMetric(s"$prefix/Threads/inactive", inactive)

      val queued = queueSize.median.toFloat
      NewRelic.recordMetric(s"$prefix/Queue", queued)
    }
  }
}