diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-02-14 13:50:36 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-02-14 13:50:36 +0100 |
commit | e931727454fbd97fb39d163255edbcdcd7bcdbc6 (patch) | |
tree | 98b4bdcaa1af1dbbf201036ce05021bc096db62f /kamon-newrelic/src | |
parent | 8af0dfb1e2c8892023dd1bc6fbae1dae2ffb16ba (diff) | |
parent | 66b35556aa1bf0975cefa35603660991cdfcc526 (diff) | |
download | Kamon-e931727454fbd97fb39d163255edbcdcd7bcdbc6.tar.gz Kamon-e931727454fbd97fb39d163255edbcdcd7bcdbc6.tar.bz2 Kamon-e931727454fbd97fb39d163255edbcdcd7bcdbc6.zip |
Merge branch 'single-kamon-instance-per-jvm' into release-legacy-akka-2.2
Conflicts:
kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala
kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala
kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala
kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala
kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala
kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala
kamon-play/src/main/scala/kamon/play/Play.scala
kamon-play/src/main/scala/kamon/play/action/KamonTraceActions.scala
kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
project/Dependencies.scala
version.sbt
Diffstat (limited to 'kamon-newrelic/src')
12 files changed, 381 insertions, 270 deletions
diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index c86e64ae..9dc793e1 100644 --- a/kamon-newrelic/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -14,12 +14,23 @@ kamon { # Your New Relic license key. license-key = e7d350b14228f3d28f35bc3140df2c3e565ea5d5 + # Time to wait for a response when calling any of the New Relic collector API methods. + operation-timeout = 30 seconds + # attempts to send pending metrics in the next tick, # combining the current metrics plus the pending, after max-retry, deletes all pending metrics - max-initialize-retries = 3 + max-connect-retries = 3 # delay between connection attempts to NewRelic collector - initialize-retry-delay = 30 seconds + connect-retry-delay = 30 seconds + } + + modules { + kamon-newrelic { + auto-start = yes + requires-aspectj = no + extension-id = "kamon.newrelic.NewRelic" + } } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 6244c0ad..5f6383f8 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -18,113 +18,129 @@ package kamon.newrelic import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } -import akka.actor.{ ActorSystem, ActorLogging, Actor } -import akka.event.LoggingAdapter +import akka.actor.{ ActorLogging, Actor } import akka.io.IO import akka.util.Timeout -import kamon.Kamon -import kamon.metric.{ CollectionContext, Metrics } +import com.typesafe.config.Config import spray.can.Http import spray.json._ -import scala.concurrent.{ ExecutionContext, Future } -import spray.httpx.{ SprayJsonSupport, ResponseTransformation } -import spray.http._ +import scala.concurrent.Future +import spray.httpx.SprayJsonSupport import spray.json.lenses.JsonLenses._ import java.lang.management.ManagementFactory -import spray.http.Uri.Query import scala.concurrent.duration._ import Agent._ - +import JsonProtocol._ import akka.pattern.pipe -// TODO: Setup a proper host connector with custom timeout configuration for use with this. -class Agent extends Actor with ClientPipelines with ResponseTransformation with SprayJsonSupport with ActorLogging { - import JsonProtocol._ +class Agent extends Actor with SprayJsonSupport with ActorLogging { import context.dispatcher - implicit val operationTimeout = Timeout(30 seconds) - val collectorClient = compressedToJsonPipeline(IO(Http)(context.system)) - val settings = buildAgentSettings(context.system) - val baseQuery = Query( - "license_key" -> settings.licenseKey, - "marshal_format" -> "json", - "protocol_version" -> "12") + val agentSettings = AgentSettings.fromConfig(context.system.settings.config) + + // Start the reporters + context.actorOf(MetricReporter.props(agentSettings), "metric-reporter") // Start the connection to the New Relic collector. - self ! Initialize + self ! Connect + + def receive: Receive = disconnected(agentSettings.maxConnectionRetries) - def receive: Receive = uninitialized(settings.maxRetries) + def disconnected(attemptsLeft: Int): Receive = { + case Connect ⇒ pipe(connectToCollector) to self + case Connected(collector, runID) ⇒ configureChildren(collector, runID) + case ConnectFailed(reason) if (attemptsLeft > 0) ⇒ scheduleReconnection(reason, attemptsLeft) + case ConnectFailed(reason) ⇒ giveUpConnection() + } - def uninitialized(attemptsLeft: Int): Receive = { - case Initialize ⇒ pipe(connectToCollector) to self - case Initialized(runID, collector) ⇒ - log.info("Agent initialized with runID: [{}] and collector: [{}]", runID, collector) + def connected: Receive = { + case Reconnect ⇒ reconnect() + case Shutdown ⇒ shutdown() + } - val baseCollectorUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(baseQuery) - context.actorOf(MetricReporter.props(settings, runID, baseCollectorUri), "metric-reporter") + def reconnect(): Unit = { + log.warning("New Relic request the agent to restart the connection, all reporters will be paused until a new connection is available.") + self ! Connect + context.children.foreach(_ ! ResetConfiguration) + context become disconnected(agentSettings.maxConnectionRetries) + } - case InitializationFailed(reason) if (attemptsLeft > 0) ⇒ - log.error(reason, "Initialization failed, retrying in {} seconds", settings.retryDelay.toSeconds) - context.system.scheduler.scheduleOnce(settings.retryDelay, self, Initialize) - context become (uninitialized(attemptsLeft - 1)) + def shutdown(): Unit = { + log.error("New Relic requested the agent to be stopped, no metrics will be reported after this point.") + context stop self + } + + def configureChildren(collector: String, runID: Long): Unit = { + log.info("Configuring New Relic reporters to use runID: [{}] and collector: [{}]", runID, collector) + context.children.foreach(_ ! Configure(collector, runID)) + context become connected + } + + def scheduleReconnection(connectionFailureReason: Throwable, attemptsLeft: Int): Unit = { + log.error(connectionFailureReason, "Initialization failed, retrying in {} seconds", agentSettings.retryDelay.toSeconds) + context.system.scheduler.scheduleOnce(agentSettings.retryDelay, self, Connect) + context become (disconnected(attemptsLeft - 1)) + } - case InitializationFailed(reason) ⇒ - log.error(reason, "Giving up while trying to set up a connection with the New Relic collector.") - context.stop(self) + def giveUpConnection(): Unit = { + log.error("Giving up while trying to set up a connection with the New Relic collector. The New Relic module is shutting down itself.") + context.stop(self) } - def connectToCollector: Future[InitResult] = { + def connectToCollector: Future[ConnectResult] = { (for { collector ← selectCollector - runId ← connect(collector, settings) - } yield Initialized(runId, collector)) recover { case error ⇒ InitializationFailed(error) } + runID ← connect(collector, agentSettings) + } yield Connected(collector, runID)) recover { case error ⇒ ConnectFailed(error) } } def selectCollector: Future[String] = { - val query = ("method" -> "get_redirect_host") +: baseQuery - val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query) - - collectorClient { - Post(getRedirectHostUri, JsArray()) - - } map { json ⇒ + val apiClient = new ApiMethodClient("collector.newrelic.com", None, agentSettings, IO(Http)(context.system)) + apiClient.invokeMethod(RawMethods.GetRedirectHost, JsArray()) map { json ⇒ json.extract[String]('return_value) } } - def connect(collectorHost: String, connect: Settings): Future[Long] = { - log.debug("Connecting to NewRelic Collector [{}]", collectorHost) - - val query = ("method" -> "connect") +: baseQuery - val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query) - - collectorClient { - Post(connectUri, connect) - - } map { json ⇒ + def connect(collectorHost: String, connect: AgentSettings): Future[Long] = { + val apiClient = new ApiMethodClient(collectorHost, None, agentSettings, IO(Http)(context.system)) + apiClient.invokeMethod(RawMethods.Connect, connect) map { json ⇒ json.extract[Long]('return_value / 'agent_run_id) } } } object Agent { - case object Initialize - sealed trait InitResult - case class Initialized(runId: Long, collector: String) extends InitResult - case class InitializationFailed(reason: Throwable) extends InitResult - case class Settings(licenseKey: String, appName: String, host: String, pid: Int, maxRetries: Int, retryDelay: FiniteDuration, apdexT: Double) - - def buildAgentSettings(system: ActorSystem) = { - val config = system.settings.config.getConfig("kamon.newrelic") - val appName = config.getString("app-name") - val licenseKey = config.getString("license-key") - val maxRetries = config.getInt("max-initialize-retries") - val retryDelay = FiniteDuration(config.getMilliseconds("initialize-retry-delay"), milliseconds) - val apdexT: Double = config.getMilliseconds("apdexT").toDouble + case object Connect + case object Reconnect + case object Shutdown + case object ResetConfiguration + case class Configure(collector: String, runID: Long) + + sealed trait ConnectResult + case class Connected(collector: String, runID: Long) extends ConnectResult + case class ConnectFailed(reason: Throwable) extends ConnectResult +} + +case class AgentSettings(licenseKey: String, appName: String, hostname: String, pid: Int, operationTimeout: Timeout, + maxConnectionRetries: Int, retryDelay: FiniteDuration, apdexT: Double) + +object AgentSettings { + + def fromConfig(config: Config) = { + import kamon.util.ConfigTools.Syntax // Name has the format of 'pid'@'host' val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') - - Agent.Settings(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetries, retryDelay, apdexT) + val newRelicConfig = config.getConfig("kamon.newrelic") + + AgentSettings( + newRelicConfig.getString("license-key"), + newRelicConfig.getString("app-name"), + runtimeName(1), + runtimeName(0).toInt, + Timeout(newRelicConfig.getFiniteDuration("operation-timeout")), + newRelicConfig.getInt("max-connect-retries"), + newRelicConfig.getFiniteDuration("connect-retry-delay"), + newRelicConfig.getFiniteDuration("apdexT").toMillis / 1E3D) } }
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala new file mode 100644 index 00000000..263faa63 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala @@ -0,0 +1,68 @@ +package kamon.newrelic + +import akka.actor.ActorRef +import kamon.newrelic.ApiMethodClient.{ NewRelicException, AgentShutdownRequiredException, AgentRestartRequiredException } +import spray.http.Uri.Query +import spray.http._ +import spray.httpx.encoding.Deflate +import spray.httpx.marshalling.Marshaller +import spray.httpx.unmarshalling._ +import spray.json.{ JsonParser, JsValue } +import spray.json.lenses.JsonLenses._ +import spray.json.DefaultJsonProtocol._ +import spray.client.pipelining._ + +import scala.concurrent.{ Future, ExecutionContext } +import scala.util.control.NoStackTrace + +class ApiMethodClient(host: String, val runID: Option[Long], agentSettings: AgentSettings, httpTransport: ActorRef)(implicit exeContext: ExecutionContext) { + + implicit val to = agentSettings.operationTimeout + + val baseQuery = Query(runID.map(ri ⇒ Map("run_id" -> String.valueOf(ri))).getOrElse(Map.empty[String, String]) + + ("license_key" -> agentSettings.licenseKey) + + ("marshal_format" -> "json") + + ("protocol_version" -> "12")) + + // New Relic responses contain JSON but with text/plain content type :(. + implicit val JsValueUnmarshaller = Unmarshaller[JsValue](MediaTypes.`application/json`, MediaTypes.`text/plain`) { + case x: HttpEntity.NonEmpty ⇒ + JsonParser(x.asString(defaultCharset = HttpCharsets.`UTF-8`)) + } + + val httpClient = encode(Deflate) ~> sendReceive(httpTransport) ~> decode(Deflate) ~> unmarshal[JsValue] + val baseCollectorUri = Uri("/agent_listener/invoke_raw_method").withHost(host).withScheme("http") + + def invokeMethod[T: Marshaller](method: String, payload: T): Future[JsValue] = { + val methodQuery = ("method" -> method) +: baseQuery + + httpClient(Post(baseCollectorUri.withQuery(methodQuery), payload)) map { jsResponse ⇒ + jsResponse.extract[String]('exception.? / 'error_type.?).map(_ match { + case CollectorErrors.`ForceRestart` ⇒ throw AgentRestartRequiredException + case CollectorErrors.`ForceShutdown` ⇒ throw AgentShutdownRequiredException + case anyOtherError ⇒ + val errorMessage = jsResponse.extract[String]('exception / 'message.?).getOrElse("no message") + throw NewRelicException(anyOtherError, errorMessage) + }) + + jsResponse + } + } +} + +object ApiMethodClient { + case class NewRelicException(exceptionType: String, message: String) extends RuntimeException with NoStackTrace + case object AgentRestartRequiredException extends RuntimeException with NoStackTrace + case object AgentShutdownRequiredException extends RuntimeException with NoStackTrace +} + +object RawMethods { + val GetRedirectHost = "get_redirect_host" + val Connect = "connect" + val MetricData = "metric_data" +} + +object CollectorErrors { + val ForceRestart = "NewRelic::Agent::ForceRestartException" + val ForceShutdown = "NewRelic::Agent::ForceDisconnectException" +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala deleted file mode 100644 index ca003646..00000000 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala +++ /dev/null @@ -1,23 +0,0 @@ -package kamon.newrelic - -import akka.actor.ActorRef -import akka.util.Timeout -import spray.http.{ HttpResponse, HttpRequest } -import spray.httpx.RequestBuilding -import spray.httpx.encoding.Deflate -import spray.json._ -import spray.client.pipelining.sendReceive - -import scala.concurrent.{ ExecutionContext, Future } - -trait ClientPipelines extends RequestBuilding { - - def compressedPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[HttpResponse] = - encode(Deflate) ~> sendReceive(transport) - - def compressedToJsonPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[JsValue] = - compressedPipeline(transport) ~> toJson - - def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson - -} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala index 84472593..3b1b8cb3 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala @@ -16,18 +16,17 @@ package kamon.newrelic -import kamon.metric.UserMetrics.UserMetricGroup -import kamon.metric._ -import kamon.newrelic.Agent.Settings +import kamon.metric.{ UserMetricsExtensionImpl, EntitySnapshot, Entity } +import kamon.metric.instrument.CollectionContext object CustomMetricExtractor extends MetricExtractor { - def extract(settings: Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { - metrics.collect { - case (mg: UserMetricGroup, groupSnapshot) ⇒ - groupSnapshot.metrics collect { - case (name, snapshot) ⇒ Metric.fromKamonMetricSnapshot(snapshot, s"Custom/${mg.name}", None, Scale.Unit) - } - }.flatten.toMap + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] = { + metrics.get(UserMetricsExtensionImpl.UserMetricEntity).map { allUserMetrics ⇒ + allUserMetrics.metrics.map { + case (key, snapshot) ⇒ Metric(snapshot, key.unitOfMeasurement, s"Custom/${key.name}", None) + } + + } getOrElse (Map.empty) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala index 26e8839e..6e16b975 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -15,18 +15,18 @@ * ========================================================== */ package kamon.newrelic +import kamon.util.Timestamp import spray.json._ -import kamon.newrelic.Agent._ object JsonProtocol extends DefaultJsonProtocol { - implicit object ConnectJsonWriter extends RootJsonWriter[Settings] { - def write(obj: Settings): JsValue = + implicit object ConnectJsonWriter extends RootJsonWriter[AgentSettings] { + def write(obj: AgentSettings): JsValue = JsArray( JsObject( "agent_version" -> JsString("3.1.0"), "app_name" -> JsArray(JsString(obj.appName)), - "host" -> JsString(obj.host), + "host" -> JsString(obj.hostname), "identifier" -> JsString(s"java:${obj.appName}"), "language" -> JsString("java"), "pid" -> JsNumber(obj.pid))) @@ -87,8 +87,8 @@ object JsonProtocol extends DefaultJsonProtocol { def read(json: JsValue): MetricBatch = json match { case JsArray(elements) ⇒ val runID = elements(0).convertTo[Long] - val timeSliceFrom = elements(1).convertTo[Long] - val timeSliceTo = elements(2).convertTo[Long] + val timeSliceFrom = new Timestamp(elements(1).convertTo[Long]) + val timeSliceTo = new Timestamp(elements(2).convertTo[Long]) val metrics = elements(3).convertTo[Seq[Metric]] MetricBatch(runID, TimeSliceMetrics(timeSliceFrom, timeSliceTo, metrics.toMap)) @@ -99,8 +99,8 @@ object JsonProtocol extends DefaultJsonProtocol { def write(obj: MetricBatch): JsValue = JsArray( JsNumber(obj.runID), - JsNumber(obj.timeSliceMetrics.from), - JsNumber(obj.timeSliceMetrics.to), + JsNumber(obj.timeSliceMetrics.from.seconds), + JsNumber(obj.timeSliceMetrics.to.seconds), obj.timeSliceMetrics.metrics.toSeq.toJson) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala index 14541483..20204b79 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala @@ -1,7 +1,8 @@ package kamon.newrelic -import kamon.metric.instrument.{ Counter, Histogram } -import kamon.metric.{ MetricSnapshot, Scale } +import kamon.metric.instrument._ +import kamon.metric.MetricKey +import kamon.util.{ MapMerge, Timestamp } case class MetricID(name: String, scope: Option[String]) case class MetricData(callCount: Long, total: Double, totalExclusive: Double, min: Double, max: Double, sumOfSquares: Double) { @@ -17,16 +18,23 @@ case class MetricData(callCount: Long, total: Double, totalExclusive: Double, mi object Metric { - def fromKamonMetricSnapshot(snapshot: MetricSnapshot, name: String, scope: Option[String], targetScale: Scale): Metric = { + def scaleFunction(uom: UnitOfMeasurement): Long ⇒ Double = uom match { + case time: Time ⇒ time.scale(Time.Seconds) + case other ⇒ _.toDouble + } + + def apply(snapshot: InstrumentSnapshot, snapshotUnit: UnitOfMeasurement, name: String, scope: Option[String]): Metric = { snapshot match { case hs: Histogram.Snapshot ⇒ var total: Double = 0D var sumOfSquares: Double = 0D - val scaledMin = Scale.convert(hs.scale, targetScale, hs.min) - val scaledMax = Scale.convert(hs.scale, targetScale, hs.max) + val scaler = scaleFunction(snapshotUnit) + + val scaledMin = scaler(hs.min) + val scaledMax = scaler(hs.max) hs.recordsIterator.foreach { record ⇒ - val scaledValue = Scale.convert(hs.scale, targetScale, record.level) + val scaledValue = scaler(record.level) total += scaledValue * record.count sumOfSquares += (scaledValue * scaledValue) * record.count @@ -40,13 +48,13 @@ object Metric { } } -case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[MetricID, MetricData]) { - import kamon.metric.combineMaps +case class TimeSliceMetrics(from: Timestamp, to: Timestamp, metrics: Map[MetricID, MetricData]) { + import MapMerge.Syntax def merge(that: TimeSliceMetrics): TimeSliceMetrics = { - val mergedFrom = math.min(from, that.from) - val mergedTo = math.max(to, that.to) - val mergedMetrics = combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)) + val mergedFrom = Timestamp.earlier(from, that.from) + val mergedTo = Timestamp.later(to, that.to) + val mergedMetrics = metrics.merge(that.metrics, (l, r) ⇒ l.merge(r)) TimeSliceMetrics(mergedFrom, mergedTo, mergedMetrics) } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala index 9742ed09..842fbdc6 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala @@ -1,103 +1,109 @@ package kamon.newrelic -import java.util.concurrent.TimeUnit - import akka.actor.{ Props, ActorLogging, Actor } import akka.pattern.pipe import akka.io.IO -import akka.util.Timeout import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import kamon.metric._ -import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed, PostSucceeded, MetricDataPostResult } +import kamon.metric.instrument.CollectionContext +import kamon.newrelic.ApiMethodClient.{ AgentShutdownRequiredException, AgentRestartRequiredException } +import kamon.newrelic.MetricReporter.{ PostFailed, PostSucceeded } import spray.can.Http -import spray.http.Uri import spray.httpx.SprayJsonSupport -import scala.concurrent.Future import scala.concurrent.duration._ -import scala.util.control.NoStackTrace - -class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extends Actor - with ClientPipelines with ActorLogging with SprayJsonSupport { +import JsonProtocol._ - import JsonProtocol._ - import MetricReporter.Extractors +class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging with SprayJsonSupport { import context.dispatcher - val metricDataQuery = ("method" -> "metric_data") +: ("run_id" -> runID.toString) +: baseUri.query - val metricDataUri = baseUri.withQuery(metricDataQuery) - - implicit val operationTimeout = Timeout(30 seconds) - val metricsExtension = Kamon(Metrics)(context.system) + val metricsExtension = Kamon.metrics val collectionContext = metricsExtension.buildDefaultCollectionContext - val collectorClient = compressedPipeline(IO(Http)(context.system)) - - val subscriber = { - val tickInterval = context.system.settings.config.getMilliseconds("kamon.metrics.tick-interval") - if (tickInterval == 60000) - self - else - context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer") + val metricsSubscriber = { + val tickInterval = Kamon.metrics.settings.tickInterval.toMillis + + // Metrics are always sent to New Relic in 60 seconds intervals. + if (tickInterval == 60000) self + else context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer") } - // Subscribe to Trace Metrics - metricsExtension.subscribe(TraceMetrics, "*", subscriber, permanently = true) + subscribeToMetrics() - // Subscribe to all User Metrics - metricsExtension.subscribe(UserHistograms, "*", subscriber, permanently = true) - metricsExtension.subscribe(UserCounters, "*", subscriber, permanently = true) - metricsExtension.subscribe(UserMinMaxCounters, "*", subscriber, permanently = true) - metricsExtension.subscribe(UserGauges, "*", subscriber, permanently = true) + def receive = awaitingConfiguration(None) - def receive = reporting(None) + def awaitingConfiguration(bufferedMetrics: Option[TimeSliceMetrics]): Receive = { + case Agent.Configure(collector, runID) ⇒ startReporting(collector, runID, bufferedMetrics) + case Agent.ResetConfiguration ⇒ // Stay waiting. + case tickSnapshot: TickMetricSnapshot ⇒ keepWaitingForConfig(tickSnapshot, bufferedMetrics) + case PostSucceeded ⇒ // Ignore + case PostFailed(reason) ⇒ // Ignore any problems until we get a new configuration + } + + def reporting(apiClient: ApiMethodClient, bufferedMetrics: Option[TimeSliceMetrics]): Receive = { + case tick: TickMetricSnapshot ⇒ sendMetricData(apiClient, tick, bufferedMetrics) + case PostSucceeded ⇒ context become reporting(apiClient, None) + case PostFailed(reason) ⇒ processCollectorFailure(reason) + case Agent.ResetConfiguration ⇒ context become awaitingConfiguration(bufferedMetrics) + } - def reporting(pendingMetrics: Option[TimeSliceMetrics]): Receive = { - case TickMetricSnapshot(from, to, metrics) ⇒ - val fromInSeconds = (from / 1E3).toInt - val toInSeconds = (to / 1E3).toInt - val extractedMetrics = Extractors.flatMap(_.extract(settings, collectionContext, metrics)).toMap - val tickMetrics = TimeSliceMetrics(fromInSeconds, toInSeconds, extractedMetrics) + def sendMetricData(apiClient: ApiMethodClient, tick: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = { + val metricsToReport = merge(convertToTimeSliceMetrics(tick), bufferedMetrics) + val customMarshaller = sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter) - val metricsToReport = pendingMetrics.foldLeft(tickMetrics)((p, n) ⇒ p.merge(n)) - context become reporting(Some(metricsToReport)) - pipe(sendMetricData(metricsToReport)) to self + if (log.isDebugEnabled) + log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", metricsToReport.metrics.size, + metricsToReport.from, metricsToReport.to) - case PostSucceeded ⇒ - context become (reporting(None)) + pipe { + apiClient.invokeMethod(RawMethods.MetricData, MetricBatch(apiClient.runID.get, metricsToReport))(customMarshaller) + .map { _ ⇒ PostSucceeded } + .recover { case error ⇒ PostFailed(error) } + } to self - case PostFailed(reason) ⇒ - log.error(reason, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.") + context become reporting(apiClient, Some(metricsToReport)) } - def sendMetricData(slice: TimeSliceMetrics): Future[MetricDataPostResult] = { - log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to) + def processCollectorFailure(failureReason: Throwable): Unit = failureReason match { + case AgentRestartRequiredException ⇒ context.parent ! Agent.Reconnect + case AgentShutdownRequiredException ⇒ context.parent ! Agent.Shutdown + case anyOtherFailure ⇒ + log.error(anyOtherFailure, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.") + } - collectorClient { - Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)) + def startReporting(collector: String, runID: Long, bufferedMetrics: Option[TimeSliceMetrics]): Unit = { + val apiClient = new ApiMethodClient(collector, Some(runID), settings, IO(Http)(context.system)) + context become reporting(apiClient, bufferedMetrics) + } + + def keepWaitingForConfig(tickSnapshot: TickMetricSnapshot, bufferedMetrics: Option[TimeSliceMetrics]): Unit = { + val timeSliceMetrics = convertToTimeSliceMetrics(tickSnapshot) + context become awaitingConfiguration(Some(merge(timeSliceMetrics, bufferedMetrics))) + } + + def merge(tsm: TimeSliceMetrics, buffered: Option[TimeSliceMetrics]): TimeSliceMetrics = + buffered.foldLeft(tsm)((p, n) ⇒ p.merge(n)) - } map { response ⇒ - if (response.status.isSuccess) - PostSucceeded - else - PostFailed(new UnexpectedStatusCodeException(s"Received unsuccessful status code [${response.status.value}] from collector.")) - } recover { case t: Throwable ⇒ PostFailed(t) } + def convertToTimeSliceMetrics(tick: TickMetricSnapshot): TimeSliceMetrics = { + val extractedMetrics = MetricReporter.MetricExtractors.flatMap(_.extract(settings, collectionContext, tick.metrics)).toMap + TimeSliceMetrics(tick.from.toTimestamp, tick.to.toTimestamp, extractedMetrics) + } + + def subscribeToMetrics(): Unit = { + metricsExtension.subscribe("trace", "*", metricsSubscriber, permanently = true) + metricsExtension.subscribe("user-metrics", "*", metricsSubscriber, permanently = true) } } object MetricReporter { - val Extractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil - - def props(settings: Agent.Settings, runID: Long, baseUri: Uri): Props = - Props(new MetricReporter(settings, runID, baseUri)) + def props(settings: AgentSettings): Props = Props(new MetricReporter(settings)) sealed trait MetricDataPostResult case object PostSucceeded extends MetricDataPostResult case class PostFailed(reason: Throwable) extends MetricDataPostResult - class UnexpectedStatusCodeException(message: String) extends RuntimeException(message) with NoStackTrace + val MetricExtractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil } trait MetricExtractor { - def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 08fdc8c4..7f56d931 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -21,7 +21,8 @@ import java.util import akka.actor.{ Actor, ActorLogging } import akka.event.Logging.{ Error, InitializeLogger, LoggerInitialized } import com.newrelic.api.agent.{ NewRelic ⇒ NR } -import kamon.trace.{ TraceRecorder, TraceContextAware } +import kamon.trace.TraceLocal.HttpContextKey +import kamon.trace.{ TraceContext, TraceLocal, TraceContextAware } trait CustomParamsSupport { this: NewRelicErrorLogger ⇒ @@ -40,9 +41,20 @@ class NewRelicErrorLogger extends Actor with ActorLogging with CustomParamsSuppo def notifyError(error: Error): Unit = runInFakeTransaction { val params = new util.HashMap[String, String]() - val ctx = error.asInstanceOf[TraceContextAware].traceContext - params put ("TraceToken", ctx.token) + if (error.isInstanceOf[TraceContextAware]) { + val ctx = error.asInstanceOf[TraceContextAware].traceContext + val httpContext = TraceLocal.retrieve(HttpContextKey) + + params put ("TraceToken", ctx.token) + + httpContext.map { httpCtx ⇒ + params put ("User-Agent", httpCtx.agent) + params put ("X-Forwarded-For", httpCtx.xforwarded) + params put ("Request-URI", httpCtx.uri) + } + } + customParams foreach { case (k, v) ⇒ params.put(k, v) } if (error.cause == Error.NoCause) NR.noticeError(error.message.toString, params) @@ -52,7 +64,7 @@ class NewRelicErrorLogger extends Actor with ActorLogging with CustomParamsSuppo //Really ugly, but temporal hack until next release... def runInFakeTransaction[T](thunk: ⇒ T): T = { val oldName = Thread.currentThread.getName - Thread.currentThread.setName(TraceRecorder.currentContext.name) + Thread.currentThread.setName(TraceContext.currentContext.name) try thunk finally Thread.currentThread.setName(oldName) } }
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index 0a4a516b..d0144f4b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -16,77 +16,81 @@ package kamon.newrelic -import scala.collection.mutable; -import kamon.metric._ -import kamon.metric.TraceMetrics.{ TraceMetricsSnapshot, ElapsedTime } -import kamon.metric.instrument.Histogram -import kamon.trace.SegmentCategory.HttpClient -import kamon.trace.SegmentMetricIdentity +import kamon.metric.{ EntitySnapshot, Entity } + +import scala.collection.mutable +import kamon.metric.instrument.{ Time, CollectionContext, Histogram } object WebTransactionMetricExtractor extends MetricExtractor { - def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] = { val apdexBuilder = new ApdexBuilder("Apdex", None, settings.apdexT) // Trace metrics are recorded in nanoseconds. - var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) - var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) + var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty + var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty val externalByHostSnapshots = mutable.Map.empty[String, List[Histogram.Snapshot]] val externalByHostAndLibrarySnapshots = mutable.Map.empty[(String, String), List[Histogram.Snapshot]] val externalScopedByHostAndLibrarySnapshots = mutable.Map.empty[(String, String, String), List[Histogram.Snapshot]] - val transactionMetrics = metrics.collect { - case (TraceMetrics(traceName), tms: TraceMetricsSnapshot) ⇒ - - tms.segments.foreach { - case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒ - accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext) + val transactionMetrics = metrics.filterKeys(_.category == "trace").map { + case (entity: Entity, es: EntitySnapshot) ⇒ + // Trace metrics only have elapsed-time and segments and all of them are Histograms. + es.histograms.foreach { + case (key, segmentSnapshot) if key.metadata.get("category").filter(_ == "http-client").nonEmpty ⇒ + val library = key.metadata("library") + accumulatedExternalServices = accumulatedExternalServices.merge(segmentSnapshot, collectionContext) // Accumulate externals by host - externalByHostSnapshots.update(segmentName, snapshot :: externalByHostSnapshots.getOrElse(segmentName, Nil)) + externalByHostSnapshots.update(key.name, segmentSnapshot :: externalByHostSnapshots.getOrElse(key.name, Nil)) // Accumulate externals by host and library - externalByHostAndLibrarySnapshots.update((segmentName, library), - snapshot :: externalByHostAndLibrarySnapshots.getOrElse((segmentName, library), Nil)) + externalByHostAndLibrarySnapshots.update((key.name, library), + segmentSnapshot :: externalByHostAndLibrarySnapshots.getOrElse((key.name, library), Nil)) // Accumulate externals by host and library, including the transaction as scope. - externalScopedByHostAndLibrarySnapshots.update((segmentName, library, traceName), - snapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((segmentName, library, traceName), Nil)) + externalScopedByHostAndLibrarySnapshots.update((key.name, library, entity.name), + segmentSnapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((key.name, library, entity.name), Nil)) - } + case otherSegments ⇒ - accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(tms.elapsedTime, collectionContext) - tms.elapsedTime.recordsIterator.foreach { record ⇒ - apdexBuilder.record(Scale.convert(tms.elapsedTime.scale, Scale.Unit, record.level), record.count) } - Metric.fromKamonMetricSnapshot(tms.elapsedTime, "WebTransaction/Custom/" + traceName, None, Scale.Unit) - } + es.histograms.collect { + case (key, elapsedTime) if key.name == "elapsed-time" ⇒ + accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(elapsedTime, collectionContext) + elapsedTime.recordsIterator.foreach { record ⇒ + apdexBuilder.record(Time.Nanoseconds.scale(Time.Seconds)(record.level), record.count) + } + + Metric(elapsedTime, key.unitOfMeasurement, "WebTransaction/Custom/" + entity.name, None) + } + } flatten - val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit) + val httpDispatcher = Metric(accumulatedHttpDispatcher, Time.Seconds, "HttpDispatcher", None) val webTransaction = httpDispatcher.copy(MetricID("WebTransaction", None)) val webTransactionTotal = httpDispatcher.copy(MetricID("WebTransactionTotalTime", None)) - val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit) + val externalAllWeb = Metric(accumulatedExternalServices, Time.Seconds, "External/allWeb", None) val externalAll = externalAllWeb.copy(MetricID("External/all", None)) val externalByHost = externalByHostSnapshots.map { case (host, snapshots) ⇒ - val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) - Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/all", None, Scale.Unit) + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) + Metric(mergedSnapshots, Time.Seconds, s"External/$host/all", None) } val externalByHostAndLibrary = externalByHostAndLibrarySnapshots.map { case ((host, library), snapshots) ⇒ - val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) - Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", None, Scale.Unit) + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) + Metric(mergedSnapshots, Time.Seconds, s"External/$host/$library", None) } val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map { case ((host, library, traceName), snapshots) ⇒ - val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) - Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName), Scale.Unit) + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) + Metric(mergedSnapshots, Time.Seconds, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName)) } Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++ diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala index 7db9f2d0..3e15e9fd 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala @@ -22,7 +22,6 @@ import akka.actor.{ ActorRef, ActorSystem, Props } import akka.io.IO import akka.testkit._ import com.typesafe.config.ConfigFactory -import kamon.AkkaExtensionSwap import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } import spray.can.Http import spray.http._ @@ -30,6 +29,7 @@ import spray.httpx.encoding.Deflate import spray.httpx.{ SprayJsonSupport, RequestBuilding } import spray.json.JsArray import spray.json._ +import testkit.AkkaExtensionSwap class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll with RequestBuilding with SprayJsonSupport { import JsonProtocol._ @@ -44,9 +44,11 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll wit | newrelic { | app-name = kamon | license-key = 1111111111 - | initialize-retry-delay = 1 second - | max-initialize-retries = 3 + | connect-retry-delay = 1 second + | max-connect-retries = 3 | } + | + | modules.kamon-newrelic.auto-start = no |} | """.stripMargin)) @@ -88,7 +90,7 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll wit }) // Receive the runID - EventFilter.info(message = "Agent initialized with runID: [161221111] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { + EventFilter.info(message = "Configuring New Relic reporters to use runID: [161221111] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { httpManager.reply(jsonResponse( """ | { @@ -147,7 +149,7 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll wit // Receive the runID EventFilter.info( - message = "Agent initialized with runID: [161221112] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { + message = "Configuring New Relic reporters to use runID: [161221112] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { httpManager.reply(jsonResponse( """ @@ -184,7 +186,7 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll wit }) // Give up on connecting. - EventFilter[RuntimeException](message = "Giving up while trying to set up a connection with the New Relic collector.", occurrences = 1).intercept { + EventFilter.error(message = "Giving up while trying to set up a connection with the New Relic collector. The New Relic module is shutting down itself.", occurrences = 1).intercept { httpManager.reply(Timedout(request)) } } diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala index 0001072e..04380677 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala @@ -16,40 +16,46 @@ package kamon.newrelic -import akka.actor.{ ActorRef, ActorSystem } +import akka.actor.ActorRef import akka.io.IO import akka.testkit._ +import akka.util.Timeout import com.typesafe.config.ConfigFactory -import kamon.metric.{ TraceMetrics, Metrics } -import kamon.{ Kamon, AkkaExtensionSwap } -import kamon.metric.Subscriptions.TickMetricSnapshot -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.metric.{ Entity, TraceMetrics } +import kamon.testkit.BaseKamonSpec +import kamon.util.MilliTimestamp +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import spray.can.Http import spray.http.Uri.Query import spray.http._ import spray.httpx.encoding.Deflate -import spray.httpx.{ RequestBuilding, SprayJsonSupport } +import spray.httpx.SprayJsonSupport +import testkit.AkkaExtensionSwap import scala.concurrent.duration._ import spray.json._ -class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with SprayJsonSupport { +class MetricReporterSpec extends BaseKamonSpec("metric-reporter-spec") with SprayJsonSupport { import kamon.newrelic.JsonProtocol._ - implicit lazy val system: ActorSystem = ActorSystem("metric-reporter-spec", ConfigFactory.parseString( - """ - |akka { - | loggers = ["akka.testkit.TestEventListener"] - | loglevel = "INFO" - |} - |kamon { - | metric { - | tick-interval = 1 hour - | } - |} - | - """.stripMargin)) - - val agentSettings = Agent.Settings("1111111111", "kamon", "test-host", 1, 1, 30 seconds, 1D) + override lazy val config = + ConfigFactory.parseString( + """ + |akka { + | loggers = ["akka.testkit.TestEventListener"] + | loglevel = "INFO" + |} + |kamon { + | metric { + | tick-interval = 1 hour + | } + | + | modules.kamon-newrelic.auto-start = no + |} + | + """.stripMargin) + + val agentSettings = AgentSettings("1111111111", "kamon", "test-host", 1, Timeout(5 seconds), 1, 30 seconds, 1D) val baseQuery = Query( "license_key" -> agentSettings.licenseKey, "marshal_format" -> "json", @@ -59,8 +65,9 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit "the MetricReporter" should { "report metrics to New Relic upon arrival" in new FakeTickSnapshotsFixture { val httpManager = setHttpManager(TestProbe()) - val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri)) + val metricReporter = system.actorOf(MetricReporter.props(agentSettings)) + metricReporter ! Agent.Configure("collector-1.newrelic.com", 9999) metricReporter ! firstSnapshot val metricPost = httpManager.expectMsgType[HttpRequest] @@ -70,8 +77,8 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch] postedBatch.runID should be(9999) - postedBatch.timeSliceMetrics.from should be(1415587618) - postedBatch.timeSliceMetrics.to should be(1415587678) + postedBatch.timeSliceMetrics.from.seconds should be(1415587618) + postedBatch.timeSliceMetrics.to.seconds should be(1415587678) val metrics = postedBatch.timeSliceMetrics.metrics metrics(MetricID("Apdex", None)).callCount should be(3) @@ -81,8 +88,9 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit "accumulate metrics if posting fails" in new FakeTickSnapshotsFixture { val httpManager = setHttpManager(TestProbe()) - val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri)) + val metricReporter = system.actorOf(MetricReporter.props(agentSettings)) + metricReporter ! Agent.Configure("collector-1.newrelic.com", 9999) metricReporter ! firstSnapshot val request = httpManager.expectMsgType[HttpRequest] httpManager.reply(Timedout(request)) @@ -96,8 +104,8 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch] postedBatch.runID should be(9999) - postedBatch.timeSliceMetrics.from should be(1415587618) - postedBatch.timeSliceMetrics.to should be(1415587738) + postedBatch.timeSliceMetrics.from.seconds should be(1415587618) + postedBatch.timeSliceMetrics.to.seconds should be(1415587738) val metrics = postedBatch.timeSliceMetrics.metrics metrics(MetricID("Apdex", None)).callCount should be(6) @@ -130,20 +138,20 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit } trait FakeTickSnapshotsFixture { - val testTraceID = TraceMetrics("example-trace") - val recorder = Kamon(Metrics).register(testTraceID, TraceMetrics.Factory).get - val collectionContext = Kamon(Metrics).buildDefaultCollectionContext + val testTraceID = Entity("example-trace", "trace") + val recorder = Kamon.metrics.register(TraceMetrics, testTraceID.name).get.recorder + val collectionContext = Kamon.metrics.buildDefaultCollectionContext def collectRecorder = recorder.collect(collectionContext) - recorder.elapsedTime.record(1000000) - recorder.elapsedTime.record(2000000) - recorder.elapsedTime.record(3000000) - val firstSnapshot = TickMetricSnapshot(1415587618000L, 1415587678000L, Map(testTraceID -> collectRecorder)) + recorder.ElapsedTime.record(1000000) + recorder.ElapsedTime.record(2000000) + recorder.ElapsedTime.record(3000000) + val firstSnapshot = TickMetricSnapshot(new MilliTimestamp(1415587618000L), new MilliTimestamp(1415587678000L), Map(testTraceID -> collectRecorder)) - recorder.elapsedTime.record(6000000) - recorder.elapsedTime.record(5000000) - recorder.elapsedTime.record(4000000) - val secondSnapshot = TickMetricSnapshot(1415587678000L, 1415587738000L, Map(testTraceID -> collectRecorder)) + recorder.ElapsedTime.record(6000000) + recorder.ElapsedTime.record(5000000) + recorder.ElapsedTime.record(4000000) + val secondSnapshot = TickMetricSnapshot(new MilliTimestamp(1415587678000L), new MilliTimestamp(1415587738000L), Map(testTraceID -> collectRecorder)) } }
\ No newline at end of file |