From bf86900669d649308f4914c54e6fe076510506a6 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 7 Nov 2013 18:41:33 -0300 Subject: halfway to our own NewRelic Agent --- .../src/main/scala/kamon/newrelic/NewRelic.scala | 102 +++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala') diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala new file mode 100644 index 00000000..2ee7ada0 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -0,0 +1,102 @@ +package kamon.newrelic + +import akka.actor._ +import scala.collection.mutable +import kamon.Kamon +import kamon.trace.{UowTrace, Trace} +import kamon.newrelic.NewRelicMetric.{MetricBatch, FlushMetrics} +import scala.concurrent.duration._ + +class NewRelic extends ExtensionId[NewRelicExtension] { + def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system) +} + +class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val manager: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic") +} + +class NewRelicManager extends Actor with ActorLogging { + log.info("Registering the Kamon(NewRelic) extension") + + Kamon(Trace)(context.system) ! Trace.Register + + + + val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics], "web-transaction-metrics") + val agent = context.actorOf(Props[Agent], "agent") + + import context.dispatcher + context.system.scheduler.schedule(1 minute, 1 minute) { + webTransactionMetrics.tell(FlushMetrics, agent) + } + + def receive = { + case trace: UowTrace => webTransactionMetrics ! trace + } +} + +object NewRelicMetric { + case class ID(name: String, scope: Option[String]) + case class Data(var total: Double, var totalExclusive: Double, var min: Double, var max: Double, var sumOfSquares: Double, var callCount: Long) { + def record(value: Double): Unit = { + if(value > max) max = value + if(value < min) min = value + + total += value + totalExclusive += value + sumOfSquares += value * value + callCount += 1 + } + } + + object Data { + def apply(): Data = Data(0, 0, 0, 0, 0, 0) + } + + case object FlushMetrics + case class MetricBatch(metrics: List[(ID, Data)]) +} + + +class WebTransactionMetrics extends Actor with ActorLogging { + val apdexT = 1500000000 + var metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] + var apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) + + def receive = { + case trace: UowTrace => updateStats(trace) + case FlushMetrics => flush + } + + def flush: Unit = { + sender ! MetricBatch(metrics.toList :+ (NewRelicMetric.ID("Apdex", None), apdex)) + apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) + metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] + } + + def recordValue(metricID: NewRelicMetric.ID, value: Double): Unit = { + metrics.getOrElseUpdate(metricID, NewRelicMetric.Data()).record(value) + } + + def recordApdex(time: Double): Unit = { + if(time <= apdexT) + apdex.total += 1 + else + if(time > apdexT && time <= (4 * apdexT)) + apdex.totalExclusive += 1 + else + apdex.min += 1 + + } + + def updateStats(trace: UowTrace): Unit = { + // Basic Metrics + recordApdex(trace.elapsed) + recordValue(NewRelicMetric.ID("WebTransaction", None), trace.elapsed) + recordValue(NewRelicMetric.ID("HttpDispatcher", None), trace.elapsed) + recordValue(NewRelicMetric.ID("WebTransaction/Custom/" + trace.name, None), trace.elapsed) + + println("Recorded Apdex: " + apdex) + println("Current Metrics: " + metrics.mkString("\n")) + } +} -- cgit v1.2.3