Diffstat (limited to 'kamon-newrelic/src')
-rw-r--r--  kamon-newrelic/src/main/resources/reference.conf                              15
-rw-r--r--  kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala                     154
-rw-r--r--  kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala            68
-rw-r--r--  kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala            23
-rw-r--r--  kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala       19
-rw-r--r--  kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala                16
-rw-r--r--  kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala                      30
-rw-r--r--  kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala             134
-rw-r--r--  kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala         20
-rw-r--r--  kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala  72
-rw-r--r--  kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala                   14
-rw-r--r--  kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala          86
12 files changed, 381 insertions, 270 deletions
diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf
index c86e64ae..9dc793e1 100644
--- a/kamon-newrelic/src/main/resources/reference.conf
+++ b/kamon-newrelic/src/main/resources/reference.conf
@@ -14,12 +14,23 @@ kamon {
# Your New Relic license key.
license-key = e7d350b14228f3d28f35bc3140df2c3e565ea5d5
+ # Time to wait for a response when calling any of the New Relic collector API methods.
+ operation-timeout = 30 seconds
+
# attempts to send pending metrics in the next tick,
# combining the current metrics plus the pending, after max-retry, deletes all pending metrics
- max-initialize-retries = 3
+ max-connect-retries = 3
# delay between connection attempts to NewRelic collector
- initialize-retry-delay = 30 seconds
+ connect-retry-delay = 30 seconds
+ }
+
+ modules {
+ kamon-newrelic {
+ auto-start = yes
+ requires-aspectj = no
+ extension-id = "kamon.newrelic.NewRelic"
+ }
}
}
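
A minimal sketch of reading the renamed keys with the plain Typesafe Config API, assuming the reference.conf above is on the classpath (the module itself goes through the kamon.util.ConfigTools helpers shown below in Agent.scala):

    import java.util.concurrent.TimeUnit
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._

    object NewRelicSettingsSketch extends App {
      // Reads reference.conf / application.conf from the classpath.
      val newRelicConfig = ConfigFactory.load().getConfig("kamon.newrelic")

      // The keys introduced/renamed above; getDuration accepts values like "30 seconds".
      val operationTimeout  = newRelicConfig.getDuration("operation-timeout", TimeUnit.MILLISECONDS).millis
      val maxConnectRetries = newRelicConfig.getInt("max-connect-retries")
      val connectRetryDelay = newRelicConfig.getDuration("connect-retry-delay", TimeUnit.MILLISECONDS).millis

      println(s"timeout=$operationTimeout, retries=$maxConnectRetries, retry-delay=$connectRetryDelay")
    }
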
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
index 6244c0ad..5f6383f8 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
@@ -18,113 +18,129 @@ package kamon.newrelic
import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds }
-import akka.actor.{ ActorSystem, ActorLogging, Actor }
-import akka.event.LoggingAdapter
+import akka.actor.{ ActorLogging, Actor }
import akka.io.IO
import akka.util.Timeout
-import kamon.Kamon
-import kamon.metric.{ CollectionContext, Metrics }
+import com.typesafe.config.Config
import spray.can.Http
import spray.json._
-import scala.concurrent.{ ExecutionContext, Future }
-import spray.httpx.{ SprayJsonSupport, ResponseTransformation }
-import spray.http._
+import scala.concurrent.Future
+import spray.httpx.SprayJsonSupport
import spray.json.lenses.JsonLenses._
import java.lang.management.ManagementFactory
-import spray.http.Uri.Query
import scala.concurrent.duration._
import Agent._
-
+import JsonProtocol._
import akka.pattern.pipe
-// TODO: Setup a proper host connector with custom timeout configuration for use with this.
-class Agent extends Actor with ClientPipelines with ResponseTransformation with SprayJsonSupport with ActorLogging {
- import JsonProtocol._
+class Agent extends Actor with SprayJsonSupport with ActorLogging {
import context.dispatcher
- implicit val operationTimeout = Timeout(30 seconds)
- val collectorClient = compressedToJsonPipeline(IO(Http)(context.system))
- val settings = buildAgentSettings(context.system)
- val baseQuery = Query(
- "license_key" -> settings.licenseKey,
- "marshal_format" -> "json",
- "protocol_version" -> "12")
+ val agentSettings = AgentSettings.fromConfig(context.system.settings.config)
+
+ // Start the reporters
+ context.actorOf(MetricReporter.props(agentSettings), "metric-reporter")
// Start the connection to the New Relic collector.
- self ! Initialize
+ self ! Connect
+
+ def receive: Receive = disconnected(agentSettings.maxConnectionRetries)
- def receive: Receive = uninitialized(settings.maxRetries)
+ def disconnected(attemptsLeft: Int): Receive = {
+ case Connect ⇒ pipe(connectToCollector) to self
+ case Connected(collector, runID) ⇒ configureChildren(collector, runID)
+ case ConnectFailed(reason) if (attemptsLeft > 0) ⇒ scheduleReconnection(reason, attemptsLeft)
+ case ConnectFailed(reason) ⇒ giveUpConnection()
+ }
- def uninitialized(attemptsLeft: Int): Receive = {
- case Initialize ⇒ pipe(connectToCollector) to self
- case Initialized(runID, collector) ⇒
- log.info("Agent initialized with runID: [{}] and collector: [{}]", runID, collector)
+ def connected: Receive = {
+ case Reconnect ⇒ reconnect()
+ case Shutdown ⇒ shutdown()
+ }
- val baseCollectorUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(baseQuery)
- context.actorOf(MetricReporter.props(settings, runID, baseCollectorUri), "metric-reporter")
+ def reconnect(): Unit = {
+ log.warning("New Relic request the agent to restart the connection, all reporters will be paused until a new connection is available.")
+ self ! Connect
+ context.children.foreach(_ ! ResetConfiguration)
+ context become disconnected(agentSettings.maxConnectionRetries)
+ }
- case InitializationFailed(reason) if (attemptsLeft > 0) ⇒
- log.error(reason, "Initialization failed, retrying in {} seconds", settings.retryDelay.toSeconds)
- context.system.scheduler.scheduleOnce(settings.retryDelay, self, Initialize)
- context become (uninitialized(attemptsLeft - 1))
+ def shutdown(): Unit = {
+ log.error("New Relic requested the agent to be stopped, no metrics will be reported after this point.")
+ context stop self
+ }
+
+ def configureChildren(collector: String, runID: Long): Unit = {
+ log.info("Configuring New Relic reporters to use runID: [{}] and collector: [{}]", runID, collector)
+ context.children.foreach(_ ! Configure(collector, runID))
+ context become connected
+ }
+
+ def scheduleReconnection(connectionFailureReason: Throwable, attemptsLeft: Int): Unit = {
+ log.error(connectionFailureReason, "Initialization failed, retrying in {} seconds", agentSettings.retryDelay.toSeconds)
+ context.system.scheduler.scheduleOnce(agentSettings.retryDelay, self, Connect)
+ context become (disconnected(attemptsLeft - 1))
+ }
- case InitializationFailed(reason) ⇒
- log.error(reason, "Giving up while trying to set up a connection with the New Relic collector.")
- context.stop(self)
+ def giveUpConnection(): Unit = {
+ log.error("Giving up while trying to set up a connection with the New Relic collector. The New Relic module is shutting down itself.")
+ context.stop(self)
}
- def connectToCollector: Future[InitResult] = {
+ def connectToCollector: Future[ConnectResult] = {
(for {
collector ← selectCollector
- runId ← connect(collector, settings)
- } yield Initialized(runId, collector)) recover { case error ⇒ InitializationFailed(error) }
+ runID ← connect(collector, agentSettings)
+ } yield Connected(collector, runID)) recover { case error ⇒ ConnectFailed(error) }
}
def selectCollector: Future[String] = {
- val query = ("method" -> "get_redirect_host") +: baseQuery
- val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query)
-
- collectorClient {
- Post(getRedirectHostUri, JsArray())
-
- } map { json ⇒
+ val apiClient = new ApiMethodClient("collector.newrelic.com", None, agentSettings, IO(Http)(context.system))
+ apiClient.invokeMethod(RawMethods.GetRedirectHost, JsArray()) map { json ⇒
json.extract[String]('return_value)
}
}
- def connect(collectorHost: String, connect: Settings): Future[Long] = {
- log.debug("Connecting to NewRelic Collector [{}]", collectorHost)
-
- val query = ("method" -> "connect") +: baseQuery
- val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query)
-
- collectorClient {
- Post(connectUri, connect)
-
- } map { json ⇒
+ def connect(collectorHost: String, connect: AgentSettings): Future[Long] = {
+ val apiClient = new ApiMethodClient(collectorHost, None, agentSettings, IO(Http)(context.system))
+ apiClient.invokeMethod(RawMethods.Connect, connect) map { json ⇒
json.extract[Long]('return_value / 'agent_run_id)
}
}
}
object Agent {
- case object Initialize
- sealed trait InitResult
- case class Initialized(runId: Long, collector: String) extends InitResult
- case class InitializationFailed(reason: Throwable) extends InitResult
- case class Settings(licenseKey: String, appName: String, host: String, pid: Int, maxRetries: Int, retryDelay: FiniteDuration, apdexT: Double)
-
- def buildAgentSettings(system: ActorSystem) = {
- val config = system.settings.config.getConfig("kamon.newrelic")
- val appName = config.getString("app-name")
- val licenseKey = config.getString("license-key")
- val maxRetries = config.getInt("max-initialize-retries")
- val retryDelay = FiniteDuration(config.getMilliseconds("initialize-retry-delay"), milliseconds)
- val apdexT: Double = config.getMilliseconds("apdexT").toDouble
+ case object Connect
+ case object Reconnect
+ case object Shutdown
+ case object ResetConfiguration
+ case class Configure(collector: String, runID: Long)
+
+ sealed trait ConnectResult
+ case class Connected(collector: String, runID: Long) extends ConnectResult
+ case class ConnectFailed(reason: Throwable) extends ConnectResult
+}
+
+case class AgentSettings(licenseKey: String, appName: String, hostname: String, pid: Int, operationTimeout: Timeout,
+ maxConnectionRetries: Int, retryDelay: FiniteDuration, apdexT: Double)
+
+object AgentSettings {
+
+ def fromConfig(config: Config) = {
+ import kamon.util.ConfigTools.Syntax
// Name has the format of 'pid'@'host'
val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')
-
- Agent.Settings(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetries, retryDelay, apdexT)
+ val newRelicConfig = config.getConfig("kamon.newrelic")
+
+ AgentSettings(
+ newRelicConfig.getString("license-key"),
+ newRelicConfig.getString("app-name"),
+ runtimeName(1),
+ runtimeName(0).toInt,
+ Timeout(newRelicConfig.getFiniteDuration("operation-timeout")),
+ newRelicConfig.getInt("max-connect-retries"),
+ newRelicConfig.getFiniteDuration("connect-retry-delay"),
+ newRelicConfig.getFiniteDuration("apdexT").toMillis / 1E3D)
}
}
\ No newline at end of file
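
The rewritten Agent is a small context.become state machine with a bounded number of reconnection attempts. Stripped of the New Relic specifics, its shape is roughly the following sketch (actor and message names here are illustrative, not part of the module):

    import akka.actor.{ Actor, ActorLogging }
    import scala.concurrent.duration._

    // Bounded-retry connect loop in the style of Agent.disconnected/connected above.
    class ConnectingSketch(maxRetries: Int, retryDelay: FiniteDuration) extends Actor with ActorLogging {
      import context.dispatcher

      case object Connect
      case class Connected(runID: Long)
      case class ConnectFailed(reason: Throwable)

      override def preStart(): Unit = self ! Connect

      def receive: Receive = disconnected(maxRetries)

      def disconnected(attemptsLeft: Int): Receive = {
        case Connect ⇒ // fire the asynchronous connect and pipe its result back to self
        case Connected(runID) ⇒ context become connected(runID)
        case ConnectFailed(reason) if attemptsLeft > 0 ⇒
          log.error(reason, "Connect failed, retrying in {}", retryDelay)
          context.system.scheduler.scheduleOnce(retryDelay, self, Connect)
          context become disconnected(attemptsLeft - 1)
        case ConnectFailed(_) ⇒ context stop self // out of attempts, give up
      }

      def connected(runID: Long): Receive = {
        case anything ⇒ log.debug("connected with runID {}, received {}", runID, anything)
      }
    }
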
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala
new file mode 100644
index 00000000..263faa63
--- /dev/null
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala
@@ -0,0 +1,68 @@
+package kamon.newrelic
+
+import akka.actor.ActorRef
+import kamon.newrelic.ApiMethodClient.{ NewRelicException, AgentShutdownRequiredException, AgentRestartRequiredException }
+import spray.http.Uri.Query
+import spray.http._
+import spray.httpx.encoding.Deflate
+import spray.httpx.marshalling.Marshaller
+import spray.httpx.unmarshalling._
+import spray.json.{ JsonParser, JsValue }
+import spray.json.lenses.JsonLenses._
+import spray.json.DefaultJsonProtocol._
+import spray.client.pipelining._
+
+import scala.concurrent.{ Future, ExecutionContext }
+import scala.util.control.NoStackTrace
+
+class ApiMethodClient(host: String, val runID: Option[Long], agentSettings: AgentSettings, httpTransport: ActorRef)(implicit exeContext: ExecutionContext) {
+
+ implicit val to = agentSettings.operationTimeout
+
+ val baseQuery = Query(runID.map(ri ⇒ Map("run_id" -> String.valueOf(ri))).getOrElse(Map.empty[String, String]) +
+ ("license_key" -> agentSettings.licenseKey) +
+ ("marshal_format" -> "json") +
+ ("protocol_version" -> "12"))
+
+ // New Relic responses contain JSON but with text/plain content type :(.
+ implicit val JsValueUnmarshaller = Unmarshaller[JsValue](MediaTypes.`application/json`, MediaTypes.`text/plain`) {
+ case x: HttpEntity.NonEmpty ⇒
+ JsonParser(x.asString(defaultCharset = HttpCharsets.`UTF-8`))
+ }
+
+ val httpClient = encode(Deflate) ~> sendReceive(httpTransport) ~> decode(Deflate) ~> unmarshal[JsValue]
+ val baseCollectorUri = Uri("/agent_listener/invoke_raw_method").withHost(host).withScheme("http")
+
+ def invokeMethod[T: Marshaller](method: String, payload: T): Future[JsValue] = {
+ val methodQuery = ("method" -> method) +: baseQuery
+
+ httpClient(Post(baseCollectorUri.withQuery(methodQuery), payload)) map { jsResponse ⇒
+ jsResponse.extract[String]('exception.? / 'error_type.?).map(_ match {
+ case CollectorErrors.`ForceRestart` ⇒ throw AgentRestartRequiredException
+ case CollectorErrors.`ForceShutdown` ⇒ throw AgentShutdownRequiredException
+ case anyOtherError ⇒
+ val errorMessage = jsResponse.extract[String]('exception / 'message.?).getOrElse("no message")
+ throw NewRelicException(anyOtherError, errorMessage)
+ })
+
+ jsResponse
+ }
+ }
+}
+
+object ApiMethodClient {
+ case class NewRelicException(exceptionType: String, message: String) extends RuntimeException with NoStackTrace
+ case object AgentRestartRequiredException extends RuntimeException with NoStackTrace
+ case object AgentShutdownRequiredException extends RuntimeException with NoStackTrace
+}
+
+object RawMethods {
+ val GetRedirectHost = "get_redirect_host"
+ val Connect = "connect"
+ val MetricData = "metric_data"
+}
+
+object CollectorErrors {
+ val ForceRestart = "NewRelic::Agent::ForceRestartException"
+ val ForceShutdown = "NewRelic::Agent::ForceDisconnectException"
+}
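
The exception mapping inside invokeMethod is what drives the Reconnect/Shutdown messages handled by Agent and MetricReporter. A small self-contained sketch of that translation (the sample response body is invented; only spray-json and json-lenses are needed):

    import spray.json._
    import spray.json.DefaultJsonProtocol._
    import spray.json.lenses.JsonLenses._
    import kamon.newrelic.CollectorErrors

    object CollectorErrorSketch extends App {
      // A collector response asking the agent to restart, as invokeMethod would receive it.
      val response = JsonParser(
        """{ "exception": { "error_type": "NewRelic::Agent::ForceRestartException", "message": "please reconnect" } }""")

      response.extract[String]('exception.? / 'error_type.?) map {
        case CollectorErrors.`ForceRestart`  ⇒ "AgentRestartRequiredException -> Agent.Reconnect"
        case CollectorErrors.`ForceShutdown` ⇒ "AgentShutdownRequiredException -> Agent.Shutdown"
        case other                           ⇒ s"NewRelicException($other)"
      } foreach println
    }
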
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala
deleted file mode 100644
index ca003646..00000000
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package kamon.newrelic
-
-import akka.actor.ActorRef
-import akka.util.Timeout
-import spray.http.{ HttpResponse, HttpRequest }
-import spray.httpx.RequestBuilding
-import spray.httpx.encoding.Deflate
-import spray.json._
-import spray.client.pipelining.sendReceive
-
-import scala.concurrent.{ ExecutionContext, Future }
-
-trait ClientPipelines extends RequestBuilding {
-
- def compressedPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[HttpResponse] =
- encode(Deflate) ~> sendReceive(transport)
-
- def compressedToJsonPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[JsValue] =
- compressedPipeline(transport) ~> toJson
-
- def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson
-
-}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala
index 84472593..3b1b8cb3 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala
@@ -16,18 +16,17 @@
package kamon.newrelic
-import kamon.metric.UserMetrics.UserMetricGroup
-import kamon.metric._
-import kamon.newrelic.Agent.Settings
+import kamon.metric.{ UserMetricsExtensionImpl, EntitySnapshot, Entity }
+import kamon.metric.instrument.CollectionContext
object CustomMetricExtractor extends MetricExtractor {
- def extract(settings: Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = {
- metrics.collect {
- case (mg: UserMetricGroup, groupSnapshot) ⇒
- groupSnapshot.metrics collect {
- case (name, snapshot) ⇒ Metric.fromKamonMetricSnapshot(snapshot, s"Custom/${mg.name}", None, Scale.Unit)
- }
- }.flatten.toMap
+ def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] = {
+ metrics.get(UserMetricsExtensionImpl.UserMetricEntity).map { allUserMetrics ⇒
+ allUserMetrics.metrics.map {
+ case (key, snapshot) ⇒ Metric(snapshot, key.unitOfMeasurement, s"Custom/${key.name}", None)
+ }
+
+ } getOrElse (Map.empty)
}
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
index 26e8839e..6e16b975 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala
@@ -15,18 +15,18 @@
* ========================================================== */
package kamon.newrelic
+import kamon.util.Timestamp
import spray.json._
-import kamon.newrelic.Agent._
object JsonProtocol extends DefaultJsonProtocol {
- implicit object ConnectJsonWriter extends RootJsonWriter[Settings] {
- def write(obj: Settings): JsValue =
+ implicit object ConnectJsonWriter extends RootJsonWriter[AgentSettings] {
+ def write(obj: AgentSettings): JsValue =
JsArray(
JsObject(
"agent_version" -> JsString("3.1.0"),
"app_name" -> JsArray(JsString(obj.appName)),
- "host" -> JsString(obj.host),
+ "host" -> JsString(obj.hostname),
"identifier" -> JsString(s"java:${obj.appName}"),
"language" -> JsString("java"),
"pid" -> JsNumber(obj.pid)))
@@ -87,8 +87,8 @@ object JsonProtocol extends DefaultJsonProtocol {
def read(json: JsValue): MetricBatch = json match {
case JsArray(elements) ⇒
val runID = elements(0).convertTo[Long]
- val timeSliceFrom = elements(1).convertTo[Long]
- val timeSliceTo = elements(2).convertTo[Long]
+ val timeSliceFrom = new Timestamp(elements(1).convertTo[Long])
+ val timeSliceTo = new Timestamp(elements(2).convertTo[Long])
val metrics = elements(3).convertTo[Seq[Metric]]
MetricBatch(runID, TimeSliceMetrics(timeSliceFrom, timeSliceTo, metrics.toMap))
@@ -99,8 +99,8 @@ object JsonProtocol extends DefaultJsonProtocol {
def write(obj: MetricBatch): JsValue =
JsArray(
JsNumber(obj.runID),
- JsNumber(obj.timeSliceMetrics.from),
- JsNumber(obj.timeSliceMetrics.to),
+ JsNumber(obj.timeSliceMetrics.from.seconds),
+ JsNumber(obj.timeSliceMetrics.to.seconds),
obj.timeSliceMetrics.metrics.toSeq.toJson)
}
}
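
With the Timestamp change, a batch is still serialized as [runID, fromSeconds, toSeconds, metrics]. A throwaway sketch of the writer in action (the metric values are invented, and the exact per-metric JSON shape comes from the metric formats not shown in this hunk):

    import spray.json._
    import kamon.util.Timestamp
    import kamon.newrelic._
    import kamon.newrelic.JsonProtocol._

    object MetricBatchJsonSketch extends App {
      val batch = MetricBatch(161221111L, TimeSliceMetrics(
        new Timestamp(1415587618), new Timestamp(1415587678),
        Map(MetricID("HttpDispatcher", None) -> MetricData(3, 6.0, 6.0, 1.0, 3.0, 14.0))))

      // Yields a JSON array: [161221111, 1415587618, 1415587678, [ ...metrics... ]]
      println(batch.toJson(MetricBatchWriter).compactPrint)
    }
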
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala
index 14541483..20204b79 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala
@@ -1,7 +1,8 @@
package kamon.newrelic
-import kamon.metric.instrument.{ Counter, Histogram }
-import kamon.metric.{ MetricSnapshot, Scale }
+import kamon.metric.instrument._
+import kamon.metric.MetricKey
+import kamon.util.{ MapMerge, Timestamp }
case class MetricID(name: String, scope: Option[String])
case class MetricData(callCount: Long, total: Double, totalExclusive: Double, min: Double, max: Double, sumOfSquares: Double) {
@@ -17,16 +18,23 @@ case class MetricData(callCount: Long, total: Double, totalExclusive: Double, mi
object Metric {
- def fromKamonMetricSnapshot(snapshot: MetricSnapshot, name: String, scope: Option[String], targetScale: Scale): Metric = {
+ def scaleFunction(uom: UnitOfMeasurement): Long ⇒ Double = uom match {
+ case time: Time ⇒ time.scale(Time.Seconds)
+ case other ⇒ _.toDouble
+ }
+
+ def apply(snapshot: InstrumentSnapshot, snapshotUnit: UnitOfMeasurement, name: String, scope: Option[String]): Metric = {
snapshot match {
case hs: Histogram.Snapshot ⇒
var total: Double = 0D
var sumOfSquares: Double = 0D
- val scaledMin = Scale.convert(hs.scale, targetScale, hs.min)
- val scaledMax = Scale.convert(hs.scale, targetScale, hs.max)
+ val scaler = scaleFunction(snapshotUnit)
+
+ val scaledMin = scaler(hs.min)
+ val scaledMax = scaler(hs.max)
hs.recordsIterator.foreach { record ⇒
- val scaledValue = Scale.convert(hs.scale, targetScale, record.level)
+ val scaledValue = scaler(record.level)
total += scaledValue * record.count
sumOfSquares += (scaledValue * scaledValue) * record.count
@@ -40,13 +48,13 @@ object Metric {
}
}
-case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[MetricID, MetricData]) {
- import kamon.metric.combineMaps
+case class TimeSliceMetrics(from: Timestamp, to: Timestamp, metrics: Map[MetricID, MetricData]) {
+ import MapMerge.Syntax
def merge(that: TimeSliceMetrics): TimeSliceMetrics = {
- val mergedFrom = math.min(from, that.from)
- val mergedTo = math.max(to, that.to)
- val mergedMetrics = combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r))
+ val mergedFrom = Timestamp.earlier(from, that.from)
+ val mergedTo = Timestamp.later(to, that.to)
+ val mergedMetrics = metrics.merge(that.metrics, (l, r) ⇒ l.merge(r))
TimeSliceMetrics(mergedFrom, mergedTo, mergedMetrics)
}
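
Merging now goes through Timestamp.earlier/later and MapMerge instead of the old math.min/max plus combineMaps, with the same observable behaviour. A quick sketch with invented numbers:

    import kamon.util.Timestamp
    import kamon.newrelic._

    object TimeSliceMergeSketch extends App {
      val dispatcher = MetricID("HttpDispatcher", None)

      val first = TimeSliceMetrics(new Timestamp(1415587618), new Timestamp(1415587678),
        Map(dispatcher -> MetricData(3, 6.0, 6.0, 1.0, 3.0, 14.0)))
      val second = TimeSliceMetrics(new Timestamp(1415587678), new Timestamp(1415587738),
        Map(dispatcher -> MetricData(3, 15.0, 15.0, 4.0, 6.0, 77.0)))

      // Keeps the widest [from, to] window and merges MetricData entries that share a MetricID.
      val merged = first.merge(second)
      println(s"from=${merged.from} to=${merged.to} callCount=${merged.metrics(dispatcher).callCount}")
    }
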
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
index 9742ed09..842fbdc6 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
@@ -1,103 +1,109 @@
package kamon.newrelic
-import java.util.concurrent.TimeUnit
-
import akka.actor.{ Props, ActorLogging, Actor }
import akka.pattern.pipe
import akka.io.IO
-import akka.util.Timeout
import kamon.Kamon
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms }
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import kamon.metric._
-import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed, PostSucceeded, MetricDataPostResult }
+import kamon.metric.instrument.CollectionContext
+import kamon.newrelic.ApiMethodClient.{ AgentShutdownRequiredException, AgentRestartRequiredException }
+import kamon.newrelic.MetricReporter.{ PostFailed, PostSucceeded }
import spray.can.Http
-import spray.http.Uri
import spray.httpx.SprayJsonSupport
-import scala.concurrent.Future
import scala.concurrent.duration._
-import scala.util.control.NoStackTrace
-
-class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extends Actor
- with ClientPipelines with ActorLogging with SprayJsonSupport {
+import JsonProtocol._
- import JsonProtocol._
- import MetricReporter.Extractors
+class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging with SprayJsonSupport {
import context.dispatcher
- val metricDataQuery = ("method" -> "metric_data") +: ("run_id" -> runID.toString) +: baseUri.query
- val metricDataUri = baseUri.withQuery(metricDataQuery)
-
- implicit val operationTimeout = Timeout(30 seconds)
- val metricsExtension = Kamon(Metrics)(context.system)
+ val metricsExtension = Kamon.metrics
val collectionContext = metricsExtension.buildDefaultCollectionContext
- val collectorClient = compressedPipeline(IO(Http)(context.system))
-
- val subscriber = {
- val tickInterval = context.system.settings.config.getMilliseconds("kamon.metrics.tick-interval")
- if (tickInterval == 60000)
- self
- else
- context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer")
+ val metricsSubscriber = {
+ val tickInterval = Kamon.metrics.settings.tickInterval.toMillis
+
+ // Metrics are always sent to New Relic in 60 seconds intervals.
+ if (tickInterval == 60000) self
+ else context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer")
}
- // Subscribe to Trace Metrics
- metricsExtension.subscribe(TraceMetrics, "*", subscriber, permanently = true)
+ subscribeToMetrics()
- // Subscribe to all User Metrics
- metricsExtension.subscribe(UserHistograms, "*", subscriber, permanently = true)
- metricsExtension.subscribe(UserCounters, "*", subscriber, permanently = true)
- metricsExtension.subscribe(UserMinMaxCounters, "*", subscriber, permanently = true)
- metricsExtension.subscribe(UserGauges, "*", subscriber, permanently = true)
+ def receive = awaitingConfiguration(None)
- def receive = reporting(None)
+ def awaitingConfiguration(bufferedMetrics: Option[TimeSliceMetrics]): Receive = {
+ case Agent.Configure(collector, runID) ⇒ startReporting(collector, runID, bufferedMetrics)
+ case Agent.ResetConfiguration ⇒ // Stay waiting.
+ case tickSnapshot: TickMetricSnapshot ⇒ keepWaitingForConfig(tickSnapshot, bufferedMetrics)
+ case PostSucceeded ⇒ // Ignore
+ case PostFailed(reason) ⇒ // Ignore any problems until we get a new configuration
+ }
+
+ def reporting(apiClient: ApiMethodClient, bufferedMetrics: Option[TimeSliceMetrics]): Receive = {
+ case tick: TickMetricSnapshot ⇒ sendMetricData(apiClient, tick, bufferedMetrics)
+ case PostSucceeded ⇒ context become reporting(apiClient, None)
+ case PostFailed(reason) ⇒ processCollectorFailure(reason)
+ case Agent.ResetConfiguration ⇒ context become awaitingConfiguration(bufferedMetrics)
+ }
- def reporting(pendingMetrics: Option[TimeSliceMetrics]): Receive = {
- case TickMetricSnapshot(from, to, metrics) ⇒
- val fromInSeconds = (from / 1E3).toInt
- val toInSeconds = (to / 1E3).toInt
- val extractedMetrics = Extractors.flatMap(_.extract(settings, collectionContext, metrics)).toMap
- val tickMetrics = TimeSliceMetrics(fromInSeconds, toInSeconds, extractedMetrics)
+ def sendMetricData(apiClient: ApiMethodClient, tick: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
+ val metricsToReport = merge(convertToTimeSliceMetrics(tick), bufferedMetrics)
+ val customMarshaller = sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)
- val metricsToReport = pendingMetrics.foldLeft(tickMetrics)((p, n) ⇒ p.merge(n))
- context become reporting(Some(metricsToReport))
- pipe(sendMetricData(metricsToReport)) to self
+ if (log.isDebugEnabled)
+ log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", metricsToReport.metrics.size,
+ metricsToReport.from, metricsToReport.to)
- case PostSucceeded ⇒
- context become (reporting(None))
+ pipe {
+ apiClient.invokeMethod(RawMethods.MetricData, MetricBatch(apiClient.runID.get, metricsToReport))(customMarshaller)
+ .map { _ ⇒ PostSucceeded }
+ .recover { case error ⇒ PostFailed(error) }
+ } to self
- case PostFailed(reason) ⇒
- log.error(reason, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.")
+ context become reporting(apiClient, Some(metricsToReport))
}
- def sendMetricData(slice: TimeSliceMetrics): Future[MetricDataPostResult] = {
- log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to)
+ def processCollectorFailure(failureReason: Throwable): Unit = failureReason match {
+ case AgentRestartRequiredException ⇒ context.parent ! Agent.Reconnect
+ case AgentShutdownRequiredException ⇒ context.parent ! Agent.Shutdown
+ case anyOtherFailure ⇒
+ log.error(anyOtherFailure, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.")
+ }
- collectorClient {
- Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter))
+ def startReporting(collector: String, runID: Long, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
+ val apiClient = new ApiMethodClient(collector, Some(runID), settings, IO(Http)(context.system))
+ context become reporting(apiClient, bufferedMetrics)
+ }
+
+ def keepWaitingForConfig(tickSnapshot: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = {
+ val timeSliceMetrics = convertToTimeSliceMetrics(tickSnapshot)
+ context become awaitingConfiguration(Some(merge(timeSliceMetrics, bufferedMetrics)))
+ }
+
+ def merge(tsm: TimeSliceMetrics, buffered: Option[TimeSliceMetrics]): TimeSliceMetrics =
+ buffered.foldLeft(tsm)((p, n) ⇒ p.merge(n))
- } map { response ⇒
- if (response.status.isSuccess)
- PostSucceeded
- else
- PostFailed(new UnexpectedStatusCodeException(s"Received unsuccessful status code [${response.status.value}] from collector."))
- } recover { case t: Throwable ⇒ PostFailed(t) }
+ def convertToTimeSliceMetrics(tick: TickMetricSnapshot): TimeSliceMetrics = {
+ val extractedMetrics = MetricReporter.MetricExtractors.flatMap(_.extract(settings, collectionContext, tick.metrics)).toMap
+ TimeSliceMetrics(tick.from.toTimestamp, tick.to.toTimestamp, extractedMetrics)
+ }
+
+ def subscribeToMetrics(): Unit = {
+ metricsExtension.subscribe("trace", "*", metricsSubscriber, permanently = true)
+ metricsExtension.subscribe("user-metrics", "*", metricsSubscriber, permanently = true)
}
}
object MetricReporter {
- val Extractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil
-
- def props(settings: Agent.Settings, runID: Long, baseUri: Uri): Props =
- Props(new MetricReporter(settings, runID, baseUri))
+ def props(settings: AgentSettings): Props = Props(new MetricReporter(settings))
sealed trait MetricDataPostResult
case object PostSucceeded extends MetricDataPostResult
case class PostFailed(reason: Throwable) extends MetricDataPostResult
- class UnexpectedStatusCodeException(message: String) extends RuntimeException(message) with NoStackTrace
+ val MetricExtractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil
}
trait MetricExtractor {
- def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData]
+ def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData]
}
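
Extractors now receive Entity/EntitySnapshot pairs rather than the old MetricGroupIdentity/MetricGroupSnapshot ones. A hypothetical extractor against the new contract, reporting every instrument of every entity under a Custom/ name (not something this module actually registers):

    import kamon.metric.{ Entity, EntitySnapshot }
    import kamon.metric.instrument.CollectionContext
    import kamon.newrelic._

    object EverythingAsCustomExtractor extends MetricExtractor {
      def extract(settings: AgentSettings, collectionContext: CollectionContext,
                  metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] =
        metrics.flatMap {
          case (entity, snapshot) ⇒
            snapshot.metrics.map {
              case (key, instrumentSnapshot) ⇒
                Metric(instrumentSnapshot, key.unitOfMeasurement, s"Custom/${entity.category}/${entity.name}/${key.name}", None)
            }
        }
    }
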
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
index 08fdc8c4..7f56d931 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
@@ -21,7 +21,8 @@ import java.util
import akka.actor.{ Actor, ActorLogging }
import akka.event.Logging.{ Error, InitializeLogger, LoggerInitialized }
import com.newrelic.api.agent.{ NewRelic ⇒ NR }
-import kamon.trace.{ TraceRecorder, TraceContextAware }
+import kamon.trace.TraceLocal.HttpContextKey
+import kamon.trace.{ TraceContext, TraceLocal, TraceContextAware }
trait CustomParamsSupport {
this: NewRelicErrorLogger ⇒
@@ -40,9 +41,20 @@ class NewRelicErrorLogger extends Actor with ActorLogging with CustomParamsSuppo
def notifyError(error: Error): Unit = runInFakeTransaction {
val params = new util.HashMap[String, String]()
- val ctx = error.asInstanceOf[TraceContextAware].traceContext
- params put ("TraceToken", ctx.token)
+ if (error.isInstanceOf[TraceContextAware]) {
+ val ctx = error.asInstanceOf[TraceContextAware].traceContext
+ val httpContext = TraceLocal.retrieve(HttpContextKey)
+
+ params put ("TraceToken", ctx.token)
+
+ httpContext.map { httpCtx ⇒
+ params put ("User-Agent", httpCtx.agent)
+ params put ("X-Forwarded-For", httpCtx.xforwarded)
+ params put ("Request-URI", httpCtx.uri)
+ }
+ }
+
customParams foreach { case (k, v) ⇒ params.put(k, v) }
if (error.cause == Error.NoCause) NR.noticeError(error.message.toString, params)
@@ -52,7 +64,7 @@ class NewRelicErrorLogger extends Actor with ActorLogging with CustomParamsSuppo
//Really ugly, but temporal hack until next release...
def runInFakeTransaction[T](thunk: ⇒ T): T = {
val oldName = Thread.currentThread.getName
- Thread.currentThread.setName(TraceRecorder.currentContext.name)
+ Thread.currentThread.setName(TraceContext.currentContext.name)
try thunk finally Thread.currentThread.setName(oldName)
}
}
\ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
index 0a4a516b..d0144f4b 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala
@@ -16,77 +16,81 @@
package kamon.newrelic
-import scala.collection.mutable;
-import kamon.metric._
-import kamon.metric.TraceMetrics.{ TraceMetricsSnapshot, ElapsedTime }
-import kamon.metric.instrument.Histogram
-import kamon.trace.SegmentCategory.HttpClient
-import kamon.trace.SegmentMetricIdentity
+import kamon.metric.{ EntitySnapshot, Entity }
+
+import scala.collection.mutable
+import kamon.metric.instrument.{ Time, CollectionContext, Histogram }
object WebTransactionMetricExtractor extends MetricExtractor {
- def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = {
+ def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] = {
val apdexBuilder = new ApdexBuilder("Apdex", None, settings.apdexT)
// Trace metrics are recorded in nanoseconds.
- var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano)
- var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano)
+ var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty
+ var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty
val externalByHostSnapshots = mutable.Map.empty[String, List[Histogram.Snapshot]]
val externalByHostAndLibrarySnapshots = mutable.Map.empty[(String, String), List[Histogram.Snapshot]]
val externalScopedByHostAndLibrarySnapshots = mutable.Map.empty[(String, String, String), List[Histogram.Snapshot]]
- val transactionMetrics = metrics.collect {
- case (TraceMetrics(traceName), tms: TraceMetricsSnapshot) ⇒
-
- tms.segments.foreach {
- case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒
- accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext)
+ val transactionMetrics = metrics.filterKeys(_.category == "trace").map {
+ case (entity: Entity, es: EntitySnapshot) ⇒
+ // Trace metrics only have elapsed-time and segments and all of them are Histograms.
+ es.histograms.foreach {
+ case (key, segmentSnapshot) if key.metadata.get("category").filter(_ == "http-client").nonEmpty ⇒
+ val library = key.metadata("library")
+ accumulatedExternalServices = accumulatedExternalServices.merge(segmentSnapshot, collectionContext)
// Accumulate externals by host
- externalByHostSnapshots.update(segmentName, snapshot :: externalByHostSnapshots.getOrElse(segmentName, Nil))
+ externalByHostSnapshots.update(key.name, segmentSnapshot :: externalByHostSnapshots.getOrElse(key.name, Nil))
// Accumulate externals by host and library
- externalByHostAndLibrarySnapshots.update((segmentName, library),
- snapshot :: externalByHostAndLibrarySnapshots.getOrElse((segmentName, library), Nil))
+ externalByHostAndLibrarySnapshots.update((key.name, library),
+ segmentSnapshot :: externalByHostAndLibrarySnapshots.getOrElse((key.name, library), Nil))
// Accumulate externals by host and library, including the transaction as scope.
- externalScopedByHostAndLibrarySnapshots.update((segmentName, library, traceName),
- snapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((segmentName, library, traceName), Nil))
+ externalScopedByHostAndLibrarySnapshots.update((key.name, library, entity.name),
+ segmentSnapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((key.name, library, entity.name), Nil))
- }
+ case otherSegments ⇒
- accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(tms.elapsedTime, collectionContext)
- tms.elapsedTime.recordsIterator.foreach { record ⇒
- apdexBuilder.record(Scale.convert(tms.elapsedTime.scale, Scale.Unit, record.level), record.count)
}
- Metric.fromKamonMetricSnapshot(tms.elapsedTime, "WebTransaction/Custom/" + traceName, None, Scale.Unit)
- }
+ es.histograms.collect {
+ case (key, elapsedTime) if key.name == "elapsed-time" ⇒
+ accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(elapsedTime, collectionContext)
+ elapsedTime.recordsIterator.foreach { record ⇒
+ apdexBuilder.record(Time.Nanoseconds.scale(Time.Seconds)(record.level), record.count)
+ }
+
+ Metric(elapsedTime, key.unitOfMeasurement, "WebTransaction/Custom/" + entity.name, None)
+ }
+ } flatten
- val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit)
+ val httpDispatcher = Metric(accumulatedHttpDispatcher, Time.Seconds, "HttpDispatcher", None)
val webTransaction = httpDispatcher.copy(MetricID("WebTransaction", None))
val webTransactionTotal = httpDispatcher.copy(MetricID("WebTransactionTotalTime", None))
- val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit)
+ val externalAllWeb = Metric(accumulatedExternalServices, Time.Seconds, "External/allWeb", None)
val externalAll = externalAllWeb.copy(MetricID("External/all", None))
val externalByHost = externalByHostSnapshots.map {
case (host, snapshots) ⇒
- val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext))
- Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/all", None, Scale.Unit)
+ val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext))
+ Metric(mergedSnapshots, Time.Seconds, s"External/$host/all", None)
}
val externalByHostAndLibrary = externalByHostAndLibrarySnapshots.map {
case ((host, library), snapshots) ⇒
- val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext))
- Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", None, Scale.Unit)
+ val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext))
+ Metric(mergedSnapshots, Time.Seconds, s"External/$host/$library", None)
}
val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map {
case ((host, library, traceName), snapshots) ⇒
- val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext))
- Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName), Scale.Unit)
+ val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext))
+ Metric(mergedSnapshots, Time.Seconds, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName))
}
Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++
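
The Scale.convert calls are gone; scaling now goes through the unit itself. Time.Nanoseconds.scale(Time.Seconds) gives the Long ⇒ Double conversion used both for the Apdex buckets above and inside Metric.scaleFunction. A tiny sketch:

    import kamon.metric.instrument.Time

    object TimeScalingSketch extends App {
      // Trace metrics are recorded in nanoseconds; New Relic expects seconds.
      val toSeconds: Long ⇒ Double = Time.Nanoseconds.scale(Time.Seconds)

      println(toSeconds(1000000L))    // 0.001
      println(toSeconds(2500000000L)) // 2.5
    }
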
diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala
index 7db9f2d0..3e15e9fd 100644
--- a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala
+++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala
@@ -22,7 +22,6 @@ import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.io.IO
import akka.testkit._
import com.typesafe.config.ConfigFactory
-import kamon.AkkaExtensionSwap
import org.scalatest.{ BeforeAndAfterAll, WordSpecLike }
import spray.can.Http
import spray.http._
@@ -30,6 +29,7 @@ import spray.httpx.encoding.Deflate
import spray.httpx.{ SprayJsonSupport, RequestBuilding }
import spray.json.JsArray
import spray.json._
+import testkit.AkkaExtensionSwap
class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll with RequestBuilding with SprayJsonSupport {
import JsonProtocol._
@@ -44,9 +44,11 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll wit
| newrelic {
| app-name = kamon
| license-key = 1111111111
- | initialize-retry-delay = 1 second
- | max-initialize-retries = 3
+ | connect-retry-delay = 1 second
+ | max-connect-retries = 3
| }
+ |
+ | modules.kamon-newrelic.auto-start = no
|}
|
""".stripMargin))
@@ -88,7 +90,7 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll wit
})
// Receive the runID
- EventFilter.info(message = "Agent initialized with runID: [161221111] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept {
+ EventFilter.info(message = "Configuring New Relic reporters to use runID: [161221111] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept {
httpManager.reply(jsonResponse(
"""
| {
@@ -147,7 +149,7 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll wit
// Receive the runID
EventFilter.info(
- message = "Agent initialized with runID: [161221112] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept {
+ message = "Configuring New Relic reporters to use runID: [161221112] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept {
httpManager.reply(jsonResponse(
"""
@@ -184,7 +186,7 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll wit
})
// Give up on connecting.
- EventFilter[RuntimeException](message = "Giving up while trying to set up a connection with the New Relic collector.", occurrences = 1).intercept {
+ EventFilter.error(message = "Giving up while trying to set up a connection with the New Relic collector. The New Relic module is shutting itself down.", occurrences = 1).intercept {
httpManager.reply(Timedout(request))
}
}
diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala
index 0001072e..04380677 100644
--- a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala
+++ b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala
@@ -16,40 +16,46 @@
package kamon.newrelic
-import akka.actor.{ ActorRef, ActorSystem }
+import akka.actor.ActorRef
import akka.io.IO
import akka.testkit._
+import akka.util.Timeout
import com.typesafe.config.ConfigFactory
-import kamon.metric.{ TraceMetrics, Metrics }
-import kamon.{ Kamon, AkkaExtensionSwap }
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.metric.{ Entity, TraceMetrics }
+import kamon.testkit.BaseKamonSpec
+import kamon.util.MilliTimestamp
+import kamon.Kamon
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import spray.can.Http
import spray.http.Uri.Query
import spray.http._
import spray.httpx.encoding.Deflate
-import spray.httpx.{ RequestBuilding, SprayJsonSupport }
+import spray.httpx.SprayJsonSupport
+import testkit.AkkaExtensionSwap
import scala.concurrent.duration._
import spray.json._
-class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with SprayJsonSupport {
+class MetricReporterSpec extends BaseKamonSpec("metric-reporter-spec") with SprayJsonSupport {
import kamon.newrelic.JsonProtocol._
- implicit lazy val system: ActorSystem = ActorSystem("metric-reporter-spec", ConfigFactory.parseString(
- """
- |akka {
- | loggers = ["akka.testkit.TestEventListener"]
- | loglevel = "INFO"
- |}
- |kamon {
- | metric {
- | tick-interval = 1 hour
- | }
- |}
- |
- """.stripMargin))
-
- val agentSettings = Agent.Settings("1111111111", "kamon", "test-host", 1, 1, 30 seconds, 1D)
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |akka {
+ | loggers = ["akka.testkit.TestEventListener"]
+ | loglevel = "INFO"
+ |}
+ |kamon {
+ | metric {
+ | tick-interval = 1 hour
+ | }
+ |
+ | modules.kamon-newrelic.auto-start = no
+ |}
+ |
+ """.stripMargin)
+
+ val agentSettings = AgentSettings("1111111111", "kamon", "test-host", 1, Timeout(5 seconds), 1, 30 seconds, 1D)
val baseQuery = Query(
"license_key" -> agentSettings.licenseKey,
"marshal_format" -> "json",
@@ -59,8 +65,9 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit
"the MetricReporter" should {
"report metrics to New Relic upon arrival" in new FakeTickSnapshotsFixture {
val httpManager = setHttpManager(TestProbe())
- val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri))
+ val metricReporter = system.actorOf(MetricReporter.props(agentSettings))
+ metricReporter ! Agent.Configure("collector-1.newrelic.com", 9999)
metricReporter ! firstSnapshot
val metricPost = httpManager.expectMsgType[HttpRequest]
@@ -70,8 +77,8 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit
val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch]
postedBatch.runID should be(9999)
- postedBatch.timeSliceMetrics.from should be(1415587618)
- postedBatch.timeSliceMetrics.to should be(1415587678)
+ postedBatch.timeSliceMetrics.from.seconds should be(1415587618)
+ postedBatch.timeSliceMetrics.to.seconds should be(1415587678)
val metrics = postedBatch.timeSliceMetrics.metrics
metrics(MetricID("Apdex", None)).callCount should be(3)
@@ -81,8 +88,9 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit
"accumulate metrics if posting fails" in new FakeTickSnapshotsFixture {
val httpManager = setHttpManager(TestProbe())
- val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri))
+ val metricReporter = system.actorOf(MetricReporter.props(agentSettings))
+ metricReporter ! Agent.Configure("collector-1.newrelic.com", 9999)
metricReporter ! firstSnapshot
val request = httpManager.expectMsgType[HttpRequest]
httpManager.reply(Timedout(request))
@@ -96,8 +104,8 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit
val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch]
postedBatch.runID should be(9999)
- postedBatch.timeSliceMetrics.from should be(1415587618)
- postedBatch.timeSliceMetrics.to should be(1415587738)
+ postedBatch.timeSliceMetrics.from.seconds should be(1415587618)
+ postedBatch.timeSliceMetrics.to.seconds should be(1415587738)
val metrics = postedBatch.timeSliceMetrics.metrics
metrics(MetricID("Apdex", None)).callCount should be(6)
@@ -130,20 +138,20 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit
}
trait FakeTickSnapshotsFixture {
- val testTraceID = TraceMetrics("example-trace")
- val recorder = Kamon(Metrics).register(testTraceID, TraceMetrics.Factory).get
- val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
+ val testTraceID = Entity("example-trace", "trace")
+ val recorder = Kamon.metrics.register(TraceMetrics, testTraceID.name).get.recorder
+ val collectionContext = Kamon.metrics.buildDefaultCollectionContext
def collectRecorder = recorder.collect(collectionContext)
- recorder.elapsedTime.record(1000000)
- recorder.elapsedTime.record(2000000)
- recorder.elapsedTime.record(3000000)
- val firstSnapshot = TickMetricSnapshot(1415587618000L, 1415587678000L, Map(testTraceID -> collectRecorder))
+ recorder.ElapsedTime.record(1000000)
+ recorder.ElapsedTime.record(2000000)
+ recorder.ElapsedTime.record(3000000)
+ val firstSnapshot = TickMetricSnapshot(new MilliTimestamp(1415587618000L), new MilliTimestamp(1415587678000L), Map(testTraceID -> collectRecorder))
- recorder.elapsedTime.record(6000000)
- recorder.elapsedTime.record(5000000)
- recorder.elapsedTime.record(4000000)
- val secondSnapshot = TickMetricSnapshot(1415587678000L, 1415587738000L, Map(testTraceID -> collectRecorder))
+ recorder.ElapsedTime.record(6000000)
+ recorder.ElapsedTime.record(5000000)
+ recorder.ElapsedTime.record(4000000)
+ val secondSnapshot = TickMetricSnapshot(new MilliTimestamp(1415587678000L), new MilliTimestamp(1415587738000L), Map(testTraceID -> collectRecorder))
}
}
\ No newline at end of file