aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Patrick Wendell <pwendell@gmail.com> 2013-08-10 09:02:27 -0700
committer Patrick Wendell <pwendell@gmail.com> 2013-08-10 09:02:27 -0700
commit d17eeb997d10307eb08706e7e4b3982aea578108 (patch)
tree dd8d37326d60e02ff6b974f5ed5ad9c40a8f07d1 /core
parent dce5e47435b143b8ce5a8b431ac5591a4b112ee8 (diff)
parent dda2ac8b5d9d378cb69bcda4a3e558faaee20813 (diff)
download spark-d17eeb997d10307eb08706e7e4b3982aea578108.tar.gz
spark-d17eeb997d10307eb08706e7e4b3982aea578108.tar.bz2
spark-d17eeb997d10307eb08706e7e4b3982aea578108.zip
Merge pull request #785 from anfeng/master
expose HDFS file system stats via Executor metrics
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/executor/ExecutorSource.scala | 25
1 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
index 94116edfcf..d491a3c0c9 100644
--- a/core/src/main/scala/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -2,9 +2,25 @@ package spark.executor
import com.codahale.metrics.{Gauge, MetricRegistry}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.fs.LocalFileSystem
+
+import scala.collection.JavaConversions._
+
import spark.metrics.source.Source
class ExecutorSource(val executor: Executor) extends Source {
+ /**
+  * Looks up the Hadoop statistics entry for a file system scheme.
+  *
+  * @param scheme scheme name compared against each entry's `getScheme` (e.g. "hdfs", "file")
+  * @return the first entry of `FileSystem.getAllStatistics()` whose scheme matches,
+  *         or `None` when no entry matches
+  */
+ private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
+   // `find` stops at the first match instead of materialising a filtered copy first;
+   // `==` is the null-safe form of `equals`.
+   FileSystem.getAllStatistics().find(s => s.getScheme == scheme)
+
+ /**
+  * Registers a gauge under the name ("filesystem", scheme, name) in `metricRegistry`.
+  *
+  * Each read of the gauge applies `f` to the statistics entry currently returned by
+  * `fileStats(scheme)`, or yields `defaultValue` when there is no such entry.
+  *
+  * @param scheme       file system scheme whose statistics are reported
+  * @param name         last component of the metric name
+  * @param f            extracts the reported value from the statistics entry
+  * @param defaultValue value reported while no statistics entry exists for `scheme`
+  */
+ private def registerFileSystemStat[T](
+     scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
+   val gaugeName = MetricRegistry.name("filesystem", scheme, name)
+   metricRegistry.register(gaugeName, new Gauge[T] {
+     // The lookup happens on every read rather than once at registration time.
+     override def getValue: T = fileStats(scheme) match {
+       case Some(stats) => f(stats)
+       case None => defaultValue
+     }
+   })
+ }
+
val metricRegistry = new MetricRegistry()
val sourceName = "executor"
@@ -27,4 +43,13 @@ class ExecutorSource(val executor: Executor) extends Source {
// Gauge reporting the maximum pool size of the executor's thread pool
metricRegistry.register(
  MetricRegistry.name("threadpool", "maxPool", "size"),
  new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
  })
+
+ // Register one gauge per file system statistic, for each scheme reported by this executor.
+ // Byte counters default to 0L and operation counters to 0 when a scheme has no statistics.
+ Seq("hdfs", "file").foreach { fsScheme =>
+   registerFileSystemStat(fsScheme, "bytesRead", _.getBytesRead(), 0L)
+   registerFileSystemStat(fsScheme, "bytesWritten", _.getBytesWritten(), 0L)
+   registerFileSystemStat(fsScheme, "readOps", _.getReadOps(), 0)
+   registerFileSystemStat(fsScheme, "largeReadOps", _.getLargeReadOps(), 0)
+   registerFileSystemStat(fsScheme, "writeOps", _.getWriteOps(), 0)
+ }
}