diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-07-03 14:36:42 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-07-03 14:36:18 -0300 |
commit | a3353d3e3fcb1dfab3e8f401187e236e99df2202 (patch) | |
tree | 4e9e246201cf169f1496bc72928ea2d35d03fcd0 /kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala | |
parent | 6d7970c6dd5b96b512c846181771bb11a43bc82a (diff) | |
download | Kamon-a3353d3e3fcb1dfab3e8f401187e236e99df2202.tar.gz Kamon-a3353d3e3fcb1dfab3e8f401187e236e99df2202.tar.bz2 Kamon-a3353d3e3fcb1dfab3e8f401187e236e99df2202.zip |
! all: refactor the core metric recording instruments and accomodate UserMetrics
This PR is including several changes to the kamon-core, most notably:
- Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure
that the interfaces are as clean as possible.
- Move away from the all Vector[Measurement] based Histogram snapshot to a new approach
in which we use a single long to store both the index in the counts array and the
frequency on that bucket. The leftmost 2 bytes of each long are used for storing the
counts array index and the remaining 6 bytes are used for the actual count, and
everything is put into a simple long array. This way only the buckets that actually
have values will be included in the snapshot with the smallest possible memory
footprint.
- Introduce Gauges.
- Reorganize the instrumentation for Akka and Scala and rewrite most of the tests
of this components to avoid going through the subscription protocol to test.
- Introduce trace tests and fixes on various tests.
- Necessary changes on new relic, datadog and statsd modules to compile with the new
codebase.
Pending:
- Finish the upgrade of the new relic to the current model.
- Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes
limits.
- More testing, more testing, more testing.
- Create the KamonStandalone module.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala | 129 |
1 files changed, 0 insertions, 129 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala deleted file mode 100644 index c9990229..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import akka.actor.{ Props, ActorRef, Actor } -import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } -import kamon.util.GlobPathFilter -import scala.concurrent.duration.{ FiniteDuration, Duration } -import java.util.concurrent.TimeUnit -import kamon.Kamon -import kamon.metrics.TickMetricSnapshotBuffer.{ Combined, FlushBuffer } - -class Subscriptions extends Actor { - import context.system - - val config = context.system.settings.config - val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) - val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) - - var lastTick: Long = System.currentTimeMillis() - var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty - var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty - - def receive = { - case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent) - case FlushMetrics ⇒ flush() - } - - def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = { - val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) - if (permanent) { - val receivers = subscribedPermanently.get(filter).getOrElse(Nil) - subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) - - } else { - val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) - subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) - } - - } - - def flush(): Unit = { - val currentTick = System.currentTimeMillis() - val snapshots = Kamon(Metrics).collect - - dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) - dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) - - lastTick = currentTick - subscribedForOneShot = Map.empty - } - - def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], - snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { - - for ((filter, receivers) ← subscriptions) yield { - val selection = snapshots.filter(group ⇒ filter.accept(group._1)) - val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) - - receivers.foreach(_ ! tickMetrics) - } - } -} - -object Subscriptions { - case object FlushMetrics - case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false) - case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) - - case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) { - def accept(identity: MetricGroupIdentity): Boolean = { - category.equals(identity.category) && globFilter.accept(identity.name) - } - } -} - -class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { - val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) - - def receive = empty - - def empty: Actor.Receive = { - case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) - case FlushBuffer ⇒ // Nothing to flush. - } - - def buffering(buffered: TickMetricSnapshot): Actor.Receive = { - case TickMetricSnapshot(_, to, tickMetrics) ⇒ - val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) - val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) - - context become (buffering(combinedSnapshot)) - - case FlushBuffer ⇒ - receiver ! buffered - context become (empty) - - } - - override def postStop(): Unit = { - flushSchedule.cancel() - super.postStop() - } - - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r))) -} - -object TickMetricSnapshotBuffer { - case object FlushBuffer - - case class Combined(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot - - def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = - Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) -} |