1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
package spark.scheduler
import com.codahale.metrics.{Gauge,MetricRegistry}
import spark.metrics.source.Source
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "DAGScheduler"
metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
})
metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.running.size
})
metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.waiting.size
})
metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.nextRunId.get()
})
metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
}
|