aboutsummaryrefslogtreecommitdiff
path: root/kamon-newrelic/src/main/scala/kamon
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-newrelic/src/main/scala/kamon')
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala152
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala69
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala23
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala3
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala16
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala7
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala130
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala6
8 files changed, 240 insertions, 166 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
index f71ecd7f..aa6aed3b 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
@@ -18,113 +18,127 @@ package kamon.newrelic
import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds }
-import akka.actor.{ ActorSystem, ActorLogging, Actor }
-import akka.event.LoggingAdapter
+import akka.actor.{ ActorLogging, Actor }
import akka.io.IO
import akka.util.Timeout
-import kamon.Kamon
-import kamon.metric.{ CollectionContext, Metrics }
+import com.typesafe.config.Config
import spray.can.Http
import spray.json._
-import scala.concurrent.{ ExecutionContext, Future }
-import spray.httpx.{ SprayJsonSupport, ResponseTransformation }
-import spray.http._
+import scala.concurrent.Future
+import spray.httpx.SprayJsonSupport
import spray.json.lenses.JsonLenses._
import java.lang.management.ManagementFactory
-import spray.http.Uri.Query
import scala.concurrent.duration._
import Agent._
-
+import JsonProtocol._
import akka.pattern.pipe
-// TODO: Setup a proper host connector with custom timeout configuration for use with this.
-class Agent extends Actor with ClientPipelines with ResponseTransformation with SprayJsonSupport with ActorLogging {
- import JsonProtocol._
+class Agent extends Actor with SprayJsonSupport with ActorLogging {
import context.dispatcher
- implicit val operationTimeout = Timeout(30 seconds)
- val collectorClient = compressedToJsonPipeline(IO(Http)(context.system))
- val settings = buildAgentSettings(context.system)
- val baseQuery = Query(
- "license_key" -> settings.licenseKey,
- "marshal_format" -> "json",
- "protocol_version" -> "12")
+ val agentSettings = AgentSettings.fromConfig(context.system.settings.config)
+
+ // Start the reporters
+ context.actorOf(MetricReporter.props(agentSettings), "metric-reporter")
// Start the connection to the New Relic collector.
- self ! Initialize
+ self ! Connect
+
+ def receive: Receive = disconnected(agentSettings.maxConnectionRetries)
+
+ def disconnected(attemptsLeft: Int): Receive = {
+ case Connect ⇒ pipe(connectToCollector) to self
+ case Connected(collector, runID) ⇒ configureChildren(collector, runID)
+ case ConnectFailed(reason) if (attemptsLeft > 0) ⇒ scheduleReconnection(reason, attemptsLeft)
+ case ConnectFailed(reason) ⇒ giveUpConnection()
+ }
- def receive: Receive = uninitialized(settings.maxRetries)
+ def connected: Receive = {
+ case Reconnect ⇒ reconnect()
+ case Shutdown ⇒ shutdown()
+ }
- def uninitialized(attemptsLeft: Int): Receive = {
- case Initialize ⇒ pipe(connectToCollector) to self
- case Initialized(runID, collector) ⇒
- log.info("Agent initialized with runID: [{}] and collector: [{}]", runID, collector)
+ def reconnect(): Unit = {
+ log.warning("New Relic request the agent to restart the connection, all reporters will be paused until a new connection is available.")
+ self ! Connect
+ context.children.foreach(_ ! ResetConfiguration)
+ context become disconnected(agentSettings.maxConnectionRetries)
+ }
- val baseCollectorUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(baseQuery)
- context.actorOf(MetricReporter.props(settings, runID, baseCollectorUri), "metric-reporter")
+ def shutdown(): Unit = {
+ log.error("New Relic requested the agent to be stopped, no metrics will be reported after this point.")
+ context stop self
+ }
+
+ def configureChildren(collector: String, runID: Long): Unit = {
+ log.info("Configuring New Relic reporters to use runID: [{}] and collector: [{}]", runID, collector)
+ context.children.foreach(_ ! Configure(collector, runID))
+ context become connected
+ }
- case InitializationFailed(reason) if (attemptsLeft > 0) ⇒
- log.error(reason, "Initialization failed, retrying in {} seconds", settings.retryDelay.toSeconds)
- context.system.scheduler.scheduleOnce(settings.retryDelay, self, Initialize)
- context become (uninitialized(attemptsLeft - 1))
+ def scheduleReconnection(connectionFailureReason: Throwable, attemptsLeft: Int): Unit = {
+ log.error(connectionFailureReason, "Initialization failed, retrying in {} seconds", agentSettings.retryDelay.toSeconds)
+ context.system.scheduler.scheduleOnce(agentSettings.retryDelay, self, Connect)
+ context become (disconnected(attemptsLeft - 1))
+ }
- case InitializationFailed(reason) ⇒
- log.error(reason, "Giving up while trying to set up a connection with the New Relic collector.")
- context.stop(self)
+ def giveUpConnection(): Unit = {
+ log.error("Giving up while trying to set up a connection with the New Relic collector. The New Relic module is shutting down itself.")
+ context.stop(self)
}
- def connectToCollector: Future[InitResult] = {
+ def connectToCollector: Future[ConnectResult] = {
(for {
collector ← selectCollector
- runId ← connect(collector, settings)
- } yield Initialized(runId, collector)) recover { case error ⇒ InitializationFailed(error) }
+ runID ← connect(collector, agentSettings)
+ } yield Connected(collector, runID)) recover { case error ⇒ ConnectFailed(error) }
}
def selectCollector: Future[String] = {
- val query = ("method" -> "get_redirect_host") +: baseQuery
- val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query)
-
- collectorClient {
- Post(getRedirectHostUri, JsArray())
-
- } map { json ⇒
+ val apiClient = new ApiMethodClient("collector.newrelic.com", None, agentSettings, IO(Http)(context.system))
+ apiClient.invokeMethod(RawMethods.GetRedirectHost, JsArray()) map { json ⇒
json.extract[String]('return_value)
}
}
- def connect(collectorHost: String, connect: Settings): Future[Long] = {
- log.debug("Connecting to NewRelic Collector [{}]", collectorHost)
-
- val query = ("method" -> "connect") +: baseQuery
- val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query)
-
- collectorClient {
- Post(connectUri, connect)
-
- } map { json ⇒
+ def connect(collectorHost: String, connect: AgentSettings): Future[Long] = {
+ val apiClient = new ApiMethodClient(collectorHost, None, agentSettings, IO(Http)(context.system))
+ apiClient.invokeMethod(RawMethods.Connect, connect) map { json ⇒
json.extract[Long]('return_value / 'agent_run_id)
}
}
}
object Agent {
- case object Initialize
- sealed trait InitResult
- case class Initialized(runId: Long, collector: String) extends InitResult
- case class InitializationFailed(reason: Throwable) extends InitResult
- case class Settings(licenseKey: String, appName: String, host: String, pid: Int, maxRetries: Int, retryDelay: FiniteDuration, apdexT: Double)
-
- def buildAgentSettings(system: ActorSystem) = {
- val config = system.settings.config.getConfig("kamon.newrelic")
- val appName = config.getString("app-name")
- val licenseKey = config.getString("license-key")
- val maxRetries = config.getInt("max-initialize-retries")
- val retryDelay = FiniteDuration(config.getDuration("initialize-retry-delay", milliseconds), milliseconds)
- val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds.
+ case object Connect
+ case object Reconnect
+ case object Shutdown
+ case object ResetConfiguration
+ case class Configure(collector: String, runID: Long)
+
+ sealed trait ConnectResult
+ case class Connected(collector: String, runID: Long) extends ConnectResult
+ case class ConnectFailed(reason: Throwable) extends ConnectResult
+}
+
+case class AgentSettings(licenseKey: String, appName: String, hostname: String, pid: Int, operationTimeout: Timeout,
+ maxConnectionRetries: Int, retryDelay: FiniteDuration, apdexT: Double)
+object AgentSettings {
+
+ def fromConfig(config: Config) = {
// Name has the format of 'pid'@'host'
val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')
-
- Agent.Settings(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetries, retryDelay, apdexT)
+ val newRelicConfig = config.getConfig("kamon.newrelic")
+
+ AgentSettings(
+ newRelicConfig.getString("license-key"),
+ newRelicConfig.getString("app-name"),
+ runtimeName(1),
+ runtimeName(0).toInt,
+ Timeout(newRelicConfig.getDuration("operation-timeout", milliseconds).millis),
+ newRelicConfig.getInt("max-connect-retries"),
+ FiniteDuration(newRelicConfig.getDuration("connect-retry-delay", milliseconds), milliseconds),
+ newRelicConfig.getDuration("apdexT", milliseconds) / 1E3D)
}
} \ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala
new file mode 100644
index 00000000..8b95a959
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala
@@ -0,0 +1,69 @@
+package kamon.newrelic
+
+import akka.actor.ActorRef
+import kamon.newrelic.ApiMethodClient.{ NewRelicException, AgentShutdownRequiredException, AgentRestartRequiredException }
+import spray.http.Uri.Query
+import spray.http._
+import spray.httpx.encoding.Deflate
+import spray.httpx.marshalling.Marshaller
+import spray.httpx.unmarshalling._
+import spray.json.{ JsonParser, JsValue }
+import spray.json.lenses.JsonLenses._
+import spray.json.DefaultJsonProtocol._
+import spray.client.pipelining._
+
+import scala.concurrent.{ Future, ExecutionContext }
+import scala.util.control.NoStackTrace
+
+class ApiMethodClient(host: String, val runID: Option[Long], agentSettings: AgentSettings, httpTransport: ActorRef)(implicit exeContext: ExecutionContext) {
+
+ implicit val to = agentSettings.operationTimeout
+
+ val baseQuery = Query(runID.map(ri ⇒ Map("run_id" -> String.valueOf(ri))).getOrElse(Map.empty[String, String]) +
+ ("license_key" -> agentSettings.licenseKey) +
+ ("marshal_format" -> "json") +
+ ("protocol_version" -> "12"))
+
+ // New Relic responses contain JSON but with text/plain content type :(.
+ implicit val JsValueUnmarshaller = Unmarshaller[JsValue](MediaTypes.`application/json`, MediaTypes.`text/plain`) {
+ case x: HttpEntity.NonEmpty ⇒
+ JsonParser(x.asString(defaultCharset = HttpCharsets.`UTF-8`))
+ }
+
+ val httpClient = encode(Deflate) ~> sendReceive(httpTransport) ~> decode(Deflate) ~> unmarshal[JsValue]
+ val baseCollectorUri = Uri("/agent_listener/invoke_raw_method").withHost(host).withScheme("http")
+
+ def invokeMethod[T: Marshaller](method: String, payload: T): Future[JsValue] = {
+ val methodQuery = ("method" -> method) +: baseQuery
+
+ httpClient(Post(baseCollectorUri.withQuery(methodQuery), payload)) map { jsResponse ⇒
+ jsResponse.extract[String]('exception.? / 'error_type.?).map(_ match {
+ case CollectorErrors.`ForceRestart` ⇒ throw AgentRestartRequiredException
+ case CollectorErrors.`ForceShutdown` ⇒ throw AgentShutdownRequiredException
+ case anyOtherError ⇒
+ val errorMessage = jsResponse.extract[String]('exception / 'message.?).getOrElse("no message")
+ println("Error occured: " + anyOtherError + " - " + errorMessage)
+ throw NewRelicException(anyOtherError, errorMessage)
+ })
+
+ jsResponse
+ }
+ }
+}
+
+object ApiMethodClient {
+ case class NewRelicException(exceptionType: String, message: String) extends RuntimeException with NoStackTrace
+ case object AgentRestartRequiredException extends RuntimeException with NoStackTrace
+ case object AgentShutdownRequiredException extends RuntimeException with NoStackTrace
+}
+
+object RawMethods {
+ val GetRedirectHost = "get_redirect_host"
+ val Connect = "connect"
+ val MetricData = "metric_data"
+}
+
+object CollectorErrors {
+ val ForceRestart = "NewRelic::Agent::ForceRestartException"
+ val ForceShutdown = "NewRelic::Agent::ForceDisconnectException"
+}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala
deleted file mode 100644
index ca003646..00000000
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package kamon.newrelic
-
-import akka.actor.ActorRef
-import akka.util.Timeout
-import spray.http.{ HttpResponse, HttpRequest }
-import spray.httpx.RequestBuilding
-import spray.httpx.encoding.Deflate
-import spray.json._
-import spray.client.pipelining.sendReceive
-
-import scala.concurrent.{ ExecutionContext, Future }
-
-trait ClientPipelines extends RequestBuilding {
-
- def compressedPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[HttpResponse] =
- encode(Deflate) ~> sendReceive(transport)
-
- def compressedToJsonPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[JsValue] =
- compressedPipeline(transport) ~> toJson
-
- def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson
-
-}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala
index 84472593..e97c24dc 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala
@@ -18,11 +18,10 @@ package kamon.newrelic
import kamon.metric.UserMetrics.UserMetricGroup
import kamon.metric._
-import kamon.newrelic.Agent.Settings
object CustomMetricExtractor extends MetricExtractor {
- def extract(settings: Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = {
+ def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = {
metrics.collect {
case (mg: UserMetricGroup, groupSnapshot) ⇒
groupSnapshot.metrics collect {
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
index 26e8839e..0e53be0b 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
@@ -15,18 +15,18 @@
* ========================================================== */
package kamon.newrelic
+import kamon.Timestamp
import spray.json._
-import kamon.newrelic.Agent._
object JsonProtocol extends DefaultJsonProtocol {
- implicit object ConnectJsonWriter extends RootJsonWriter[Settings] {
- def write(obj: Settings): JsValue =
+ implicit object ConnectJsonWriter extends RootJsonWriter[AgentSettings] {
+ def write(obj: AgentSettings): JsValue =
JsArray(
JsObject(
"agent_version" -> JsString("3.1.0"),
"app_name" -> JsArray(JsString(obj.appName)),
- "host" -> JsString(obj.host),
+ "host" -> JsString(obj.hostname),
"identifier" -> JsString(s"java:${obj.appName}"),
"language" -> JsString("java"),
"pid" -> JsNumber(obj.pid)))
@@ -87,8 +87,8 @@ object JsonProtocol extends DefaultJsonProtocol {
def read(json: JsValue): MetricBatch = json match {
case JsArray(elements) ⇒
val runID = elements(0).convertTo[Long]
- val timeSliceFrom = elements(1).convertTo[Long]
- val timeSliceTo = elements(2).convertTo[Long]
+ val timeSliceFrom = new Timestamp(elements(1).convertTo[Long])
+ val timeSliceTo = new Timestamp(elements(2).convertTo[Long])
val metrics = elements(3).convertTo[Seq[Metric]]
MetricBatch(runID, TimeSliceMetrics(timeSliceFrom, timeSliceTo, metrics.toMap))
@@ -99,8 +99,8 @@ object JsonProtocol extends DefaultJsonProtocol {
def write(obj: MetricBatch): JsValue =
JsArray(
JsNumber(obj.runID),
- JsNumber(obj.timeSliceMetrics.from),
- JsNumber(obj.timeSliceMetrics.to),
+ JsNumber(obj.timeSliceMetrics.from.seconds),
+ JsNumber(obj.timeSliceMetrics.to.seconds),
obj.timeSliceMetrics.metrics.toSeq.toJson)
}
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala
index 14541483..52d21f31 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala
@@ -1,5 +1,6 @@
package kamon.newrelic
+import kamon.Timestamp
import kamon.metric.instrument.{ Counter, Histogram }
import kamon.metric.{ MetricSnapshot, Scale }
@@ -40,12 +41,12 @@ object Metric {
}
}
-case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[MetricID, MetricData]) {
+case class TimeSliceMetrics(from: Timestamp, to: Timestamp, metrics: Map[MetricID, MetricData]) {
import kamon.metric.combineMaps
def merge(that: TimeSliceMetrics): TimeSliceMetrics = {
- val mergedFrom = math.min(from, that.from)
- val mergedTo = math.max(to, that.to)
+ val mergedFrom = Timestamp.earlier(from, that.from)
+ val mergedTo = Timestamp.later(to, that.to)
val mergedMetrics = combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r))
TimeSliceMetrics(mergedFrom, mergedTo, mergedMetrics)
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
index 0aa078f5..286b0a77 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
@@ -5,99 +5,113 @@ import java.util.concurrent.TimeUnit
import akka.actor.{ Props, ActorLogging, Actor }
import akka.pattern.pipe
import akka.io.IO
-import akka.util.Timeout
import kamon.Kamon
import kamon.metric.Subscriptions.TickMetricSnapshot
import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms }
import kamon.metric._
-import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed, PostSucceeded, MetricDataPostResult }
+import kamon.newrelic.ApiMethodClient.{ AgentShutdownRequiredException, AgentRestartRequiredException }
+import kamon.newrelic.MetricReporter.{ PostFailed, PostSucceeded }
import spray.can.Http
-import spray.http.Uri
import spray.httpx.SprayJsonSupport
-import scala.concurrent.Future
import scala.concurrent.duration._
-import scala.util.control.NoStackTrace
+import JsonProtocol._
-class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extends Actor
- with ClientPipelines with ActorLogging with SprayJsonSupport {
-
- import JsonProtocol._
- import MetricReporter.Extractors
+class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging with SprayJsonSupport {
import context.dispatcher
- val metricDataQuery = ("method" -> "metric_data") +: ("run_id" -> runID.toString) +: baseUri.query
- val metricDataUri = baseUri.withQuery(metricDataQuery)
-
- implicit val operationTimeout = Timeout(30 seconds)
val metricsExtension = Kamon(Metrics)(context.system)
val collectionContext = metricsExtension.buildDefaultCollectionContext
- val collectorClient = compressedPipeline(IO(Http)(context.system))
-
- val subscriber = {
+ val metricsSubscriber = {
val tickInterval = context.system.settings.config.getDuration("kamon.metrics.tick-interval", TimeUnit.MILLISECONDS)
- if (tickInterval == 60000)
- self
- else
- context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer")
+
+ // Metrics are always sent to New Relic in 60 seconds intervals.
+ if (tickInterval == 60000) self
+ else context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer")
+ }
+
+ subscribeToMetrics()
+
+ def receive = awaitingConfiguration(None)
+
+ def awaitingConfiguration(bufferedMetrics: Option[TimeSliceMetrics]): Receive = {
+ case Agent.Configure(collector, runID) ⇒ startReporting(collector, runID, bufferedMetrics)
+ case Agent.ResetConfiguration ⇒ // Stay waiting.
+ case tickSnapshot: TickMetricSnapshot ⇒ keepWaitingForConfig(tickSnapshot, bufferedMetrics)
+ case PostSucceeded ⇒ // Ignore
+ case PostFailed(reason) ⇒ // Ignore any problems until we get a new configuration
}
- // Subscribe to Trace Metrics
- metricsExtension.subscribe(TraceMetrics, "*", subscriber, permanently = true)
+ def reporting(apiClient: ApiMethodClient, bufferedMetrics: Option[TimeSliceMetrics]): Receive = {
+ case tick: TickMetricSnapshot ⇒ sendMetricData(apiClient, tick, bufferedMetrics)
+ case PostSucceeded ⇒ context become reporting(apiClient, None)
+ case PostFailed(reason) ⇒ processCollectorFailure(reason)
+ case Agent.ResetConfiguration ⇒ context become awaitingConfiguration(bufferedMetrics)
+ }
+
+ def sendMetricData(apiClient: ApiMethodClient, tick: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
+ val metricsToReport = merge(convertToTimeSliceMetrics(tick), bufferedMetrics)
+ val customMarshaller = sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)
- // Subscribe to all User Metrics
- metricsExtension.subscribe(UserHistograms, "*", subscriber, permanently = true)
- metricsExtension.subscribe(UserCounters, "*", subscriber, permanently = true)
- metricsExtension.subscribe(UserMinMaxCounters, "*", subscriber, permanently = true)
- metricsExtension.subscribe(UserGauges, "*", subscriber, permanently = true)
+ if (log.isDebugEnabled)
+ log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", metricsToReport.metrics.size,
+ metricsToReport.from, metricsToReport.to)
- def receive = reporting(None)
+ pipe {
+ apiClient.invokeMethod(RawMethods.MetricData, MetricBatch(apiClient.runID.get, metricsToReport))(customMarshaller)
+ .map { _ ⇒ PostSucceeded }
+ .recover { case error ⇒ PostFailed(error) }
+ } to self
- def reporting(pendingMetrics: Option[TimeSliceMetrics]): Receive = {
- case TickMetricSnapshot(from, to, metrics) ⇒
- val fromInSeconds = (from / 1E3).toInt
- val toInSeconds = (to / 1E3).toInt
- val extractedMetrics = Extractors.flatMap(_.extract(settings, collectionContext, metrics)).toMap
- val tickMetrics = TimeSliceMetrics(fromInSeconds, toInSeconds, extractedMetrics)
+ context become reporting(apiClient, Some(metricsToReport))
+ }
- val metricsToReport = pendingMetrics.foldLeft(tickMetrics)((p, n) ⇒ p.merge(n))
- context become reporting(Some(metricsToReport))
- pipe(sendMetricData(metricsToReport)) to self
+ def processCollectorFailure(failureReason: Throwable): Unit = failureReason match {
+ case AgentRestartRequiredException ⇒ context.parent ! Agent.Reconnect
+ case AgentShutdownRequiredException ⇒ context.parent ! Agent.Shutdown
+ case anyOtherFailure ⇒
+ log.error(anyOtherFailure, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.")
+ }
- case PostSucceeded ⇒
- context become (reporting(None))
+ def startReporting(collector: String, runID: Long, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
+ val apiClient = new ApiMethodClient(collector, Some(runID), settings, IO(Http)(context.system))
+ context become reporting(apiClient, bufferedMetrics)
+ }
- case PostFailed(reason) ⇒
- log.error(reason, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.")
+ def keepWaitingForConfig(tickSnapshot: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
+ val timeSliceMetrics = convertToTimeSliceMetrics(tickSnapshot)
+ context become awaitingConfiguration(Some(merge(timeSliceMetrics, bufferedMetrics)))
}
- def sendMetricData(slice: TimeSliceMetrics): Future[MetricDataPostResult] = {
- log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to)
+ def merge(tsm: TimeSliceMetrics, buffered: Option[TimeSliceMetrics]): TimeSliceMetrics =
+ buffered.foldLeft(tsm)((p, n) ⇒ p.merge(n))
+
+ def convertToTimeSliceMetrics(tick: TickMetricSnapshot): TimeSliceMetrics = {
+ val extractedMetrics = MetricReporter.MetricExtractors.flatMap(_.extract(settings, collectionContext, tick.metrics)).toMap
+ TimeSliceMetrics(tick.from.toTimestamp, tick.to.toTimestamp, extractedMetrics)
+ }
- collectorClient {
- Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter))
+ def subscribeToMetrics(): Unit = {
+ // Subscribe to Trace Metrics
+ metricsExtension.subscribe(TraceMetrics, "*", metricsSubscriber, permanently = true)
- } map { response ⇒
- if (response.status.isSuccess)
- PostSucceeded
- else
- PostFailed(new UnexpectedStatusCodeException(s"Received unsuccessful status code [${response.status.value}] from collector."))
- } recover { case t: Throwable ⇒ PostFailed(t) }
+ // Subscribe to all User Metrics
+ metricsExtension.subscribe(UserHistograms, "*", metricsSubscriber, permanently = true)
+ metricsExtension.subscribe(UserCounters, "*", metricsSubscriber, permanently = true)
+ metricsExtension.subscribe(UserMinMaxCounters, "*", metricsSubscriber, permanently = true)
+ metricsExtension.subscribe(UserGauges, "*", metricsSubscriber, permanently = true)
}
}
object MetricReporter {
- val Extractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil
-
- def props(settings: Agent.Settings, runID: Long, baseUri: Uri): Props =
- Props(new MetricReporter(settings, runID, baseUri))
+ def props(settings: AgentSettings): Props = Props(new MetricReporter(settings))
sealed trait MetricDataPostResult
case object PostSucceeded extends MetricDataPostResult
case class PostFailed(reason: Throwable) extends MetricDataPostResult
- class UnexpectedStatusCodeException(message: String) extends RuntimeException(message) with NoStackTrace
+ val MetricExtractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil
}
trait MetricExtractor {
- def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData]
+ def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData]
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
index 0a4a516b..a47cac55 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
@@ -16,16 +16,16 @@
package kamon.newrelic
-import scala.collection.mutable;
+import scala.collection.mutable
import kamon.metric._
-import kamon.metric.TraceMetrics.{ TraceMetricsSnapshot, ElapsedTime }
+import kamon.metric.TraceMetrics.TraceMetricsSnapshot
import kamon.metric.instrument.Histogram
import kamon.trace.SegmentCategory.HttpClient
import kamon.trace.SegmentMetricIdentity
object WebTransactionMetricExtractor extends MetricExtractor {
- def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = {
+ def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = {
val apdexBuilder = new ApdexBuilder("Apdex", None, settings.apdexT)
// Trace metrics are recorded in nanoseconds.