aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-05-20 20:42:41 -0300
committerDiego <diegolparra@gmail.com>2014-05-20 20:46:43 -0300
commit40073266ed7ed69ae0e42801201dc818b59726fb (patch)
tree91cf99b9814e968b3a84815df40d56063d895c4e
parentf40f19a719d35873e4e0ab599f783f860876f51d (diff)
downloadKamon-40073266ed7ed69ae0e42801201dc818b59726fb.tar.gz
Kamon-40073266ed7ed69ae0e42801201dc818b59726fb.tar.bz2
Kamon-40073266ed7ed69ae0e42801201dc818b59726fb.zip
+ core: added tests to dispatcher metrics
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/dispatcher/DispatcherMetricsCollector.scala14
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala105
2 files changed, 112 insertions, 7 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/dispatcher/DispatcherMetricsCollector.scala b/kamon-core/src/main/scala/kamon/metrics/dispatcher/DispatcherMetricsCollector.scala
index e7edce51..529cc542 100644
--- a/kamon-core/src/main/scala/kamon/metrics/dispatcher/DispatcherMetricsCollector.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/dispatcher/DispatcherMetricsCollector.scala
@@ -18,17 +18,17 @@ package kamon.metrics.dispatcher
import scala.concurrent.forkjoin.ForkJoinPool
import java.util.concurrent.ThreadPoolExecutor
-import akka.dispatch.{ExecutorServiceDelegate, Dispatcher, MessageDispatcher}
+import akka.dispatch.{ ExecutorServiceDelegate, Dispatcher, MessageDispatcher }
import java.lang.reflect.Method
object DispatcherMetricsCollector {
- private[this] type DispatcherMetrics = (Long, Long, Long, Long)
+ private[this]type DispatcherMetrics = (Long, Long, Long, Long)
- private[this] def collectForkJoinMetrics(pool: ForkJoinPool):DispatcherMetrics = {
+ private[this] def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetrics = {
(pool.getParallelism, pool.getActiveThreadCount, (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
}
- private[this] def collectExecutorMetrics(pool: ThreadPoolExecutor):DispatcherMetrics = {
+ private[this] def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetrics = {
(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
}
@@ -44,13 +44,13 @@ object DispatcherMetricsCollector {
case x: Dispatcher ⇒ {
val executor = executorServiceMethod.invoke(x) match {
case delegate: ExecutorServiceDelegate ⇒ delegate.executor
- case other ⇒ other
+ case other ⇒ other
}
executor match {
- case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
+ case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
- case anything ⇒ (0L, 0L, 0L, 0L)
+ case anything ⇒ (0L, 0L, 0L, 0L)
}
}
case _ ⇒ new DispatcherMetrics(0L, 0L, 0L, 0L)
diff --git a/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala
new file mode 100644
index 00000000..a5594ee2
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala
@@ -0,0 +1,105 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import org.scalatest.{ WordSpecLike, Matchers }
+import akka.testkit.{ TestProbe, TestKitBase }
+import akka.actor.{ ActorRef, Props, ActorSystem }
+import com.typesafe.config.ConfigFactory
+import scala.concurrent.duration._
+import kamon.Kamon
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metrics.DispatcherMetrics.DispatcherMetricSnapshot
+
+class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
+ implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | filters = [
+ | {
+ | dispatcher {
+ | includes = ["*"]
+ | excludes = ["dispatcher-explicitly-excluded"]
+ | }
+ | }
+ | ]
+ |}
+ |
+ |dispatcher-explicitly-excluded {
+ | type = "Dispatcher"
+ | executor = "fork-join-executor"
+ |}
+ |
+ |tracked-dispatcher {
+ | type = "Dispatcher"
+ | executor = "thread-pool-executor"
+ |}
+ |
+ """.stripMargin))
+
+ "the Kamon dispatcher metrics" should {
+ "respect the configured include and exclude filters" in {
+ system.actorOf(Props[DelayableActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher")
+ system.actorOf(Props[DelayableActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher")
+
+ Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true)
+ expectMsgType[TickMetricSnapshot]
+
+ within(2 seconds) {
+ val tickSnapshot = expectMsgType[TickMetricSnapshot]
+ tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher"))
+ tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded"))
+ }
+ }
+
+ "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture {
+ val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher")
+
+ for (_ ← 1 to 100) {
+ delayable ! Discard
+ }
+
+ val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds)
+ dispatcherMetrics.maximumPoolSize.max should be <= 18L //6 cores
+ dispatcherMetrics.poolSize.max should be(12L)
+ dispatcherMetrics.queueTaskCount.max should be(0L)
+ dispatcherMetrics.runningThreadCount.max should be(0L)
+ }
+
+ }
+
+ def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = {
+ val tickSnapshot = within(waitTime) {
+ listener.expectMsgType[TickMetricSnapshot]
+ }
+ val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId))
+ dispatcherMetricsOption should not be empty
+ dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot]
+ }
+
+ trait DelayableActorFixture {
+ def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = {
+ val actor = system.actorOf(Props[DelayableActor].withDispatcher(dispatcher), name)
+ val metricsListener = TestProbe()
+
+ Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true)
+ // Wait for one empty snapshot before proceeding to the test.
+ metricsListener.expectMsgType[TickMetricSnapshot]
+
+ (actor, metricsListener)
+ }
+ }
+}