From dda2ac8b5d9d378cb69bcda4a3e558faaee20813 Mon Sep 17 00:00:00 2001 From: anfeng Date: Tue, 6 Aug 2013 15:22:25 -0700 Subject: reformat registerFileSystemStat() --- core/src/main/scala/spark/executor/ExecutorSource.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala index 92adc7fba9..d491a3c0c9 100644 --- a/core/src/main/scala/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/spark/executor/ExecutorSource.scala @@ -14,7 +14,8 @@ class ExecutorSource(val executor: Executor) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption - private def registerFileSystemStat[T](scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = { + private def registerFileSystemStat[T]( + scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = { metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] { override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue) }) @@ -43,6 +44,7 @@ class ExecutorSource(val executor: Executor) extends Source { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) + // Gauge for file system stats of this executor for (scheme <- Array("hdfs", "file")) { registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L) registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L) -- cgit v1.2.3