From 57e433c07a271b4e5e4159500cdc828cd7bb6a83 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 4 Feb 2014 18:16:07 -0300 Subject: partial rewrite of kamon-newrelic --- .../src/main/scala/kamon/newrelic/Agent.scala | 43 ++++++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala') diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 4082458c..a73f390a 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -26,7 +26,8 @@ import akka.pattern.pipe import java.lang.management.ManagementFactory import spray.client.pipelining._ import scala.util.control.NonFatal -import kamon.newrelic.NewRelicMetric.{ Data, ID, MetricBatch } +import spray.http.Uri.Query +import kamon.newrelic.MetricTranslator.TimeSliceMetrics class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging { import context.dispatcher @@ -43,12 +44,20 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt) } + val baseQuery = Query( + "license_key" -> agentInfo.licenseKey, + "marshal_format" -> "json", + "protocol_version" -> "12") + + def receive = { - case Initialize(runId, collector) ⇒ context become reporting(runId, collector) + case Initialize(runId, collector) ⇒ + log.info("Agent initialized with runID: [{}] and collector: [{}]", runId, collector) + context become reporting(runId, collector) } def reporting(runId: Long, collector: String): Receive = { - case batch: MetricBatch ⇒ sendMetricData(runId, collector, batch.metrics) + case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics) } override def preStart(): Unit = { @@ -68,34 +77,44 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with } import AgentJsonProtocol._ - val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive + val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = logRequest(println(_)) ~>encode(Deflate) ~> sendReceive ~> logResponse(println(_)) val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson + def selectCollector: Future[String] = { + val query = ("method" -> "get_redirect_host") +: baseQuery + val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query) + compressedToJsonPipeline { - Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray()) + Post(getRedirectHostUri, JsArray()) + } map { json ⇒ json.extract[String]('return_value) } } def connect(collectorHost: String, connect: AgentInfo): Future[Long] = { + log.debug("Connecting to NewRelic Collector [{}]", collectorHost) + + val query = ("method" -> "connect") +: baseQuery + val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query) + compressedToJsonPipeline { - Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect) + Post(connectUri, connect) + } map { json ⇒ json.extract[Long]('return_value / 'agent_run_id) } } - def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = { - log.info("Reporting this to NewRelic: " + metrics.mkString("\n")) + def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = { + val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery + val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query) - val end = System.currentTimeMillis() / 1000L - val start = end - 60 compressedPipeline { - Post(s"http://$collector/agent_listener/invoke_raw_method?method=metric_data&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12&run_id=$runId", MetricData(runId, start, end, metrics)) + Post(sendMetricDataUri, MetricData(runId, metrics)) } } @@ -108,5 +127,5 @@ object Agent { case class CollectorSelection(return_value: String) case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int) - case class MetricData(runId: Long, start: Long, end: Long, metrics: List[(ID, Data)]) + case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics) } \ No newline at end of file -- cgit v1.2.3