aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/util/executors/ExecutorMetricRecorder.scala
blob: 732e189e0a670164ca2c50169a2a6ddd1e6d2358 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/*
 * =========================================================================================
 * Copyright © 2013-2015 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.util.executors

import kamon.metric.{EntityRecorderFactory, GenericEntityRecorder}
import kamon.metric.instrument.{Gauge, MinMaxCounter, DifferentialValueCollector, InstrumentFactory}
import java.util.concurrent.{ForkJoinPool => JavaForkJoinPool, ThreadPoolExecutor}
import kamon.util.executors.ForkJoinPools.ForkJoinMetrics

import scala.concurrent.forkjoin.ForkJoinPool

/**
 * Type class (and its instances) for reading runtime statistics out of a fork-join pool, so the
 * same recorder code can work with both `scala.concurrent.forkjoin.ForkJoinPool` and
 * `java.util.concurrent.ForkJoinPool`.
 */
object ForkJoinPools {

  /**
   * Read-only accessors for the statistics of a fork-join pool of type `T`. Every value is
   * exposed as a `Long` regardless of the width used by the underlying pool API.
   *
   * @tparam T the concrete fork-join pool type being inspected.
   */
  trait ForkJoinMetrics[T] {
    def getParallelism(fjp: T): Long
    def getPoolSize(fjp: T): Long
    def getActiveThreadCount(fjp: T): Long
    def getRunningThreadCount(fjp: T): Long
    def getQueuedTaskCount(fjp: T): Long
    def getQueuedSubmissionCount(fjp: T): Long
  }

  /** Instance for the fork-join pool that ships with the Scala standard library. */
  implicit object ScalaForkJoin extends ForkJoinMetrics[ForkJoinPool] {
    def getParallelism(fjp: ForkJoinPool): Long = fjp.getParallelism.toLong
    def getPoolSize(fjp: ForkJoinPool): Long = fjp.getPoolSize.toLong
    // Each accessor delegates to the pool method of the same name; these two used to be crossed,
    // which made the "active" and "running" readings report each other's values.
    def getActiveThreadCount(fjp: ForkJoinPool): Long = fjp.getActiveThreadCount.toLong
    def getRunningThreadCount(fjp: ForkJoinPool): Long = fjp.getRunningThreadCount.toLong
    def getQueuedTaskCount(fjp: ForkJoinPool): Long = fjp.getQueuedTaskCount
    def getQueuedSubmissionCount(fjp: ForkJoinPool): Long = fjp.getQueuedSubmissionCount.toLong
  }

  /** Instance for `java.util.concurrent.ForkJoinPool`. */
  implicit object JavaForkJoin extends ForkJoinMetrics[JavaForkJoinPool] {
    def getParallelism(fjp: JavaForkJoinPool): Long = fjp.getParallelism.toLong
    def getPoolSize(fjp: JavaForkJoinPool): Long = fjp.getPoolSize.toLong
    // Same name-for-name delegation as in ScalaForkJoin (previously crossed here as well).
    def getActiveThreadCount(fjp: JavaForkJoinPool): Long = fjp.getActiveThreadCount.toLong
    def getRunningThreadCount(fjp: JavaForkJoinPool): Long = fjp.getRunningThreadCount.toLong
    def getQueuedTaskCount(fjp: JavaForkJoinPool): Long = fjp.getQueuedTaskCount
    def getQueuedSubmissionCount(fjp: JavaForkJoinPool): Long = fjp.getQueuedSubmissionCount.toLong
  }
}

/**
 * Entity recorder describing the instruments tracked for a fork-join pool. Concrete instances
 * are produced by the `factory` method of the companion object.
 *
 * @param instrumentFactory factory handed over to [[GenericEntityRecorder]].
 */
abstract class ForkJoinPoolMetrics(instrumentFactory: InstrumentFactory)
    extends GenericEntityRecorder(instrumentFactory) {

  /** Parallelism of the pool. The identifier's spelling is part of the public interface and is kept as-is. */
  def paralellism: MinMaxCounter

  /** Pool size reading. */
  def poolSize: Gauge

  /** Active thread count reading. */
  def activeThreads: Gauge

  /** Running thread count reading. */
  def runningThreads: Gauge

  /** Queued task count reading. */
  def queuedTaskCount: Gauge

  /** Queued submission count reading. */
  def queuedSubmissionCount: Gauge
}

