aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-11-09 17:20:36 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2014-11-09 18:01:46 +0100
commit88bc1b94b2d737e01074a8f41391367f59796157 (patch)
treecd31ef5e5573f4b7d1ef55961cbd9208901c70fa
parentef21f8d0ade1015f71f1d289c8d041e2b525d824 (diff)
downloadKamon-88bc1b94b2d737e01074a8f41391367f59796157.tar.gz
Kamon-88bc1b94b2d737e01074a8f41391367f59796157.tar.bz2
Kamon-88bc1b94b2d737e01074a8f41391367f59796157.zip
= core,play: workaround the non thread safe calls to TrieMap.getOrElseUpdate
-rw-r--r--kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala7
-rw-r--r--kamon-core/src/main/scala/kamon/metric/UserMetrics.scala15
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala3
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala4
6 files changed, 31 insertions, 16 deletions
diff --git a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
index dfa4bcb8..0dd189f6 100644
--- a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
@@ -8,6 +8,8 @@ import kamon.metric._
import scala.collection.concurrent.TrieMap
object HttpServerMetrics extends MetricGroupIdentity {
+ import Metrics.AtomicGetOrElseUpdateForTriemap
+
val name: String = "http-server-metrics-recorder"
val category = new MetricGroupCategory {
val name: String = "http-server"
@@ -32,13 +34,13 @@ object HttpServerMetrics extends MetricGroupIdentity {
def recordResponse(statusCode: StatusCode): Unit = recordResponse(statusCode, 1L)
def recordResponse(statusCode: StatusCode, count: Long): Unit =
- counters.getOrElseUpdate(statusCode, Counter()).increment(count)
+ counters.atomicGetOrElseUpdate(statusCode, Counter()).increment(count)
def recordResponse(traceName: TraceName, statusCode: StatusCode): Unit = recordResponse(traceName, statusCode, 1L)
def recordResponse(traceName: TraceName, statusCode: StatusCode, count: Long): Unit = {
recordResponse(statusCode, count)
- countersPerTrace.getOrElseUpdate(traceName, TrieMap()).getOrElseUpdate(statusCode, Counter()).increment(count)
+ countersPerTrace.atomicGetOrElseUpdate(traceName, TrieMap()).atomicGetOrElseUpdate(statusCode, Counter()).increment(count)
}
def collect(context: CollectionContext): HttpServerMetricsSnapshot = {
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
index 51cda6b2..cc7eb5f0 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -30,6 +30,8 @@ import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe }
import java.util.concurrent.TimeUnit
class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ import Metrics.AtomicGetOrElseUpdateForTriemap
+
val metricsExtConfig = system.settings.config.getConfig("kamon.metrics")
printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error"))
@@ -46,7 +48,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
if (shouldTrack(identity))
- Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder])
+ Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder])
else
None
}
@@ -131,4 +133,12 @@ object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {
def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
}
+
+ implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) {
+ def atomicGetOrElseUpdate(key: K, op: => V): V =
+ trieMap.get(key) match {
+ case Some(v) => v
+ case None => val d = op; trieMap.putIfAbsent(key, d).getOrElse(d)
+ }
+ }
}
diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
index 7246ccb5..4c5ad6ce 100644
--- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
@@ -27,6 +27,8 @@ case class TraceMetrics(name: String) extends MetricGroupIdentity {
}
object TraceMetrics extends MetricGroupCategory {
+ import Metrics.AtomicGetOrElseUpdateForTriemap
+
val name = "trace"
case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" }
@@ -37,7 +39,7 @@ object TraceMetrics extends MetricGroupCategory {
val segments = TrieMap[MetricIdentity, Histogram]()
def segmentRecorder(segmentIdentity: MetricIdentity): Histogram =
- segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply())
+ segments.atomicGetOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply())
def collect(context: CollectionContext): TraceMetricsSnapshot =
TraceMetricsSnapshot(
@@ -53,7 +55,7 @@ object TraceMetrics extends MetricGroupCategory {
type GroupSnapshotType = TraceMetricsSnapshot
def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot =
- TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) // TODO: Merge the segments metrics correctly and test it!
+ TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), combineMaps(segments, that.segments)((l, r) => l.merge(r, context)))
def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime)
}
@@ -69,7 +71,6 @@ case object TraceMetricGroupFactory extends MetricGroupFactory {
type GroupRecorder = TraceMetricRecorder
def create(config: Config, system: ActorSystem): TraceMetricRecorder = {
-
val settings = config.getConfig("precision.trace")
val elapsedTimeConfig = settings.getConfig("elapsed-time")
val segmentConfig = settings.getConfig("segment")
diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
index b511b4bc..b7ac1ac5 100644
--- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
@@ -8,6 +8,7 @@ import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram }
import scala.concurrent.duration.FiniteDuration
class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ import Metrics.AtomicGetOrElseUpdateForTriemap
import UserMetrics._
lazy val metricsExtension = Kamon(Metrics)(system)
@@ -18,45 +19,45 @@ class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension
val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision")
def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = {
- metricsExtension.storage.getOrElseUpdate(UserHistogram(name), {
+ metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), {
UserHistogramRecorder(Histogram(highestTrackableValue, precision, Scale.Unit))
}).asInstanceOf[UserHistogramRecorder].histogram
}
def registerHistogram(name: String): Histogram = {
- metricsExtension.storage.getOrElseUpdate(UserHistogram(name), {
+ metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), {
UserHistogramRecorder(Histogram.fromConfig(defaultHistogramPrecisionConfig))
}).asInstanceOf[UserHistogramRecorder].histogram
}
def registerCounter(name: String): Counter = {
- metricsExtension.storage.getOrElseUpdate(UserCounter(name), {
+ metricsExtension.storage.atomicGetOrElseUpdate(UserCounter(name), {
UserCounterRecorder(Counter())
}).asInstanceOf[UserCounterRecorder].counter
}
def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
refreshInterval: FiniteDuration): MinMaxCounter = {
- metricsExtension.storage.getOrElseUpdate(UserMinMaxCounter(name), {
+ metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), {
UserMinMaxCounterRecorder(MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system))
}).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter
}
def registerMinMaxCounter(name: String): MinMaxCounter = {
- metricsExtension.storage.getOrElseUpdate(UserMinMaxCounter(name), {
+ metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), {
UserMinMaxCounterRecorder(MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system))
}).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter
}
def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = {
- metricsExtension.storage.getOrElseUpdate(UserGauge(name), {
+ metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), {
UserGaugeRecorder(Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector))
}).asInstanceOf[UserGaugeRecorder].gauge
}
def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = {
- metricsExtension.storage.getOrElseUpdate(UserGauge(name), {
+ metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), {
UserGaugeRecorder(Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector))
}).asInstanceOf[UserGaugeRecorder].gauge
}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index c4c28a68..9555daba 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -34,6 +34,7 @@ sealed trait TraceContext {
def finish(): Unit
def origin: TraceContextOrigin
def isOpen: Boolean
+ def isClosed: Boolean = !isOpen
def isEmpty: Boolean
def nonEmpty: Boolean = !isEmpty
def startSegment(segmentName: String, label: String): Segment
@@ -111,7 +112,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean,
private def finishSegment(segmentName: String, label: String, duration: Long): Unit = {
finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, label), duration))
- if (!_isOpen) {
+ if (isClosed) {
metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
drainFinishedSegments(traceMetrics)
}
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
index ca95781e..989ef43e 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
@@ -92,7 +92,7 @@ class RequestInstrumentation {
}
object RequestInstrumentation {
-
+ import kamon.metric.Metrics.AtomicGetOrElseUpdateForTriemap
import java.util.Locale
import scala.collection.concurrent.TrieMap
@@ -100,7 +100,7 @@ object RequestInstrumentation {
def normaliseTraceName(requestHeader: RequestHeader): Option[String] = requestHeader.tags.get(Routes.ROUTE_VERB).map({ verb ⇒
val path = requestHeader.tags(Routes.ROUTE_PATTERN)
- cache.getOrElseUpdate(s"$verb$path", {
+ cache.atomicGetOrElseUpdate(s"$verb$path", {
val traceName = {
// Convert paths of form GET /foo/bar/$paramname<regexp>/blah to foo.bar.paramname.blah.get
val p = path.replaceAll("""\$([^<]+)<[^>]+>""", "$1").replace('/', '.').dropWhile(_ == '.')