aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorServiceMetrics.scala
blob: 60612beb2e583dc1cd9746970a742618c022ed72 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
 * =========================================================================================
 * Copyright © 2013-2015 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.util.executors

import java.util.concurrent.{ExecutorService, ForkJoinPool  JavaForkJoinPool, ThreadPoolExecutor}

import kamon.Kamon
import kamon.metric.Entity

import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.control.NoStackTrace

object ExecutorServiceMetrics {
  val Category = "executor-service"

  private val DelegatedExecutor = Class.forName("java.util.concurrent.Executors$DelegatedExecutorService")
  private val FinalizableDelegated = Class.forName("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")
  private val DelegateScheduled = Class.forName("java.util.concurrent.Executors$DelegatedScheduledExecutorService")
  private val JavaForkJoinPool = classOf[JavaForkJoinPool]
  private val ScalaForkJoinPool = classOf[ForkJoinPool]

  private val executorField = {
    // executorService is private :(
    val field = DelegatedExecutor.getDeclaredField("e")
    field.setAccessible(true)
    field
  }

  /**
   *
   * Register the [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html ThreadPoolExecutor]] to Monitor.
   *
   * @param name The name of the [[ThreadPoolExecutor]]
   * @param threadPool The intance of the [[ThreadPoolExecutor]]
   * @param tags The tags associated to the [[ThreadPoolExecutor]]
   */
  @inline private def registerThreadPool(name: String, threadPool: ThreadPoolExecutor, tags: Map[String, String]): Entity = {
    val threadPoolEntity = Entity(name, Category, tags + ("executor-type"  "thread-pool-executor"))
    Kamon.metrics.entity(ThreadPoolExecutorMetrics.factory(threadPool, Category), threadPoolEntity)
    threadPoolEntity
  }

  /**
   *
   * Register the [[http://www.scala-lang.org/api/current/index.html#scala.collection.parallel.TaskSupport ForkJoinPool]] to Monitor.
   *
   * @param name The name of the [[ForkJoinPool]]
   * @param forkJoinPool The instance of the [[ForkJoinPool]]
   * @param tags The tags associated to the [[ForkJoinPool]]
   */
  @inline private def registerScalaForkJoin(name: String, forkJoinPool: ForkJoinPool, tags: Map[String, String]): Entity = {
    val forkJoinEntity = Entity(name, Category, tags + ("executor-type"  "fork-join-pool"))
    Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), forkJoinEntity)
    forkJoinEntity
  }

  /**
   *
   * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html JavaForkJoinPool]] to Monitor.
   *
   * @param name The name of the [[JavaForkJoinPool]]
   * @param forkJoinPool The instance of the [[JavaForkJoinPool]]
   * @param tags The tags associated to the [[JavaForkJoinPool]]
   */
  @inline private def registerJavaForkJoin(name: String, forkJoinPool: JavaForkJoinPool, tags: Map[String, String]): Entity = {
    val forkJoinEntity = Entity(name, Category, tags + ("executor-type"  "fork-join-pool"))
    Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), forkJoinEntity)
    forkJoinEntity
  }

  /**
   *
   * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]] to Monitor.
   *
   * @param name The name of the [[ExecutorService]]
   * @param executorService The instance of the [[ExecutorService]]
   * @param tags The tags associated to the [[ExecutorService]]
   */
  def register(name: String, executorService: ExecutorService, tags: Map[String, String]): Entity = executorService match {
    case threadPoolExecutor: ThreadPoolExecutor  registerThreadPool(name, threadPoolExecutor, tags)
    case scalaForkJoinPool: ForkJoinPool if scalaForkJoinPool.getClass.isAssignableFrom(ScalaForkJoinPool)  registerScalaForkJoin(name, scalaForkJoinPool, tags)
    case javaForkJoinPool: JavaForkJoinPool if javaForkJoinPool.getClass.isAssignableFrom(JavaForkJoinPool)  registerJavaForkJoin(name, javaForkJoinPool, tags)
    case delegatedExecutor: ExecutorService if delegatedExecutor.getClass.isAssignableFrom(DelegatedExecutor)  registerDelegatedExecutor(name, delegatedExecutor, tags)
    case delegatedScheduledExecutor: ExecutorService if delegatedScheduledExecutor.getClass.isAssignableFrom(DelegateScheduled)  registerDelegatedExecutor(name, delegatedScheduledExecutor, tags)
    case finalizableDelegatedExecutor: ExecutorService if finalizableDelegatedExecutor.getClass.isAssignableFrom(FinalizableDelegated)  registerDelegatedExecutor(name, finalizableDelegatedExecutor, tags)
    case other  throw NotSupportedException(s"The ExecutorService $name is not supported.")
  }

  //Java variant
  def register(name: String, executorService: ExecutorService, tags: java.util.Map[String, String]): Entity = {
    import scala.collection.JavaConverters._
    register(name, executorService, tags.asScala.toMap)
  }

  /**
   *
   * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]] to Monitor.
   *
   * @param name The name of the [[ExecutorService]]
   * @param executorService The instance of the [[ExecutorService]]
   */
  def register(name: String, executorService: ExecutorService): Entity = {
    register(name, executorService, Map.empty[String, String])
  }

  def remove(entity: Entity): Unit = Kamon.metrics.removeEntity(entity)

  /**
   * INTERNAL USAGE ONLY
   */
  private def registerDelegatedExecutor(name: String, executor: ExecutorService, tags: Map[String, String]) = {
    val underlyingExecutor = executorField.get(executor) match {
      case executorService: ExecutorService  executorService
      case other                             other
    }
    register(name, underlyingExecutor.asInstanceOf[ExecutorService], tags)
  }

  case class NotSupportedException(message: String) extends RuntimeException with NoStackTrace
}