aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
blob: 98c4fb7e59a03755cebefb21fe6a99f0a2a89f82 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package spark.scheduler

import com.codahale.metrics.{Gauge,MetricRegistry}

import spark.metrics.source.Source

/**
 * Metrics source that exposes values read from a `DAGScheduler` as integer gauges.
 *
 * Five gauges are registered when the instance is constructed:
 *  - stage / failedStages / number  -> `dagScheduler.failed.size`
 *  - stage / runningStages / number -> `dagScheduler.running.size`
 *  - stage / waitingStages / number -> `dagScheduler.waiting.size`
 *  - job / allJobs / number         -> `dagScheduler.nextJobId.get()`
 *  - job / activeJobs / number      -> `dagScheduler.activeJobs.size`
 *
 * Every gauge reads its value from the scheduler each time `getValue` is called;
 * nothing is cached in this class.
 *
 * @param dagScheduler the scheduler whose fields are read by the gauges
 */
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
  val metricRegistry: MetricRegistry = new MetricRegistry()
  val sourceName: String = "DAGScheduler"

  /**
   * Registers one integer gauge in `metricRegistry`.
   *
   * The gauge name is built with `MetricRegistry.name(group, metric, "number")`.
   *
   * @param group   first name component (e.g. "stage" or "job")
   * @param metric  second name component identifying the measured quantity
   * @param current by-name expression re-evaluated on every `getValue` call
   */
  private def registerGauge(group: String, metric: String)(current: => Int): Unit = {
    val gaugeName = MetricRegistry.name(group, metric, "number")
    val gauge = new Gauge[Int] {
      override def getValue: Int = current
    }
    metricRegistry.register(gaugeName, gauge)
  }

  // Stage-level gauges.
  registerGauge("stage", "failedStages")(dagScheduler.failed.size)
  registerGauge("stage", "runningStages")(dagScheduler.running.size)
  registerGauge("stage", "waitingStages")(dagScheduler.waiting.size)

  // Job-level gauges. NOTE(review): "allJobs" reports the current value of the
  // nextJobId counter; presumably that equals the number of jobs submitted so far
  // -- confirm against DAGScheduler.
  registerGauge("job", "allJobs")(dagScheduler.nextJobId.get())
  registerGauge("job", "activeJobs")(dagScheduler.activeJobs.size)
}