|  |  |  |
|---|---|---|
| author | Reynold Xin <rxin@apache.org> | 2014-09-27 00:57:26 -0700 |
| committer | Reynold Xin <rxin@apache.org> | 2014-09-27 00:57:26 -0700 |
| commit | 436a7730b6e7067f74b3739a3a412490003f7c4c (patch) | |
| tree | ceb5769202f55f18137ea9d22b1b60d44ff39de4 /core | |
| parent | 2d972fd84ac54a89e416442508a6d4eaeff452c1 (diff) | |
| download | spark-436a7730b6e7067f74b3739a3a412490003f7c4c.tar.gz, spark-436a7730b6e7067f74b3739a3a412490003f7c4c.tar.bz2, spark-436a7730b6e7067f74b3739a3a412490003f7c4c.zip | |
Minor cleanup to tighten visibility and remove compilation warning.
Author: Reynold Xin <rxin@apache.org>
Closes #2555 from rxin/cleanup and squashes the following commits:
6add199 [Reynold Xin] Minor cleanup to tighten visibility and remove compilation warning.
Diffstat (limited to 'core')
3 files changed, 46 insertions, 39 deletions
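The cleanup mostly narrows member visibility from `private` to `private[this]` and spells out result types on overridden methods. As a minimal sketch of the difference between the two visibility levels (the `Counter` class below is hypothetical, not from this patch):

```scala
class Counter {
  // Class-private: other Counter instances can still read it, and the
  // compiler generates (private) accessor methods for it.
  private var total = 0

  // Object-private: visible only to this instance; the field is accessed
  // directly and no accessor methods are generated.
  private[this] var ticks = 0

  def add(n: Int): Unit = {
    total += n
    ticks += 1
  }

  // Legal: `total` is class-private, so another Counter's field is visible here.
  def merge(other: Counter): Unit = { total += other.total }

  // `other.ticks` would NOT compile here, because ticks is private[this].
}
```

Because `private[this]` members need no accessors, they are the tightest (and cheapest) choice for state that only the owning instance touches.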
```diff
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index c3dabd2e79..3564ab2e2a 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -36,33 +36,31 @@ private[spark] class WholeTextFileRecordReader(
     index: Integer)
   extends RecordReader[String, String] {
 
-  private val path = split.getPath(index)
-  private val fs = path.getFileSystem(context.getConfiguration)
+  private[this] val path = split.getPath(index)
+  private[this] val fs = path.getFileSystem(context.getConfiguration)
 
   // True means the current file has been processed, then skip it.
-  private var processed = false
+  private[this] var processed = false
 
-  private val key = path.toString
-  private var value: String = null
+  private[this] val key = path.toString
+  private[this] var value: String = null
 
-  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}
 
-  override def close() = {}
+  override def close(): Unit = {}
 
-  override def getProgress = if (processed) 1.0f else 0.0f
+  override def getProgress: Float = if (processed) 1.0f else 0.0f
 
-  override def getCurrentKey = key
+  override def getCurrentKey: String = key
 
-  override def getCurrentValue = value
+  override def getCurrentValue: String = value
 
-  override def nextKeyValue = {
+  override def nextKeyValue(): Boolean = {
     if (!processed) {
       val fileIn = fs.open(path)
       val innerBuffer = ByteStreams.toByteArray(fileIn)
-
       value = new Text(innerBuffer).toString
       Closeables.close(fileIn, false)
-
       processed = true
       true
     } else {
```
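Besides tightening visibility, the RecordReader overrides above gain explicit result types. A small sketch of that style, using a made-up `Reader` trait rather than the Hadoop `RecordReader` API:

```scala
// Hypothetical trait standing in for a Hadoop-style reader contract.
trait Reader[K, V] {
  def getProgress: Float
  def getCurrentKey: K
  def nextKeyValue(): Boolean
}

class StringReader(data: Seq[String]) extends Reader[String, String] {
  private[this] var index = -1

  // `override def getProgress = ...` would also compile, but the explicit
  // `: Float` documents the contract and keeps the inferred type from
  // drifting if the body changes.
  override def getProgress: Float =
    if (data.isEmpty) 1.0f else (index + 1).toFloat / data.length

  override def getCurrentKey: String = index.toString

  override def nextKeyValue(): Boolean = {
    index += 1
    index < data.length
  }
}
```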
```diff
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 6ef817d0e5..fd316a89a1 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -63,15 +63,18 @@ import org.apache.spark.metrics.source.Source
  *
  * [options] is the specific property of this source or sink.
  */
-private[spark] class MetricsSystem private (val instance: String,
-    conf: SparkConf, securityMgr: SecurityManager) extends Logging {
+private[spark] class MetricsSystem private (
+    val instance: String,
+    conf: SparkConf,
+    securityMgr: SecurityManager)
+  extends Logging {
 
-  val confFile = conf.get("spark.metrics.conf", null)
-  val metricsConfig = new MetricsConfig(Option(confFile))
+  private[this] val confFile = conf.get("spark.metrics.conf", null)
+  private[this] val metricsConfig = new MetricsConfig(Option(confFile))
 
-  val sinks = new mutable.ArrayBuffer[Sink]
-  val sources = new mutable.ArrayBuffer[Source]
-  val registry = new MetricRegistry()
+  private val sinks = new mutable.ArrayBuffer[Sink]
+  private val sources = new mutable.ArrayBuffer[Source]
+  private val registry = new MetricRegistry()
 
   // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
   private var metricsServlet: Option[MetricsServlet] = None
@@ -91,7 +94,7 @@ private[spark] class MetricsSystem private (val instance: String,
     sinks.foreach(_.stop)
   }
 
-  def report(): Unit = {
+  def report() {
     sinks.foreach(_.report())
   }
 
@@ -155,8 +158,8 @@ private[spark] object MetricsSystem {
   val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
   val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
 
-  val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
-  val MINIMAL_POLL_PERIOD = 1
+  private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+  private[this] val MINIMAL_POLL_PERIOD = 1
 
   def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
     val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
@@ -166,7 +169,8 @@ private[spark] object MetricsSystem {
     }
   }
 
-  def createMetricsSystem(instance: String, conf: SparkConf,
-      securityMgr: SecurityManager): MetricsSystem =
+  def createMetricsSystem(
+      instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
     new MetricsSystem(instance, conf, securityMgr)
+  }
 }
```
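One detail worth noting in the MetricsSystem diff above: `spark.metrics.conf` is read with a `null` default and then wrapped in `Option(...)`, which maps `null` to `None`. A standalone sketch of that pattern, with a plain `Map` standing in for `SparkConf` (the lookup helper below is hypothetical):

```scala
val settings = Map("spark.metrics.conf" -> "metrics.properties")

// Hypothetical stand-in for conf.get(key, defaultValue).
def get(key: String, default: String): String = settings.getOrElse(key, default)

val confFile: String = get("spark.metrics.conf", null)   // may be null when unset
val confFileOpt: Option[String] = Option(confFile)        // Option(null) == None

confFileOpt match {
  case Some(path) => println(s"loading metrics config from $path")
  case None       => println("no metrics config file; using defaults")
}
```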
```diff
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 96a5a12318..e42b181194 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -17,42 +17,47 @@
 
 package org.apache.spark.metrics
 
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.metrics.source.Source
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
+
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.master.MasterSource
 
-class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
+import scala.collection.mutable.ArrayBuffer
+
+
+class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester{
   var filePath: String = _
   var conf: SparkConf = null
   var securityMgr: SecurityManager = null
 
   before {
-    filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
+    filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile
     conf = new SparkConf(false).set("spark.metrics.conf", filePath)
     securityMgr = new SecurityManager(conf)
   }
 
   test("MetricsSystem with default config") {
     val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
-    val sources = metricsSystem.sources
-    val sinks = metricsSystem.sinks
+    val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
+    val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
-    assert(sources.length === 0)
-    assert(sinks.length === 0)
-    assert(!metricsSystem.getServletHandlers.isEmpty)
+    assert(metricsSystem.invokePrivate(sources()).length === 0)
+    assert(metricsSystem.invokePrivate(sinks()).length === 0)
+    assert(metricsSystem.getServletHandlers.nonEmpty)
   }
 
   test("MetricsSystem with sources add") {
     val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr)
-    val sources = metricsSystem.sources
-    val sinks = metricsSystem.sinks
+    val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
+    val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
-    assert(sources.length === 0)
-    assert(sinks.length === 1)
-    assert(!metricsSystem.getServletHandlers.isEmpty)
+    assert(metricsSystem.invokePrivate(sources()).length === 0)
+    assert(metricsSystem.invokePrivate(sinks()).length === 1)
+    assert(metricsSystem.getServletHandlers.nonEmpty)
 
     val source = new MasterSource(null)
     metricsSystem.registerSource(source)
-    assert(sources.length === 1)
+    assert(metricsSystem.invokePrivate(sources()).length === 1)
   }
 }
```
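With `sources` and `sinks` now private, the suite above reaches them through ScalaTest's `PrivateMethodTester` instead of reading them directly. A minimal sketch of that mechanism with a hypothetical `Registry` class; the symbol-literal syntax matches the ScalaTest 2.x-era API used in this suite:

```scala
import org.scalatest.{FunSuite, PrivateMethodTester}

class Registry {
  // Private state we still want to inspect from a test.
  private val names = scala.collection.mutable.ArrayBuffer[String]()
  def register(name: String): Unit = names += name
}

class RegistrySuite extends FunSuite with PrivateMethodTester {
  test("registered names land in the private buffer") {
    val registry = new Registry
    registry.register("master")

    // PrivateMethod[T]('name) names the private accessor; invokePrivate
    // calls it reflectively on the target instance.
    val names = PrivateMethod[scala.collection.mutable.ArrayBuffer[String]]('names)
    assert(registry.invokePrivate(names()).length === 1)
  }
}
```

This works because a `private val` still gets a (private) accessor method that reflection can find; a `private[this]` field would not, which is presumably why the buffers in MetricsSystem were narrowed to `private` rather than `private[this]`.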