/** Companion providing the recorder factory for fork-join pools. */
object ForkJoinPoolMetrics {

  /**
   * Creates an [[EntityRecorderFactory]] whose recorders read their values from `fjp` through
   * the [[ForkJoinMetrics]] instance available for `T`.
   *
   * @param fjp          the pool to read statistics from.
   * @param categoryName value returned by the factory's `category`.
   * @tparam T           pool type; a `ForkJoinMetrics[T]` instance must be in implicit scope.
   */
  def factory[T: ForkJoinMetrics](fjp: T, categoryName: String) =
    new EntityRecorderFactory[ForkJoinPoolMetrics] {
      val forkJoinMetrics = implicitly[ForkJoinMetrics[T]]

      def category: String = categoryName

      def createRecorder(instrumentFactory: InstrumentFactory) =
        new ForkJoinPoolMetrics(instrumentFactory) {
          // Parallelism is recorded once, at recorder creation time (steady value).
          val paralellism: MinMaxCounter = minMaxCounter("parallelism")
          paralellism.increment(forkJoinMetrics.getParallelism(fjp))

          // NOTE(review): the second argument of `gauge` is presumably turned into a value
          // collector that is re-evaluated on every refresh — confirm against the Gauge API.
          val poolSize: Gauge =
            gauge("pool-size", forkJoinMetrics.getPoolSize(fjp))
          val activeThreads: Gauge =
            gauge("active-threads", forkJoinMetrics.getActiveThreadCount(fjp))
          val runningThreads: Gauge =
            gauge("running-threads", forkJoinMetrics.getRunningThreadCount(fjp))
          val queuedTaskCount: Gauge =
            gauge("queued-task-count", forkJoinMetrics.getQueuedTaskCount(fjp))
          val queuedSubmissionCount: Gauge =
            gauge("queued-submission-count", forkJoinMetrics.getQueuedSubmissionCount(fjp))
        }
    }
}

/**
 * Entity recorder that exposes statistics of a `java.util.concurrent.ThreadPoolExecutor` as
 * gauges.
 *
 * @param tpe               the executor whose statistics are read.
 * @param instrumentFactory factory handed over to [[GenericEntityRecorder]].
 */
class ThreadPoolExecutorMetrics(tpe: ThreadPoolExecutor, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
  val corePoolSize = gauge("core-pool-size", tpe.getCorePoolSize.toLong)
  val maxPoolSize = gauge("max-pool-size", tpe.getMaximumPoolSize.toLong)
  val poolSize = gauge("pool-size", tpe.getPoolSize.toLong)
  val activeThreads = gauge("active-threads", tpe.getActiveCount.toLong)

  // `getTaskCount` is a running total, so it is wrapped in a DifferentialValueCollector
  // (presumably so each reading is the change since the previous one — confirm).
  // NOTE(review): `getTaskCount` counts tasks ever *scheduled*; if "processed-tasks" is meant to
  // count finished work, `getCompletedTaskCount` would be the matching source — confirm intent.
  val processedTasks = gauge("processed-tasks", DifferentialValueCollector(() => {
    tpe.getTaskCount
  }))
}

/** Companion providing the recorder factory for thread pool executors. */
object ThreadPoolExecutorMetrics {

  /**
   * Creates an [[EntityRecorderFactory]] whose recorders are [[ThreadPoolExecutorMetrics]]
   * instances bound to `tpe`.
   *
   * @param tpe the executor to read statistics from.
   * @param cat value returned by the factory's `category`.
   */
  def factory(tpe: ThreadPoolExecutor, cat: String): EntityRecorderFactory[ThreadPoolExecutorMetrics] =
    new EntityRecorderFactory[ThreadPoolExecutorMetrics] {
      def category: String = cat

      def createRecorder(instrumentFactory: InstrumentFactory): ThreadPoolExecutorMetrics =
        new ThreadPoolExecutorMetrics(tpe, instrumentFactory)
    }
}