package kamon.newrelic import java.util.concurrent.TimeUnit import akka.actor.{ Props, ActorLogging, Actor } import akka.pattern.pipe import akka.io.IO import akka.util.Timeout import kamon.Kamon import kamon.metric.Subscriptions.TickMetricSnapshot import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } import kamon.metric._ import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed, PostSucceeded, MetricDataPostResult } import spray.can.Http import spray.http.Uri import spray.httpx.SprayJsonSupport import spray.json.CompactPrinter import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extends Actor with ClientPipelines with ActorLogging with SprayJsonSupport { import JsonProtocol._ import MetricReporter.Extractors import context.dispatcher val metricDataQuery = ("method" -> "metric_data") +: ("run_id" -> runID.toString) +: baseUri.query val metricDataUri = baseUri.withQuery(metricDataQuery) implicit val operationTimeout = Timeout(30 seconds) val metricsExtension = Kamon(Metrics)(context.system) val collectionContext = metricsExtension.buildDefaultCollectionContext val collectorClient = compressedPipeline(IO(Http)(context.system)) val subscriber = { val tickInterval = context.system.settings.config.getDuration("kamon.metrics.tick-interval", TimeUnit.MILLISECONDS) if (tickInterval == 60000) self else context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer") } // Subscribe to Trace Metrics metricsExtension.subscribe(TraceMetrics, "*", subscriber, permanently = true) // Subscribe to all User Metrics metricsExtension.subscribe(UserHistograms, "*", subscriber, permanently = true) metricsExtension.subscribe(UserCounters, "*", subscriber, permanently = true) metricsExtension.subscribe(UserMinMaxCounters, "*", subscriber, permanently = true) metricsExtension.subscribe(UserGauges, "*", subscriber, permanently = true) def receive = reporting(None) def reporting(pendingMetrics: Option[TimeSliceMetrics]): Receive = { case TickMetricSnapshot(from, to, metrics) ⇒ val fromInSeconds = (from / 1E3).toInt val toInSeconds = (to / 1E3).toInt val extractedMetrics = Extractors.flatMap(_.extract(settings, collectionContext, metrics)).toMap val tickMetrics = TimeSliceMetrics(fromInSeconds, toInSeconds, extractedMetrics) val metricsToReport = pendingMetrics.foldLeft(tickMetrics)((p, n) ⇒ p.merge(n)) context become reporting(Some(metricsToReport)) pipe(sendMetricData(metricsToReport)) to self case PostSucceeded ⇒ context become (reporting(None)) case PostFailed(reason) ⇒ log.error(reason, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.") } def sendMetricData(slice: TimeSliceMetrics): Future[MetricDataPostResult] = { log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to) collectorClient { Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, CompactPrinter)) } map { response ⇒ if (response.status.isSuccess) PostSucceeded else PostFailed(new UnexpectedStatusCodeException(s"Received unsuccessful status code [${response.status.value}] from collector.")) } recover { case t: Throwable ⇒ PostFailed(t) } } } object MetricReporter { val Extractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil def props(settings: Agent.Settings, runID: Long, baseUri: Uri): Props = Props(new MetricReporter(settings, runID, baseUri)) sealed trait MetricDataPostResult case object PostSucceeded extends MetricDataPostResult case class PostFailed(reason: Throwable) extends MetricDataPostResult class UnexpectedStatusCodeException(message: String) extends RuntimeException(message) with NoStackTrace } trait MetricExtractor { def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] }