diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-08-28 13:02:09 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-08-28 13:02:09 +0200 |
commit | 110fff9745a0c1f154ee3d7a5149cd9f162a879f (patch) | |
tree | 689dc6f94cc73f7156569bf6cb69c44cda4c2c8c /kamon-akka | |
parent | aa1c38de58d692f90275867fdfda437b99bd8dcc (diff) | |
download | Kamon-110fff9745a0c1f154ee3d7a5149cd9f162a879f.tar.gz Kamon-110fff9745a0c1f154ee3d7a5149cd9f162a879f.tar.bz2 Kamon-110fff9745a0c1f154ee3d7a5149cd9f162a879f.zip |
wip, core/akka/spray are kind of migrated.
Diffstat (limited to 'kamon-akka')
11 files changed, 95 insertions, 148 deletions
diff --git a/kamon-akka/src/main/resources/reference.conf b/kamon-akka/src/main/resources/reference.conf index c1e59e63..70758f83 100644 --- a/kamon-akka/src/main/resources/reference.conf +++ b/kamon-akka/src/main/resources/reference.conf @@ -33,9 +33,7 @@ kamon { modules { kamon-akka { - auto-start = yes requires-aspectj = yes - extension-id = "kamon.akka.Akka" } } }
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala index e84a1030..2fe2a42f 100644 --- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -16,22 +16,11 @@ package kamon.akka -import _root_.akka.actor -import _root_.akka.actor._ -import _root_.akka.event.Logging import com.typesafe.config.Config -import kamon._ +import kamon.Kamon -class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val log = Logging(system, classOf[AkkaExtension]) - val config = system.settings.config.getConfig("kamon.akka") - - val askPatternTimeoutWarning = AskPatternTimeoutWarningSettings.fromConfig(config) -} - -object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Akka - def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system) +object AkkaExtension { + val askPatternTimeoutWarning = AskPatternTimeoutWarningSettings.fromConfig(Kamon.config.getConfig("kamon.akka")) } sealed trait AskPatternTimeoutWarningSetting diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala index e64b241e..64012163 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala @@ -17,8 +17,7 @@ package akka.kamon.instrumentation import akka.util.Timeout -import kamon.Kamon -import kamon.akka.{ AkkaExtension, Akka } +import kamon.akka.AkkaExtension import kamon.akka.AskPatternTimeoutWarningSettings.{ Heavyweight, Lightweight, Off } import akka.actor.{ InternalActorRef, ActorRef } import akka.pattern.AskTimeoutException @@ -27,11 +26,13 @@ import kamon.util.SameThreadExecutionContext import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ import org.aspectj.lang.reflect.SourceLocation +import org.slf4j.LoggerFactory import scala.concurrent.Future import scala.compat.Platform.EOL @Aspect class AskPatternInstrumentation { + private val log = LoggerFactory.getLogger(getClass) import AskPatternInstrumentation._ @@ -43,18 +44,19 @@ class AskPatternInstrumentation { actor match { // the AskPattern will only work for InternalActorRef's with these conditions. case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 && Tracer.currentContext.nonEmpty ⇒ - val akkaExtension = Kamon.extension(Akka) val future = pjp.proceed().asInstanceOf[Future[AnyRef]] - akkaExtension.askPatternTimeoutWarning match { + AkkaExtension.askPatternTimeoutWarning match { case Off ⇒ - case Lightweight ⇒ hookLightweightWarning(future, pjp.getSourceLocation, actor, akkaExtension) - case Heavyweight ⇒ hookHeavyweightWarning(future, new StackTraceCaptureException, actor, akkaExtension) + case Lightweight ⇒ hookLightweightWarning(future, pjp.getSourceLocation, actor) + case Heavyweight ⇒ hookHeavyweightWarning(future, new StackTraceCaptureException, actor) } future - case _ ⇒ pjp.proceed().asInstanceOf[Future[AnyRef]] // + case _ ⇒ + pjp.proceed().asInstanceOf[Future[AnyRef]] + } def ifAskTimeoutException(code: ⇒ Unit): PartialFunction[Throwable, Unit] = { @@ -62,21 +64,21 @@ class AskPatternInstrumentation { case _ ⇒ } - def hookLightweightWarning(future: Future[AnyRef], sourceLocation: SourceLocation, actor: ActorRef, akkaExtension: AkkaExtension): Unit = { + def hookLightweightWarning(future: Future[AnyRef], sourceLocation: SourceLocation, actor: ActorRef): Unit = { val locationString = Option(sourceLocation) .map(location ⇒ s"${location.getFileName}:${location.getLine}") .getOrElse("<unknown position>") future.onFailure(ifAskTimeoutException { - akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString) + log.warn(s"Timeout triggered for ask pattern to actor [${actor.path.name}] at [$locationString]") })(SameThreadExecutionContext) } - def hookHeavyweightWarning(future: Future[AnyRef], captureException: StackTraceCaptureException, actor: ActorRef, akkaExtension: AkkaExtension): Unit = { + def hookHeavyweightWarning(future: Future[AnyRef], captureException: StackTraceCaptureException, actor: ActorRef): Unit = { val locationString = captureException.getStackTrace.drop(3).mkString("", EOL, EOL) future.onFailure(ifAskTimeoutException { - akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString) + log.warn(s"Timeout triggered for ask pattern to actor [${actor.path.name}] at [$locationString]") })(SameThreadExecutionContext) } } diff --git a/kamon-akka/src/test/resources/application.conf b/kamon-akka/src/test/resources/application.conf new file mode 100644 index 00000000..5407ccfe --- /dev/null +++ b/kamon-akka/src/test/resources/application.conf @@ -0,0 +1,63 @@ +akka { + loglevel = INFO + loggers = [ "akka.event.slf4j.Slf4jLogger" ] +} + +kamon.metric { + tick-interval = 1 hour + default-collection-context-buffer-size = 100 + + filters = { + akka-actor { + includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/clean-after-collect", "*/user/stop", "*/" ] + excludes = [ "*/user/tracked-explicitly-excluded", "*/user/non-tracked-actor" ] + } + + akka-router { + includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/stop-*" ] + excludes = [ "*/user/tracked-explicitly-excluded-*"] + } + + akka-dispatcher { + includes = [ "**" ] + excludes = [ "*/explicitly-excluded" ] + } + } + + default-instrument-settings { + gauge.refresh-interval = 1 hour + min-max-counter.refresh-interval = 1 hour + } + + instrument-settings { + akka-actor.mailbox-size.refresh-interval = 1 hour + } +} + +explicitly-excluded { + type = "Dispatcher" + executor = "fork-join-executor" +} + +tracked-fjp { + type = "Dispatcher" + executor = "fork-join-executor" + + fork-join-executor { + parallelism-min = 8 + parallelism-factor = 100.0 + parallelism-max = 22 + } +} + +tracked-tpe { + type = "Dispatcher" + executor = "thread-pool-executor" + + thread-pool-executor { + core-pool-size-min = 7 + core-pool-size-factor = 100.0 + max-pool-size-factor = 100.0 + max-pool-size-max = 21 + } +}
\ No newline at end of file diff --git a/kamon-akka/src/test/resources/logback.xml b/kamon-akka/src/test/resources/logback.xml new file mode 100644 index 00000000..df142eac --- /dev/null +++ b/kamon-akka/src/test/resources/logback.xml @@ -0,0 +1,13 @@ +<configuration scan="true" debug="false"> + <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter"/> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n</pattern> + </encoder> + </appender> + + <root level="off"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> diff --git a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala index 4647abc0..cd64988d 100644 --- a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala @@ -29,28 +29,6 @@ import kamon.testkit.BaseKamonSpec import scala.concurrent.duration._ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") { - override lazy val config = - ConfigFactory.parseString( - """ - |kamon.metric { - | tick-interval = 1 hour - | default-collection-context-buffer-size = 10 - | - | filters { - | akka-actor { - | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/clean-after-collect", "*/user/stop", "*/" ] - | excludes = [ "*/user/tracked-explicitly-excluded", "*/user/non-tracked-actor" ] - | } - | } - | - | instrument-settings { - | akka-actor.mailbox-size.refresh-interval = 1 hour - | } - |} - | - |akka.loglevel = OFF - | - """.stripMargin) "the Kamon actor metrics" should { "respect the configured include and exclude filters" in new ActorMetricsFixtures { diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala index 678c7f73..3fbb10fd 100644 --- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala @@ -30,56 +30,6 @@ import scala.concurrent.duration._ import scala.concurrent.{ Await, Future } class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { - override lazy val config = - ConfigFactory.parseString( - """ - |kamon.metric { - | tick-interval = 1 hour - | default-collection-context-buffer-size = 10 - | - | filters = { - | akka-dispatcher { - | includes = [ "**" ] - | excludes = [ "*/explicitly-excluded" ] - | } - | } - | - | default-instrument-settings { - | gauge.refresh-interval = 1 hour - | min-max-counter.refresh-interval = 1 hour - | } - |} - | - |explicitly-excluded { - | type = "Dispatcher" - | executor = "fork-join-executor" - |} - | - |tracked-fjp { - | type = "Dispatcher" - | executor = "fork-join-executor" - | - | fork-join-executor { - | parallelism-min = 8 - | parallelism-factor = 100.0 - | parallelism-max = 22 - | } - |} - | - |tracked-tpe { - | type = "Dispatcher" - | executor = "thread-pool-executor" - | - | thread-pool-executor { - | core-pool-size-min = 7 - | core-pool-size-factor = 100.0 - | max-pool-size-factor = 100.0 - | max-pool-size-max = 21 - | } - |} - | - """.stripMargin) - "the Kamon dispatcher metrics" should { "respect the configured include and exclude filters" in { val defaultDispatcher = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher")) diff --git a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala index 4128c9ef..bff2382e 100644 --- a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala @@ -20,7 +20,6 @@ import java.nio.LongBuffer import akka.actor._ import akka.routing._ import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.akka.RouterMetricsTestActor._ import kamon.metric.EntitySnapshot @@ -30,24 +29,6 @@ import kamon.testkit.BaseKamonSpec import scala.concurrent.duration._ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") { - override lazy val config = - ConfigFactory.parseString( - """ - |kamon.metric { - | tick-interval = 1 hour - | default-collection-context-buffer-size = 10 - | - | filters = { - | akka-router { - | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/stop-*" ] - | excludes = [ "*/user/tracked-explicitly-excluded-*"] - | } - | } - |} - | - |akka.loglevel = OFF - | - """.stripMargin) "the Kamon router metrics" should { "respect the configured include and exclude filters" in new RouterMetricsFixtures { diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala index 85f41795..d08f38a4 100644 --- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala @@ -26,13 +26,6 @@ import org.scalatest.Inspectors import org.slf4j.MDC class ActorLoggingInstrumentationSpec extends BaseKamonSpec("actor-logging-instrumentation-spec") with Inspectors with MdcKeysSupport { - override lazy val config = - ConfigFactory.parseString( - """ - |akka { - | loggers = ["akka.event.slf4j.Slf4jLogger"] - |} - """.stripMargin) "the ActorLogging instrumentation" should { "attach the TraceContext (if available) to log events" in { diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala index 1635fadc..d2e7d5e5 100644 --- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala @@ -28,14 +28,6 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-message-instrumentation-spec") with WordSpecLike with ImplicitSender { - override lazy val config = - ConfigFactory.parseString( - """ - |akka { - | loglevel = OFF - |} - """.stripMargin) - implicit lazy val executionContext = system.dispatcher "the system message passing instrumentation" should { diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala index d9fbc2df..025c8d0c 100644 --- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala @@ -20,25 +20,14 @@ import akka.actor._ import akka.pattern.ask import akka.testkit.EventFilter import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import kamon.Kamon import kamon.akka.AskPatternTimeoutWarningSettings.{ Off, Lightweight, Heavyweight } -import kamon.akka.{ AskPatternTimeoutWarningSetting, Akka } +import kamon.akka.{ AkkaExtension, AskPatternTimeoutWarningSetting } import kamon.testkit.BaseKamonSpec import kamon.trace.Tracer import scala.concurrent.duration._ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-spec") { - override lazy val config = - ConfigFactory.parseString( - """ - |akka { - | loglevel = OFF - | loggers = [akka.testkit.TestEventListener] - |} - """.stripMargin) - implicit lazy val ec = system.dispatcher implicit val askTimeout = Timeout(10 millis) @@ -93,10 +82,9 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s override protected def afterAll(): Unit = shutdown() def setAskPatternTimeoutWarningMode(mode: AskPatternTimeoutWarningSetting): Unit = { - val target = Kamon(Akka) - val field = target.getClass.getDeclaredField("askPatternTimeoutWarning") + val field = AkkaExtension.getClass.getDeclaredField("askPatternTimeoutWarning") field.setAccessible(true) - field.set(target, mode) + field.set(AkkaExtension, mode) } } |