author    Grace Huang <jie.huang@intel.com>  2013-10-08 17:44:56 +0800
committer Grace Huang <jie.huang@intel.com>  2013-10-08 17:44:56 +0800
commit    a2af6b543a0a70d94a451c9022deea181d04f8e8 (patch)
tree      394a4d3fa8c89b0192f2c148e0dbfe9644ac52e4 /core
parent    892fb8ffa85016a63d7d00dd6f1abc58ccf034a2 (diff)
Revert "remedy the line-wrap while exceeding 100 chars"
This reverts commit 892fb8ffa85016a63d7d00dd6f1abc58ccf034a2.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala  13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala       19
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala       31
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala          45
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala     31
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala       66
6 files changed, 91 insertions(+), 114 deletions(-)
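Every hunk below makes the same mechanical change: the revert restores the original single-line gauge-registration style, undoing the wrapped-argument style that commit 892fb8f introduced to keep lines under 100 characters. For orientation, here is a minimal, self-contained sketch of the two styles using only com.codahale.metrics; the metric names and values are illustrative, and Spark's NamingConventions.makeMetricName helper (not shown on this page) is replaced with MetricRegistry.name's vararg form:

import com.codahale.metrics.{Gauge, MetricRegistry}

object GaugeStyleSketch {
  val metricRegistry = new MetricRegistry()

  // Wrapped style (removed by this revert): each argument of register()
  // on its own line, so no single line exceeds 100 characters.
  metricRegistry.register(
    MetricRegistry.name("workers", "number"),
    new Gauge[Int] { override def getValue: Int = 42 })

  // Restored style: the register() call on one line, with the anonymous
  // Gauge body broken out onto the following lines instead.
  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
    override def getValue: Int = 7
  })
}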
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
index f0b1f777fd..c72322e9c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -31,13 +31,12 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
override def getValue: String = application.state.toString
})
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")),
- new Gauge[Long] { override def getValue: Long = application.duration })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] {
+ override def getValue: Long = application.duration
+ })
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")),
- new Gauge[Int] { override def getValue: Int = application.coresGranted })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] {
+ override def getValue: Int = application.coresGranted
+ })
}
-
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index 8a88fef330..de3939836b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -27,18 +27,17 @@ private[spark] class MasterSource(val master: Master) extends Source {
val sourceName = "master"
// Gauge for worker numbers in cluster
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("workers","number")),
- new Gauge[Int] { override def getValue: Int = master.workers.size })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] {
+ override def getValue: Int = master.workers.size
+ })
// Gauge for application numbers in cluster
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")),
- new Gauge[Int] { override def getValue: Int = master.apps.size })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] {
+ override def getValue: Int = master.apps.size
+ })
// Gauge for waiting application numbers in cluster
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")),
- new Gauge[Int] { override def getValue: Int = master.waitingApps.size })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] {
+ override def getValue: Int = master.waitingApps.size
+ })
}
-
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index 0596f14355..fc4f4ae99c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -26,28 +26,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")),
- new Gauge[Int] { override def getValue: Int = worker.executors.size })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] {
+ override def getValue: Int = worker.executors.size
+ })
// Gauge for cores used of this worker
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")),
- new Gauge[Int] { override def getValue: Int = worker.coresUsed })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] {
+ override def getValue: Int = worker.coresUsed
+ })
// Gauge for memory used of this worker
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")),
- new Gauge[Int] { override def getValue: Int = worker.memoryUsed })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] {
+ override def getValue: Int = worker.memoryUsed
+ })
// Gauge for cores free of this worker
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")),
- new Gauge[Int] { override def getValue: Int = worker.coresFree })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] {
+ override def getValue: Int = worker.coresFree
+ })
// Gauge for memory free of this worker
- metricRegistry.register(
- MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")),
- new Gauge[Int] { override def getValue: Int = worker.memoryFree })
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] {
+ override def getValue: Int = worker.memoryFree
+ })
}
-
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index d063e4ad0b..6cbd154603 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -44,42 +44,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's actively executing task counts
- metricRegistry.register(
- MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")),
- new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() })
+ metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getActiveCount()
+ })
// Gauge for executor thread pool's approximate total number of tasks that have been completed
- metricRegistry.register(
- MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")),
- new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() })
+ metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] {
+ override def getValue: Long = executor.threadPool.getCompletedTaskCount()
+ })
// Gauge for executor thread pool's current number of threads
- metricRegistry.register(
- MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")),
- new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() })
+ metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getPoolSize()
+ })
// Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
- metricRegistry.register(
- MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")),
- new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() })
+ metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getMaximumPoolSize()
+ })
// Gauge for file system stats of this executor
for (scheme <- Array("hdfs", "file")) {
- registerFileSystemStat(scheme,
- NamingConventions.makeMetricName("read", "bytes"),
- _.getBytesRead(), 0L)
- registerFileSystemStat(scheme,
- NamingConventions.makeMetricName("write", "bytes"),
- _.getBytesWritten(), 0L)
- registerFileSystemStat(scheme,
- NamingConventions.makeMetricName("read", "ops"),
- _.getReadOps(), 0)
- registerFileSystemStat(scheme,
- NamingConventions.makeMetricName("largeRead", "ops"),
- _.getLargeReadOps(), 0)
- registerFileSystemStat(scheme,
- NamingConventions.makeMetricName("write", "ops"),
- _.getWriteOps(), 0)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0)
}
}
-
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 02fb80760f..9e90a08411 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -28,24 +28,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar
val metricRegistry = new MetricRegistry()
val sourceName = "%s.DAGScheduler".format(sc.appName)
- metricRegistry.register(
- MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")),
- new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size })
+ metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.failed.size
+ })
- metricRegistry.register(
- MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")),
- new Gauge[Int] { override def getValue: Int = dagScheduler.running.size })
+ metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.running.size
+ })
- metricRegistry.register(
- MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")),
- new Gauge[Int] { override def getValue: Int = dagScheduler.waiting.size })
+ metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.waiting.size
+ })
- metricRegistry.register(
- MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")),
- new Gauge[Int] { override def getValue: Int = dagScheduler.nextJobId.get() })
+ metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.nextJobId.get()
+ })
- metricRegistry.register(
- MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")),
- new Gauge[Int] { override def getValue: Int = dagScheduler.activeJobs.size })
+ metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.activeJobs.size
+ })
}
-
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index fcf9da481b..4312250240 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -29,48 +29,40 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
val metricRegistry = new MetricRegistry()
val sourceName = "%s.BlockManager".format(sc.appName)
- metricRegistry.register(
- MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")),
- new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
- maxMem / 1024 / 1024
- }
+ metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ maxMem / 1024 / 1024
+ }
})
- metricRegistry.register(
- MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")),
- new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
- remainingMem / 1024 / 1024
- }
+ metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ remainingMem / 1024 / 1024
+ }
})
- metricRegistry.register(
- MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")),
- new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
- (maxMem - remainingMem) / 1024 / 1024
- }
+ metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ (maxMem - remainingMem) / 1024 / 1024
+ }
})
- metricRegistry.register(
- MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val diskSpaceUsed = storageStatusList
- .flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_ + _)
- .getOrElse(0L)
-
- diskSpaceUsed / 1024 / 1024
- }
+ metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val diskSpaceUsed = storageStatusList
+ .flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_ + _)
+ .getOrElse(0L)
+
+ diskSpaceUsed / 1024 / 1024
+ }
})
}
-
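For reference, the BlockManagerSource memory gauges in the last hunk all share the same aggregation arithmetic: sum a per-executor figure across the cluster, then convert bytes to megabytes by integer division. A standalone sketch of that math, using made-up per-executor numbers as stand-ins for blockManager.master.getStorageStatus (whose definition this page does not show):

object MemGaugeMath {
  def main(args: Array[String]): Unit = {
    // Hypothetical per-executor figures, in bytes.
    val maxMemPerExecutor       = Seq(512L * 1024 * 1024, 1024L * 1024 * 1024)
    val remainingMemPerExecutor = Seq(256L * 1024 * 1024,  512L * 1024 * 1024)

    // Aggregate across the cluster, as the gauges do with reduce(_ + _).
    val maxMem       = maxMemPerExecutor.reduce(_ + _)
    val remainingMem = remainingMemPerExecutor.reduce(_ + _)

    println(s"maxMem.MBytes       = ${maxMem / 1024 / 1024}")                  // 1536
    println(s"remainingMem.MBytes = ${remainingMem / 1024 / 1024}")            // 768
    println(s"memUsed.MBytes      = ${(maxMem - remainingMem) / 1024 / 1024}") // 768
  }
}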