aboutsummaryrefslogtreecommitdiff
path: root/kamon-newrelic/src/main/scala/kamon/newrelic
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-11-06 16:29:54 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2014-11-09 18:01:35 +0100
commitf498749274bc9f25ede7221d6bd8b3f0c3822dda (patch)
tree9371eb98b88830b5c61619a29f53fd4d45040e71 /kamon-newrelic/src/main/scala/kamon/newrelic
parent6e3d9ae88ecce10420eeac82294c54c1b43dedf4 (diff)
downloadKamon-f498749274bc9f25ede7221d6bd8b3f0c3822dda.tar.gz
Kamon-f498749274bc9f25ede7221d6bd8b3f0c3822dda.tar.bz2
Kamon-f498749274bc9f25ede7221d6bd8b3f0c3822dda.zip
! newrelic: major refactor of the newrelic reporter
Most notable changes: - The agent connection setup is separated from the actual metrics reporting, this will be important in the near future when we start sending errors too. - The metrics subscriptions are delayed until the connection to the agent is established. - The Tick metrics buffer is only created if necessary. - Introduced the kamon.newrelic.max-initialize-retries and initialize-retry-delay settings. - External service calls via HTTP clients are reported as external services.
Diffstat (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic')
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala163
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala23
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala (renamed from kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala)11
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala (renamed from kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala)34
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala55
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala105
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala48
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala48
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala (renamed from kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala)35
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/package.scala42
10 files changed, 291 insertions, 273 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
index 25fbc9db..f71ecd7f 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
@@ -18,88 +18,73 @@ package kamon.newrelic
import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds }
-import akka.actor.{ ActorLogging, Actor }
+import akka.actor.{ ActorSystem, ActorLogging, Actor }
import akka.event.LoggingAdapter
-import org.slf4j.LoggerFactory
+import akka.io.IO
+import akka.util.Timeout
+import kamon.Kamon
+import kamon.metric.{ CollectionContext, Metrics }
+import spray.can.Http
import spray.json._
import scala.concurrent.{ ExecutionContext, Future }
-import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation }
-import spray.httpx.encoding.Deflate
+import spray.httpx.{ SprayJsonSupport, ResponseTransformation }
import spray.http._
import spray.json.lenses.JsonLenses._
import java.lang.management.ManagementFactory
-import spray.client.pipelining._
-import scala.util.{ Failure, Success }
import spray.http.Uri.Query
-import kamon.newrelic.MetricTranslator.TimeSliceMetrics
import scala.concurrent.duration._
+import Agent._
-class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging {
+import akka.pattern.pipe
+// TODO: Setup a proper host connector with custom timeout configuration for use with this.
+class Agent extends Actor with ClientPipelines with ResponseTransformation with SprayJsonSupport with ActorLogging {
+ import JsonProtocol._
import context.dispatcher
- import Agent._
- import Retry._
-
- self ! Initialize
-
- val agentInfo = {
- val config = context.system.settings.config.getConfig("kamon.newrelic")
- val appName = config.getString("app-name")
- val licenseKey = config.getString("license-key")
-
- // Name has the format of pid@host
- val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')
- val retryDelay = FiniteDuration(config.getDuration("retry-delay", milliseconds), milliseconds)
- val maxRetry = config.getInt("max-retry")
-
- AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay)
- }
+ implicit val operationTimeout = Timeout(30 seconds)
+ val collectorClient = compressedToJsonPipeline(IO(Http)(context.system))
+ val settings = buildAgentSettings(context.system)
val baseQuery = Query(
- "license_key" -> agentInfo.licenseKey,
+ "license_key" -> settings.licenseKey,
"marshal_format" -> "json",
"protocol_version" -> "12")
- def receive: Receive = uninitialized
-
- def uninitialized: Receive = {
- case Initialize ⇒ {
- connectToCollector onComplete {
- case Success(agent) ⇒ {
- log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector)
- context become reporting(agent.runId, agent.collector)
- }
- case Failure(reason) ⇒ self ! InitializationFailed(reason)
- }
- }
- case InitializationFailed(reason) ⇒ {
- log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds)
- context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize)
- }
- case everythingElse ⇒ //ignore
- }
+ // Start the connection to the New Relic collector.
+ self ! Initialize
- def reporting(runId: Long, collector: String): Receive = {
- case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics)
- }
+ def receive: Receive = uninitialized(settings.maxRetries)
- def connectToCollector: Future[Initialized] = for {
- collector ← selectCollector
- runId ← connect(collector, agentInfo)
- } yield Initialized(runId, collector)
+ def uninitialized(attemptsLeft: Int): Receive = {
+ case Initialize ⇒ pipe(connectToCollector) to self
+ case Initialized(runID, collector) ⇒
+ log.info("Agent initialized with runID: [{}] and collector: [{}]", runID, collector)
- import AgentJsonProtocol._
+ val baseCollectorUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(baseQuery)
+ context.actorOf(MetricReporter.props(settings, runID, baseCollectorUri), "metric-reporter")
- val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive
- val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson
+ case InitializationFailed(reason) if (attemptsLeft > 0) ⇒
+ log.error(reason, "Initialization failed, retrying in {} seconds", settings.retryDelay.toSeconds)
+ context.system.scheduler.scheduleOnce(settings.retryDelay, self, Initialize)
+ context become (uninitialized(attemptsLeft - 1))
- def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson
+ case InitializationFailed(reason) ⇒
+ log.error(reason, "Giving up while trying to set up a connection with the New Relic collector.")
+ context.stop(self)
+ }
+
+ def connectToCollector: Future[InitResult] = {
+ (for {
+ collector ← selectCollector
+ runId ← connect(collector, settings)
+ } yield Initialized(runId, collector)) recover { case error ⇒ InitializationFailed(error) }
+ }
def selectCollector: Future[String] = {
val query = ("method" -> "get_redirect_host") +: baseQuery
val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query)
- compressedToJsonPipeline {
+ collectorClient {
Post(getRedirectHostUri, JsArray())
} map { json ⇒
@@ -107,67 +92,39 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with
}
}
- def connect(collectorHost: String, connect: AgentInfo): Future[Long] = {
+ def connect(collectorHost: String, connect: Settings): Future[Long] = {
log.debug("Connecting to NewRelic Collector [{}]", collectorHost)
val query = ("method" -> "connect") +: baseQuery
val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query)
- compressedToJsonPipeline {
+ collectorClient {
Post(connectUri, connect)
} map { json ⇒
json.extract[Long]('return_value / 'agent_run_id)
}
}
-
- def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = {
- val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery
- val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query)
-
- withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics ⇒
- compressedPipeline {
- log.info("Sending metrics to NewRelic collector")
- Post(sendMetricDataUri, MetricData(runId, currentMetrics))
- }
- }
- }
}
object Agent {
- case class Initialize()
- case class Initialized(runId: Long, collector: String)
- case class InitializationFailed(reason: Throwable)
- case class CollectorSelection(return_value: String)
- case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration)
- case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics)
-}
+ case object Initialize
+ sealed trait InitResult
+ case class Initialized(runId: Long, collector: String) extends InitResult
+ case class InitializationFailed(reason: Throwable) extends InitResult
+ case class Settings(licenseKey: String, appName: String, host: String, pid: Int, maxRetries: Int, retryDelay: FiniteDuration, apdexT: Double)
+
+ def buildAgentSettings(system: ActorSystem) = {
+ val config = system.settings.config.getConfig("kamon.newrelic")
+ val appName = config.getString("app-name")
+ val licenseKey = config.getString("license-key")
+ val maxRetries = config.getInt("max-initialize-retries")
+ val retryDelay = FiniteDuration(config.getDuration("initialize-retry-delay", milliseconds), milliseconds)
+ val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds.
-object Retry {
-
- @volatile private var attempts: Int = 0
- @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None
-
- def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics ⇒ Future[T])(implicit executor: ExecutionContext): Unit = {
-
- val currentMetrics = metrics.merge(pendingMetrics)
-
- if (currentMetrics.metrics.nonEmpty) {
- block(currentMetrics) onComplete {
- case Success(_) ⇒
- pendingMetrics = None
- attempts = 0
- case Failure(_) ⇒
- attempts += 1
- if (maxRetry > attempts) {
- log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry)
- pendingMetrics = Some(currentMetrics)
- } else {
- log.info("Max attempts achieved, proceeding to remove all pending metrics")
- pendingMetrics = None
- attempts = 0
- }
- }
- }
+ // Name has the format of 'pid'@'host'
+ val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')
+
+ Agent.Settings(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetries, retryDelay, apdexT)
}
} \ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala
new file mode 100644
index 00000000..ca003646
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala
@@ -0,0 +1,23 @@
+package kamon.newrelic
+
+import akka.actor.ActorRef
+import akka.util.Timeout
+import spray.http.{ HttpResponse, HttpRequest }
+import spray.httpx.RequestBuilding
+import spray.httpx.encoding.Deflate
+import spray.json._
+import spray.client.pipelining.sendReceive
+
+import scala.concurrent.{ ExecutionContext, Future }
+
+trait ClientPipelines extends RequestBuilding {
+
+ def compressedPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[HttpResponse] =
+ encode(Deflate) ~> sendReceive(transport)
+
+ def compressedToJsonPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[JsValue] =
+ compressedPipeline(transport) ~> toJson
+
+ def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson
+
+}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala
index a2b208dc..84472593 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala
@@ -16,19 +16,18 @@
package kamon.newrelic
-import akka.actor.Actor
import kamon.metric.UserMetrics.UserMetricGroup
import kamon.metric._
+import kamon.newrelic.Agent.Settings
-trait CustomMetrics {
- self: Actor ⇒
+object CustomMetricExtractor extends MetricExtractor {
- def collectCustomMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = {
+ def extract(settings: Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = {
metrics.collect {
case (mg: UserMetricGroup, groupSnapshot) ⇒
groupSnapshot.metrics collect {
- case (name, snapshot) ⇒ toNewRelicMetric(Scale.Unit)(s"Custom/${mg.name}", None, snapshot)
+ case (name, snapshot) ⇒ Metric.fromKamonMetricSnapshot(snapshot, s"Custom/${mg.name}", None, Scale.Unit)
}
- }.flatten.toSeq
+ }.flatten.toMap
}
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
index 9b3e6dea..c573d04d 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
@@ -18,10 +18,10 @@ package kamon.newrelic
import spray.json._
import kamon.newrelic.Agent._
-object AgentJsonProtocol extends DefaultJsonProtocol {
+object JsonProtocol extends DefaultJsonProtocol {
- implicit object ConnectJsonWriter extends RootJsonWriter[AgentInfo] {
- def write(obj: AgentInfo): JsValue =
+ implicit object ConnectJsonWriter extends RootJsonWriter[Settings] {
+ def write(obj: Settings): JsValue =
JsArray(
JsObject(
"agent_version" -> JsString("3.1.0"),
@@ -36,28 +36,30 @@ object AgentJsonProtocol extends DefaultJsonProtocol {
def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toVector)
}
- implicit object MetricDetailWriter extends JsonWriter[NewRelic.Metric] {
- def write(obj: NewRelic.Metric): JsValue = {
+ implicit object MetricDetailWriter extends JsonWriter[Metric] {
+ def write(obj: Metric): JsValue = {
+ val (metricID, metricData) = obj
+
JsArray(
JsObject(
- "name" -> JsString(obj.name) // TODO Include scope
+ "name" -> JsString(metricID.name) // TODO Include scope
),
JsArray(
- JsNumber(obj.callCount),
- JsNumber(obj.total),
- JsNumber(obj.totalExclusive),
- JsNumber(obj.min),
- JsNumber(obj.max),
- JsNumber(obj.sumOfSquares)))
+ JsNumber(metricData.callCount),
+ JsNumber(metricData.total),
+ JsNumber(metricData.totalExclusive),
+ JsNumber(metricData.min),
+ JsNumber(metricData.max),
+ JsNumber(metricData.sumOfSquares)))
}
}
- implicit object MetricDataWriter extends RootJsonWriter[MetricData] {
- def write(obj: MetricData): JsValue =
+ implicit object MetricBatchWriter extends RootJsonWriter[MetricBatch] {
+ def write(obj: MetricBatch): JsValue =
JsArray(
- JsNumber(obj.runId),
+ JsNumber(obj.runID),
JsNumber(obj.timeSliceMetrics.from),
JsNumber(obj.timeSliceMetrics.to),
- obj.timeSliceMetrics.metrics.values.toSeq.toJson)
+ obj.timeSliceMetrics.metrics.toSeq.toJson)
}
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala
new file mode 100644
index 00000000..14541483
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala
@@ -0,0 +1,55 @@
+package kamon.newrelic
+
+import kamon.metric.instrument.{ Counter, Histogram }
+import kamon.metric.{ MetricSnapshot, Scale }
+
+case class MetricID(name: String, scope: Option[String])
+case class MetricData(callCount: Long, total: Double, totalExclusive: Double, min: Double, max: Double, sumOfSquares: Double) {
+ def merge(that: MetricData): MetricData =
+ MetricData(
+ callCount + that.callCount,
+ total + that.total,
+ totalExclusive + that.totalExclusive,
+ math.min(min, that.min),
+ math.max(max, that.max),
+ sumOfSquares + that.sumOfSquares)
+}
+
+object Metric {
+
+ def fromKamonMetricSnapshot(snapshot: MetricSnapshot, name: String, scope: Option[String], targetScale: Scale): Metric = {
+ snapshot match {
+ case hs: Histogram.Snapshot ⇒
+ var total: Double = 0D
+ var sumOfSquares: Double = 0D
+ val scaledMin = Scale.convert(hs.scale, targetScale, hs.min)
+ val scaledMax = Scale.convert(hs.scale, targetScale, hs.max)
+
+ hs.recordsIterator.foreach { record ⇒
+ val scaledValue = Scale.convert(hs.scale, targetScale, record.level)
+
+ total += scaledValue * record.count
+ sumOfSquares += (scaledValue * scaledValue) * record.count
+ }
+
+ (MetricID(name, scope), MetricData(hs.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares))
+
+ case cs: Counter.Snapshot ⇒
+ (MetricID(name, scope), MetricData(cs.count, cs.count, cs.count, 0, cs.count, cs.count * cs.count))
+ }
+ }
+}
+
+case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[MetricID, MetricData]) {
+ import kamon.metric.combineMaps
+
+ def merge(that: TimeSliceMetrics): TimeSliceMetrics = {
+ val mergedFrom = math.min(from, that.from)
+ val mergedTo = math.max(to, that.to)
+ val mergedMetrics = combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r))
+
+ TimeSliceMetrics(mergedFrom, mergedTo, mergedMetrics)
+ }
+}
+
+case class MetricBatch(runID: Long, timeSliceMetrics: TimeSliceMetrics) \ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
new file mode 100644
index 00000000..b09973ef
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
@@ -0,0 +1,105 @@
+package kamon.newrelic
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{ Props, ActorLogging, Actor }
+import akka.pattern.pipe
+import akka.io.IO
+import akka.util.Timeout
+import kamon.Kamon
+import kamon.metric.Subscriptions.TickMetricSnapshot
+import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms }
+import kamon.metric._
+import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed, PostSucceeded, MetricDataPostResult }
+import spray.can.Http
+import spray.http.Uri
+import spray.httpx.SprayJsonSupport
+import spray.json.CompactPrinter
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
+
+class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extends Actor
+ with ClientPipelines with ActorLogging with SprayJsonSupport {
+
+ import JsonProtocol._
+ import MetricReporter.Extractors
+ import context.dispatcher
+
+ val metricDataQuery = ("method" -> "metric_data") +: ("run_id" -> runID.toString) +: baseUri.query
+ val metricDataUri = baseUri.withQuery(metricDataQuery)
+
+ implicit val operationTimeout = Timeout(30 seconds)
+ val metricsExtension = Kamon(Metrics)(context.system)
+ val collectionContext = metricsExtension.buildDefaultCollectionContext
+ val collectorClient = compressedPipeline(IO(Http)(context.system))
+
+ val subscriber = {
+ val tickInterval = context.system.settings.config.getDuration("kamon.metrics.tick-interval", TimeUnit.MILLISECONDS)
+ if (tickInterval == 60000)
+ self
+ else
+ context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer")
+ }
+
+ // Subscribe to Trace Metrics
+ metricsExtension.subscribe(TraceMetrics, "*", subscriber, permanently = true)
+
+ // Subscribe to all User Metrics
+ metricsExtension.subscribe(UserHistograms, "*", subscriber, permanently = true)
+ metricsExtension.subscribe(UserCounters, "*", subscriber, permanently = true)
+ metricsExtension.subscribe(UserMinMaxCounters, "*", subscriber, permanently = true)
+ metricsExtension.subscribe(UserGauges, "*", subscriber, permanently = true)
+
+ def receive = reporting(None)
+
+ def reporting(pendingMetrics: Option[TimeSliceMetrics]): Receive = {
+ case TickMetricSnapshot(from, to, metrics) ⇒
+ val fromInSeconds = (from / 1E3).toInt
+ val toInSeconds = (to / 1E3).toInt
+ val extractedMetrics = Extractors.flatMap(_.extract(settings, collectionContext, metrics)).toMap
+ val tickMetrics = TimeSliceMetrics(fromInSeconds, toInSeconds, extractedMetrics)
+
+ val metricsToReport = pendingMetrics.foldLeft(tickMetrics)((p, n) ⇒ p.merge(n))
+ context become reporting(Some(metricsToReport))
+ pipe(sendMetricData(metricsToReport)) to self
+
+ case PostSucceeded ⇒
+ context become (reporting(None))
+
+ case PostFailed(reason) ⇒
+ log.error(reason, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.")
+ }
+
+ def sendMetricData(slice: TimeSliceMetrics): Future[MetricDataPostResult] = {
+ log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to)
+
+ collectorClient {
+ Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, CompactPrinter))
+
+ } map { response ⇒
+ if (response.status.isSuccess)
+ PostSucceeded
+ else
+ PostFailed(new UnexpectedStatusCodeException(s"Received unsuccessful status code [${response.status.value}] from collector."))
+ } recover { case t: Throwable ⇒ PostFailed(t) }
+ }
+}
+
+object MetricReporter {
+ val Extractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil
+
+ def props(settings: Agent.Settings, runID: Long, baseUri: Uri): Props =
+ Props(new MetricReporter(settings, runID, baseUri))
+
+ sealed trait MetricDataPostResult
+ case object PostSucceeded extends MetricDataPostResult
+ case class PostFailed(reason: Throwable) extends MetricDataPostResult
+
+ class UnexpectedStatusCodeException(message: String) extends RuntimeException(message) with NoStackTrace
+}
+
+trait MetricExtractor {
+ def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData]
+}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala
deleted file mode 100644
index 5fa571e1..00000000
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.newrelic
-
-import akka.actor.{ Props, ActorRef, Actor }
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import kamon.newrelic.MetricTranslator.TimeSliceMetrics
-
-class MetricTranslator(receiver: ActorRef) extends Actor
- with WebTransactionMetrics with CustomMetrics {
-
- def receive = {
- case TickMetricSnapshot(from, to, metrics) ⇒
- val fromInSeconds = (from / 1E3).toInt
- val toInSeconds = (to / 1E3).toInt
- val allMetrics = collectWebTransactionMetrics(metrics) ++ collectCustomMetrics(metrics)
- val groupedMetrics: Map[String, NewRelic.Metric] = allMetrics.map(metric ⇒ metric.name -> metric)(collection.breakOut) // avoid intermediate tuple
-
- receiver ! TimeSliceMetrics(fromInSeconds, toInSeconds, groupedMetrics)
- }
-
-}
-
-object MetricTranslator {
- case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[String, NewRelic.Metric]) {
- import kamon.metric._
-
- def merge(thatMetrics: Option[TimeSliceMetrics]): TimeSliceMetrics = {
- thatMetrics.map(that ⇒ TimeSliceMetrics(from + that.from, to + that.to, combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)))).getOrElse(this)
- }
- }
-
- def props(receiver: ActorRef): Props = Props(new MetricTranslator(receiver))
-}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
index b270d228..a4be4c0b 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
@@ -16,61 +16,19 @@
package kamon.newrelic
-import java.util.concurrent.TimeUnit.MILLISECONDS
-
import akka.actor
import akka.actor._
+import akka.event.Logging
import kamon.Kamon
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import kamon.metric.UserMetrics.{ UserCounters, UserGauges, UserHistograms, UserMinMaxCounters }
-import kamon.metric.{ Metrics, TickMetricSnapshotBuffer, TraceMetrics }
-
-import scala.concurrent.duration._
class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val config = system.settings.config.getConfig("kamon.newrelic")
-
- val collectionContext = Kamon(Metrics)(system).buildDefaultCollectionContext
- val metricsListener = system.actorOf(Props[NewRelicMetricsListener], "kamon-newrelic")
- val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds.
+ val log = Logging(system, classOf[NewRelicExtension])
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricsListener, permanently = true)
-
- // Subscribe to all user metrics
- Kamon(Metrics)(system).subscribe(UserHistograms, "*", metricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(UserCounters, "*", metricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", metricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(UserGauges, "*", metricsListener, permanently = true)
-
-}
-
-class NewRelicMetricsListener extends Actor with ActorLogging {
log.info("Starting the Kamon(NewRelic) extension")
-
- val agent = context.actorOf(Props[Agent], "agent")
- val translator = context.actorOf(MetricTranslator.props(agent), "translator")
- val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "metric-buffer")
-
- def receive = {
- case tick: TickMetricSnapshot ⇒ buffer.forward(tick)
- }
+ val agent = system.actorOf(Props[Agent], "newrelic-agent")
}
object NewRelic extends ExtensionId[NewRelicExtension] with ExtensionIdProvider {
def lookup(): ExtensionId[_ <: actor.Extension] = NewRelic
def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system)
-
- case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double,
- min: Double, max: Double, sumOfSquares: Double) {
-
- def merge(that: Metric): Metric = {
- Metric(name, scope,
- callCount + that.callCount,
- total + that.total,
- totalExclusive + that.totalExclusive,
- math.min(min, that.min),
- math.max(max, that.max),
- sumOfSquares + that.sumOfSquares)
- }
- }
} \ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
index a8c54684..cfb0e721 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
@@ -18,39 +18,46 @@ package kamon.newrelic
import kamon.metric._
import kamon.metric.TraceMetrics.ElapsedTime
-import akka.actor.Actor
-import kamon.Kamon
import kamon.metric.instrument.Histogram
+import kamon.trace.SegmentMetricIdentityLabel.HttpClient
+import kamon.trace.SegmentMetricIdentity
-trait WebTransactionMetrics {
- self: Actor ⇒
+object WebTransactionMetricExtractor extends MetricExtractor {
- def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = {
- val newRelicExtension = Kamon(NewRelic)(context.system)
- val apdexBuilder = new ApdexBuilder("Apdex", None, newRelicExtension.apdexT)
- val collectionContext = newRelicExtension.collectionContext
+ def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = {
+ val apdexBuilder = new ApdexBuilder("Apdex", None, settings.apdexT)
// Trace metrics are recorded in nanoseconds.
var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano)
+ var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano)
- val webTransactionMetrics = metrics.collect {
+ val transactionMetrics = metrics.collect {
case (TraceMetrics(name), groupSnapshot) ⇒
groupSnapshot.metrics collect {
+ // Extract WebTransaction metrics and accumulate HttpDispatcher
case (ElapsedTime, snapshot: Histogram.Snapshot) ⇒
accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(snapshot, collectionContext)
snapshot.recordsIterator.foreach { record ⇒
apdexBuilder.record(Scale.convert(snapshot.scale, Scale.Unit, record.level), record.count)
}
- toNewRelicMetric(Scale.Unit)(s"WebTransaction/Custom/$name", None, snapshot)
+ Metric.fromKamonMetricSnapshot(snapshot, s"WebTransaction/Custom/$name", None, Scale.Unit)
+
+ // Extract all external services.
+ case (SegmentMetricIdentity(segmentName, label), snapshot: Histogram.Snapshot) if label.equals(HttpClient)⇒
+ accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext)
+
+ Metric.fromKamonMetricSnapshot(snapshot, s"External/$segmentName/all", None, Scale.Unit)
}
}
- val httpDispatcher = toNewRelicMetric(Scale.Unit)("HttpDispatcher", None, accumulatedHttpDispatcher)
- val webTransaction = toNewRelicMetric(Scale.Unit)("WebTransaction", None, accumulatedHttpDispatcher)
+ val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit)
+ val webTransaction = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "WebTransaction", None, Scale.Unit)
+ val external = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External", None, Scale.Unit)
+ val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit)
- Seq(httpDispatcher, webTransaction, apdexBuilder.build) ++ webTransactionMetrics.flatten.toSeq
+ Map(httpDispatcher, webTransaction, external, externalAllWeb, apdexBuilder.build) ++ transactionMetrics.flatten.toMap
}
}
@@ -70,5 +77,5 @@ class ApdexBuilder(name: String, scope: Option[String], apdexT: Double) {
frustrating += count
// NewRelic reuses the same metric structure for recording the Apdex.. weird, but that's how it works.
- def build: NewRelic.Metric = NewRelic.Metric(name, scope, satisfying, tolerating, frustrating, apdexT, apdexT, 0)
+ def build: Metric = (MetricID(name, scope), MetricData(satisfying, tolerating, frustrating, apdexT, apdexT, 0))
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala
index 89a8b15b..06c3dad0 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala
@@ -1,45 +1,5 @@
-/*=========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- *
- */
-
package kamon
-import kamon.metric.instrument.{ Counter, Histogram }
-import kamon.metric.{ MetricSnapshot, Scale }
-
package object newrelic {
-
- def toNewRelicMetric(scale: Scale)(name: String, scope: Option[String], snapshot: MetricSnapshot): NewRelic.Metric = {
- snapshot match {
- case hs: Histogram.Snapshot ⇒
- var total: Double = 0D
- var sumOfSquares: Double = 0D
- val scaledMin = Scale.convert(hs.scale, scale, hs.min)
- val scaledMax = Scale.convert(hs.scale, scale, hs.max)
-
- hs.recordsIterator.foreach { record ⇒
- val scaledValue = Scale.convert(hs.scale, scale, record.level)
-
- total += scaledValue * record.count
- sumOfSquares += (scaledValue * scaledValue) * record.count
- }
-
- NewRelic.Metric(name, scope, hs.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares)
-
- case cs: Counter.Snapshot ⇒
- NewRelic.Metric(name, scope, cs.count, cs.count, cs.count, 0, cs.count, cs.count * cs.count)
- }
- }
+ type Metric = (MetricID, MetricData)
}