From 7307e1cc97e0363d1fb4cc116fc69a5272ca3730 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 4 Feb 2014 15:58:30 -0300 Subject: metrics buffer for easier periodic reporting --- .../kamon/metrics/TickMetricSnapshotBuffer.scala | 64 ++++++++++++++++++++++ .../src/main/scala/kamon/metrics/package.scala | 26 +++++++++ .../main/scala/test/SimpleRequestProcessor.scala | 6 +- 3 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/package.scala diff --git a/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala new file mode 100644 index 00000000..11c58cae --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala @@ -0,0 +1,64 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import akka.actor.{Props, ActorRef, Actor} +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer +import scala.concurrent.duration.FiniteDuration + + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + + def receive = empty + + def empty: Actor.Receive = { + case tick : TickMetricSnapshot => context become(buffering(tick)) + case FlushBuffer => // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) => + val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become(buffering(combinedSnapshot)) + + case FlushBuffer => + receiver ! buffered + context become(empty) + + } + + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } + + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot { + val metrics = combineMaps(left.metrics, right.metrics)((l, r) => l.merge(r)) + } +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) +} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala new file mode 100644 index 00000000..765ebaca --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/package.scala @@ -0,0 +1,26 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon + +package object metrics { + + def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) => V): Map[K, V] = { + (left ++ right) map { + case (key, rightValue) => key -> left.get(key).map(leftValue => valueMerger(leftValue, rightValue)).getOrElse(rightValue) + } + } +} diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index cd497ca5..fb3d05d2 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -25,7 +25,7 @@ import scala.util.Random import akka.routing.RoundRobinRouter import kamon.trace.TraceRecorder import kamon.Kamon -import kamon.metrics.{ ActorMetrics, TraceMetrics, Metrics } +import kamon.metrics.{TickMetricSnapshotBuffer, ActorMetrics, TraceMetrics, Metrics} import spray.http.{ StatusCodes, Uri } import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.newrelic.WebTransactionMetrics @@ -44,7 +44,9 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil def receive: Actor.Receive = { case any ⇒ sender ! any } }), "com") - Kamon(Metrics).subscribe(TraceMetrics, "*", printer, permanently = true) + val buffer = system.actorOf(TickMetricSnapshotBuffer.props(10 seconds, printer)) + + Kamon(Metrics).subscribe(TraceMetrics, "*", buffer, permanently = true) //Kamon(Metrics).subscribe(ActorMetrics, "*", printer, permanently = true) implicit val timeout = Timeout(30 seconds) -- cgit v1.2.3