author     Reynold Xin <rxin@databricks.com>    2015-04-03 01:25:02 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-04-03 01:25:02 -0700
commit     82701ee25fda64f03899713bc56f82ca6f278151 (patch)
tree       07fba36d66228f7561bd65dd502fd668d50a9be5
parent     c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0 (diff)
[SPARK-6428] Turn on explicit type checking for public methods.
This builds on my earlier pull requests and turns on the explicit type checking in scalastyle.

Author: Reynold Xin <rxin@databricks.com>

Closes #5342 from rxin/SPARK-6428 and squashes the following commits:

7b531ab [Reynold Xin] import ordering
2d9a8a5 [Reynold Xin] jl
e668b1c [Reynold Xin] override
9b9e119 [Reynold Xin] Parenthesis.
82e0cf5 [Reynold Xin] [SPARK-6428] Turn on explicit type checking for public methods.
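The change itself is mechanical: public methods that previously relied on type inference now declare their result types. As a minimal before/after sketch of what the newly enabled PublicMethodsHaveTypeChecker rule flags, mirroring the LogQuery.scala hunk further down (the class and method come from that hunk; the snippet itself is illustrative and not part of the patch):

    // Before (flagged once the rule is enabled): the public method's return type is inferred.
    //   def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
    //
    // After (accepted): the return type is written out explicitly.
    class Stats(val count: Int, val numBytes: Int) extends Serializable {
      def merge(other: Stats): Stats = new Stats(count + other.count, numBytes + other.numBytes)
      override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
    }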
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 53
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalLR.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LogQuery.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkLR.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkTC.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala | 3
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala | 2
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 12
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 5
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 4
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala | 2
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala | 13
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala | 3
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala | 12
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala | 14
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala | 4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala | 8
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 2
-rw-r--r--  scalastyle-config.xml | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 2
-rw-r--r--  tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala | 4
-rw-r--r--  tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 6
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 62
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 6
46 files changed, 170 insertions, 142 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index a023712be1..8441bb3a30 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -661,7 +661,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
- def fn = (x: V) => f.call(x).asScala
+ def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 18ccd625fc..db4e996feb 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -192,7 +192,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
*/
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.call(x)
+ def fn: (T) => S = (x: T) => f.call(x)
import com.google.common.collect.Ordering // shadows scala.math.Ordering
implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
implicit val ctag: ClassTag[S] = fakeClassTag
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 8da42934a7..8bf0627fc4 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,8 +17,9 @@
package org.apache.spark.api.java
-import java.util.{Comparator, List => JList, Iterator => JIterator}
+import java.{lang => jl}
import java.lang.{Iterable => JIterable, Long => JLong}
+import java.util.{Comparator, List => JList, Iterator => JIterator}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -93,7 +94,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of the original partition.
*/
def mapPartitionsWithIndex[R](
- f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]],
+ f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -109,7 +110,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def cm = implicitly[ClassTag[(K2, V2)]]
+ def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -119,7 +120,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -129,8 +130,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.call(x).asScala
- new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
+ def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
+ new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}
/**
@@ -139,8 +140,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.call(x).asScala
- def cm = implicitly[ClassTag[(K2, V2)]]
+ def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+ def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -148,7 +149,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ def fn: (Iterator[T]) => Iterator[U] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -157,7 +160,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ def fn: (Iterator[T]) => Iterator[U] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
JavaRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -166,8 +171,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
- new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
+ def fn: (Iterator[T]) => Iterator[jl.Double] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
+ new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
}
/**
@@ -175,7 +182,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -184,7 +193,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ def fn: (Iterator[T]) => Iterator[jl.Double] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
}
@@ -194,7 +205,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
JavaPairRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -277,8 +290,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def zipPartitions[U, V](
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
- def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
- f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
+ def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
+ (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
+ f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
+ }
JavaRDD.fromRDD(
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
}
@@ -441,8 +456,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
- def countByValue(): java.util.Map[T, java.lang.Long] =
- mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new java.lang.Long(x._2)))))
+ def countByValue(): java.util.Map[T, jl.Long] =
+ mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2)))))
/**
* (Experimental) Approximate version of countByValue().
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index 17624c20cf..f73eac1e2b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -40,8 +40,8 @@ object LocalKMeans {
val convergeDist = 0.001
val rand = new Random(42)
- def generateData = {
- def generatePoint(i: Int) = {
+ def generateData: Array[DenseVector[Double]] = {
+ def generatePoint(i: Int): DenseVector[Double] = {
DenseVector.fill(D){rand.nextDouble * R}
}
Array.tabulate(N)(generatePoint)
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
index 92a683ad57..a55e0dc8d3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
@@ -37,8 +37,8 @@ object LocalLR {
case class DataPoint(x: Vector[Double], y: Double)
- def generateData = {
- def generatePoint(i: Int) = {
+ def generateData: Array[DataPoint] = {
+ def generatePoint(i: Int): DataPoint = {
val y = if(i % 2 == 0) -1 else 1
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 74620ad007..32e02eab8b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -54,8 +54,8 @@ object LogQuery {
// scalastyle:on
/** Tracks the total query count and number of aggregate bytes for a particular group. */
class Stats(val count: Int, val numBytes: Int) extends Serializable {
- def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
- override def toString = "bytes=%s\tn=%s".format(numBytes, count)
+ def merge(other: Stats): Stats = new Stats(count + other.count, numBytes + other.numBytes)
+ override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
}
def extractKey(line: String): (String, String, String) = {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
index 257a7d29f9..8c01a60844 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
@@ -42,8 +42,8 @@ object SparkLR {
case class DataPoint(x: Vector[Double], y: Double)
- def generateData = {
- def generatePoint(i: Int) = {
+ def generateData: Array[DataPoint] = {
+ def generatePoint(i: Int): DataPoint = {
val y = if(i % 2 == 0) -1 else 1
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
index f7f83086df..772cd897f5 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
@@ -31,7 +31,7 @@ object SparkTC {
val numVertices = 100
val rand = new Random(42)
- def generateGraph = {
+ def generateGraph: Seq[(Int, Int)] = {
val edges: mutable.Set[(Int, Int)] = mutable.Set.empty
while (edges.size < numEdges) {
val from = rand.nextInt(numVertices)
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
index e322d4ce5a..ab6e63deb3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
@@ -90,7 +90,7 @@ class PRMessage() extends Message[String] with Serializable {
}
class CustomPartitioner(partitions: Int) extends Partitioner {
- def numPartitions = partitions
+ def numPartitions: Int = partitions
def getPartition(key: Any): Int = {
val hash = key match {
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
index 1f4ca4fbe7..0bc36ea65e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -178,7 +178,9 @@ object MovieLensALS {
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean)
: Double = {
- def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
+ def mapPredictedRating(r: Double): Double = {
+ if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
+ }
val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionsAndRatings = predictions.map{ x =>
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index b433082dce..92867b44be 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -85,13 +85,13 @@ extends Actor with ActorHelper {
lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
- override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+ override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self)
- def receive = {
+ def receive: PartialFunction[Any, Unit] = {
case msg => store(msg.asInstanceOf[T])
}
- override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+ override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index c3a05c89d8..751b30ea15 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -55,7 +55,8 @@ import org.apache.spark.util.IntParam
*/
object RecoverableNetworkWordCount {
- def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {
+ def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
+ : StreamingContext = {
// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 6510c70bd1..e99d1baa72 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -35,7 +35,7 @@ import org.apache.spark.SparkConf
*/
object SimpleZeroMQPublisher {
- def main(args: Array[String]) = {
+ def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
System.exit(1)
@@ -45,7 +45,7 @@ object SimpleZeroMQPublisher {
val acs: ActorSystem = ActorSystem()
val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
- implicit def stringToByteString(x: String) = ByteString(x)
+ implicit def stringToByteString(x: String): ByteString = ByteString(x)
val messages: List[ByteString] = List("words ", "may ", "count ")
while (true) {
Thread.sleep(1000)
@@ -86,7 +86,7 @@ object ZeroMQWordCount {
// Create the context and set the batch size
val ssc = new StreamingContext(sparkConf, Seconds(2))
- def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
+ def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
// For this stream, a zeroMQ publisher should be running.
val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index 8402491b62..54d996b8ac 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -94,7 +94,7 @@ object PageViewGenerator {
while (true) {
val socket = listener.accept()
new Thread() {
- override def run = {
+ override def run(): Unit = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 2de2a7926b..60e2994431 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -37,8 +37,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver
-import org.jboss.netty.channel.ChannelPipelineFactory
-import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.compression._
@@ -187,8 +186,8 @@ class FlumeReceiver(
logInfo("Flume receiver stopped")
}
- override def preferredLocation = Some(host)
-
+ override def preferredLocation: Option[String] = Option(host)
+
/** A Netty Pipeline factory that will decompress incoming data from
* and the Netty client and compress data going back to the client.
*
@@ -198,13 +197,12 @@ class FlumeReceiver(
*/
private[streaming]
class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
-
- def getPipeline() = {
+ def getPipeline(): ChannelPipeline = {
val pipeline = Channels.pipeline()
val encoder = new ZlibEncoder(6)
pipeline.addFirst("deflater", encoder)
pipeline.addFirst("inflater", new ZlibDecoder())
pipeline
+ }
}
}
-}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 04e65cb3d7..1b1fc8051d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -129,8 +129,9 @@ class DirectKafkaInputDStream[
private[streaming]
class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
- def batchForTime = data.asInstanceOf[mutable.HashMap[
- Time, Array[OffsetRange.OffsetRangeTuple]]]
+ def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+ data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+ }
override def update(time: Time) {
batchForTime.clear()
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 6d465bcb6b..4a83b715fa 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -155,7 +155,7 @@ class KafkaRDD[
.dropWhile(_.offset < requestOffset)
}
- override def close() = consumer.close()
+ override def close(): Unit = consumer.close()
override def getNext(): R = {
if (iter == null || !iter.hasNext) {
@@ -207,7 +207,7 @@ object KafkaRDD {
fromOffsets: Map[TopicAndPartition, Long],
untilOffsets: Map[TopicAndPartition, LeaderOffset],
messageHandler: MessageAndMetadata[K, V] => R
- ): KafkaRDD[K, V, U, T, R] = {
+ ): KafkaRDD[K, V, U, T, R] = {
val leaders = untilOffsets.map { case (tp, lo) =>
tp -> (lo.host, lo.port)
}.toMap
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 4eacc47da5..7cf02d85d7 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -70,7 +70,7 @@ class TwitterReceiver(
try {
val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
newTwitterStream.addListener(new StatusListener {
- def onStatus(status: Status) = {
+ def onStatus(status: Status): Unit = {
store(status)
}
// Unimplemented
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 554705878e..588e6bac7b 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -29,13 +29,16 @@ import org.apache.spark.streaming.receiver.ActorHelper
/**
* A receiver to subscribe to ZeroMQ stream.
*/
-private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] => Iterator[T])
+private[streaming] class ZeroMQReceiver[T: ClassTag](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[ByteString] => Iterator[T])
extends Actor with ActorHelper with Logging {
- override def preStart() = ZeroMQExtension(context.system)
- .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
+ override def preStart(): Unit = {
+ ZeroMQExtension(context.system)
+ .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
+ }
def receive: Receive = {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
index d8be02e202..23430179f1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
@@ -62,7 +62,6 @@ object EdgeContext {
* , _ + _)
* }}}
*/
- def unapply[VD, ED, A](edge: EdgeContext[VD, ED, A]) =
+ def unapply[VD, ED, A](edge: EdgeContext[VD, ED, A]): Some[(VertexId, VertexId, VD, VD, ED)] =
Some(edge.srcId, edge.dstId, edge.srcAttr, edge.dstAttr, edge.attr)
}
-
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
index 6f03eb1439..058c8c8aa1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
@@ -34,12 +34,12 @@ class EdgeDirection private (private val name: String) extends Serializable {
override def toString: String = "EdgeDirection." + name
- override def equals(o: Any) = o match {
+ override def equals(o: Any): Boolean = o match {
case other: EdgeDirection => other.name == name
case _ => false
}
- override def hashCode = name.hashCode
+ override def hashCode: Int = name.hashCode
}
@@ -48,14 +48,14 @@ class EdgeDirection private (private val name: String) extends Serializable {
*/
object EdgeDirection {
/** Edges arriving at a vertex. */
- final val In = new EdgeDirection("In")
+ final val In: EdgeDirection = new EdgeDirection("In")
/** Edges originating from a vertex. */
- final val Out = new EdgeDirection("Out")
+ final val Out: EdgeDirection = new EdgeDirection("Out")
/** Edges originating from *or* arriving at a vertex of interest. */
- final val Either = new EdgeDirection("Either")
+ final val Either: EdgeDirection = new EdgeDirection("Either")
/** Edges originating from *and* arriving at a vertex of interest. */
- final val Both = new EdgeDirection("Both")
+ final val Both: EdgeDirection = new EdgeDirection("Both")
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index 9d473d5ebd..c8790cac3d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -62,7 +62,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
def vertexAttr(vid: VertexId): VD =
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
- override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+ override def toString: String = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 373af75448..c561570809 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -324,7 +324,7 @@ class EdgePartition[
*
* @return an iterator over edges in the partition
*/
- def iterator = new Iterator[Edge[ED]] {
+ def iterator: Iterator[Edge[ED]] = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED]
private[this] var pos = 0
@@ -351,7 +351,7 @@ class EdgePartition[
override def hasNext: Boolean = pos < EdgePartition.this.size
- override def next() = {
+ override def next(): EdgeTriplet[VD, ED] = {
val triplet = new EdgeTriplet[VD, ED]
val localSrcId = localSrcIds(pos)
val localDstId = localDstIds(pos)
@@ -518,11 +518,11 @@ private class AggregatingEdgeContext[VD, ED, A](
_attr = attr
}
- override def srcId = _srcId
- override def dstId = _dstId
- override def srcAttr = _srcAttr
- override def dstAttr = _dstAttr
- override def attr = _attr
+ override def srcId: VertexId = _srcId
+ override def dstId: VertexId = _dstId
+ override def srcAttr: VD = _srcAttr
+ override def dstAttr: VD = _dstAttr
+ override def attr: ED = _attr
override def sendToSrc(msg: A) {
send(_localSrcId, msg)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 43a3aea0f6..c88b2f65a8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -70,9 +70,9 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
this
}
- override def getStorageLevel = partitionsRDD.getStorageLevel
+ override def getStorageLevel: StorageLevel = partitionsRDD.getStorageLevel
- override def checkpoint() = {
+ override def checkpoint(): Unit = {
partitionsRDD.checkpoint()
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 8ab255bd40..1df86449fa 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -50,7 +50,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
* Return a new `ReplicatedVertexView` where edges are reversed and shipping levels are swapped to
* match.
*/
- def reverse() = {
+ def reverse(): ReplicatedVertexView[VD, ED] = {
val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 349c8545bf..33ac7b0ed6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -71,9 +71,9 @@ class VertexRDDImpl[VD] private[graphx] (
this
}
- override def getStorageLevel = partitionsRDD.getStorageLevel
+ override def getStorageLevel: StorageLevel = partitionsRDD.getStorageLevel
- override def checkpoint() = {
+ override def checkpoint(): Unit = {
partitionsRDD.checkpoint()
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index e2f6cc1389..859f896039 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -37,7 +37,7 @@ object ConnectedComponents {
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
- def sendMessage(edge: EdgeTriplet[VertexId, ED]) = {
+ def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
index 82e9e06515..2bcf8684b8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
@@ -43,7 +43,7 @@ object LabelPropagation {
*/
def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED] = {
val lpaGraph = graph.mapVertices { case (vid, _) => vid }
- def sendMessage(e: EdgeTriplet[VertexId, ED]) = {
+ def sendMessage(e: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, Map[VertexId, VertexId])] = {
Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L)))
}
def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
@@ -54,7 +54,7 @@ object LabelPropagation {
i -> (count1Val + count2Val)
}.toMap
}
- def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]) = {
+ def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
if (message.isEmpty) attr else message.maxBy(_._2)._1
}
val initialMessage = Map[VertexId, Long]()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 570440ba44..042e366a29 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -156,7 +156,7 @@ object PageRank extends Logging {
(newPR, newPR - oldPR)
}
- def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
+ def sendMessage(edge: EdgeTriplet[(Double, Double), Double]): Iterator[(VertexId, Double)] = {
if (edge.srcAttr._2 > tol) {
Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
} else {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
index 57b01b6f2e..e2754ea699 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
@@ -56,7 +56,7 @@ class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
private var _oldValues: Array[V] = null
- override def size = keySet.size
+ override def size: Int = keySet.size
/** Get the value for a given key */
def apply(k: K): V = {
@@ -112,7 +112,7 @@ class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
}
}
- override def iterator = new Iterator[(K, V)] {
+ override def iterator: Iterator[(K, V)] = new Iterator[(K, V)] {
var pos = 0
var nextPair: (K, V) = computeNextPair()
@@ -128,9 +128,9 @@ class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
}
}
- def hasNext = nextPair != null
+ def hasNext: Boolean = nextPair != null
- def next() = {
+ def next(): (K, V) = {
val pair = nextPair
nextPair = computeNextPair()
pair
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 9ee7e4a66b..b2d9053f70 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -522,7 +522,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
new Word2VecModel(word2VecMap)
}
- def save(sc: SparkContext, path: String, model: Map[String, Array[Float]]) = {
+ def save(sc: SparkContext, path: String, model: Map[String, Array[Float]]): Unit = {
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 459a5035d4..7168d5b2a8 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -137,7 +137,7 @@
<!-- <parameter name="maxMethods"><![CDATA[30]]></parameter> -->
<!-- </parameters> -->
<!-- </check> -->
- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
<check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
<check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
<check level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" enabled="true"></check>
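With the checker now reporting errors, every remaining public member either gets an explicit type, as in the hunks in this patch, or a scoped suppression. A minimal sketch of the suppression route, assuming scalastyle's standard comment filters and assuming this checker's id is public.methods.have.type (Spark sources already use the // scalastyle:off / :on mechanism for other rules, as the LogQuery.scala hunk above shows); the object and method here are hypothetical:

    object LegacyHelpers {
      // scalastyle:off public.methods.have.type
      // Hypothetical helper left with an inferred return type; tolerated only inside this block.
      def legacyRatio(used: Long, total: Long) = used.toDouble / total
      // scalastyle:on public.methods.have.type
    }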
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index 34fedead44..f9992185a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -30,7 +30,7 @@ class AnalysisException protected[sql] (
val startPosition: Option[Int] = None)
extends Exception with Serializable {
- def withPosition(line: Option[Int], startPosition: Option[Int]) = {
+ def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException = {
val newException = new AnalysisException(message, line, startPosition)
newException.setStackTrace(getStackTrace)
newException
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
index c61c395cb4..7731336d24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
@@ -44,7 +44,7 @@ package object analysis {
}
/** Catches any AnalysisExceptions thrown by `f` and attaches `t`'s position if any. */
- def withPosition[A](t: TreeNode[_])(f: => A) = {
+ def withPosition[A](t: TreeNode[_])(f: => A): A = {
try f catch {
case a: AnalysisException =>
throw a.withPosition(t.origin.line, t.origin.startPosition)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index eb46b46ca5..319de710fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -204,7 +204,7 @@ private[sql] object ResolvedDataSource {
provider: String,
options: Map[String, String]): ResolvedDataSource = {
val clazz: Class[_] = lookupDataSource(provider)
- def className = clazz.getCanonicalName
+ def className: String = clazz.getCanonicalName
val relation = userSpecifiedSchema match {
case Some(schema: StructType) => clazz.newInstance() match {
case dataSource: SchemaRelationProvider =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c4da34ae64..ae5ce4cf4c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -861,7 +861,7 @@ private[hive] case class MetastoreRelation
/** An attribute map for determining the ordinal for non-partition columns. */
val columnOrdinals = AttributeMap(attributes.zipWithIndex)
- override def newInstance() = {
+ override def newInstance(): MetastoreRelation = {
MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 5be09a11ad..077e64133f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -659,7 +659,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
AttributeReference("value", StringType)()), true)
}
- def matchSerDe(clause: Seq[ASTNode]) = clause match {
+ def matchSerDe(clause: Seq[ASTNode])
+ : (Seq[(String, String)], String, Seq[(String, String)]) = clause match {
case Token("TOK_SERDEPROPS", propsClause) :: Nil =>
val rowFormat = propsClause.map {
case Token(name, Token(value, Nil) :: Nil) => (name, value)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index f73b463d07..28703ef812 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -234,7 +234,7 @@ object CheckpointReader extends Logging {
val checkpointPath = new Path(checkpointDir)
// TODO(rxin): Why is this a def?!
- def fs = checkpointPath.getFileSystem(hadoopConf)
+ def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
// Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 73030e15c5..808dcc174c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -169,7 +169,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -179,7 +179,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = fakeClassTag
new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -190,7 +190,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of the RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ def fn: (Iterator[T]) => Iterator[U] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -201,7 +203,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index f94f2d0e8b..93baad19e3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -526,7 +526,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
import scala.collection.JavaConverters._
- def fn = (x: V) => f.apply(x).asScala
+ def fn: (V) => Iterable[U] = (x: V) => f.apply(x).asScala
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
dstream.flatMapValues(fn)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index e3db01c1e1..4095a7cc84 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -192,7 +192,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
: JavaReceiverInputDStream[T] = {
- def fn = (x: InputStream) => converter.call(x).toIterator
+ def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).toIterator
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.socketStream(hostname, port, fn, storageLevel)
@@ -313,7 +313,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
- def fn = (x: Path) => filter.call(x).booleanValue()
+ def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
}
@@ -344,7 +344,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
- def fn = (x: Path) => filter.call(x).booleanValue()
+ def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
}
@@ -625,7 +625,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
*/
- def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
+ def stop(stopSparkContext: Boolean): Unit = ssc.stop(stopSparkContext)
/**
* Stop the execution of the streams.
@@ -633,7 +633,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* @param stopGracefully Stop gracefully by waiting for the processing of all
* received data to be completed
*/
- def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
+ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
ssc.stop(stopSparkContext, stopGracefully)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 795c5aa6d5..24f99a2b92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -839,7 +839,7 @@ object DStream {
/** Filtering function that excludes non-user classes for a streaming application */
def streamingExclustionFunction(className: String): Boolean = {
- def doesMatch(r: Regex) = r.findFirstIn(className).isDefined
+ def doesMatch(r: Regex): Boolean = r.findFirstIn(className).isDefined
val isSparkClass = doesMatch(SPARK_CLASS_REGEX)
val isSparkExampleClass = doesMatch(SPARK_EXAMPLES_CLASS_REGEX)
val isSparkStreamingTestClass = doesMatch(SPARK_STREAMING_TESTCLASS_REGEX)
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index 8d0f09933c..583823c90c 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -17,7 +17,7 @@
package org.apache.spark.tools
-import java.lang.reflect.Method
+import java.lang.reflect.{Type, Method}
import scala.collection.mutable.ArrayBuffer
import scala.language.existentials
@@ -302,7 +302,7 @@ object JavaAPICompletenessChecker {
private def isExcludedByInterface(method: Method): Boolean = {
val excludedInterfaces =
Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
- def toComparisionKey(method: Method) =
+ def toComparisionKey(method: Method): (Class[_], String, Type) =
(method.getReturnType, method.getName, method.getGenericReturnType)
val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
excludedInterfaces.contains(i.getName)
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 6b666a0384..f2d135397c 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
* Writes simulated shuffle output from several threads and records the observed throughput.
*/
object StoragePerfTester {
- def main(args: Array[String]) = {
+ def main(args: Array[String]): Unit = {
/** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
@@ -58,7 +58,7 @@ object StoragePerfTester {
val sc = new SparkContext("local[4]", "Write Tester", conf)
val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
- def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+ def writeOutputBytes(mapId: Int, total: AtomicLong): Unit = {
val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits,
new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
val writers = shuffle.writers
@@ -78,7 +78,7 @@ object StoragePerfTester {
val totalBytes = new AtomicLong()
for (task <- 1 to numMaps) {
executor.submit(new Runnable() {
- override def run() = {
+ override def run(): Unit = {
try {
writeOutputBytes(task, totalBytes)
latch.countDown()
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 3d18690cd9..455554eea0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -162,7 +162,7 @@ private[spark] class ApplicationMaster(
* status to SUCCEEDED in cluster mode to handle if the user calls System.exit
* from the application code.
*/
- final def getDefaultFinalStatus() = {
+ final def getDefaultFinalStatus(): FinalApplicationStatus = {
if (isClusterMode) {
FinalApplicationStatus.SUCCEEDED
} else {
@@ -175,31 +175,35 @@ private[spark] class ApplicationMaster(
* This means the ResourceManager will not retry the application attempt on your behalf if
* a failure occurred.
*/
- final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
- if (!unregistered) {
- logInfo(s"Unregistering ApplicationMaster with $status" +
- Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
- unregistered = true
- client.unregister(status, Option(diagnostics).getOrElse(""))
+ final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = {
+ synchronized {
+ if (!unregistered) {
+ logInfo(s"Unregistering ApplicationMaster with $status" +
+ Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+ unregistered = true
+ client.unregister(status, Option(diagnostics).getOrElse(""))
+ }
}
}
- final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
- if (!finished) {
- val inShutdown = Utils.inShutdown()
- logInfo(s"Final app status: ${status}, exitCode: ${code}" +
- Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
- exitCode = code
- finalStatus = status
- finalMsg = msg
- finished = true
- if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
- logDebug("shutting down reporter thread")
- reporterThread.interrupt()
- }
- if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) {
- logDebug("shutting down user thread")
- userClassThread.interrupt()
+ final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = {
+ synchronized {
+ if (!finished) {
+ val inShutdown = Utils.inShutdown()
+ logInfo(s"Final app status: $status, exitCode: $code" +
+ Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
+ exitCode = code
+ finalStatus = status
+ finalMsg = msg
+ finished = true
+ if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
+ logDebug("shutting down reporter thread")
+ reporterThread.interrupt()
+ }
+ if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) {
+ logDebug("shutting down user thread")
+ userClassThread.interrupt()
+ }
}
}
}
@@ -506,7 +510,7 @@ private[spark] class ApplicationMaster(
private class AMActor(driverUrl: String, isClusterMode: Boolean) extends Actor {
var driver: ActorSelection = _
- override def preStart() = {
+ override def preStart(): Unit = {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
// Send a hello message to establish the connection, after which
@@ -520,7 +524,7 @@ private[spark] class ApplicationMaster(
}
}
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
// In cluster mode, do not rely on the disassociated event to exit
@@ -567,7 +571,7 @@ object ApplicationMaster extends Logging {
private var master: ApplicationMaster = _
- def main(args: Array[String]) = {
+ def main(args: Array[String]): Unit = {
SignalLogger.register(log)
val amArgs = new ApplicationMasterArguments(args)
SparkHadoopUtil.get.runAsSparkUser { () =>
@@ -576,11 +580,11 @@ object ApplicationMaster extends Logging {
}
}
- private[spark] def sparkContextInitialized(sc: SparkContext) = {
+ private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
master.sparkContextInitialized(sc)
}
- private[spark] def sparkContextStopped(sc: SparkContext) = {
+ private[spark] def sparkContextStopped(sc: SparkContext): Boolean = {
master.sparkContextStopped(sc)
}
@@ -592,7 +596,7 @@ object ApplicationMaster extends Logging {
*/
object ExecutorLauncher {
- def main(args: Array[String]) = {
+ def main(args: Array[String]): Unit = {
ApplicationMaster.main(args)
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index c1d3f7320f..1ce10d906a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -59,15 +59,15 @@ class ExecutorRunnable(
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
lazy val env = prepareEnvironment(container)
- def run = {
+ override def run(): Unit = {
logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(yarnConf)
nmClient.start()
- startContainer
+ startContainer()
}
- def startContainer = {
+ def startContainer(): java.util.Map[String, ByteBuffer] = {
logInfo("Setting up ContainerLaunchContext")
val ctx = Records.newRecord(classOf[ContainerLaunchContext])