aboutsummaryrefslogtreecommitdiff
path: root/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
blob: 7ee7d1e6f0fde54e4398ab6adcf71f7d731d05a0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package kamon.newrelic

import akka.actor.{ Props, ActorLogging, Actor }
import akka.pattern.pipe
import akka.io.IO
import kamon.Kamon
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import kamon.metric._
import kamon.metric.instrument.CollectionContext
import kamon.newrelic.ApiMethodClient.{ AgentShutdownRequiredException, AgentRestartRequiredException }
import kamon.newrelic.MetricReporter.{ PostFailed, PostSucceeded }
import spray.can.Http
import spray.httpx.SprayJsonSupport
import scala.concurrent.duration._
import JsonProtocol._

/**
 * Actor that converts Kamon tick snapshots into New Relic time slice metrics and posts them
 * to the New Relic collector.
 *
 * The actor moves between two behaviors:
 *  - `awaitingConfiguration`: no collector/run ID is known yet, incoming ticks are converted
 *    and accumulated until an `Agent.Configure` message arrives.
 *  - `reporting`: every tick is merged with any previously unsent metrics and posted; the
 *    buffer is only cleared once `PostSucceeded` is received.
 *
 * @param settings agent settings, passed to the metric extractors and to the API client.
 */
class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging with SprayJsonSupport {
  import context.dispatcher

  val metricsExtension = Kamon.metrics
  val collectionContext = metricsExtension.buildDefaultCollectionContext
  val metricsSubscriber = {
    val tickInterval = Kamon.metrics.settings.tickInterval

    // Metrics are always sent to New Relic in 60-second intervals. When Kamon already ticks
    // every 60 seconds this actor subscribes directly; otherwise a buffer actor is placed in
    // front of it and configured with a one minute interval.
    if (tickInterval == 60.seconds) self
    else context.actorOf(TickMetricSnapshotBuffer.props(1.minute, self), "metric-buffer")
  }

  subscribeToMetrics()

  /** Starts without configuration and without buffered metrics. */
  def receive = awaitingConfiguration(None)

  /**
   * Behavior used while no collector configuration is available.
   *
   * @param bufferedMetrics metrics accumulated so far, carried over once reporting starts.
   */
  def awaitingConfiguration(bufferedMetrics: Option[TimeSliceMetrics]): Receive = {
    case Agent.Configure(collector, runID) => startReporting(collector, runID, bufferedMetrics)
    case Agent.ResetConfiguration          => // Stay waiting.
    case tickSnapshot: TickMetricSnapshot  => keepWaitingForConfig(tickSnapshot, bufferedMetrics)
    case PostSucceeded                     => // Ignore
    case PostFailed(reason)                => // Ignore any problems until we get a new configuration
  }

  /**
   * Behavior used once a collector and run ID have been configured.
   *
   * @param apiClient       client used to invoke the collector's metric data method.
   * @param bufferedMetrics metrics that were sent but not yet confirmed (or never sent); they
   *                        are merged into the next tick until a `PostSucceeded` clears them.
   */
  def reporting(apiClient: ApiMethodClient, bufferedMetrics: Option[TimeSliceMetrics]): Receive = {
    case tick: TickMetricSnapshot => sendMetricData(apiClient, tick, bufferedMetrics)
    case PostSucceeded            => context become reporting(apiClient, None)
    case PostFailed(reason)       => processCollectorFailure(reason)
    case Agent.ResetConfiguration => context become awaitingConfiguration(bufferedMetrics)
  }

  /**
   * Merges the tick with any buffered metrics, posts the result to the collector and pipes
   * the outcome (`PostSucceeded` or `PostFailed`) back to this actor.
   *
   * @param apiClient       client used to perform the POST.
   * @param tick            the snapshot that has just been received.
   * @param bufferedMetrics metrics pending from earlier ticks, if any.
   */
  def sendMetricData(apiClient: ApiMethodClient, tick: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
    val metricsToReport = merge(convertToTimeSliceMetrics(tick), bufferedMetrics)
    val customMarshaller = sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)

