/*
 * =========================================================================================
 * Copyright © 2013-2014 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */
package kamon.newrelic
import akka.actor._
import akka.io.IO
import akka.pattern.pipe
import kamon.Kamon
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import kamon.metric._
import kamon.metric.instrument.CollectionContext
import kamon.newrelic.ApiMethodClient.{ AgentShutdownRequiredException, AgentRestartRequiredException }
import kamon.newrelic.MetricReporter.{ PostFailed, PostSucceeded }
import spray.can.Http
import spray.httpx.SprayJsonSupport
import JsonProtocol._
/**
 * Actor that converts Kamon tick snapshots into `TimeSliceMetrics` and posts them to the New Relic
 * collector through an `ApiMethodClient`.
 *
 * The actor switches between two behaviours:
 *  - `awaitingConfiguration`: no collector is configured yet; every incoming tick is merged into the
 *    buffered metrics.
 *  - `reporting`: every incoming tick is merged with the buffered metrics and posted. The buffer is
 *    only emptied when a `PostSucceeded` message comes back, so the data of a failed post is merged
 *    into the next one.
 *
 * @param settings agent settings handed to the API client and to every metric extractor.
 */
class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging with SprayJsonSupport {
  // Implicit execution context for the future combinators and `pipe` used in `sendMetricData`.
  import context.dispatcher

  val metricsExtension = Kamon.metrics
  val collectionContext = metricsExtension.buildDefaultCollectionContext

  /** Initial behaviour: no configuration received and nothing buffered. */
  def receive = awaitingConfiguration(None)

  /**
   * Behaviour used while no collector configuration is available.
   *
   * @param bufferedMetrics metrics accumulated from the ticks seen so far, if any.
   */
  def awaitingConfiguration(bufferedMetrics: Option[TimeSliceMetrics]): Receive = {
    case Agent.Configure(collector, runID) ⇒
      startReporting(collector, runID, bufferedMetrics)

    case tickSnapshot: TickMetricSnapshot ⇒
      keepWaitingForConfig(tickSnapshot, bufferedMetrics)

    // There is no configuration to reset, and the outcome of earlier posts is ignored until a new
    // configuration arrives.
    case Agent.ResetConfiguration | PostSucceeded | PostFailed(_) ⇒
      ()
  }

  /**
   * Behaviour used once a collector has been configured.
   *
   * @param apiClient       client used to post metric data to the collector.
   * @param bufferedMetrics metrics whose delivery has not been confirmed yet, if any.
   */
  def reporting(apiClient: ApiMethodClient, bufferedMetrics: Option[TimeSliceMetrics]): Receive = {
    case tick: TickMetricSnapshot ⇒
      sendMetricData(apiClient, tick, bufferedMetrics)

    case PostSucceeded ⇒
      // Delivery confirmed: drop the buffer.
      context.become(reporting(apiClient, None))

    case PostFailed(reason) ⇒
      processCollectorFailure(reason)

    case Agent.ResetConfiguration ⇒
      // Carry the unconfirmed metrics over to the waiting state.
      context.become(awaitingConfiguration(bufferedMetrics))
  }

  /**
   * Merges the tick with the buffered metrics, posts the result and pipes the outcome
   * (`PostSucceeded` or `PostFailed`) back to this actor.
   *
   * The posted metrics stay buffered until the outcome message is processed.
   */
  def sendMetricData(apiClient: ApiMethodClient, tick: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
    val batchMetrics = merge(convertToTimeSliceMetrics(tick), bufferedMetrics)
    val batchMarshaller = sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)

    if (log.isDebugEnabled) {
      log.debug(
        "Sending [{}] metrics to New Relic for the time slice between {} and {}.",
        batchMetrics.metrics.size, batchMetrics.from, batchMetrics.to)
    }

    val postOutcome = apiClient
      .invokeMethod(RawMethods.MetricData, MetricBatch(apiClient.runID.get, batchMetrics))(batchMarshaller)
      .map(_ ⇒ PostSucceeded)
      .recover { case error ⇒ PostFailed(error) }

    pipe(postOutcome).to(self)
    context.become(reporting(apiClient, Some(batchMetrics)))
  }

  /**
   * Reacts to a failed post: asks the parent to reconnect or shut down when the collector demands
   * it, and logs any other failure.
   */
  def processCollectorFailure(failureReason: Throwable): Unit =
    failureReason match {
      case AgentRestartRequiredException ⇒
        context.parent ! Agent.Reconnect

      case AgentShutdownRequiredException ⇒
        context.parent ! Agent.Shutdown

      case unexpected ⇒
        log.error(unexpected, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.")
    }

  /** Builds the API client for the given collector and run and switches to `reporting`. */
  def startReporting(collector: String, runID: Long, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
    val client = new ApiMethodClient(
      collector,
      Some(runID),
      settings,
      IO(Http)(context.system))

    context.become(reporting(client, bufferedMetrics))
  }

  /** Adds the tick to the buffered metrics and keeps waiting for a configuration. */
  def keepWaitingForConfig(tickSnapshot: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
    val accumulated = merge(convertToTimeSliceMetrics(tickSnapshot), bufferedMetrics)
    context.become(awaitingConfiguration(Some(accumulated)))
  }

  /**
   * Combines freshly converted metrics with the buffered ones, when there are any.
   *
   * @param tsm      metrics of the current tick; `merge` is invoked on this value.
   * @param buffered previously accumulated metrics, passed as the argument of `merge`.
   * @return `tsm` merged with the buffered metrics, or `tsm` itself when nothing is buffered.
   */
  def merge(tsm: TimeSliceMetrics, buffered: Option[TimeSliceMetrics]): TimeSliceMetrics =
    buffered match {
      case Some(previous) ⇒ tsm.merge(previous)
      case None           ⇒ tsm
    }

  /**
   * Runs every extractor in `MetricReporter.MetricExtractors` over the tick's metrics and wraps the
   * combined result in a `TimeSliceMetrics` covering the tick's time range.
   */
  def convertToTimeSliceMetrics(tick: TickMetricSnapshot): TimeSliceMetrics = {
    val extracted = MetricReporter.MetricExtractors.flatMap { extractor ⇒
      extractor.extract(settings, collectionContext, tick.metrics)
    }

    TimeSliceMetrics(tick.from.toTimestamp, tick.to.toTimestamp, extracted.toMap)
  }
}
/** Companion of the reporter actor: `Props` factory, post-result messages and the extractor list. */
object MetricReporter {

  /** Outcome of a metric data post, sent back to the reporter actor as a message. */
  sealed trait MetricDataPostResult
  case object PostSucceeded extends MetricDataPostResult
  case class PostFailed(reason: Throwable) extends MetricDataPostResult

  /** Extractors applied, in this order, when converting a tick snapshot. */
  val MetricExtractors: List[MetricExtractor] =
    List(WebTransactionMetricExtractor, CustomMetricExtractor)

  /** Creates the `Props` for a [[MetricReporter]] configured with the given settings. */
  def props(settings: AgentSettings): Props =
    Props(new MetricReporter(settings))
}
/** Contract for components that turn entity snapshots into metric data keyed by `MetricID`. */
trait MetricExtractor {

  /**
   * @param settings          agent settings available to the extractor.
   * @param collectionContext collection context available to the extractor.
   * @param metrics           entity snapshots to extract from, keyed by entity.
   * @return the extracted metric data, keyed by metric id.
   */
  def extract(
    settings: AgentSettings,
    collectionContext: CollectionContext,
    metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData]
}
|