diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-08-10 09:02:27 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-08-10 09:02:27 -0700 |
commit | d17eeb997d10307eb08706e7e4b3982aea578108 (patch) | |
tree | dd8d37326d60e02ff6b974f5ed5ad9c40a8f07d1 /core | |
parent | dce5e47435b143b8ce5a8b431ac5591a4b112ee8 (diff) | |
parent | dda2ac8b5d9d378cb69bcda4a3e558faaee20813 (diff) | |
download | spark-d17eeb997d10307eb08706e7e4b3982aea578108.tar.gz spark-d17eeb997d10307eb08706e7e4b3982aea578108.tar.bz2 spark-d17eeb997d10307eb08706e7e4b3982aea578108.zip |
Merge pull request #785 from anfeng/master
expose HDFS file system stats via Executor metrics
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/executor/ExecutorSource.scala | 25 |
1 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala index 94116edfcf..d491a3c0c9 100644 --- a/core/src/main/scala/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/spark/executor/ExecutorSource.scala @@ -2,9 +2,25 @@ package spark.executor import com.codahale.metrics.{Gauge, MetricRegistry} +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.hadoop.fs.LocalFileSystem + +import scala.collection.JavaConversions._ + import spark.metrics.source.Source class ExecutorSource(val executor: Executor) extends Source { + private def fileStats(scheme: String) : Option[FileSystem.Statistics] = + FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption + + private def registerFileSystemStat[T]( + scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = { + metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] { + override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue) + }) + } + val metricRegistry = new MetricRegistry() val sourceName = "executor" @@ -27,4 +43,13 @@ class ExecutorSource(val executor: Executor) extends Source { metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) + + // Gauge for file system stats of this executor + for (scheme <- Array("hdfs", "file")) { + registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L) + registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L) + registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0) + registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0) + registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0) + } } |