diff options
Diffstat (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala')
-rw-r--r-- | kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala new file mode 100644 index 00000000..c4d7c089 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -0,0 +1,103 @@ +package kamon.newrelic + +import akka.actor.Actor +import spray.json._ +import scala.concurrent.Future +import spray.httpx.{SprayJsonSupport, RequestBuilding, ResponseTransformation} +import spray.httpx.encoding.Deflate +import spray.http._ +import spray.json.lenses.JsonLenses._ +import akka.pattern.pipe +import java.lang.management.ManagementFactory +import spray.client.pipelining._ +import scala.util.control.NonFatal +import kamon.newrelic.NewRelicMetric.{Data, ID, MetricBatch} + +class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport { + import context.dispatcher + import Agent._ + + val agentInfo = { + val config = context.system.settings.config.getConfig("kamon.newrelic") + val appName = config.getString("app-name") + val licenseKey = config.getString("license-key") + + // Name has the format of pid@host + val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + + AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt) + } + + + + def receive = { + case Initialize(runId, collector) => context become reporting(runId, collector) + } + + + def reporting(runId: Long, collector: String): Receive = { + case batch: MetricBatch => sendMetricData(runId, collector, batch.metrics) + } + + override def preStart(): Unit = { + super.preStart() + initialize + } + + def initialize: Unit = { + pipe ({ + for( + collector <- selectCollector; + runId <- connect(collector, agentInfo) + ) yield Initialize(runId, collector) + } recover { + case NonFatal(ex) => InitializationFailed(ex) + }) to self + } + + import AgentJsonProtocol._ + val compressedPipeline: HttpRequest => Future[HttpResponse] = encode(Deflate) ~> sendReceive + val compressedToJsonPipeline: HttpRequest => Future[JsValue] = compressedPipeline ~> toJson + + def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson + + def selectCollector: Future[String] = { + compressedToJsonPipeline { + Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray()) + } map { json => + json.extract[String]('return_value) + } + } + + def connect(collectorHost: String, connect: AgentInfo): Future[Long] = { + compressedToJsonPipeline { + Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect) + } map { json => + json.extract[Long]('return_value / 'agent_run_id) + } + } + + + def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = { + val end = System.currentTimeMillis() / 1000L + val start = end - 60 + compressedPipeline { + Post(s"http://$collector/agent_listener/invoke_raw_method?method=metric_data&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12&run_id=$runId", MetricData(runId, start, end, metrics)) + } + } + + + +} + +object Agent { + + + + case class Initialize(runId: Long, collector: String) + case class InitializationFailed(reason: Throwable) + case class CollectorSelection(return_value: String) + case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int) + + case class MetricData(runId: Long, start: Long, end: Long, metrics: List[(ID, Data)]) +}
\ No newline at end of file |