From c23450f4cc1baa5afdc54aae02b9adb746472381 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 9 Nov 2014 17:20:36 +0100 Subject: = core,play: workaround the non thread safe calls to TrieMap.getOrElseUpdate --- .../src/main/scala/kamon/http/HttpServerMetrics.scala | 6 ++++-- .../src/main/scala/kamon/metric/MetricsExtension.scala | 12 +++++++++++- kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala | 7 ++++--- kamon-core/src/main/scala/kamon/metric/UserMetrics.scala | 15 ++++++++------- kamon-core/src/main/scala/kamon/trace/TraceContext.scala | 3 ++- 5 files changed, 29 insertions(+), 14 deletions(-) (limited to 'kamon-core') diff --git a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala index dfa4bcb8..0dd189f6 100644 --- a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala +++ b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala @@ -8,6 +8,8 @@ import kamon.metric._ import scala.collection.concurrent.TrieMap object HttpServerMetrics extends MetricGroupIdentity { + import Metrics.AtomicGetOrElseUpdateForTriemap + val name: String = "http-server-metrics-recorder" val category = new MetricGroupCategory { val name: String = "http-server" @@ -32,13 +34,13 @@ object HttpServerMetrics extends MetricGroupIdentity { def recordResponse(statusCode: StatusCode): Unit = recordResponse(statusCode, 1L) def recordResponse(statusCode: StatusCode, count: Long): Unit = - counters.getOrElseUpdate(statusCode, Counter()).increment(count) + counters.atomicGetOrElseUpdate(statusCode, Counter()).increment(count) def recordResponse(traceName: TraceName, statusCode: StatusCode): Unit = recordResponse(traceName, statusCode, 1L) def recordResponse(traceName: TraceName, statusCode: StatusCode, count: Long): Unit = { recordResponse(statusCode, count) - countersPerTrace.getOrElseUpdate(traceName, TrieMap()).getOrElseUpdate(statusCode, Counter()).increment(count) + countersPerTrace.atomicGetOrElseUpdate(traceName, TrieMap()).atomicGetOrElseUpdate(statusCode, Counter()).increment(count) } def collect(context: CollectionContext): HttpServerMetricsSnapshot = { diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index 51cda6b2..cc7eb5f0 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -30,6 +30,8 @@ import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe } import java.util.concurrent.TimeUnit class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + import Metrics.AtomicGetOrElseUpdateForTriemap + val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error")) @@ -46,7 +48,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { if (shouldTrack(identity)) - Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) + Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) else None } @@ -131,4 +133,12 @@ object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) } + + implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) { + def atomicGetOrElseUpdate(key: K, op: => V): V = + trieMap.get(key) match { + case Some(v) => v + case None => val d = op; trieMap.putIfAbsent(key, d).getOrElse(d) + } + } } diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index 7246ccb5..4c5ad6ce 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -27,6 +27,8 @@ case class TraceMetrics(name: String) extends MetricGroupIdentity { } object TraceMetrics extends MetricGroupCategory { + import Metrics.AtomicGetOrElseUpdateForTriemap + val name = "trace" case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } @@ -37,7 +39,7 @@ object TraceMetrics extends MetricGroupCategory { val segments = TrieMap[MetricIdentity, Histogram]() def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = - segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) + segments.atomicGetOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) def collect(context: CollectionContext): TraceMetricsSnapshot = TraceMetricsSnapshot( @@ -53,7 +55,7 @@ object TraceMetrics extends MetricGroupCategory { type GroupSnapshotType = TraceMetricsSnapshot def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) // TODO: Merge the segments metrics correctly and test it! + TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), combineMaps(segments, that.segments)((l, r) => l.merge(r, context))) def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) } @@ -69,7 +71,6 @@ case object TraceMetricGroupFactory extends MetricGroupFactory { type GroupRecorder = TraceMetricRecorder def create(config: Config, system: ActorSystem): TraceMetricRecorder = { - val settings = config.getConfig("precision.trace") val elapsedTimeConfig = settings.getConfig("elapsed-time") val segmentConfig = settings.getConfig("segment") diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala index b511b4bc..b7ac1ac5 100644 --- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -8,6 +8,7 @@ import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } import scala.concurrent.duration.FiniteDuration class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + import Metrics.AtomicGetOrElseUpdateForTriemap import UserMetrics._ lazy val metricsExtension = Kamon(Metrics)(system) @@ -18,45 +19,45 @@ class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = { - metricsExtension.storage.getOrElseUpdate(UserHistogram(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { UserHistogramRecorder(Histogram(highestTrackableValue, precision, Scale.Unit)) }).asInstanceOf[UserHistogramRecorder].histogram } def registerHistogram(name: String): Histogram = { - metricsExtension.storage.getOrElseUpdate(UserHistogram(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { UserHistogramRecorder(Histogram.fromConfig(defaultHistogramPrecisionConfig)) }).asInstanceOf[UserHistogramRecorder].histogram } def registerCounter(name: String): Counter = { - metricsExtension.storage.getOrElseUpdate(UserCounter(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserCounter(name), { UserCounterRecorder(Counter()) }).asInstanceOf[UserCounterRecorder].counter } def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, refreshInterval: FiniteDuration): MinMaxCounter = { - metricsExtension.storage.getOrElseUpdate(UserMinMaxCounter(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { UserMinMaxCounterRecorder(MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter } def registerMinMaxCounter(name: String): MinMaxCounter = { - metricsExtension.storage.getOrElseUpdate(UserMinMaxCounter(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { UserMinMaxCounterRecorder(MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter } def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.getOrElseUpdate(UserGauge(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { UserGaugeRecorder(Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) }).asInstanceOf[UserGaugeRecorder].gauge } def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.getOrElseUpdate(UserGauge(name), { + metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { UserGaugeRecorder(Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) }).asInstanceOf[UserGaugeRecorder].gauge } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index c4c28a68..9555daba 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -34,6 +34,7 @@ sealed trait TraceContext { def finish(): Unit def origin: TraceContextOrigin def isOpen: Boolean + def isClosed: Boolean = !isOpen def isEmpty: Boolean def nonEmpty: Boolean = !isEmpty def startSegment(segmentName: String, label: String): Segment @@ -111,7 +112,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, private def finishSegment(segmentName: String, label: String, duration: Long): Unit = { finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, label), duration)) - if (!_isOpen) { + if (isClosed) { metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ drainFinishedSegments(traceMetrics) } -- cgit v1.2.3