diff options
Diffstat (limited to 'kamon-newrelic/src/main/scala/kamon')
10 files changed, 291 insertions, 273 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 25fbc9db..f71ecd7f 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -18,88 +18,73 @@ package kamon.newrelic import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } -import akka.actor.{ ActorLogging, Actor } +import akka.actor.{ ActorSystem, ActorLogging, Actor } import akka.event.LoggingAdapter -import org.slf4j.LoggerFactory +import akka.io.IO +import akka.util.Timeout +import kamon.Kamon +import kamon.metric.{ CollectionContext, Metrics } +import spray.can.Http import spray.json._ import scala.concurrent.{ ExecutionContext, Future } -import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation } -import spray.httpx.encoding.Deflate +import spray.httpx.{ SprayJsonSupport, ResponseTransformation } import spray.http._ import spray.json.lenses.JsonLenses._ import java.lang.management.ManagementFactory -import spray.client.pipelining._ -import scala.util.{ Failure, Success } import spray.http.Uri.Query -import kamon.newrelic.MetricTranslator.TimeSliceMetrics import scala.concurrent.duration._ +import Agent._ -class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging { +import akka.pattern.pipe +// TODO: Setup a proper host connector with custom timeout configuration for use with this. +class Agent extends Actor with ClientPipelines with ResponseTransformation with SprayJsonSupport with ActorLogging { + import JsonProtocol._ import context.dispatcher - import Agent._ - import Retry._ - - self ! Initialize - - val agentInfo = { - val config = context.system.settings.config.getConfig("kamon.newrelic") - val appName = config.getString("app-name") - val licenseKey = config.getString("license-key") - - // Name has the format of pid@host - val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') - val retryDelay = FiniteDuration(config.getDuration("retry-delay", milliseconds), milliseconds) - val maxRetry = config.getInt("max-retry") - - AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay) - } + implicit val operationTimeout = Timeout(30 seconds) + val collectorClient = compressedToJsonPipeline(IO(Http)(context.system)) + val settings = buildAgentSettings(context.system) val baseQuery = Query( - "license_key" -> agentInfo.licenseKey, + "license_key" -> settings.licenseKey, "marshal_format" -> "json", "protocol_version" -> "12") - def receive: Receive = uninitialized - - def uninitialized: Receive = { - case Initialize ⇒ { - connectToCollector onComplete { - case Success(agent) ⇒ { - log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector) - context become reporting(agent.runId, agent.collector) - } - case Failure(reason) ⇒ self ! InitializationFailed(reason) - } - } - case InitializationFailed(reason) ⇒ { - log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds) - context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize) - } - case everythingElse ⇒ //ignore - } + // Start the connection to the New Relic collector. + self ! Initialize - def reporting(runId: Long, collector: String): Receive = { - case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics) - } + def receive: Receive = uninitialized(settings.maxRetries) - def connectToCollector: Future[Initialized] = for { - collector ← selectCollector - runId ← connect(collector, agentInfo) - } yield Initialized(runId, collector) + def uninitialized(attemptsLeft: Int): Receive = { + case Initialize ⇒ pipe(connectToCollector) to self + case Initialized(runID, collector) ⇒ + log.info("Agent initialized with runID: [{}] and collector: [{}]", runID, collector) - import AgentJsonProtocol._ + val baseCollectorUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(baseQuery) + context.actorOf(MetricReporter.props(settings, runID, baseCollectorUri), "metric-reporter") - val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive - val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson + case InitializationFailed(reason) if (attemptsLeft > 0) ⇒ + log.error(reason, "Initialization failed, retrying in {} seconds", settings.retryDelay.toSeconds) + context.system.scheduler.scheduleOnce(settings.retryDelay, self, Initialize) + context become (uninitialized(attemptsLeft - 1)) - def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson + case InitializationFailed(reason) ⇒ + log.error(reason, "Giving up while trying to set up a connection with the New Relic collector.") + context.stop(self) + } + + def connectToCollector: Future[InitResult] = { + (for { + collector ← selectCollector + runId ← connect(collector, settings) + } yield Initialized(runId, collector)) recover { case error ⇒ InitializationFailed(error) } + } def selectCollector: Future[String] = { val query = ("method" -> "get_redirect_host") +: baseQuery val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query) - compressedToJsonPipeline { + collectorClient { Post(getRedirectHostUri, JsArray()) } map { json ⇒ @@ -107,67 +92,39 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with } } - def connect(collectorHost: String, connect: AgentInfo): Future[Long] = { + def connect(collectorHost: String, connect: Settings): Future[Long] = { log.debug("Connecting to NewRelic Collector [{}]", collectorHost) val query = ("method" -> "connect") +: baseQuery val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query) - compressedToJsonPipeline { + collectorClient { Post(connectUri, connect) } map { json ⇒ json.extract[Long]('return_value / 'agent_run_id) } } - - def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = { - val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery - val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query) - - withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics ⇒ - compressedPipeline { - log.info("Sending metrics to NewRelic collector") - Post(sendMetricDataUri, MetricData(runId, currentMetrics)) - } - } - } } object Agent { - case class Initialize() - case class Initialized(runId: Long, collector: String) - case class InitializationFailed(reason: Throwable) - case class CollectorSelection(return_value: String) - case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration) - case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics) -} + case object Initialize + sealed trait InitResult + case class Initialized(runId: Long, collector: String) extends InitResult + case class InitializationFailed(reason: Throwable) extends InitResult + case class Settings(licenseKey: String, appName: String, host: String, pid: Int, maxRetries: Int, retryDelay: FiniteDuration, apdexT: Double) + + def buildAgentSettings(system: ActorSystem) = { + val config = system.settings.config.getConfig("kamon.newrelic") + val appName = config.getString("app-name") + val licenseKey = config.getString("license-key") + val maxRetries = config.getInt("max-initialize-retries") + val retryDelay = FiniteDuration(config.getDuration("initialize-retry-delay", milliseconds), milliseconds) + val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds. -object Retry { - - @volatile private var attempts: Int = 0 - @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None - - def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics ⇒ Future[T])(implicit executor: ExecutionContext): Unit = { - - val currentMetrics = metrics.merge(pendingMetrics) - - if (currentMetrics.metrics.nonEmpty) { - block(currentMetrics) onComplete { - case Success(_) ⇒ - pendingMetrics = None - attempts = 0 - case Failure(_) ⇒ - attempts += 1 - if (maxRetry > attempts) { - log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry) - pendingMetrics = Some(currentMetrics) - } else { - log.info("Max attempts achieved, proceeding to remove all pending metrics") - pendingMetrics = None - attempts = 0 - } - } - } + // Name has the format of 'pid'@'host' + val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + + Agent.Settings(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetries, retryDelay, apdexT) } }
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala new file mode 100644 index 00000000..ca003646 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala @@ -0,0 +1,23 @@ +package kamon.newrelic + +import akka.actor.ActorRef +import akka.util.Timeout +import spray.http.{ HttpResponse, HttpRequest } +import spray.httpx.RequestBuilding +import spray.httpx.encoding.Deflate +import spray.json._ +import spray.client.pipelining.sendReceive + +import scala.concurrent.{ ExecutionContext, Future } + +trait ClientPipelines extends RequestBuilding { + + def compressedPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[HttpResponse] = + encode(Deflate) ~> sendReceive(transport) + + def compressedToJsonPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[JsValue] = + compressedPipeline(transport) ~> toJson + + def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson + +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala index a2b208dc..84472593 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala @@ -16,19 +16,18 @@ package kamon.newrelic -import akka.actor.Actor import kamon.metric.UserMetrics.UserMetricGroup import kamon.metric._ +import kamon.newrelic.Agent.Settings -trait CustomMetrics { - self: Actor ⇒ +object CustomMetricExtractor extends MetricExtractor { - def collectCustomMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = { + def extract(settings: Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { metrics.collect { case (mg: UserMetricGroup, groupSnapshot) ⇒ groupSnapshot.metrics collect { - case (name, snapshot) ⇒ toNewRelicMetric(Scale.Unit)(s"Custom/${mg.name}", None, snapshot) + case (name, snapshot) ⇒ Metric.fromKamonMetricSnapshot(snapshot, s"Custom/${mg.name}", None, Scale.Unit) } - }.flatten.toSeq + }.flatten.toMap } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala index 9b3e6dea..c573d04d 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -18,10 +18,10 @@ package kamon.newrelic import spray.json._ import kamon.newrelic.Agent._ -object AgentJsonProtocol extends DefaultJsonProtocol { +object JsonProtocol extends DefaultJsonProtocol { - implicit object ConnectJsonWriter extends RootJsonWriter[AgentInfo] { - def write(obj: AgentInfo): JsValue = + implicit object ConnectJsonWriter extends RootJsonWriter[Settings] { + def write(obj: Settings): JsValue = JsArray( JsObject( "agent_version" -> JsString("3.1.0"), @@ -36,28 +36,30 @@ object AgentJsonProtocol extends DefaultJsonProtocol { def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toVector) } - implicit object MetricDetailWriter extends JsonWriter[NewRelic.Metric] { - def write(obj: NewRelic.Metric): JsValue = { + implicit object MetricDetailWriter extends JsonWriter[Metric] { + def write(obj: Metric): JsValue = { + val (metricID, metricData) = obj + JsArray( JsObject( - "name" -> JsString(obj.name) // TODO Include scope + "name" -> JsString(metricID.name) // TODO Include scope ), JsArray( - JsNumber(obj.callCount), - JsNumber(obj.total), - JsNumber(obj.totalExclusive), - JsNumber(obj.min), - JsNumber(obj.max), - JsNumber(obj.sumOfSquares))) + JsNumber(metricData.callCount), + JsNumber(metricData.total), + JsNumber(metricData.totalExclusive), + JsNumber(metricData.min), + JsNumber(metricData.max), + JsNumber(metricData.sumOfSquares))) } } - implicit object MetricDataWriter extends RootJsonWriter[MetricData] { - def write(obj: MetricData): JsValue = + implicit object MetricBatchWriter extends RootJsonWriter[MetricBatch] { + def write(obj: MetricBatch): JsValue = JsArray( - JsNumber(obj.runId), + JsNumber(obj.runID), JsNumber(obj.timeSliceMetrics.from), JsNumber(obj.timeSliceMetrics.to), - obj.timeSliceMetrics.metrics.values.toSeq.toJson) + obj.timeSliceMetrics.metrics.toSeq.toJson) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala new file mode 100644 index 00000000..14541483 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala @@ -0,0 +1,55 @@ +package kamon.newrelic + +import kamon.metric.instrument.{ Counter, Histogram } +import kamon.metric.{ MetricSnapshot, Scale } + +case class MetricID(name: String, scope: Option[String]) +case class MetricData(callCount: Long, total: Double, totalExclusive: Double, min: Double, max: Double, sumOfSquares: Double) { + def merge(that: MetricData): MetricData = + MetricData( + callCount + that.callCount, + total + that.total, + totalExclusive + that.totalExclusive, + math.min(min, that.min), + math.max(max, that.max), + sumOfSquares + that.sumOfSquares) +} + +object Metric { + + def fromKamonMetricSnapshot(snapshot: MetricSnapshot, name: String, scope: Option[String], targetScale: Scale): Metric = { + snapshot match { + case hs: Histogram.Snapshot ⇒ + var total: Double = 0D + var sumOfSquares: Double = 0D + val scaledMin = Scale.convert(hs.scale, targetScale, hs.min) + val scaledMax = Scale.convert(hs.scale, targetScale, hs.max) + + hs.recordsIterator.foreach { record ⇒ + val scaledValue = Scale.convert(hs.scale, targetScale, record.level) + + total += scaledValue * record.count + sumOfSquares += (scaledValue * scaledValue) * record.count + } + + (MetricID(name, scope), MetricData(hs.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares)) + + case cs: Counter.Snapshot ⇒ + (MetricID(name, scope), MetricData(cs.count, cs.count, cs.count, 0, cs.count, cs.count * cs.count)) + } + } +} + +case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[MetricID, MetricData]) { + import kamon.metric.combineMaps + + def merge(that: TimeSliceMetrics): TimeSliceMetrics = { + val mergedFrom = math.min(from, that.from) + val mergedTo = math.max(to, that.to) + val mergedMetrics = combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)) + + TimeSliceMetrics(mergedFrom, mergedTo, mergedMetrics) + } +} + +case class MetricBatch(runID: Long, timeSliceMetrics: TimeSliceMetrics)
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala new file mode 100644 index 00000000..b09973ef --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala @@ -0,0 +1,105 @@ +package kamon.newrelic + +import java.util.concurrent.TimeUnit + +import akka.actor.{ Props, ActorLogging, Actor } +import akka.pattern.pipe +import akka.io.IO +import akka.util.Timeout +import kamon.Kamon +import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } +import kamon.metric._ +import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed, PostSucceeded, MetricDataPostResult } +import spray.can.Http +import spray.http.Uri +import spray.httpx.SprayJsonSupport +import spray.json.CompactPrinter + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extends Actor + with ClientPipelines with ActorLogging with SprayJsonSupport { + + import JsonProtocol._ + import MetricReporter.Extractors + import context.dispatcher + + val metricDataQuery = ("method" -> "metric_data") +: ("run_id" -> runID.toString) +: baseUri.query + val metricDataUri = baseUri.withQuery(metricDataQuery) + + implicit val operationTimeout = Timeout(30 seconds) + val metricsExtension = Kamon(Metrics)(context.system) + val collectionContext = metricsExtension.buildDefaultCollectionContext + val collectorClient = compressedPipeline(IO(Http)(context.system)) + + val subscriber = { + val tickInterval = context.system.settings.config.getDuration("kamon.metrics.tick-interval", TimeUnit.MILLISECONDS) + if (tickInterval == 60000) + self + else + context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer") + } + + // Subscribe to Trace Metrics + metricsExtension.subscribe(TraceMetrics, "*", subscriber, permanently = true) + + // Subscribe to all User Metrics + metricsExtension.subscribe(UserHistograms, "*", subscriber, permanently = true) + metricsExtension.subscribe(UserCounters, "*", subscriber, permanently = true) + metricsExtension.subscribe(UserMinMaxCounters, "*", subscriber, permanently = true) + metricsExtension.subscribe(UserGauges, "*", subscriber, permanently = true) + + def receive = reporting(None) + + def reporting(pendingMetrics: Option[TimeSliceMetrics]): Receive = { + case TickMetricSnapshot(from, to, metrics) ⇒ + val fromInSeconds = (from / 1E3).toInt + val toInSeconds = (to / 1E3).toInt + val extractedMetrics = Extractors.flatMap(_.extract(settings, collectionContext, metrics)).toMap + val tickMetrics = TimeSliceMetrics(fromInSeconds, toInSeconds, extractedMetrics) + + val metricsToReport = pendingMetrics.foldLeft(tickMetrics)((p, n) ⇒ p.merge(n)) + context become reporting(Some(metricsToReport)) + pipe(sendMetricData(metricsToReport)) to self + + case PostSucceeded ⇒ + context become (reporting(None)) + + case PostFailed(reason) ⇒ + log.error(reason, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.") + } + + def sendMetricData(slice: TimeSliceMetrics): Future[MetricDataPostResult] = { + log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to) + + collectorClient { + Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, CompactPrinter)) + + } map { response ⇒ + if (response.status.isSuccess) + PostSucceeded + else + PostFailed(new UnexpectedStatusCodeException(s"Received unsuccessful status code [${response.status.value}] from collector.")) + } recover { case t: Throwable ⇒ PostFailed(t) } + } +} + +object MetricReporter { + val Extractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil + + def props(settings: Agent.Settings, runID: Long, baseUri: Uri): Props = + Props(new MetricReporter(settings, runID, baseUri)) + + sealed trait MetricDataPostResult + case object PostSucceeded extends MetricDataPostResult + case class PostFailed(reason: Throwable) extends MetricDataPostResult + + class UnexpectedStatusCodeException(message: String) extends RuntimeException(message) with NoStackTrace +} + +trait MetricExtractor { + def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala deleted file mode 100644 index 5fa571e1..00000000 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.newrelic - -import akka.actor.{ Props, ActorRef, Actor } -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.newrelic.MetricTranslator.TimeSliceMetrics - -class MetricTranslator(receiver: ActorRef) extends Actor - with WebTransactionMetrics with CustomMetrics { - - def receive = { - case TickMetricSnapshot(from, to, metrics) ⇒ - val fromInSeconds = (from / 1E3).toInt - val toInSeconds = (to / 1E3).toInt - val allMetrics = collectWebTransactionMetrics(metrics) ++ collectCustomMetrics(metrics) - val groupedMetrics: Map[String, NewRelic.Metric] = allMetrics.map(metric ⇒ metric.name -> metric)(collection.breakOut) // avoid intermediate tuple - - receiver ! TimeSliceMetrics(fromInSeconds, toInSeconds, groupedMetrics) - } - -} - -object MetricTranslator { - case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[String, NewRelic.Metric]) { - import kamon.metric._ - - def merge(thatMetrics: Option[TimeSliceMetrics]): TimeSliceMetrics = { - thatMetrics.map(that ⇒ TimeSliceMetrics(from + that.from, to + that.to, combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)))).getOrElse(this) - } - } - - def props(receiver: ActorRef): Props = Props(new MetricTranslator(receiver)) -} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index b270d228..a4be4c0b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -16,61 +16,19 @@ package kamon.newrelic -import java.util.concurrent.TimeUnit.MILLISECONDS - import akka.actor import akka.actor._ +import akka.event.Logging import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.UserMetrics.{ UserCounters, UserGauges, UserHistograms, UserMinMaxCounters } -import kamon.metric.{ Metrics, TickMetricSnapshotBuffer, TraceMetrics } - -import scala.concurrent.duration._ class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val config = system.settings.config.getConfig("kamon.newrelic") - - val collectionContext = Kamon(Metrics)(system).buildDefaultCollectionContext - val metricsListener = system.actorOf(Props[NewRelicMetricsListener], "kamon-newrelic") - val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds. + val log = Logging(system, classOf[NewRelicExtension]) - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricsListener, permanently = true) - - // Subscribe to all user metrics - Kamon(Metrics)(system).subscribe(UserHistograms, "*", metricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserCounters, "*", metricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", metricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserGauges, "*", metricsListener, permanently = true) - -} - -class NewRelicMetricsListener extends Actor with ActorLogging { log.info("Starting the Kamon(NewRelic) extension") - - val agent = context.actorOf(Props[Agent], "agent") - val translator = context.actorOf(MetricTranslator.props(agent), "translator") - val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "metric-buffer") - - def receive = { - case tick: TickMetricSnapshot ⇒ buffer.forward(tick) - } + val agent = system.actorOf(Props[Agent], "newrelic-agent") } object NewRelic extends ExtensionId[NewRelicExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = NewRelic def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system) - - case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double, - min: Double, max: Double, sumOfSquares: Double) { - - def merge(that: Metric): Metric = { - Metric(name, scope, - callCount + that.callCount, - total + that.total, - totalExclusive + that.totalExclusive, - math.min(min, that.min), - math.max(max, that.max), - sumOfSquares + that.sumOfSquares) - } - } }
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index a8c54684..cfb0e721 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -18,39 +18,46 @@ package kamon.newrelic import kamon.metric._ import kamon.metric.TraceMetrics.ElapsedTime -import akka.actor.Actor -import kamon.Kamon import kamon.metric.instrument.Histogram +import kamon.trace.SegmentMetricIdentityLabel.HttpClient +import kamon.trace.SegmentMetricIdentity -trait WebTransactionMetrics { - self: Actor ⇒ +object WebTransactionMetricExtractor extends MetricExtractor { - def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = { - val newRelicExtension = Kamon(NewRelic)(context.system) - val apdexBuilder = new ApdexBuilder("Apdex", None, newRelicExtension.apdexT) - val collectionContext = newRelicExtension.collectionContext + def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { + val apdexBuilder = new ApdexBuilder("Apdex", None, settings.apdexT) // Trace metrics are recorded in nanoseconds. var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) + var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) - val webTransactionMetrics = metrics.collect { + val transactionMetrics = metrics.collect { case (TraceMetrics(name), groupSnapshot) ⇒ groupSnapshot.metrics collect { + // Extract WebTransaction metrics and accumulate HttpDispatcher case (ElapsedTime, snapshot: Histogram.Snapshot) ⇒ accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(snapshot, collectionContext) snapshot.recordsIterator.foreach { record ⇒ apdexBuilder.record(Scale.convert(snapshot.scale, Scale.Unit, record.level), record.count) } - toNewRelicMetric(Scale.Unit)(s"WebTransaction/Custom/$name", None, snapshot) + Metric.fromKamonMetricSnapshot(snapshot, s"WebTransaction/Custom/$name", None, Scale.Unit) + + // Extract all external services. + case (SegmentMetricIdentity(segmentName, label), snapshot: Histogram.Snapshot) if label.equals(HttpClient)⇒ + accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext) + + Metric.fromKamonMetricSnapshot(snapshot, s"External/$segmentName/all", None, Scale.Unit) } } - val httpDispatcher = toNewRelicMetric(Scale.Unit)("HttpDispatcher", None, accumulatedHttpDispatcher) - val webTransaction = toNewRelicMetric(Scale.Unit)("WebTransaction", None, accumulatedHttpDispatcher) + val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit) + val webTransaction = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "WebTransaction", None, Scale.Unit) + val external = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External", None, Scale.Unit) + val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit) - Seq(httpDispatcher, webTransaction, apdexBuilder.build) ++ webTransactionMetrics.flatten.toSeq + Map(httpDispatcher, webTransaction, external, externalAllWeb, apdexBuilder.build) ++ transactionMetrics.flatten.toMap } } @@ -70,5 +77,5 @@ class ApdexBuilder(name: String, scope: Option[String], apdexT: Double) { frustrating += count // NewRelic reuses the same metric structure for recording the Apdex.. weird, but that's how it works. - def build: NewRelic.Metric = NewRelic.Metric(name, scope, satisfying, tolerating, frustrating, apdexT, apdexT, 0) + def build: Metric = (MetricID(name, scope), MetricData(satisfying, tolerating, frustrating, apdexT, apdexT, 0)) } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala index 89a8b15b..06c3dad0 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala @@ -1,45 +1,5 @@ -/*========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - * - */ - package kamon -import kamon.metric.instrument.{ Counter, Histogram } -import kamon.metric.{ MetricSnapshot, Scale } - package object newrelic { - - def toNewRelicMetric(scale: Scale)(name: String, scope: Option[String], snapshot: MetricSnapshot): NewRelic.Metric = { - snapshot match { - case hs: Histogram.Snapshot ⇒ - var total: Double = 0D - var sumOfSquares: Double = 0D - val scaledMin = Scale.convert(hs.scale, scale, hs.min) - val scaledMax = Scale.convert(hs.scale, scale, hs.max) - - hs.recordsIterator.foreach { record ⇒ - val scaledValue = Scale.convert(hs.scale, scale, record.level) - - total += scaledValue * record.count - sumOfSquares += (scaledValue * scaledValue) * record.count - } - - NewRelic.Metric(name, scope, hs.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares) - - case cs: Counter.Snapshot ⇒ - NewRelic.Metric(name, scope, cs.count, cs.count, cs.count, 0, cs.count, cs.count * cs.count) - } - } + type Metric = (MetricID, MetricData) } |