diff options
author | Diego Parra <diegolparra@gmail.com> | 2015-10-29 10:03:46 -0300 |
---|---|---|
committer | Diego Parra <diegolparra@gmail.com> | 2015-10-29 10:03:46 -0300 |
commit | 88f603e7c0e6965a14c9eb4b8a42f2151f7a8b8c (patch) | |
tree | aca4041c3869f7e3cdcf08bcabe2d25d8e9cab9f | |
parent | 5591568f4453e21cdd580a4bafe4805a0030c47a (diff) | |
parent | 30ac538a0c859739f0fb038896ce6dde8a2d11a2 (diff) | |
download | Kamon-88f603e7c0e6965a14c9eb4b8a42f2151f7a8b8c.tar.gz Kamon-88f603e7c0e6965a14c9eb4b8a42f2151f7a8b8c.tar.bz2 Kamon-88f603e7c0e6965a14c9eb4b8a42f2151f7a8b8c.zip |
Merge pull request #264 from everpeace/add-kamon-fluentd
Introducing Kamon Fluentd module.
11 files changed, 747 insertions, 1 deletions
diff --git a/kamon-examples/kamon-fluentd-example/README.md b/kamon-examples/kamon-fluentd-example/README.md new file mode 100644 index 00000000..0dc6a250 --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/README.md @@ -0,0 +1,31 @@ +kamon-fluentd-example +------------------------------ + +An example Spray application with Kamon monitoring reporting to Fluentd Server. + +Prerequisites +--------------- +* fluentd: + ```sh + you@host:kamon-fluentd-example $ gem install fluentd + ``` + +* install kamon snapshots to local: + ```sh + you@host:kamon-fluentd-example $ cd ../../ + you@host:Kamon $ sbt "+ publishLocal" + ... snip... + [info] published ivy to /Users/___/.ivy2/local/io.kamon/kamon-akka-remote_2.11/0.5.2-021ffd253e104342e6b4c75ae42717b51e3b6b26/ivys/ivy.xml + [success] Total time: 248 s, completed 2015/10/04 0:27:53 + [info] Setting version to 2.10.4 + [info] Reapplying settings... + [info] Set current project to kamon (in build file:/Users/___/kamon-io/Kamon/) + ``` + +* edit build.sbt. edit `kamonV` variable with installed snapshot version (`0.5.2-021ffd253e104342e6b4c75ae42717b51e3b6b26` in the above example). + +How to run +------------ +1. just do it: `sbt aspectj-runner:run` +2. you'll see kamon-log-reporter outputs on console. +3. you'll also see kamon metrics sent to fluentd on files named `target/fluentd_out.****` diff --git a/kamon-examples/kamon-fluentd-example/build.sbt b/kamon-examples/kamon-fluentd-example/build.sbt new file mode 100644 index 00000000..a240fad3 --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/build.sbt @@ -0,0 +1,31 @@ +import sbt._ +import sbt.Keys._ + +name := "kamon-fluentd-example" + +version := "1.0" + +scalaVersion := "2.11.6" + +resolvers += "Kamon repo" at "http://repo.kamon.io" + +resolvers += "spray repo" at "http://repo.spray.io" + +libraryDependencies ++= { + val akkaV = "2.3.5" + val sprayV = "1.3.1" + val kamonV = "EDIT_HERE" + Seq( + "io.spray" %% "spray-can" % sprayV, + "io.spray" %% "spray-routing" % sprayV, + "io.kamon" %% "kamon-core" % kamonV, + "io.kamon" %% "kamon-system-metrics" % kamonV, + "io.kamon" %% "kamon-akka" % kamonV, + "io.kamon" %% "kamon-scala" % kamonV, + "io.kamon" %% "kamon-spray" % kamonV, + "io.kamon" %% "kamon-fluentd" % kamonV, + "io.kamon" %% "kamon-log-reporter" % kamonV, + "org.aspectj" % "aspectjweaver" % "1.8.4" + ) +} + diff --git a/kamon-examples/kamon-fluentd-example/project/build.properties b/kamon-examples/kamon-fluentd-example/project/build.properties new file mode 100644 index 00000000..df58110a --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.6
\ No newline at end of file diff --git a/kamon-examples/kamon-fluentd-example/project/plugins.sbt b/kamon-examples/kamon-fluentd-example/project/plugins.sbt new file mode 100644 index 00000000..ab3750e2 --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/project/plugins.sbt @@ -0,0 +1,6 @@ +resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/" + +resolvers += "Kamon Releases" at "http://repo.kamon.io" + +addSbtPlugin("io.kamon" % "aspectj-runner" % "0.1.2") + diff --git a/kamon-examples/kamon-fluentd-example/src/main/resources/application.conf b/kamon-examples/kamon-fluentd-example/src/main/resources/application.conf new file mode 100644 index 00000000..6a251869 --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/src/main/resources/application.conf @@ -0,0 +1,73 @@ +# ===================================== # +# Kamon-Fluentd Reference Configuration # +# ===================================== # + +kamon { + metric.filters { + akka-actor { + includes = ["**"], + } + + akka-dispatcher { + includes = ["**"] + } + + akka-router { + includes = ["**"] + } + } + + fluentd { + # Hostname and port of fluentd server to which kamon fluentd sends metrics. + hostname = "localhost" + port = 24224 + + # tag prefix of metrics data which is sent to fluentd server + tag = "kamon.fluentd" + + # Interval between metrics data flushes to fluentd server. + # It's value must be equal or greater than the kamon.metric.tick-interval setting. + flush-interval = 10 seconds + + # Your app name + application-name = "kamon-fluentd-example" + + # Subscription patterns used to select which metrics will be pushed to Fluentd. Note that first, metrics + # collection for your desired entities must be activated under the kamon.metrics.filters settings. + subscriptions { + histogram = [ "**" ] + min-max-counter = [ "**" ] + gauge = [ "**" ] + counter = [ "**" ] + trace = [ "**" ] + trace-segment = [ "**" ] + akka-actor = [ "**" ] + akka-dispatcher = [ "**" ] + akka-router = [ "**" ] + system-metric = [ "**" ] + http-server = [ "**" ] + } + + # statistic values to be reported for histogram type metrics + # (i.e. Histogram, MinMaxCounter, Gauge). + histogram-stats { + # stats values: + # "count", "min", "max", "average", "percentiles" are supported. + # you can use "*" for wildcards. + subscription = [ "count", "min", "max", "average", "percentiles" ], + + # percentile points: + # this will be used when you set "percentiles" in "subscription" above. + # In this example, kamon-fluentd reports 50th 90th, 99th and 99.9th percentiles. + percentiles = [50.0, 90.0, 99.0, 99.9] + } + } + + modules { + kamon-fluentd { + auto-start = yes + requires-aspectj = no + extension-id = "kamon.fluentd.Fluentd" + } + } +}
\ No newline at end of file diff --git a/kamon-examples/kamon-fluentd-example/src/main/scala/KamonFluentdExample.scala b/kamon-examples/kamon-fluentd-example/src/main/scala/KamonFluentdExample.scala new file mode 100644 index 00000000..bb1d0f7b --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/src/main/scala/KamonFluentdExample.scala @@ -0,0 +1,60 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +import akka.actor.ActorSystem +import kamon.Kamon +import spray.routing.SimpleRoutingApp +import xerial.fluentd.{ FluentdConfig, FluentdStandalone } + +import scala.util.Properties + +object KamonFluentdExample extends App with SimpleRoutingApp { + // Start fluentd server only for this sample app + // In real usecase, you will spawn fluentd server independently. + val fluentdServer = FluentdStandalone.start(FluentdConfig(configuration = + """ + |<source> + | type forward + | port 24224 + |</source> + |<match **> + | type file + | path target/fluentd-out + |</match> + """.stripMargin)) + sys.addShutdownHook { + fluentdServer.stop + } + + // start Kamon + Kamon.start() + + implicit val system = ActorSystem("kamon-fluentd-example") + + // server endpoint + val interface = "0.0.0.0" + val port = Properties.envOrElse("PORT", "8080").toInt + + // resource endpoints + startServer(interface = interface, port = port) { + path("hello") { + get { + complete { + <h1>Hello! Kamon Fluentd Example!</h1> + } + } + } + } +} diff --git a/kamon-fluentd/src/main/resources/reference.conf b/kamon-fluentd/src/main/resources/reference.conf new file mode 100644 index 00000000..7490cb44 --- /dev/null +++ b/kamon-fluentd/src/main/resources/reference.conf @@ -0,0 +1,59 @@ +# ===================================== # +# Kamon-Fluentd Reference Configuration # +# ===================================== # + +kamon { + fluentd { + # Hostname and port of fluentd server to which kamon fluentd sends metrics. + hostname = "localhost" + port = 24224 + + # tag prefix of metrics data which is sent to fluentd server + tag = "kamon.fluentd" + + # Interval between metrics data flushes to fluentd server. + # It's value must be equal or greater than the kamon.metric.tick-interval setting. + flush-interval = 10 seconds + + # Your app name + application-name = "my-app" + + # Subscription patterns used to select which metrics will be pushed to Fluentd. Note that first, metrics + # collection for your desired entities must be activated under the kamon.metrics.filters settings. + subscriptions { + histogram = [ "**" ] + min-max-counter = [ "**" ] + gauge = [ "**" ] + counter = [ "**" ] + trace = [ "**" ] + trace-segment = [ "**" ] + akka-actor = [ "**" ] + akka-dispatcher = [ "**" ] + akka-router = [ "**" ] + system-metric = [ "**" ] + http-server = [ "**" ] + } + + # statistic values to be reported for histogram type metrics + # (i.e. Histogram, MinMaxCounter, Gauge). + histogram-stats { + # stats values: + # "count", "min", "max", "average", "percentiles" are supported. + # you can use "*" for wildcards. + subscription = [ "count", "min", "max", "average", "percentiles" ], + + # percentile points: + # this will be used when you set "percentiles" in "subscription" above. + # In this example, kamon-fluentd reports 50th 90th, 99th and 99.9th percentiles. + percentiles = [50.0, 90.0, 99.0, 99.9] + } + } + + modules { + kamon-fluentd { + auto-start = yes + requires-aspectj = no + extension-id = "kamon.fluentd.Fluentd" + } + } +} diff --git a/kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala b/kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala new file mode 100644 index 00000000..7ee6b4d4 --- /dev/null +++ b/kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala @@ -0,0 +1,207 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.fluentd + +import akka.actor._ +import akka.event.Logging +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric._ +import kamon.metric.instrument.{ Counter, Histogram } +import kamon.util.ConfigTools.Syntax +import kamon.util.MilliTimestamp +import org.fluentd.logger.scala.FluentLogger +import org.fluentd.logger.scala.sender.{ ScalaRawSocketSender, Sender } + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.FiniteDuration + +object Fluentd extends ExtensionId[FluentdExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = Fluentd + + override def createExtension(system: ExtendedActorSystem): FluentdExtension = new FluentdExtension(system) +} + +class FluentdExtension(system: ExtendedActorSystem) extends Kamon.Extension { + private val fluentdConfig = system.settings.config.getConfig("kamon.fluentd") + val host = fluentdConfig.getString("hostname") + val port = fluentdConfig.getInt("port") + val tag = fluentdConfig.getString("tag") + val flushInterval = fluentdConfig.getFiniteDuration("flush-interval") + val tickInterval = Kamon.metrics.settings.tickInterval + val subscriptions = fluentdConfig.getConfig("subscriptions") + val histogramStatsConfig = new HistogramStatsConfig( + fluentdConfig.getStringList("histogram-stats.subscription").asScala.toList, + fluentdConfig.getDoubleList("histogram-stats.percentiles").asScala.toList.map(_.toDouble)) + + val log = Logging(system, classOf[FluentdExtension]) + log.info("Starting the Kamon(Fluentd) extension") + + val subscriber = buildMetricsListener(flushInterval, tickInterval, tag, host, port, histogramStatsConfig) + subscriptions.firstLevelKeys foreach { subscriptionCategory ⇒ + subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒ + Kamon.metrics.subscribe(subscriptionCategory, pattern, subscriber, permanently = true) + } + } + + def buildMetricsListener(flushInterval: FiniteDuration, tickInterval: FiniteDuration, + tag: String, host: String, port: Int, + histogramStatsConfig: HistogramStatsConfig): ActorRef = { + assert(flushInterval >= tickInterval, "Fluentd flush-interval needs to be equal or greater to the tick-interval") + + val metricsSender = system.actorOf( + Props(new FluentdMetricsSender(tag, host, port, histogramStatsConfig)), + "kamon-fluentd") + if (flushInterval == tickInterval) { + metricsSender + } else { + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval, metricsSender), "kamon-fluentd-buffer") + } + } +} + +class FluentdMetricsSender(val tag: String, val host: String, val port: Int, histogramStatsConfig: HistogramStatsConfig) + extends Actor with ActorLogging with FluentLoggerSenderProvider { + + private val config = context.system.settings.config + val appName = config.getString("kamon.fluentd.application-name") + val histogramStatsBuilder = HistogramStatsBuilder(histogramStatsConfig) + lazy val fluentd = FluentLogger(tag, sender(host, port)) + + def receive = { + case tick: TickMetricSnapshot ⇒ sendMetricSnapshotToFluentd(tick) + } + + def sendMetricSnapshotToFluentd(tick: TickMetricSnapshot): Unit = { + val time = tick.to + for { + (groupIdentity, groupSnapshot) ← tick.metrics + (metricIdentity, metricSnapshot) ← groupSnapshot.metrics + } { + + val fluentdTagName = fluentdTagNameFor(groupIdentity, metricIdentity) + + val attrs = Map( + "app.name" -> appName, + "category.name" -> groupIdentity.category, + "entity.name" -> groupIdentity.name, + "metric.name" -> metricIdentity.name, + "unit_of_measurement.name" -> metricIdentity.unitOfMeasurement.name, + "unit_of_measurement.label" -> metricIdentity.unitOfMeasurement.label) ++ groupIdentity.tags.map(kv ⇒ s"tags.${kv._1}" -> kv._2) + + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + if (hs.numberOfMeasurements > 0) { + histogramStatsBuilder.buildStats(hs) foreach { + case (_name, value) ⇒ + log_fluentd(time, fluentdTagName, _name, value, attrs) + } + fluentd.flush() + } + case cs: Counter.Snapshot ⇒ + if (cs.count > 0) { + log_fluentd(time, fluentdTagName, "count", cs.count, attrs) + fluentd.flush() + } + } + } + } + + private def log_fluentd(time: MilliTimestamp, fluentdTagName: String, statsName: String, value: Any, + attrs: Map[String, String] = Map.empty) = { + fluentd.log( + fluentdTagName, + attrs ++ Map( + "stats.name" -> statsName, + "value" -> value, + "canonical_metric.name" -> (fluentdTagName + "." + statsName), + (fluentdTagName + "." + statsName) -> value), + time.millis / 1000) + } + + private def isSingleInstrumentEntity(entity: Entity): Boolean = + SingleInstrumentEntityRecorder.AllCategories.contains(entity.category) + + private def fluentdTagNameFor(entity: Entity, metricKey: MetricKey): String = { + if (isSingleInstrumentEntity(entity)) { + s"$appName.${entity.category}.${entity.name}" + } else { + s"$appName.${entity.category}.${entity.name}.${metricKey.name}" + } + } +} + +trait FluentLoggerSenderProvider { + def sender(host: String, port: Int): Sender = new ScalaRawSocketSender(host, port, 3 * 1000, 1 * 1024 * 1024) +} + +case class HistogramStatsBuilder(config: HistogramStatsConfig) { + import HistogramStatsBuilder.RichHistogramSnapshot + import HistogramStatsConfig._ + + // this returns List of ("statsName", "value as String") + def buildStats(hs: Histogram.Snapshot): List[(String, Any)] = { + config.subscriptions.foldRight(List.empty[(String, Any)]) { (name, res) ⇒ + name match { + case COUNT ⇒ (name, hs.numberOfMeasurements) :: res + case MAX ⇒ (name, hs.max) :: res + case MIN ⇒ (name, hs.min) :: res + case AVERAGE ⇒ (name, hs.average) :: res + case PERCENTILES ⇒ { + config.percentiles.foldRight(List.empty[(String, Any)]) { (p, _res) ⇒ + val pStr = if (p.toString.matches("[0-9]+\\.[0]+")) p.toInt.toString else p.toString.replace(".", "_") + (name + "." + pStr, hs.percentile(p)) :: _res + } ++ res + } + } + } + } +} + +object HistogramStatsBuilder { + + implicit class RichHistogramSnapshot(histogram: Histogram.Snapshot) { + def average: Double = { + if (histogram.numberOfMeasurements == 0) 0D + else histogram.sum / histogram.numberOfMeasurements + } + } + +} + +class HistogramStatsConfig(_subscriptions: List[String], _percentiles: List[Double]) { + import HistogramStatsConfig._ + val subscriptions: List[String] = { + if (_subscriptions.contains("*")) { + supported + } else { + assert(_subscriptions.forall(supported.contains(_)), s"supported stats values are: ${supported.mkString(",")}") + _subscriptions + } + } + val percentiles: List[Double] = { + if (subscriptions.contains("percentiles")) { + assert(_percentiles.forall(p ⇒ 0.0 <= p && p <= 100.0), "every percentile point p must be 0.0 <= p <= 100.0") + } + _percentiles + } +} + +object HistogramStatsConfig { + val COUNT = "count"; val MIN = "min"; val MAX = "max" + val AVERAGE = "average"; val PERCENTILES = "percentiles" + private val supported = List(COUNT, MIN, MAX, AVERAGE, PERCENTILES) +} diff --git a/kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala b/kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala new file mode 100644 index 00000000..d20a60b9 --- /dev/null +++ b/kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala @@ -0,0 +1,267 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.fluentd + +import akka.actor.Props +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric._ +import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement } +import kamon.testkit.BaseKamonSpec +import kamon.util.MilliTimestamp +import org.easymock.EasyMock.{ expect ⇒ mockExpect } +import org.fluentd.logger.scala.sender.Sender +import org.scalatest.mock.EasyMockSugar + +class FluentdMetricsSenderSpec extends BaseKamonSpec("fluentd-metrics-sender-spec") with EasyMockSugar { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | metrics { + | disable-aspectj-weaver-missing-error = true + | } + |} + | + """.stripMargin) + + "FluentdMetricsSender" should { + + "be able to send counter value in single instrument entity" in new MockingFluentLoggerSenderFixture { + expecting { + mockExpect(fluentSenderMock.emit( + "kamon.fluentd.my-app.counter.sample_counter", tickTo / 1000, + Map( + "app.name" -> "my-app", + "category.name" -> "counter", + "entity.name" -> "sample_counter", + "unit_of_measurement.name" -> "unknown", + "unit_of_measurement.label" -> "unknown", + "metric.name" -> "counter", + "stats.name" -> "count", + "value" -> increment, + "canonical_metric.name" -> "my-app.counter.sample_counter.count", + "my-app.counter.sample_counter.count" -> increment))).andReturn(true) + mockExpect(fluentSenderMock.flush()) + } + + whenExecuting(fluentSenderMock) { + val (entity, testRecorder) = buildSimpleCounter("sample_counter") + testRecorder.instrument.increment(increment) + run(Map(entity -> testRecorder.collect(collectionContext))) + Thread.sleep(100) + } + } + + "be able to send histogram in single instrument entity" in new MockingFluentLoggerSenderFixture { + expecting { + expectHistgramLog(fluentSenderMock, "my-app", "histogram", "my_histogram") + mockExpect(fluentSenderMock.flush()) + } + + whenExecuting(fluentSenderMock) { + val (entity, testRecorder) = buildSimpleHistogram("my_histogram") + histogramData.foreach(testRecorder.instrument.record(_)) + run(Map(entity -> testRecorder.collect(collectionContext))) + Thread.sleep(100) + } + } + + "be able to send counter in multiple instrument entity" in new MockingFluentLoggerSenderFixture { + expecting { + mockExpect(fluentSenderMock.emit( + "kamon.fluentd.my-app.sample_category.dummy_entity.my_counter", tickTo / 1000, + Map( + "app.name" -> "my-app", + "category.name" -> "sample_category", + "entity.name" -> "dummy_entity", + "metric.name" -> "my_counter", + "stats.name" -> "count", + "value" -> increment, + "unit_of_measurement.name" -> "unknown", + "unit_of_measurement.label" -> "unknown", + "canonical_metric.name" -> "my-app.sample_category.dummy_entity.my_counter.count", + "my-app.sample_category.dummy_entity.my_counter.count" -> increment, + "tags.tagName" -> "tagValue"))).andReturn(true) + mockExpect(fluentSenderMock.flush()) + } + + whenExecuting(fluentSenderMock) { + val (entity, testRecorder) = buildRecorder("dummy_entity", Map("tagName" -> "tagValue")) + testRecorder.myCounter.increment(increment) + run(Map(entity -> testRecorder.collect(collectionContext))) + Thread.sleep(100) + } + } + + "be able to send histogram in multiple instrument entity" in new MockingFluentLoggerSenderFixture { + expecting { + expectHistgramLog(fluentSenderMock, "my-app", "sample_category", "dummy_entity", "my_histogram") + mockExpect(fluentSenderMock.flush()) + } + + whenExecuting(fluentSenderMock) { + val (entity, testRecorder) = buildRecorder("dummy_entity") + histogramData.foreach(testRecorder.myHistogram.record(_)) + run(Map(entity -> testRecorder.collect(collectionContext))) + Thread.sleep(100) + } + } + + } + + trait MockingFluentLoggerSenderFixture { + val fluentSenderMock: Sender = mock[Sender] + + val tickFrom = 100000L + val tickTo = 150000L + val histogramData = (1 to 1000).toList + val increment: Long = 200L + + def expectHistgramLog(mock: Sender, appName: String, categoryName: String, + entityName: String, instrumentName: String = "histogram") = { + val expectedAttr = Map( + "app.name" -> appName, + "category.name" -> s"${categoryName}", + "entity.name" -> s"${entityName}", + "metric.name" -> s"${instrumentName}", + "unit_of_measurement.label" -> "unknown", + "unit_of_measurement.name" -> "unknown") + val expectedCanonicalMetricName = if (categoryName == "histogram") + s"${appName}.${categoryName}.${entityName}" + else + s"${appName}.${categoryName}.${entityName}.${instrumentName}" + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "count", + "value" -> 1000, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.count", + s"${expectedCanonicalMetricName}.count" -> 1000))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "min", + "value" -> 1, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.min", + s"${expectedCanonicalMetricName}.min" -> 1))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "max", + "value" -> 1000, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.max", + s"${expectedCanonicalMetricName}.max" -> 1000))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "average", + "value" -> 499.0, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.average", + s"${expectedCanonicalMetricName}.average" -> 499.0))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.50", + "value" -> 500, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.50", + s"${expectedCanonicalMetricName}.percentiles.50" -> 500))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.90", + "value" -> 900, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.90", + s"${expectedCanonicalMetricName}.percentiles.90" -> 900))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.95", + "value" -> 948, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.95", + s"${expectedCanonicalMetricName}.percentiles.95" -> 948))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.99", + "value" -> 988, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.99", + s"${expectedCanonicalMetricName}.percentiles.99" -> 988))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.99_9", + "value" -> 1000, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.99_9", + s"${expectedCanonicalMetricName}.percentiles.99_9" -> 1000))).andReturn(true) + } + + def buildRecorder(name: String, tags: Map[String, String] = Map.empty): (Entity, TestEntityRecorder) = { + val entity = Entity(name, TestEntityRecorder.category, tags) + val recorder = Kamon.metrics.entity(TestEntityRecorder, entity) + (entity, recorder) + } + + def buildSimpleCounter(name: String, tags: Map[String, String] = Map.empty): (Entity, CounterRecorder) = { + val entity = Entity(name, SingleInstrumentEntityRecorder.Counter, tags) + val counter = Kamon.metrics.counter(name, tags) + val recorder = CounterRecorder(CounterKey("counter", UnitOfMeasurement.Unknown), counter) + (entity, recorder) + } + + def buildSimpleHistogram(name: String, tags: Map[String, String] = Map.empty): (Entity, HistogramRecorder) = { + val entity = Entity(name, SingleInstrumentEntityRecorder.Histogram, tags) + val histogram = Kamon.metrics.histogram(name, tags) + val recorder = HistogramRecorder(CounterKey("histogram", UnitOfMeasurement.Unknown), histogram) + (entity, recorder) + } + + def run(metrics: Map[Entity, EntitySnapshot]) = { + val histoGramStatConfig = new HistogramStatsConfig(List("*"), List(50.0, 90.0, 95.0, 99.0, 99.9)) + val metricsSender = system.actorOf(Props( + new FluentdMetricsSender("kamon.fluentd", "localhost", 24224, histoGramStatConfig) { + override def sender(host: String, port: Int): Sender = fluentSenderMock + })) + val fakeSnapshot = TickMetricSnapshot(new MilliTimestamp(tickFrom), new MilliTimestamp(tickTo), metrics) + metricsSender ! fakeSnapshot + } + } + +} + +class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val myHistogram = histogram("my_histogram") + val myCounter = counter("my_counter") +} + +object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] { + def category: String = "sample_category" + + def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory) +} + +// Sender.emit("kamon.fluentd.my-app.sample_category.dummy_entity.my_counter", 150, Map(stats.name -> count, unit_of_measurement.name -> unknown, my-app.sample_category.dummy_entity.my_counter.count -> 200, entity.name -> dummy_entity, category.name -> sample_category, canonical_metric.name -> my-app.sample_category.dummy_entity.my_counter.count, app.name -> my-app, unit_of_measurement.label -> unknown, tags.tagName -> tagValue, metric.name -> my_counter, value -> 200)): +// Sender.emit("kamon.fluentd.my-app.sample_category.dummy_entity.my_counter", 150, Map(instrument.name -> my_counter, unit_of_measurement.name -> unknown, my-app.sample_category.dummy_entity.my_counter.count -> 200, entity.name -> dummy_entity, category.name -> sample_category, canonical_metric.name -> my-app.sample_category.dummy_entity.my_counter.count, app.name -> my-app, unit_of_measurement.label -> unknown, tags.tagName -> tagValue, metric.name -> count, value -> 200)): expected: 1, actual: 0 diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 3b9b7980..394fb9f2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -54,6 +54,8 @@ object Dependencies { val sigarLoader = "io.kamon" % "sigar-loader" % "1.6.5-rev002" val h2 = "com.h2database" % "h2" % "1.4.182" val el = "org.glassfish" % "javax.el" % "3.0.0" + val fluentdLogger = "org.fluentd" %% "fluent-logger-scala" % "0.5.1" + val easyMock = "org.easymock" % "easymock" % "3.2" //play 2.3.x val play23 = "com.typesafe.play" %% "play" % play23Version diff --git a/project/Projects.scala b/project/Projects.scala index 7f4558c4..0293acad 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -24,7 +24,7 @@ object Projects extends Build { lazy val kamon = Project("kamon", file(".")) .aggregate(kamonCore, kamonScala, kamonAkka, kamonSpray, kamonNewrelic, kamonPlayground, kamonTestkit, kamonStatsD, kamonDatadog, kamonSPM, kamonSystemMetrics, kamonLogReporter, kamonAkkaRemote, kamonJdbc, - kamonAnnotation, kamonPlay23, kamonPlay24, kamonJMXReporter) + kamonAnnotation, kamonPlay23, kamonPlay24, kamonJMXReporter, kamonFluentd) .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(noPublishing: _*) @@ -226,5 +226,14 @@ object Projects extends Build { compile(akkaActor) ++ test(scalatest, akkaTestKit, slf4Api, slf4nop)) + lazy val kamonFluentd = Project("kamon-fluentd", file("kamon-fluentd")) + .dependsOn(kamonCore % "compile->compile;test->test") + .settings(basicSettings: _*) + .settings(formatSettings: _*) + .settings( + libraryDependencies ++= + compile(akkaActor) ++ compile(fluentdLogger) ++ + test(scalatest, akkaTestKit, easyMock, slf4Api, slf4nop)) + val noPublishing = Seq(publish := (), publishLocal := (), publishArtifact := false) } |