diff options
64 files changed, 2092 insertions, 243 deletions
diff --git a/kamon-akka-remote/src/test/resources/logback.xml b/kamon-akka-remote/src/test/resources/logback.xml index dd623d61..b467456f 100644 --- a/kamon-akka-remote/src/test/resources/logback.xml +++ b/kamon-akka-remote/src/test/resources/logback.xml @@ -1,17 +1,12 @@ -<configuration scan="true"> - <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"> - <resetJUL>true</resetJUL> - </contextListener> - - <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter"/> - +<configuration> + <statusListener class="ch.qos.logback.core.status.NopStatusListener"/> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> - <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n</pattern> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> - <root level="error"> + <root level="OFF"> <appender-ref ref="STDOUT"/> </root> -</configuration> +</configuration>
\ No newline at end of file diff --git a/kamon-akka/src/test/resources/logback.xml b/kamon-akka/src/test/resources/logback.xml index df142eac..c336bbfe 100644 --- a/kamon-akka/src/test/resources/logback.xml +++ b/kamon-akka/src/test/resources/logback.xml @@ -1,13 +1,12 @@ -<configuration scan="true" debug="false"> - <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter"/> - +<configuration> + <statusListener class="ch.qos.logback.core.status.NopStatusListener"/> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> - <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n</pattern> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> - <root level="off"> + <root level="OFF"> <appender-ref ref="STDOUT"/> </root> -</configuration> +</configuration>
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index f2d1e319..54050c18 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -16,14 +16,11 @@ package kamon import _root_.akka.actor import _root_.akka.actor._ -import _root_.akka.event.Logging -import com.typesafe.config.{ ConfigFactory, Config } +import com.typesafe.config.{ Config, ConfigFactory } import kamon.metric._ -import kamon.trace.{ TracerModuleImpl, TracerModule } +import kamon.trace.TracerModuleImpl import org.slf4j.LoggerFactory -import _root_.scala.util.Try - object Kamon { private val log = LoggerFactory.getLogger(getClass) diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 1bd72089..9642233a 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -158,7 +158,7 @@ object TimestampedTraceContextAware { } trait SegmentAware { - @volatile var segment: Segment = EmptyTraceContext.EmptySegment + @volatile @transient var segment: Segment = EmptyTraceContext.EmptySegment } object SegmentAware { diff --git a/kamon-core/src/test/resources/logback.xml b/kamon-core/src/test/resources/logback.xml index 7bb50450..b467456f 100644 --- a/kamon-core/src/test/resources/logback.xml +++ b/kamon-core/src/test/resources/logback.xml @@ -1,18 +1,12 @@ -<configuration - > - <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"> - <resetJUL>true</resetJUL> - </contextListener> - - <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter"/> - +<configuration> + <statusListener class="ch.qos.logback.core.status.NopStatusListener"/> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> - <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n</pattern> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> - <root level="error"> + <root level="OFF"> <appender-ref ref="STDOUT"/> </root> -</configuration> +</configuration>
\ No newline at end of file diff --git a/kamon-examples/kamon-annotation-example/java/pom.xml b/kamon-examples/kamon-annotation-example/java/pom.xml new file mode 100644 index 00000000..d1fa4690 --- /dev/null +++ b/kamon-examples/kamon-annotation-example/java/pom.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>kamon.annotation</groupId> + <artifactId>kamon-spring-boot</artifactId> + <version>0.1.0</version> + + <dependencies> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + <version>1.2.3.RELEASE</version> + </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-core_2.11</artifactId> + <version>0.5.2</version> + </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-annotation_2.11</artifactId> + <version>0.5.2</version> + </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-log-reporter_2.11</artifactId> + <version>0.5.2</version> + </dependency> + </dependencies> + + <properties> + <java.version>1.8</java.version> + </properties> + + + <build> + <plugins> + <plugin> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <repositories> + <repository> + <id>io.kamon</id> + <url>http://snapshots.kamon.io</url> + </repository> + </repositories> +</project> diff --git a/kamon-examples/kamon-annotation-example/java/src/main/java/kamon/annotation/KamonController.java b/kamon-examples/kamon-annotation-example/java/src/main/java/kamon/annotation/KamonController.java new file mode 100644 index 00000000..27eeda53 --- /dev/null +++ b/kamon-examples/kamon-annotation-example/java/src/main/java/kamon/annotation/KamonController.java @@ -0,0 +1,17 @@ +package kamon.annotation; + +import org.springframework.boot.autoconfigure.*; +import org.springframework.stereotype.*; +import org.springframework.web.bind.annotation.*; + +@Controller +@EnableAutoConfiguration +@RequestMapping("/kamon") +@EnableKamon +public class KamonController { + + @RequestMapping("/counter") + @ResponseBody + @Count(name = "awesomeCounter") + public String counter() { return "count!!!"; } +} diff --git a/kamon-examples/kamon-annotation-example/java/src/main/java/kamon/annotation/KamonSpringApplication.java b/kamon-examples/kamon-annotation-example/java/src/main/java/kamon/annotation/KamonSpringApplication.java new file mode 100644 index 00000000..fec55df6 --- /dev/null +++ b/kamon-examples/kamon-annotation-example/java/src/main/java/kamon/annotation/KamonSpringApplication.java @@ -0,0 +1,11 @@ +package kamon.annotation; + +import kamon.Kamon; +import org.springframework.boot.SpringApplication; + +public class KamonSpringApplication { + public static void main(String... args) { + Kamon.start(); + SpringApplication.run(KamonController.class,args); + } +} diff --git a/kamon-examples/kamon-annotation-example/java/src/main/resources/application.conf b/kamon-examples/kamon-annotation-example/java/src/main/resources/application.conf new file mode 100644 index 00000000..595f9b89 --- /dev/null +++ b/kamon-examples/kamon-annotation-example/java/src/main/resources/application.conf @@ -0,0 +1,10 @@ +############################### +# Kamon related configuration # +############################### + +kamon { + modules { + kamon-annotation.auto-start = yes + kamon-log-reporter.auto-start = yes + } +}
\ No newline at end of file diff --git a/kamon-examples/kamon-annotation-example/java/src/main/resources/logback.xml b/kamon-examples/kamon-annotation-example/java/src/main/resources/logback.xml new file mode 100644 index 00000000..fac5ad3d --- /dev/null +++ b/kamon-examples/kamon-annotation-example/java/src/main/resources/logback.xml @@ -0,0 +1,19 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="CONSOLE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %5p ${PID:- } [%t] --- %-40.40logger{39} : %m%n%wex"/> + + <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" /> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${CONSOLE_LOG_PATTERN}</pattern> + </encoder> + </appender> + + <root level="INFO"> + <appender-ref ref="CONSOLE" /> + </root> + + <!-- logger name="org.springframework" level="DEBUG"/ --> + +</configuration> diff --git a/kamon-examples/kamon-annotation-example/scala/build.sbt b/kamon-examples/kamon-annotation-example/scala/build.sbt new file mode 100644 index 00000000..2200ad6f --- /dev/null +++ b/kamon-examples/kamon-annotation-example/scala/build.sbt @@ -0,0 +1,22 @@ +name := "spring-boot-kamon" + +version := "1.0" + +scalaVersion := "2.11.6" + +sbtVersion := "0.13.1" + +resolvers += "Kamon Repository Snapshots" at "http://snapshots.kamon.io" + +libraryDependencies ++= Seq( + "org.springframework.boot" % "spring-boot-starter-web" % "1.2.3.RELEASE", + "io.kamon" %% "kamon-core" % "0.5.2", + "io.kamon" %% "kamon-annotation" % "0.5.2", + "io.kamon" %% "kamon-log-reporter" % "0.5.2" +) + +aspectjSettings + +fork in run := true + +javaOptions in run <++= AspectjKeys.weaverOptions in Aspectj diff --git a/kamon-examples/kamon-annotation-example/scala/project/plugins.sbt b/kamon-examples/kamon-annotation-example/scala/project/plugins.sbt new file mode 100644 index 00000000..eba7b696 --- /dev/null +++ b/kamon-examples/kamon-annotation-example/scala/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.10.0") diff --git a/kamon-examples/kamon-annotation-example/scala/src/main/resources/META-INF/aop.xml b/kamon-examples/kamon-annotation-example/scala/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..9b144860 --- /dev/null +++ b/kamon-examples/kamon-annotation-example/scala/src/main/resources/META-INF/aop.xml @@ -0,0 +1,8 @@ +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> + +<aspectj> + <weaver options="-Xlint:ignore"> + <include within="kamon..*"/> + <exclude within="org.aspectj.*"/> + </weaver> +</aspectj> diff --git a/kamon-examples/kamon-annotation-example/scala/src/main/resources/application.conf b/kamon-examples/kamon-annotation-example/scala/src/main/resources/application.conf new file mode 100644 index 00000000..595f9b89 --- /dev/null +++ b/kamon-examples/kamon-annotation-example/scala/src/main/resources/application.conf @@ -0,0 +1,10 @@ +############################### +# Kamon related configuration # +############################### + +kamon { + modules { + kamon-annotation.auto-start = yes + kamon-log-reporter.auto-start = yes + } +}
\ No newline at end of file diff --git a/kamon-examples/kamon-annotation-example/scala/src/main/resources/logback.xml b/kamon-examples/kamon-annotation-example/scala/src/main/resources/logback.xml new file mode 100644 index 00000000..fac5ad3d --- /dev/null +++ b/kamon-examples/kamon-annotation-example/scala/src/main/resources/logback.xml @@ -0,0 +1,19 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="CONSOLE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %5p ${PID:- } [%t] --- %-40.40logger{39} : %m%n%wex"/> + + <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" /> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${CONSOLE_LOG_PATTERN}</pattern> + </encoder> + </appender> + + <root level="INFO"> + <appender-ref ref="CONSOLE" /> + </root> + + <!-- logger name="org.springframework" level="DEBUG"/ --> + +</configuration> diff --git a/kamon-examples/kamon-annotation-example/scala/src/main/scala/kamon/annotation/KamonController.scala b/kamon-examples/kamon-annotation-example/scala/src/main/scala/kamon/annotation/KamonController.scala new file mode 100644 index 00000000..00c88715 --- /dev/null +++ b/kamon-examples/kamon-annotation-example/scala/src/main/scala/kamon/annotation/KamonController.scala @@ -0,0 +1,17 @@ +package kamon.annotation + +import org.springframework.boot.autoconfigure._ +import org.springframework.stereotype._ +import org.springframework.web.bind.annotation._ + +@Controller +@EnableAutoConfiguration +@RequestMapping(Array("/kamon")) +@EnableKamon +class KamonController { + + @RequestMapping(Array("/counter")) + @ResponseBody + @Count(name = "awesomeCounter") + def counter(): String = "count!!!" +}
\ No newline at end of file diff --git a/kamon-examples/kamon-annotation-example/scala/src/main/scala/kamon/annotation/KamonSpringApplication.scala b/kamon-examples/kamon-annotation-example/scala/src/main/scala/kamon/annotation/KamonSpringApplication.scala new file mode 100644 index 00000000..595900a8 --- /dev/null +++ b/kamon-examples/kamon-annotation-example/scala/src/main/scala/kamon/annotation/KamonSpringApplication.scala @@ -0,0 +1,10 @@ +package kamon.annotation + +import kamon.Kamon +import org.springframework.boot.SpringApplication + +object KamonSpringApplication extends App { + Kamon.start() + + SpringApplication.run(classOf[KamonController]) +} diff --git a/kamon-examples/kamon-fluentd-example/README.md b/kamon-examples/kamon-fluentd-example/README.md new file mode 100644 index 00000000..0dc6a250 --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/README.md @@ -0,0 +1,31 @@ +kamon-fluentd-example +------------------------------ + +An example Spray application with Kamon monitoring reporting to Fluentd Server. + +Prerequisites +--------------- +* fluentd: + ```sh + you@host:kamon-fluentd-example $ gem install fluentd + ``` + +* install kamon snapshots to local: + ```sh + you@host:kamon-fluentd-example $ cd ../../ + you@host:Kamon $ sbt "+ publishLocal" + ... snip... + [info] published ivy to /Users/___/.ivy2/local/io.kamon/kamon-akka-remote_2.11/0.5.2-021ffd253e104342e6b4c75ae42717b51e3b6b26/ivys/ivy.xml + [success] Total time: 248 s, completed 2015/10/04 0:27:53 + [info] Setting version to 2.10.4 + [info] Reapplying settings... + [info] Set current project to kamon (in build file:/Users/___/kamon-io/Kamon/) + ``` + +* edit build.sbt. edit `kamonV` variable with installed snapshot version (`0.5.2-021ffd253e104342e6b4c75ae42717b51e3b6b26` in the above example). + +How to run +------------ +1. just do it: `sbt aspectj-runner:run` +2. you'll see kamon-log-reporter outputs on console. +3. you'll also see kamon metrics sent to fluentd on files named `target/fluentd_out.****` diff --git a/kamon-examples/kamon-fluentd-example/build.sbt b/kamon-examples/kamon-fluentd-example/build.sbt new file mode 100644 index 00000000..a240fad3 --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/build.sbt @@ -0,0 +1,31 @@ +import sbt._ +import sbt.Keys._ + +name := "kamon-fluentd-example" + +version := "1.0" + +scalaVersion := "2.11.6" + +resolvers += "Kamon repo" at "http://repo.kamon.io" + +resolvers += "spray repo" at "http://repo.spray.io" + +libraryDependencies ++= { + val akkaV = "2.3.5" + val sprayV = "1.3.1" + val kamonV = "EDIT_HERE" + Seq( + "io.spray" %% "spray-can" % sprayV, + "io.spray" %% "spray-routing" % sprayV, + "io.kamon" %% "kamon-core" % kamonV, + "io.kamon" %% "kamon-system-metrics" % kamonV, + "io.kamon" %% "kamon-akka" % kamonV, + "io.kamon" %% "kamon-scala" % kamonV, + "io.kamon" %% "kamon-spray" % kamonV, + "io.kamon" %% "kamon-fluentd" % kamonV, + "io.kamon" %% "kamon-log-reporter" % kamonV, + "org.aspectj" % "aspectjweaver" % "1.8.4" + ) +} + diff --git a/kamon-examples/kamon-fluentd-example/project/build.properties b/kamon-examples/kamon-fluentd-example/project/build.properties new file mode 100644 index 00000000..df58110a --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.6
\ No newline at end of file diff --git a/kamon-examples/kamon-fluentd-example/project/plugins.sbt b/kamon-examples/kamon-fluentd-example/project/plugins.sbt new file mode 100644 index 00000000..ab3750e2 --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/project/plugins.sbt @@ -0,0 +1,6 @@ +resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/" + +resolvers += "Kamon Releases" at "http://repo.kamon.io" + +addSbtPlugin("io.kamon" % "aspectj-runner" % "0.1.2") + diff --git a/kamon-examples/kamon-fluentd-example/src/main/resources/application.conf b/kamon-examples/kamon-fluentd-example/src/main/resources/application.conf new file mode 100644 index 00000000..6a251869 --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/src/main/resources/application.conf @@ -0,0 +1,73 @@ +# ===================================== # +# Kamon-Fluentd Reference Configuration # +# ===================================== # + +kamon { + metric.filters { + akka-actor { + includes = ["**"], + } + + akka-dispatcher { + includes = ["**"] + } + + akka-router { + includes = ["**"] + } + } + + fluentd { + # Hostname and port of fluentd server to which kamon fluentd sends metrics. + hostname = "localhost" + port = 24224 + + # tag prefix of metrics data which is sent to fluentd server + tag = "kamon.fluentd" + + # Interval between metrics data flushes to fluentd server. + # It's value must be equal or greater than the kamon.metric.tick-interval setting. + flush-interval = 10 seconds + + # Your app name + application-name = "kamon-fluentd-example" + + # Subscription patterns used to select which metrics will be pushed to Fluentd. Note that first, metrics + # collection for your desired entities must be activated under the kamon.metrics.filters settings. + subscriptions { + histogram = [ "**" ] + min-max-counter = [ "**" ] + gauge = [ "**" ] + counter = [ "**" ] + trace = [ "**" ] + trace-segment = [ "**" ] + akka-actor = [ "**" ] + akka-dispatcher = [ "**" ] + akka-router = [ "**" ] + system-metric = [ "**" ] + http-server = [ "**" ] + } + + # statistic values to be reported for histogram type metrics + # (i.e. Histogram, MinMaxCounter, Gauge). + histogram-stats { + # stats values: + # "count", "min", "max", "average", "percentiles" are supported. + # you can use "*" for wildcards. + subscription = [ "count", "min", "max", "average", "percentiles" ], + + # percentile points: + # this will be used when you set "percentiles" in "subscription" above. + # In this example, kamon-fluentd reports 50th 90th, 99th and 99.9th percentiles. + percentiles = [50.0, 90.0, 99.0, 99.9] + } + } + + modules { + kamon-fluentd { + auto-start = yes + requires-aspectj = no + extension-id = "kamon.fluentd.Fluentd" + } + } +}
\ No newline at end of file diff --git a/kamon-examples/kamon-fluentd-example/src/main/scala/KamonFluentdExample.scala b/kamon-examples/kamon-fluentd-example/src/main/scala/KamonFluentdExample.scala new file mode 100644 index 00000000..bb1d0f7b --- /dev/null +++ b/kamon-examples/kamon-fluentd-example/src/main/scala/KamonFluentdExample.scala @@ -0,0 +1,60 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +import akka.actor.ActorSystem +import kamon.Kamon +import spray.routing.SimpleRoutingApp +import xerial.fluentd.{ FluentdConfig, FluentdStandalone } + +import scala.util.Properties + +object KamonFluentdExample extends App with SimpleRoutingApp { + // Start fluentd server only for this sample app + // In real usecase, you will spawn fluentd server independently. + val fluentdServer = FluentdStandalone.start(FluentdConfig(configuration = + """ + |<source> + | type forward + | port 24224 + |</source> + |<match **> + | type file + | path target/fluentd-out + |</match> + """.stripMargin)) + sys.addShutdownHook { + fluentdServer.stop + } + + // start Kamon + Kamon.start() + + implicit val system = ActorSystem("kamon-fluentd-example") + + // server endpoint + val interface = "0.0.0.0" + val port = Properties.envOrElse("PORT", "8080").toInt + + // resource endpoints + startServer(interface = interface, port = port) { + path("hello") { + get { + complete { + <h1>Hello! Kamon Fluentd Example!</h1> + } + } + } + } +} diff --git a/kamon-fluentd/src/main/resources/reference.conf b/kamon-fluentd/src/main/resources/reference.conf new file mode 100644 index 00000000..7490cb44 --- /dev/null +++ b/kamon-fluentd/src/main/resources/reference.conf @@ -0,0 +1,59 @@ +# ===================================== # +# Kamon-Fluentd Reference Configuration # +# ===================================== # + +kamon { + fluentd { + # Hostname and port of fluentd server to which kamon fluentd sends metrics. + hostname = "localhost" + port = 24224 + + # tag prefix of metrics data which is sent to fluentd server + tag = "kamon.fluentd" + + # Interval between metrics data flushes to fluentd server. + # It's value must be equal or greater than the kamon.metric.tick-interval setting. + flush-interval = 10 seconds + + # Your app name + application-name = "my-app" + + # Subscription patterns used to select which metrics will be pushed to Fluentd. Note that first, metrics + # collection for your desired entities must be activated under the kamon.metrics.filters settings. + subscriptions { + histogram = [ "**" ] + min-max-counter = [ "**" ] + gauge = [ "**" ] + counter = [ "**" ] + trace = [ "**" ] + trace-segment = [ "**" ] + akka-actor = [ "**" ] + akka-dispatcher = [ "**" ] + akka-router = [ "**" ] + system-metric = [ "**" ] + http-server = [ "**" ] + } + + # statistic values to be reported for histogram type metrics + # (i.e. Histogram, MinMaxCounter, Gauge). + histogram-stats { + # stats values: + # "count", "min", "max", "average", "percentiles" are supported. + # you can use "*" for wildcards. + subscription = [ "count", "min", "max", "average", "percentiles" ], + + # percentile points: + # this will be used when you set "percentiles" in "subscription" above. + # In this example, kamon-fluentd reports 50th 90th, 99th and 99.9th percentiles. + percentiles = [50.0, 90.0, 99.0, 99.9] + } + } + + modules { + kamon-fluentd { + auto-start = yes + requires-aspectj = no + extension-id = "kamon.fluentd.Fluentd" + } + } +} diff --git a/kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala b/kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala new file mode 100644 index 00000000..7ee6b4d4 --- /dev/null +++ b/kamon-fluentd/src/main/scala/kamon/fluentd/Fluentd.scala @@ -0,0 +1,207 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.fluentd + +import akka.actor._ +import akka.event.Logging +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric._ +import kamon.metric.instrument.{ Counter, Histogram } +import kamon.util.ConfigTools.Syntax +import kamon.util.MilliTimestamp +import org.fluentd.logger.scala.FluentLogger +import org.fluentd.logger.scala.sender.{ ScalaRawSocketSender, Sender } + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.FiniteDuration + +object Fluentd extends ExtensionId[FluentdExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = Fluentd + + override def createExtension(system: ExtendedActorSystem): FluentdExtension = new FluentdExtension(system) +} + +class FluentdExtension(system: ExtendedActorSystem) extends Kamon.Extension { + private val fluentdConfig = system.settings.config.getConfig("kamon.fluentd") + val host = fluentdConfig.getString("hostname") + val port = fluentdConfig.getInt("port") + val tag = fluentdConfig.getString("tag") + val flushInterval = fluentdConfig.getFiniteDuration("flush-interval") + val tickInterval = Kamon.metrics.settings.tickInterval + val subscriptions = fluentdConfig.getConfig("subscriptions") + val histogramStatsConfig = new HistogramStatsConfig( + fluentdConfig.getStringList("histogram-stats.subscription").asScala.toList, + fluentdConfig.getDoubleList("histogram-stats.percentiles").asScala.toList.map(_.toDouble)) + + val log = Logging(system, classOf[FluentdExtension]) + log.info("Starting the Kamon(Fluentd) extension") + + val subscriber = buildMetricsListener(flushInterval, tickInterval, tag, host, port, histogramStatsConfig) + subscriptions.firstLevelKeys foreach { subscriptionCategory ⇒ + subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒ + Kamon.metrics.subscribe(subscriptionCategory, pattern, subscriber, permanently = true) + } + } + + def buildMetricsListener(flushInterval: FiniteDuration, tickInterval: FiniteDuration, + tag: String, host: String, port: Int, + histogramStatsConfig: HistogramStatsConfig): ActorRef = { + assert(flushInterval >= tickInterval, "Fluentd flush-interval needs to be equal or greater to the tick-interval") + + val metricsSender = system.actorOf( + Props(new FluentdMetricsSender(tag, host, port, histogramStatsConfig)), + "kamon-fluentd") + if (flushInterval == tickInterval) { + metricsSender + } else { + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval, metricsSender), "kamon-fluentd-buffer") + } + } +} + +class FluentdMetricsSender(val tag: String, val host: String, val port: Int, histogramStatsConfig: HistogramStatsConfig) + extends Actor with ActorLogging with FluentLoggerSenderProvider { + + private val config = context.system.settings.config + val appName = config.getString("kamon.fluentd.application-name") + val histogramStatsBuilder = HistogramStatsBuilder(histogramStatsConfig) + lazy val fluentd = FluentLogger(tag, sender(host, port)) + + def receive = { + case tick: TickMetricSnapshot ⇒ sendMetricSnapshotToFluentd(tick) + } + + def sendMetricSnapshotToFluentd(tick: TickMetricSnapshot): Unit = { + val time = tick.to + for { + (groupIdentity, groupSnapshot) ← tick.metrics + (metricIdentity, metricSnapshot) ← groupSnapshot.metrics + } { + + val fluentdTagName = fluentdTagNameFor(groupIdentity, metricIdentity) + + val attrs = Map( + "app.name" -> appName, + "category.name" -> groupIdentity.category, + "entity.name" -> groupIdentity.name, + "metric.name" -> metricIdentity.name, + "unit_of_measurement.name" -> metricIdentity.unitOfMeasurement.name, + "unit_of_measurement.label" -> metricIdentity.unitOfMeasurement.label) ++ groupIdentity.tags.map(kv ⇒ s"tags.${kv._1}" -> kv._2) + + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + if (hs.numberOfMeasurements > 0) { + histogramStatsBuilder.buildStats(hs) foreach { + case (_name, value) ⇒ + log_fluentd(time, fluentdTagName, _name, value, attrs) + } + fluentd.flush() + } + case cs: Counter.Snapshot ⇒ + if (cs.count > 0) { + log_fluentd(time, fluentdTagName, "count", cs.count, attrs) + fluentd.flush() + } + } + } + } + + private def log_fluentd(time: MilliTimestamp, fluentdTagName: String, statsName: String, value: Any, + attrs: Map[String, String] = Map.empty) = { + fluentd.log( + fluentdTagName, + attrs ++ Map( + "stats.name" -> statsName, + "value" -> value, + "canonical_metric.name" -> (fluentdTagName + "." + statsName), + (fluentdTagName + "." + statsName) -> value), + time.millis / 1000) + } + + private def isSingleInstrumentEntity(entity: Entity): Boolean = + SingleInstrumentEntityRecorder.AllCategories.contains(entity.category) + + private def fluentdTagNameFor(entity: Entity, metricKey: MetricKey): String = { + if (isSingleInstrumentEntity(entity)) { + s"$appName.${entity.category}.${entity.name}" + } else { + s"$appName.${entity.category}.${entity.name}.${metricKey.name}" + } + } +} + +trait FluentLoggerSenderProvider { + def sender(host: String, port: Int): Sender = new ScalaRawSocketSender(host, port, 3 * 1000, 1 * 1024 * 1024) +} + +case class HistogramStatsBuilder(config: HistogramStatsConfig) { + import HistogramStatsBuilder.RichHistogramSnapshot + import HistogramStatsConfig._ + + // this returns List of ("statsName", "value as String") + def buildStats(hs: Histogram.Snapshot): List[(String, Any)] = { + config.subscriptions.foldRight(List.empty[(String, Any)]) { (name, res) ⇒ + name match { + case COUNT ⇒ (name, hs.numberOfMeasurements) :: res + case MAX ⇒ (name, hs.max) :: res + case MIN ⇒ (name, hs.min) :: res + case AVERAGE ⇒ (name, hs.average) :: res + case PERCENTILES ⇒ { + config.percentiles.foldRight(List.empty[(String, Any)]) { (p, _res) ⇒ + val pStr = if (p.toString.matches("[0-9]+\\.[0]+")) p.toInt.toString else p.toString.replace(".", "_") + (name + "." + pStr, hs.percentile(p)) :: _res + } ++ res + } + } + } + } +} + +object HistogramStatsBuilder { + + implicit class RichHistogramSnapshot(histogram: Histogram.Snapshot) { + def average: Double = { + if (histogram.numberOfMeasurements == 0) 0D + else histogram.sum / histogram.numberOfMeasurements + } + } + +} + +class HistogramStatsConfig(_subscriptions: List[String], _percentiles: List[Double]) { + import HistogramStatsConfig._ + val subscriptions: List[String] = { + if (_subscriptions.contains("*")) { + supported + } else { + assert(_subscriptions.forall(supported.contains(_)), s"supported stats values are: ${supported.mkString(",")}") + _subscriptions + } + } + val percentiles: List[Double] = { + if (subscriptions.contains("percentiles")) { + assert(_percentiles.forall(p ⇒ 0.0 <= p && p <= 100.0), "every percentile point p must be 0.0 <= p <= 100.0") + } + _percentiles + } +} + +object HistogramStatsConfig { + val COUNT = "count"; val MIN = "min"; val MAX = "max" + val AVERAGE = "average"; val PERCENTILES = "percentiles" + private val supported = List(COUNT, MIN, MAX, AVERAGE, PERCENTILES) +} diff --git a/kamon-fluentd/src/test/resources/application.conf b/kamon-fluentd/src/test/resources/application.conf new file mode 100644 index 00000000..6f3d2f54 --- /dev/null +++ b/kamon-fluentd/src/test/resources/application.conf @@ -0,0 +1,6 @@ +kamon { + metric { + disable-aspectj-weaver-missing-error = true + default-collection-context-buffer-size = 1000 + } +}
\ No newline at end of file diff --git a/kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala b/kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala new file mode 100644 index 00000000..ed68288c --- /dev/null +++ b/kamon-fluentd/src/test/scala/kamon/fluentd/FluentdMetricsSenderSpec.scala @@ -0,0 +1,256 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.fluentd + +import akka.actor.Props +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric._ +import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement } +import kamon.testkit.BaseKamonSpec +import kamon.util.MilliTimestamp +import org.easymock.EasyMock.{ expect ⇒ mockExpect } +import org.fluentd.logger.scala.sender.Sender +import org.scalatest.mock.EasyMockSugar + +class FluentdMetricsSenderSpec extends BaseKamonSpec("fluentd-metrics-sender-spec") with EasyMockSugar { + "FluentdMetricsSender" should { + + "be able to send counter value in single instrument entity" in new MockingFluentLoggerSenderFixture { + expecting { + mockExpect(fluentSenderMock.emit( + "kamon.fluentd.my-app.counter.sample_counter", tickTo / 1000, + Map( + "app.name" -> "my-app", + "category.name" -> "counter", + "entity.name" -> "sample_counter", + "unit_of_measurement.name" -> "unknown", + "unit_of_measurement.label" -> "unknown", + "metric.name" -> "counter", + "stats.name" -> "count", + "value" -> increment, + "canonical_metric.name" -> "my-app.counter.sample_counter.count", + "my-app.counter.sample_counter.count" -> increment))).andReturn(true) + mockExpect(fluentSenderMock.flush()) + } + + whenExecuting(fluentSenderMock) { + val (entity, testRecorder) = buildSimpleCounter("sample_counter") + testRecorder.instrument.increment(increment) + run(Map(entity -> testRecorder.collect(collectionContext))) + Thread.sleep(100) + } + } + + "be able to send histogram in single instrument entity" in new MockingFluentLoggerSenderFixture { + expecting { + expectHistgramLog(fluentSenderMock, "my-app", "histogram", "my_histogram") + mockExpect(fluentSenderMock.flush()) + } + + whenExecuting(fluentSenderMock) { + val (entity, testRecorder) = buildSimpleHistogram("my_histogram") + histogramData.foreach(testRecorder.instrument.record(_)) + run(Map(entity -> testRecorder.collect(collectionContext))) + Thread.sleep(100) + } + } + + "be able to send counter in multiple instrument entity" in new MockingFluentLoggerSenderFixture { + expecting { + mockExpect(fluentSenderMock.emit( + "kamon.fluentd.my-app.sample_category.dummy_entity.my_counter", tickTo / 1000, + Map( + "app.name" -> "my-app", + "category.name" -> "sample_category", + "entity.name" -> "dummy_entity", + "metric.name" -> "my_counter", + "stats.name" -> "count", + "value" -> increment, + "unit_of_measurement.name" -> "unknown", + "unit_of_measurement.label" -> "unknown", + "canonical_metric.name" -> "my-app.sample_category.dummy_entity.my_counter.count", + "my-app.sample_category.dummy_entity.my_counter.count" -> increment, + "tags.tagName" -> "tagValue"))).andReturn(true) + mockExpect(fluentSenderMock.flush()) + } + + whenExecuting(fluentSenderMock) { + val (entity, testRecorder) = buildRecorder("dummy_entity", Map("tagName" -> "tagValue")) + testRecorder.myCounter.increment(increment) + run(Map(entity -> testRecorder.collect(collectionContext))) + Thread.sleep(100) + } + } + + "be able to send histogram in multiple instrument entity" in new MockingFluentLoggerSenderFixture { + expecting { + expectHistgramLog(fluentSenderMock, "my-app", "sample_category", "dummy_entity", "my_histogram") + mockExpect(fluentSenderMock.flush()) + } + + whenExecuting(fluentSenderMock) { + val (entity, testRecorder) = buildRecorder("dummy_entity") + histogramData.foreach(testRecorder.myHistogram.record(_)) + run(Map(entity -> testRecorder.collect(collectionContext))) + Thread.sleep(100) + } + } + + } + + trait MockingFluentLoggerSenderFixture { + val fluentSenderMock: Sender = mock[Sender] + + val tickFrom = 100000L + val tickTo = 150000L + val histogramData = (1 to 1000).toList + val increment: Long = 200L + + def expectHistgramLog(mock: Sender, appName: String, categoryName: String, + entityName: String, instrumentName: String = "histogram") = { + val expectedAttr = Map( + "app.name" -> appName, + "category.name" -> s"${categoryName}", + "entity.name" -> s"${entityName}", + "metric.name" -> s"${instrumentName}", + "unit_of_measurement.label" -> "unknown", + "unit_of_measurement.name" -> "unknown") + val expectedCanonicalMetricName = if (categoryName == "histogram") + s"${appName}.${categoryName}.${entityName}" + else + s"${appName}.${categoryName}.${entityName}.${instrumentName}" + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "count", + "value" -> 1000, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.count", + s"${expectedCanonicalMetricName}.count" -> 1000))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "min", + "value" -> 1, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.min", + s"${expectedCanonicalMetricName}.min" -> 1))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "max", + "value" -> 1000, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.max", + s"${expectedCanonicalMetricName}.max" -> 1000))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "average", + "value" -> 499.0, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.average", + s"${expectedCanonicalMetricName}.average" -> 499.0))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.50", + "value" -> 500, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.50", + s"${expectedCanonicalMetricName}.percentiles.50" -> 500))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.90", + "value" -> 900, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.90", + s"${expectedCanonicalMetricName}.percentiles.90" -> 900))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.95", + "value" -> 948, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.95", + s"${expectedCanonicalMetricName}.percentiles.95" -> 948))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.99", + "value" -> 988, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.99", + s"${expectedCanonicalMetricName}.percentiles.99" -> 988))).andReturn(true) + + mockExpect(mock.emit( + s"kamon.fluentd.${expectedCanonicalMetricName}", tickTo / 1000, + expectedAttr ++ Map( + "stats.name" -> "percentiles.99_9", + "value" -> 1000, + "canonical_metric.name" -> s"${expectedCanonicalMetricName}.percentiles.99_9", + s"${expectedCanonicalMetricName}.percentiles.99_9" -> 1000))).andReturn(true) + } + + def buildRecorder(name: String, tags: Map[String, String] = Map.empty): (Entity, TestEntityRecorder) = { + val entity = Entity(name, TestEntityRecorder.category, tags) + val recorder = Kamon.metrics.entity(TestEntityRecorder, entity) + (entity, recorder) + } + + def buildSimpleCounter(name: String, tags: Map[String, String] = Map.empty): (Entity, CounterRecorder) = { + val entity = Entity(name, SingleInstrumentEntityRecorder.Counter, tags) + val counter = Kamon.metrics.counter(name, tags) + val recorder = CounterRecorder(CounterKey("counter", UnitOfMeasurement.Unknown), counter) + (entity, recorder) + } + + def buildSimpleHistogram(name: String, tags: Map[String, String] = Map.empty): (Entity, HistogramRecorder) = { + val entity = Entity(name, SingleInstrumentEntityRecorder.Histogram, tags) + val histogram = Kamon.metrics.histogram(name, tags) + val recorder = HistogramRecorder(CounterKey("histogram", UnitOfMeasurement.Unknown), histogram) + (entity, recorder) + } + + def run(metrics: Map[Entity, EntitySnapshot]) = { + val histoGramStatConfig = new HistogramStatsConfig(List("*"), List(50.0, 90.0, 95.0, 99.0, 99.9)) + val metricsSender = system.actorOf(Props( + new FluentdMetricsSender("kamon.fluentd", "localhost", 24224, histoGramStatConfig) { + override def sender(host: String, port: Int): Sender = fluentSenderMock + })) + val fakeSnapshot = TickMetricSnapshot(new MilliTimestamp(tickFrom), new MilliTimestamp(tickTo), metrics) + metricsSender ! fakeSnapshot + } + } + +} + +class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val myHistogram = histogram("my_histogram") + val myCounter = counter("my_counter") +} + +object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] { + def category: String = "sample_category" + + def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory) +} + +// Sender.emit("kamon.fluentd.my-app.sample_category.dummy_entity.my_counter", 150, Map(stats.name -> count, unit_of_measurement.name -> unknown, my-app.sample_category.dummy_entity.my_counter.count -> 200, entity.name -> dummy_entity, category.name -> sample_category, canonical_metric.name -> my-app.sample_category.dummy_entity.my_counter.count, app.name -> my-app, unit_of_measurement.label -> unknown, tags.tagName -> tagValue, metric.name -> my_counter, value -> 200)): +// Sender.emit("kamon.fluentd.my-app.sample_category.dummy_entity.my_counter", 150, Map(instrument.name -> my_counter, unit_of_measurement.name -> unknown, my-app.sample_category.dummy_entity.my_counter.count -> 200, entity.name -> dummy_entity, category.name -> sample_category, canonical_metric.name -> my-app.sample_category.dummy_entity.my_counter.count, app.name -> my-app, unit_of_measurement.label -> unknown, tags.tagName -> tagValue, metric.name -> count, value -> 200)): expected: 1, actual: 0 diff --git a/kamon-jmx/src/main/resources/reference.conf b/kamon-jmx/src/main/resources/reference.conf new file mode 100644 index 00000000..41bf366b --- /dev/null +++ b/kamon-jmx/src/main/resources/reference.conf @@ -0,0 +1,30 @@ +# ========================================== # +# Kamon-JMX-Reporter Reference Configuration # +# ========================================== # + +kamon { + + jmx { + subscriptions { + histogram = [ "**" ] + min-max-counter = [ "**" ] + gauge = [ "**" ] + counter = [ "**" ] + trace = [ "**" ] + trace-segment = [ "**" ] + akka-actor = [ "**" ] + akka-dispatcher = [ "**" ] + akka-router = [ "**" ] + system-metric = [ "**" ] + http-server = [ "**" ] + } + } + + modules { + kamon-jmx { + auto-start = yes + requires-aspectj = no + extension-id = "kamon.jmx.JMXReporter" + } + } +} diff --git a/kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala new file mode 100644 index 00000000..e1e12aea --- /dev/null +++ b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala @@ -0,0 +1,77 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= +*/ + +package kamon.jmx + +import javax.management.{JMException, JMRuntimeException} + +import akka.actor._ +import akka.event.Logging +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.util.ConfigTools.Syntax +import scala.collection.JavaConverters._ + +object JMXReporter extends ExtensionId[JMXExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = JMXReporter + + override def createExtension(system: ExtendedActorSystem): JMXExtension = new JMXExtension(system) +} + +class JMXExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val log = Logging(system, getClass) + log.info("Starting the Kamon(JMX) extension") + + val subscriber = system.actorOf(Props[JMXReporterSupervisor], "kamon-jmx-reporter") + + val jmxConfig = system.settings.config.getConfig("kamon.jmx") + val subscriptions = jmxConfig.getConfig("subscriptions") + + subscriptions.firstLevelKeys foreach { subscriptionCategory ⇒ + subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒ + Kamon.metrics.subscribe(subscriptionCategory, pattern, subscriber, permanently = true) + } + } +} + +private trait ActorJMXSupervisor extends Actor with ActorLogging { + + import akka.actor.OneForOneStrategy + import akka.actor.SupervisorStrategy._ + + import scala.concurrent.duration._ + + override val supervisorStrategy = + OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { + case e: JMRuntimeException ⇒ + log.error(e, "Supervisor strategy STOPPING actor from errors during JMX invocation") + Stop + case e: JMException ⇒ + log.error(e, "Supervisor strategy STOPPING actor from incorrect invocation of JMX registration") + Stop + case t ⇒ + // Use the default supervisor strategy otherwise. + super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate) + } +} + +private class JMXReporterSupervisor extends Actor with ActorLogging with ActorJMXSupervisor { + private val jmxActor = context.actorOf(JMXReporterActor.props, "kamon-jmx-actor") + + def receive = { + case tick: TickMetricSnapshot ⇒ jmxActor ! tick + } +}
\ No newline at end of file diff --git a/kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala new file mode 100644 index 00000000..110943f9 --- /dev/null +++ b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala @@ -0,0 +1,212 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= +*/ + +package kamon.jmx + +import java.lang.management.ManagementFactory +import javax.management._ + +import akka.actor.{Actor, Props} +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument.{Counter, Histogram, InstrumentSnapshot} +import kamon.metric.{Entity, EntitySnapshot} +import org.slf4j.LoggerFactory + +import scala.collection.concurrent.TrieMap + +private object MetricMBeans { + + private implicit class RichHistogramSnapshot(histogram: Histogram.Snapshot) { + def average: Double = { + if (histogram.numberOfMeasurements == 0) 0D + else histogram.sum / histogram.numberOfMeasurements + } + } + + private[jmx] sealed trait MetricMBean + + private[jmx] abstract class AbstractMetricMBean[T <: InstrumentSnapshot] extends MetricMBean { + private[jmx] var snapshot: T + + private[jmx] def objectName: ObjectName + } + + private[jmx] trait HistogramMetricMBean extends AbstractMetricMBean[Histogram.Snapshot] { + def getNumberOfMeasurements: Long + + def getMin: Long + + def get50thPercentile: Long + + def getPercentile(n: Double): Long + + def get70thPercentile: Long + + def get90thPercentile: Long + + def get95thPercentile: Long + + def get990thPercentile: Long + + def get999thPercentile: Long + + def getAvg: Double + + def getMax: Long + + def getSum: Long + } + + private[jmx] class HistogramMetric(@volatile private[jmx] override var snapshot: Histogram.Snapshot, private[jmx] override val objectName: ObjectName) extends HistogramMetricMBean { + override def getNumberOfMeasurements: Long = snapshot.numberOfMeasurements + + override def getMin: Long = snapshot.min + + override def get50thPercentile: Long = snapshot.percentile(50.0D) + + override def get70thPercentile: Long = snapshot.percentile(70.0D) + + override def get90thPercentile: Long = snapshot.percentile(90.0D) + + override def get95thPercentile: Long = snapshot.percentile(95.0D) + + override def get990thPercentile: Long = snapshot.percentile(99.0D) + + override def get999thPercentile: Long = snapshot.percentile(99.9D) + + override def getPercentile(n: Double): Long = snapshot.percentile(n) + + override def getMax: Long = snapshot.max + + override def getSum: Long = snapshot.sum + + override def getAvg: Double = snapshot.average + } + + private[jmx] trait CounterMetricMBean extends AbstractMetricMBean[Counter.Snapshot] { + def getCount: Long + } + + private[jmx] trait HistogramValueMetricMBean extends AbstractMetricMBean[Histogram.Snapshot] { + def getValue: Long + } + + private[jmx] class CounterMetric(@volatile private[jmx] override var snapshot: Counter.Snapshot, private[jmx] override val objectName: ObjectName) extends CounterMetricMBean { + override def getCount: Long = snapshot.count + } + + private[jmx] class HistogramValueMetric(@volatile private[jmx] override var snapshot: Histogram.Snapshot, private[jmx] override val objectName: ObjectName) extends HistogramValueMetricMBean { + override def getValue: Long = snapshot.max + } + + private[jmx] implicit def impCreateHistogramMetric(snapshot: Histogram.Snapshot, objectName: ObjectName) = new HistogramMetric(snapshot, objectName) + + private[jmx] implicit def impCounterMetric(snapshot: Counter.Snapshot, objectName: ObjectName) = new CounterMetric(snapshot, objectName) + +} + +private object MBeanManager { + + import MetricMBeans._ + + private val mbs = ManagementFactory.getPlatformMBeanServer + + private val registeredMBeans = TrieMap.empty[String, AbstractMetricMBean[_]] + + private val log = LoggerFactory.getLogger(getClass) + + private[jmx] def createOrUpdateMBean[M <: AbstractMetricMBean[T], T <: InstrumentSnapshot](group: String, name: String, snapshot: T)(implicit buildMetricMBean: (T, ObjectName) ⇒ M): Unit = { + registeredMBeans.get(name) match { + case Some(mbean: M) ⇒ + mbean.snapshot = snapshot + case None ⇒ + val objectName = new ObjectName(createMBeanName("kamon", group, name)) + val mbean = buildMetricMBean(snapshot, objectName) + registeredMBeans += name -> mbean + mbs.registerMBean(mbean, objectName) + case _ ⇒ throw new IllegalStateException("Illegal metric bean type") + } + } + + private[jmx] def unregisterAllBeans(): Unit = { + registeredMBeans.values.map(_.objectName).foreach(name ⇒ + try { + mbs.unregisterMBean(name) + } catch { + case e: InstanceNotFoundException ⇒ if (log.isTraceEnabled) log.trace(s"Error unregistering $name", e) + case e: MBeanRegistrationException ⇒ if (log.isDebugEnabled) log.debug(s"Error unregistering $name", e) + }) + registeredMBeans.clear() + } + + private def createMBeanName(group: String, `type`: String, name: String, scope: Option[String] = None): String = { + val nameBuilder: StringBuilder = new StringBuilder + nameBuilder.append(group) + nameBuilder.append(":type=") + nameBuilder.append(`type`) + if (scope.isDefined) { + nameBuilder.append(",scope=") + nameBuilder.append(scope.get) + } + if (name.length > 0) { + nameBuilder.append(",name=") + nameBuilder.append(name) + } + nameBuilder.toString + } +} + +private object JMXReporterActor { + + import MBeanManager._ + import MetricMBeans._ + + private[jmx] def props = Props(classOf[JMXReporterActor]) + + private def updateHystogramMetrics(group: String, name: String, hs: Histogram.Snapshot): Unit = { + createOrUpdateMBean[HistogramMetric, Histogram.Snapshot](group, name, hs) + } + + private def updateCounterMetrics(group: String, name: String, cs: Counter.Snapshot): Unit = { + createOrUpdateMBean[CounterMetric, Counter.Snapshot](group, name, cs) + } +} + +private class JMXReporterActor extends Actor { + + import JMXReporterActor._ + import MBeanManager.unregisterAllBeans + + def receive: Receive = { + case tick: TickMetricSnapshot ⇒ + for { + (entity, snapshot) ← tick.metrics + (metricKey, metricSnapshot) ← snapshot.metrics + } { + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + updateHystogramMetrics(entity.category, entity.name + "." + metricKey.name, hs) + case cs: Counter.Snapshot ⇒ + updateCounterMetrics(entity.category, entity.name + "." + metricKey.name, cs) + } + } + } + + override def postStop(): Unit = { + super.postStop() + unregisterAllBeans() + } +}
\ No newline at end of file diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index 6d394bbf..9c583d5f 100644 --- a/kamon-newrelic/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -33,6 +33,9 @@ kamon { akka-dispatcher = [ "**" ] akka-router = [ "**" ] } + + # if true send metrics over SSL + ssl = false } modules { diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 3e43da36..49a4993c 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -63,7 +63,7 @@ class Agent extends Actor with SprayJsonSupport with ActorLogging with MetricsSu def disconnected(attemptsLeft: Int): Receive = { case Connect ⇒ pipe(connectToCollector) to self - case Connected(collector, runID) ⇒ configureChildren(collector, runID) + case Connected(collector, runID, scheme) ⇒ configureChildren(collector, runID, scheme) case ConnectFailed(reason) if (attemptsLeft > 0) ⇒ scheduleReconnection(reason, attemptsLeft) case ConnectFailed(reason) ⇒ giveUpConnection() } @@ -85,8 +85,8 @@ class Agent extends Actor with SprayJsonSupport with ActorLogging with MetricsSu context stop self } - def configureChildren(collector: String, runID: Long): Unit = { - log.info("Configuring New Relic reporters to use runID: [{}] and collector: [{}]", runID, collector) + def configureChildren(collector: String, runID: Long, scheme: String): Unit = { + log.info("Configuring New Relic reporters to use runID: [{}] and collector: [{}] over: [{}]", runID, collector, scheme) context.children.foreach(_ ! Configure(collector, runID)) context become connected } @@ -105,8 +105,8 @@ class Agent extends Actor with SprayJsonSupport with ActorLogging with MetricsSu def connectToCollector: Future[ConnectResult] = { (for { collector ← selectCollector - runID ← connect(collector, agentSettings) - } yield Connected(collector, runID)) recover { case error ⇒ ConnectFailed(error) } + (runID, scheme) ← connect(collector, agentSettings) + } yield Connected(collector, runID, scheme)) recover { case error ⇒ ConnectFailed(error) } } def selectCollector: Future[String] = { @@ -116,10 +116,10 @@ class Agent extends Actor with SprayJsonSupport with ActorLogging with MetricsSu } } - def connect(collectorHost: String, connect: AgentSettings): Future[Long] = { + def connect(collectorHost: String, connect: AgentSettings): Future[(Long, String)] = { val apiClient = new ApiMethodClient(collectorHost, None, agentSettings, IO(Http)(context.system)) apiClient.invokeMethod(RawMethods.Connect, connect) map { json ⇒ - json.extract[Long]('return_value / 'agent_run_id) + (json.extract[Long]('return_value / 'agent_run_id), apiClient.scheme) } } } @@ -132,12 +132,12 @@ object Agent { case class Configure(collector: String, runID: Long) sealed trait ConnectResult - case class Connected(collector: String, runID: Long) extends ConnectResult + case class Connected(collector: String, runID: Long, scheme: String) extends ConnectResult case class ConnectFailed(reason: Throwable) extends ConnectResult } case class AgentSettings(licenseKey: String, appName: String, hostname: String, pid: Int, operationTimeout: Timeout, - maxConnectionRetries: Int, retryDelay: FiniteDuration, apdexT: Double) + maxConnectionRetries: Int, retryDelay: FiniteDuration, apdexT: Double, ssl: Boolean) object AgentSettings { @@ -147,6 +147,7 @@ object AgentSettings { val newRelicConfig = config.getConfig("kamon.newrelic") val licenseKey = newRelicConfig.getString("license-key") assert(licenseKey != "<put-your-key-here>", "You forgot to include your New Relic license key in the configuration settings!") + val ssl = newRelicConfig.getBoolean("ssl") AgentSettings( licenseKey, @@ -156,7 +157,8 @@ object AgentSettings { Timeout(newRelicConfig.getFiniteDuration("operation-timeout")), newRelicConfig.getInt("max-connect-retries"), newRelicConfig.getFiniteDuration("connect-retry-delay"), - newRelicConfig.getFiniteDuration("apdexT").toMillis / 1E3D) + newRelicConfig.getFiniteDuration("apdexT").toMillis / 1E3D, + ssl) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala index 263faa63..0550f433 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/ApiMethodClient.scala @@ -31,7 +31,8 @@ class ApiMethodClient(host: String, val runID: Option[Long], agentSettings: Agen } val httpClient = encode(Deflate) ~> sendReceive(httpTransport) ~> decode(Deflate) ~> unmarshal[JsValue] - val baseCollectorUri = Uri("/agent_listener/invoke_raw_method").withHost(host).withScheme("http") + val scheme = if (agentSettings.ssl) "https" else "http" + val baseCollectorUri = Uri("/agent_listener/invoke_raw_method").withHost(host).withScheme(scheme) def invokeMethod[T: Marshaller](method: String, payload: T): Future[JsValue] = { val methodQuery = ("method" -> method) +: baseQuery diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala index 6e16b975..8db24e46 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -21,15 +21,18 @@ import spray.json._ object JsonProtocol extends DefaultJsonProtocol { implicit object ConnectJsonWriter extends RootJsonWriter[AgentSettings] { - def write(obj: AgentSettings): JsValue = + def write(obj: AgentSettings): JsValue = { + val appNames = obj.appName.split(";") JsArray( JsObject( "agent_version" -> JsString("3.1.0"), - "app_name" -> JsArray(JsString(obj.appName)), + "app_name" -> JsArray(appNames.map(n ⇒ JsString(n)).toVector), "host" -> JsString(obj.hostname), - "identifier" -> JsString(s"java:${obj.appName}"), + "identifier" -> JsString(s"java:${appNames(0)}"), "language" -> JsString("java"), + "ssl" -> JsString(obj.ssl.toString), "pid" -> JsNumber(obj.pid))) + } } implicit def seqWriter[T: JsonFormat] = new JsonFormat[Seq[T]] { diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index c1601fa8..e2f1e606 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -17,11 +17,13 @@ package kamon.newrelic import java.util - import akka.actor.{ Actor, ActorLogging } import akka.event.Logging.{ Error, InitializeLogger, LoggerInitialized } -import com.newrelic.api.agent.{ NewRelic ⇒ NR } +import com.newrelic.agent.errors.{ ErrorService, ThrowableError } +import com.newrelic.agent.service.ServiceFactory import kamon.trace.{ Tracer, TraceContextAware } +import scala.util.Try +import scala.util.control.NoStackTrace trait CustomParamsSupport { this: NewRelicErrorLogger ⇒ @@ -31,6 +33,10 @@ trait CustomParamsSupport { class NewRelicErrorLogger extends Actor with ActorLogging with CustomParamsSupport { override def customParams: Map[String, String] = Map.empty + private val errorService: Option[ErrorService] = Try(ServiceFactory.getRPMService).map(_.getErrorService).toOption + + if (errorService.isEmpty) + log.warning("Not sending errors to New Relic as the New Relic Agent is not started") def receive = { case InitializeLogger(_) ⇒ sender ! LoggerInitialized @@ -38,24 +44,50 @@ class NewRelicErrorLogger extends Actor with ActorLogging with CustomParamsSuppo case anythingElse ⇒ } - def notifyError(error: Error): Unit = runInFakeTransaction { + def notifyError(error: Error): Unit = { val params = new util.HashMap[String, String]() + customParams foreach { case (k, v) ⇒ params.put(k, v) } - if (error.isInstanceOf[TraceContextAware]) { - val ctx = error.asInstanceOf[TraceContextAware].traceContext - params put ("TraceToken", ctx.token) + params.put("LogSource", error.logSource) + params.put("LogClass", error.logClass.getCanonicalName) + error match { + case e: TraceContextAware if e.traceContext.token.length > 0 ⇒ + params.put("TraceToken", e.traceContext.token) + case _ ⇒ } - customParams foreach { case (k, v) ⇒ params.put(k, v) } + if (error.cause == Error.NoCause) + reportError(loggedMessage(error.message, params)) + else + reportError(loggedException(error.message, error.cause, params)) + } + + def loggedMessage(message: Any, params: util.HashMap[String, String]) = + loggedException(null, LoggedMessage(message), params) - if (error.cause == Error.NoCause) NR.noticeError(error.message.toString, params) - else NR.noticeError(error.cause, params) + def loggedException(message: Any, cause: Throwable, params: util.HashMap[String, String]) = { + if (Option(message).isDefined) params.put("ErrorMessage", message.toString) + val uri = s"/${Tracer.currentContext.name}" + val transaction = s"WebTransaction/Uri$uri" + LoggedException(cause, params, transaction, uri) } - //Really ugly, but temporal hack until next release... - def runInFakeTransaction[T](thunk: ⇒ T): T = { - val oldName = Thread.currentThread.getName - Thread.currentThread.setName(Tracer.currentContext.name) - try thunk finally Thread.currentThread.setName(oldName) + def reportError(error: ThrowableError) = errorService.foreach(_.reportError(error)) +} + +case class LoggedMessage(message: Any) extends Throwable(message match { case null ⇒ "" case s ⇒ s.toString }) with NoStackTrace + +case class LoggedException(cause: Throwable, params: util.HashMap[String, String], transaction: String, uri: String) + extends ThrowableError(null, transaction, cause, uri, System.currentTimeMillis(), null, null, null, params, null) { + + //ThrowableError has some funky equals method which gives false positives in tests + override def equals(any: Any): Boolean = { + any match { + case that: LoggedException ⇒ + that.cause == this.cause && that.params == this.params && that.transaction == this.transaction && that.uri == this.uri + case _ ⇒ false + } } -}
\ No newline at end of file + // + override def toString = s"${this.getClass.getSimpleName}: cause=$cause transaction=$transaction uri=$uri params=$params" +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index 7e057407..604969ab 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -43,12 +43,12 @@ object WebTransactionMetricExtractor extends MetricExtractor { apdexBuilder.record(Time.Nanoseconds.scale(Time.Seconds)(record.level), record.count) } - Metric(elapsedTime, Time.Nanoseconds, "WebTransaction/Custom/" + entity.name, None) + Metric(elapsedTime, Time.Nanoseconds, "WebTransaction/Uri/" + entity.name, None) } // Accumulate all segment metrics - metrics.filterKeys(_.category == "trace-segment").map { - case (entity, entitySnapshot) if entity.tags("category") == SegmentCategory.HttpClient ⇒ + metrics.filterKeys(isHttpClientSegment).map { + case (entity, entitySnapshot) ⇒ val library = entity.tags("library") val trace = entity.tags("trace") val elapsedTime = entitySnapshot.histogram("elapsed-time").get @@ -89,12 +89,15 @@ object WebTransactionMetricExtractor extends MetricExtractor { val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map { case ((host, library, traceName), snapshots) ⇒ val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) - Metric(mergedSnapshots, Time.Nanoseconds, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName)) + Metric(mergedSnapshots, Time.Nanoseconds, s"External/$host/$library", Some("WebTransaction/Uri/" + traceName)) } Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++ transactionMetrics ++ externalByHost ++ externalByHostAndLibrary ++ externalScopedByHostAndLibrary } + + def isHttpClientSegment(entity: Entity): Boolean = + entity.category == "trace-segment" && entity.tags.get("category").filter(_ == SegmentCategory.HttpClient).isDefined } class ApdexBuilder(name: String, scope: Option[String], apdexT: Double) { diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala index e4289231..063755a8 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala @@ -48,6 +48,7 @@ class AgentSpec extends BaseKamonSpec("metric-reporter-spec") with RequestBuildi | license-key = 1111111111 | connect-retry-delay = 1 second | max-connect-retries = 3 + | ssl = true | } | | modules.kamon-newrelic.auto-start = no @@ -85,14 +86,15 @@ class AgentSpec extends BaseKamonSpec("metric-reporter-spec") with RequestBuildi | "host": "$host", | "identifier": "java:kamon", | "language": "java", - | "pid": $pid + | "pid": $pid, + | "ssl": "true" | } | ] """.stripMargin.parseJson)(sprayJsonMarshaller(JsValueFormat)) }) // Receive the runID - EventFilter.info(message = "Configuring New Relic reporters to use runID: [161221111] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { + EventFilter.info(message = "Configuring New Relic reporters to use runID: [161221111] and collector: [collector-8.newrelic.com] over: [https]", occurrences = 1).intercept { httpManager.reply(jsonResponse( """ | { @@ -143,7 +145,8 @@ class AgentSpec extends BaseKamonSpec("metric-reporter-spec") with RequestBuildi | "host": "$host", | "identifier": "java:kamon", | "language": "java", - | "pid": $pid + | "pid": $pid, + | "ssl": "true" | } | ] """.stripMargin.parseJson)(sprayJsonMarshaller(JsValueFormat)) @@ -151,7 +154,7 @@ class AgentSpec extends BaseKamonSpec("metric-reporter-spec") with RequestBuildi // Receive the runID EventFilter.info( - message = "Configuring New Relic reporters to use runID: [161221112] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { + message = "Configuring New Relic reporters to use runID: [161221112] and collector: [collector-8.newrelic.com] over: [https]", occurrences = 1).intercept { httpManager.reply(jsonResponse( """ @@ -202,7 +205,7 @@ class AgentSpec extends BaseKamonSpec("metric-reporter-spec") with RequestBuildi } def rawMethodUri(host: String, methodName: String): Uri = { - Uri(s"http://$host/agent_listener/invoke_raw_method").withQuery( + Uri(s"https://$host/agent_listener/invoke_raw_method").withQuery( "method" -> methodName, "license_key" -> "1111111111", "marshal_format" -> "json", diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/ConnectJsonWriterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/ConnectJsonWriterSpec.scala new file mode 100644 index 00000000..cba2bb18 --- /dev/null +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/ConnectJsonWriterSpec.scala @@ -0,0 +1,40 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.newrelic + +import akka.util.Timeout +import org.scalatest.{ Matchers, WordSpecLike } +import scala.concurrent.duration.DurationInt +import spray.json.JsValue + +class ConnectJsonWriterSpec extends WordSpecLike with Matchers { + import kamon.newrelic.JsonProtocol._ + + "the ConnectJsonWriter" should { + "produce the correct Json when a single app name is configured" in { + ConnectJsonWriter.write(agentSettings("app1")).compactPrint shouldBe expectedJson(""""app1"""") + } + + "produce the correct Json when multiple app names are configured" in { + ConnectJsonWriter.write(agentSettings("app1;app2;app3")).compactPrint shouldBe expectedJson(""""app1","app2","app3""""); + } + } + + def agentSettings(appName: String) = AgentSettings("1111111111", appName, "test-host", 1, Timeout(5 seconds), 1, 30 seconds, 1D, false) + + def expectedJson(appName: String) = s"""[{"identifier":"java:app1","agent_version":"3.1.0","host":"test-host","ssl":"false","pid":1,"language":"java","app_name":[$appName]}]""" +}
\ No newline at end of file diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala index d4e815e5..ddc6ed69 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala @@ -55,7 +55,7 @@ class MetricReporterSpec extends BaseKamonSpec("metric-reporter-spec") with Spra | """.stripMargin) - val agentSettings = AgentSettings("1111111111", "kamon", "test-host", 1, Timeout(5 seconds), 1, 30 seconds, 1D) + val agentSettings = AgentSettings("1111111111", "kamon", "test-host", 1, Timeout(5 seconds), 1, 30 seconds, 1D, false) val baseQuery = Query( "license_key" -> agentSettings.licenseKey, "marshal_format" -> "json", diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/NewRelicErrorLoggerSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/NewRelicErrorLoggerSpec.scala new file mode 100644 index 00000000..0d27aa71 --- /dev/null +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/NewRelicErrorLoggerSpec.scala @@ -0,0 +1,171 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.newrelic + +import akka.actor._ +import akka.testkit.ImplicitSender +import com.newrelic.agent.errors.ThrowableError +import com.typesafe.config.ConfigFactory +import kamon.testkit.BaseKamonSpec +import kamon.trace.Tracer +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } + +class NewRelicErrorLoggerSpec extends BaseKamonSpec("NewRelicErrorLoggerSpec") + with WordSpecLike + with Matchers + with ImplicitSender + with BeforeAndAfterAll { + + override lazy val config = + ConfigFactory.parseString( + """ + |akka { + | loggers = ["kamon.newrelic.TestableNewRelicErrorLogger"] + |} + |kamon { + | modules.kamon-newrelic.auto-start = no + |} + """.stripMargin) + + override def afterAll(): Unit = shutdown(system) + + val actor = system.actorOf(Props[ErrorLoggingActor], "actor") + + "the NewRelicErrorLogger" should { + + "record a log message" in { + val msg = "logMessageOnly" + actor ! msg + expectMsg(LoggedException(LoggedMessage(msg), defaultParams, "WebTransaction/Uri/empty-trace", "/empty-trace")) + } + + "record an exception with no log message" in { + val ex = new Exception() + actor ! (null, ex) + expectMsg(LoggedException(ex, defaultParams, "WebTransaction/Uri/empty-trace", "/empty-trace")) + } + + "record a log message with exception" in { + val msg = "logMessageAndException" + val ex = new Exception("Exception message") + actor ! (msg, ex) + expectMsg(LoggedException(ex, defaultParamsWithErrorMessage(msg), "WebTransaction/Uri/empty-trace", "/empty-trace")) + } + + "record the TraceToken with the logged error" in { + val msg = "logMessageOnly" + + Tracer.withNewContext("traceName", Some("traceToken")) { + actor ! msg + } + + expectMsg(LoggedException(LoggedMessage(msg), defaultParamsWithTraceToken("traceToken"), "WebTransaction/Uri/traceName", "/traceName")) + } + } + + "A LoggedException" should { + val ex1 = LoggedException(new Throwable(), defaultParams, "transaction", "uri") + + "be equal to another if its members are equal" in { + val ex2 = ex1.copy() + ex1 == ex2 shouldBe true + } + + "be different to another if its transaction is different" in { + val ex2 = ex1.copy(transaction = "something else") + ex1 == ex2 shouldBe false + } + + "be different to another if its uri is different" in { + val ex2 = ex1.copy(uri = "something else") + ex1 == ex2 shouldBe false + } + + "be different to another if its cause is different" in { + val ex2 = ex1.copy(cause = new Throwable()) + ex1 == ex2 shouldBe false + } + + "be different to another if its params are different" in { + val ex2 = ex1.copy(params = defaultParamsWithErrorMessage("yikes")) + ex1 == ex2 shouldBe false + } + } + + "A LoggedMessage" should { + val msg1 = LoggedMessage("hello") + + "be equal to another if its members are equal" in { + val msg2 = msg1.copy() + msg1 == msg2 shouldBe true + } + + "be different to another if message is different" in { + val msg2 = msg1.copy(message = "another") + msg1 == msg2 shouldBe false + } + } + + def defaultParams = { + val params = new java.util.HashMap[String, String]() + params.put("LogSource", actor.path.toString) + params.put("LogClass", classOf[ErrorLoggingActor].getCanonicalName) + params + } + + def defaultParamsWithErrorMessage(msg: String) = { + val params = new java.util.HashMap[String, String]() + params.putAll(defaultParams) + params.put("ErrorMessage", msg) + params + } + + def defaultParamsWithTraceToken(traceToken: String) = { + val params = new java.util.HashMap[String, String]() + params.putAll(defaultParams) + params.put("TraceToken", traceToken) + params + } + +} + +class ErrorLoggingActor extends Actor with ActorLogging { + var lastSender: ActorRef = _ + + override def receive: Receive = testcase orElse replycase + + def testcase: Receive = { + case m: String ⇒ + lastSender = sender() + log.error(m) + case (m: String, e: Exception) ⇒ + lastSender = sender() + log.error(e, m) + case (_, e: Exception) ⇒ + lastSender = sender() + log.error(e, null) + } + + def replycase: Receive = { + case reply ⇒ + lastSender ! reply + } +} + +class TestableNewRelicErrorLogger extends NewRelicErrorLogger { + override def reportError(error: ThrowableError) = Some(context.actorSelection("/user/actor") ! error) +} diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index b25549bf..783bbecd 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -44,5 +44,6 @@ kamon { kamon-datadog.auto-start = yes kamon-log-reporter.auto-start = no kamon-system-metrics.auto-start = no + kamon-jmx.auto-start = yes } } diff --git a/kamon-spm/src/main/resources/reference.conf b/kamon-spm/src/main/resources/reference.conf index 397f14a7..1b2f3cf5 100644 --- a/kamon-spm/src/main/resources/reference.conf +++ b/kamon-spm/src/main/resources/reference.conf @@ -1,3 +1,7 @@ +# ==================================# +# Kamon-SPM Reference Configuration # +# ==================================# + kamon { spm { receiver-url = "https://spm-receiver.sematext.com/receiver/v1/_bulk" @@ -8,10 +12,10 @@ kamon { # hostname-alias = "custom hostname" subscriptions { - akka-actor = [ "**" ] + akka-actor = [ "**" ] akka-dispatcher = [ "**" ] - akka-router = [ "**" ] - system-metric = [ "**" ] + akka-router = [ "**" ] + system-metric = [ "**" ] } } diff --git a/kamon-spm/src/main/scala/kamon/spm/SPM.scala b/kamon-spm/src/main/scala/kamon/spm/SPM.scala index b8bb4eab..d15c995b 100644 --- a/kamon-spm/src/main/scala/kamon/spm/SPM.scala +++ b/kamon-spm/src/main/scala/kamon/spm/SPM.scala @@ -44,8 +44,8 @@ class SPMExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.spm") val maxQueueSize = config.getInt("max-queue-size") - val retryInterval: FiniteDuration = config.getDuration("retry-interval", TimeUnit.MILLISECONDS) millis - val sendTimeout: FiniteDuration = config.getDuration("send-timeout", TimeUnit.MILLISECONDS) millis + val retryInterval: FiniteDuration = config.getFiniteDuration("retry-interval") + val sendTimeout: FiniteDuration = config.getFiniteDuration("send-timeout") val url = config.getString("receiver-url") val token = config.getString("token") val hostname = if (config.hasPath("hostname-alias")) { diff --git a/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSender.scala b/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSender.scala index 80d50c41..4182947b 100644 --- a/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSender.scala +++ b/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSender.scala @@ -22,7 +22,7 @@ import akka.util.Timeout import spray.http.Uri.Query import spray.http.{ HttpEntity, HttpResponse, Uri } import spray.httpx.RequestBuilding._ -import spray.json.{ DefaultJsonProtocol, _ } +import spray.json._ import scala.annotation.tailrec import scala.collection.immutable.{ Map, Queue } @@ -103,7 +103,7 @@ class SPMMetricsSender(io: ActorRef, retryInterval: FiniteDuration, sendTimeout: post(metrics) } case resp: HttpResponse if resp.status.isFailure ⇒ { - log.warning("Metrics can't be sent. Response status: ${resp.status}. Scheduling retry.") + log.warning(s"Metrics can't be sent. Response status: ${resp.status}. Scheduling retry.") context.system.scheduler.scheduleOnce(retryInterval, self, Retry) } case ScheduleRetry ⇒ { diff --git a/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala index 10f44bbe..ffde0315 100644 --- a/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala +++ b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala @@ -16,12 +16,9 @@ package kamon.spray -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } -import akka.actor -import akka.event.{ Logging, LoggingAdapter } import kamon.Kamon -import kamon.util.http.HttpServerMetrics import kamon.metric.Entity +import kamon.util.http.HttpServerMetrics import org.slf4j.LoggerFactory import spray.http.HttpHeaders.Host import spray.http.HttpRequest diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf index 4840fb11..9ea2832a 100644 --- a/kamon-statsd/src/main/resources/reference.conf +++ b/kamon-statsd/src/main/resources/reference.conf @@ -14,9 +14,6 @@ kamon { # kamon.metric.tick-interval setting. flush-interval = 10 seconds - # Max packet size for UDP metrics data sent to StatsD. - max-packet-size = 1024 bytes - # Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics # collection for your desired entities must be activated under the kamon.metrics.filters settings. subscriptions { @@ -63,6 +60,20 @@ kamon { # version of StatsD or if you are running your own, customized version of StatsD that supports this. metric-name-normalization-strategy = normalize } + + # FQCN of the implementation of `kamon.statsd.StatsDMetricsSenderFactory` to be instantiated and use for + # creating StatsD sender. Provided implementations are: + # - `kamon.statsd.BatchStatsDMetricsSender`. Sends a UDP packet every "kamon.statsd.flush-interval" or + # as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached. Default one. + # - `kamon.statsd.SimpleStatsDMetricsSender`. Sends a UDP packet for each piece of data it receives. + metric-sender-factory = kamon.statsd.BatchStatsDMetricsSender + + # Settings for `kamon.statsd.BatchStatsDMetricsSender`. + # Used only if kamon.statsd.metric-sender-factory is set to `kamon.statsd.BatchStatsDMetricsSender` + batch-metric-sender { + # Max packet size for UDP metrics data sent to StatsD. + max-packet-size = 1024 bytes + } } modules { diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala index 70ff1b45..be9a572a 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -16,41 +16,34 @@ package kamon.statsd -import akka.actor.{ ActorSystem, Props, ActorRef, Actor } -import akka.io.{ Udp, IO } -import java.net.InetSocketAddress -import akka.util.ByteString +import akka.actor.Props +import com.typesafe.config.Config import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot -import java.text.{ DecimalFormatSymbols, DecimalFormat } -import java.util.Locale - import kamon.metric.instrument.{ Counter, Histogram } -class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBytes: Long, metricKeyGenerator: MetricKeyGenerator) - extends Actor with UdpExtensionProvider { - import context.system - - val symbols = DecimalFormatSymbols.getInstance(Locale.US) - symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. - - // Absurdly high number of decimal digits, let the other end lose precision if it needs to. - val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) - - udpExtension ! Udp.SimpleSender - - def newSocketAddress = new InetSocketAddress(statsDHost, statsDPort) +/** + * Factory for [[BatchStatsDMetricsSender]]. + * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender" + * to select [[BatchStatsDMetricsSender]] as your sender + */ +object BatchStatsDMetricsSender extends StatsDMetricsSenderFactory { + override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props = + Props(new BatchStatsDMetricsSender(statsDConfig, metricKeyGenerator)) +} - def receive = { - case Udp.SimpleSenderReady ⇒ - context.become(ready(sender)) - } +/** + * StatsD sender which sends a UDP packet every "kamon.statsd.flush-interval" or + * as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached. + * @param statsDConfig Config to read settings specific to this sender + * @param metricKeyGenerator Key generator for all metrics sent by this sender + */ +class BatchStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) + extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) { - def ready(udpSender: ActorRef): Receive = { - case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender) - } + val maxPacketSizeInBytes = statsDConfig.getBytes("batch-metric-sender.max-packet-size") - def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, newSocketAddress) + def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = { + val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, flushToUDP) for ( (entity, snapshot) ← tick.metrics; @@ -72,25 +65,9 @@ class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBy packetBuilder.flush() } - - def encodeStatsDTimer(level: Long, count: Long): String = { - val samplingRate: Double = 1D / count - level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") - } - - def encodeStatsDCounter(count: Long): String = count.toString + "|c" } -object StatsDMetricsSender { - def props(statsDHost: String, statsDPort: Int, maxPacketSize: Long, metricKeyGenerator: MetricKeyGenerator): Props = - Props(new StatsDMetricsSender(statsDHost, statsDPort, maxPacketSize, metricKeyGenerator)) -} - -trait UdpExtensionProvider { - def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) -} - -class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) { +class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, flushToUDP: String ⇒ Unit) { val metricSeparator = "\n" val measurementSeparator = ":" @@ -103,8 +80,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r if (fitsOnBuffer(dataWithoutKey)) buffer.append(dataWithoutKey) else { - flushToUDP(buffer.toString()) - buffer.clear() + flush() buffer.append(key).append(dataWithoutKey) } } else { @@ -114,8 +90,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r val mSeparator = if (buffer.length > 0) metricSeparator else "" buffer.append(mSeparator).append(dataWithoutSeparator) } else { - flushToUDP(buffer.toString()) - buffer.clear() + flush() buffer.append(dataWithoutSeparator) } } @@ -123,8 +98,6 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes - private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) - def flush(): Unit = { flushToUDP(buffer.toString) buffer.clear() diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala index 97a27ff3..c4d8682a 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala @@ -1,3 +1,19 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.statsd import java.lang.management.ManagementFactory diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala new file mode 100644 index 00000000..47ce66cd --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala @@ -0,0 +1,62 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.Props +import com.typesafe.config.Config +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument.{ Counter, Histogram } + +/** + * Factory for [[SimpleStatsDMetricsSender]]. + * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender" + * to select [[SimpleStatsDMetricsSender]] as your sender + */ +object SimpleStatsDMetricsSender extends StatsDMetricsSenderFactory { + override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props = + Props(new SimpleStatsDMetricsSender(statsDConfig, metricKeyGenerator)) +} + +/** + * "Traditional" StatsD sender which sends a UDP packet for each piece of data it receives. + * @param statsDConfig Config to read settings specific to this sender + * @param metricKeyGenerator Key generator for all metrics sent by this sender + */ +class SimpleStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) + extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) { + + def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = { + + for ( + (entity, snapshot) ← tick.metrics; + (metricKey, metricSnapshot) ← snapshot.metrics + ) { + + val keyPrefix = metricKeyGenerator.generateKey(entity, metricKey) + ":" + + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + hs.recordsIterator.foreach { record ⇒ + flushToUDP(keyPrefix + encodeStatsDTimer(record.level, record.count)) + } + + case cs: Counter.Snapshot ⇒ + flushToUDP(keyPrefix + encodeStatsDCounter(cs.count)) + } + } + } +} diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 91d05510..a1f7dca3 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -23,8 +23,6 @@ import kamon.util.ConfigTools.Syntax import scala.concurrent.duration._ import com.typesafe.config.Config import akka.event.Logging -import java.net.InetSocketAddress -import java.util.concurrent.TimeUnit.MILLISECONDS import scala.collection.JavaConverters._ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { @@ -44,10 +42,10 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { val tickInterval = metricsExtension.settings.tickInterval val flushInterval = statsDConfig.getFiniteDuration("flush-interval") - val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size") val keyGeneratorFQCN = statsDConfig.getString("metric-key-generator") + val senderFactoryFQCN = statsDConfig.getString("metric-sender-factory") - val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config) + val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, senderFactoryFQCN, config) val subscriptions = statsDConfig.getConfig("subscriptions") subscriptions.firstLevelKeys.map { subscriptionCategory ⇒ @@ -56,15 +54,13 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { } } - def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, keyGeneratorFQCN: String, config: Config): ActorRef = { + def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, + keyGeneratorFQCN: String, senderFactoryFQCN: String, config: Config): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") val keyGenerator = system.dynamicAccess.createInstanceFor[MetricKeyGenerator](keyGeneratorFQCN, (classOf[Config], config) :: Nil).get + val senderFactory = system.dynamicAccess.getObjectFor[StatsDMetricsSenderFactory](senderFactoryFQCN).get - val metricsSender = system.actorOf(StatsDMetricsSender.props( - statsDConfig.getString("hostname"), - statsDConfig.getInt("port"), - maxPacketSizeInBytes, - keyGenerator), "statsd-metrics-sender") + val metricsSender = system.actorOf(senderFactory.props(statsDConfig, keyGenerator), "statsd-metrics-sender") if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics sender. diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala new file mode 100644 index 00000000..1f603aea --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala @@ -0,0 +1,24 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.Props +import com.typesafe.config.Config + +trait StatsDMetricsSenderFactory { + def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props +} diff --git a/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala new file mode 100644 index 00000000..9e856eda --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala @@ -0,0 +1,76 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import java.net.InetSocketAddress +import java.text.{ DecimalFormat, DecimalFormatSymbols } +import java.util.Locale +import akka.actor.{ Actor, ActorRef, ActorSystem } +import akka.io.{ IO, Udp } +import akka.util.ByteString +import com.typesafe.config.Config +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot + +/** + * Base class for different StatsD senders utilizing UDP protocol. It implies use of one statsd server. + * @param statsDConfig Config to read settings specific to this sender + * @param metricKeyGenerator Key generator for all metrics sent by this sender + */ +abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) + extends Actor with UdpExtensionProvider { + + import context.system + + val statsDHost = statsDConfig.getString("hostname") + val statsDPort = statsDConfig.getInt("port") + + val symbols = DecimalFormatSymbols.getInstance(Locale.US) + symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. + + // Absurdly high number of decimal digits, let the other end lose precision if it needs to. + val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) + + udpExtension ! Udp.SimpleSender + + lazy val socketAddress = new InetSocketAddress(statsDHost, statsDPort) + + def receive = { + case Udp.SimpleSenderReady ⇒ + context.become(ready(sender)) + } + + def ready(udpSender: ActorRef): Receive = { + case tick: TickMetricSnapshot ⇒ + writeMetricsToRemote(tick, + (data: String) ⇒ udpSender ! Udp.Send(ByteString(data), socketAddress)) + } + + def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit + + def encodeStatsDTimer(level: Long, count: Long): String = { + val samplingRate: Double = 1D / count + level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") + } + + def encodeStatsDCounter(count: Long): String = count.toString + "|c" + +} + +trait UdpExtensionProvider { + def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) +} + diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/BatchStatsDMetricSenderSpec.scala index 84d2d003..8474a5ce 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/BatchStatsDMetricSenderSpec.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -16,37 +16,52 @@ package kamon.statsd -import akka.testkit.{ TestKitBase, TestProbe } +import akka.testkit.TestProbe import akka.actor.{ ActorRef, Props, ActorSystem } -import kamon.Kamon -import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement } -import kamon.testkit.BaseKamonSpec -import kamon.util.MilliTimestamp -import org.scalatest.{ Matchers, WordSpecLike } -import kamon.metric._ import akka.io.Udp -import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot -import java.net.InetSocketAddress import com.typesafe.config.ConfigFactory -class StatsDMetricSenderSpec extends BaseKamonSpec("statsd-metric-sender-spec") { - implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(system.settings.config) { - override def hostName: String = "localhost_local" +class BatchStatsDMetricSenderSpec extends UDPBasedStatsDMetricSenderSpec("batch-statsd-metric-sender-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | statsd { + | hostname = "127.0.0.1" + | port = 0 + | simple-metric-key-generator { + | application = kamon + | hostname-override = kamon-host + | include-hostname = true + | metric-name-normalization-strategy = normalize + | } + | batch-metric-sender.max-packet-size = 1024 + | } + |} + | + """.stripMargin) + + val testMaxPacketSize = statsDConfig.getBytes("batch-metric-sender.max-packet-size") + + trait BatchSenderFixture extends UdpListenerFixture { + override def newSender(udpProbe: TestProbe) = + Props(new BatchStatsDMetricsSender(statsDConfig, metricKeyGenerator) { + override def udpExtension(implicit system: ActorSystem): ActorRef = udpProbe.ref + }) } - "the StatsDMetricSender" should { - "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture { + "the BatchStatsDMetricSender" should { + "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new BatchSenderFixture { val testMetricKey = buildMetricKey(testEntity, "metric-one") val testRecorder = buildRecorder("user/kamon") testRecorder.metricOne.record(10L) val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - data.utf8String should be(s"$testMetricKey:10|ms") + expectUDPPacket(s"$testMetricKey:10|ms", udp) } - "render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture { + "render several measurements of the same key under a single (key + multiple measurements) packet" in new BatchSenderFixture { val testMetricKey = buildMetricKey(testEntity, "metric-one") val testRecorder = buildRecorder("user/kamon") testRecorder.metricOne.record(10L) @@ -54,24 +69,22 @@ class StatsDMetricSenderSpec extends BaseKamonSpec("statsd-metric-sender-spec") testRecorder.metricOne.record(12L) val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms") + expectUDPPacket(s"$testMetricKey:10|ms:11|ms:12|ms", udp) } - "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture { + "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new BatchSenderFixture { val testMetricKey = buildMetricKey(testEntity, "metric-one") val testRecorder = buildRecorder("user/kamon") testRecorder.metricOne.record(10L) testRecorder.metricOne.record(10L) val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - data.utf8String should be(s"$testMetricKey:10|ms|@0.5") + expectUDPPacket(s"$testMetricKey:10|ms|@0.5", udp) } - "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { + "flush the packet when the max-packet-size is reached" in new BatchSenderFixture { val testMetricKey = buildMetricKey(testEntity, "metric-one") val testRecorder = buildRecorder("user/kamon") @@ -85,12 +98,10 @@ class StatsDMetricSenderSpec extends BaseKamonSpec("statsd-metric-sender-spec") val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) udp.expectMsgType[Udp.Send] // let the first flush pass - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - - data.utf8String should be(s"$testMetricKey:$level|ms") + expectUDPPacket(s"$testMetricKey:$level|ms", udp) } - "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { + "render multiple keys in the same packet using newline as separator" in new BatchSenderFixture { val testMetricKey1 = buildMetricKey(testEntity, "metric-one") val testMetricKey2 = buildMetricKey(testEntity, "metric-two") val testRecorder = buildRecorder("user/kamon") @@ -103,48 +114,7 @@ class StatsDMetricSenderSpec extends BaseKamonSpec("statsd-metric-sender-spec") testRecorder.metricTwo.record(21L) val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - - data.utf8String should be(s"$testMetricKey1:10|ms|@0.5:11|ms\n$testMetricKey2:20|ms:21|ms") - } - } - - trait UdpListenerFixture { - val testMaxPacketSize = system.settings.config.getBytes("kamon.statsd.max-packet-size") - val testEntity = Entity("user/kamon", "test") - - def buildMetricKey(entity: Entity, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { - val metricKey = HistogramKey(metricName, UnitOfMeasurement.Unknown) - metricKeyGenerator.generateKey(entity, metricKey) - } - - def buildRecorder(name: String): TestEntityRecorder = { - Kamon.metrics.entity(TestEntityRecorder, name) - } - - def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = { - val udp = TestProbe() - val metricsSender = system.actorOf(Props(new StatsDMetricsSender("127.0.0.1", 0, testMaxPacketSize, metricKeyGenerator) { - override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref - })) - - // Setup the SimpleSender - udp.expectMsgType[Udp.SimpleSender] - udp.reply(Udp.SimpleSenderReady) - - val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics) - metricsSender ! fakeSnapshot - udp + expectUDPPacket(s"$testMetricKey1:10|ms|@0.5:11|ms\n$testMetricKey2:20|ms:21|ms", udp) } } } - -class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { - val metricOne = histogram("metric-one") - val metricTwo = histogram("metric-two") -} - -object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] { - def category: String = "test" - def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory) -} diff --git a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala index 2e03a59d..84c0b6aa 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala @@ -1,3 +1,19 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.statsd import com.typesafe.config.ConfigFactory @@ -17,7 +33,7 @@ class SimpleMetricKeyGeneratorSpec extends WordSpec with Matchers { |} """.stripMargin) - "the StatsDMetricSender" should { + "the SimpleMetricKeyGenerator" should { "generate metric names that follow the application.host.entity.entity-name.metric-name pattern by default" in { implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(defaultConfiguration) { override def hostName: String = "localhost" diff --git a/kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala new file mode 100644 index 00000000..2e909d8b --- /dev/null +++ b/kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala @@ -0,0 +1,76 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.{ ActorSystem, Props, ActorRef } +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory + +class SimpleStatsDMetricsSenderSpec extends UDPBasedStatsDMetricSenderSpec("simple-statsd-metric-sender-spec") { + + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | statsd { + | hostname = "127.0.0.1" + | port = 0 + | simple-metric-key-generator { + | application = kamon + | hostname-override = kamon-host + | include-hostname = true + | metric-name-normalization-strategy = normalize + | } + | } + |} + | + """.stripMargin) + + trait SimpleSenderFixture extends UdpListenerFixture { + override def newSender(udpProbe: TestProbe) = + Props(new SimpleStatsDMetricsSender(statsDConfig, metricKeyGenerator) { + override def udpExtension(implicit system: ActorSystem): ActorRef = udpProbe.ref + }) + } + + "the SimpleStatsDMetricSender" should { + "flush the metrics data for each unique value it receives" in new SimpleSenderFixture { + + val testMetricKey1 = buildMetricKey(testEntity, "metric-one") + val testMetricKey2 = buildMetricKey(testEntity, "metric-two") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(30L) + testRecorder.metricTwo.record(20L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) + expectUDPPacket(s"$testMetricKey1:10|ms", udp) + expectUDPPacket(s"$testMetricKey1:30|ms", udp) + expectUDPPacket(s"$testMetricKey2:20|ms", udp) + } + + "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new SimpleSenderFixture { + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(10L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) + expectUDPPacket(s"$testMetricKey:10|ms|@0.5", udp) + } + } +} diff --git a/kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala new file mode 100644 index 00000000..d004adaa --- /dev/null +++ b/kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala @@ -0,0 +1,81 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.Props +import akka.io.Udp +import akka.testkit.TestProbe +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement } +import kamon.metric._ +import kamon.testkit.BaseKamonSpec +import kamon.util.MilliTimestamp + +abstract class UDPBasedStatsDMetricSenderSpec(actorSystemName: String) extends BaseKamonSpec(actorSystemName) { + + implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(system.settings.config) { + override def hostName: String = "localhost_local" + } + + val statsDConfig = config.getConfig("kamon.statsd") + + trait UdpListenerFixture { + val testEntity = Entity("user/kamon", "test") + + def buildMetricKey(entity: Entity, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { + val metricKey = HistogramKey(metricName, UnitOfMeasurement.Unknown) + metricKeyGenerator.generateKey(entity, metricKey) + } + + def buildRecorder(name: String): TestEntityRecorder = + Kamon.metrics.entity(TestEntityRecorder, name) + + def newSender(udpProbe: TestProbe): Props + + def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = { + val udp = TestProbe() + val metricsSender = system.actorOf(newSender(udp)) + + // Setup the SimpleSender + udp.expectMsgType[Udp.SimpleSender] + udp.reply(Udp.SimpleSenderReady) + + val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics) + metricsSender ! fakeSnapshot + udp + } + + def expectUDPPacket(expected: String, udp: TestProbe): Unit = { + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + data.utf8String should be(expected) + } + } + + class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val metricOne = histogram("metric-one") + val metricTwo = histogram("metric-two") + } + + object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] { + def category: String = "test" + + def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory) + } + +} + diff --git a/kamon-system-metrics/src/main/scala/kamon/system/jmx/MemoryUsageMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/jmx/MemoryUsageMetrics.scala index ef9d47a6..41681459 100644 --- a/kamon-system-metrics/src/main/scala/kamon/system/jmx/MemoryUsageMetrics.scala +++ b/kamon-system-metrics/src/main/scala/kamon/system/jmx/MemoryUsageMetrics.scala @@ -32,13 +32,13 @@ import scala.collection.convert.WrapAsScala class MemoryUsageMetrics(instrumentFactory: InstrumentFactory, beansWithNames: Iterable[MemoryUsageWithMetricName]) extends GenericEntityRecorder(instrumentFactory) { beansWithNames.foreach { - case MemoryUsageWithMetricName(name, bean) ⇒ + case MemoryUsageWithMetricName(name, beanFun) ⇒ gauge(name + "-used", Memory.Bytes, () ⇒ { - bean.getUsed + beanFun().getUsed }) gauge(name + "-max", Memory.Bytes, () ⇒ { - val max = bean.getMax + val max = beanFun().getMax // .getMax can return -1 if the max is not defined. if (max >= 0) max @@ -46,7 +46,7 @@ class MemoryUsageMetrics(instrumentFactory: InstrumentFactory, }) gauge(name + "-committed", Memory.Bytes, () ⇒ { - bean.getCommitted + beanFun().getCommitted }) } } @@ -54,9 +54,9 @@ class MemoryUsageMetrics(instrumentFactory: InstrumentFactory, /** * Objects of this kind may be passed to instances of [[MemoryUsageMetrics]] for data collection. * @param metricName The sanitized name for a metric. - * @param bean The data source for metrics. + * @param beanFun Function returning the data source for metrics. */ -private[jmx] final case class MemoryUsageWithMetricName(metricName: String, bean: MemoryUsage) +private[jmx] final case class MemoryUsageWithMetricName(metricName: String, beanFun: () ⇒ MemoryUsage) /** * Memory Pool metrics, as reported by JMX: @@ -79,7 +79,7 @@ object MemoryUsageMetrics extends JmxSystemMetricRecorderCompanion("jmx-memory") def apply(instrumentFactory: InstrumentFactory): MemoryUsageMetrics = new MemoryUsageMetrics(instrumentFactory, - MemoryUsageWithMetricName("non-heap", memoryMXBean.getNonHeapMemoryUsage) :: - MemoryUsageWithMetricName("heap", memoryMXBean.getHeapMemoryUsage) :: + MemoryUsageWithMetricName("non-heap", () ⇒ memoryMXBean.getNonHeapMemoryUsage) :: + MemoryUsageWithMetricName("heap", () ⇒ memoryMXBean.getHeapMemoryUsage) :: usagesWithNames) } diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarMetricsUpdater.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarMetricsUpdater.scala index 69bc00ec..c0da6dc1 100644 --- a/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarMetricsUpdater.scala +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarMetricsUpdater.scala @@ -37,7 +37,8 @@ class SigarMetricsUpdater(refreshInterval: FiniteDuration) extends Actor { LoadAverageMetrics.register(sigar, metricsExtension, logger), MemoryMetrics.register(sigar, metricsExtension, logger), NetworkMetrics.register(sigar, metricsExtension, logger), - ProcessCpuMetrics.register(sigar, metricsExtension, logger)) + ProcessCpuMetrics.register(sigar, metricsExtension, logger), + ULimitMetrics.register(sigar, metricsExtension, logger)) val refreshSchedule = context.system.scheduler.schedule(refreshInterval, refreshInterval, self, UpdateSigarMetrics)(context.dispatcher) diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/ULimitMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/ULimitMetrics.scala new file mode 100644 index 00000000..314c8450 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/ULimitMetrics.scala @@ -0,0 +1,20 @@ +package kamon.system.sigar + +import akka.event.LoggingAdapter +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.InstrumentFactory +import org.hyperic.sigar.Sigar + +class ULimitMetrics(sigar: Sigar, instrumentFactory: InstrumentFactory, logger: LoggingAdapter) extends GenericEntityRecorder(instrumentFactory) with SigarMetric { + val pid = sigar.getPid + val openFiles = histogram("open-files") + + def update(): Unit = { + openFiles.record(sigar.getProcFd(pid).getTotal) + } +} + +object ULimitMetrics extends SigarMetricRecorderCompanion("ulimit") { + def apply(sigar: Sigar, instrumentFactory: InstrumentFactory, logger: LoggingAdapter): ULimitMetrics = + new ULimitMetrics(sigar, instrumentFactory, logger) +}
\ No newline at end of file diff --git a/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala b/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala index 7af704d5..452ee0c7 100644 --- a/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala +++ b/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala @@ -74,6 +74,20 @@ class SystemMetricsSpec extends BaseKamonSpec("system-metrics-spec") with Redire memoryMetrics.gauge("non-heap-committed").get.numberOfMeasurements should be > 0L } + "record correctly updatable values for heap metrics" in { + Thread.sleep(3000) + + val data = new Array[Byte](20 * 1024 * 1024) // 20 Mb of data + + Thread.sleep(3000) + + val memoryMetrics = takeSnapshotOf("jmx-memory", "system-metric") + val heapUsed = memoryMetrics.gauge("heap-used").get + + heapUsed.max should be > heapUsed.min + data.size should be > 0 // Just for data usage + } + "record daemon, count and peak jvm threads metrics" in { val threadsMetrics = takeSnapshotOf("threads", "system-metric") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index dd200757..711e9260 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -23,11 +23,11 @@ object Dependencies { ) val sprayVersion = "1.3.2" - val akkaVersion = "2.3.10" - val aspectjVersion = "1.8.5" + val akkaVersion = "2.3.14" + val aspectjVersion = "1.8.7" val slf4jVersion = "1.7.7" - val play23Version = "2.3.9" - val play24Version = "2.4.2" + val play23Version = "2.3.10" + val play24Version = "2.4.3" val sigarVersion = "1.6.5.132" val sprayJson = "io.spray" %% "spray-json" % "1.3.1" @@ -35,8 +35,8 @@ object Dependencies { val scalatest = "org.scalatest" %% "scalatest" % "2.2.1" val logback = "ch.qos.logback" % "logback-classic" % "1.0.13" val aspectJ = "org.aspectj" % "aspectjweaver" % aspectjVersion - val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "3.11.0" - val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.6" + val newrelic = "com.newrelic.agent.java" % "newrelic-agent" % "3.11.0" + val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.7" val sprayCan = "io.spray" %% "spray-can" % sprayVersion val sprayRouting = "io.spray" %% "spray-routing" % sprayVersion val sprayTestkit = "io.spray" %% "spray-testkit" % sprayVersion @@ -54,6 +54,8 @@ object Dependencies { val sigarLoader = "io.kamon" % "sigar-loader" % "1.6.5-rev002" val h2 = "com.h2database" % "h2" % "1.4.182" val el = "org.glassfish" % "javax.el" % "3.0.0" + val fluentdLogger = "org.fluentd" %% "fluent-logger-scala" % "0.5.1" + val easyMock = "org.easymock" % "easymock" % "3.2" //play 2.3.x val play23 = "com.typesafe.play" %% "play" % play23Version diff --git a/project/Projects.scala b/project/Projects.scala index 9a199797..c16a8580 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -23,7 +23,8 @@ object Projects extends Build { lazy val kamon = Project("kamon", file(".")) .aggregate(kamonCore, kamonScala, kamonAkka, kamonSpray, kamonNewrelic, kamonPlayground, kamonTestkit, - kamonStatsD, kamonDatadog, kamonSPM, kamonSystemMetrics, kamonLogReporter, kamonAkkaRemote, kamonJdbc, kamonAnnotation, kamonPlay23, kamonPlay24) + kamonStatsD, kamonDatadog, kamonSPM, kamonSystemMetrics, kamonLogReporter, kamonAkkaRemote, kamonJdbc, + kamonAnnotation, kamonPlay23, kamonPlay24, kamonJMXReporter, kamonFluentd) .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(noPublishing: _*) @@ -92,19 +93,20 @@ object Projects extends Build { test(scalatest, akkaTestKit, sprayTestkit, akkaSlf4j, slf4jJul, slf4jLog4j, logback)) lazy val kamonNewrelic = Project("kamon-newrelic", file("kamon-newrelic")) - .dependsOn(kamonCore % "compile->compile;test->test", kamonTestkit % "test->test") + .dependsOn(kamonCore % "compile->compile;test->test", kamonAkka, kamonTestkit % "test->test") .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(aspectJSettings: _*) .settings( libraryDependencies ++= compile(sprayCan, sprayClient, sprayRouting, sprayJson, sprayJsonLenses, newrelic, akkaSlf4j) ++ - provided(aspectJ) ++ + provided(aspectJ, newrelic) ++ test(scalatest, akkaTestKit, sprayTestkit, slf4jApi, akkaSlf4j)) lazy val kamonPlayground = Project("kamon-playground", file("kamon-playground")) - .dependsOn(kamonSpray, kamonNewrelic, kamonStatsD, kamonDatadog, kamonLogReporter, kamonSystemMetrics) + .dependsOn(kamonSpray, kamonNewrelic, kamonStatsD, kamonDatadog, kamonLogReporter, kamonSystemMetrics, + kamonJMXReporter) .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(noPublishing: _*) @@ -212,8 +214,26 @@ object Projects extends Build { .settings(formatSettings: _*) .settings( libraryDependencies ++= - compile(sprayCan, sprayClient, sprayRouting, sprayJson, sprayJsonLenses, newrelic, akkaSlf4j) ++ + compile(sprayCan, sprayClient, sprayRouting, sprayJson, sprayJsonLenses, akkaSlf4j) ++ test(scalatest, akkaTestKit, slf4jApi, slf4jnop)) + lazy val kamonJMXReporter = Project("kamon-jmx", file("kamon-jmx")) + .dependsOn(kamonCore) + .settings(basicSettings: _*) + .settings(formatSettings: _*) + .settings( + libraryDependencies ++= + compile(akkaActor) ++ + test(scalatest, akkaTestKit, slf4jApi, slf4jnop)) + + lazy val kamonFluentd = Project("kamon-fluentd", file("kamon-fluentd")) + .dependsOn(kamonCore % "compile->compile;test->test") + .settings(basicSettings: _*) + .settings(formatSettings: _*) + .settings( + libraryDependencies ++= + compile(akkaActor) ++ compile(fluentdLogger) ++ + test(scalatest, akkaTestKit, easyMock, slf4jApi, slf4jnop)) + val noPublishing = Seq(publish := (), publishLocal := (), publishArtifact := false) } diff --git a/project/build.properties b/project/build.properties index a6e117b6..817bc38d 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.8 +sbt.version=0.13.9 diff --git a/version.sbt b/version.sbt index 7338ce76..7193baff 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.5.1-SNAPSHOT" +version in ThisBuild := "0.5.3-SNAPSHOT" |