aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-09-27 00:57:26 -0700
committerReynold Xin <rxin@apache.org>2014-09-27 00:57:26 -0700
commit436a7730b6e7067f74b3739a3a412490003f7c4c (patch)
treeceb5769202f55f18137ea9d22b1b60d44ff39de4 /core
parent2d972fd84ac54a89e416442508a6d4eaeff452c1 (diff)
downloadspark-436a7730b6e7067f74b3739a3a412490003f7c4c.tar.gz
spark-436a7730b6e7067f74b3739a3a412490003f7c4c.tar.bz2
spark-436a7730b6e7067f74b3739a3a412490003f7c4c.zip
Minor cleanup to tighten visibility and remove compilation warning.
Author: Reynold Xin <rxin@apache.org> Closes #2555 from rxin/cleanup and squashes the following commits: 6add199 [Reynold Xin] Minor cleanup to tighten visibility and remove compilation warning.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala28
-rw-r--r--core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala33
3 files changed, 46 insertions, 39 deletions
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index c3dabd2e79..3564ab2e2a 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -36,33 +36,31 @@ private[spark] class WholeTextFileRecordReader(
index: Integer)
extends RecordReader[String, String] {
- private val path = split.getPath(index)
- private val fs = path.getFileSystem(context.getConfiguration)
+ private[this] val path = split.getPath(index)
+ private[this] val fs = path.getFileSystem(context.getConfiguration)
// True means the current file has been processed, then skip it.
- private var processed = false
+ private[this] var processed = false
- private val key = path.toString
- private var value: String = null
+ private[this] val key = path.toString
+ private[this] var value: String = null
- override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+ override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}
- override def close() = {}
+ override def close(): Unit = {}
- override def getProgress = if (processed) 1.0f else 0.0f
+ override def getProgress: Float = if (processed) 1.0f else 0.0f
- override def getCurrentKey = key
+ override def getCurrentKey: String = key
- override def getCurrentValue = value
+ override def getCurrentValue: String = value
- override def nextKeyValue = {
+ override def nextKeyValue(): Boolean = {
if (!processed) {
val fileIn = fs.open(path)
val innerBuffer = ByteStreams.toByteArray(fileIn)
-
value = new Text(innerBuffer).toString
Closeables.close(fileIn, false)
-
processed = true
true
} else {
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 6ef817d0e5..fd316a89a1 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -63,15 +63,18 @@ import org.apache.spark.metrics.source.Source
*
* [options] is the specific property of this source or sink.
*/
-private[spark] class MetricsSystem private (val instance: String,
- conf: SparkConf, securityMgr: SecurityManager) extends Logging {
+private[spark] class MetricsSystem private (
+ val instance: String,
+ conf: SparkConf,
+ securityMgr: SecurityManager)
+ extends Logging {
- val confFile = conf.get("spark.metrics.conf", null)
- val metricsConfig = new MetricsConfig(Option(confFile))
+ private[this] val confFile = conf.get("spark.metrics.conf", null)
+ private[this] val metricsConfig = new MetricsConfig(Option(confFile))
- val sinks = new mutable.ArrayBuffer[Sink]
- val sources = new mutable.ArrayBuffer[Source]
- val registry = new MetricRegistry()
+ private val sinks = new mutable.ArrayBuffer[Sink]
+ private val sources = new mutable.ArrayBuffer[Source]
+ private val registry = new MetricRegistry()
// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
private var metricsServlet: Option[MetricsServlet] = None
@@ -91,7 +94,7 @@ private[spark] class MetricsSystem private (val instance: String,
sinks.foreach(_.stop)
}
- def report(): Unit = {
+ def report() {
sinks.foreach(_.report())
}
@@ -155,8 +158,8 @@ private[spark] object MetricsSystem {
val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
- val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
- val MINIMAL_POLL_PERIOD = 1
+ private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+ private[this] val MINIMAL_POLL_PERIOD = 1
def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
@@ -166,7 +169,8 @@ private[spark] object MetricsSystem {
}
}
- def createMetricsSystem(instance: String, conf: SparkConf,
- securityMgr: SecurityManager): MetricsSystem =
+ def createMetricsSystem(
+ instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
new MetricsSystem(instance, conf, securityMgr)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 96a5a12318..e42b181194 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -17,42 +17,47 @@
package org.apache.spark.metrics
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.metrics.source.Source
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
+
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.master.MasterSource
-class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
+import scala.collection.mutable.ArrayBuffer
+
+
+class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester{
var filePath: String = _
var conf: SparkConf = null
var securityMgr: SecurityManager = null
before {
- filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
+ filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile
conf = new SparkConf(false).set("spark.metrics.conf", filePath)
securityMgr = new SecurityManager(conf)
}
test("MetricsSystem with default config") {
val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
- val sources = metricsSystem.sources
- val sinks = metricsSystem.sinks
+ val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
+ val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
- assert(sources.length === 0)
- assert(sinks.length === 0)
- assert(!metricsSystem.getServletHandlers.isEmpty)
+ assert(metricsSystem.invokePrivate(sources()).length === 0)
+ assert(metricsSystem.invokePrivate(sinks()).length === 0)
+ assert(metricsSystem.getServletHandlers.nonEmpty)
}
test("MetricsSystem with sources add") {
val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr)
- val sources = metricsSystem.sources
- val sinks = metricsSystem.sinks
+ val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
+ val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
- assert(sources.length === 0)
- assert(sinks.length === 1)
- assert(!metricsSystem.getServletHandlers.isEmpty)
+ assert(metricsSystem.invokePrivate(sources()).length === 0)
+ assert(metricsSystem.invokePrivate(sinks()).length === 1)
+ assert(metricsSystem.getServletHandlers.nonEmpty)
val source = new MasterSource(null)
metricsSystem.registerSource(source)
- assert(sources.length === 1)
+ assert(metricsSystem.invokePrivate(sources()).length === 1)
}
}