From bf86900669d649308f4914c54e6fe076510506a6 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 7 Nov 2013 18:41:33 -0300 Subject: halfway to our own NewRelic Agent --- kamon-newrelic/src/main/resources/reference.conf | 14 +-- .../src/main/scala/kamon/newrelic/Agent.scala | 103 +++++++++++++++++++++ .../scala/kamon/newrelic/AgentJsonProtocol.scala | 56 +++++++++++ .../src/main/scala/kamon/newrelic/Apdex.scala | 78 ++++++++++++++++ .../src/main/scala/kamon/newrelic/NewRelic.scala | 102 ++++++++++++++++++++ .../scala/kamon/newrelic/NewRelicReporting.scala | 6 +- .../src/test/scala/kamon/newrelic/AgentSpec.scala | 70 ++++++++++++++ 7 files changed, 418 insertions(+), 11 deletions(-) create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala create mode 100644 kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala (limited to 'kamon-newrelic') diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index a2583195..dacabe41 100644 --- a/kamon-newrelic/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -1,13 +1,13 @@ akka { - actor { - debug { - unhandled = on - } - } - loggers = ["kamon.newrelic.NewRelicErrorLogger", "akka.event.slf4j.Slf4jLogger"] + extensions = ["kamon.newrelic.NewRelic"] } - +kamon { + newrelic { + app-name = "Kamon[Development]" + license-key = 2e24765acb032cb9e7207013b5ba3e2ab7d2d75c + } +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala new file mode 100644 index 00000000..c4d7c089 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -0,0 +1,103 @@ +package kamon.newrelic + +import akka.actor.Actor +import spray.json._ +import scala.concurrent.Future +import spray.httpx.{SprayJsonSupport, RequestBuilding, ResponseTransformation} +import spray.httpx.encoding.Deflate +import spray.http._ +import spray.json.lenses.JsonLenses._ +import akka.pattern.pipe +import java.lang.management.ManagementFactory +import spray.client.pipelining._ +import scala.util.control.NonFatal +import kamon.newrelic.NewRelicMetric.{Data, ID, MetricBatch} + +class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport { + import context.dispatcher + import Agent._ + + val agentInfo = { + val config = context.system.settings.config.getConfig("kamon.newrelic") + val appName = config.getString("app-name") + val licenseKey = config.getString("license-key") + + // Name has the format of pid@host + val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + + AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt) + } + + + + def receive = { + case Initialize(runId, collector) => context become reporting(runId, collector) + } + + + def reporting(runId: Long, collector: String): Receive = { + case batch: MetricBatch => sendMetricData(runId, collector, batch.metrics) + } + + override def preStart(): Unit = { + super.preStart() + initialize + } + + def initialize: Unit = { + pipe ({ + for( + collector <- selectCollector; + runId <- connect(collector, agentInfo) + ) yield Initialize(runId, collector) + } recover { + case NonFatal(ex) => InitializationFailed(ex) + }) to self + } + + import AgentJsonProtocol._ + val compressedPipeline: HttpRequest => Future[HttpResponse] = encode(Deflate) ~> sendReceive + val compressedToJsonPipeline: HttpRequest => Future[JsValue] = compressedPipeline ~> toJson + + def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson + + def selectCollector: Future[String] = { + compressedToJsonPipeline { + Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray()) + } map { json => + json.extract[String]('return_value) + } + } + + def connect(collectorHost: String, connect: AgentInfo): Future[Long] = { + compressedToJsonPipeline { + Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect) + } map { json => + json.extract[Long]('return_value / 'agent_run_id) + } + } + + + def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = { + val end = System.currentTimeMillis() / 1000L + val start = end - 60 + compressedPipeline { + Post(s"http://$collector/agent_listener/invoke_raw_method?method=metric_data&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12&run_id=$runId", MetricData(runId, start, end, metrics)) + } + } + + + +} + +object Agent { + + + + case class Initialize(runId: Long, collector: String) + case class InitializationFailed(reason: Throwable) + case class CollectorSelection(return_value: String) + case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int) + + case class MetricData(runId: Long, start: Long, end: Long, metrics: List[(ID, Data)]) +} \ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala new file mode 100644 index 00000000..30e17e77 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala @@ -0,0 +1,56 @@ +package kamon.newrelic + +import spray.json._ +import kamon.newrelic.Agent._ + +object AgentJsonProtocol extends DefaultJsonProtocol { + + implicit object ConnectJsonWriter extends RootJsonWriter[AgentInfo] { + def write(obj: AgentInfo): JsValue = + JsArray( + JsObject( + "agent_version" -> JsString("3.1.0"), + "app_name" -> JsArray(JsString(obj.appName)), + "host" -> JsString(obj.host), + "identifier" -> JsString(s"java:${obj.appName}"), + "language" -> JsString("java"), + "pid" -> JsNumber(obj.pid) + ) + ) + } + + import NewRelicMetric._ + + implicit def listWriter[T : JsonWriter] = new JsonWriter[List[T]] { + def write(list: List[T]) = JsArray(list.map(_.toJson)) + } + + implicit object MetricDetailWriter extends JsonWriter[(ID, Data)] { + def write(obj: (ID, Data)): JsValue = { + val (id, data) = obj + JsArray( + JsObject( + "name" -> JsString(id.name) // TODO Include scope + ), + JsArray( + JsNumber(data.total), + JsNumber(data.totalExclusive), + JsNumber(data.min), + JsNumber(data.max), + JsNumber(data.sumOfSquares), + JsNumber(data.callCount) + ) + ) + } + } + + implicit object MetricDataWriter extends RootJsonWriter[MetricData] { + def write(obj: MetricData): JsValue = + JsArray( + JsNumber(obj.runId), + JsNumber(obj.start), + JsNumber(obj.end), + obj.metrics.toJson + ) + } +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala new file mode 100644 index 00000000..c9178292 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala @@ -0,0 +1,78 @@ +package kamon.newrelic + +import akka.actor.Actor +import kamon.trace.UowTrace +import com.newrelic.api.agent.{NewRelic => NRAgent} +import kamon.trace.UowTracing.WebExternal + +class Apdex extends Actor { + val t = 500 + + var satisfied: Int = 0 + var tolerating: Int = 0 + var frustrated: Int = 0 + + def receive = { + case trace: UowTrace => recordTransaction(trace) + + } + + def clearStats: Unit = { + satisfied = 0 + tolerating = 0 + frustrated = 0 + } + + def total: Int = satisfied + tolerating + frustrated + + def updateStats(sampleTime: Double): Unit = { + if(sampleTime < t) + satisfied += 1 + else + if(sampleTime >= t && sampleTime <= 4*t) + tolerating += 1 + else + frustrated += 1 + } + + def recordTransaction(uowTrace: UowTrace): Unit = { + val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9) + + updateStats(time) + + NRAgent.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat ) + NRAgent.recordMetric("WebTransaction", time.toFloat) + NRAgent.recordMetric("HttpDispatcher", time.toFloat) + + uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace => + val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat + + println("Web External: " + webExternalTrace) + NRAgent.recordMetric(s"External/${webExternalTrace.host}/http", external) + NRAgent.recordMetric(s"External/${webExternalTrace.host}/all", external) + NRAgent.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) + } + + + val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp) + + + def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match { + case Nil => accum + case head :: tail => + if(head.start > lastEnd) + measureExternal(accum + (head.finish-head.start), head.finish, tail) + else + measureExternal(accum + (head.finish-lastEnd), head.finish, tail) + } + + val external = measureExternal(0, 0, allExternals) / 1E9 + + + NRAgent.recordMetric(s"External/all", external.toFloat) + NRAgent.recordMetric(s"External/allWeb", external.toFloat) + + } +} + +case object Flush diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala new file mode 100644 index 00000000..2ee7ada0 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -0,0 +1,102 @@ +package kamon.newrelic + +import akka.actor._ +import scala.collection.mutable +import kamon.Kamon +import kamon.trace.{UowTrace, Trace} +import kamon.newrelic.NewRelicMetric.{MetricBatch, FlushMetrics} +import scala.concurrent.duration._ + +class NewRelic extends ExtensionId[NewRelicExtension] { + def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system) +} + +class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val manager: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic") +} + +class NewRelicManager extends Actor with ActorLogging { + log.info("Registering the Kamon(NewRelic) extension") + + Kamon(Trace)(context.system) ! Trace.Register + + + + val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics], "web-transaction-metrics") + val agent = context.actorOf(Props[Agent], "agent") + + import context.dispatcher + context.system.scheduler.schedule(1 minute, 1 minute) { + webTransactionMetrics.tell(FlushMetrics, agent) + } + + def receive = { + case trace: UowTrace => webTransactionMetrics ! trace + } +} + +object NewRelicMetric { + case class ID(name: String, scope: Option[String]) + case class Data(var total: Double, var totalExclusive: Double, var min: Double, var max: Double, var sumOfSquares: Double, var callCount: Long) { + def record(value: Double): Unit = { + if(value > max) max = value + if(value < min) min = value + + total += value + totalExclusive += value + sumOfSquares += value * value + callCount += 1 + } + } + + object Data { + def apply(): Data = Data(0, 0, 0, 0, 0, 0) + } + + case object FlushMetrics + case class MetricBatch(metrics: List[(ID, Data)]) +} + + +class WebTransactionMetrics extends Actor with ActorLogging { + val apdexT = 1500000000 + var metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] + var apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) + + def receive = { + case trace: UowTrace => updateStats(trace) + case FlushMetrics => flush + } + + def flush: Unit = { + sender ! MetricBatch(metrics.toList :+ (NewRelicMetric.ID("Apdex", None), apdex)) + apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) + metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] + } + + def recordValue(metricID: NewRelicMetric.ID, value: Double): Unit = { + metrics.getOrElseUpdate(metricID, NewRelicMetric.Data()).record(value) + } + + def recordApdex(time: Double): Unit = { + if(time <= apdexT) + apdex.total += 1 + else + if(time > apdexT && time <= (4 * apdexT)) + apdex.totalExclusive += 1 + else + apdex.min += 1 + + } + + def updateStats(trace: UowTrace): Unit = { + // Basic Metrics + recordApdex(trace.elapsed) + recordValue(NewRelicMetric.ID("WebTransaction", None), trace.elapsed) + recordValue(NewRelicMetric.ID("HttpDispatcher", None), trace.elapsed) + recordValue(NewRelicMetric.ID("WebTransaction/Custom/" + trace.name, None), trace.elapsed) + + println("Recorded Apdex: " + apdex) + println("Current Metrics: " + metrics.mkString("\n")) + } +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala index 106f27e2..8c6bfe77 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala @@ -2,10 +2,8 @@ package kamon.newrelic import akka.actor.Actor import kamon.trace.UowTrace -import com.newrelic.api.agent.{Response, Request, Trace, NewRelic} -import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart} -import java.util -import java.util.Date +import com.newrelic.api.agent.NewRelic +import kamon.trace.UowTracing.WebExternal class NewRelicReporting extends Actor { diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala new file mode 100644 index 00000000..8868b8c0 --- /dev/null +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala @@ -0,0 +1,70 @@ +package kamon.newrelic + +import akka.testkit.{TestActor, TestProbe, TestKit} +import akka.actor.{Props, ActorRef, ActorSystem} +import org.scalatest.WordSpecLike +import kamon.AkkaExtensionSwap +import spray.can.Http +import akka.io.IO +import akka.testkit.TestActor.{KeepRunning, AutoPilot} +import spray.http._ +import spray.http.HttpRequest +import spray.http.HttpResponse + +class AgentSpec extends TestKit(ActorSystem("agent-spec")) with WordSpecLike { + + setupFakeHttpManager + + "the Newrelic Agent" should { + "try to connect upon creation" in { + val agent = system.actorOf(Props[Agent]) + + Thread.sleep(5000) + } + } + + def setupFakeHttpManager: Unit = { + val fakeHttpManager = TestProbe() + fakeHttpManager.setAutoPilot(new TestActor.AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = { + msg match { + case HttpRequest(_, uri, _, _, _) if rawMethodIs("get_redirect_host", uri) => + sender ! jsonResponse( + """ + | { + | "return_value": "collector-8.newrelic.com" + | } + | """.stripMargin) + + println("Selecting Collector") + + case HttpRequest(_, uri, _, _, _) if rawMethodIs("connect", uri) => + sender ! jsonResponse( + """ + | { + | "return_value": { + | "agent_run_id": 161221111 + | } + | } + | """.stripMargin) + println("Connecting") + } + + KeepRunning + } + + def jsonResponse(json: String): HttpResponse = { + HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, json)) + } + + def rawMethodIs(method: String, uri: Uri): Boolean = { + uri.query.get("method").filter(_ == method).isDefined + } + }) + + + AkkaExtensionSwap.swap(system, Http, new IO.Extension { + def manager: ActorRef = fakeHttpManager.ref + }) + } +} -- cgit v1.2.3