From 355b910f9e7c2e82217ef2443b734e3220752555 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 5 Dec 2014 02:22:25 +0100 Subject: + newrelic: react correctly to restart and shutdown events from the New Relic collector. --- kamon-newrelic/src/main/resources/reference.conf | 7 +- .../src/main/scala/kamon/newrelic/Agent.scala | 152 +++++++++++---------- .../scala/kamon/newrelic/ApiMethodClient.scala | 69 ++++++++++ .../scala/kamon/newrelic/ClientPipelines.scala | 23 ---- .../kamon/newrelic/CustomMetricExtractor.scala | 3 +- .../main/scala/kamon/newrelic/JsonProtocol.scala | 16 +-- .../src/main/scala/kamon/newrelic/Metric.scala | 7 +- .../main/scala/kamon/newrelic/MetricReporter.scala | 130 ++++++++++-------- .../newrelic/WebTransactionMetricExtractor.scala | 6 +- 9 files changed, 245 insertions(+), 168 deletions(-) create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala delete mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala (limited to 'kamon-newrelic/src/main') diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index c86e64ae..dd112aeb 100644 --- a/kamon-newrelic/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -14,12 +14,15 @@ kamon { # Your New Relic license key. license-key = e7d350b14228f3d28f35bc3140df2c3e565ea5d5 + # Time to wait for a response when calling any of the New Relic collector API methods. + operation-timeout = 30 seconds + # attempts to send pending metrics in the next tick, # combining the current metrics plus the pending, after max-retry, deletes all pending metrics - max-initialize-retries = 3 + max-connect-retries = 3 # delay between connection attempts to NewRelic collector - initialize-retry-delay = 30 seconds + connect-retry-delay = 30 seconds } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index f71ecd7f..aa6aed3b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -18,113 +18,127 @@ package kamon.newrelic import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } -import akka.actor.{ ActorSystem, ActorLogging, Actor } -import akka.event.LoggingAdapter +import akka.actor.{ ActorLogging, Actor } import akka.io.IO import akka.util.Timeout -import kamon.Kamon -import kamon.metric.{ CollectionContext, Metrics } +import com.typesafe.config.Config import spray.can.Http import spray.json._ -import scala.concurrent.{ ExecutionContext, Future } -import spray.httpx.{ SprayJsonSupport, ResponseTransformation } -import spray.http._ +import scala.concurrent.Future +import spray.httpx.SprayJsonSupport import spray.json.lenses.JsonLenses._ import java.lang.management.ManagementFactory -import spray.http.Uri.Query import scala.concurrent.duration._ import Agent._ - +import JsonProtocol._ import akka.pattern.pipe -// TODO: Setup a proper host connector with custom timeout configuration for use with this. -class Agent extends Actor with ClientPipelines with ResponseTransformation with SprayJsonSupport with ActorLogging { - import JsonProtocol._ +class Agent extends Actor with SprayJsonSupport with ActorLogging { import context.dispatcher - implicit val operationTimeout = Timeout(30 seconds) - val collectorClient = compressedToJsonPipeline(IO(Http)(context.system)) - val settings = buildAgentSettings(context.system) - val baseQuery = Query( - "license_key" -> settings.licenseKey, - "marshal_format" -> "json", - "protocol_version" -> "12") + val agentSettings = AgentSettings.fromConfig(context.system.settings.config) + + // Start the reporters + context.actorOf(MetricReporter.props(agentSettings), "metric-reporter") // Start the connection to the New Relic collector. - self ! Initialize + self ! Connect + + def receive: Receive = disconnected(agentSettings.maxConnectionRetries) + + def disconnected(attemptsLeft: Int): Receive = { + case Connect ⇒ pipe(connectToCollector) to self + case Connected(collector, runID) ⇒ configureChildren(collector, runID) + case ConnectFailed(reason) if (attemptsLeft > 0) ⇒ scheduleReconnection(reason, attemptsLeft) + case ConnectFailed(reason) ⇒ giveUpConnection() + } - def receive: Receive = uninitialized(settings.maxRetries) + def connected: Receive = { + case Reconnect ⇒ reconnect() + case Shutdown ⇒ shutdown() + } - def uninitialized(attemptsLeft: Int): Receive = { - case Initialize ⇒ pipe(connectToCollector) to self - case Initialized(runID, collector) ⇒ - log.info("Agent initialized with runID: [{}] and collector: [{}]", runID, collector) + def reconnect(): Unit = { + log.warning("New Relic request the agent to restart the connection, all reporters will be paused until a new connection is available.") + self ! Connect + context.children.foreach(_ ! ResetConfiguration) + context become disconnected(agentSettings.maxConnectionRetries) + } - val baseCollectorUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(baseQuery) - context.actorOf(MetricReporter.props(settings, runID, baseCollectorUri), "metric-reporter") + def shutdown(): Unit = { + log.error("New Relic requested the agent to be stopped, no metrics will be reported after this point.") + context stop self + } + + def configureChildren(collector: String, runID: Long): Unit = { + log.info("Configuring New Relic reporters to use runID: [{}] and collector: [{}]", runID, collector) + context.children.foreach(_ ! Configure(collector, runID)) + context become connected + } - case InitializationFailed(reason) if (attemptsLeft > 0) ⇒ - log.error(reason, "Initialization failed, retrying in {} seconds", settings.retryDelay.toSeconds) - context.system.scheduler.scheduleOnce(settings.retryDelay, self, Initialize) - context become (uninitialized(attemptsLeft - 1)) + def scheduleReconnection(connectionFailureReason: Throwable, attemptsLeft: Int): Unit = { + log.error(connectionFailureReason, "Initialization failed, retrying in {} seconds", agentSettings.retryDelay.toSeconds) + context.system.scheduler.scheduleOnce(agentSettings.retryDelay, self, Connect) + context become (disconnected(attemptsLeft - 1)) + } - case InitializationFailed(reason) ⇒ - log.error(reason, "Giving up while trying to set up a connection with the New Relic collector.") - context.stop(self) + def giveUpConnection(): Unit = { + log.error("Giving up while trying to set up a connection with the New Relic collector. The New Relic module is shutting down itself.") + context.stop(self) } - def connectToCollector: Future[InitResult] = { + def connectToCollector: Future[ConnectResult] = { (for { collector ← selectCollector - runId ← connect(collector, settings) - } yield Initialized(runId, collector)) recover { case error ⇒ InitializationFailed(error) } + runID ← connect(collector, agentSettings) + } yield Connected(collector, runID)) recover { case error ⇒ ConnectFailed(error) } } def selectCollector: Future[String] = { - val query = ("method" -> "get_redirect_host") +: baseQuery - val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query) - - collectorClient { - Post(getRedirectHostUri, JsArray()) - - } map { json ⇒ + val apiClient = new ApiMethodClient("collector.newrelic.com", None, agentSettings, IO(Http)(context.system)) + apiClient.invokeMethod(RawMethods.GetRedirectHost, JsArray()) map { json ⇒ json.extract[String]('return_value) } } - def connect(collectorHost: String, connect: Settings): Future[Long] = { - log.debug("Connecting to NewRelic Collector [{}]", collectorHost) - - val query = ("method" -> "connect") +: baseQuery - val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query) - - collectorClient { - Post(connectUri, connect) - - } map { json ⇒ + def connect(collectorHost: String, connect: AgentSettings): Future[Long] = { + val apiClient = new ApiMethodClient(collectorHost, None, agentSettings, IO(Http)(context.system)) + apiClient.invokeMethod(RawMethods.Connect, connect) map { json ⇒ json.extract[Long]('return_value / 'agent_run_id) } } } object Agent { - case object Initialize - sealed trait InitResult - case class Initialized(runId: Long, collector: String) extends InitResult - case class InitializationFailed(reason: Throwable) extends InitResult - case class Settings(licenseKey: String, appName: String, host: String, pid: Int, maxRetries: Int, retryDelay: FiniteDuration, apdexT: Double) - - def buildAgentSettings(system: ActorSystem) = { - val config = system.settings.config.getConfig("kamon.newrelic") - val appName = config.getString("app-name") - val licenseKey = config.getString("license-key") - val maxRetries = config.getInt("max-initialize-retries") - val retryDelay = FiniteDuration(config.getDuration("initialize-retry-delay", milliseconds), milliseconds) - val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds. + case object Connect + case object Reconnect + case object Shutdown + case object ResetConfiguration + case class Configure(collector: String, runID: Long) + + sealed trait ConnectResult + case class Connected(collector: String, runID: Long) extends ConnectResult + case class ConnectFailed(reason: Throwable) extends ConnectResult +} + +case class AgentSettings(licenseKey: String, appName: String, hostname: String, pid: Int, operationTimeout: Timeout, + maxConnectionRetries: Int, retryDelay: FiniteDuration, apdexT: Double) +object AgentSettings { + + def fromConfig(config: Config) = { // Name has the format of 'pid'@'host' val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') - - Agent.Settings(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetries, retryDelay, apdexT) + val newRelicConfig = config.getConfig("kamon.newrelic") + + AgentSettings( + newRelicConfig.getString("license-key"), + newRelicConfig.getString("app-name"), + runtimeName(1), + runtimeName(0).toInt, + Timeout(newRelicConfig.getDuration("operation-timeout", milliseconds).millis), + newRelicConfig.getInt("max-connect-retries"), + FiniteDuration(newRelicConfig.getDuration("connect-retry-delay", milliseconds), milliseconds), + newRelicConfig.getDuration("apdexT", milliseconds) / 1E3D) } } \ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala new file mode 100644 index 00000000..8b95a959 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala @@ -0,0 +1,69 @@ +package kamon.newrelic + +import akka.actor.ActorRef +import kamon.newrelic.ApiMethodClient.{ NewRelicException, AgentShutdownRequiredException, AgentRestartRequiredException } +import spray.http.Uri.Query +import spray.http._ +import spray.httpx.encoding.Deflate +import spray.httpx.marshalling.Marshaller +import spray.httpx.unmarshalling._ +import spray.json.{ JsonParser, JsValue } +import spray.json.lenses.JsonLenses._ +import spray.json.DefaultJsonProtocol._ +import spray.client.pipelining._ + +import scala.concurrent.{ Future, ExecutionContext } +import scala.util.control.NoStackTrace + +class ApiMethodClient(host: String, val runID: Option[Long], agentSettings: AgentSettings, httpTransport: ActorRef)(implicit exeContext: ExecutionContext) { + + implicit val to = agentSettings.operationTimeout + + val baseQuery = Query(runID.map(ri ⇒ Map("run_id" -> String.valueOf(ri))).getOrElse(Map.empty[String, String]) + + ("license_key" -> agentSettings.licenseKey) + + ("marshal_format" -> "json") + + ("protocol_version" -> "12")) + + // New Relic responses contain JSON but with text/plain content type :(. + implicit val JsValueUnmarshaller = Unmarshaller[JsValue](MediaTypes.`application/json`, MediaTypes.`text/plain`) { + case x: HttpEntity.NonEmpty ⇒ + JsonParser(x.asString(defaultCharset = HttpCharsets.`UTF-8`)) + } + + val httpClient = encode(Deflate) ~> sendReceive(httpTransport) ~> decode(Deflate) ~> unmarshal[JsValue] + val baseCollectorUri = Uri("/agent_listener/invoke_raw_method").withHost(host).withScheme("http") + + def invokeMethod[T: Marshaller](method: String, payload: T): Future[JsValue] = { + val methodQuery = ("method" -> method) +: baseQuery + + httpClient(Post(baseCollectorUri.withQuery(methodQuery), payload)) map { jsResponse ⇒ + jsResponse.extract[String]('exception.? / 'error_type.?).map(_ match { + case CollectorErrors.`ForceRestart` ⇒ throw AgentRestartRequiredException + case CollectorErrors.`ForceShutdown` ⇒ throw AgentShutdownRequiredException + case anyOtherError ⇒ + val errorMessage = jsResponse.extract[String]('exception / 'message.?).getOrElse("no message") + println("Error occured: " + anyOtherError + " - " + errorMessage) + throw NewRelicException(anyOtherError, errorMessage) + }) + + jsResponse + } + } +} + +object ApiMethodClient { + case class NewRelicException(exceptionType: String, message: String) extends RuntimeException with NoStackTrace + case object AgentRestartRequiredException extends RuntimeException with NoStackTrace + case object AgentShutdownRequiredException extends RuntimeException with NoStackTrace +} + +object RawMethods { + val GetRedirectHost = "get_redirect_host" + val Connect = "connect" + val MetricData = "metric_data" +} + +object CollectorErrors { + val ForceRestart = "NewRelic::Agent::ForceRestartException" + val ForceShutdown = "NewRelic::Agent::ForceDisconnectException" +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala deleted file mode 100644 index ca003646..00000000 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala +++ /dev/null @@ -1,23 +0,0 @@ -package kamon.newrelic - -import akka.actor.ActorRef -import akka.util.Timeout -import spray.http.{ HttpResponse, HttpRequest } -import spray.httpx.RequestBuilding -import spray.httpx.encoding.Deflate -import spray.json._ -import spray.client.pipelining.sendReceive - -import scala.concurrent.{ ExecutionContext, Future } - -trait ClientPipelines extends RequestBuilding { - - def compressedPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[HttpResponse] = - encode(Deflate) ~> sendReceive(transport) - - def compressedToJsonPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[JsValue] = - compressedPipeline(transport) ~> toJson - - def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson - -} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala index 84472593..e97c24dc 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala @@ -18,11 +18,10 @@ package kamon.newrelic import kamon.metric.UserMetrics.UserMetricGroup import kamon.metric._ -import kamon.newrelic.Agent.Settings object CustomMetricExtractor extends MetricExtractor { - def extract(settings: Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { metrics.collect { case (mg: UserMetricGroup, groupSnapshot) ⇒ groupSnapshot.metrics collect { diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala index 26e8839e..0e53be0b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -15,18 +15,18 @@ * ========================================================== */ package kamon.newrelic +import kamon.Timestamp import spray.json._ -import kamon.newrelic.Agent._ object JsonProtocol extends DefaultJsonProtocol { - implicit object ConnectJsonWriter extends RootJsonWriter[Settings] { - def write(obj: Settings): JsValue = + implicit object ConnectJsonWriter extends RootJsonWriter[AgentSettings] { + def write(obj: AgentSettings): JsValue = JsArray( JsObject( "agent_version" -> JsString("3.1.0"), "app_name" -> JsArray(JsString(obj.appName)), - "host" -> JsString(obj.host), + "host" -> JsString(obj.hostname), "identifier" -> JsString(s"java:${obj.appName}"), "language" -> JsString("java"), "pid" -> JsNumber(obj.pid))) @@ -87,8 +87,8 @@ object JsonProtocol extends DefaultJsonProtocol { def read(json: JsValue): MetricBatch = json match { case JsArray(elements) ⇒ val runID = elements(0).convertTo[Long] - val timeSliceFrom = elements(1).convertTo[Long] - val timeSliceTo = elements(2).convertTo[Long] + val timeSliceFrom = new Timestamp(elements(1).convertTo[Long]) + val timeSliceTo = new Timestamp(elements(2).convertTo[Long]) val metrics = elements(3).convertTo[Seq[Metric]] MetricBatch(runID, TimeSliceMetrics(timeSliceFrom, timeSliceTo, metrics.toMap)) @@ -99,8 +99,8 @@ object JsonProtocol extends DefaultJsonProtocol { def write(obj: MetricBatch): JsValue = JsArray( JsNumber(obj.runID), - JsNumber(obj.timeSliceMetrics.from), - JsNumber(obj.timeSliceMetrics.to), + JsNumber(obj.timeSliceMetrics.from.seconds), + JsNumber(obj.timeSliceMetrics.to.seconds), obj.timeSliceMetrics.metrics.toSeq.toJson) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala index 14541483..52d21f31 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala @@ -1,5 +1,6 @@ package kamon.newrelic +import kamon.Timestamp import kamon.metric.instrument.{ Counter, Histogram } import kamon.metric.{ MetricSnapshot, Scale } @@ -40,12 +41,12 @@ object Metric { } } -case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[MetricID, MetricData]) { +case class TimeSliceMetrics(from: Timestamp, to: Timestamp, metrics: Map[MetricID, MetricData]) { import kamon.metric.combineMaps def merge(that: TimeSliceMetrics): TimeSliceMetrics = { - val mergedFrom = math.min(from, that.from) - val mergedTo = math.max(to, that.to) + val mergedFrom = Timestamp.earlier(from, that.from) + val mergedTo = Timestamp.later(to, that.to) val mergedMetrics = combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)) TimeSliceMetrics(mergedFrom, mergedTo, mergedMetrics) diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala index 0aa078f5..286b0a77 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala @@ -5,99 +5,113 @@ import java.util.concurrent.TimeUnit import akka.actor.{ Props, ActorLogging, Actor } import akka.pattern.pipe import akka.io.IO -import akka.util.Timeout import kamon.Kamon import kamon.metric.Subscriptions.TickMetricSnapshot import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } import kamon.metric._ -import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed, PostSucceeded, MetricDataPostResult } +import kamon.newrelic.ApiMethodClient.{ AgentShutdownRequiredException, AgentRestartRequiredException } +import kamon.newrelic.MetricReporter.{ PostFailed, PostSucceeded } import spray.can.Http -import spray.http.Uri import spray.httpx.SprayJsonSupport -import scala.concurrent.Future import scala.concurrent.duration._ -import scala.util.control.NoStackTrace +import JsonProtocol._ -class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extends Actor - with ClientPipelines with ActorLogging with SprayJsonSupport { - - import JsonProtocol._ - import MetricReporter.Extractors +class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging with SprayJsonSupport { import context.dispatcher - val metricDataQuery = ("method" -> "metric_data") +: ("run_id" -> runID.toString) +: baseUri.query - val metricDataUri = baseUri.withQuery(metricDataQuery) - - implicit val operationTimeout = Timeout(30 seconds) val metricsExtension = Kamon(Metrics)(context.system) val collectionContext = metricsExtension.buildDefaultCollectionContext - val collectorClient = compressedPipeline(IO(Http)(context.system)) - - val subscriber = { + val metricsSubscriber = { val tickInterval = context.system.settings.config.getDuration("kamon.metrics.tick-interval", TimeUnit.MILLISECONDS) - if (tickInterval == 60000) - self - else - context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer") + + // Metrics are always sent to New Relic in 60 seconds intervals. + if (tickInterval == 60000) self + else context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer") + } + + subscribeToMetrics() + + def receive = awaitingConfiguration(None) + + def awaitingConfiguration(bufferedMetrics: Option[TimeSliceMetrics]): Receive = { + case Agent.Configure(collector, runID) ⇒ startReporting(collector, runID, bufferedMetrics) + case Agent.ResetConfiguration ⇒ // Stay waiting. + case tickSnapshot: TickMetricSnapshot ⇒ keepWaitingForConfig(tickSnapshot, bufferedMetrics) + case PostSucceeded ⇒ // Ignore + case PostFailed(reason) ⇒ // Ignore any problems until we get a new configuration } - // Subscribe to Trace Metrics - metricsExtension.subscribe(TraceMetrics, "*", subscriber, permanently = true) + def reporting(apiClient: ApiMethodClient, bufferedMetrics: Option[TimeSliceMetrics]): Receive = { + case tick: TickMetricSnapshot ⇒ sendMetricData(apiClient, tick, bufferedMetrics) + case PostSucceeded ⇒ context become reporting(apiClient, None) + case PostFailed(reason) ⇒ processCollectorFailure(reason) + case Agent.ResetConfiguration ⇒ context become awaitingConfiguration(bufferedMetrics) + } + + def sendMetricData(apiClient: ApiMethodClient, tick: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = { + val metricsToReport = merge(convertToTimeSliceMetrics(tick), bufferedMetrics) + val customMarshaller = sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter) - // Subscribe to all User Metrics - metricsExtension.subscribe(UserHistograms, "*", subscriber, permanently = true) - metricsExtension.subscribe(UserCounters, "*", subscriber, permanently = true) - metricsExtension.subscribe(UserMinMaxCounters, "*", subscriber, permanently = true) - metricsExtension.subscribe(UserGauges, "*", subscriber, permanently = true) + if (log.isDebugEnabled) + log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", metricsToReport.metrics.size, + metricsToReport.from, metricsToReport.to) - def receive = reporting(None) + pipe { + apiClient.invokeMethod(RawMethods.MetricData, MetricBatch(apiClient.runID.get, metricsToReport))(customMarshaller) + .map { _ ⇒ PostSucceeded } + .recover { case error ⇒ PostFailed(error) } + } to self - def reporting(pendingMetrics: Option[TimeSliceMetrics]): Receive = { - case TickMetricSnapshot(from, to, metrics) ⇒ - val fromInSeconds = (from / 1E3).toInt - val toInSeconds = (to / 1E3).toInt - val extractedMetrics = Extractors.flatMap(_.extract(settings, collectionContext, metrics)).toMap - val tickMetrics = TimeSliceMetrics(fromInSeconds, toInSeconds, extractedMetrics) + context become reporting(apiClient, Some(metricsToReport)) + } - val metricsToReport = pendingMetrics.foldLeft(tickMetrics)((p, n) ⇒ p.merge(n)) - context become reporting(Some(metricsToReport)) - pipe(sendMetricData(metricsToReport)) to self + def processCollectorFailure(failureReason: Throwable): Unit = failureReason match { + case AgentRestartRequiredException ⇒ context.parent ! Agent.Reconnect + case AgentShutdownRequiredException ⇒ context.parent ! Agent.Shutdown + case anyOtherFailure ⇒ + log.error(anyOtherFailure, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.") + } - case PostSucceeded ⇒ - context become (reporting(None)) + def startReporting(collector: String, runID: Long, bufferedMetrics: Option[TimeSliceMetrics]): Unit = { + val apiClient = new ApiMethodClient(collector, Some(runID), settings, IO(Http)(context.system)) + context become reporting(apiClient, bufferedMetrics) + } - case PostFailed(reason) ⇒ - log.error(reason, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.") + def keepWaitingForConfig(tickSnapshot: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = { + val timeSliceMetrics = convertToTimeSliceMetrics(tickSnapshot) + context become awaitingConfiguration(Some(merge(timeSliceMetrics, bufferedMetrics))) } - def sendMetricData(slice: TimeSliceMetrics): Future[MetricDataPostResult] = { - log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to) + def merge(tsm: TimeSliceMetrics, buffered: Option[TimeSliceMetrics]): TimeSliceMetrics = + buffered.foldLeft(tsm)((p, n) ⇒ p.merge(n)) + + def convertToTimeSliceMetrics(tick: TickMetricSnapshot): TimeSliceMetrics = { + val extractedMetrics = MetricReporter.MetricExtractors.flatMap(_.extract(settings, collectionContext, tick.metrics)).toMap + TimeSliceMetrics(tick.from.toTimestamp, tick.to.toTimestamp, extractedMetrics) + } - collectorClient { - Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)) + def subscribeToMetrics(): Unit = { + // Subscribe to Trace Metrics + metricsExtension.subscribe(TraceMetrics, "*", metricsSubscriber, permanently = true) - } map { response ⇒ - if (response.status.isSuccess) - PostSucceeded - else - PostFailed(new UnexpectedStatusCodeException(s"Received unsuccessful status code [${response.status.value}] from collector.")) - } recover { case t: Throwable ⇒ PostFailed(t) } + // Subscribe to all User Metrics + metricsExtension.subscribe(UserHistograms, "*", metricsSubscriber, permanently = true) + metricsExtension.subscribe(UserCounters, "*", metricsSubscriber, permanently = true) + metricsExtension.subscribe(UserMinMaxCounters, "*", metricsSubscriber, permanently = true) + metricsExtension.subscribe(UserGauges, "*", metricsSubscriber, permanently = true) } } object MetricReporter { - val Extractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil - - def props(settings: Agent.Settings, runID: Long, baseUri: Uri): Props = - Props(new MetricReporter(settings, runID, baseUri)) + def props(settings: AgentSettings): Props = Props(new MetricReporter(settings)) sealed trait MetricDataPostResult case object PostSucceeded extends MetricDataPostResult case class PostFailed(reason: Throwable) extends MetricDataPostResult - class UnexpectedStatusCodeException(message: String) extends RuntimeException(message) with NoStackTrace + val MetricExtractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil } trait MetricExtractor { - def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index 0a4a516b..a47cac55 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -16,16 +16,16 @@ package kamon.newrelic -import scala.collection.mutable; +import scala.collection.mutable import kamon.metric._ -import kamon.metric.TraceMetrics.{ TraceMetricsSnapshot, ElapsedTime } +import kamon.metric.TraceMetrics.TraceMetricsSnapshot import kamon.metric.instrument.Histogram import kamon.trace.SegmentCategory.HttpClient import kamon.trace.SegmentMetricIdentity object WebTransactionMetricExtractor extends MetricExtractor { - def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { val apdexBuilder = new ApdexBuilder("Apdex", None, settings.apdexT) // Trace metrics are recorded in nanoseconds. -- cgit v1.2.3