From 4d828e1a3195e55365c865aa3a78af9668742643 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 24 Apr 2017 13:54:40 +0200 Subject: Prepare for the major cleanup Moved all the original files from src/main to src/legacy-main, same with test files. Also removed the autoweave module, examples and bench as I'm planning to have them in separate repositories. --- .../util/executors/ExecutorServiceMetrics.scala | 134 +++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorServiceMetrics.scala (limited to 'kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorServiceMetrics.scala') diff --git a/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorServiceMetrics.scala b/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorServiceMetrics.scala new file mode 100644 index 00000000..60612beb --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorServiceMetrics.scala @@ -0,0 +1,134 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util.executors + +import java.util.concurrent.{ExecutorService, ForkJoinPool ⇒ JavaForkJoinPool, ThreadPoolExecutor} + +import kamon.Kamon +import kamon.metric.Entity + +import scala.concurrent.forkjoin.ForkJoinPool +import scala.util.control.NoStackTrace + +object ExecutorServiceMetrics { + val Category = "executor-service" + + private val DelegatedExecutor = Class.forName("java.util.concurrent.Executors$DelegatedExecutorService") + private val FinalizableDelegated = Class.forName("java.util.concurrent.Executors$FinalizableDelegatedExecutorService") + private val DelegateScheduled = Class.forName("java.util.concurrent.Executors$DelegatedScheduledExecutorService") + private val JavaForkJoinPool = classOf[JavaForkJoinPool] + private val ScalaForkJoinPool = classOf[ForkJoinPool] + + private val executorField = { + // executorService is private :( + val field = DelegatedExecutor.getDeclaredField("e") + field.setAccessible(true) + field + } + + /** + * + * Register the [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html ThreadPoolExecutor]] to Monitor. + * + * @param name The name of the [[ThreadPoolExecutor]] + * @param threadPool The intance of the [[ThreadPoolExecutor]] + * @param tags The tags associated to the [[ThreadPoolExecutor]] + */ + @inline private def registerThreadPool(name: String, threadPool: ThreadPoolExecutor, tags: Map[String, String]): Entity = { + val threadPoolEntity = Entity(name, Category, tags + ("executor-type" → "thread-pool-executor")) + Kamon.metrics.entity(ThreadPoolExecutorMetrics.factory(threadPool, Category), threadPoolEntity) + threadPoolEntity + } + + /** + * + * Register the [[http://www.scala-lang.org/api/current/index.html#scala.collection.parallel.TaskSupport ForkJoinPool]] to Monitor. + * + * @param name The name of the [[ForkJoinPool]] + * @param forkJoinPool The instance of the [[ForkJoinPool]] + * @param tags The tags associated to the [[ForkJoinPool]] + */ + @inline private def registerScalaForkJoin(name: String, forkJoinPool: ForkJoinPool, tags: Map[String, String]): Entity = { + val forkJoinEntity = Entity(name, Category, tags + ("executor-type" → "fork-join-pool")) + Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), forkJoinEntity) + forkJoinEntity + } + + /** + * + * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html JavaForkJoinPool]] to Monitor. + * + * @param name The name of the [[JavaForkJoinPool]] + * @param forkJoinPool The instance of the [[JavaForkJoinPool]] + * @param tags The tags associated to the [[JavaForkJoinPool]] + */ + @inline private def registerJavaForkJoin(name: String, forkJoinPool: JavaForkJoinPool, tags: Map[String, String]): Entity = { + val forkJoinEntity = Entity(name, Category, tags + ("executor-type" → "fork-join-pool")) + Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), forkJoinEntity) + forkJoinEntity + } + + /** + * + * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]] to Monitor. + * + * @param name The name of the [[ExecutorService]] + * @param executorService The instance of the [[ExecutorService]] + * @param tags The tags associated to the [[ExecutorService]] + */ + def register(name: String, executorService: ExecutorService, tags: Map[String, String]): Entity = executorService match { + case threadPoolExecutor: ThreadPoolExecutor ⇒ registerThreadPool(name, threadPoolExecutor, tags) + case scalaForkJoinPool: ForkJoinPool if scalaForkJoinPool.getClass.isAssignableFrom(ScalaForkJoinPool) ⇒ registerScalaForkJoin(name, scalaForkJoinPool, tags) + case javaForkJoinPool: JavaForkJoinPool if javaForkJoinPool.getClass.isAssignableFrom(JavaForkJoinPool) ⇒ registerJavaForkJoin(name, javaForkJoinPool, tags) + case delegatedExecutor: ExecutorService if delegatedExecutor.getClass.isAssignableFrom(DelegatedExecutor) ⇒ registerDelegatedExecutor(name, delegatedExecutor, tags) + case delegatedScheduledExecutor: ExecutorService if delegatedScheduledExecutor.getClass.isAssignableFrom(DelegateScheduled) ⇒ registerDelegatedExecutor(name, delegatedScheduledExecutor, tags) + case finalizableDelegatedExecutor: ExecutorService if finalizableDelegatedExecutor.getClass.isAssignableFrom(FinalizableDelegated) ⇒ registerDelegatedExecutor(name, finalizableDelegatedExecutor, tags) + case other ⇒ throw NotSupportedException(s"The ExecutorService $name is not supported.") + } + + //Java variant + def register(name: String, executorService: ExecutorService, tags: java.util.Map[String, String]): Entity = { + import scala.collection.JavaConverters._ + register(name, executorService, tags.asScala.toMap) + } + + /** + * + * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]] to Monitor. + * + * @param name The name of the [[ExecutorService]] + * @param executorService The instance of the [[ExecutorService]] + */ + def register(name: String, executorService: ExecutorService): Entity = { + register(name, executorService, Map.empty[String, String]) + } + + def remove(entity: Entity): Unit = Kamon.metrics.removeEntity(entity) + + /** + * INTERNAL USAGE ONLY + */ + private def registerDelegatedExecutor(name: String, executor: ExecutorService, tags: Map[String, String]) = { + val underlyingExecutor = executorField.get(executor) match { + case executorService: ExecutorService ⇒ executorService + case other ⇒ other + } + register(name, underlyingExecutor.asInstanceOf[ExecutorService], tags) + } + + case class NotSupportedException(message: String) extends RuntimeException with NoStackTrace +} \ No newline at end of file -- cgit v1.2.3