aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoranfeng <andy.feng@gmail.com>2013-08-06 15:22:25 -0700
committeranfeng <andy.feng@gmail.com>2013-08-06 15:22:25 -0700
commitdda2ac8b5d9d378cb69bcda4a3e558faaee20813 (patch)
treead3bad9f33bf54068ebee4ad9f60d2174a3d6bd5
parent0748c60817ca8027869ba12e7d8175877e1d5cdb (diff)
downloadspark-dda2ac8b5d9d378cb69bcda4a3e558faaee20813.tar.gz
spark-dda2ac8b5d9d378cb69bcda4a3e558faaee20813.tar.bz2
spark-dda2ac8b5d9d378cb69bcda4a3e558faaee20813.zip
reformat registerFileSystemStat()
-rw-r--r--core/src/main/scala/spark/executor/ExecutorSource.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
index 92adc7fba9..d491a3c0c9 100644
--- a/core/src/main/scala/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -14,7 +14,8 @@ class ExecutorSource(val executor: Executor) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
- private def registerFileSystemStat[T](scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
+ private def registerFileSystemStat[T](
+ scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
})
@@ -43,6 +44,7 @@ class ExecutorSource(val executor: Executor) extends Source {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
+ // Gauge for file system stats of this executor
for (scheme <- Array("hdfs", "file")) {
registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)