aboutsummaryrefslogtreecommitdiff
path: root/kamon-newrelic
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-11-07 18:41:33 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-11-07 18:41:33 -0300
commitbf86900669d649308f4914c54e6fe076510506a6 (patch)
treed8bf9af9f5c8a946d757137a303f6956c05edb03 /kamon-newrelic
parent2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1 (diff)
downloadKamon-bf86900669d649308f4914c54e6fe076510506a6.tar.gz
Kamon-bf86900669d649308f4914c54e6fe076510506a6.tar.bz2
Kamon-bf86900669d649308f4914c54e6fe076510506a6.zip
halfway to our own NewRelic Agent
Diffstat (limited to 'kamon-newrelic')
-rw-r--r--kamon-newrelic/src/main/resources/reference.conf14
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala103
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala56
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala78
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala102
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala6
-rw-r--r--kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala70
7 files changed, 418 insertions, 11 deletions
diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf
index a2583195..dacabe41 100644
--- a/kamon-newrelic/src/main/resources/reference.conf
+++ b/kamon-newrelic/src/main/resources/reference.conf
@@ -1,13 +1,13 @@
akka {
- actor {
- debug {
- unhandled = on
- }
- }
- loggers = ["kamon.newrelic.NewRelicErrorLogger", "akka.event.slf4j.Slf4jLogger"]
+ extensions = ["kamon.newrelic.NewRelic"]
}
-
+kamon {
+ newrelic {
+ app-name = "Kamon[Development]"
+ license-key = 2e24765acb032cb9e7207013b5ba3e2ab7d2d75c
+ }
+}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
new file mode 100644
index 00000000..c4d7c089
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
@@ -0,0 +1,103 @@
+package kamon.newrelic
+
+import akka.actor.Actor
+import spray.json._
+import scala.concurrent.Future
+import spray.httpx.{SprayJsonSupport, RequestBuilding, ResponseTransformation}
+import spray.httpx.encoding.Deflate
+import spray.http._
+import spray.json.lenses.JsonLenses._
+import akka.pattern.pipe
+import java.lang.management.ManagementFactory
+import spray.client.pipelining._
+import scala.util.control.NonFatal
+import kamon.newrelic.NewRelicMetric.{Data, ID, MetricBatch}
+
+class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport {
+ import context.dispatcher
+ import Agent._
+
+ val agentInfo = {
+ val config = context.system.settings.config.getConfig("kamon.newrelic")
+ val appName = config.getString("app-name")
+ val licenseKey = config.getString("license-key")
+
+ // Name has the format of pid@host
+ val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')
+
+ AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt)
+ }
+
+
+
+ def receive = {
+ case Initialize(runId, collector) => context become reporting(runId, collector)
+ }
+
+
+ def reporting(runId: Long, collector: String): Receive = {
+ case batch: MetricBatch => sendMetricData(runId, collector, batch.metrics)
+ }
+
+ override def preStart(): Unit = {
+ super.preStart()
+ initialize
+ }
+
+ def initialize: Unit = {
+ pipe ({
+ for(
+ collector <- selectCollector;
+ runId <- connect(collector, agentInfo)
+ ) yield Initialize(runId, collector)
+ } recover {
+ case NonFatal(ex) => InitializationFailed(ex)
+ }) to self
+ }
+
+ import AgentJsonProtocol._
+ val compressedPipeline: HttpRequest => Future[HttpResponse] = encode(Deflate) ~> sendReceive
+ val compressedToJsonPipeline: HttpRequest => Future[JsValue] = compressedPipeline ~> toJson
+
+ def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson
+
+ def selectCollector: Future[String] = {
+ compressedToJsonPipeline {
+ Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray())
+ } map { json =>
+ json.extract[String]('return_value)
+ }
+ }
+
+ def connect(collectorHost: String, connect: AgentInfo): Future[Long] = {
+ compressedToJsonPipeline {
+ Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect)
+ } map { json =>
+ json.extract[Long]('return_value / 'agent_run_id)
+ }
+ }
+
+
+ def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = {
+ val end = System.currentTimeMillis() / 1000L
+ val start = end - 60
+ compressedPipeline {
+ Post(s"http://$collector/agent_listener/invoke_raw_method?method=metric_data&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12&run_id=$runId", MetricData(runId, start, end, metrics))
+ }
+ }
+
+
+
+}
+
+object Agent {
+
+
+
+ case class Initialize(runId: Long, collector: String)
+ case class InitializationFailed(reason: Throwable)
+ case class CollectorSelection(return_value: String)
+ case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int)
+
+ case class MetricData(runId: Long, start: Long, end: Long, metrics: List[(ID, Data)])
+} \ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala
new file mode 100644
index 00000000..30e17e77
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala
@@ -0,0 +1,56 @@
+package kamon.newrelic
+
+import spray.json._
+import kamon.newrelic.Agent._
+
+object AgentJsonProtocol extends DefaultJsonProtocol {
+
+ implicit object ConnectJsonWriter extends RootJsonWriter[AgentInfo] {
+ def write(obj: AgentInfo): JsValue =
+ JsArray(
+ JsObject(
+ "agent_version" -> JsString("3.1.0"),
+ "app_name" -> JsArray(JsString(obj.appName)),
+ "host" -> JsString(obj.host),
+ "identifier" -> JsString(s"java:${obj.appName}"),
+ "language" -> JsString("java"),
+ "pid" -> JsNumber(obj.pid)
+ )
+ )
+ }
+
+ import NewRelicMetric._
+
+ implicit def listWriter[T : JsonWriter] = new JsonWriter[List[T]] {
+ def write(list: List[T]) = JsArray(list.map(_.toJson))
+ }
+
+ implicit object MetricDetailWriter extends JsonWriter[(ID, Data)] {
+ def write(obj: (ID, Data)): JsValue = {
+ val (id, data) = obj
+ JsArray(
+ JsObject(
+ "name" -> JsString(id.name) // TODO Include scope
+ ),
+ JsArray(
+ JsNumber(data.total),
+ JsNumber(data.totalExclusive),
+ JsNumber(data.min),
+ JsNumber(data.max),
+ JsNumber(data.sumOfSquares),
+ JsNumber(data.callCount)
+ )
+ )
+ }
+ }
+
+ implicit object MetricDataWriter extends RootJsonWriter[MetricData] {
+ def write(obj: MetricData): JsValue =
+ JsArray(
+ JsNumber(obj.runId),
+ JsNumber(obj.start),
+ JsNumber(obj.end),
+ obj.metrics.toJson
+ )
+ }
+}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala
new file mode 100644
index 00000000..c9178292
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala
@@ -0,0 +1,78 @@
+package kamon.newrelic
+
+import akka.actor.Actor
+import kamon.trace.UowTrace
+import com.newrelic.api.agent.{NewRelic => NRAgent}
+import kamon.trace.UowTracing.WebExternal
+
+class Apdex extends Actor {
+ val t = 500
+
+ var satisfied: Int = 0
+ var tolerating: Int = 0
+ var frustrated: Int = 0
+
+ def receive = {
+ case trace: UowTrace => recordTransaction(trace)
+
+ }
+
+ def clearStats: Unit = {
+ satisfied = 0
+ tolerating = 0
+ frustrated = 0
+ }
+
+ def total: Int = satisfied + tolerating + frustrated
+
+ def updateStats(sampleTime: Double): Unit = {
+ if(sampleTime < t)
+ satisfied += 1
+ else
+ if(sampleTime >= t && sampleTime <= 4*t)
+ tolerating += 1
+ else
+ frustrated += 1
+ }
+
+ def recordTransaction(uowTrace: UowTrace): Unit = {
+ val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9)
+
+ updateStats(time)
+
+ NRAgent.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat )
+ NRAgent.recordMetric("WebTransaction", time.toFloat)
+ NRAgent.recordMetric("HttpDispatcher", time.toFloat)
+
+ uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace =>
+ val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat
+
+ println("Web External: " + webExternalTrace)
+ NRAgent.recordMetric(s"External/${webExternalTrace.host}/http", external)
+ NRAgent.recordMetric(s"External/${webExternalTrace.host}/all", external)
+ NRAgent.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external)
+ }
+
+
+ val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp)
+
+
+ def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match {
+ case Nil => accum
+ case head :: tail =>
+ if(head.start > lastEnd)
+ measureExternal(accum + (head.finish-head.start), head.finish, tail)
+ else
+ measureExternal(accum + (head.finish-lastEnd), head.finish, tail)
+ }
+
+ val external = measureExternal(0, 0, allExternals) / 1E9
+
+
+ NRAgent.recordMetric(s"External/all", external.toFloat)
+ NRAgent.recordMetric(s"External/allWeb", external.toFloat)
+
+ }
+}
+
+case object Flush
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
new file mode 100644
index 00000000..2ee7ada0
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
@@ -0,0 +1,102 @@
+package kamon.newrelic
+
+import akka.actor._
+import scala.collection.mutable
+import kamon.Kamon
+import kamon.trace.{UowTrace, Trace}
+import kamon.newrelic.NewRelicMetric.{MetricBatch, FlushMetrics}
+import scala.concurrent.duration._
+
+class NewRelic extends ExtensionId[NewRelicExtension] {
+ def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system)
+}
+
+class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ val manager: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic")
+}
+
+class NewRelicManager extends Actor with ActorLogging {
+ log.info("Registering the Kamon(NewRelic) extension")
+
+ Kamon(Trace)(context.system) ! Trace.Register
+
+
+
+ val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics], "web-transaction-metrics")
+ val agent = context.actorOf(Props[Agent], "agent")
+
+ import context.dispatcher
+ context.system.scheduler.schedule(1 minute, 1 minute) {
+ webTransactionMetrics.tell(FlushMetrics, agent)
+ }
+
+ def receive = {
+ case trace: UowTrace => webTransactionMetrics ! trace
+ }
+}
+
+object NewRelicMetric {
+ case class ID(name: String, scope: Option[String])
+ case class Data(var total: Double, var totalExclusive: Double, var min: Double, var max: Double, var sumOfSquares: Double, var callCount: Long) {
+ def record(value: Double): Unit = {
+ if(value > max) max = value
+ if(value < min) min = value
+
+ total += value
+ totalExclusive += value
+ sumOfSquares += value * value
+ callCount += 1
+ }
+ }
+
+ object Data {
+ def apply(): Data = Data(0, 0, 0, 0, 0, 0)
+ }
+
+ case object FlushMetrics
+ case class MetricBatch(metrics: List[(ID, Data)])
+}
+
+
+class WebTransactionMetrics extends Actor with ActorLogging {
+ val apdexT = 1500000000
+ var metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data]
+ var apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0)
+
+ def receive = {
+ case trace: UowTrace => updateStats(trace)
+ case FlushMetrics => flush
+ }
+
+ def flush: Unit = {
+ sender ! MetricBatch(metrics.toList :+ (NewRelicMetric.ID("Apdex", None), apdex))
+ apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0)
+ metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data]
+ }
+
+ def recordValue(metricID: NewRelicMetric.ID, value: Double): Unit = {
+ metrics.getOrElseUpdate(metricID, NewRelicMetric.Data()).record(value)
+ }
+
+ def recordApdex(time: Double): Unit = {
+ if(time <= apdexT)
+ apdex.total += 1
+ else
+ if(time > apdexT && time <= (4 * apdexT))
+ apdex.totalExclusive += 1
+ else
+ apdex.min += 1
+
+ }
+
+ def updateStats(trace: UowTrace): Unit = {
+ // Basic Metrics
+ recordApdex(trace.elapsed)
+ recordValue(NewRelicMetric.ID("WebTransaction", None), trace.elapsed)
+ recordValue(NewRelicMetric.ID("HttpDispatcher", None), trace.elapsed)
+ recordValue(NewRelicMetric.ID("WebTransaction/Custom/" + trace.name, None), trace.elapsed)
+
+ println("Recorded Apdex: " + apdex)
+ println("Current Metrics: " + metrics.mkString("\n"))
+ }
+}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala
index 106f27e2..8c6bfe77 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala
@@ -2,10 +2,8 @@ package kamon.newrelic
import akka.actor.Actor
import kamon.trace.UowTrace
-import com.newrelic.api.agent.{Response, Request, Trace, NewRelic}
-import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart}
-import java.util
-import java.util.Date
+import com.newrelic.api.agent.NewRelic
+import kamon.trace.UowTracing.WebExternal
class NewRelicReporting extends Actor {
diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala
new file mode 100644
index 00000000..8868b8c0
--- /dev/null
+++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala
@@ -0,0 +1,70 @@
+package kamon.newrelic
+
+import akka.testkit.{TestActor, TestProbe, TestKit}
+import akka.actor.{Props, ActorRef, ActorSystem}
+import org.scalatest.WordSpecLike
+import kamon.AkkaExtensionSwap
+import spray.can.Http
+import akka.io.IO
+import akka.testkit.TestActor.{KeepRunning, AutoPilot}
+import spray.http._
+import spray.http.HttpRequest
+import spray.http.HttpResponse
+
+class AgentSpec extends TestKit(ActorSystem("agent-spec")) with WordSpecLike {
+
+ setupFakeHttpManager
+
+ "the Newrelic Agent" should {
+ "try to connect upon creation" in {
+ val agent = system.actorOf(Props[Agent])
+
+ Thread.sleep(5000)
+ }
+ }
+
+ def setupFakeHttpManager: Unit = {
+ val fakeHttpManager = TestProbe()
+ fakeHttpManager.setAutoPilot(new TestActor.AutoPilot {
+ def run(sender: ActorRef, msg: Any): AutoPilot = {
+ msg match {
+ case HttpRequest(_, uri, _, _, _) if rawMethodIs("get_redirect_host", uri) =>
+ sender ! jsonResponse(
+ """
+ | {
+ | "return_value": "collector-8.newrelic.com"
+ | }
+ | """.stripMargin)
+
+ println("Selecting Collector")
+
+ case HttpRequest(_, uri, _, _, _) if rawMethodIs("connect", uri) =>
+ sender ! jsonResponse(
+ """
+ | {
+ | "return_value": {
+ | "agent_run_id": 161221111
+ | }
+ | }
+ | """.stripMargin)
+ println("Connecting")
+ }
+
+ KeepRunning
+ }
+
+ def jsonResponse(json: String): HttpResponse = {
+ HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, json))
+ }
+
+ def rawMethodIs(method: String, uri: Uri): Boolean = {
+ uri.query.get("method").filter(_ == method).isDefined
+ }
+ })
+
+
+ AkkaExtensionSwap.swap(system, Http, new IO.Extension {
+ def manager: ActorRef = fakeHttpManager.ref
+ })
+ }
+}