diff options
author | Diego <diegolparra@gmail.com> | 2014-12-02 00:58:30 -0300 |
---|---|---|
committer | Diego <diegolparra@gmail.com> | 2014-12-02 00:58:30 -0300 |
commit | c2d108ca26faecc5be77fd05c69d4eac7982fa3e (patch) | |
tree | 51d239aea06ac10077b5fc3104470062435931f7 | |
parent | bedda495bbfe3696609294408c4bc50cefbcdd8f (diff) | |
download | Kamon-c2d108ca26faecc5be77fd05c69d4eac7982fa3e.tar.gz Kamon-c2d108ca26faecc5be77fd05c69d4eac7982fa3e.tar.bz2 Kamon-c2d108ca26faecc5be77fd05c69d4eac7982fa3e.zip |
+ core, play: introduce kamon-dispatcher
5 files changed, 35 insertions, 11 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 12e21bd7..639d4aba 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -5,7 +5,7 @@ kamon { # Default dispatcher for all Kamon components, unless a more specific one is configured. - default-dispatcher = "akka.actor.default-dispatcher" + default-dispatcher = "kamon.kamon-dispatcher" metrics { @@ -132,4 +132,29 @@ kamon { # the future was created. ask-pattern-tracing = off } + + kamon-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + + # The parallelism factor is used to determine thread pool size using the + # following formula: ceil(available processors * factor). Resulting size + # is then bounded by the parallelism-min and parallelism-max values. + parallelism-factor = 2.0 + + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 3 + } }
\ No newline at end of file diff --git a/kamon-play/src/main/scala/kamon/play/Play.scala b/kamon-play/src/main/scala/kamon/play/Play.scala index 9b69f9a3..6c6cbf4c 100644 --- a/kamon-play/src/main/scala/kamon/play/Play.scala +++ b/kamon-play/src/main/scala/kamon/play/Play.scala @@ -55,6 +55,6 @@ trait PlayNameGenerator { } class DefaultPlayNameGenerator extends PlayNameGenerator { - def generateTraceName(requestHeader: RequestHeader): String = requestHeader.method + ": " + requestHeader.uri + def generateTraceName(requestHeader: RequestHeader): String = s"${requestHeader.method}: ${requestHeader.uri}" def generateHttpClientSegmentName(request: WSRequest): String = request.url } diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala index 1ce41a75..1bafa8ff 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala @@ -55,7 +55,7 @@ class RequestInstrumentation { @Around("call(* play.api.GlobalSettings.doFilter(*)) && args(next)") def aroundDoFilter(pjp: ProceedingJoinPoint, next: EssentialAction): Any = { val essentialAction = (requestHeader: RequestHeader) ⇒ { - // TODO: Move to a Kamon-specific dispatcher. + val executor = Kamon(Play)(Akka.system()).defaultDispatcher def onResult(result: Result): Result = { diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala index f16c76c8..fca13c4c 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala @@ -18,7 +18,7 @@ package kamon.play.instrumentation import kamon.Kamon import kamon.play.Play -import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ SegmentCategory, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } import play.api.libs.ws.{ WSRequest, WSResponse } @@ -42,6 +42,6 @@ class WSInstrumentation { response.map(result ⇒ segment.finish())(executor) response - } getOrElse (pjp.proceed()) + } getOrElse pjp.proceed() } }
\ No newline at end of file diff --git a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala index 3c3ce9f2..564d5abe 100644 --- a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala @@ -21,7 +21,6 @@ import kamon.metric.{ CollectionContext, Metrics, TraceMetrics } import kamon.play.action.TraceName import kamon.trace.TraceLocal.HttpContextKey import kamon.trace.{ TraceLocal, TraceRecorder } -import org.scalatest.Matchers import org.scalatestplus.play._ import play.api.DefaultGlobal import play.api.http.Writeable @@ -124,23 +123,23 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { } "propagate the TraceContext and LocalStorage through of filters in the current request" in { - val Some(result) = route(FakeRequest(GET, "/retrieve").withHeaders(traceTokenHeader, traceLocalStorageHeader)) + route(FakeRequest(GET, "/retrieve").withHeaders(traceTokenHeader, traceLocalStorageHeader)) TraceLocal.retrieve(TraceLocalKey).get must be(traceLocalStorageValue) } "response to the getRouted Action and normalise the current TraceContext name" in { Await.result(WS.url("http://localhost:19001/getRouted").get(), 10 seconds) - Kamon(Metrics)(Akka.system()).storage.get(TraceMetrics("getRouted.get")) must not be (empty) + Kamon(Metrics)(Akka.system()).storage.get(TraceMetrics("getRouted.get")) must not be empty } "response to the postRouted Action and normalise the current TraceContext name" in { Await.result(WS.url("http://localhost:19001/postRouted").post("content"), 10 seconds) - Kamon(Metrics)(Akka.system()).storage.get(TraceMetrics("postRouted.post")) must not be (empty) + Kamon(Metrics)(Akka.system()).storage.get(TraceMetrics("postRouted.post")) must not be empty } "response to the showRouted Action and normalise the current TraceContext name" in { Await.result(WS.url("http://localhost:19001/showRouted/2").get(), 10 seconds) - Kamon(Metrics)(Akka.system()).storage.get(TraceMetrics("show.some.id.get")) must not be (empty) + Kamon(Metrics)(Akka.system()).storage.get(TraceMetrics("show.some.id.get")) must not be empty } "include HttpContext information for help to diagnose possible errors" in { @@ -230,7 +229,7 @@ object Routes extends Router.Routes { Route("GET", PathPattern(List(StaticPart(Routes.prefix), StaticPart(Routes.defaultPrefix), StaticPart("getRouted")))) private[this] lazy val Application_show = - Route("GET", PathPattern(List(StaticPart(Routes.prefix), StaticPart(Routes.defaultPrefix), StaticPart("showRouted/"), DynamicPart("id", """[^/]+""", true)))) + Route("GET", PathPattern(List(StaticPart(Routes.prefix), StaticPart(Routes.defaultPrefix), StaticPart("showRouted/"), DynamicPart("id", """[^/]+""", encodeable = true)))) //Posts private[this] lazy val Application_postRouted = |