aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/Kamon.scala
blob: c3080909b6ffa9c39a4794f44c336db4790a96f4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package kamon

import akka.actor.{Actor, Props, ActorSystem}
import scala.collection.JavaConverters._
import java.util.concurrent.ConcurrentHashMap
import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics}
import scala.concurrent.duration.{FiniteDuration, Duration}
import com.newrelic.api.agent.NewRelic

/**
 * Process-wide entry point: holds the per-thread trace context, the "kamon"
 * actor system with the actors that run inside it, and the registry of
 * per-actor-system metrics.
 */
object Kamon {

  /** Per-thread slot for the trace context of the calling thread; `None` until one is set. */
  val ctx = new ThreadLocal[Option[TraceContext]] {
    override def initialValue() = None
  }

  /**
   * Actor system named "kamon" that hosts the actors created below. Declared
   * lazy, but forced during initialization of this object by the `publisher`
   * definition.
   */
  implicit lazy val actorSystem: ActorSystem = ActorSystem("kamon")


  /** Trace context bound to the calling thread, if any. */
  def context(): Option[TraceContext] = ctx.get()

  /** Drops whatever trace context is bound to the calling thread. */
  def clear: Unit = ctx.remove()

  /** Binds `traceContext` to the calling thread. */
  def set(traceContext: TraceContext): Unit = ctx.set(Some(traceContext))

  /** Creates a fresh trace context and binds it to the calling thread. */
  def start: Unit = set(newTraceContext())

  /** Closes the trace context bound to the calling thread; does nothing when there is none. */
  def stop = ctx.get match {
    case Some(context) => context.close
    case None =>
  }

  /** Builds a new trace context without binding it to any thread. */
  def newTraceContext(): TraceContext = TraceContext()


  /** Actor that receives every transaction handed to [[publish]]. */
  val publisher = actorSystem.actorOf(Props[TransactionPublisher])

  /** Sends `tx` to the publisher actor (fire-and-forget). */
  def publish(tx: FullTransaction): Unit = publisher ! tx



  /** Registry of the metrics recorded for each monitored actor system, keyed by system name. */
  object Metric {
    val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics].asScala

    /** Names of all actor systems registered so far. */
    def actorSystemNames: List[String] = actorSystems.keys.toList

    /**
     * Returns the metrics registered under `name`, registering a new instance
     * first when there is none.
     *
     * The insertion goes through `putIfAbsent` so that concurrent callers all
     * end up with the same instance; a plain `getOrElseUpdate` on the wrapped
     * map is a non-atomic get-then-put on older Scala versions and could hand
     * different instances to racing threads.
     */
    def registerActorSystem(name: String): ActorSystemMetrics =
      actorSystems.get(name).getOrElse {
        val candidate = ActorSystemMetrics(name)
        // putIfAbsent yields the already-mapped value when another thread won the race.
        actorSystems.putIfAbsent(name, candidate).getOrElse(candidate)
      }

    /** Metrics registered under `name`, if any. */
    def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
  }



  /** Actor that periodically pushes dispatcher metrics to its subscribers. */
  val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")

  /** Actor that forwards dispatcher metrics to New Relic. */
  val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")

}









/**
 * Thread-local holder for the current trace context, exposing the same
 * context/clear/set/start/stop operations as `Kamon`.
 */
object Tracer {
  /** Per-thread slot for the trace context; `None` until one is set. */
  val ctx = new ThreadLocal[Option[TraceContext]] {
    override def initialValue() = None
  }

  /** Trace context bound to the calling thread, if any. */
  def context() = ctx.get

  /** Drops whatever trace context is bound to the calling thread. */
  def clear = ctx.remove()

  /** Binds `traceContext` to the calling thread. */
  def set(traceContext: TraceContext) = ctx.set(Some(traceContext))

  // Not implemented yet: calling this throws NotImplementedError.
  // The body that was disabled here reads: set(newTraceContext)
  def start = ???

  /** Closes the trace context bound to the calling thread; does nothing when there is none. */
  def stop = ctx.get() match {
    case Some(active) => active.close
    case None         => ()
  }

  // Disabled factory that `start` relied on:
  //def newTraceContext(): TraceContext = TraceContext()
}


/**
 * Pushes dispatcher metrics to subscribers at a fixed rate.
 *
 * On every [[RegisterForAllDispatchers]] message the sender is subscribed: a
 * periodic task is scheduled that, on each tick, walks all actor systems
 * registered in `Kamon.Metric` and sends the subscriber one
 * [[DispatcherMetrics]] message per dispatcher.
 */
class MetricManager extends Actor {
  implicit val ec = context.system.dispatcher

  // Handles of the periodic tasks started by this actor. Only touched from the
  // actor's own message/lifecycle processing, never from the scheduled block.
  private var publications = List.empty[akka.actor.Cancellable]

  def receive = {
    case RegisterForAllDispatchers(frequency) =>
      // Capture the sender now: `sender` must not be read from inside the
      // scheduled block, which runs outside of this actor's message processing.
      val subscriber = sender

      publications ::= context.system.scheduler.schedule(frequency, frequency) {
        for {
          (asName, actorSystemMetrics) <- Kamon.Metric.actorSystems
          (dispatcherName, dispatcherMetrics) <- actorSystemMetrics.dispatchers
        } {
          val activeThreads = dispatcherMetrics.activeThreadCount.snapshot
          val poolSize = dispatcherMetrics.poolSize.snapshot
          val queueSize = dispatcherMetrics.queueSize.snapshot

          subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize)
        }
      }
  }

  /**
   * Cancels every periodic task started by this actor. Without this the tasks
   * would keep firing after the actor is gone, until the actor system itself
   * shuts down.
   */
  override def postStop(): Unit = {
    publications.foreach(_.cancel())
    publications = Nil
  }
}

/**
 * Subscription request handled by `MetricManager`: the sender of this message
 * starts receiving `DispatcherMetrics` messages every `frequency`, with the
 * first batch sent after one `frequency` has elapsed.
 */
case class RegisterForAllDispatchers(frequency: FiniteDuration)
/**
 * Snapshot of one dispatcher's metrics, sent by `MetricManager` to its
 * subscribers. `actorSystem` and `dispatcher` are the names used to identify
 * the source; the remaining fields are the histogram snapshots taken for that
 * dispatcher.
 */
case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot)






/**
 * Forwards dispatcher metrics to New Relic as custom metrics.
 *
 * On creation the actor subscribes itself to `Kamon.metricManager` with a
 * five second frequency. Each [[DispatcherMetrics]] message received is
 * recorded as three metrics, using the median of each snapshot:
 * active threads, inactive threads (pool size minus active) and queue size.
 */
class NewrelicReporterActor extends Actor {
  import scala.concurrent.duration._

  // Runs as part of construction, so the subscription is in place before the
  // first message is processed.
  Kamon.metricManager ! RegisterForAllDispatchers(5.seconds)

  def receive = {
    case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) =>
      val activeMedian = activeThreads.median.toFloat
      NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeMedian)

      // Threads in the pool that are not currently active.
      val inactiveMedian = poolSize.median.toFloat - activeThreads.median.toFloat
      NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", inactiveMedian)

      val queueMedian = queueSize.median.toFloat
      NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueMedian)
  }
}