aboutsummaryrefslogtreecommitdiff
path: root/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala')
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala163
1 files changed, 60 insertions, 103 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
index 25fbc9db..f71ecd7f 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
@@ -18,88 +18,73 @@ package kamon.newrelic
import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds }
-import akka.actor.{ ActorLogging, Actor }
+import akka.actor.{ ActorSystem, ActorLogging, Actor }
import akka.event.LoggingAdapter
-import org.slf4j.LoggerFactory
+import akka.io.IO
+import akka.util.Timeout
+import kamon.Kamon
+import kamon.metric.{ CollectionContext, Metrics }
+import spray.can.Http
import spray.json._
import scala.concurrent.{ ExecutionContext, Future }
-import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation }
-import spray.httpx.encoding.Deflate
+import spray.httpx.{ SprayJsonSupport, ResponseTransformation }
import spray.http._
import spray.json.lenses.JsonLenses._
import java.lang.management.ManagementFactory
-import spray.client.pipelining._
-import scala.util.{ Failure, Success }
import spray.http.Uri.Query
-import kamon.newrelic.MetricTranslator.TimeSliceMetrics
import scala.concurrent.duration._
+import Agent._
-class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging {
+import akka.pattern.pipe
+// TODO: Setup a proper host connector with custom timeout configuration for use with this.
+class Agent extends Actor with ClientPipelines with ResponseTransformation with SprayJsonSupport with ActorLogging {
+ import JsonProtocol._
import context.dispatcher
- import Agent._
- import Retry._
-
- self ! Initialize
-
- val agentInfo = {
- val config = context.system.settings.config.getConfig("kamon.newrelic")
- val appName = config.getString("app-name")
- val licenseKey = config.getString("license-key")
-
- // Name has the format of pid@host
- val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')
- val retryDelay = FiniteDuration(config.getDuration("retry-delay", milliseconds), milliseconds)
- val maxRetry = config.getInt("max-retry")
-
- AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay)
- }
+ implicit val operationTimeout = Timeout(30 seconds)
+ val collectorClient = compressedToJsonPipeline(IO(Http)(context.system))
+ val settings = buildAgentSettings(context.system)
val baseQuery = Query(
- "license_key" -> agentInfo.licenseKey,
+ "license_key" -> settings.licenseKey,
"marshal_format" -> "json",
"protocol_version" -> "12")
- def receive: Receive = uninitialized
-
- def uninitialized: Receive = {
- case Initialize ⇒ {
- connectToCollector onComplete {
- case Success(agent) ⇒ {
- log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector)
- context become reporting(agent.runId, agent.collector)
- }
- case Failure(reason) ⇒ self ! InitializationFailed(reason)
- }
- }
- case InitializationFailed(reason) ⇒ {
- log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds)
- context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize)
- }
- case everythingElse ⇒ //ignore
- }
+ // Start the connection to the New Relic collector.
+ self ! Initialize
- def reporting(runId: Long, collector: String): Receive = {
- case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics)
- }
+ def receive: Receive = uninitialized(settings.maxRetries)
- def connectToCollector: Future[Initialized] = for {
- collector ← selectCollector
- runId ← connect(collector, agentInfo)
- } yield Initialized(runId, collector)
+ def uninitialized(attemptsLeft: Int): Receive = {
+ case Initialize ⇒ pipe(connectToCollector) to self
+ case Initialized(runID, collector) ⇒
+ log.info("Agent initialized with runID: [{}] and collector: [{}]", runID, collector)
- import AgentJsonProtocol._
+ val baseCollectorUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(baseQuery)
+ context.actorOf(MetricReporter.props(settings, runID, baseCollectorUri), "metric-reporter")
- val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive
- val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson
+ case InitializationFailed(reason) if (attemptsLeft > 0) ⇒
+ log.error(reason, "Initialization failed, retrying in {} seconds", settings.retryDelay.toSeconds)
+ context.system.scheduler.scheduleOnce(settings.retryDelay, self, Initialize)
+ context become (uninitialized(attemptsLeft - 1))
- def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson
+ case InitializationFailed(reason) ⇒
+ log.error(reason, "Giving up while trying to set up a connection with the New Relic collector.")
+ context.stop(self)
+ }
+
+ def connectToCollector: Future[InitResult] = {
+ (for {
+ collector ← selectCollector
+ runId ← connect(collector, settings)
+ } yield Initialized(runId, collector)) recover { case error ⇒ InitializationFailed(error) }
+ }
def selectCollector: Future[String] = {
val query = ("method" -> "get_redirect_host") +: baseQuery
val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query)
- compressedToJsonPipeline {
+ collectorClient {
Post(getRedirectHostUri, JsArray())
} map { json ⇒
@@ -107,67 +92,39 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with
}
}
- def connect(collectorHost: String, connect: AgentInfo): Future[Long] = {
+ def connect(collectorHost: String, connect: Settings): Future[Long] = {
log.debug("Connecting to NewRelic Collector [{}]", collectorHost)
val query = ("method" -> "connect") +: baseQuery
val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query)
- compressedToJsonPipeline {
+ collectorClient {
Post(connectUri, connect)
} map { json ⇒
json.extract[Long]('return_value / 'agent_run_id)
}
}
-
- def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = {
- val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery
- val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query)
-
- withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics ⇒
- compressedPipeline {
- log.info("Sending metrics to NewRelic collector")
- Post(sendMetricDataUri, MetricData(runId, currentMetrics))
- }
- }
- }
}
object Agent {
- case class Initialize()
- case class Initialized(runId: Long, collector: String)
- case class InitializationFailed(reason: Throwable)
- case class CollectorSelection(return_value: String)
- case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration)
- case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics)
-}
+ case object Initialize
+ sealed trait InitResult
+ case class Initialized(runId: Long, collector: String) extends InitResult
+ case class InitializationFailed(reason: Throwable) extends InitResult
+ case class Settings(licenseKey: String, appName: String, host: String, pid: Int, maxRetries: Int, retryDelay: FiniteDuration, apdexT: Double)
+
+ def buildAgentSettings(system: ActorSystem) = {
+ val config = system.settings.config.getConfig("kamon.newrelic")
+ val appName = config.getString("app-name")
+ val licenseKey = config.getString("license-key")
+ val maxRetries = config.getInt("max-initialize-retries")
+ val retryDelay = FiniteDuration(config.getDuration("initialize-retry-delay", milliseconds), milliseconds)
+ val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds.
-object Retry {
-
- @volatile private var attempts: Int = 0
- @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None
-
- def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics ⇒ Future[T])(implicit executor: ExecutionContext): Unit = {
-
- val currentMetrics = metrics.merge(pendingMetrics)
-
- if (currentMetrics.metrics.nonEmpty) {
- block(currentMetrics) onComplete {
- case Success(_) ⇒
- pendingMetrics = None
- attempts = 0
- case Failure(_) ⇒
- attempts += 1
- if (maxRetry > attempts) {
- log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry)
- pendingMetrics = Some(currentMetrics)
- } else {
- log.info("Max attempts achieved, proceeding to remove all pending metrics")
- pendingMetrics = None
- attempts = 0
- }
- }
- }
+ // Name has the format of 'pid'@'host'
+ val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')
+
+ Agent.Settings(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetries, retryDelay, apdexT)
}
} \ No newline at end of file