From f498749274bc9f25ede7221d6bd8b3f0c3822dda Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 6 Nov 2014 16:29:54 +0100 Subject: ! newrelic: major refactor of the newrelic reporter Most notable changes: - The agent connection setup is separated from the actual metrics reporting, this will be important in the near future when we start sending errors too. - The metrics subscriptions are delayed until the connection to the agent is established. - The Tick metrics buffer is only created if necessary. - Introduced the kamon.newrelic.max-initialize-retries and initialize-retry-delay settings. - External service calls via HTTP clients are reported as external services. --- .../src/main/scala/kamon/newrelic/Agent.scala | 163 ++++++++------------- 1 file changed, 60 insertions(+), 103 deletions(-) (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala') diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 25fbc9db..f71ecd7f 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -18,88 +18,73 @@ package kamon.newrelic import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } -import akka.actor.{ ActorLogging, Actor } +import akka.actor.{ ActorSystem, ActorLogging, Actor } import akka.event.LoggingAdapter -import org.slf4j.LoggerFactory +import akka.io.IO +import akka.util.Timeout +import kamon.Kamon +import kamon.metric.{ CollectionContext, Metrics } +import spray.can.Http import spray.json._ import scala.concurrent.{ ExecutionContext, Future } -import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation } -import spray.httpx.encoding.Deflate +import spray.httpx.{ SprayJsonSupport, ResponseTransformation } import spray.http._ import spray.json.lenses.JsonLenses._ import java.lang.management.ManagementFactory -import spray.client.pipelining._ -import scala.util.{ Failure, Success } import spray.http.Uri.Query -import kamon.newrelic.MetricTranslator.TimeSliceMetrics import scala.concurrent.duration._ +import Agent._ -class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging { +import akka.pattern.pipe +// TODO: Setup a proper host connector with custom timeout configuration for use with this. +class Agent extends Actor with ClientPipelines with ResponseTransformation with SprayJsonSupport with ActorLogging { + import JsonProtocol._ import context.dispatcher - import Agent._ - import Retry._ - - self ! Initialize - - val agentInfo = { - val config = context.system.settings.config.getConfig("kamon.newrelic") - val appName = config.getString("app-name") - val licenseKey = config.getString("license-key") - - // Name has the format of pid@host - val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') - val retryDelay = FiniteDuration(config.getDuration("retry-delay", milliseconds), milliseconds) - val maxRetry = config.getInt("max-retry") - - AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay) - } + implicit val operationTimeout = Timeout(30 seconds) + val collectorClient = compressedToJsonPipeline(IO(Http)(context.system)) + val settings = buildAgentSettings(context.system) val baseQuery = Query( - "license_key" -> agentInfo.licenseKey, + "license_key" -> settings.licenseKey, "marshal_format" -> "json", "protocol_version" -> "12") - def receive: Receive = uninitialized - - def uninitialized: Receive = { - case Initialize ⇒ { - connectToCollector onComplete { - case Success(agent) ⇒ { - log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector) - context become reporting(agent.runId, agent.collector) - } - case Failure(reason) ⇒ self ! InitializationFailed(reason) - } - } - case InitializationFailed(reason) ⇒ { - log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds) - context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize) - } - case everythingElse ⇒ //ignore - } + // Start the connection to the New Relic collector. + self ! Initialize - def reporting(runId: Long, collector: String): Receive = { - case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics) - } + def receive: Receive = uninitialized(settings.maxRetries) - def connectToCollector: Future[Initialized] = for { - collector ← selectCollector - runId ← connect(collector, agentInfo) - } yield Initialized(runId, collector) + def uninitialized(attemptsLeft: Int): Receive = { + case Initialize ⇒ pipe(connectToCollector) to self + case Initialized(runID, collector) ⇒ + log.info("Agent initialized with runID: [{}] and collector: [{}]", runID, collector) - import AgentJsonProtocol._ + val baseCollectorUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(baseQuery) + context.actorOf(MetricReporter.props(settings, runID, baseCollectorUri), "metric-reporter") - val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive - val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson + case InitializationFailed(reason) if (attemptsLeft > 0) ⇒ + log.error(reason, "Initialization failed, retrying in {} seconds", settings.retryDelay.toSeconds) + context.system.scheduler.scheduleOnce(settings.retryDelay, self, Initialize) + context become (uninitialized(attemptsLeft - 1)) - def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson + case InitializationFailed(reason) ⇒ + log.error(reason, "Giving up while trying to set up a connection with the New Relic collector.") + context.stop(self) + } + + def connectToCollector: Future[InitResult] = { + (for { + collector ← selectCollector + runId ← connect(collector, settings) + } yield Initialized(runId, collector)) recover { case error ⇒ InitializationFailed(error) } + } def selectCollector: Future[String] = { val query = ("method" -> "get_redirect_host") +: baseQuery val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query) - compressedToJsonPipeline { + collectorClient { Post(getRedirectHostUri, JsArray()) } map { json ⇒ @@ -107,67 +92,39 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with } } - def connect(collectorHost: String, connect: AgentInfo): Future[Long] = { + def connect(collectorHost: String, connect: Settings): Future[Long] = { log.debug("Connecting to NewRelic Collector [{}]", collectorHost) val query = ("method" -> "connect") +: baseQuery val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query) - compressedToJsonPipeline { + collectorClient { Post(connectUri, connect) } map { json ⇒ json.extract[Long]('return_value / 'agent_run_id) } } - - def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = { - val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery - val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query) - - withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics ⇒ - compressedPipeline { - log.info("Sending metrics to NewRelic collector") - Post(sendMetricDataUri, MetricData(runId, currentMetrics)) - } - } - } } object Agent { - case class Initialize() - case class Initialized(runId: Long, collector: String) - case class InitializationFailed(reason: Throwable) - case class CollectorSelection(return_value: String) - case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration) - case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics) -} + case object Initialize + sealed trait InitResult + case class Initialized(runId: Long, collector: String) extends InitResult + case class InitializationFailed(reason: Throwable) extends InitResult + case class Settings(licenseKey: String, appName: String, host: String, pid: Int, maxRetries: Int, retryDelay: FiniteDuration, apdexT: Double) + + def buildAgentSettings(system: ActorSystem) = { + val config = system.settings.config.getConfig("kamon.newrelic") + val appName = config.getString("app-name") + val licenseKey = config.getString("license-key") + val maxRetries = config.getInt("max-initialize-retries") + val retryDelay = FiniteDuration(config.getDuration("initialize-retry-delay", milliseconds), milliseconds) + val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds. -object Retry { - - @volatile private var attempts: Int = 0 - @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None - - def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics ⇒ Future[T])(implicit executor: ExecutionContext): Unit = { - - val currentMetrics = metrics.merge(pendingMetrics) - - if (currentMetrics.metrics.nonEmpty) { - block(currentMetrics) onComplete { - case Success(_) ⇒ - pendingMetrics = None - attempts = 0 - case Failure(_) ⇒ - attempts += 1 - if (maxRetry > attempts) { - log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry) - pendingMetrics = Some(currentMetrics) - } else { - log.info("Max attempts achieved, proceeding to remove all pending metrics") - pendingMetrics = None - attempts = 0 - } - } - } + // Name has the format of 'pid'@'host' + val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + + Agent.Settings(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetries, retryDelay, apdexT) } } \ No newline at end of file -- cgit v1.2.3