diff options
author | anfeng <andy.feng@gmail.com> | 2013-08-06 11:47:06 -0700 |
---|---|---|
committer | anfeng <andy.feng@gmail.com> | 2013-08-06 11:47:06 -0700 |
commit | 0748c60817ca8027869ba12e7d8175877e1d5cdb (patch) | |
tree | fdbd127a1de01a89072b4118e46bd37af2202e17 | |
parent | d031f73679db02e57e70a0ac1caf6a597f8e76c0 (diff) | |
download | spark-0748c60817ca8027869ba12e7d8175877e1d5cdb.tar.gz spark-0748c60817ca8027869ba12e7d8175877e1d5cdb.tar.bz2 spark-0748c60817ca8027869ba12e7d8175877e1d5cdb.zip |
expose HDFS file system stats via Executor metrics
-rw-r--r-- | core/src/main/scala/spark/executor/ExecutorSource.scala | 23 |
1 file changed, 23 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala index 94116edfcf..92adc7fba9 100644 --- a/core/src/main/scala/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/spark/executor/ExecutorSource.scala @@ -2,9 +2,24 @@ package spark.executor import com.codahale.metrics.{Gauge, MetricRegistry} +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.hadoop.fs.LocalFileSystem + +import scala.collection.JavaConversions._ + import spark.metrics.source.Source class ExecutorSource(val executor: Executor) extends Source { + private def fileStats(scheme: String) : Option[FileSystem.Statistics] = + FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption + + private def registerFileSystemStat[T](scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = { + metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] { + override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue) + }) + } + val metricRegistry = new MetricRegistry() val sourceName = "executor" @@ -27,4 +42,12 @@ class ExecutorSource(val executor: Executor) extends Source { metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) + + for (scheme <- Array("hdfs", "file")) { + registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L) + registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L) + registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0) + registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0) + registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0) + } } |