-rw-r--r--  docs/mllib-clustering.md                                                        3
-rw-r--r--  docs/mllib-linear-methods.md                                                    9
-rw-r--r--  docs/mllib-optimization.md                                                      1
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala    54
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala            22
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala 11
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala 21
7 files changed, 91 insertions(+), 30 deletions(-)
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index dfd9cd5728..d10bd63746 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -52,7 +52,7 @@ import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
-val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
+val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
@@ -100,6 +100,7 @@ public class KMeansExample {
}
}
);
+ parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
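For reference, the corrected Scala side of this docs example reads roughly as follows: a minimal sketch assuming a live SparkContext `sc` and the bundled kmeans_data.txt file. Caching `parsedData` matters because k-means is iterative and would otherwise recompute the input on every pass.

{% highlight scala %}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse whitespace-separated points and cache them: k-means re-reads
// this RDD on every iteration.
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans.
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
{% endhighlight %}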
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index 9137f9dc1b..d31bec3e1b 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -396,7 +396,7 @@ val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
-}
+}.cache()
// Building the model
val numIterations = 100
@@ -455,6 +455,7 @@ public class LinearRegression {
}
}
);
+ parsedData.cache();
// Building the model
int numIterations = 100;
@@ -470,7 +471,7 @@ public class LinearRegression {
}
}
);
- JavaRDD<Object> MSE = new JavaDoubleRDD(valuesAndPreds.map(
+ double MSE = new JavaDoubleRDD(valuesAndPreds.map(
new Function<Tuple2<Double, Double>, Object>() {
public Object call(Tuple2<Double, Double> pair) {
return Math.pow(pair._1() - pair._2(), 2.0);
@@ -553,8 +554,8 @@ but in practice you will likely want to use unlabeled vectors for test data.
{% highlight scala %}
-val trainingData = ssc.textFileStream('/training/data/dir').map(LabeledPoint.parse)
-val testData = ssc.textFileStream('/testing/data/dir').map(LabeledPoint.parse)
+val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse).cache()
+val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
{% endhighlight %}
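Putting these doc fixes together, the corrected Scala regression example looks roughly like the sketch below (assuming `sc` and the bundled lpsa.data file): the parsed RDD is cached before training, and the MSE comes out as a plain value, matching the corrected Java example above.

{% highlight scala %}
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.mllib.linalg.Vectors

// Parse "label,f1 f2 ..." lines and cache the result: SGD makes
// numIterations passes over this RDD.
val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

// Building the model
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Mean squared error on the training set.
val valuesAndPreds = parsedData.map { point =>
  (point.label, model.predict(point.features))
}
val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()
{% endhighlight %}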
diff --git a/docs/mllib-optimization.md b/docs/mllib-optimization.md
index 26ce5f3c50..45141c235b 100644
--- a/docs/mllib-optimization.md
+++ b/docs/mllib-optimization.md
@@ -217,6 +217,7 @@ import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
+import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size
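With the added import, the L-BFGS example becomes self-contained. A condensed sketch of how those classes fit together, assuming `sc`; the optimizer parameter values here are illustrative:

{% highlight scala %}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils

// Load the data; L-BFGS re-evaluates the objective and gradient on
// every iteration, so the training RDD is cached below.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size

// runLBFGS takes (label, features) pairs; appendBias adds the intercept term.
val training = data.map(lp => (lp.label, MLUtils.appendBias(lp.features))).cache()
val initialWeights = Vectors.dense(new Array[Double](numFeatures + 1))

val (weights, lossHistory) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  10,    // numCorrections
  1e-4,  // convergenceTol
  20,    // maxNumIterations
  0.1,   // regParam
  initialWeights)
{% endhighlight %}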
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 9164c294ac..e9f4175858 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -67,11 +67,13 @@ class PythonMLLibAPI extends Serializable {
MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)
private def trainRegressionModel(
- trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
+ learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
data: JavaRDD[LabeledPoint],
initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
val initialWeights = SerDe.loads(initialWeightsBA).asInstanceOf[Vector]
- val model = trainFunc(data.rdd, initialWeights)
+ // Disable the uncached input warning because 'data' is a deliberately uncached MappedRDD.
+ learner.disableUncachedWarning()
+ val model = learner.run(data.rdd, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(SerDe.dumps(model.weights))
ret.add(model.intercept: java.lang.Double)
@@ -106,8 +108,7 @@ class PythonMLLibAPI extends Serializable {
+ " Can only be initialized using the following string values: [l1, l2, none].")
}
trainRegressionModel(
- (data, initialWeights) =>
- lrAlg.run(data, initialWeights),
+ lrAlg,
data,
initialWeightsBA)
}
@@ -122,15 +123,14 @@ class PythonMLLibAPI extends Serializable {
regParam: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ val lassoAlg = new LassoWithSGD()
+ lassoAlg.optimizer
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setStepSize(stepSize)
+ .setMiniBatchFraction(miniBatchFraction)
trainRegressionModel(
- (data, initialWeights) =>
- LassoWithSGD.train(
- data,
- numIterations,
- stepSize,
- regParam,
- miniBatchFraction,
- initialWeights),
+ lassoAlg,
data,
initialWeightsBA)
}
@@ -145,15 +145,14 @@ class PythonMLLibAPI extends Serializable {
regParam: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ val ridgeAlg = new RidgeRegressionWithSGD()
+ ridgeAlg.optimizer
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setStepSize(stepSize)
+ .setMiniBatchFraction(miniBatchFraction)
trainRegressionModel(
- (data, initialWeights) =>
- RidgeRegressionWithSGD.train(
- data,
- numIterations,
- stepSize,
- regParam,
- miniBatchFraction,
- initialWeights),
+ ridgeAlg,
data,
initialWeightsBA)
}
@@ -186,8 +185,7 @@ class PythonMLLibAPI extends Serializable {
+ " Can only be initialized using the following string values: [l1, l2, none].")
}
trainRegressionModel(
- (data, initialWeights) =>
- SVMAlg.run(data, initialWeights),
+ SVMAlg,
data,
initialWeightsBA)
}
@@ -220,8 +218,7 @@ class PythonMLLibAPI extends Serializable {
+ " Can only be initialized using the following string values: [l1, l2, none].")
}
trainRegressionModel(
- (data, initialWeights) =>
- LogRegAlg.run(data, initialWeights),
+ LogRegAlg,
data,
initialWeightsBA)
}
@@ -249,7 +246,14 @@ class PythonMLLibAPI extends Serializable {
maxIterations: Int,
runs: Int,
initializationMode: String): KMeansModel = {
- KMeans.train(data.rdd, k, maxIterations, runs, initializationMode)
+ val kMeansAlg = new KMeans()
+ .setK(k)
+ .setMaxIterations(maxIterations)
+ .setRuns(runs)
+ .setInitializationMode(initializationMode)
+ // Disable the uncached input warning because 'data' is a deliberately uncached MappedRDD.
+ .disableUncachedWarning()
+ return kMeansAlg.run(data.rdd)
}
/**
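The refactoring above replaces per-algorithm training closures with a single GeneralizedLinearAlgorithm parameter, so trainRegressionModel can disable the uncached-input warning uniformly before calling run(). A sketch of the pattern as it would appear inside Spark itself (the `train` wrapper below is hypothetical, and disableUncachedWarning() is private[spark], so this only compiles within the org.apache.spark packages):

{% highlight scala %}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.{LabeledPoint, LassoWithSGD}
import org.apache.spark.rdd.RDD

// Configure the learner up front; every GeneralizedLinearAlgorithm subclass
// exposes the same optimizer and run() surface, which is what lets
// trainRegressionModel accept all of them through one parameter.
val lassoAlg = new LassoWithSGD()
lassoAlg.optimizer
  .setNumIterations(100)
  .setRegParam(0.01)
  .setStepSize(1.0)
  .setMiniBatchFraction(1.0)

// Hypothetical stand-in for the shared helper: suppress the warning for an
// input that is deliberately left uncached, then run.
def train(data: RDD[LabeledPoint], initialWeights: Vector) = {
  lassoAlg.disableUncachedWarning()
  lassoAlg.run(data, initialWeights)
}
{% endhighlight %}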
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index fce8fe29f6..7443f232ec 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.random.XORShiftRandom
/**
@@ -112,11 +113,26 @@ class KMeans private (
this
}
+ /** Whether a warning should be logged if the input RDD is uncached. */
+ private var warnOnUncachedInput = true
+
+ /** Disable warnings about uncached input. */
+ private[spark] def disableUncachedWarning(): this.type = {
+ warnOnUncachedInput = false
+ this
+ }
+
/**
* Train a K-means model on the given set of points; `data` should be cached for high
* performance, because this is an iterative algorithm.
*/
def run(data: RDD[Vector]): KMeansModel = {
+
+ if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) {
+ logWarning("The input data is not directly cached, which may hurt performance if its"
+ + " parent RDDs are also uncached.")
+ }
+
// Compute squared norms and cache them.
val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
norms.persist()
@@ -125,6 +141,12 @@ class KMeans private (
}
val model = runBreeze(breezeData)
norms.unpersist()
+
+ // Warn at the end of the run as well, for increased visibility.
+ if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) {
+ logWarning("The input data was not directly cached, which may hurt performance if its"
+ + " parent RDDs are also uncached.")
+ }
model
}
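A small sketch of the new KMeans behavior, assuming `sc` (the input path is illustrative): running on an RDD whose storage level is StorageLevel.NONE logs the warning before and after the run, while persisting the input first keeps the log clean.

{% highlight scala %}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.storage.StorageLevel

val points = sc.textFile("data/mllib/kmeans_data.txt")
  .map(s => Vectors.dense(s.split(' ').map(_.toDouble)))

// points.getStorageLevel is StorageLevel.NONE here, so run() logs the
// uncached-input warning, once up front and once after it finishes.
val kmeans = new KMeans().setK(2).setMaxIterations(20)
val model = kmeans.run(points)

// Persisting first avoids the warning and the repeated recomputation.
val model2 = kmeans.run(points.persist(StorageLevel.MEMORY_ONLY))
{% endhighlight %}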
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 2e414a73be..4174f45d23 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
+import org.apache.spark.storage.StorageLevel
/**
* :: Experimental ::
@@ -231,6 +232,10 @@ class RowMatrix(
val brzSvd.SVD(uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G)
(sigmaSquaresFull, uFull)
case SVDMode.DistARPACK =>
+ if (rows.getStorageLevel == StorageLevel.NONE) {
+ logWarning("The input data is not directly cached, which may hurt performance if its"
+ + " parent RDDs are also uncached.")
+ }
require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.")
EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
}
@@ -256,6 +261,12 @@ class RowMatrix(
logWarning(s"Requested $k singular values but only found $sk nonzeros.")
}
+ // Warn at the end of the run as well, for increased visibility.
+ if (computeMode == SVDMode.DistARPACK && rows.getStorageLevel == StorageLevel.NONE) {
+ logWarning("The input data was not directly cached, which may hurt performance if its"
+ + " parent RDDs are also uncached.")
+ }
+
val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk))
val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
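The RowMatrix check applies to the DistARPACK path of computeSVD, where the Gramian multiply walks the row RDD once per ARPACK iteration. A usage sketch, assuming `sc` and an illustrative input file:

{% highlight scala %}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Cache the rows before computing the SVD so the dist-eigs mode does not
// recompute them on each iteration (and does not trigger the new warning).
val rows = sc.textFile("data/mllib/kmeans_data.txt")
  .map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
  .cache()

val mat = new RowMatrix(rows)
val svd = mat.computeSVD(2, computeU = true)
val s = svd.s  // singular values
{% endhighlight %}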
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 20c1fdd226..d0fe417968 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.util.MLUtils._
+import org.apache.spark.storage.StorageLevel
/**
* :: DeveloperApi ::
@@ -133,6 +134,15 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
this
}
+ /** Whether a warning should be logged if the input RDD is uncached. */
+ private var warnOnUncachedInput = true
+
+ /** Disable warnings about uncached input. */
+ private[spark] def disableUncachedWarning(): this.type = {
+ warnOnUncachedInput = false
+ this
+ }
+
/**
* Run the algorithm with the configured parameters on an input
* RDD of LabeledPoint entries.
@@ -149,6 +159,11 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
*/
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
+ if (warnOnUncachedInput && input.getStorageLevel == StorageLevel.NONE) {
+ logWarning("The input data is not directly cached, which may hurt performance if its"
+ + " parent RDDs are also uncached.")
+ }
+
// Check the data properties before running the optimizer
if (validateData && !validators.forall(func => func(input))) {
throw new SparkException("Input validation failed.")
@@ -223,6 +238,12 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
weights = scaler.transform(weights)
}
+ // Warn at the end of the run as well, for increased visibility.
+ if (warnOnUncachedInput && input.getStorageLevel == StorageLevel.NONE) {
+ logWarning("The input data was not directly cached, which may hurt performance if its"
+ + " parent RDDs are also uncached.")
+ }
+
createModel(weights, intercept)
}
}
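Finally, a sketch of how the GeneralizedLinearAlgorithm warning surfaces to users, assuming `sc` and the bundled sample data; logistic regression stands in here for any GLM subclass:

{% highlight scala %}
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.util.MLUtils

val points = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// warnOnUncachedInput defaults to true, so training on an unpersisted RDD
// logs "The input data is not directly cached..." before and after the run.
val lr = new LogisticRegressionWithSGD()
val model = lr.run(points)

// Caching the input first keeps the log quiet and speeds up each pass.
val cachedModel = lr.run(points.cache())
{% endhighlight %}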