aboutsummaryrefslogtreecommitdiff
path: root/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
blob: 7c2b34ea70c6b66a685a33cb4f69904e69e32c90 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/* ===================================================
 * Copyright © 2013 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ========================================================== */
package kamon.newrelic

import akka.actor.{ActorLogging, Actor}
import spray.json._
import scala.concurrent.Future
import spray.httpx.{SprayJsonSupport, RequestBuilding, ResponseTransformation}
import spray.httpx.encoding.Deflate
import spray.http._
import spray.json.lenses.JsonLenses._
import akka.pattern.pipe
import java.lang.management.ManagementFactory
import spray.client.pipelining._
import scala.util.control.NonFatal
import kamon.newrelic.NewRelicMetric.{Data, ID, MetricBatch}

class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging {
  import context.dispatcher
  import Agent._

  val agentInfo = {
    val config = context.system.settings.config.getConfig("kamon.newrelic")
    val appName = config.getString("app-name")
    val licenseKey = config.getString("license-key")

    // Name has the format of pid@host
    val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')

    AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt)
  }



  def receive = {
    case Initialize(runId, collector) => context become reporting(runId, collector)
  }


  def reporting(runId: Long, collector: String): Receive = {
    case batch: MetricBatch => sendMetricData(runId, collector, batch.metrics)
  }

  override def preStart(): Unit = {
    super.preStart()
    initialize
  }

  def initialize: Unit = {
    pipe ({
        for(
          collector <- selectCollector;
          runId     <- connect(collector, agentInfo)
        ) yield Initialize(runId, collector)
      } recover {
        case NonFatal(ex) => InitializationFailed(ex)
      }) to self
  }

  import AgentJsonProtocol._
  val compressedPipeline: HttpRequest => Future[HttpResponse] = encode(Deflate) ~> sendReceive
  val compressedToJsonPipeline: HttpRequest => Future[JsValue] = compressedPipeline ~> toJson

  def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson

  def selectCollector: Future[String] = {
    compressedToJsonPipeline {
      Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray())
    } map { json =>
      json.extract[String]('return_value)
    }
  }

  def connect(collectorHost: String, connect: AgentInfo): Future[Long] = {
    compressedToJsonPipeline {
      Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect)
    } map { json =>
      json.extract[Long]('return_value / 'agent_run_id)
    }
  }


  def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = {
    log.info("Reporting this to NewRelic: " + metrics.mkString("\n"))

    val end = System.currentTimeMillis() / 1000L
    val start = end - 60
    compressedPipeline {
      Post(s"http://$collector/agent_listener/invoke_raw_method?method=metric_data&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12&run_id=$runId", MetricData(runId, start, end, metrics))
    }
  }



}

object Agent {



  case class Initialize(runId: Long, collector: String)
  case class InitializationFailed(reason: Throwable)
  case class CollectorSelection(return_value: String)
  case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int)

  case class MetricData(runId: Long, start: Long, end: Long, metrics: List[(ID, Data)])
}