aboutsummaryrefslogtreecommitdiff
path: root/kamon-fluentd
diff options
context:
space:
mode:
authorShingo Omura <everpeace@gmail.com>2015-10-03 22:32:09 +0900
committerShingo Omura <everpeace@gmail.com>2015-10-25 18:32:24 +0900
commit30ac538a0c859739f0fb038896ce6dde8a2d11a2 (patch)
tree051ccb17a2d4d94b71175e865faf11e81013e615 /kamon-fluentd
parentf3eefdb687358401965fdd793c3a68507e27aa12 (diff)
downloadKamon-30ac538a0c859739f0fb038896ce6dde8a2d11a2.tar.gz
Kamon-30ac538a0c859739f0fb038896ce6dde8a2d11a2.tar.bz2
Kamon-30ac538a0c859739f0fb038896ce6dde8a2d11a2.zip
+ add Fluentd module
- introduced kamon-fluentd module. - added kamon-fluentd-example project, which can report various metrics to stdoutputs via fluentd.
Diffstat (limited to 'kamon-fluentd')
-rw-r--r--kamon-fluentd/src/main/resources/reference.conf59
-rw-r--r--kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala207
-rw-r--r--kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala267
3 files changed, 533 insertions, 0 deletions
diff --git a/kamon-fluentd/src/main/resources/reference.conf b/kamon-fluentd/src/main/resources/reference.conf
new file mode 100644
index 00000000..7490cb44
--- /dev/null
+++ b/kamon-fluentd/src/main/resources/reference.conf
@@ -0,0 +1,59 @@
+# ===================================== #
+# Kamon-Fluentd Reference Configuration #
+# ===================================== #
+
+kamon {
+ fluentd {
+ # Hostname and port of fluentd server to which kamon fluentd sends metrics.
+ hostname = "localhost"
+ port = 24224
+
+ # tag prefix of metrics data which is sent to fluentd server
+ tag = "kamon.fluentd"
+
+ # Interval between metrics data flushes to fluentd server.
+ # It's value must be equal or greater than the kamon.metric.tick-interval setting.
+ flush-interval = 10 seconds
+
+ # Your app name
+ application-name = "my-app"
+
+ # Subscription patterns used to select which metrics will be pushed to Fluentd. Note that first, metrics
+ # collection for your desired entities must be activated under the kamon.metrics.filters settings.
+ subscriptions {
+ histogram = [ "**" ]
+ min-max-counter = [ "**" ]
+ gauge = [ "**" ]
+ counter = [ "**" ]
+ trace = [ "**" ]
+ trace-segment = [ "**" ]
+ akka-actor = [ "**" ]
+ akka-dispatcher = [ "**" ]
+ akka-router = [ "**" ]
+ system-metric = [ "**" ]
+ http-server = [ "**" ]
+ }
+
+ # statistic values to be reported for histogram type metrics
+ # (i.e. Histogram, MinMaxCounter, Gauge).
+ histogram-stats {
+ # stats values:
+ # "count", "min", "max", "average", "percentiles" are supported.
+ # you can use "*" for wildcards.
+ subscription = [ "count", "min", "max", "average", "percentiles" ],
+
+ # percentile points:
+ # this will be used when you set "percentiles" in "subscription" above.
+ # In this example, kamon-fluentd reports 50th 90th, 99th and 99.9th percentiles.
+ percentiles = [50.0, 90.0, 99.0, 99.9]
+ }
+ }
+
+ modules {
+ kamon-fluentd {
+ auto-start = yes
+ requires-aspectj = no
+ extension-id = "kamon.fluentd.Fluentd"
+ }
+ }
+}
diff --git a/kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala b/kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala
new file mode 100644
index 00000000..7ee6b4d4
--- /dev/null
+++ b/kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala
@@ -0,0 +1,207 @@
+/* =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.fluentd
+
+import akka.actor._
+import akka.event.Logging
+import kamon.Kamon
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric._
+import kamon.metric.instrument.{ Counter, Histogram }
+import kamon.util.ConfigTools.Syntax
+import kamon.util.MilliTimestamp
+import org.fluentd.logger.scala.FluentLogger
+import org.fluentd.logger.scala.sender.{ ScalaRawSocketSender, Sender }
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
+
+object Fluentd extends ExtensionId[FluentdExtension] with ExtensionIdProvider {
+ override def lookup(): ExtensionId[_ <: Extension] = Fluentd
+
+ override def createExtension(system: ExtendedActorSystem): FluentdExtension = new FluentdExtension(system)
+}
+
+class FluentdExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ private val fluentdConfig = system.settings.config.getConfig("kamon.fluentd")
+ val host = fluentdConfig.getString("hostname")
+ val port = fluentdConfig.getInt("port")
+ val tag = fluentdConfig.getString("tag")
+ val flushInterval = fluentdConfig.getFiniteDuration("flush-interval")
+ val tickInterval = Kamon.metrics.settings.tickInterval
+ val subscriptions = fluentdConfig.getConfig("subscriptions")
+ val histogramStatsConfig = new HistogramStatsConfig(
+ fluentdConfig.getStringList("histogram-stats.subscription").asScala.toList,
+ fluentdConfig.getDoubleList("histogram-stats.percentiles").asScala.toList.map(_.toDouble))
+
+ val log = Logging(system, classOf[FluentdExtension])
+ log.info("Starting the Kamon(Fluentd) extension")
+
+ val subscriber = buildMetricsListener(flushInterval, tickInterval, tag, host, port, histogramStatsConfig)
+ subscriptions.firstLevelKeys foreach { subscriptionCategory ⇒
+ subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒
+ Kamon.metrics.subscribe(subscriptionCategory, pattern, subscriber, permanently = true)
+ }
+ }
+
+ def buildMetricsListener(flushInterval: FiniteDuration, tickInterval: FiniteDuration,
+ tag: String, host: String, port: Int,
+ histogramStatsConfig: HistogramStatsConfig): ActorRef = {
+ assert(flushInterval >= tickInterval, "Fluentd flush-interval needs to be equal or greater to the tick-interval")
+
+ val metricsSender = system.actorOf(
+ Props(new FluentdMetricsSender(tag, host, port, histogramStatsConfig)),
+ "kamon-fluentd")
+ if (flushInterval == tickInterval) {
+ metricsSender
+ } else {
+ system.actorOf(TickMetricSnapshotBuffer.props(flushInterval, metricsSender), "kamon-fluentd-buffer")
+ }
+ }
+}
+
+class FluentdMetricsSender(val tag: String, val host: String, val port: Int, histogramStatsConfig: HistogramStatsConfig)
+ extends Actor with ActorLogging with FluentLoggerSenderProvider {
+
+ private val config = context.system.settings.config
+ val appName = config.getString("kamon.fluentd.application-name")
+ val histogramStatsBuilder = HistogramStatsBuilder(histogramStatsConfig)
+ lazy val fluentd = FluentLogger(tag, sender(host, port))
+
+ def receive = {
+ case tick: TickMetricSnapshot ⇒ sendMetricSnapshotToFluentd(tick)
+ }
+
+ def sendMetricSnapshotToFluentd(tick: TickMetricSnapshot): Unit = {
+ val time = tick.to
+ for {
+ (groupIdentity, groupSnapshot) ← tick.metrics
+ (metricIdentity, metricSnapshot) ← groupSnapshot.metrics
+ } {
+
+ val fluentdTagName = fluentdTagNameFor(groupIdentity, metricIdentity)
+
+ val attrs = Map(
+ "app.name" -> appName,
+ "category.name" -> groupIdentity.category,
+ "entity.name" -> groupIdentity.name,
+ "metric.name" -> metricIdentity.name,
+ "unit_of_measurement.name" -> metricIdentity.unitOfMeasurement.name,
+ "unit_of_measurement.label" -> metricIdentity.unitOfMeasurement.label) ++ groupIdentity.tags.map(kv ⇒ s"tags.${kv._1}" -> kv._2)
+
+ metricSnapshot match {
+ case hs: Histogram.Snapshot ⇒
+ if (hs.numberOfMeasurements > 0) {
+ histogramStatsBuilder.buildStats(hs) foreach {
+ case (_name, value) ⇒
+ log_fluentd(time, fluentdTagName, _name, value, attrs)
+ }
+ fluentd.flush()
+ }
+ case cs: Counter.Snapshot ⇒
+ if (cs.count > 0) {
+ log_fluentd(time, fluentdTagName, "count", cs.count, attrs)
+ fluentd.flush()
+ }
+ }
+ }
+ }
+
+ private def log_fluentd(time: MilliTimestamp, fluentdTagName: String, statsName: String, value: Any,
+ attrs: Map[String, String] = Map.empty) = {
+ fluentd.log(
+ fluentdTagName,
+ attrs ++ Map(
+ "stats.name" -> statsName,
+ "value" -> value,
+ "canonical_metric.name" -> (fluentdTagName + "." + statsName),
+ (fluentdTagName + "." + statsName) -> value),
+ time.millis / 1000)
+ }
+
+ private def isSingleInstrumentEntity(entity: Entity): Boolean =
+ SingleInstrumentEntityRecorder.AllCategories.contains(entity.category)
+
+ private def fluentdTagNameFor(entity: Entity, metricKey: MetricKey): String = {
+ if (isSingleInstrumentEntity(entity)) {
+ s"$appName.${entity.category}.${entity.name}"
+ } else {
+ s"$appName.${entity.category}.${entity.name}.${metricKey.name}"
+ }
+ }
+}
+
+trait FluentLoggerSenderProvider {
+ def sender(host: String, port: Int): Sender = new ScalaRawSocketSender(host, port, 3 * 1000, 1 * 1024 * 1024)
+}
+
+case class HistogramStatsBuilder(config: HistogramStatsConfig) {
+ import HistogramStatsBuilder.RichHistogramSnapshot
+ import HistogramStatsConfig._
+
+ // this returns List of ("statsName", "value as String")
+ def buildStats(hs: Histogram.Snapshot): List[(String, Any)] = {
+ config.subscriptions.foldRight(List.empty[(String, Any)]) { (name, res) ⇒
+ name match {
+ case COUNT ⇒ (name, hs.numberOfMeasurements) :: res
+ case MAX ⇒ (name, hs.max) :: res
+ case MIN ⇒ (name, hs.min) :: res
+ case AVERAGE ⇒ (name, hs.average) :: res
+ case PERCENTILES ⇒ {
+ config.percentiles.foldRight(List.empty[(String, Any)]) { (p, _res) ⇒
+ val pStr = if (p.toString.matches("[0-9]+\\.[0]+")) p.toInt.toString else p.toString.replace(".", "_")
+ (name + "." + pStr, hs.percentile(p)) :: _res
+ } ++ res
+ }
+ }
+ }
+ }
+}
+
+object HistogramStatsBuilder {
+
+ implicit class RichHistogramSnapshot(histogram: Histogram.Snapshot) {
+ def average: Double = {
+ if (histogram.numberOfMeasurements == 0) 0D
+ else histogram.sum / histogram.numberOfMeasurements
+ }
+ }
+
+}
+
+class HistogramStatsConfig(_subscriptions: List[String], _percentiles: List[Double]) {
+ import HistogramStatsConfig._
+ val subscriptions: List[String] = {
+ if (_subscriptions.contains("*")) {
+ supported
+ } else {
+ assert(_subscriptions.forall(supported.contains(_)), s"supported stats values are: ${supported.mkString(",")}")
+ _subscriptions
+ }
+ }
+ val percentiles: List[Double] = {
+ if (subscriptions.contains("percentiles")) {
+ assert(_percentiles.forall(p ⇒ 0.0 <= p && p <= 100.0), "every percentile point p must be 0.0 <= p <= 100.0")
+ }
+ _percentiles
+ }
+}
+
+object HistogramStatsConfig {
+ val COUNT = "count"; val MIN = "min"; val MAX = "max"
+ val AVERAGE = "average"; val PERCENTILES = "percentiles"
+ private val supported = List(COUNT, MIN, MAX, AVERAGE, PERCENTILES)
+}
diff --git a/kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala b/kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala
new file mode 100644
index 00000000..d20a60b9
--- /dev/null
+++ b/kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala
@@ -0,0 +1,267 @@
+/* =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.fluentd
+
+import akka.actor.Props
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric._
+import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement }
+import kamon.testkit.BaseKamonSpec
+import kamon.util.MilliTimestamp
+import org.easymock.EasyMock.{ expect ⇒ mockExpect }
+import org.fluentd.logger.scala.sender.Sender
+import org.scalatest.mock.EasyMockSugar
+
+class FluentdMetricsSenderSpec extends BaseKamonSpec("fluentd-metrics-sender-spec") with EasyMockSugar {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon {
+ | metrics {
+ | disable-aspectj-weaver-missing-error = true
+ | }
+ |}
+ |
+ """.stripMargin)
+
+ "FluentdMetricsSender" should {
+
+ "be able to send counter value in single instrument entity" in new MockingFluentLoggerSenderFixture {
+ expecting {
+ mockExpect(fluentSenderMock.emit(
+ "kamon.fluentd.my-app.counter.sample_counter", tickTo / 1000,
+ Map(
+ "app.name" -> "my-app",
+ "category.name" -> "counter",
+ "entity.name" -> "sample_counter",
+ "unit_of_measurement.name" -> "unknown",
+ "unit_of_measurement.label" -> "unknown",
+ "metric.name" -> "counter",
+ "stats.name" -> "count",
+ "value" -> increment,
+ "canonical_metric.name" -> "my-app.counter.sample_counter.count",
+ "my-app.counter.sample_counter.count" -> increment))).andReturn(true)
+ mockExpect(fluentSenderMock.flush())
+ }
+
+ whenExecuting(fluentSenderMock) {
+ val (entity, testRecorder) = buildSimpleCounter("sample_counter")
+ testRecorder.instrument.increment(increment)
+ run(Map(entity -> testRecorder.collect(collectionContext)))
+ Thread.sleep(100)
+ }
+ }
+
+ "be able to send histogram in single instrument entity" in new MockingFluentLoggerSenderFixture {
+ expecting {
+ expectHistgramLog(fluentSenderMock, "my-app", "histogram", "my_histogram")
+ mockExpect(fluentSenderMock.flush())
+ }
+
+ whenExecuting(fluentSenderMock) {
+ val (entity, testRecorder) = buildSimpleHistogram("my_histogram")
+ histogramData.foreach(testRecorder.instrument.record(_))
+ run(Map(entity -> testRecorder.collect(collectionContext)))
+ Thread.sleep(100)
+ }
+ }
+
+ "be able to send counter in multiple instrument entity" in new MockingFluentLoggerSenderFixture {
+ expecting {
+ mockExpect(fluentSenderMock.emit(
+ "kamon.fluentd.my-app.sample_category.dummy_entity.my_counter", tickTo / 1000,
+ Map(
+ "app.name" -> "my-app",
+ "category.name" -> "sample_category",
+ "entity.name" -> "dummy_entity",
+ "metric.name" -> "my_counter",
+ "stats.name" -> "count",
+ "value" -> increment,
+ "unit_of_measurement.name" -> "unknown",
+ "unit_of_measurement.label" -> "unknown",
+ "canonical_metric.name" -> "my-app.sample_category.dummy_entity.my_counter.count",
+ "my-app.sample_category.dummy_entity.my_counter.count" -> increment,
+ "tags.tagName" -> "tagValue"))).andReturn(true)
+ mockExpect(fluentSenderMock.flush())
+ }
+
+ whenExecuting(fluentSenderMock) {
+ val (entity, testRecorder) = buildRecorder("dummy_entity", Map("tagName" -> "tagValue"))
+ testRecorder.myCounter.increment(increment)
+ run(Map(entity -> testRecorder.collect(collectionContext)))
+ Thread.sleep(100)
+ }
+ }
+
+ "be able to send histogram in multiple instrument entity" in new MockingFluentLoggerSenderFixture {
+ expecting {
+ expectHistgramLog(fluentSenderMock, "my-app", "sample_category", "dummy_entity", "my_histogram")
+ mockExpect(fluentSenderMock.flush())
+ }
+
+ whenExecuting(fluentSenderMock) {
+ val (entity, testRecorder) = buildRecorder("dummy_entity")
+ histogramData.foreach(testRecorder.myHistogram.record(_))
+ run(Map(entity -> testRecorder.collect(collectionContext)))
+ Thread.sleep(100)
+ }
+ }
+
+ }
+
+ trait MockingFluentLoggerSenderFixture {
+ val fluentSenderMock: Sender = mock[Sender]
+
+ val tickFrom = 100000L
+ val tickTo = 150000L
+ val histogramData = (1 to 1000).toList
+ val increment: Long = 200L
+
+ def expectHistgramLog(mock: Sender, appName: String, categoryName: String,
+ entityName: String, instrumentName: String = "histogram") = {
+ val expectedAttr = Map(
+ "app.name" -> appName,
+ "category.name" -> s"${categoryName}",
+ "entity.name" -> s"${entityName}",
+ "metric.name" -> s"${instrumentName}",
+ "unit_of_measurement.label" -> "unknown",
+ "unit_of_measurement.name" -> "unknown")
+ val expectedCanonicalMetricName = if (categoryName == "histogram")
+ s"${appName}.${categoryName}.${entityName}"
+ else
+ s"${appName}.${categoryName}.${entityName}.${instrumentName}"
+
+ mockExpect(mock.emit(
+ s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000,
+ expectedAttr ++ Map(
+ "stats.name" -> "count",
+ "value" -> 1000,
+ "canonical_metric.name" -> s"${expectedCanonicalMetricName}.count",
+ s"${expectedCanonicalMetricName}.count" -> 1000))).andReturn(true)
+
+ mockExpect(mock.emit(
+ s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000,
+ expectedAttr ++ Map(
+ "stats.name" -> "min",
+ "value" -> 1,
+ "canonical_metric.name" -> s"${expectedCanonicalMetricName}.min",
+ s"${expectedCanonicalMetricName}.min" -> 1))).andReturn(true)
+
+ mockExpect(mock.emit(
+ s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000,
+ expectedAttr ++ Map(
+ "stats.name" -> "max",
+ "value" -> 1000,
+ "canonical_metric.name" -> s"${expectedCanonicalMetricName}.max",
+ s"${expectedCanonicalMetricName}.max" -> 1000))).andReturn(true)
+
+ mockExpect(mock.emit(
+ s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000,
+ expectedAttr ++ Map(
+ "stats.name" -> "average",
+ "value" -> 499.0,
+ "canonical_metric.name" -> s"${expectedCanonicalMetricName}.average",
+ s"${expectedCanonicalMetricName}.average" -> 499.0))).andReturn(true)
+
+ mockExpect(mock.emit(
+ s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000,
+ expectedAttr ++ Map(
+ "stats.name" -> "percentiles.50",
+ "value" -> 500,
+ "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.50",
+ s"${expectedCanonicalMetricName}.percentiles.50" -> 500))).andReturn(true)
+
+ mockExpect(mock.emit(
+ s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000,
+ expectedAttr ++ Map(
+ "stats.name" -> "percentiles.90",
+ "value" -> 900,
+ "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.90",
+ s"${expectedCanonicalMetricName}.percentiles.90" -> 900))).andReturn(true)
+
+ mockExpect(mock.emit(
+ s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000,
+ expectedAttr ++ Map(
+ "stats.name" -> "percentiles.95",
+ "value" -> 948,
+ "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.95",
+ s"${expectedCanonicalMetricName}.percentiles.95" -> 948))).andReturn(true)
+
+ mockExpect(mock.emit(
+ s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000,
+ expectedAttr ++ Map(
+ "stats.name" -> "percentiles.99",
+ "value" -> 988,
+ "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.99",
+ s"${expectedCanonicalMetricName}.percentiles.99" -> 988))).andReturn(true)
+
+ mockExpect(mock.emit(
+ s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000,
+ expectedAttr ++ Map(
+ "stats.name" -> "percentiles.99_9",
+ "value" -> 1000,
+ "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.99_9",
+ s"${expectedCanonicalMetricName}.percentiles.99_9" -> 1000))).andReturn(true)
+ }
+
+ def buildRecorder(name: String, tags: Map[String, String] = Map.empty): (Entity, TestEntityRecorder) = {
+ val entity = Entity(name, TestEntityRecorder.category, tags)
+ val recorder = Kamon.metrics.entity(TestEntityRecorder, entity)
+ (entity, recorder)
+ }
+
+ def buildSimpleCounter(name: String, tags: Map[String, String] = Map.empty): (Entity, CounterRecorder) = {
+ val entity = Entity(name, SingleInstrumentEntityRecorder.Counter, tags)
+ val counter = Kamon.metrics.counter(name, tags)
+ val recorder = CounterRecorder(CounterKey("counter", UnitOfMeasurement.Unknown), counter)
+ (entity, recorder)
+ }
+
+ def buildSimpleHistogram(name: String, tags: Map[String, String] = Map.empty): (Entity, HistogramRecorder) = {
+ val entity = Entity(name, SingleInstrumentEntityRecorder.Histogram, tags)
+ val histogram = Kamon.metrics.histogram(name, tags)
+ val recorder = HistogramRecorder(CounterKey("histogram", UnitOfMeasurement.Unknown), histogram)
+ (entity, recorder)
+ }
+
+ def run(metrics: Map[Entity, EntitySnapshot]) = {
+ val histoGramStatConfig = new HistogramStatsConfig(List("*"), List(50.0, 90.0, 95.0, 99.0, 99.9))
+ val metricsSender = system.actorOf(Props(
+ new FluentdMetricsSender("kamon.fluentd", "localhost", 24224, histoGramStatConfig) {
+ override def sender(host: String, port: Int): Sender = fluentSenderMock
+ }))
+ val fakeSnapshot = TickMetricSnapshot(new MilliTimestamp(tickFrom), new MilliTimestamp(tickTo), metrics)
+ metricsSender ! fakeSnapshot
+ }
+ }
+
+}
+
+class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+ val myHistogram = histogram("my_histogram")
+ val myCounter = counter("my_counter")
+}
+
+object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] {
+ def category: String = "sample_category"
+
+ def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory)
+}
+
+// Sender.emit("kamon.fluentd.my-app.sample_category.dummy_entity.my_counter", 150, Map(stats.name -> count, unit_of_measurement.name -> unknown, my-app.sample_category.dummy_entity.my_counter.count -> 200, entity.name -> dummy_entity, category.name -> sample_category, canonical_metric.name -> my-app.sample_category.dummy_entity.my_counter.count, app.name -> my-app, unit_of_measurement.label -> unknown, tags.tagName -> tagValue, metric.name -> my_counter, value -> 200)):
+// Sender.emit("kamon.fluentd.my-app.sample_category.dummy_entity.my_counter", 150, Map(instrument.name -> my_counter, unit_of_measurement.name -> unknown, my-app.sample_category.dummy_entity.my_counter.count -> 200, entity.name -> dummy_entity, category.name -> sample_category, canonical_metric.name -> my-app.sample_category.dummy_entity.my_counter.count, app.name -> my-app, unit_of_measurement.label -> unknown, tags.tagName -> tagValue, metric.name -> count, value -> 200)): expected: 1, actual: 0