diff options
41 files changed, 1027 insertions, 572 deletions
diff --git a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala index dfa4bcb8..0dd189f6 100644 --- a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala +++ b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala @@ -8,6 +8,8 @@ import kamon.metric._ import scala.collection.concurrent.TrieMap object HttpServerMetrics extends MetricGroupIdentity { + import Metrics.AtomicGetOrElseUpdateForTriemap + val name: String = "http-server-metrics-recorder" val category = new MetricGroupCategory { val name: String = "http-server" @@ -32,13 +34,13 @@ object HttpServerMetrics extends MetricGroupIdentity { def recordResponse(statusCode: StatusCode): Unit = recordResponse(statusCode, 1L) def recordResponse(statusCode: StatusCode, count: Long): Unit = - counters.getOrElseUpdate(statusCode, Counter()).increment(count) + counters.atomicGetOrElseUpdate(statusCode, Counter()).increment(count) def recordResponse(traceName: TraceName, statusCode: StatusCode): Unit = recordResponse(traceName, statusCode, 1L) def recordResponse(traceName: TraceName, statusCode: StatusCode, count: Long): Unit = { recordResponse(statusCode, count) - countersPerTrace.getOrElseUpdate(traceName, TrieMap()).getOrElseUpdate(statusCode, Counter()).increment(count) + countersPerTrace.atomicGetOrElseUpdate(traceName, TrieMap()).atomicGetOrElseUpdate(statusCode, Counter()).increment(count) } def collect(context: CollectionContext): HttpServerMetricsSnapshot = { diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index 05bab175..f491cc57 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -30,6 +30,8 @@ import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe } import java.util.concurrent.TimeUnit class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + import Metrics.AtomicGetOrElseUpdateForTriemap + val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error")) @@ -46,7 +48,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { if (shouldTrack(identity)) - Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) + Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) else None } @@ -131,4 +133,12 @@ object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) } + + implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) { + def atomicGetOrElseUpdate(key: K, op: ⇒ V): V = + trieMap.get(key) match { + case Some(v) ⇒ v + case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d) + } + } } diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index 7246ccb5..eaad6e0d 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -27,6 +27,8 @@ case class TraceMetrics(name: String) extends MetricGroupIdentity { } object TraceMetrics extends MetricGroupCategory { + import Metrics.AtomicGetOrElseUpdateForTriemap + val name = "trace" case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } @@ -37,7 +39,7 @@ object TraceMetrics extends MetricGroupCategory { val segments = TrieMap[MetricIdentity, Histogram]() def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = - segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) + segments.atomicGetOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) def collect(context: CollectionContext): TraceMetricsSnapshot = TraceMetricsSnapshot( @@ -53,7 +55,7 @@ object TraceMetrics extends MetricGroupCategory { type GroupSnapshotType = TraceMetricsSnapshot def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) // TODO: Merge the segments metrics correctly and test it! + TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), combineMaps(segments, that.segments)((l, r) ⇒ l.merge(r, context))) def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) } @@ -69,7 +71,6 @@ case object TraceMetricGroupFactory extends MetricGroupFactory { type GroupRecorder = TraceMetricRecorder def create(config: Config, system: ActorSystem): TraceMetricRecorder = { - val settings = config.getConfig("precision.trace") val elapsedTimeConfig = settings.getConfig("elapsed-time") val segmentConfig = settings.getConfig("segment") diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala index b511b4bc..b7ac1ac5 100644 --- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -8,6 +8,7 @@ import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } import scala.concurrent.duration.FiniteDuration class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + import Metrics.AtomicGetOrElseUpdateForTriemap import UserMetrics._ lazy val metricsExtension = Kamon(Metrics)(system) @@ -18,45 +19,45 @@ class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = { - metricsExtension.storage.getOrElseUpdate(UserHistogram(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { UserHistogramRecorder(Histogram(highestTrackableValue, precision, Scale.Unit)) }).asInstanceOf[UserHistogramRecorder].histogram } def registerHistogram(name: String): Histogram = { - metricsExtension.storage.getOrElseUpdate(UserHistogram(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { UserHistogramRecorder(Histogram.fromConfig(defaultHistogramPrecisionConfig)) }).asInstanceOf[UserHistogramRecorder].histogram } def registerCounter(name: String): Counter = { - metricsExtension.storage.getOrElseUpdate(UserCounter(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserCounter(name), { UserCounterRecorder(Counter()) }).asInstanceOf[UserCounterRecorder].counter } def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, refreshInterval: FiniteDuration): MinMaxCounter = { - metricsExtension.storage.getOrElseUpdate(UserMinMaxCounter(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { UserMinMaxCounterRecorder(MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter } def registerMinMaxCounter(name: String): MinMaxCounter = { - metricsExtension.storage.getOrElseUpdate(UserMinMaxCounter(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { UserMinMaxCounterRecorder(MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter } def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.getOrElseUpdate(UserGauge(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { UserGaugeRecorder(Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) }).asInstanceOf[UserGaugeRecorder].gauge } def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.getOrElseUpdate(UserGauge(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { UserGaugeRecorder(Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) }).asInstanceOf[UserGaugeRecorder].gauge } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index c4c28a68..5b74e6b2 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -34,16 +34,18 @@ sealed trait TraceContext { def finish(): Unit def origin: TraceContextOrigin def isOpen: Boolean + def isClosed: Boolean = !isOpen def isEmpty: Boolean def nonEmpty: Boolean = !isEmpty - def startSegment(segmentName: String, label: String): Segment + def startSegment(segmentName: String, category: String, library: String): Segment def nanoTimestamp: Long } sealed trait Segment { def name: String def rename(newName: String): Unit - def label: String + def category: String + def library: String def finish(): Unit def isEmpty: Boolean } @@ -56,12 +58,13 @@ case object EmptyTraceContext extends TraceContext { def origin: TraceContextOrigin = TraceContextOrigin.Local def isOpen: Boolean = false def isEmpty: Boolean = true - def startSegment(segmentName: String, label: String): Segment = EmptySegment + def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment def nanoTimestamp: Long = 0L case object EmptySegment extends Segment { val name: String = "empty-segment" - val label: String = "empty-label" + val category: String = "empty-category" + val library: String = "empty-library" def isEmpty: Boolean = true def rename(newName: String): Unit = {} def finish: Unit = {} @@ -98,7 +101,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, } } - def startSegment(segmentName: String, segmentLabel: String): Segment = new DefaultSegment(segmentName, segmentLabel) + def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library) @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { val segment = finishedSegments.poll() @@ -108,17 +111,17 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, } } - private def finishSegment(segmentName: String, label: String, duration: Long): Unit = { - finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, label), duration)) + private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { + finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration)) - if (!_isOpen) { + if (isClosed) { metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ drainFinishedSegments(traceMetrics) } } } - class DefaultSegment(segmentName: String, val label: String) extends Segment { + class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment { private val _segmentStartNanoTime = System.nanoTime() @volatile private var _segmentName = segmentName @volatile private var _isOpen = true @@ -129,15 +132,15 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, def finish: Unit = { val segmentFinishNanoTime = System.nanoTime() - finishSegment(name, label, (segmentFinishNanoTime - _segmentStartNanoTime)) + finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime)) } } } -case class SegmentMetricIdentity(name: String, label: String) extends MetricIdentity +case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity case class SegmentData(identity: SegmentMetricIdentity, duration: Long) -object SegmentMetricIdentityLabel { +object SegmentCategory { val HttpClient = "http-client" } diff --git a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala index 006366ba..245901cd 100644 --- a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala @@ -43,13 +43,18 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { | } | } | ] - | precision { - | default-histogram-precision { + | precision.actor { + | processing-time { | highest-trackable-value = 3600000000000 | significant-value-digits = 2 | } | - | default-min-max-counter-precision { + | time-in-mailbox { + | highest-trackable-value = 3600000000000 + | significant-value-digits = 2 + | } + | + | mailbox-size { | refresh-interval = 1 hour | highest-trackable-value = 999999999 | significant-value-digits = 2 diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala index 7468f59c..301ea9b2 100644 --- a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala @@ -54,7 +54,7 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers { "record the elapsed time for segments that occur inside a given trace" in { TraceRecorder.withNewTraceContext("trace-with-segments") { - val segment = TraceRecorder.currentContext.startSegment("test-segment", "test-label") + val segment = TraceRecorder.currentContext.startSegment("test-segment", "test-category", "test-library") segment.finish() TraceRecorder.finish() } @@ -62,12 +62,12 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers { val snapshot = takeSnapshotOf("trace-with-segments") snapshot.elapsedTime.numberOfMeasurements should be(1) snapshot.segments.size should be(1) - snapshot.segments(SegmentMetricIdentity("test-segment", "test-label")).numberOfMeasurements should be(1) + snapshot.segments(SegmentMetricIdentity("test-segment", "test-category", "test-library")).numberOfMeasurements should be(1) } "record the elapsed time for segments that finish after their correspondent trace has finished" in { val segment = TraceRecorder.withNewTraceContext("closing-segment-after-trace") { - val s = TraceRecorder.currentContext.startSegment("test-segment", "test-label") + val s = TraceRecorder.currentContext.startSegment("test-segment", "test-category", "test-library") TraceRecorder.finish() s } @@ -81,7 +81,7 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers { val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace") afterFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(0) afterFinishSegmentSnapshot.segments.size should be(1) - afterFinishSegmentSnapshot.segments(SegmentMetricIdentity("test-segment", "test-label")).numberOfMeasurements should be(1) + afterFinishSegmentSnapshot.segments(SegmentMetricIdentity("test-segment", "test-category", "test-library")).numberOfMeasurements should be(1) } } diff --git a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala index 206fbd4e..838a1b98 100644 --- a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala @@ -79,7 +79,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma "allow creating a segment within a trace" in { val createdContext = TraceRecorder.withNewTraceContext("trace-with-segments") { - val segment = TraceRecorder.currentContext.startSegment("segment-1", "segment-1-label") + val segment = TraceRecorder.currentContext.startSegment("segment-1", "segment-1-category", "segment-library") TraceRecorder.currentContext } @@ -89,7 +89,7 @@ class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Ma "allow renaming a segment" in { TraceRecorder.withNewTraceContext("trace-with-renamed-segment") { - val segment = TraceRecorder.currentContext.startSegment("original-segment-name", "segment-label") + val segment = TraceRecorder.currentContext.startSegment("original-segment-name", "segment-label", "segment-library") segment.name should be("original-segment-name") segment.rename("new-segment-name") diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala index 22e86b05..6e29400f 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala @@ -82,7 +82,7 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension { // Subscribe to SystemMetrics val includeSystemMetrics = datadogConfig.getBoolean("report-system-metrics") if (includeSystemMetrics) { - List(CPUMetrics, ProcessCPUMetrics, MemoryMetrics, NetworkMetrics, GCMetrics, HeapMetrics) foreach { metric ⇒ + List(CPUMetrics, ProcessCPUMetrics, MemoryMetrics, NetworkMetrics, GCMetrics, HeapMetrics, ContextSwitchesMetrics) foreach { metric ⇒ Kamon(Metrics)(system).subscribe(metric, "*", datadogMetricsListener, permanently = true) } } diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala index 5ab0589c..195798fe 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala @@ -62,13 +62,13 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long metricSnapshot match { case hs: Histogram.Snapshot ⇒ hs.recordsIterator.foreach { record ⇒ - val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDTimer(record.level, record.count)) + val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeDatadogTimer(record.level, record.count)) packetBuilder.appendMeasurement(key, measurementData) } case cs: Counter.Snapshot ⇒ - val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDCounter(cs.count)) + val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeDatadogCounter(cs.count)) packetBuilder.appendMeasurement(key, measurementData) } } @@ -81,12 +81,12 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long .append(buildIdentificationTag(groupIdentity, metricIdentity)) .result() - def encodeStatsDTimer(level: Long, count: Long): String = { + def encodeDatadogTimer(level: Long, count: Long): String = { val samplingRate: Double = 1D / count level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") } - def encodeStatsDCounter(count: Long): String = count.toString + "|c" + def encodeDatadogCounter(count: Long): String = count.toString + "|c" def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = if (isUserMetric(groupIdentity)) diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index 059420f9..c86e64ae 100644 --- a/kamon-newrelic/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -4,17 +4,22 @@ kamon { newrelic { + + # General ApdexT that applies to all Trace metrics reported. apdexT = 1 second + # The application name that will be shown in the New Relic dashboard. app-name = "Kamon[Development]" - license-key = e7d350b14228f3d28f35bc3140df2c3e565ea5d5 - # delay between connection attempts to NewRelic collector - retry-delay = 30 seconds + # Your New Relic license key. + license-key = e7d350b14228f3d28f35bc3140df2c3e565ea5d5 # attempts to send pending metrics in the next tick, # combining the current metrics plus the pending, after max-retry, deletes all pending metrics - max-retry = 3 + max-initialize-retries = 3 + + # delay between connection attempts to NewRelic collector + initialize-retry-delay = 30 seconds } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 834a10ad..6244c0ad 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -18,88 +18,73 @@ package kamon.newrelic import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } -import akka.actor.{ ActorLogging, Actor } +import akka.actor.{ ActorSystem, ActorLogging, Actor } import akka.event.LoggingAdapter -import org.slf4j.LoggerFactory +import akka.io.IO +import akka.util.Timeout +import kamon.Kamon +import kamon.metric.{ CollectionContext, Metrics } +import spray.can.Http import spray.json._ import scala.concurrent.{ ExecutionContext, Future } -import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation } -import spray.httpx.encoding.Deflate +import spray.httpx.{ SprayJsonSupport, ResponseTransformation } import spray.http._ import spray.json.lenses.JsonLenses._ import java.lang.management.ManagementFactory -import spray.client.pipelining._ -import scala.util.{ Failure, Success } import spray.http.Uri.Query -import kamon.newrelic.MetricTranslator.TimeSliceMetrics import scala.concurrent.duration._ +import Agent._ -class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging { +import akka.pattern.pipe +// TODO: Setup a proper host connector with custom timeout configuration for use with this. +class Agent extends Actor with ClientPipelines with ResponseTransformation with SprayJsonSupport with ActorLogging { + import JsonProtocol._ import context.dispatcher - import Agent._ - import Retry._ - - self ! Initialize - - val agentInfo = { - val config = context.system.settings.config.getConfig("kamon.newrelic") - val appName = config.getString("app-name") - val licenseKey = config.getString("license-key") - - // Name has the format of pid@host - val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') - val retryDelay = FiniteDuration(config.getMilliseconds("retry-delay"), milliseconds) - val maxRetry = config.getInt("max-retry") - - AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay) - } + implicit val operationTimeout = Timeout(30 seconds) + val collectorClient = compressedToJsonPipeline(IO(Http)(context.system)) + val settings = buildAgentSettings(context.system) val baseQuery = Query( - "license_key" -> agentInfo.licenseKey, + "license_key" -> settings.licenseKey, "marshal_format" -> "json", "protocol_version" -> "12") - def receive: Receive = uninitialized - - def uninitialized: Receive = { - case Initialize ⇒ { - connectToCollector onComplete { - case Success(agent) ⇒ { - log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector) - context become reporting(agent.runId, agent.collector) - } - case Failure(reason) ⇒ self ! InitializationFailed(reason) - } - } - case InitializationFailed(reason) ⇒ { - log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds) - context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize) - } - case everythingElse ⇒ //ignore - } + // Start the connection to the New Relic collector. + self ! Initialize - def reporting(runId: Long, collector: String): Receive = { - case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics) - } + def receive: Receive = uninitialized(settings.maxRetries) - def connectToCollector: Future[Initialized] = for { - collector ← selectCollector - runId ← connect(collector, agentInfo) - } yield Initialized(runId, collector) + def uninitialized(attemptsLeft: Int): Receive = { + case Initialize ⇒ pipe(connectToCollector) to self + case Initialized(runID, collector) ⇒ + log.info("Agent initialized with runID: [{}] and collector: [{}]", runID, collector) - import AgentJsonProtocol._ + val baseCollectorUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(baseQuery) + context.actorOf(MetricReporter.props(settings, runID, baseCollectorUri), "metric-reporter") - val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive - val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson + case InitializationFailed(reason) if (attemptsLeft > 0) ⇒ + log.error(reason, "Initialization failed, retrying in {} seconds", settings.retryDelay.toSeconds) + context.system.scheduler.scheduleOnce(settings.retryDelay, self, Initialize) + context become (uninitialized(attemptsLeft - 1)) - def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson + case InitializationFailed(reason) ⇒ + log.error(reason, "Giving up while trying to set up a connection with the New Relic collector.") + context.stop(self) + } + + def connectToCollector: Future[InitResult] = { + (for { + collector ← selectCollector + runId ← connect(collector, settings) + } yield Initialized(runId, collector)) recover { case error ⇒ InitializationFailed(error) } + } def selectCollector: Future[String] = { val query = ("method" -> "get_redirect_host") +: baseQuery val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query) - compressedToJsonPipeline { + collectorClient { Post(getRedirectHostUri, JsArray()) } map { json ⇒ @@ -107,67 +92,39 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with } } - def connect(collectorHost: String, connect: AgentInfo): Future[Long] = { + def connect(collectorHost: String, connect: Settings): Future[Long] = { log.debug("Connecting to NewRelic Collector [{}]", collectorHost) val query = ("method" -> "connect") +: baseQuery val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query) - compressedToJsonPipeline { + collectorClient { Post(connectUri, connect) } map { json ⇒ json.extract[Long]('return_value / 'agent_run_id) } } - - def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = { - val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery - val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query) - - withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics ⇒ - compressedPipeline { - log.info("Sending metrics to NewRelic collector") - Post(sendMetricDataUri, MetricData(runId, currentMetrics)) - } - } - } } object Agent { - case class Initialize() - case class Initialized(runId: Long, collector: String) - case class InitializationFailed(reason: Throwable) - case class CollectorSelection(return_value: String) - case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration) - case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics) -} + case object Initialize + sealed trait InitResult + case class Initialized(runId: Long, collector: String) extends InitResult + case class InitializationFailed(reason: Throwable) extends InitResult + case class Settings(licenseKey: String, appName: String, host: String, pid: Int, maxRetries: Int, retryDelay: FiniteDuration, apdexT: Double) + + def buildAgentSettings(system: ActorSystem) = { + val config = system.settings.config.getConfig("kamon.newrelic") + val appName = config.getString("app-name") + val licenseKey = config.getString("license-key") + val maxRetries = config.getInt("max-initialize-retries") + val retryDelay = FiniteDuration(config.getMilliseconds("initialize-retry-delay"), milliseconds) + val apdexT: Double = config.getMilliseconds("apdexT").toDouble -object Retry { - - @volatile private var attempts: Int = 0 - @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None - - def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics ⇒ Future[T])(implicit executor: ExecutionContext): Unit = { - - val currentMetrics = metrics.merge(pendingMetrics) - - if (currentMetrics.metrics.nonEmpty) { - block(currentMetrics) onComplete { - case Success(_) ⇒ - pendingMetrics = None - attempts = 0 - case Failure(_) ⇒ - attempts += 1 - if (maxRetry > attempts) { - log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry) - pendingMetrics = Some(currentMetrics) - } else { - log.info("Max attempts achieved, proceeding to remove all pending metrics") - pendingMetrics = None - attempts = 0 - } - } - } + // Name has the format of 'pid'@'host' + val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + + Agent.Settings(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetries, retryDelay, apdexT) } }
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala deleted file mode 100644 index 9b3e6dea..00000000 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.newrelic - -import spray.json._ -import kamon.newrelic.Agent._ - -object AgentJsonProtocol extends DefaultJsonProtocol { - - implicit object ConnectJsonWriter extends RootJsonWriter[AgentInfo] { - def write(obj: AgentInfo): JsValue = - JsArray( - JsObject( - "agent_version" -> JsString("3.1.0"), - "app_name" -> JsArray(JsString(obj.appName)), - "host" -> JsString(obj.host), - "identifier" -> JsString(s"java:${obj.appName}"), - "language" -> JsString("java"), - "pid" -> JsNumber(obj.pid))) - } - - implicit def seqWriter[T: JsonWriter] = new JsonWriter[Seq[T]] { - def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toVector) - } - - implicit object MetricDetailWriter extends JsonWriter[NewRelic.Metric] { - def write(obj: NewRelic.Metric): JsValue = { - JsArray( - JsObject( - "name" -> JsString(obj.name) // TODO Include scope - ), - JsArray( - JsNumber(obj.callCount), - JsNumber(obj.total), - JsNumber(obj.totalExclusive), - JsNumber(obj.min), - JsNumber(obj.max), - JsNumber(obj.sumOfSquares))) - } - } - - implicit object MetricDataWriter extends RootJsonWriter[MetricData] { - def write(obj: MetricData): JsValue = - JsArray( - JsNumber(obj.runId), - JsNumber(obj.timeSliceMetrics.from), - JsNumber(obj.timeSliceMetrics.to), - obj.timeSliceMetrics.metrics.values.toSeq.toJson) - } -} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala new file mode 100644 index 00000000..ca003646 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/ClientPipelines.scala @@ -0,0 +1,23 @@ +package kamon.newrelic + +import akka.actor.ActorRef +import akka.util.Timeout +import spray.http.{ HttpResponse, HttpRequest } +import spray.httpx.RequestBuilding +import spray.httpx.encoding.Deflate +import spray.json._ +import spray.client.pipelining.sendReceive + +import scala.concurrent.{ ExecutionContext, Future } + +trait ClientPipelines extends RequestBuilding { + + def compressedPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[HttpResponse] = + encode(Deflate) ~> sendReceive(transport) + + def compressedToJsonPipeline(transport: ActorRef)(implicit ec: ExecutionContext, to: Timeout): HttpRequest ⇒ Future[JsValue] = + compressedPipeline(transport) ~> toJson + + def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson + +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala index a2b208dc..84472593 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala @@ -16,19 +16,18 @@ package kamon.newrelic -import akka.actor.Actor import kamon.metric.UserMetrics.UserMetricGroup import kamon.metric._ +import kamon.newrelic.Agent.Settings -trait CustomMetrics { - self: Actor ⇒ +object CustomMetricExtractor extends MetricExtractor { - def collectCustomMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = { + def extract(settings: Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { metrics.collect { case (mg: UserMetricGroup, groupSnapshot) ⇒ groupSnapshot.metrics collect { - case (name, snapshot) ⇒ toNewRelicMetric(Scale.Unit)(s"Custom/${mg.name}", None, snapshot) + case (name, snapshot) ⇒ Metric.fromKamonMetricSnapshot(snapshot, s"Custom/${mg.name}", None, Scale.Unit) } - }.flatten.toSeq + }.flatten.toMap } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala new file mode 100644 index 00000000..26e8839e --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -0,0 +1,106 @@ +/* =================================================== + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.newrelic + +import spray.json._ +import kamon.newrelic.Agent._ + +object JsonProtocol extends DefaultJsonProtocol { + + implicit object ConnectJsonWriter extends RootJsonWriter[Settings] { + def write(obj: Settings): JsValue = + JsArray( + JsObject( + "agent_version" -> JsString("3.1.0"), + "app_name" -> JsArray(JsString(obj.appName)), + "host" -> JsString(obj.host), + "identifier" -> JsString(s"java:${obj.appName}"), + "language" -> JsString("java"), + "pid" -> JsNumber(obj.pid))) + } + + implicit def seqWriter[T: JsonFormat] = new JsonFormat[Seq[T]] { + def read(value: JsValue): Seq[T] = value match { + case JsArray(elements) ⇒ elements.map(_.convertTo[T])(collection.breakOut) + case x ⇒ deserializationError("Expected Seq as JsArray, but got " + x) + } + + def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toVector) + } + + implicit object MetricDetailWriter extends JsonFormat[Metric] { + def read(json: JsValue): (MetricID, MetricData) = json match { + case JsArray(elements) ⇒ + val metricID = elements(0) match { + case JsObject(fields) ⇒ MetricID(fields("name").convertTo[String], fields.get("scope").map(_.convertTo[String])) + case x ⇒ deserializationError("Expected MetricID as JsObject, but got " + x) + } + + val metricData = elements(1) match { + case JsArray(dataElements) ⇒ + MetricData( + dataElements(0).convertTo[Long], + dataElements(1).convertTo[Double], + dataElements(2).convertTo[Double], + dataElements(3).convertTo[Double], + dataElements(4).convertTo[Double], + dataElements(5).convertTo[Double]) + case x ⇒ deserializationError("Expected MetricData as JsArray, but got " + x) + } + + (metricID, metricData) + + case x ⇒ deserializationError("Expected Metric as JsArray, but got " + x) + } + + def write(obj: Metric): JsValue = { + val (metricID, metricData) = obj + val nameAndScope = metricID.scope.foldLeft(Map("name" -> JsString(metricID.name)))((m, scope) ⇒ m + ("scope" -> JsString(scope))) + + JsArray( + JsObject(nameAndScope), + JsArray( + JsNumber(metricData.callCount), + JsNumber(metricData.total), + JsNumber(metricData.totalExclusive), + JsNumber(metricData.min), + JsNumber(metricData.max), + JsNumber(metricData.sumOfSquares))) + } + } + + implicit object MetricBatchWriter extends RootJsonFormat[MetricBatch] { + + def read(json: JsValue): MetricBatch = json match { + case JsArray(elements) ⇒ + val runID = elements(0).convertTo[Long] + val timeSliceFrom = elements(1).convertTo[Long] + val timeSliceTo = elements(2).convertTo[Long] + val metrics = elements(3).convertTo[Seq[Metric]] + + MetricBatch(runID, TimeSliceMetrics(timeSliceFrom, timeSliceTo, metrics.toMap)) + + case x ⇒ deserializationError("Expected Array as JsArray, but got " + x) + } + + def write(obj: MetricBatch): JsValue = + JsArray( + JsNumber(obj.runID), + JsNumber(obj.timeSliceMetrics.from), + JsNumber(obj.timeSliceMetrics.to), + obj.timeSliceMetrics.metrics.toSeq.toJson) + } +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala new file mode 100644 index 00000000..14541483 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala @@ -0,0 +1,55 @@ +package kamon.newrelic + +import kamon.metric.instrument.{ Counter, Histogram } +import kamon.metric.{ MetricSnapshot, Scale } + +case class MetricID(name: String, scope: Option[String]) +case class MetricData(callCount: Long, total: Double, totalExclusive: Double, min: Double, max: Double, sumOfSquares: Double) { + def merge(that: MetricData): MetricData = + MetricData( + callCount + that.callCount, + total + that.total, + totalExclusive + that.totalExclusive, + math.min(min, that.min), + math.max(max, that.max), + sumOfSquares + that.sumOfSquares) +} + +object Metric { + + def fromKamonMetricSnapshot(snapshot: MetricSnapshot, name: String, scope: Option[String], targetScale: Scale): Metric = { + snapshot match { + case hs: Histogram.Snapshot ⇒ + var total: Double = 0D + var sumOfSquares: Double = 0D + val scaledMin = Scale.convert(hs.scale, targetScale, hs.min) + val scaledMax = Scale.convert(hs.scale, targetScale, hs.max) + + hs.recordsIterator.foreach { record ⇒ + val scaledValue = Scale.convert(hs.scale, targetScale, record.level) + + total += scaledValue * record.count + sumOfSquares += (scaledValue * scaledValue) * record.count + } + + (MetricID(name, scope), MetricData(hs.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares)) + + case cs: Counter.Snapshot ⇒ + (MetricID(name, scope), MetricData(cs.count, cs.count, cs.count, 0, cs.count, cs.count * cs.count)) + } + } +} + +case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[MetricID, MetricData]) { + import kamon.metric.combineMaps + + def merge(that: TimeSliceMetrics): TimeSliceMetrics = { + val mergedFrom = math.min(from, that.from) + val mergedTo = math.max(to, that.to) + val mergedMetrics = combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)) + + TimeSliceMetrics(mergedFrom, mergedTo, mergedMetrics) + } +} + +case class MetricBatch(runID: Long, timeSliceMetrics: TimeSliceMetrics)
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala new file mode 100644 index 00000000..0aa078f5 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala @@ -0,0 +1,103 @@ +package kamon.newrelic + +import java.util.concurrent.TimeUnit + +import akka.actor.{ Props, ActorLogging, Actor } +import akka.pattern.pipe +import akka.io.IO +import akka.util.Timeout +import kamon.Kamon +import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } +import kamon.metric._ +import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed, PostSucceeded, MetricDataPostResult } +import spray.can.Http +import spray.http.Uri +import spray.httpx.SprayJsonSupport +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extends Actor + with ClientPipelines with ActorLogging with SprayJsonSupport { + + import JsonProtocol._ + import MetricReporter.Extractors + import context.dispatcher + + val metricDataQuery = ("method" -> "metric_data") +: ("run_id" -> runID.toString) +: baseUri.query + val metricDataUri = baseUri.withQuery(metricDataQuery) + + implicit val operationTimeout = Timeout(30 seconds) + val metricsExtension = Kamon(Metrics)(context.system) + val collectionContext = metricsExtension.buildDefaultCollectionContext + val collectorClient = compressedPipeline(IO(Http)(context.system)) + + val subscriber = { + val tickInterval = context.system.settings.config.getDuration("kamon.metrics.tick-interval", TimeUnit.MILLISECONDS) + if (tickInterval == 60000) + self + else + context.actorOf(TickMetricSnapshotBuffer.props(1 minute, self), "metric-buffer") + } + + // Subscribe to Trace Metrics + metricsExtension.subscribe(TraceMetrics, "*", subscriber, permanently = true) + + // Subscribe to all User Metrics + metricsExtension.subscribe(UserHistograms, "*", subscriber, permanently = true) + metricsExtension.subscribe(UserCounters, "*", subscriber, permanently = true) + metricsExtension.subscribe(UserMinMaxCounters, "*", subscriber, permanently = true) + metricsExtension.subscribe(UserGauges, "*", subscriber, permanently = true) + + def receive = reporting(None) + + def reporting(pendingMetrics: Option[TimeSliceMetrics]): Receive = { + case TickMetricSnapshot(from, to, metrics) ⇒ + val fromInSeconds = (from / 1E3).toInt + val toInSeconds = (to / 1E3).toInt + val extractedMetrics = Extractors.flatMap(_.extract(settings, collectionContext, metrics)).toMap + val tickMetrics = TimeSliceMetrics(fromInSeconds, toInSeconds, extractedMetrics) + + val metricsToReport = pendingMetrics.foldLeft(tickMetrics)((p, n) ⇒ p.merge(n)) + context become reporting(Some(metricsToReport)) + pipe(sendMetricData(metricsToReport)) to self + + case PostSucceeded ⇒ + context become (reporting(None)) + + case PostFailed(reason) ⇒ + log.error(reason, "Metric POST to the New Relic collector failed, metrics will be accumulated with the next tick.") + } + + def sendMetricData(slice: TimeSliceMetrics): Future[MetricDataPostResult] = { + log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to) + + collectorClient { + Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)) + + } map { response ⇒ + if (response.status.isSuccess) + PostSucceeded + else + PostFailed(new UnexpectedStatusCodeException(s"Received unsuccessful status code [${response.status.value}] from collector.")) + } recover { case t: Throwable ⇒ PostFailed(t) } + } +} + +object MetricReporter { + val Extractors: List[MetricExtractor] = WebTransactionMetricExtractor :: CustomMetricExtractor :: Nil + + def props(settings: Agent.Settings, runID: Long, baseUri: Uri): Props = + Props(new MetricReporter(settings, runID, baseUri)) + + sealed trait MetricDataPostResult + case object PostSucceeded extends MetricDataPostResult + case class PostFailed(reason: Throwable) extends MetricDataPostResult + + class UnexpectedStatusCodeException(message: String) extends RuntimeException(message) with NoStackTrace +} + +trait MetricExtractor { + def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala deleted file mode 100644 index 5fa571e1..00000000 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.newrelic - -import akka.actor.{ Props, ActorRef, Actor } -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.newrelic.MetricTranslator.TimeSliceMetrics - -class MetricTranslator(receiver: ActorRef) extends Actor - with WebTransactionMetrics with CustomMetrics { - - def receive = { - case TickMetricSnapshot(from, to, metrics) ⇒ - val fromInSeconds = (from / 1E3).toInt - val toInSeconds = (to / 1E3).toInt - val allMetrics = collectWebTransactionMetrics(metrics) ++ collectCustomMetrics(metrics) - val groupedMetrics: Map[String, NewRelic.Metric] = allMetrics.map(metric ⇒ metric.name -> metric)(collection.breakOut) // avoid intermediate tuple - - receiver ! TimeSliceMetrics(fromInSeconds, toInSeconds, groupedMetrics) - } - -} - -object MetricTranslator { - case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[String, NewRelic.Metric]) { - import kamon.metric._ - - def merge(thatMetrics: Option[TimeSliceMetrics]): TimeSliceMetrics = { - thatMetrics.map(that ⇒ TimeSliceMetrics(from + that.from, to + that.to, combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)))).getOrElse(this) - } - } - - def props(receiver: ActorRef): Props = Props(new MetricTranslator(receiver)) -} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index bc73e475..a4be4c0b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -18,57 +18,17 @@ package kamon.newrelic import akka.actor import akka.actor._ +import akka.event.Logging import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.UserMetrics.{ UserCounters, UserGauges, UserHistograms, UserMinMaxCounters } -import kamon.metric.{ Metrics, TickMetricSnapshotBuffer, TraceMetrics } - -import scala.concurrent.duration._ class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val config = system.settings.config.getConfig("kamon.newrelic") - - val collectionContext = Kamon(Metrics)(system).buildDefaultCollectionContext - val metricsListener = system.actorOf(Props[NewRelicMetricsListener], "kamon-newrelic") - val apdexT: Double = config.getMilliseconds("apdexT") / 1E3 // scale to seconds. - - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricsListener, permanently = true) - - // Subscribe to all user metrics - Kamon(Metrics)(system).subscribe(UserHistograms, "*", metricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserCounters, "*", metricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", metricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserGauges, "*", metricsListener, permanently = true) - -} + val log = Logging(system, classOf[NewRelicExtension]) -class NewRelicMetricsListener extends Actor with ActorLogging { log.info("Starting the Kamon(NewRelic) extension") - - val agent = context.actorOf(Props[Agent], "agent") - val translator = context.actorOf(MetricTranslator.props(agent), "translator") - val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "metric-buffer") - - def receive = { - case tick: TickMetricSnapshot ⇒ buffer.forward(tick) - } + val agent = system.actorOf(Props[Agent], "newrelic-agent") } object NewRelic extends ExtensionId[NewRelicExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = NewRelic def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system) - - case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double, - min: Double, max: Double, sumOfSquares: Double) { - - def merge(that: Metric): Metric = { - Metric(name, scope, - callCount + that.callCount, - total + that.total, - totalExclusive + that.totalExclusive, - math.min(min, that.min), - math.max(max, that.max), - sumOfSquares + that.sumOfSquares) - } - } }
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 08b5df99..08fdc8c4 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -16,13 +16,21 @@ package kamon.newrelic +import java.util + import akka.actor.{ Actor, ActorLogging } import akka.event.Logging.{ Error, InitializeLogger, LoggerInitialized } import com.newrelic.api.agent.{ NewRelic ⇒ NR } -import kamon.trace.TraceContextAware +import kamon.trace.{ TraceRecorder, TraceContextAware } + +trait CustomParamsSupport { + this: NewRelicErrorLogger ⇒ + + def customParams: Map[String, String] +} -class NewRelicErrorLogger extends Actor with ActorLogging { - var aspectJMissingAlreadyReported = false +class NewRelicErrorLogger extends Actor with ActorLogging with CustomParamsSupport { + override def customParams: Map[String, String] = Map.empty def receive = { case InitializeLogger(_) ⇒ sender ! LoggerInitialized @@ -30,17 +38,21 @@ class NewRelicErrorLogger extends Actor with ActorLogging { case anythingElse ⇒ } - def notifyError(error: Error): Unit = { - val params = new java.util.HashMap[String, String]() - + def notifyError(error: Error): Unit = runInFakeTransaction { + val params = new util.HashMap[String, String]() val ctx = error.asInstanceOf[TraceContextAware].traceContext - params.put("TraceToken", ctx.token) - if (error.cause == Error.NoCause) { - NR.noticeError(error.message.toString, params) - } else { - NR.noticeError(error.cause, params) - } + params put ("TraceToken", ctx.token) + customParams foreach { case (k, v) ⇒ params.put(k, v) } + if (error.cause == Error.NoCause) NR.noticeError(error.message.toString, params) + else NR.noticeError(error.cause, params) } -} + + //Really ugly, but temporal hack until next release... + def runInFakeTransaction[T](thunk: ⇒ T): T = { + val oldName = Thread.currentThread.getName + Thread.currentThread.setName(TraceRecorder.currentContext.name) + try thunk finally Thread.currentThread.setName(oldName) + } +}
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala new file mode 100644 index 00000000..713a5586 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala @@ -0,0 +1,59 @@ +package kamon.newrelic + +import java.lang.{ StringBuilder ⇒ JStringBuilder } +import spray.json._ +import scala.annotation.tailrec + +// We need a special treatment of / that needs to be escaped as \/ for New Relic to work properly with all metrics. +// Without this custom json printer the scoped metrics are not displayed in the site. + +// format: OFF +trait NewRelicJsonPrinter extends CompactPrinter { + + override def printString(s: String, sb: JStringBuilder) { + import NewRelicJsonPrinter._ + @tailrec def firstToBeEncoded(ix: Int = 0): Int = + if (ix == s.length) -1 else if (requiresEncoding(s.charAt(ix))) ix else firstToBeEncoded(ix + 1) + + sb.append('"') + firstToBeEncoded() match { + case -1 ⇒ sb.append(s) + case first ⇒ + sb.append(s, 0, first) + @tailrec def append(ix: Int): Unit = + if (ix < s.length) { + s.charAt(ix) match { + case c if !requiresEncoding(c) => sb.append(c) + case '"' => sb.append("\\\"") + case '\\' => sb.append("\\\\") + case '/' => sb.append("\\/") + case '\b' => sb.append("\\b") + case '\f' => sb.append("\\f") + case '\n' => sb.append("\\n") + case '\r' => sb.append("\\r") + case '\t' => sb.append("\\t") + case x if x <= 0xF => sb.append("\\u000").append(Integer.toHexString(x)) + case x if x <= 0xFF => sb.append("\\u00").append(Integer.toHexString(x)) + case x if x <= 0xFFF => sb.append("\\u0").append(Integer.toHexString(x)) + case x => sb.append("\\u").append(Integer.toHexString(x)) + } + append(ix + 1) + } + append(first) + } + sb.append('"') + } +} + +object NewRelicJsonPrinter extends NewRelicJsonPrinter { + + def requiresEncoding(c: Char): Boolean = + // from RFC 4627 + // unescaped = %x20-21 / %x23-5B / %x5D-10FFFF + c match { + case '"' ⇒ true + case '\\' ⇒ true + case '/' ⇒ true + case c ⇒ c < 0x20 + } +}
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala new file mode 100644 index 00000000..0a4a516b --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -0,0 +1,114 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.newrelic + +import scala.collection.mutable; +import kamon.metric._ +import kamon.metric.TraceMetrics.{ TraceMetricsSnapshot, ElapsedTime } +import kamon.metric.instrument.Histogram +import kamon.trace.SegmentCategory.HttpClient +import kamon.trace.SegmentMetricIdentity + +object WebTransactionMetricExtractor extends MetricExtractor { + + def extract(settings: Agent.Settings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { + val apdexBuilder = new ApdexBuilder("Apdex", None, settings.apdexT) + + // Trace metrics are recorded in nanoseconds. + var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) + var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) + + val externalByHostSnapshots = mutable.Map.empty[String, List[Histogram.Snapshot]] + val externalByHostAndLibrarySnapshots = mutable.Map.empty[(String, String), List[Histogram.Snapshot]] + val externalScopedByHostAndLibrarySnapshots = mutable.Map.empty[(String, String, String), List[Histogram.Snapshot]] + + val transactionMetrics = metrics.collect { + case (TraceMetrics(traceName), tms: TraceMetricsSnapshot) ⇒ + + tms.segments.foreach { + case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒ + accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext) + + // Accumulate externals by host + externalByHostSnapshots.update(segmentName, snapshot :: externalByHostSnapshots.getOrElse(segmentName, Nil)) + + // Accumulate externals by host and library + externalByHostAndLibrarySnapshots.update((segmentName, library), + snapshot :: externalByHostAndLibrarySnapshots.getOrElse((segmentName, library), Nil)) + + // Accumulate externals by host and library, including the transaction as scope. + externalScopedByHostAndLibrarySnapshots.update((segmentName, library, traceName), + snapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((segmentName, library, traceName), Nil)) + + } + + accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(tms.elapsedTime, collectionContext) + tms.elapsedTime.recordsIterator.foreach { record ⇒ + apdexBuilder.record(Scale.convert(tms.elapsedTime.scale, Scale.Unit, record.level), record.count) + } + + Metric.fromKamonMetricSnapshot(tms.elapsedTime, "WebTransaction/Custom/" + traceName, None, Scale.Unit) + } + + val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit) + val webTransaction = httpDispatcher.copy(MetricID("WebTransaction", None)) + val webTransactionTotal = httpDispatcher.copy(MetricID("WebTransactionTotalTime", None)) + + val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit) + val externalAll = externalAllWeb.copy(MetricID("External/all", None)) + + val externalByHost = externalByHostSnapshots.map { + case (host, snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/all", None, Scale.Unit) + } + + val externalByHostAndLibrary = externalByHostAndLibrarySnapshots.map { + case ((host, library), snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", None, Scale.Unit) + } + + val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map { + case ((host, library, traceName), snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName), Scale.Unit) + } + + Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++ + transactionMetrics ++ externalByHost ++ externalByHostAndLibrary ++ externalScopedByHostAndLibrary + } +} + +class ApdexBuilder(name: String, scope: Option[String], apdexT: Double) { + val frustratingThreshold = 4 * apdexT + + var satisfying = 0L + var tolerating = 0L + var frustrating = 0L + + def record(duration: Double, count: Long): Unit = + if (duration <= apdexT) + satisfying += count + else if (duration <= frustratingThreshold) + tolerating += count + else + frustrating += count + + // NewRelic reuses the same metric structure for recording the Apdex.. weird, but that's how it works. + def build: Metric = (MetricID(name, scope), MetricData(satisfying, tolerating, frustrating, apdexT, apdexT, 0)) +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala deleted file mode 100644 index a8c54684..00000000 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.newrelic - -import kamon.metric._ -import kamon.metric.TraceMetrics.ElapsedTime -import akka.actor.Actor -import kamon.Kamon -import kamon.metric.instrument.Histogram - -trait WebTransactionMetrics { - self: Actor ⇒ - - def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = { - val newRelicExtension = Kamon(NewRelic)(context.system) - val apdexBuilder = new ApdexBuilder("Apdex", None, newRelicExtension.apdexT) - val collectionContext = newRelicExtension.collectionContext - - // Trace metrics are recorded in nanoseconds. - var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) - - val webTransactionMetrics = metrics.collect { - case (TraceMetrics(name), groupSnapshot) ⇒ - - groupSnapshot.metrics collect { - case (ElapsedTime, snapshot: Histogram.Snapshot) ⇒ - accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(snapshot, collectionContext) - snapshot.recordsIterator.foreach { record ⇒ - apdexBuilder.record(Scale.convert(snapshot.scale, Scale.Unit, record.level), record.count) - } - - toNewRelicMetric(Scale.Unit)(s"WebTransaction/Custom/$name", None, snapshot) - } - } - - val httpDispatcher = toNewRelicMetric(Scale.Unit)("HttpDispatcher", None, accumulatedHttpDispatcher) - val webTransaction = toNewRelicMetric(Scale.Unit)("WebTransaction", None, accumulatedHttpDispatcher) - - Seq(httpDispatcher, webTransaction, apdexBuilder.build) ++ webTransactionMetrics.flatten.toSeq - } -} - -class ApdexBuilder(name: String, scope: Option[String], apdexT: Double) { - val frustratingThreshold = 4 * apdexT - - var satisfying = 0L - var tolerating = 0L - var frustrating = 0L - - def record(duration: Double, count: Long): Unit = - if (duration <= apdexT) - satisfying += count - else if (duration <= frustratingThreshold) - tolerating += count - else - frustrating += count - - // NewRelic reuses the same metric structure for recording the Apdex.. weird, but that's how it works. - def build: NewRelic.Metric = NewRelic.Metric(name, scope, satisfying, tolerating, frustrating, apdexT, apdexT, 0) -} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala index 89a8b15b..06c3dad0 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala @@ -1,45 +1,5 @@ -/*========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - * - */ - package kamon -import kamon.metric.instrument.{ Counter, Histogram } -import kamon.metric.{ MetricSnapshot, Scale } - package object newrelic { - - def toNewRelicMetric(scale: Scale)(name: String, scope: Option[String], snapshot: MetricSnapshot): NewRelic.Metric = { - snapshot match { - case hs: Histogram.Snapshot ⇒ - var total: Double = 0D - var sumOfSquares: Double = 0D - val scaledMin = Scale.convert(hs.scale, scale, hs.min) - val scaledMax = Scale.convert(hs.scale, scale, hs.max) - - hs.recordsIterator.foreach { record ⇒ - val scaledValue = Scale.convert(hs.scale, scale, record.level) - - total += scaledValue * record.count - sumOfSquares += (scaledValue * scaledValue) * record.count - } - - NewRelic.Metric(name, scope, hs.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares) - - case cs: Counter.Snapshot ⇒ - NewRelic.Metric(name, scope, cs.count, cs.count, cs.count, 0, cs.count, cs.count * cs.count) - } - } + type Metric = (MetricID, MetricData) } diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala index 8b61c241..7db9f2d0 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala @@ -16,20 +16,23 @@ package kamon.newrelic +import java.lang.management.ManagementFactory + import akka.actor.{ ActorRef, ActorSystem, Props } import akka.io.IO -import akka.testkit.TestActor.{ AutoPilot, KeepRunning } import akka.testkit._ import com.typesafe.config.ConfigFactory import kamon.AkkaExtensionSwap -import kamon.newrelic.MetricTranslator.TimeSliceMetrics import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } import spray.can.Http -import spray.http.{ HttpRequest, HttpResponse, _ } - -class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll { +import spray.http._ +import spray.httpx.encoding.Deflate +import spray.httpx.{ SprayJsonSupport, RequestBuilding } +import spray.json.JsArray +import spray.json._ - import kamon.newrelic.AgentSpec._ +class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll with RequestBuilding with SprayJsonSupport { + import JsonProtocol._ implicit lazy val system: ActorSystem = ActorSystem("Agent-Spec", ConfigFactory.parseString( """ @@ -39,120 +42,180 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll { |} |kamon { | newrelic { - | retry-delay = 1 second - | max-retry = 3 + | app-name = kamon + | license-key = 1111111111 + | initialize-retry-delay = 1 second + | max-initialize-retries = 3 | } |} | """.stripMargin)) - var agent: ActorRef = _ - - setupFakeHttpManager - - "the Newrelic Agent" should { - "try to connect upon creation, retry to connect if an error occurs" in { - EventFilter.info(message = "Initialization failed: Unexpected response from HTTP transport: None, retrying in 1 seconds", occurrences = 3).intercept { - system.actorOf(Props[Agent]) - Thread.sleep(1000) - } - } - - "when everything is fine should select a NewRelic collector" in { + "the New Relic Agent" should { + "try to establish a connection to the collector upon creation" in { + val httpManager = setHttpManager(TestProbe()) + val agent = system.actorOf(Props[Agent]) + + // Request NR for a collector + httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) + + // Receive the assigned collector + httpManager.reply(jsonResponse( + """ + | { + | "return_value": "collector-8.newrelic.com" + | } + | """.stripMargin)) + + // Connect to the collector + val (host, pid) = getHostAndPid() + httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector-8.newrelic.com", "connect"), + s""" + | [ + | { + | "agent_version": "3.1.0", + | "app_name": [ "kamon" ], + | "host": "$host", + | "identifier": "java:kamon", + | "language": "java", + | "pid": $pid + | } + | ] + """.stripMargin.parseJson)(sprayJsonMarshaller(JsValueFormat)) + }) + + // Receive the runID EventFilter.info(message = "Agent initialized with runID: [161221111] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { - system.actorOf(Props[Agent]) + httpManager.reply(jsonResponse( + """ + | { + | "return_value": { + | "agent_run_id": 161221111 + | } + | } + | """.stripMargin)) } } - "merge the metrics if not possible send them and do it in the next post" in { - EventFilter.info(pattern = "Trying to send metrics to NewRelic collector, attempt.*", occurrences = 2).intercept { - agent = system.actorOf(Props[Agent].withDispatcher(CallingThreadDispatcher.Id)) + "retry the connection in case it fails" in { + val httpManager = setHttpManager(TestProbe()) + val agent = system.actorOf(Props[Agent]) - for (_ ← 1 to 3) { - sendDelayedMetric(agent) - } - } - } + // Request NR for a collector + val request = httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) - "when the connection is re-established, the metrics should be send" in { - EventFilter.info(message = "Sending metrics to NewRelic collector", occurrences = 2).intercept { - sendDelayedMetric(agent) + // Fail the request. + EventFilter[RuntimeException](start = "Initialization failed, retrying in 1 seconds", occurrences = 1).intercept { + httpManager.reply(Timedout(request)) } + + // Request NR for a collector, second attempt + httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) + + // Receive the assigned collector + httpManager.reply(jsonResponse( + """ + | { + | "return_value": "collector-8.newrelic.com" + | } + | """.stripMargin)) + + // Connect to the collector + val (host, pid) = getHostAndPid() + httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector-8.newrelic.com", "connect"), + s""" + | [ + | { + | "agent_version": "3.1.0", + | "app_name": [ "kamon" ], + | "host": "$host", + | "identifier": "java:kamon", + | "language": "java", + | "pid": $pid + | } + | ] + """.stripMargin.parseJson)(sprayJsonMarshaller(JsValueFormat)) + }) + + // Receive the runID + EventFilter.info( + message = "Agent initialized with runID: [161221112] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { + + httpManager.reply(jsonResponse( + """ + | { + | "return_value": { + | "agent_run_id": 161221112 + | } + | } + | """.stripMargin)) + } } - } - def setupFakeHttpManager: Unit = { - val ConnectionAttempts = 3 // an arbitrary value only for testing purposes - val PostAttempts = 3 // if the number is achieved, the metrics should be discarded - val fakeHttpManager = TestProbe() - var attemptsToConnect: Int = 0 // should retry grab an NewRelic collector after retry-delay - var attemptsToSendMetrics: Int = 0 - - fakeHttpManager.setAutoPilot(new TestActor.AutoPilot { - def run(sender: ActorRef, msg: Any): AutoPilot = { - msg match { - case HttpRequest(_, uri, _, _, _) if rawMethodIs("get_redirect_host", uri) ⇒ - if (attemptsToConnect == ConnectionAttempts) { - sender ! jsonResponse( - """ - | { - | "return_value": "collector-8.newrelic.com" - | } - | """.stripMargin) - system.log.info("Selecting Collector") - - } else { - sender ! None - attemptsToConnect += 1 - system.log.info("Network Error or Connection Refuse") - } - - case HttpRequest(_, uri, _, _, _) if rawMethodIs("connect", uri) ⇒ - sender ! jsonResponse( - """ - | { - | "return_value": { - | "agent_run_id": 161221111 - | } - | } - | """.stripMargin) - system.log.info("Connecting") - - case HttpRequest(_, uri, _, _, _) if rawMethodIs("metric_data", uri) ⇒ - if (attemptsToSendMetrics < PostAttempts) { - sender ! None - attemptsToSendMetrics += 1 - system.log.info("Error when trying to send metrics to NewRelic collector, the metrics will be merged") - } else { - system.log.info("Sending metrics to NewRelic collector") - } + "give up the connection after max-initialize-retries" in { + val httpManager = setHttpManager(TestProbe()) + val agent = system.actorOf(Props[Agent]) + + // First attempt and two retries + for (_ ← 1 to 3) { + + // Request NR for a collector + val request = httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) + + // Fail the request. + EventFilter[RuntimeException](start = "Initialization failed, retrying in 1 seconds", occurrences = 1).intercept { + httpManager.reply(Timedout(request)) } - KeepRunning } - def jsonResponse(json: String): HttpResponse = { - HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, json)) - } + // Final retry. Request NR for a collector + val request = httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) - def rawMethodIs(method: String, uri: Uri): Boolean = { - uri.query.get("method").filter(_ == method).isDefined + // Give up on connecting. + EventFilter[RuntimeException](message = "Giving up while trying to set up a connection with the New Relic collector.", occurrences = 1).intercept { + httpManager.reply(Timedout(request)) } - }) + } + } + def setHttpManager(probe: TestProbe): TestProbe = { AkkaExtensionSwap.swap(system, Http, new IO.Extension { - def manager: ActorRef = fakeHttpManager.ref + def manager: ActorRef = probe.ref }) + probe + } + + def rawMethodUri(host: String, methodName: String): Uri = { + Uri(s"http://$host/agent_listener/invoke_raw_method").withQuery( + "method" -> methodName, + "license_key" -> "1111111111", + "marshal_format" -> "json", + "protocol_version" -> "12") + } + + def jsonResponse(json: String): HttpResponse = { + HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, json)) } - override def afterAll() { - super.afterAll() - system.shutdown() + def getHostAndPid(): (String, String) = { + val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + (runtimeName(1), runtimeName(0)) } -} -object AgentSpec { - def sendDelayedMetric(agent: ActorRef, delay: Int = 1000): Unit = { - agent ! TimeSliceMetrics(100000L, 200000L, Map("Latency" -> NewRelic.Metric("Latency", None, 1000L, 2000D, 3000D, 1D, 100000D, 300D))) - Thread.sleep(delay) + implicit def JsValueFormat = new RootJsonFormat[JsValue] { + def write(value: JsValue) = value + def read(value: JsValue) = value } }
\ No newline at end of file diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala new file mode 100644 index 00000000..0001072e --- /dev/null +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala @@ -0,0 +1,149 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.newrelic + +import akka.actor.{ ActorRef, ActorSystem } +import akka.io.IO +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import kamon.metric.{ TraceMetrics, Metrics } +import kamon.{ Kamon, AkkaExtensionSwap } +import kamon.metric.Subscriptions.TickMetricSnapshot +import org.scalatest.{ Matchers, WordSpecLike } +import spray.can.Http +import spray.http.Uri.Query +import spray.http._ +import spray.httpx.encoding.Deflate +import spray.httpx.{ RequestBuilding, SprayJsonSupport } +import scala.concurrent.duration._ +import spray.json._ + +class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with SprayJsonSupport { + import kamon.newrelic.JsonProtocol._ + + implicit lazy val system: ActorSystem = ActorSystem("metric-reporter-spec", ConfigFactory.parseString( + """ + |akka { + | loggers = ["akka.testkit.TestEventListener"] + | loglevel = "INFO" + |} + |kamon { + | metric { + | tick-interval = 1 hour + | } + |} + | + """.stripMargin)) + + val agentSettings = Agent.Settings("1111111111", "kamon", "test-host", 1, 1, 30 seconds, 1D) + val baseQuery = Query( + "license_key" -> agentSettings.licenseKey, + "marshal_format" -> "json", + "protocol_version" -> "12") + val baseCollectorUri = Uri("http://collector-1.newrelic.com/agent_listener/invoke_raw_method").withQuery(baseQuery) + + "the MetricReporter" should { + "report metrics to New Relic upon arrival" in new FakeTickSnapshotsFixture { + val httpManager = setHttpManager(TestProbe()) + val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri)) + + metricReporter ! firstSnapshot + val metricPost = httpManager.expectMsgType[HttpRequest] + + metricPost.method should be(HttpMethods.POST) + metricPost.uri should be(rawMethodUri("collector-1.newrelic.com", "metric_data")) + metricPost.encoding should be(HttpEncodings.deflate) + + val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch] + postedBatch.runID should be(9999) + postedBatch.timeSliceMetrics.from should be(1415587618) + postedBatch.timeSliceMetrics.to should be(1415587678) + + val metrics = postedBatch.timeSliceMetrics.metrics + metrics(MetricID("Apdex", None)).callCount should be(3) + metrics(MetricID("WebTransaction", None)).callCount should be(3) + metrics(MetricID("HttpDispatcher", None)).callCount should be(3) + } + + "accumulate metrics if posting fails" in new FakeTickSnapshotsFixture { + val httpManager = setHttpManager(TestProbe()) + val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri)) + + metricReporter ! firstSnapshot + val request = httpManager.expectMsgType[HttpRequest] + httpManager.reply(Timedout(request)) + + metricReporter ! secondSnapshot + val metricPost = httpManager.expectMsgType[HttpRequest] + + metricPost.method should be(HttpMethods.POST) + metricPost.uri should be(rawMethodUri("collector-1.newrelic.com", "metric_data")) + metricPost.encoding should be(HttpEncodings.deflate) + + val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch] + postedBatch.runID should be(9999) + postedBatch.timeSliceMetrics.from should be(1415587618) + postedBatch.timeSliceMetrics.to should be(1415587738) + + val metrics = postedBatch.timeSliceMetrics.metrics + metrics(MetricID("Apdex", None)).callCount should be(6) + metrics(MetricID("WebTransaction", None)).callCount should be(6) + metrics(MetricID("HttpDispatcher", None)).callCount should be(6) + } + } + + def setHttpManager(probe: TestProbe): TestProbe = { + AkkaExtensionSwap.swap(system, Http, new IO.Extension { + def manager: ActorRef = probe.ref + }) + probe + } + + def rawMethodUri(host: String, methodName: String): Uri = { + Uri(s"http://$host/agent_listener/invoke_raw_method").withQuery( + "method" -> methodName, + "run_id" -> "9999", + "license_key" -> "1111111111", + "marshal_format" -> "json", + "protocol_version" -> "12") + } + + def compactJsonEntity(jsonString: String): HttpEntity = { + import spray.json._ + + val compactJson = jsonString.parseJson.compactPrint + HttpEntity(ContentTypes.`application/json`, compactJson) + } + + trait FakeTickSnapshotsFixture { + val testTraceID = TraceMetrics("example-trace") + val recorder = Kamon(Metrics).register(testTraceID, TraceMetrics.Factory).get + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext + + def collectRecorder = recorder.collect(collectionContext) + + recorder.elapsedTime.record(1000000) + recorder.elapsedTime.record(2000000) + recorder.elapsedTime.record(3000000) + val firstSnapshot = TickMetricSnapshot(1415587618000L, 1415587678000L, Map(testTraceID -> collectRecorder)) + + recorder.elapsedTime.record(6000000) + recorder.elapsedTime.record(5000000) + recorder.elapsedTime.record(4000000) + val secondSnapshot = TickMetricSnapshot(1415587678000L, 1415587738000L, Map(testTraceID -> collectRecorder)) + } +}
\ No newline at end of file diff --git a/kamon-play/src/main/scala/kamon/play/Play.scala b/kamon-play/src/main/scala/kamon/play/Play.scala index 0f23baf5..2184fa84 100644 --- a/kamon-play/src/main/scala/kamon/play/Play.scala +++ b/kamon-play/src/main/scala/kamon/play/Play.scala @@ -27,6 +27,8 @@ import play.api.mvc.RequestHeader object Play extends ExtensionId[PlayExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = Play override def createExtension(system: ExtendedActorSystem): PlayExtension = new PlayExtension(system) + + val SegmentLibraryName = "WS-client" } class PlayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala index 897acce6..b44e45a3 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala @@ -86,7 +86,7 @@ class RequestInstrumentation { } object RequestInstrumentation { - + import kamon.metric.Metrics.AtomicGetOrElseUpdateForTriemap import java.util.Locale import scala.collection.concurrent.TrieMap @@ -94,7 +94,7 @@ object RequestInstrumentation { def normaliseTraceName(requestHeader: RequestHeader): Option[String] = requestHeader.tags.get(Routes.ROUTE_VERB).map({ verb ⇒ val path = requestHeader.tags(Routes.ROUTE_PATTERN) - cache.getOrElseUpdate(s"$verb$path", { + cache.atomicGetOrElseUpdate(s"$verb$path", { val traceName = { // Convert paths of form GET /foo/bar/$paramname<regexp>/blah to foo.bar.paramname.blah.get val p = path.replaceAll("""\$([^<]+)<[^>]+>""", "$1").replace('/', '.').dropWhile(_ == '.') diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala index c58e9f0c..2791df80 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala @@ -18,7 +18,7 @@ package kamon.play.instrumentation import kamon.Kamon import kamon.play.Play -import kamon.trace.SegmentMetricIdentityLabel +import kamon.trace.{ SegmentCategory, SegmentMetricIdentity } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } import kamon.trace.TraceRecorder @@ -38,7 +38,7 @@ class WSInstrumentation { val playExtension = Kamon(Play)(system) val executor = playExtension.defaultDispatcher val segmentName = playExtension.generateHttpClientSegmentName(request) - val segment = ctx.startSegment(segmentName, SegmentMetricIdentityLabel.HttpClient) + val segment = ctx.startSegment(segmentName, SegmentMetricIdentityLabel.HttpClient, Play.SegmentLibraryName) val response = pjp.proceed().asInstanceOf[Future[Response]] response.map(result ⇒ segment.finish())(executor) diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala index bf1ead05..bda8281b 100644 --- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala @@ -19,7 +19,7 @@ package kamon.play import kamon.Kamon import kamon.metric.TraceMetrics.TraceMetricsSnapshot import kamon.metric.{ Metrics, TraceMetrics } -import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder } import org.scalatest.{ Matchers, WordSpecLike } import org.scalatestplus.play.OneServerPerSuite import play.api.libs.ws.WS @@ -49,7 +49,7 @@ class WSInstrumentationSpec extends WordSpecLike with Matchers with OneServerPer val snapshot = takeSnapshotOf("GET: /inside") snapshot.elapsedTime.numberOfMeasurements should be(1) snapshot.segments.size should be(1) - snapshot.segments(SegmentMetricIdentity("http://localhost:19001/async", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + snapshot.segments(SegmentMetricIdentity("http://localhost:19001/async", SegmentCategory.HttpClient, Play.SegmentLibraryName)).numberOfMeasurements should be(1) } "propagate the TraceContext outside an Action and complete the WS request" in { diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index 32f0269d..86a87439 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -1,5 +1,5 @@ akka { - loglevel = INFO + loglevel = DEBUG extensions = ["kamon.newrelic.NewRelic"] actor { diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index af1eb793..8cb8cc22 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -23,7 +23,7 @@ import kamon.Kamon import kamon.metric.Subscriptions.TickMetricSnapshot import kamon.metric._ import kamon.spray.KamonTraceDirectives -import kamon.trace.{ SegmentMetricIdentityLabel, TraceRecorder } +import kamon.trace.{ SegmentCategory, TraceRecorder } import spray.http.{ StatusCodes, Uri } import spray.httpx.RequestBuilding import spray.routing.SimpleRoutingApp @@ -83,8 +83,11 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } } ~ path("site") { - complete { - pipeline(Get("http://localhost:9090/site-redirect")) + traceName("FinalGetSite-3") { + complete { + for (f1 <- pipeline(Get("http://127.0.0.1:9090/ok")); + f2 <- pipeline(Get("http://www.google.com/search?q=mkyong"))) yield "Ok Double Future" + } } } ~ path("site-redirect") { @@ -99,14 +102,14 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } } ~ path("ok") { - traceName("OK") { + traceName("RespondWithOK-3") { complete { "ok" } } } ~ path("future") { - traceName("OK-Future") { + traceName("OKFuture") { dynamic { counter.increment() complete(Future { "OK" }) @@ -127,7 +130,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } ~ path("segment") { complete { - val segment = TraceRecorder.currentContext.startSegment("hello-world", SegmentMetricIdentityLabel.HttpClient) + val segment = TraceRecorder.currentContext.startSegment("hello-world", SegmentCategory.HttpClient, "none") (replier ? "hello").mapTo[String].onComplete { t ⇒ segment.finish() } diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala index c1c81116..ab8d6a7d 100644 --- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala +++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala @@ -21,12 +21,14 @@ import akka.actor import kamon.Kamon import kamon.http.HttpServerMetrics import kamon.metric.Metrics +import spray.http.HttpHeaders.Host import spray.http.HttpRequest object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Spray def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtension(system) + val SegmentLibraryName = "spray-client" } object ClientSegmentCollectionStrategy { @@ -66,7 +68,14 @@ trait SprayNameGenerator { } class DefaultSprayNameGenerator extends SprayNameGenerator { - def generateRequestLevelApiSegmentName(request: HttpRequest): String = request.method.value + ": " + request.uri.path + def hostFromHeaders(request: HttpRequest): Option[String] = request.header[Host].map(_.host) + + def generateRequestLevelApiSegmentName(request: HttpRequest): String = { + val uriAddress = request.uri.authority.host.address + if (uriAddress.equals("")) hostFromHeaders(request).getOrElse("unknown-host") else uriAddress + } + + def generateHostLevelApiSegmentName(request: HttpRequest): String = hostFromHeaders(request).getOrElse("unknown-host") + def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path - def generateHostLevelApiSegmentName(request: HttpRequest): String = request.uri.authority.host.address } diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index 94fc3572..813915c4 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -53,7 +53,7 @@ class ClientRequestInstrumentation { if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { if (requestContext.segment.isEmpty) { val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) - val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + val segment = ctx.startSegment(clientRequestName, SegmentCategory.HttpClient, Spray.SegmentLibraryName) requestContext.segment = segment } @@ -116,7 +116,7 @@ class ClientRequestInstrumentation { val sprayExtension = Kamon(Spray)(system) val segment = if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) - ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentMetricIdentityLabel.HttpClient) + ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentCategory.HttpClient, Spray.SegmentLibraryName) else EmptyTraceContext.EmptySegment diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala index eb25412b..93a9cf55 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala @@ -90,11 +90,12 @@ class ServerRequestInstrumentation { def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = { def publishWarning(text: String, system: ActorSystem): Unit = - system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text)) + system.eventStream.publish(Warning("ServerRequestInstrumentation", classOf[ServerRequestInstrumentation], text)) - if (incomingTraceContext.nonEmpty && incomingTraceContext.token != storedTraceContext.token) - publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system) - else + if (incomingTraceContext.nonEmpty) { + if (incomingTraceContext.token != storedTraceContext.token) + publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system) + } else publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system) } diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index 57f9ebe1..b90b0f3b 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -23,7 +23,7 @@ import org.scalatest.time.{ Millis, Seconds, Span } import org.scalatest.{ Matchers, WordSpecLike } import spray.httpx.RequestBuilding import spray.http.{ HttpResponse, HttpRequest } -import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder } import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader @@ -144,7 +144,8 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api") traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) - traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", + SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1) } "rename a request level api segment once it reaches the relevant host connector" in { @@ -174,7 +175,8 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api") traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) - traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", + SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1) } } @@ -261,7 +263,8 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api") traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) - traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", + SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1) } } } diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 8b0e7992..58fb3658 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -82,7 +82,7 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { // Subscribe to SystemMetrics val includeSystemMetrics = statsDConfig.getBoolean("report-system-metrics") if (includeSystemMetrics) { - List(CPUMetrics, ProcessCPUMetrics, MemoryMetrics, NetworkMetrics, GCMetrics, HeapMetrics) foreach { metric ⇒ + Seq(CPUMetrics, ProcessCPUMetrics, MemoryMetrics, NetworkMetrics, GCMetrics, HeapMetrics, ContextSwitchesMetrics) foreach { metric ⇒ Kamon(Metrics)(system).subscribe(metric, "*", statsDMetricsListener, permanently = true) } } diff --git a/project/AspectJ.scala b/project/AspectJ.scala index f33555e1..74513bd6 100644 --- a/project/AspectJ.scala +++ b/project/AspectJ.scala @@ -1,12 +1,12 @@ import sbt._ import sbt.Keys._ -import com.typesafe.sbt.SbtAspectj.{ Aspectj, aspectjSettings } +import com.typesafe.sbt.SbtAspectj.{ Aspectj, defaultAspectjSettings } import com.typesafe.sbt.SbtAspectj.AspectjKeys.{ aspectjVersion, compileOnly, lintProperties, weaverOptions } object AspectJ { - lazy val aspectJSettings = aspectjSettings ++ Seq( + lazy val aspectJSettings = inConfig(Aspectj)(defaultAspectjSettings) ++ aspectjDependencySettings ++ Seq( aspectjVersion in Aspectj := Dependencies.aspectjVersion, compileOnly in Aspectj := true, fork in Test := true, @@ -14,4 +14,13 @@ object AspectJ { javaOptions in run <++= weaverOptions in Aspectj, lintProperties in Aspectj += "invalidAbsoluteTypeName = ignore" ) + + def aspectjDependencySettings = Seq( + ivyConfigurations += Aspectj, + libraryDependencies <++= (aspectjVersion in Aspectj) { version => Seq( + "org.aspectj" % "aspectjtools" % version % Aspectj.name, + "org.aspectj" % "aspectjweaver" % version % Aspectj.name, + "org.aspectj" % "aspectjrt" % version % Aspectj.name + )} + ) }
\ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a23eaf24..07d62dfb 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,7 +9,7 @@ object Dependencies { val sprayVersion = "1.2.2" val akkaVersion = "2.2.4" - val aspectjVersion = "1.8.1" + val aspectjVersion = "1.8.4" val slf4jVersion = "1.7.6" val playVersion = "2.2.5" @@ -17,9 +17,8 @@ object Dependencies { val sprayJsonLenses = "net.virtual-void" %% "json-lenses" % "0.5.4" val scalatest = "org.scalatest" %% "scalatest" % "2.2.1" val logback = "ch.qos.logback" % "logback-classic" % "1.0.13" - val aspectJ = "org.aspectj" % "aspectjrt" % aspectjVersion + val aspectJ = "org.aspectj" % "aspectjweaver" % aspectjVersion val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "3.11.0" - val snakeYaml = "org.yaml" % "snakeyaml" % "1.13" val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.2.1" val sprayCan = "io.spray" % "spray-can" % sprayVersion val sprayRouting = "io.spray" % "spray-routing" % sprayVersion @@ -30,7 +29,8 @@ object Dependencies { val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion val akkaRemote = "com.typesafe.akka" %% "akka-remote" % akkaVersion val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % akkaVersion - val playTest = "org.scalatestplus" %% "play" % "1.3.0" + val play = "com.typesafe.play" %% "play" % playVersion + val playTest = "org.scalatestplus" %% "play" % "1.2.0" val slf4Api = "org.slf4j" % "slf4j-api" % slf4jVersion val slf4nop = "org.slf4j" % "slf4j-nop" % slf4jVersion val scalaCompiler = "org.scala-lang" % "scala-compiler" % Settings.ScalaVersion diff --git a/project/Projects.scala b/project/Projects.scala index bf57f2be..69b4b065 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -23,9 +23,10 @@ object Projects extends Build { mappings in (Compile, packageBin) ++= mappings.in(kamonMacros, Compile, packageBin).value, mappings in (Compile, packageSrc) ++= mappings.in(kamonMacros, Compile, packageSrc).value, libraryDependencies ++= - compile(akkaActor, aspectJ, hdrHistogram) ++ + compile(akkaActor, hdrHistogram) ++ + provided(aspectJ) ++ optional(logback, scalazConcurrent) ++ - test(scalatest, akkaTestKit, sprayTestkit, akkaSlf4j, logback)) + test(scalatest, akkaTestKit, akkaSlf4j, logback)) lazy val kamonAkkaRemote = Project("kamon-akka-remote", file("kamon-akka-remote")) @@ -36,6 +37,7 @@ object Projects extends Build { .settings( libraryDependencies ++= compile(akkaRemote, akkaCluster) ++ + provided(aspectJ) ++ test(scalatest, akkaTestKit)) @@ -48,7 +50,8 @@ object Projects extends Build { mappings in (Compile, packageBin) ++= mappings.in(kamonMacros, Compile, packageBin).value, mappings in (Compile, packageSrc) ++= mappings.in(kamonMacros, Compile, packageSrc).value, libraryDependencies ++= - compile(akkaActor, aspectJ, sprayCan, sprayClient, sprayRouting) ++ + compile(akkaActor, sprayCan, sprayClient, sprayRouting) ++ + provided(aspectJ) ++ test(scalatest, akkaTestKit, sprayTestkit, slf4Api, slf4nop)) .dependsOn(kamonCore) .dependsOn(kamonTestkit % "test") @@ -60,7 +63,8 @@ object Projects extends Build { .settings(aspectJSettings: _*) .settings( libraryDependencies ++= - compile(aspectJ, sprayCan, sprayClient, sprayRouting, sprayJson, sprayJsonLenses, newrelic, snakeYaml, akkaSlf4j) ++ + compile(sprayCan, sprayClient, sprayRouting, sprayJson, sprayJsonLenses, newrelic, akkaSlf4j) ++ + provided(aspectJ) ++ test(scalatest, akkaTestKit, sprayTestkit, slf4Api, akkaSlf4j)) .dependsOn(kamonCore) @@ -79,41 +83,60 @@ object Projects extends Build { lazy val kamonDashboard = Project("kamon-dashboard", file("kamon-dashboard")) .settings(basicSettings: _*) .settings(formatSettings: _*) - .settings(libraryDependencies ++= compile(akkaActor, akkaSlf4j, sprayRouting, sprayCan, sprayJson)) + .settings( + libraryDependencies ++= + compile(akkaActor, akkaSlf4j, sprayRouting, sprayCan, sprayJson)) .dependsOn(kamonCore) lazy val kamonTestkit = Project("kamon-testkit", file("kamon-testkit")) .settings(basicSettings: _*) .settings(formatSettings: _*) - .settings(libraryDependencies ++= compile(akkaActor, akkaTestKit, aspectJ) ++ test(slf4Api, slf4nop)) + .settings( + libraryDependencies ++= + compile(akkaActor, akkaTestKit) ++ + provided(aspectJ) ++ + test(slf4Api, slf4nop)) .dependsOn(kamonCore) lazy val kamonPlay = Project("kamon-play", file("kamon-play")) .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(aspectJSettings: _*) - .settings(libraryDependencies ++= compile(playTest, aspectJ) ++ test(playTest, akkaTestKit, slf4Api)) + .settings( + libraryDependencies ++= + compile(play) ++ + provided(aspectJ) ++ + test(playTest, akkaTestKit, slf4Api)) .dependsOn(kamonCore) lazy val kamonStatsD = Project("kamon-statsd", file("kamon-statsd")) .settings(basicSettings: _*) .settings(formatSettings: _*) - .settings(libraryDependencies ++= compile(akkaActor) ++ test(scalatest, akkaTestKit, slf4Api, slf4nop)) + .settings( + libraryDependencies ++= + compile(akkaActor) ++ + test(scalatest, akkaTestKit, slf4Api, slf4nop)) .dependsOn(kamonCore) .dependsOn(kamonSystemMetrics % "provided") lazy val kamonDatadog = Project("kamon-datadog", file("kamon-datadog")) .settings(basicSettings: _*) .settings(formatSettings: _*) - .settings(libraryDependencies ++= compile(akkaActor) ++ test(scalatest, akkaTestKit, slf4Api, slf4nop)) + .settings( + libraryDependencies ++= + compile(akkaActor) ++ + test(scalatest, akkaTestKit, slf4Api, slf4nop)) .dependsOn(kamonCore) .dependsOn(kamonSystemMetrics % "provided") lazy val kamonLogReporter = Project("kamon-log-reporter", file("kamon-log-reporter")) .settings(basicSettings: _*) .settings(formatSettings: _*) - .settings(libraryDependencies ++= compile(akkaActor) ++ test(scalatest, akkaTestKit, slf4Api, slf4nop)) + .settings( + libraryDependencies ++= + compile(akkaActor) ++ + test(scalatest, akkaTestKit, slf4Api, slf4nop)) .dependsOn(kamonCore) .dependsOn(kamonSystemMetrics % "provided") @@ -126,8 +149,11 @@ object Projects extends Build { lazy val kamonSystemMetrics = Project("kamon-system-metrics", file("kamon-system-metrics")) .settings(basicSettings: _*) .settings(formatSettings: _*) - .settings(libraryDependencies ++= compile(sigar) ++ test(scalatest, akkaTestKit, slf4Api, slf4nop)) .settings(fork in Test := true) + .settings( + libraryDependencies ++= + compile(sigar) ++ + test(scalatest, akkaTestKit, slf4Api, slf4nop)) .dependsOn(kamonCore) val noPublishing = Seq(publish := (), publishLocal := (), publishArtifact := false) |