From 7307e1cc97e0363d1fb4cc116fc69a5272ca3730 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 4 Feb 2014 15:58:30 -0300 Subject: metrics buffer for easier periodic reporting --- .../kamon/metrics/TickMetricSnapshotBuffer.scala | 64 ++++++++++++++++++++++ .../src/main/scala/kamon/metrics/package.scala | 26 +++++++++ 2 files changed, 90 insertions(+) create mode 100644 kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/package.scala (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala new file mode 100644 index 00000000..11c58cae --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala @@ -0,0 +1,64 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import akka.actor.{Props, ActorRef, Actor} +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer +import scala.concurrent.duration.FiniteDuration + + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + + def receive = empty + + def empty: Actor.Receive = { + case tick : TickMetricSnapshot => context become(buffering(tick)) + case FlushBuffer => // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) => + val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become(buffering(combinedSnapshot)) + + case FlushBuffer => + receiver ! buffered + context become(empty) + + } + + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } + + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot { + val metrics = combineMaps(left.metrics, right.metrics)((l, r) => l.merge(r)) + } +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) +} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala new file mode 100644 index 00000000..765ebaca --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/package.scala @@ -0,0 +1,26 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon + +package object metrics { + + def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) => V): Map[K, V] = { + (left ++ right) map { + case (key, rightValue) => key -> left.get(key).map(leftValue => valueMerger(leftValue, rightValue)).getOrElse(rightValue) + } + } +} -- cgit v1.2.3