aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/Kamon.scala
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
commitcd1a9dd25fb550a515e7a7408b88233773268c38 (patch)
tree98c16e292c533cc9aa51bb0f073864b1f9e2b68a /kamon-core/src/main/scala/kamon/Kamon.scala
parent6566e1c41510e54dd987d3e34e40f1031169d592 (diff)
downloadKamon-cd1a9dd25fb550a515e7a7408b88233773268c38.tar.gz
Kamon-cd1a9dd25fb550a515e7a7408b88233773268c38.tar.bz2
Kamon-cd1a9dd25fb550a515e7a7408b88233773268c38.zip
upgrading to akka 2.2
Diffstat (limited to 'kamon-core/src/main/scala/kamon/Kamon.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala132
1 files changed, 132 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
new file mode 100644
index 00000000..c3080909
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -0,0 +1,132 @@
+package kamon
+
+import akka.actor.{Actor, Props, ActorSystem}
+import scala.collection.JavaConverters._
+import java.util.concurrent.ConcurrentHashMap
+import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics}
+import scala.concurrent.duration.{FiniteDuration, Duration}
+import com.newrelic.api.agent.NewRelic
+
+object Kamon {
+
+ val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ implicit lazy val actorSystem = ActorSystem("kamon")
+
+
+ def context() = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start = set(newTraceContext)
+ def stop = ctx.get match {
+ case Some(context) => context.close
+ case None =>
+ }
+
+ def newTraceContext(): TraceContext = TraceContext()
+
+
+ val publisher = actorSystem.actorOf(Props[TransactionPublisher])
+
+ def publish(tx: FullTransaction) = publisher ! tx
+
+
+
+ object Metric {
+ val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala
+
+ def actorSystemNames: List[String] = actorSystems.keys.toList
+ def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name))
+
+ def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
+ }
+
+
+
+ val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
+ val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
+
+}
+
+
+
+
+
+
+
+
+
+object Tracer {
+ val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ def context() = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start = ??? //set(newTraceContext)
+ def stop = ctx.get match {
+ case Some(context) => context.close
+ case None =>
+ }
+
+ //def newTraceContext(): TraceContext = TraceContext()
+}
+
+
+class MetricManager extends Actor {
+ implicit val ec = context.system.dispatcher
+
+ def receive = {
+ case RegisterForAllDispatchers(frequency) => {
+ val subscriber = sender
+ context.system.scheduler.schedule(frequency, frequency) {
+ Kamon.Metric.actorSystems.foreach {
+ case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach {
+ case (dispatcherName, dispatcherMetrics) => {
+ val activeThreads = dispatcherMetrics.activeThreadCount.snapshot
+ val poolSize = dispatcherMetrics.poolSize.snapshot
+ val queueSize = dispatcherMetrics.queueSize.snapshot
+
+ subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize)
+
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+case class RegisterForAllDispatchers(frequency: FiniteDuration)
+case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot)
+
+
+
+
+
+
+class NewrelicReporterActor extends Actor {
+ import scala.concurrent.duration._
+
+ Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)
+
+ def receive = {
+ case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => {
+ /*println("PUBLISHED DISPATCHER STATS")
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat)
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat))
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat)*/
+
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat)
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat))
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat)
+ }
+ }
+} \ No newline at end of file