    if (log.isDebugEnabled)
      log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", metricsToReport.metrics.size,
        metricsToReport.from, metricsToReport.to)

    pipe {
      // NOTE(review): runID is presumably the Some(runID) given to the client in startReporting,
      // so .get is expected to succeed here - confirm against ApiMethodClient.
      apiClient.invokeMethod(RawMethods.MetricData, MetricBatch(apiClient.runID.get, metricsToReport))(customMarshaller)
        .map { _ => PostSucceeded }
        .recover { case error => PostFailed(error) }
    } to self

    // Keep what was just sent as the buffer: it is dropped on PostSucceeded and otherwise
    // gets merged with the next tick.
    context become reporting(apiClient, Some(metricsToReport))
  }

  /**
   * Reacts to a failed POST: asks the parent to reconnect or shut down when the collector
   * demands it, and logs any other failure.
   *
   * @param failureReason the error carried by the `PostFailed` message.
   */
  def processCollectorFailure(failureReason: Throwable): Unit = failureReason match {
    case AgentRestartRequiredException  => context.parent ! Agent.Reconnect
    case AgentShutdownRequiredException => context.parent ! Agent.Shutdown
    case anyOtherFailure =>
      log.error(anyOtherFailure, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.")
  }

  /**
   * Creates the API client for the given collector and run ID and switches to `reporting`,
   * carrying over whatever was buffered while waiting.
   */
  def startReporting(collector: String, runID: Long, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
    val apiClient = new ApiMethodClient(collector, Some(runID), settings, IO(Http)(context.system))
    context become reporting(apiClient, bufferedMetrics)
  }

  /** Converts the tick, merges it into the buffer and keeps waiting for configuration. */
  def keepWaitingForConfig(tickSnapshot: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
    val timeSliceMetrics = convertToTimeSliceMetrics(tickSnapshot)
    context become awaitingConfiguration(Some(merge(timeSliceMetrics, bufferedMetrics)))
  }

  /** Returns `tsm` merged with the buffered metrics when present, or `tsm` unchanged otherwise. */
  def merge(tsm: TimeSliceMetrics, buffered: Option[TimeSliceMetrics]): TimeSliceMetrics =
    buffered.foldLeft(tsm)((p, n) => p.merge(n))

  /**
   * Runs every registered extractor over the tick's metrics and wraps the combined result,
   * together with the tick's time range, in a `TimeSliceMetrics`.
   */
  def convertToTimeSliceMetrics(tick: TickMetricSnapshot): TimeSliceMetrics = {
    // Extractor outputs are flattened into a single map; on duplicate keys the later entry wins.
    val extractedMetrics = MetricReporter.MetricExtractors.flatMap(_.extract(settings, collectionContext, tick.metrics)).toMap
    TimeSliceMetrics(tick.from.toTimestamp, tick.to.toTimestamp, extractedMetrics)
  }

  /** Subscribes `metricsSubscriber` to the "trace" and "user-metrics" categories. */
  def subscribeToMetrics(): Unit = {
    metricsExtension.subscribe("trace", "*", metricsSubscriber, permanently = true)
    metricsExtension.subscribe("user-metrics", "*", metricsSubscriber, permanently = true)
  }
}

/**
 * Companion of the [[MetricReporter]] actor: holds its `Props` factory, the messages used to
 * signal the outcome of a metric POST, and the list of extractors applied to every tick.
 */
object MetricReporter {

  /** Result of posting a metric batch, sent back to the reporter itself. */
  sealed trait MetricDataPostResult

  /** The batch was accepted. */
  case object PostSucceeded extends MetricDataPostResult

  /** The POST failed with the given `reason`. */
  case class PostFailed(reason: Throwable) extends MetricDataPostResult

  /** Extractors run, in this order, over each tick snapshot. */
  val MetricExtractors: List[MetricExtractor] =
    List(WebTransactionMetricExtractor, CustomMetricExtractor)

  /** Builds the `Props` needed to create a [[MetricReporter]] with the given settings. */
  def props(settings: AgentSettings): Props =
    Props(new MetricReporter(settings))
}

/**
 * Strategy for turning the entity snapshots of a tick into New Relic metric entries.
 */
trait MetricExtractor {

  /**
   * Produces metric data from the given entity snapshots.
   *
   * @param settings          the agent settings.
   * @param collectionContext the collection context supplied by the caller.
   * @param metrics           entity snapshots, keyed by entity.
   * @return the extracted metric data, keyed by metric ID.
   */
  def extract(
    settings: AgentSettings,
    collectionContext: CollectionContext,
    metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData]
}