From cbfbc01938f4be051f6c5ae98874e735b1780925 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sat, 11 Jan 2014 16:22:45 +0800 Subject: Fix configure didn't work small problem in ALS --- .../scala/org/apache/spark/mllib/recommendation/ALS.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 8b27ecf82c..89ee07063d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -22,7 +22,7 @@ import scala.util.Random import scala.util.Sorting import org.apache.spark.broadcast.Broadcast -import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext} +import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD import org.apache.spark.serializer.KryoRegistrator @@ -578,12 +578,13 @@ object ALS { val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false val alpha = if (args.length >= 8) args(7).toDouble else 1 val blocks = if (args.length == 9) args(8).toInt else -1 - val sc = new SparkContext(master, "ALS") - sc.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - sc.conf.set("spark.kryo.registrator", classOf[ALSRegistrator].getName) - sc.conf.set("spark.kryo.referenceTracking", "false") - sc.conf.set("spark.kryoserializer.buffer.mb", "8") - sc.conf.set("spark.locality.wait", "10000") + val conf = new SparkConf() + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) + .set("spark.kryo.referenceTracking", "false") + .set("spark.kryoserializer.buffer.mb", "8") + .set("spark.locality.wait", "10000") + val sc = new SparkContext(master, "ALS", conf) val ratings = sc.textFile(ratingsFile).map { line => val fields = line.split(',') -- cgit v1.2.3 From 9a0dfdf868187fb9a2e1656e4cf5f29d952ce5db Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 9 Jan 2014 23:55:06 -0800 Subject: Add Naive Bayes to Python MLlib, and some API fixes - Added a Python wrapper for Naive Bayes - Updated the Scala Naive Bayes to match the style of our other algorithms better and in particular make it easier to call from Java (added builder pattern, removed default value in train method) - Updated Python MLlib functions to not require a SparkContext; we can get that from the RDD the user gives - Added a toString method in LabeledPoint - Made the Python MLlib tests run as part of run-tests as well (before they could only be run individually through each file) --- docs/mllib-guide.md | 10 ++-- .../spark/mllib/api/python/PythonMLLibAPI.scala | 17 ++++++ .../spark/mllib/classification/NaiveBayes.scala | 44 ++++++++++++--- .../spark/mllib/regression/LabeledPoint.scala | 6 +- python/pyspark/mllib/_common.py | 2 +- python/pyspark/mllib/classification.py | 65 ++++++++++++++++++++-- python/pyspark/mllib/clustering.py | 11 ++-- python/pyspark/mllib/recommendation.py | 10 ++-- python/pyspark/mllib/regression.py | 17 +++--- python/run-tests | 5 ++ 10 files changed, 150 insertions(+), 37 deletions(-) (limited to 'mllib') diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 45ee166688..c977bc4f35 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -330,7 +330,7 @@ from numpy import array # 
Load and parse the data data = sc.textFile("mllib/data/sample_svm_data.txt") parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) -model = LogisticRegressionWithSGD.train(sc, parsedData) +model = LogisticRegressionWithSGD.train(parsedData) # Build the model labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)), @@ -356,7 +356,7 @@ data = sc.textFile("mllib/data/ridge-data/lpsa.data") parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')])) # Build the model -model = LinearRegressionWithSGD.train(sc, parsedData) +model = LinearRegressionWithSGD.train(parsedData) # Evaluate the model on training data valuesAndPreds = parsedData.map(lambda point: (point.item(0), @@ -382,7 +382,7 @@ data = sc.textFile("kmeans_data.txt") parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) # Build the model (cluster the data) -clusters = KMeans.train(sc, parsedData, 2, maxIterations=10, +clusters = KMeans.train(parsedData, 2, maxIterations=10, runs=30, initialization_mode="random") # Evaluate clustering by computing Within Set Sum of Squared Errors @@ -411,7 +411,7 @@ data = sc.textFile("mllib/data/als/test.data") ratings = data.map(lambda line: array([float(x) for x in line.split(',')])) # Build the recommendation model using Alternating Least Squares -model = ALS.train(sc, ratings, 1, 20) +model = ALS.train(ratings, 1, 20) # Evaluate the model on training data testdata = ratings.map(lambda p: (int(p[0]), int(p[1]))) @@ -426,5 +426,5 @@ signals), you can use the trainImplicit method to get better results. {% highlight python %} # Build the recommendation model using Alternating Least Squares based on implicit ratings -model = ALS.trainImplicit(sc, ratings, 1, 20) +model = ALS.trainImplicit(ratings, 1, 20) {% endhighlight %} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 2d8623392e..7ca2cb2a08 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -184,6 +184,23 @@ class PythonMLLibAPI extends Serializable { dataBytesJRDD, initialWeightsBA) } + /** + * Java stub for NaiveBayes.train() + */ + def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double) + : java.util.List[java.lang.Object] = + { + val data = dataBytesJRDD.rdd.map(xBytes => { + val x = deserializeDoubleVector(xBytes) + LabeledPoint(x(0), x.slice(1, x.length)) + }) + val model = NaiveBayes.train(data, lambda) + val ret = new java.util.LinkedList[java.lang.Object]() + ret.add(serializeDoubleVector(model.pi)) + ret.add(serializeDoubleMatrix(model.theta)) + ret + } + /** * Java stub for Python mllib KMeans.train() */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 524300d6ae..f45802cd0b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -29,9 +29,9 @@ import org.apache.spark.rdd.RDD * Model for Naive Bayes Classifiers. * * @param pi Log of class priors, whose dimension is C. - * @param theta Log of class conditional probabilities, whose dimension is CXD. + * @param theta Log of class conditional probabilities, whose dimension is CxD. 
*/ -class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) +class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]]) extends ClassificationModel with Serializable { // Create a column vector that can be used for predictions @@ -50,10 +50,21 @@ class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) /** * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. * - * @param lambda The smooth parameter + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector, it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). */ -class NaiveBayes private (val lambda: Double = 1.0) - extends Serializable with Logging { +class NaiveBayes private (var lambda: Double) + extends Serializable with Logging +{ + def this() = this(1.0) + + /** Set the smoothing parameter. Default: 1.0. */ + def setLambda(lambda: Double): NaiveBayes = { + this.lambda = lambda + this + } /** * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. @@ -106,14 +117,31 @@ object NaiveBayes { * * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for - * document classification. By making every vector a 0-1 vector. it can also be used as + * document classification. By making every vector a 0-1 vector, it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). + * + * This version of the method uses a default smoothing parameter of 1.0. + * + * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency + * vector or a count vector. + */ + def train(input: RDD[LabeledPoint]): NaiveBayesModel = { + new NaiveBayes().run(input) + } + + /** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector, it can also be used as * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). * * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency * vector or a count vector. - * @param lambda The smooth parameter + * @param lambda The smoothing parameter */ - def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { + def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { new NaiveBayes(lambda).run(input) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 63240e24dc..1a18292fe3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -23,4 +23,8 @@ package org.apache.spark.mllib.regression * @param label Label for this data point. * @param features List of features for this data point. 
*/ -case class LabeledPoint(val label: Double, val features: Array[Double]) +case class LabeledPoint(label: Double, features: Array[Double]) { + override def toString: String = { + "LabeledPoint(%s, %s)".format(label, features.mkString("[", ", ", "]")) + } +} diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index 769d88dfb9..20a0e309d1 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -16,7 +16,7 @@ # from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape -from pyspark import SparkContext +from pyspark import SparkContext, RDD from pyspark.serializers import Serializer import struct diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 70de332d34..03ff5a572e 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -15,6 +15,8 @@ # limitations under the License. # +import numpy + from numpy import array, dot, shape from pyspark import SparkContext from pyspark.mllib._common import \ @@ -29,8 +31,8 @@ class LogisticRegressionModel(LinearModel): """A linear binary classification model derived from logistic regression. >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2) - >>> lrm = LogisticRegressionWithSGD.train(sc, sc.parallelize(data)) - >>> lrm.predict(array([1.0])) != None + >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data)) + >>> lrm.predict(array([1.0])) > 0 True """ def predict(self, x): @@ -41,9 +43,10 @@ class LogisticRegressionModel(LinearModel): class LogisticRegressionWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, + def train(cls, data, iterations=100, step=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a logistic regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd, iterations, step, mini_batch_fraction, i), @@ -53,8 +56,8 @@ class SVMModel(LinearModel): """A support vector machine. >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2) - >>> svm = SVMWithSGD.train(sc, sc.parallelize(data)) - >>> svm.predict(array([1.0])) != None + >>> svm = SVMWithSGD.train(sc.parallelize(data)) + >>> svm.predict(array([1.0])) > 0 True """ def predict(self, x): @@ -64,14 +67,64 @@ class SVMModel(LinearModel): class SVMWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, + def train(cls, data, iterations=100, step=1.0, reg_param=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a support vector machine on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd, iterations, step, reg_param, mini_batch_fraction, i), SVMModel, data, initial_weights) +class NaiveBayesModel(object): + """ + Model for Naive Bayes classifiers. 
+ + Contains two parameters: + - pi: vector of logs of class priors (dimension C) + - theta: matrix of logs of class conditional probabilities (CxD) + + >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3) + >>> model = NaiveBayes.train(sc.parallelize(data)) + >>> model.predict(array([0.0, 1.0])) + 0 + >>> model.predict(array([1.0, 0.0])) + 1 + """ + + def __init__(self, pi, theta): + self.pi = pi + self.theta = theta + + def predict(self, x): + """Return the most likely class for a data vector x""" + return numpy.argmax(self.pi + dot(x, self.theta)) + +class NaiveBayes(object): + @classmethod + def train(cls, data, lambda_=1.0): + """ + Train a Naive Bayes model given an RDD of (label, features) vectors. + + This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can + handle all kinds of discrete data. For example, by converting + documents into TF-IDF vectors, it can be used for document + classification. By making every vector a 0-1 vector, it can also be + used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}). + + @param data: RDD of NumPy vectors, one per element, where the first + coordinate is the label and the rest is the feature vector + (e.g. a count vector). + @param lambda_: The smoothing parameter + """ + sc = data.context + dataBytes = _get_unmangled_double_vector_rdd(data) + ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_) + return NaiveBayesModel( + _deserialize_double_vector(ans[0]), + _deserialize_double_matrix(ans[1])) + + def _test(): import doctest globs = globals().copy() diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 8cf20e591a..30862918c3 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -28,12 +28,12 @@ class KMeansModel(object): """A clustering model derived from the k-means method. 
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) - >>> clusters = KMeans.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random") + >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0])) True >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0])) True - >>> clusters = KMeans.train(sc, sc.parallelize(data), 2) + >>> clusters = KMeans.train(sc.parallelize(data), 2) """ def __init__(self, centers_): self.centers = centers_ @@ -52,12 +52,13 @@ class KMeansModel(object): class KMeans(object): @classmethod - def train(cls, sc, data, k, maxIterations=100, runs=1, - initialization_mode="k-means||"): + def train(cls, data, k, maxIterations=100, runs=1, + initializationMode="k-means||"): """Train a k-means clustering model.""" + sc = data.context dataBytes = _get_unmangled_double_vector_rdd(data) ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd, - k, maxIterations, runs, initialization_mode) + k, maxIterations, runs, initializationMode) if len(ans) != 1: raise RuntimeError("JVM call result had unexpected length") elif type(ans[0]) != bytearray: diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 0eeb5bb66b..f4a83f0209 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -32,11 +32,11 @@ class MatrixFactorizationModel(object): >>> r2 = (1, 2, 2.0) >>> r3 = (2, 1, 2.0) >>> ratings = sc.parallelize([r1, r2, r3]) - >>> model = ALS.trainImplicit(sc, ratings, 1) + >>> model = ALS.trainImplicit(ratings, 1) >>> model.predict(2,2) is not None True >>> testset = sc.parallelize([(1, 2), (1, 1)]) - >>> model.predictAll(testset).count == 2 + >>> model.predictAll(testset).count() == 2 True """ @@ -57,14 +57,16 @@ class MatrixFactorizationModel(object): class ALS(object): @classmethod - def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): + def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): + sc = ratings.context ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating) mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd, rank, iterations, lambda_, blocks) return MatrixFactorizationModel(sc, mod) @classmethod - def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01): + def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01): + sc = ratings.context ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating) mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index a3a68b29e0..e90b72893f 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -47,14 +47,15 @@ class LinearRegressionModel(LinearRegressionModelBase): """A linear regression model derived from a least-squares fit. 
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) - >>> lrm = LinearRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0])) + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initial_weights=array([1.0])) """ class LinearRegressionWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, + def train(cls, data, iterations=100, step=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a linear regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD( d._jrdd, iterations, step, mini_batch_fraction, i), @@ -65,14 +66,15 @@ class LassoModel(LinearRegressionModelBase): l_1 penalty term. >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) - >>> lrm = LassoWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0])) + >>> lrm = LassoWithSGD.train(sc.parallelize(data), initial_weights=array([1.0])) """ - + class LassoWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, + def train(cls, data, iterations=100, step=1.0, reg_param=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a Lasso regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd, iterations, step, reg_param, mini_batch_fraction, i), @@ -83,14 +85,15 @@ class RidgeRegressionModel(LinearRegressionModelBase): l_2 penalty term. >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) - >>> lrm = RidgeRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0])) + >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initial_weights=array([1.0])) """ class RidgeRegressionWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, + def train(cls, data, iterations=100, step=1.0, reg_param=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a ridge regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd, iterations, step, reg_param, mini_batch_fraction, i), diff --git a/python/run-tests b/python/run-tests index feba97cee0..a986ac9380 100755 --- a/python/run-tests +++ b/python/run-tests @@ -40,6 +40,11 @@ run_test "-m doctest pyspark/broadcast.py" run_test "-m doctest pyspark/accumulators.py" run_test "-m doctest pyspark/serializers.py" run_test "pyspark/tests.py" +run_test "pyspark/mllib/_common.py" +run_test "pyspark/mllib/classification.py" +run_test "pyspark/mllib/clustering.py" +run_test "pyspark/mllib/recommendation.py" +run_test "pyspark/mllib/regression.py" if [[ $FAILED != 0 ]]; then echo -en "\033[31m" # Red -- cgit v1.2.3 From f00e949f84df949fbe32c254b592a580b4623811 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 10 Jan 2014 16:08:35 -0800 Subject: Added Java unit test, data, and main method for Naive Bayes Also fixes mains of a few other algorithms to print the final model --- mllib/data/sample_naive_bayes_data.txt | 6 ++ .../mllib/classification/LogisticRegression.scala | 4 +- .../spark/mllib/classification/NaiveBayes.scala | 21 ++++++- .../apache/spark/mllib/classification/SVM.scala | 2 + .../org/apache/spark/mllib/regression/Lasso.scala | 4 +- .../spark/mllib/regression/LinearRegression.scala | 2 + 
.../spark/mllib/regression/RidgeRegression.scala | 4 +- .../mllib/classification/JavaNaiveBayesSuite.java | 72 ++++++++++++++++++++++ 8 files changed, 111 insertions(+), 4 deletions(-) create mode 100644 mllib/data/sample_naive_bayes_data.txt create mode 100644 mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java (limited to 'mllib') diff --git a/mllib/data/sample_naive_bayes_data.txt b/mllib/data/sample_naive_bayes_data.txt new file mode 100644 index 0000000000..f874adbaf4 --- /dev/null +++ b/mllib/data/sample_naive_bayes_data.txt @@ -0,0 +1,6 @@ +0, 1 0 0 +0, 2 0 0 +1, 0 1 0 +1, 0 2 0 +2, 0 0 1 +2, 0 0 2 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 50aede9c07..a481f52276 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -97,7 +97,7 @@ object LogisticRegressionWithSGD { * @param numIterations Number of iterations of gradient descent to run. * @param stepSize Step size to be used for each iteration of gradient descent. * @param miniBatchFraction Fraction of data to be used per iteration. - * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * @param initialWeights Initial set of weights to be used. Array should be equal in size to * the number of features in the data. */ def train( @@ -183,6 +183,8 @@ object LogisticRegressionWithSGD { val sc = new SparkContext(args(0), "LogisticRegression") val data = MLUtils.loadLabeledData(sc, args(1)) val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index f45802cd0b..6539b2f339 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -21,9 +21,10 @@ import scala.collection.mutable import org.jblas.DoubleMatrix -import org.apache.spark.Logging +import org.apache.spark.{SparkContext, Logging} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.util.MLUtils /** * Model for Naive Bayes Classifiers. 
@@ -144,4 +145,22 @@ object NaiveBayes { def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { new NaiveBayes(lambda).run(input) } + + def main(args: Array[String]) { + if (args.length != 2 && args.length != 3) { + println("Usage: NaiveBayes []") + System.exit(1) + } + val sc = new SparkContext(args(0), "NaiveBayes") + val data = MLUtils.loadLabeledData(sc, args(1)) + val model = if (args.length == 2) { + NaiveBayes.train(data) + } else { + NaiveBayes.train(data, args(2).toDouble) + } + println("Pi: " + model.pi.mkString("[", ", ", "]")) + println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]")) + + sc.stop() + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala index 3b8f8550d0..f2964ea446 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala @@ -183,6 +183,8 @@ object SVMWithSGD { val sc = new SparkContext(args(0), "SVM") val data = MLUtils.loadLabeledData(sc, args(1)) val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala index d959695325..7c41793722 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala @@ -121,7 +121,7 @@ object LassoWithSGD { * @param stepSize Step size to be used for each iteration of gradient descent. * @param regParam Regularization parameter. * @param miniBatchFraction Fraction of data to be used per iteration. - * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * @param initialWeights Initial set of weights to be used. Array should be equal in size to * the number of features in the data. 
*/ def train( @@ -205,6 +205,8 @@ object LassoWithSGD { val sc = new SparkContext(args(0), "Lasso") val data = MLUtils.loadLabeledData(sc, args(1)) val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala index 597d55e0bb..fe5cce064b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala @@ -162,6 +162,8 @@ object LinearRegressionWithSGD { val sc = new SparkContext(args(0), "LinearRegression") val data = MLUtils.loadLabeledData(sc, args(1)) val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala index b29508d2b9..c125c6797a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala @@ -122,7 +122,7 @@ object RidgeRegressionWithSGD { * @param stepSize Step size to be used for each iteration of gradient descent. * @param regParam Regularization parameter. * @param miniBatchFraction Fraction of data to be used per iteration. - * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * @param initialWeights Initial set of weights to be used. Array should be equal in size to * the number of features in the data. 
*/ def train( @@ -208,6 +208,8 @@ object RidgeRegressionWithSGD { val data = MLUtils.loadLabeledData(sc, args(1)) val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java new file mode 100644 index 0000000000..23ea3548b9 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java @@ -0,0 +1,72 @@ +package org.apache.spark.mllib.classification; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +public class JavaNaiveBayesSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaNaiveBayesSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + private static final List POINTS = Arrays.asList( + new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}), + new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}), + new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}), + new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}), + new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}), + new LabeledPoint(2, new double[] {0.0, 0.0, 2.0}) + ); + + private int validatePrediction(List points, NaiveBayesModel model) { + int correct = 0; + for (LabeledPoint p: points) { + if (model.predict(p.features()) == p.label()) { + correct += 1; + } + } + return correct; + } + + @Test + public void runUsingConstructor() { + JavaRDD testRDD = sc.parallelize(POINTS, 2).cache(); + + NaiveBayes nb = new NaiveBayes().setLambda(1.0); + NaiveBayesModel model = nb.run(testRDD.rdd()); + + int numAccurate = validatePrediction(POINTS, model); + Assert.assertEquals(POINTS.size(), numAccurate); + } + + @Test + public void runUsingStaticMethods() { + JavaRDD testRDD = sc.parallelize(POINTS, 2).cache(); + + NaiveBayesModel model1 = NaiveBayes.train(testRDD.rdd()); + int numAccurate1 = validatePrediction(POINTS, model1); + Assert.assertEquals(POINTS.size(), numAccurate1); + + NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5); + int numAccurate2 = validatePrediction(POINTS, model2); + Assert.assertEquals(POINTS.size(), numAccurate2); + } +} -- cgit v1.2.3 From 93a65e5fde64ffed3dbd2a050c1007e077ecd004 Mon Sep 17 00:00:00 2001 From: Henry Saputra Date: Sun, 12 Jan 2014 10:30:04 -0800 Subject: Remove simple redundant return statement for Scala methods/functions: -) Only change simple return statements at the end of method -) Ignore the complex if-else check -) Ignore the ones inside synchronized --- .../main/scala/org/apache/spark/Accumulators.scala | 2 +- .../main/scala/org/apache/spark/CacheManager.scala | 4 +-- .../scala/org/apache/spark/HttpFileServer.scala | 6 ++--- core/src/main/scala/org/apache/spark/Logging.scala | 2 +- .../scala/org/apache/spark/MapOutputTracker.scala | 4 +-- .../main/scala/org/apache/spark/Partitioner.scala | 4 +-- .../main/scala/org/apache/spark/SparkContext.scala | 11 +++++--- 
.../scala/org/apache/spark/SparkHadoopWriter.scala | 13 +++++----- .../org/apache/spark/api/python/PythonRDD.scala | 2 +- .../apache/spark/broadcast/TorrentBroadcast.scala | 6 ++--- .../org/apache/spark/network/BufferMessage.scala | 2 +- .../org/apache/spark/network/Connection.scala | 6 ++--- .../scala/org/apache/spark/network/Message.scala | 6 ++--- .../apache/spark/network/netty/ShuffleSender.scala | 2 +- .../scala/org/apache/spark/rdd/CoalescedRDD.scala | 4 +-- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 8 +++--- .../main/scala/org/apache/spark/rdd/PipedRDD.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 +-- .../apache/spark/scheduler/InputFormatInfo.scala | 8 +++--- .../scala/org/apache/spark/scheduler/Pool.scala | 8 +++--- .../spark/scheduler/SchedulingAlgorithm.scala | 11 ++++---- .../apache/spark/scheduler/SparkListenerBus.scala | 2 +- .../scala/org/apache/spark/scheduler/Stage.scala | 2 +- .../org/apache/spark/scheduler/TaskResult.scala | 2 +- .../apache/spark/scheduler/TaskResultGetter.scala | 2 +- .../apache/spark/scheduler/TaskSetManager.scala | 14 +++++------ .../mesos/CoarseMesosSchedulerBackend.scala | 2 +- .../cluster/mesos/MesosSchedulerBackend.scala | 6 ++--- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../apache/spark/storage/BlockManagerWorker.scala | 14 +++++------ .../org/apache/spark/storage/BlockMessage.scala | 2 +- .../apache/spark/storage/BlockMessageArray.scala | 2 +- .../org/apache/spark/storage/MemoryStore.scala | 2 +- .../org/apache/spark/storage/StorageLevel.scala | 2 +- .../org/apache/spark/util/AppendOnlyMap.scala | 2 +- .../org/apache/spark/util/ClosureCleaner.scala | 10 ++++---- .../org/apache/spark/util/SizeEstimator.scala | 10 ++++---- .../main/scala/org/apache/spark/util/Utils.scala | 19 +++++++------- .../main/scala/org/apache/spark/util/Vector.scala | 12 ++++----- .../spark/scheduler/ClusterSchedulerSuite.scala | 9 ++++--- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../apache/spark/scheduler/JobLoggerSuite.scala | 2 +- .../apache/spark/util/ClosureCleanerSuite.scala | 14 +++++------ .../scala/org/apache/spark/examples/LocalALS.scala | 8 +++--- .../org/apache/spark/examples/LocalFileLR.scala | 2 +- .../org/apache/spark/examples/LocalKMeans.scala | 2 +- .../scala/org/apache/spark/examples/SparkALS.scala | 6 ++--- .../org/apache/spark/examples/SparkHdfsLR.scala | 2 +- .../org/apache/spark/examples/SparkKMeans.scala | 12 ++++----- .../examples/clickstream/PageViewGenerator.scala | 2 +- .../spark/mllib/api/python/PythonMLLibAPI.scala | 29 +++++++++++----------- .../org/apache/spark/streaming/Checkpoint.scala | 2 +- .../spark/streaming/dstream/FileInputDStream.scala | 2 +- .../spark/streaming/dstream/StateDStream.scala | 8 +++--- .../streaming/scheduler/StreamingListenerBus.scala | 2 +- .../org/apache/spark/streaming/util/Clock.scala | 4 +-- .../spark/streaming/util/RawTextHelper.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 9 ++++--- .../apache/spark/deploy/yarn/WorkerLauncher.scala | 8 +++--- .../apache/spark/deploy/yarn/WorkerRunnable.scala | 7 +++--- .../yarn/ClientDistributedCacheManager.scala | 10 ++++---- .../yarn/ClientDistributedCacheManagerSuite.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 5 ++-- 63 files changed, 187 insertions(+), 186 deletions(-) (limited to 'mllib') diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 5f73d234aa..e89ac28b8e 100644 --- 
a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -218,7 +218,7 @@ private object Accumulators { def newId: Long = synchronized { lastId += 1 - return lastId + lastId } def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized { diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 519ecde50a..8e5dd8a850 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -38,7 +38,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { blockManager.get(key) match { case Some(values) => // Partition is already materialized, so just return its values - return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) + new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => // Mark the split as loading (unless someone else marks it first) @@ -74,7 +74,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val elements = new ArrayBuffer[Any] elements ++= computedValues blockManager.put(key, elements, storageLevel, tellMaster = true) - return elements.iterator.asInstanceOf[Iterator[T]] + elements.iterator.asInstanceOf[Iterator[T]] } finally { loading.synchronized { loading.remove(key) diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index ad1ee20045..a885898ad4 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -47,17 +47,17 @@ private[spark] class HttpFileServer extends Logging { def addFile(file: File) : String = { addFileToDir(file, fileDir) - return serverUri + "/files/" + file.getName + serverUri + "/files/" + file.getName } def addJar(file: File) : String = { addFileToDir(file, jarDir) - return serverUri + "/jars/" + file.getName + serverUri + "/jars/" + file.getName } def addFileToDir(file: File, dir: File) : String = { Files.copy(file, new File(dir, file.getName)) - return dir + "/" + file.getName + dir + "/" + file.getName } } diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index 4a34989e50..9063cae87e 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -41,7 +41,7 @@ trait Logging { } log_ = LoggerFactory.getLogger(className) } - return log_ + log_ } // Log methods that take only a String diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 77b8ca1cce..57bdf22b9c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -139,7 +139,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses) } } - else{ + else { throw new FetchFailedException(null, shuffleId, -1, reduceId, new Exception("Missing all output locations for shuffle " + shuffleId)) } @@ -185,7 +185,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { private[spark] class MapOutputTrackerMaster(conf: SparkConf) extends MapOutputTracker(conf) { - // Cache a serialized version of the output statuses for each 
shuffle to send them out faster + // Cache a serialized version of the output statuses for each shuffle to send them out faster return private var cacheEpoch = epoch private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]] diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 9b043f06dd..fc0a749882 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -53,9 +53,9 @@ object Partitioner { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { - return new HashPartitioner(rdd.context.defaultParallelism) + new HashPartitioner(rdd.context.defaultParallelism) } else { - return new HashPartitioner(bySize.head.partitions.size) + new HashPartitioner(bySize.head.partitions.size) } } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f91392b351..3d82bfc019 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -756,8 +756,11 @@ class SparkContext( private[spark] def getCallSite(): String = { val callSite = getLocalProperty("externalCallSite") - if (callSite == null) return Utils.formatSparkCallSite - callSite + if (callSite == null) { + Utils.formatSparkCallSite + } else { + callSite + } } /** @@ -907,7 +910,7 @@ class SparkContext( */ private[spark] def clean[F <: AnyRef](f: F): F = { ClosureCleaner.clean(f) - return f + f } /** @@ -919,7 +922,7 @@ class SparkContext( val path = new Path(dir, UUID.randomUUID().toString) val fs = path.getFileSystem(hadoopConfiguration) fs.mkdirs(path) - fs.getFileStatus(path).getPath().toString + fs.getFileStatus(path).getPath.toString } } diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 618d95015f..bba873a0b6 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -134,28 +134,28 @@ class SparkHadoopWriter(@transient jobConf: JobConf) format = conf.value.getOutputFormat() .asInstanceOf[OutputFormat[AnyRef,AnyRef]] } - return format + format } private def getOutputCommitter(): OutputCommitter = { if (committer == null) { committer = conf.value.getOutputCommitter } - return committer + committer } private def getJobContext(): JobContext = { if (jobContext == null) { jobContext = newJobContext(conf.value, jID.value) } - return jobContext + jobContext } private def getTaskContext(): TaskAttemptContext = { if (taskContext == null) { taskContext = newTaskAttemptContext(conf.value, taID.value) } - return taskContext + taskContext } private def setIDs(jobid: Int, splitid: Int, attemptid: Int) { @@ -182,7 +182,7 @@ object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) - return new JobID(jobtrackerID, id) + new JobID(jobtrackerID, id) } def createPathFromString(path: String, conf: JobConf): Path = { @@ -194,7 +194,6 @@ object SparkHadoopWriter { if (outputPath == null || fs == null) { throw new IllegalArgumentException("Incorrectly formatted output path") } - outputPath = outputPath.makeQualified(fs) - return outputPath + outputPath.makeQualified(fs) } } diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 40c519b5bd..8830de7273 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag]( // Return an iterator that read lines from the process's stdout val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize)) - return new Iterator[Array[Byte]] { + new Iterator[Array[Byte]] { def next(): Array[Byte] = { val obj = _nextObj if (hasNext) { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index fdf92eca4f..1d295c62bc 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -203,16 +203,16 @@ extends Logging { } bais.close() - var tInfo = TorrentInfo(retVal, blockNum, byteArray.length) + val tInfo = TorrentInfo(retVal, blockNum, byteArray.length) tInfo.hasBlocks = blockNum - return tInfo + tInfo } def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock], totalBytes: Int, totalBlocks: Int): T = { - var retByteArray = new Array[Byte](totalBytes) + val retByteArray = new Array[Byte](totalBytes) for (i <- 0 until totalBlocks) { System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray, i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length) diff --git a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala index f736bb3713..fb4c65909a 100644 --- a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala +++ b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala @@ -46,7 +46,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: throw new Exception("Max chunk size is " + maxChunkSize) } - if (size == 0 && gotChunkForSendingOnce == false) { + if (size == 0 && !gotChunkForSendingOnce) { val newChunk = new MessageChunk( new MessageChunkHeader(typ, id, 0, 0, ackId, senderAddress), null) gotChunkForSendingOnce = true diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 95cb0206ac..cba8477ed5 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -330,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, // Is highly unlikely unless there was an unclean close of socket, etc registerInterest() logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending") - return true + true } catch { case e: Exception => { logWarning("Error finishing connection to " + address, e) @@ -385,7 +385,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } } // should not happen - to keep scala compiler happy - return true + true } // This is a hack to determine if remote socket was closed or not. 
@@ -559,7 +559,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S } } // should not happen - to keep scala compiler happy - return true + true } def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback} diff --git a/core/src/main/scala/org/apache/spark/network/Message.scala b/core/src/main/scala/org/apache/spark/network/Message.scala index f2ecc6d439..2612884bdb 100644 --- a/core/src/main/scala/org/apache/spark/network/Message.scala +++ b/core/src/main/scala/org/apache/spark/network/Message.scala @@ -61,7 +61,7 @@ private[spark] object Message { if (dataBuffers.exists(_ == null)) { throw new Exception("Attempting to create buffer message with null buffer") } - return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId) + new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId) } def createBufferMessage(dataBuffers: Seq[ByteBuffer]): BufferMessage = @@ -69,9 +69,9 @@ private[spark] object Message { def createBufferMessage(dataBuffer: ByteBuffer, ackId: Int): BufferMessage = { if (dataBuffer == null) { - return createBufferMessage(Array(ByteBuffer.allocate(0)), ackId) + createBufferMessage(Array(ByteBuffer.allocate(0)), ackId) } else { - return createBufferMessage(Array(dataBuffer), ackId) + createBufferMessage(Array(dataBuffer), ackId) } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index 546d921067..44204a8c46 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -64,7 +64,7 @@ private[spark] object ShuffleSender { val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) val file = new File(subDir, blockId.name) - return new FileSegment(file, 0, file.length()) + new FileSegment(file, 0, file.length()) } } val sender = new ShuffleSender(port, pResovler) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 98da35763b..a5394a28e0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -296,9 +296,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc val prefPartActual = prefPart.get if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows - return minPowerOfTwo // prefer balance over locality + minPowerOfTwo // prefer balance over locality else { - return prefPartActual // prefer locality over balance + prefPartActual // prefer locality over balance } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 53f77a38f5..20db7db5ed 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -99,11 +99,11 @@ class HadoopRDD[K, V]( val conf: Configuration = broadcastedConf.value.value if (conf.isInstanceOf[JobConf]) { // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. 
- return conf.asInstanceOf[JobConf] + conf.asInstanceOf[JobConf] } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { // getJobConf() has been called previously, so there is already a local cache of the JobConf // needed by this RDD. - return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). @@ -111,7 +111,7 @@ class HadoopRDD[K, V]( val newJobConf = new JobConf(broadcastedConf.value.value) initLocalJobConfFuncOpt.map(f => f(newJobConf)) HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - return newJobConf + newJobConf } } @@ -127,7 +127,7 @@ class HadoopRDD[K, V]( newInputFormat.asInstanceOf[Configurable].setConf(conf) } HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat) - return newInputFormat + newInputFormat } override def getPartitions: Array[Partition] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 1dbbe39898..d4f396afb5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -96,7 +96,7 @@ class PipedRDD[T: ClassTag]( // Return an iterator that read lines from the process's stdout val lines = Source.fromInputStream(proc.getInputStream).getLines - return new Iterator[String] { + new Iterator[String] { def next() = lines.next() def hasNext = { if (lines.hasNext) { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f9dc12eee3..edd4f381db 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -764,7 +764,7 @@ abstract class RDD[T: ClassTag]( val entry = iter.next() m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue) } - return m1 + m1 } val myResult = mapPartitions(countPartition).reduce(mergeMaps) myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map @@ -842,7 +842,7 @@ abstract class RDD[T: ClassTag]( partsScanned += numPartsToTry } - return buf.toArray + buf.toArray } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 90eb8a747f..cc10cc0849 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split) } - return retval.toSet + retval.toSet } // This method does not expect failures, since validate has already passed ... 
@@ -121,18 +121,18 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem) ) - return retval.toSet + retval.toSet } private def findPreferredLocations(): Set[SplitInfo] = { logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat + ", inputFormatClazz : " + inputFormatClazz) if (mapreduceInputFormat) { - return prefLocsFromMapreduceInputFormat() + prefLocsFromMapreduceInputFormat() } else { assert(mapredInputFormat) - return prefLocsFromMapredInputFormat() + prefLocsFromMapredInputFormat() } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 1791242215..4bc13c23d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -75,12 +75,12 @@ private[spark] class Pool( return schedulableNameToSchedulable(schedulableName) } for (schedulable <- schedulableQueue) { - var sched = schedulable.getSchedulableByName(schedulableName) + val sched = schedulable.getSchedulableByName(schedulableName) if (sched != null) { return sched } } - return null + null } override def executorLost(executorId: String, host: String) { @@ -92,7 +92,7 @@ private[spark] class Pool( for (schedulable <- schedulableQueue) { shouldRevive |= schedulable.checkSpeculatableTasks() } - return shouldRevive + shouldRevive } override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { @@ -101,7 +101,7 @@ private[spark] class Pool( for (schedulable <- sortedSchedulableQueue) { sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue() } - return sortedTaskSetQueue + sortedTaskSetQueue } def increaseRunningTasks(taskNum: Int) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala index 3418640b8c..5e62c8468f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala @@ -37,9 +37,9 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { res = math.signum(stageId1 - stageId2) } if (res < 0) { - return true + true } else { - return false + false } } } @@ -56,7 +56,6 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble - var res:Boolean = true var compare:Int = 0 if (s1Needy && !s2Needy) { @@ -70,11 +69,11 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { } if (compare < 0) { - return true + true } else if (compare > 0) { - return false + false } else { - return s1.name < s2.name + s1.name < s2.name } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e7defd768b..e551c11f72 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -88,6 +88,6 @@ private[spark] class SparkListenerBus() extends Logging { * add overhead in the general case. 
*/ Thread.sleep(10) } - return true + true } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 7cb3fe46e5..c60e9896de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -96,7 +96,7 @@ private[spark] class Stage( def newAttemptId(): Int = { val id = nextAttemptId nextAttemptId += 1 - return id + id } val name = callSite.getOrElse(rdd.origin) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index e80cc6b0f6..9d3e615826 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -74,6 +74,6 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long def value(): T = { val resultSer = SparkEnv.get.serializer.newInstance() - return resultSer.deserialize(valueBytes) + resultSer.deserialize(valueBytes) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index c52d6175d2..35e9544718 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -37,7 +37,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul protected val serializer = new ThreadLocal[SerializerInstance] { override def initialValue(): SerializerInstance = { - return sparkEnv.closureSerializer.newInstance() + sparkEnv.closureSerializer.newInstance() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a10e5397ad..fc0ee07089 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -228,7 +228,7 @@ private[spark] class TaskSetManager( return Some(index) } } - return None + None } /** Check whether a task is currently running an attempt on a given host */ @@ -291,7 +291,7 @@ private[spark] class TaskSetManager( } } - return None + None } /** @@ -332,7 +332,7 @@ private[spark] class TaskSetManager( } // Finally, if all else has failed, find a speculative task - return findSpeculativeTask(execId, host, locality) + findSpeculativeTask(execId, host, locality) } /** @@ -387,7 +387,7 @@ private[spark] class TaskSetManager( case _ => } } - return None + None } /** @@ -584,7 +584,7 @@ private[spark] class TaskSetManager( } override def getSchedulableByName(name: String): Schedulable = { - return null + null } override def addSchedulable(schedulable: Schedulable) {} @@ -594,7 +594,7 @@ private[spark] class TaskSetManager( override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this) sortedTaskSetQueue += this - return sortedTaskSetQueue + sortedTaskSetQueue } /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */ @@ -669,7 +669,7 @@ private[spark] class TaskSetManager( } } } - return foundTasks + foundTasks } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index e16d60c54c..c27049bdb5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -140,7 +140,7 @@ private[spark] class CoarseMesosSchedulerBackend( .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } - return command.build() + command.build() } override def offerRescinded(d: SchedulerDriver, o: OfferID) {} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index b428c82a48..49781485d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -141,13 +141,13 @@ private[spark] class MesosSchedulerBackend( // Serialize the map as an array of (String, String) pairs execArgs = Utils.serialize(props.toArray) } - return execArgs + execArgs } private def setClassLoader(): ClassLoader = { val oldClassLoader = Thread.currentThread.getContextClassLoader Thread.currentThread.setContextClassLoader(classLoader) - return oldClassLoader + oldClassLoader } private def restoreClassLoader(oldClassLoader: ClassLoader) { @@ -255,7 +255,7 @@ private[spark] class MesosSchedulerBackend( .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(1).build()) .build() - return MesosTaskInfo.newBuilder() + MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) .setExecutor(createExecutorInfo(slaveId)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c56e2ca2df..a716b1d577 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -412,7 +412,7 @@ private[spark] class BlockManager( logDebug("The value of block " + blockId + " is null") } logDebug("Block " + blockId + " not found") - return None + None } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala index 21f003609b..a36abe0670 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala @@ -42,7 +42,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage) logDebug("Parsed as a block message array") val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get) - return Some(new BlockMessageArray(responseMessages).toBufferMessage) + Some(new BlockMessageArray(responseMessages).toBufferMessage) } catch { case e: Exception => logError("Exception handling buffer message", e) return None @@ -50,7 +50,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends } case otherMessage: Any => { logError("Unknown type message received: " + otherMessage) - return None + None } } } @@ -61,7 +61,7 @@ private[spark] class BlockManagerWorker(val 
blockManager: BlockManager) extends val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) logDebug("Received [" + pB + "]") putBlock(pB.id, pB.data, pB.level) - return None + None } case BlockMessage.TYPE_GET_BLOCK => { val gB = new GetBlock(blockMessage.getId) @@ -70,9 +70,9 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends if (buffer == null) { return None } - return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer))) + Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer))) } - case _ => return None + case _ => None } } @@ -93,7 +93,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends } logDebug("GetBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs) + " and got buffer " + buffer) - return buffer + buffer } } @@ -132,6 +132,6 @@ private[spark] object BlockManagerWorker extends Logging { } case None => logDebug("No response message received"); return null } - return null + null } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala index 80dcb5a207..fbafcf79d2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala @@ -154,7 +154,7 @@ private[spark] class BlockMessage() { println() */ val finishTime = System.currentTimeMillis - return Message.createBufferMessage(buffers) + Message.createBufferMessage(buffers) } override def toString: String = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala index a06f50a0ac..59329361f3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala @@ -96,7 +96,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM println() println() */ - return Message.createBufferMessage(buffers) + Message.createBufferMessage(buffers) } } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 05f676c6e2..27f057b9f2 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -245,7 +245,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) return false } } - return true + true } override def contains(blockId: BlockId): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index b5596dffd3..0f84810d6b 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -74,7 +74,7 @@ class StorageLevel private( if (deserialized_) { ret |= 1 } - return ret + ret } override def writeExternal(out: ObjectOutput) { diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala index 8bb4ee3bfa..edfa58b2d9 100644 --- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala @@ -67,7 +67,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi i += 1 } } - return null.asInstanceOf[V] + null.asInstanceOf[V] } /** 
Set the value for a key */ diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 7108595e3e..1df6b87fb0 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -61,7 +61,7 @@ private[spark] object ClosureCleaner extends Logging { return f.getType :: Nil // Stop at the first $outer that is not a closure } } - return Nil + Nil } // Get a list of the outer objects for a given closure object. @@ -74,7 +74,7 @@ private[spark] object ClosureCleaner extends Logging { return f.get(obj) :: Nil // Stop at the first $outer that is not a closure } } - return Nil + Nil } private def getInnerClasses(obj: AnyRef): List[Class[_]] = { @@ -174,7 +174,7 @@ private[spark] object ClosureCleaner extends Logging { field.setAccessible(true) field.set(obj, outer) } - return obj + obj } } } @@ -182,7 +182,7 @@ private[spark] object ClosureCleaner extends Logging { private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { - return new MethodVisitor(ASM4) { + new MethodVisitor(ASM4) { override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) { if (op == GETFIELD) { for (cl <- output.keys if cl.getName == owner.replace('/', '.')) { @@ -215,7 +215,7 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { - return new MethodVisitor(ASM4) { + new MethodVisitor(ASM4) { override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { val argTypes = Type.getArgumentTypes(desc) diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index bddb3bb735..3cf94892e9 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -108,7 +108,7 @@ private[spark] object SizeEstimator extends Logging { val bean = ManagementFactory.newPlatformMXBeanProxy(server, hotSpotMBeanName, hotSpotMBeanClass) // TODO: We could use reflection on the VMOption returned ? 
- return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true") + getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true") } catch { case e: Exception => { // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB @@ -141,7 +141,7 @@ private[spark] object SizeEstimator extends Logging { def dequeue(): AnyRef = { val elem = stack.last stack.trimEnd(1) - return elem + elem } } @@ -162,7 +162,7 @@ private[spark] object SizeEstimator extends Logging { while (!state.isFinished) { visitSingleObject(state.dequeue(), state) } - return state.size + state.size } private def visitSingleObject(obj: AnyRef, state: SearchState) { @@ -276,11 +276,11 @@ private[spark] object SizeEstimator extends Logging { // Create and cache a new ClassInfo val newInfo = new ClassInfo(shellSize, pointerFields) classInfos.put(cls, newInfo) - return newInfo + newInfo } private def alignSize(size: Long): Long = { val rem = size % ALIGN_SIZE - return if (rem == 0) size else (size + ALIGN_SIZE - rem) + if (rem == 0) size else (size + ALIGN_SIZE - rem) } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5f1253100b..f80ed290ab 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -49,14 +49,14 @@ private[spark] object Utils extends Logging { val oos = new ObjectOutputStream(bos) oos.writeObject(o) oos.close() - return bos.toByteArray + bos.toByteArray } /** Deserialize an object using Java serialization */ def deserialize[T](bytes: Array[Byte]): T = { val bis = new ByteArrayInputStream(bytes) val ois = new ObjectInputStream(bis) - return ois.readObject.asInstanceOf[T] + ois.readObject.asInstanceOf[T] } /** Deserialize an object using Java serialization and the given ClassLoader */ @@ -66,7 +66,7 @@ private[spark] object Utils extends Logging { override def resolveClass(desc: ObjectStreamClass) = Class.forName(desc.getName, false, loader) } - return ois.readObject.asInstanceOf[T] + ois.readObject.asInstanceOf[T] } /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */ @@ -144,7 +144,7 @@ private[spark] object Utils extends Logging { i += 1 } } - return buf + buf } private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() @@ -428,7 +428,7 @@ private[spark] object Utils extends Logging { def parseHostPort(hostPort: String): (String, Int) = { { // Check cache first. - var cached = hostPortParseResults.get(hostPort) + val cached = hostPortParseResults.get(hostPort) if (cached != null) return cached } @@ -731,7 +731,7 @@ private[spark] object Utils extends Logging { } catch { case ise: IllegalStateException => return true } - return false + false } def isSpace(c: Char): Boolean = { @@ -748,7 +748,7 @@ private[spark] object Utils extends Logging { var inWord = false var inSingleQuote = false var inDoubleQuote = false - var curWord = new StringBuilder + val curWord = new StringBuilder def endWord() { buf += curWord.toString curWord.clear() @@ -794,7 +794,7 @@ private[spark] object Utils extends Logging { if (inWord || inDoubleQuote || inSingleQuote) { endWord() } - return buf + buf } /* Calculates 'x' modulo 'mod', takes to consideration sign of x, @@ -822,8 +822,7 @@ private[spark] object Utils extends Logging { /** Returns a copy of the system properties that is thread-safe to iterator over. 
*/ def getSystemProperties(): Map[String, String] = { - return System.getProperties().clone() - .asInstanceOf[java.util.Properties].toMap[String, String] + System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String] } /** diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala index fe710c58ac..094edcde7e 100644 --- a/core/src/main/scala/org/apache/spark/util/Vector.scala +++ b/core/src/main/scala/org/apache/spark/util/Vector.scala @@ -25,7 +25,7 @@ class Vector(val elements: Array[Double]) extends Serializable { def + (other: Vector): Vector = { if (length != other.length) throw new IllegalArgumentException("Vectors of different length") - return Vector(length, i => this(i) + other(i)) + Vector(length, i => this(i) + other(i)) } def add(other: Vector) = this + other @@ -33,7 +33,7 @@ class Vector(val elements: Array[Double]) extends Serializable { def - (other: Vector): Vector = { if (length != other.length) throw new IllegalArgumentException("Vectors of different length") - return Vector(length, i => this(i) - other(i)) + Vector(length, i => this(i) - other(i)) } def subtract(other: Vector) = this - other @@ -47,7 +47,7 @@ class Vector(val elements: Array[Double]) extends Serializable { ans += this(i) * other(i) i += 1 } - return ans + ans } /** @@ -67,7 +67,7 @@ class Vector(val elements: Array[Double]) extends Serializable { ans += (this(i) + plus(i)) * other(i) i += 1 } - return ans + ans } def += (other: Vector): Vector = { @@ -102,7 +102,7 @@ class Vector(val elements: Array[Double]) extends Serializable { ans += (this(i) - other(i)) * (this(i) - other(i)) i += 1 } - return ans + ans } def dist(other: Vector): Double = math.sqrt(squaredDist(other)) @@ -117,7 +117,7 @@ object Vector { def apply(length: Int, initializer: Int => Double): Vector = { val elements: Array[Double] = Array.tabulate(length)(initializer) - return new Vector(elements) + new Vector(elements) } def zeros(length: Int) = new Vector(new Array[Double](length)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala index 7bf2020fe3..235d31709a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala @@ -64,7 +64,7 @@ class FakeTaskSetManager( } override def getSchedulableByName(name: String): Schedulable = { - return null + null } override def executorLost(executorId: String, host: String): Unit = { @@ -79,13 +79,14 @@ class FakeTaskSetManager( { if (tasksSuccessful + runningTasks < numTasks) { increaseRunningTasks(1) - return Some(new TaskDescription(0, execId, "task 0:0", 0, null)) + Some(new TaskDescription(0, execId, "task 0:0", 0, null)) + } else { + None } - return None } override def checkSpeculatableTasks(): Boolean = { - return true + true } def taskFinished() { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2aa259daf3..14f89d50b7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont locations: Seq[Seq[String]] = Nil ): MyRDD = { val maxPartition = numPartitions - 1 - return new 
MyRDD(sc, dependencies) { + new MyRDD(sc, dependencies) { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") override def getPartitions = (0 to maxPartition).map(i => new Partition { diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala index 5cc48ee00a..3880e68725 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala @@ -47,7 +47,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers dependencies: List[Dependency[_]] ): MyRDD = { val maxPartition = numPartitions - 1 - return new MyRDD(sc, dependencies) { + new MyRDD(sc, dependencies) { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") override def getPartitions = (0 to maxPartition).map(i => new Partition { diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 0ed366fb70..de4871d043 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -61,8 +61,8 @@ class NonSerializable {} object TestObject { def run(): Int = { var nonSer = new NonSerializable - var x = 5 - return withSpark(new SparkContext("local", "test")) { sc => + val x = 5 + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + x).reduce(_ + _) } @@ -76,7 +76,7 @@ class TestClass extends Serializable { def run(): Int = { var nonSer = new NonSerializable - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + getX).reduce(_ + _) } @@ -88,7 +88,7 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable { def run(): Int = { var nonSer = new NonSerializable - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + getX).reduce(_ + _) } @@ -103,7 +103,7 @@ class TestClassWithoutFieldAccess { def run(): Int = { var nonSer2 = new NonSerializable var x = 5 - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + x).reduce(_ + _) } @@ -115,7 +115,7 @@ object TestObjectWithNesting { def run(): Int = { var nonSer = new NonSerializable var answer = 0 - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) var y = 1 for (i <- 1 to 4) { @@ -134,7 +134,7 @@ class TestClassWithNesting(val y: Int) extends Serializable { def run(): Int = { var nonSer = new NonSerializable var answer = 0 - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) for (i <- 1 to 4) { var nonSer2 = new NonSerializable diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala index 83db8b9e26..c8ecbb8e41 100644 --- 
a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala @@ -43,7 +43,7 @@ object LocalALS { def generateR(): DoubleMatrix2D = { val mh = factory2D.random(M, F) val uh = factory2D.random(U, F) - return algebra.mult(mh, algebra.transpose(uh)) + algebra.mult(mh, algebra.transpose(uh)) } def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], @@ -56,7 +56,7 @@ object LocalALS { //println("R: " + r) blas.daxpy(-1, targetR, r) val sumSqs = r.aggregate(Functions.plus, Functions.square) - return sqrt(sumSqs / (M * U)) + sqrt(sumSqs / (M * U)) } def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], @@ -80,7 +80,7 @@ object LocalALS { val ch = new CholeskyDecomposition(XtX) val Xty2D = factory2D.make(Xty.toArray, F) val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) + solved2D.viewColumn(0) } def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], @@ -104,7 +104,7 @@ object LocalALS { val ch = new CholeskyDecomposition(XtX) val Xty2D = factory2D.make(Xty.toArray, F) val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) + solved2D.viewColumn(0) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala index fb130ea198..9ab5f5a486 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala @@ -28,7 +28,7 @@ object LocalFileLR { def parsePoint(line: String): DataPoint = { val nums = line.split(' ').map(_.toDouble) - return DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) + DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala index f90ea35cd4..a730464ea1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala @@ -55,7 +55,7 @@ object LocalKMeans { } } - return bestIndex + bestIndex } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala index 30c86d83e6..17bafc2218 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala @@ -44,7 +44,7 @@ object SparkALS { def generateR(): DoubleMatrix2D = { val mh = factory2D.random(M, F) val uh = factory2D.random(U, F) - return algebra.mult(mh, algebra.transpose(uh)) + algebra.mult(mh, algebra.transpose(uh)) } def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], @@ -57,7 +57,7 @@ object SparkALS { //println("R: " + r) blas.daxpy(-1, targetR, r) val sumSqs = r.aggregate(Functions.plus, Functions.square) - return sqrt(sumSqs / (M * U)) + sqrt(sumSqs / (M * U)) } def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], @@ -83,7 +83,7 @@ object SparkALS { val ch = new CholeskyDecomposition(XtX) val Xty2D = factory2D.make(Xty.toArray, F) val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) + solved2D.viewColumn(0) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 
ff72532db1..39819064ed 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -43,7 +43,7 @@ object SparkHdfsLR { while (i < D) { x(i) = tok.nextToken.toDouble; i += 1 } - return DataPoint(new Vector(x), y) + DataPoint(new Vector(x), y) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index 8c99025eaa..9fe2465235 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -30,7 +30,7 @@ object SparkKMeans { val rand = new Random(42) def parseVector(line: String): Vector = { - return new Vector(line.split(' ').map(_.toDouble)) + new Vector(line.split(' ').map(_.toDouble)) } def closestPoint(p: Vector, centers: Array[Vector]): Int = { @@ -46,7 +46,7 @@ object SparkKMeans { } } - return bestIndex + bestIndex } def main(args: Array[String]) { @@ -61,15 +61,15 @@ object SparkKMeans { val K = args(2).toInt val convergeDist = args(3).toDouble - var kPoints = data.takeSample(false, K, 42).toArray + val kPoints = data.takeSample(withReplacement = false, K, 42).toArray var tempDist = 1.0 while(tempDist > convergeDist) { - var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) + val closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} + val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} - var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() + val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() tempDist = 0.0 for (i <- 0 until K) { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala index 4fe57de4a4..a2600989ca 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala @@ -65,7 +65,7 @@ object PageViewGenerator { return item } } - return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0 + inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0 } def getNextClickEvent() : String = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 2d8623392e..c972a71349 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -48,7 +48,7 @@ class PythonMLLibAPI extends Serializable { val db = bb.asDoubleBuffer() val ans = new Array[Double](length.toInt) db.get(ans) - return ans + ans } private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = { @@ -60,7 +60,7 @@ class PythonMLLibAPI extends Serializable { bb.putLong(len) val db = bb.asDoubleBuffer() db.put(doubles) - return bytes + bytes } private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = { @@ -86,7 +86,7 @@ class PythonMLLibAPI extends Serializable { ans(i) = new Array[Double](cols.toInt) 
db.get(ans(i)) } - return ans + ans } private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = { @@ -102,11 +102,10 @@ class PythonMLLibAPI extends Serializable { bb.putLong(rows) bb.putLong(cols) val db = bb.asDoubleBuffer() - var i = 0 for (i <- 0 until rows) { db.put(doubles(i)) } - return bytes + bytes } private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel, @@ -121,7 +120,7 @@ class PythonMLLibAPI extends Serializable { val ret = new java.util.LinkedList[java.lang.Object]() ret.add(serializeDoubleVector(model.weights)) ret.add(model.intercept: java.lang.Double) - return ret + ret } /** @@ -130,7 +129,7 @@ class PythonMLLibAPI extends Serializable { def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => LinearRegressionWithSGD.train(data, numIterations, stepSize, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -142,7 +141,7 @@ class PythonMLLibAPI extends Serializable { def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => LassoWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -154,7 +153,7 @@ class PythonMLLibAPI extends Serializable { def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -166,7 +165,7 @@ class PythonMLLibAPI extends Serializable { def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => SVMWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -178,7 +177,7 @@ class PythonMLLibAPI extends Serializable { def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => LogisticRegressionWithSGD.train(data, numIterations, stepSize, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -194,7 +193,7 @@ class PythonMLLibAPI extends Serializable { val model = KMeans.train(data, k, maxIterations, runs, initializationMode) val ret = new java.util.LinkedList[java.lang.Object]() ret.add(serializeDoubleMatrix(model.clusterCenters)) - return ret + ret } /** Unpack a Rating object from an array of 
bytes */ @@ -204,7 +203,7 @@ class PythonMLLibAPI extends Serializable { val user = bb.getInt() val product = bb.getInt() val rating = bb.getDouble() - return new Rating(user, product, rating) + new Rating(user, product, rating) } /** Unpack a tuple of Ints from an array of bytes */ @@ -245,7 +244,7 @@ class PythonMLLibAPI extends Serializable { def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int, iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) - return ALS.train(ratings, rank, iterations, lambda, blocks) + ALS.train(ratings, rank, iterations, lambda, blocks) } /** @@ -257,6 +256,6 @@ class PythonMLLibAPI extends Serializable { def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int, iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) - return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) + ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index ca0115f90e..ebfb8dba8e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -203,6 +203,6 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade } catch { case e: Exception => } - return super.resolveClass(desc) + super.resolveClass(desc) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index fb9eda8996..a7ba2339c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -219,7 +219,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas reset() return false } - return true + true } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index e0ff3ccba4..b34ba7b9b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -65,7 +65,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) //logDebug("Generating state RDD for time " + validTime) - return Some(stateRDD) + Some(stateRDD) } case None => { // If parent RDD does not exist @@ -76,7 +76,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( updateFuncLocal(i) } val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning) - return Some(stateRDD) + Some(stateRDD) } } } @@ -98,11 +98,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val groupedRDD = parentRDD.groupByKey(partitioner) val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) //logDebug("Generating state RDD for time " + validTime + " (first)") - return Some(sessionRDD) + Some(sessionRDD) } case None => { // If parent RDD does not exist, then nothing to do! 
//logDebug("Not generating state RDD (no previous state, no parent)") - return None + None } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 110a20f282..73dc52023a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -76,6 +76,6 @@ private[spark] class StreamingListenerBus() extends Logging { * add overhead in the general case. */ Thread.sleep(10) } - return true + true } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala index f67bb2f6ac..c3a849d276 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala @@ -66,7 +66,7 @@ class SystemClock() extends Clock { } Thread.sleep(sleepTime) } - return -1 + -1 } } @@ -96,6 +96,6 @@ class ManualClock() extends Clock { this.wait(100) } } - return currentTime() + currentTime() } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index 4e6ce6eabd..5b6c048a39 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -90,7 +90,7 @@ object RawTextHelper { } } } - return taken.toIterator + taken.toIterator } /** diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 23781ea35c..e1fe09e3e2 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -158,7 +158,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) appContext.setApplicationName(args.appName) - return appContext + appContext } /** See if two file systems are the same or not. */ @@ -191,9 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - return false + false + } else { + true } - return true } /** Copy the file into HDFS if needed. 
*/ @@ -299,7 +300,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } UserGroupInformation.getCurrentUser().addCredentials(credentials) - return localResources + localResources } def setupLaunchEnv( diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index ddfec1a4ac..0138d7ade1 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -125,7 +125,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val containerId = ConverterUtils.toContainerId(containerIdString) val appAttemptId = containerId.getApplicationAttemptId() logInfo("ApplicationAttemptId: " + appAttemptId) - return appAttemptId + appAttemptId } private def registerWithResourceManager(): AMRMProtocol = { @@ -133,7 +133,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)) logInfo("Connecting to ResourceManager at " + rmAddress) - return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol] + rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol] } private def registerApplicationMaster(): RegisterApplicationMasterResponse = { @@ -147,7 +147,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar appMasterRequest.setRpcPort(0) // What do we provide here ? Might make sense to expose something sensible later ? appMasterRequest.setTrackingUrl("") - return resourceManager.registerApplicationMaster(appMasterRequest) + resourceManager.registerApplicationMaster(appMasterRequest) } private def waitForSparkMaster() { @@ -220,7 +220,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar t.setDaemon(true) t.start() logInfo("Started progress reporter thread - sleep time : " + sleepTime) - return t + t } private def sendProgress() { diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 132630e5ef..d32cdcc879 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -195,7 +195,7 @@ class WorkerRunnable( } logInfo("Prepared Local resources " + localResources) - return localResources + localResources } def prepareEnvironment: HashMap[String, String] = { @@ -207,7 +207,7 @@ class WorkerRunnable( Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } - return env + env } def connectToCM: ContainerManager = { @@ -226,8 +226,7 @@ class WorkerRunnable( val proxy = user .doAs(new PrivilegedExceptionAction[ContainerManager] { def run: ContainerManager = { - return rpc.getProxy(classOf[ContainerManager], - cmAddress, conf).asInstanceOf[ContainerManager] + rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager] } }) proxy diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 5f159b073f..535abbfb7f 100644 --- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -143,7 +143,7 @@ class ClientDistributedCacheManager() extends Logging { if (isPublic(conf, uri, statCache)) { return LocalResourceVisibility.PUBLIC } - return LocalResourceVisibility.PRIVATE + LocalResourceVisibility.PRIVATE } /** @@ -161,7 +161,7 @@ class ClientDistributedCacheManager() extends Logging { if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { return false } - return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) + ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) } /** @@ -183,7 +183,7 @@ class ClientDistributedCacheManager() extends Logging { } current = current.getParent() } - return true + true } /** @@ -203,7 +203,7 @@ class ClientDistributedCacheManager() extends Logging { if (otherAction.implies(action)) { return true } - return false + false } /** @@ -223,6 +223,6 @@ class ClientDistributedCacheManager() extends Logging { statCache.put(uri, newStat) newStat } - return stat + stat } } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala index 2941356bc5..458df4fa3c 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala @@ -42,7 +42,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar { class MockClientDistributedCacheManager extends ClientDistributedCacheManager { override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): LocalResourceVisibility = { - return LocalResourceVisibility.PRIVATE + LocalResourceVisibility.PRIVATE } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index be323d7783..efeee31acd 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -205,9 +205,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - return false + false + } else { + true } - return true } /** Copy the file into HDFS if needed. */ -- cgit v1.2.3
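The hunks above all apply the same Scala cleanup: a method body is an expression, so the value of its last expression is already the method's result and a trailing `return` is redundant (early `return`s used as guard clauses, which several hunks keep, are unaffected). Below is a minimal, self-contained sketch of that idiom; it is illustrative only, is not part of the patch, and the object and method names are hypothetical.

    // Illustrative only; not taken from the patch above.
    object LastExpressionDemo {
      // Style the patch removes: an explicit `return` on the final expression.
      def squareWithReturn(x: Int): Int = {
        val result = x * x
        return result
      }

      // Equivalent idiomatic form: the last expression is the result.
      def square(x: Int): Int = {
        val result = x * x
        result
      }

      // Early returns as guard clauses remain legal and are left in place by the
      // patch; only the redundant trailing `return` is dropped.
      def describe(x: Int): String = {
        if (x < 0) return "negative"
        if (x == 0) "zero" else "positive"
      }

      def main(args: Array[String]): Unit = {
        println(squareWithReturn(3)) // 9
        println(square(3))           // 9
        println(describe(-1))        // negative
      }
    }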