aboutsummaryrefslogtreecommitdiff
path: root/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-02-04 18:16:07 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-02-04 18:16:07 -0300
commit57e433c07a271b4e5e4159500cdc828cd7bb6a83 (patch)
tree6b2928dcf3c1dc68c4131aa864a0a0f53ccf2160 /kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
parent7307e1cc97e0363d1fb4cc116fc69a5272ca3730 (diff)
downloadKamon-57e433c07a271b4e5e4159500cdc828cd7bb6a83.tar.gz
Kamon-57e433c07a271b4e5e4159500cdc828cd7bb6a83.tar.bz2
Kamon-57e433c07a271b4e5e4159500cdc828cd7bb6a83.zip
partial rewrite of kamon-newrelic
Diffstat (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala')
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala43
1 files changed, 31 insertions, 12 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
index 4082458c..a73f390a 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
@@ -26,7 +26,8 @@ import akka.pattern.pipe
import java.lang.management.ManagementFactory
import spray.client.pipelining._
import scala.util.control.NonFatal
-import kamon.newrelic.NewRelicMetric.{ Data, ID, MetricBatch }
+import spray.http.Uri.Query
+import kamon.newrelic.MetricTranslator.TimeSliceMetrics
class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging {
import context.dispatcher
@@ -43,12 +44,20 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with
AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt)
}
+ val baseQuery = Query(
+ "license_key" -> agentInfo.licenseKey,
+ "marshal_format" -> "json",
+ "protocol_version" -> "12")
+
+
def receive = {
- case Initialize(runId, collector) ⇒ context become reporting(runId, collector)
+ case Initialize(runId, collector) ⇒
+ log.info("Agent initialized with runID: [{}] and collector: [{}]", runId, collector)
+ context become reporting(runId, collector)
}
def reporting(runId: Long, collector: String): Receive = {
- case batch: MetricBatch ⇒ sendMetricData(runId, collector, batch.metrics)
+ case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics)
}
override def preStart(): Unit = {
@@ -68,34 +77,44 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with
}
import AgentJsonProtocol._
- val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive
+ val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = logRequest(println(_)) ~>encode(Deflate) ~> sendReceive ~> logResponse(println(_))
val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson
def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson
+
def selectCollector: Future[String] = {
+ val query = ("method" -> "get_redirect_host") +: baseQuery
+ val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query)
+
compressedToJsonPipeline {
- Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray())
+ Post(getRedirectHostUri, JsArray())
+
} map { json ⇒
json.extract[String]('return_value)
}
}
def connect(collectorHost: String, connect: AgentInfo): Future[Long] = {
+ log.debug("Connecting to NewRelic Collector [{}]", collectorHost)
+
+ val query = ("method" -> "connect") +: baseQuery
+ val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query)
+
compressedToJsonPipeline {
- Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect)
+ Post(connectUri, connect)
+
} map { json ⇒
json.extract[Long]('return_value / 'agent_run_id)
}
}
- def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = {
- log.info("Reporting this to NewRelic: " + metrics.mkString("\n"))
+ def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = {
+ val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery
+ val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query)
- val end = System.currentTimeMillis() / 1000L
- val start = end - 60
compressedPipeline {
- Post(s"http://$collector/agent_listener/invoke_raw_method?method=metric_data&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12&run_id=$runId", MetricData(runId, start, end, metrics))
+ Post(sendMetricDataUri, MetricData(runId, metrics))
}
}
@@ -108,5 +127,5 @@ object Agent {
case class CollectorSelection(return_value: String)
case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int)
- case class MetricData(runId: Long, start: Long, end: Long, metrics: List[(ID, Data)])
+ case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics)
} \ No newline at end of file