diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala index 654c37b0..2111563b 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -16,12 +16,13 @@ package kamon.metrics -import akka.actor.{ ActorRef, Actor } +import akka.actor.{Props, ActorRef, Actor} import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } import kamon.util.GlobPathFilter -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{FiniteDuration, Duration} import java.util.concurrent.TimeUnit import kamon.Kamon +import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer class Subscriptions extends Actor { import context.system @@ -85,5 +86,46 @@ object Subscriptions { category.equals(identity.category) && globFilter.accept(identity.name) } } +} + + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + + def receive = empty + + def empty: Actor.Receive = { + case tick : TickMetricSnapshot => context become(buffering(tick)) + case FlushBuffer => // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) => + val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become(buffering(combinedSnapshot)) + + case FlushBuffer => + receiver ! buffered + context become(empty) + + } + + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } + + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot { + val metrics = combineMaps(left.metrics, right.metrics)((l, r) => l.merge(r)) + } +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) } |