aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: anfeng <andy.feng@gmail.com> 2013-08-06 11:47:06 -0700
committer: anfeng <andy.feng@gmail.com> 2013-08-06 11:47:06 -0700
commit: 0748c60817ca8027869ba12e7d8175877e1d5cdb (patch)
tree: fdbd127a1de01a89072b4118e46bd37af2202e17 /core
parent: d031f73679db02e57e70a0ac1caf6a597f8e76c0 (diff)
downloadspark-0748c60817ca8027869ba12e7d8175877e1d5cdb.tar.gz
spark-0748c60817ca8027869ba12e7d8175877e1d5cdb.tar.bz2
spark-0748c60817ca8027869ba12e7d8175877e1d5cdb.zip
expose HDFS file system stats via Executor metrics
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/executor/ExecutorSource.scala 23
1 files changed, 23 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
index 94116edfcf..92adc7fba9 100644
--- a/core/src/main/scala/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -2,9 +2,24 @@ package spark.executor
import com.codahale.metrics.{Gauge, MetricRegistry}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.fs.LocalFileSystem
+
+import scala.collection.JavaConversions._
+
import spark.metrics.source.Source
class ExecutorSource(val executor: Executor) extends Source {
+ /**
+  * Looks up the Hadoop statistics entry for a given file system scheme.
+  *
+  * @param scheme file system scheme to match against each entry's getScheme
+  * @return the first entry of FileSystem.getAllStatistics() whose scheme equals
+  *         `scheme`, or None when no entry matches
+  */
+ private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
+   // find stops at the first match instead of filtering the whole list and then
+   // taking its head; == is null-safe on the left operand, unlike .equals
+   FileSystem.getAllStatistics().find(s => s.getScheme == scheme)
+
+ /**
+  * Registers a gauge in metricRegistry that reports one value derived from the
+  * statistics of the given file system scheme.
+  *
+  * @param scheme       file system scheme whose statistics are read
+  * @param name         last component of the metric name; the full name is built
+  *                     with MetricRegistry.name("filesystem", scheme, name)
+  * @param f            extracts the reported value from a statistics entry
+  * @param defaultValue value reported when fileStats(scheme) finds no entry
+  */
+ private def registerFileSystemStat[T](scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
+   val gaugeName = MetricRegistry.name("filesystem", scheme, name)
+   val gauge = new Gauge[T] {
+     // The statistics entry is looked up again on every read of the gauge
+     override def getValue: T = fileStats(scheme) match {
+       case Some(stats) => f(stats)
+       case None => defaultValue
+     }
+   }
+   metricRegistry.register(gaugeName, gauge)
+ }
+
// Registry into which this source registers its gauges
val metricRegistry = new MetricRegistry()
// Name of this metrics source
val sourceName = "executor"
@@ -27,4 +42,12 @@ class ExecutorSource(val executor: Executor) extends Source {
// Gauge reporting the maximum pool size of the executor's thread pool
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
+
+ // Register the same five file system gauges for each scheme: bytes read and
+ // written, plus read, large-read and write operation counts
+ Array("hdfs", "file").foreach { scheme =>
+   registerFileSystemStat(scheme, "bytesRead", stats => stats.getBytesRead(), 0L)
+   registerFileSystemStat(scheme, "bytesWritten", stats => stats.getBytesWritten(), 0L)
+   registerFileSystemStat(scheme, "readOps", stats => stats.getReadOps(), 0)
+   registerFileSystemStat(scheme, "largeReadOps", stats => stats.getLargeReadOps(), 0)
+   registerFileSystemStat(scheme, "writeOps", stats => stats.getWriteOps(), 0)
+ }
}