From c23450f4cc1baa5afdc54aae02b9adb746472381 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 9 Nov 2014 17:20:36 +0100 Subject: = core,play: workaround the non thread safe calls to TrieMap.getOrElseUpdate --- .../src/main/scala/kamon/metric/MetricsExtension.scala | 12 +++++++++++- kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala | 7 ++++--- kamon-core/src/main/scala/kamon/metric/UserMetrics.scala | 15 ++++++++------- 3 files changed, 23 insertions(+), 11 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/metric') diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index 51cda6b2..cc7eb5f0 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -30,6 +30,8 @@ import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe } import java.util.concurrent.TimeUnit class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + import Metrics.AtomicGetOrElseUpdateForTriemap + val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error")) @@ -46,7 +48,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { if (shouldTrack(identity)) - Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) + Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) else None } @@ -131,4 +133,12 @@ object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) } + + implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) { + def atomicGetOrElseUpdate(key: K, op: => V): V = + trieMap.get(key) match { + case Some(v) => v + case None => val d = op; trieMap.putIfAbsent(key, d).getOrElse(d) + } + } } diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index 7246ccb5..4c5ad6ce 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -27,6 +27,8 @@ case class TraceMetrics(name: String) extends MetricGroupIdentity { } object TraceMetrics extends MetricGroupCategory { + import Metrics.AtomicGetOrElseUpdateForTriemap + val name = "trace" case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } @@ -37,7 +39,7 @@ object TraceMetrics extends MetricGroupCategory { val segments = TrieMap[MetricIdentity, Histogram]() def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = - segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) + segments.atomicGetOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) def collect(context: CollectionContext): TraceMetricsSnapshot = TraceMetricsSnapshot( @@ -53,7 +55,7 @@ object TraceMetrics extends MetricGroupCategory { type GroupSnapshotType = TraceMetricsSnapshot def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) // TODO: Merge the segments metrics correctly and test it! + TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), combineMaps(segments, that.segments)((l, r) => l.merge(r, context))) def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) } @@ -69,7 +71,6 @@ case object TraceMetricGroupFactory extends MetricGroupFactory { type GroupRecorder = TraceMetricRecorder def create(config: Config, system: ActorSystem): TraceMetricRecorder = { - val settings = config.getConfig("precision.trace") val elapsedTimeConfig = settings.getConfig("elapsed-time") val segmentConfig = settings.getConfig("segment") diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala index b511b4bc..b7ac1ac5 100644 --- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -8,6 +8,7 @@ import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } import scala.concurrent.duration.FiniteDuration class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + import Metrics.AtomicGetOrElseUpdateForTriemap import UserMetrics._ lazy val metricsExtension = Kamon(Metrics)(system) @@ -18,45 +19,45 @@ class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = { - metricsExtension.storage.getOrElseUpdate(UserHistogram(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { UserHistogramRecorder(Histogram(highestTrackableValue, precision, Scale.Unit)) }).asInstanceOf[UserHistogramRecorder].histogram } def registerHistogram(name: String): Histogram = { - metricsExtension.storage.getOrElseUpdate(UserHistogram(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { UserHistogramRecorder(Histogram.fromConfig(defaultHistogramPrecisionConfig)) }).asInstanceOf[UserHistogramRecorder].histogram } def registerCounter(name: String): Counter = { - metricsExtension.storage.getOrElseUpdate(UserCounter(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserCounter(name), { UserCounterRecorder(Counter()) }).asInstanceOf[UserCounterRecorder].counter } def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, refreshInterval: FiniteDuration): MinMaxCounter = { - metricsExtension.storage.getOrElseUpdate(UserMinMaxCounter(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { UserMinMaxCounterRecorder(MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter } def registerMinMaxCounter(name: String): MinMaxCounter = { - metricsExtension.storage.getOrElseUpdate(UserMinMaxCounter(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { UserMinMaxCounterRecorder(MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter } def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.getOrElseUpdate(UserGauge(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { UserGaugeRecorder(Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) }).asInstanceOf[UserGaugeRecorder].gauge } def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.getOrElseUpdate(UserGauge(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { UserGaugeRecorder(Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) }).asInstanceOf[UserGaugeRecorder].gauge } -- cgit v1.2.3