-rw-r--r-- | docs/mllib-guide.md | 313 |
1 files changed, 261 insertions, 52 deletions
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index c1ff9c417c..e9d3785427 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -39,56 +39,9 @@ underlying gradient descent primitive (described parameter (*regParam*) along with various parameters associated with gradient descent (*stepSize*, *numIterations*, *miniBatchFraction*). -The following code snippet illustrates how to load a sample dataset, execute a -training algorithm on this training data using a static method in the algorithm -object, and make predictions with the resulting model to compute the training -error. - -{% highlight scala %} -import org.apache.spark.SparkContext -import org.apache.spark.mllib.classification.SVMWithSGD -import org.apache.spark.mllib.regression.LabeledPoint - -// Load and parse the data file -val data = sc.textFile("mllib/data/sample_svm_data.txt") -val parsedData = data.map { line => - val parts = line.split(' ') - LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray) -} - -// Run training algorithm -val numIterations = 20 -val model = SVMWithSGD.train(parsedData, numIterations) - -// Evaluate model on training examples and compute training error -val labelAndPreds = parsedData.map { point => - val prediction = model.predict(point.features) - (point.label, prediction) -} -val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count -println("trainError = " + trainErr) -{% endhighlight %} - -The `SVMWithSGD.train()` method by default performs L2 regularization with the -regularization parameter set to 1.0. If we want to configure this algorithm, we -can customize `SVMWithSGD` further by creating a new object directly and -calling setter methods. All other MLlib algorithms support customization in -this way as well. For example, the following code produces an L1 regularized -variant of SVMs with regularization parameter set to 0.1, and runs the training -algorithm for 200 iterations. 
-{% highlight scala %} -import org.apache.spark.mllib.optimization.L1Updater -val svmAlg = new SVMWithSGD() -svmAlg.optimizer.setNumIterations(200) - .setRegParam(0.1) - .setUpdater(new L1Updater) -val modelL1 = svmAlg.run(parsedData) -{% endhighlight %} -Both of the code snippets above can be executed in `spark-shell` to generate a -classifier for the provided dataset. Available algorithms for binary classification: @@ -121,14 +74,14 @@ of entities with one another based on some notion of similarity. Clustering is often used for exploratory analysis and/or as a component of a hierarchical supervised learning pipeline (in which distinct classifiers or regression models are trained for each cluster). MLlib supports -[k-means](http://en.wikipedia.org/wiki/K-means_clustering) clustering, arguably -the most commonly used clustering approach that clusters the data points into -*k* clusters. The MLlib implementation includes a parallelized +[k-means](http://en.wikipedia.org/wiki/K-means_clustering) clustering, one of +the most commonly used clustering algorithms that clusters the data points into a +predefined number of clusters. The MLlib implementation includes a parallelized variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf). The implementation in MLlib has the following parameters: -* *k* is the number of clusters. +* *k* is the number of desired clusters. * *maxIterations* is the maximum number of iterations to run. * *initializationMode* specifies either random initialization or initialization via k-means\|\|. @@ -169,7 +122,7 @@ the entries in the user-item matrix as *explicit* preferences given by the user It is common in many real-world use cases to only have access to *implicit feedback* (e.g. views, clicks, purchases, likes, shares etc.). 
The approach used in MLlib to deal with such data is taken from -[Collaborative Filtering for Implicit Feedback Datasets](http://research.yahoo.com/pub/2433). +[Collaborative Filtering for Implicit Feedback Datasets](http://www2.research.att.com/~yifanhu/PUB/cf.pdf). Essentially instead of trying to model the matrix of ratings directly, this approach treats the data as a combination of binary preferences and *confidence values*. The ratings are then related to the level of confidence in observed user preferences, rather than explicit ratings given to items. @@ -210,3 +163,259 @@ at each iteration. Available algorithms for gradient descent: * [GradientDescent](api/mllib/index.html#org.apache.spark.mllib.optimization.GradientDescent) + +# Using MLLib in Scala + +Following code snippets can be executed in `spark-shell`. + +## Binary Classification + +The following code snippet illustrates how to load a sample dataset, execute a +training algorithm on this training data using a static method in the algorithm +object, and make predictions with the resulting model to compute the training +error. 
+ +{% highlight scala %} +import org.apache.spark.SparkContext +import org.apache.spark.mllib.classification.SVMWithSGD +import org.apache.spark.mllib.regression.LabeledPoint + +// Load and parse the data file +val data = sc.textFile("mllib/data/sample_svm_data.txt") +val parsedData = data.map { line => + val parts = line.split(' ') + LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray) +} + +// Run training algorithm to build the model +val numIterations = 20 +val model = SVMWithSGD.train(parsedData, numIterations) + +// Evaluate model on training examples and compute training error +val labelAndPreds = parsedData.map { point => + val prediction = model.predict(point.features) + (point.label, prediction) +} +val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count +println("Training Error = " + trainErr) +{% endhighlight %} + + +The `SVMWithSGD.train()` method by default performs L2 regularization with the +regularization parameter set to 1.0. If we want to configure this algorithm, we +can customize `SVMWithSGD` further by creating a new object directly and +calling setter methods. All other MLlib algorithms support customization in +this way as well. For example, the following code produces an L1 regularized +variant of SVMs with regularization parameter set to 0.1, and runs the training +algorithm for 200 iterations. + +{% highlight scala %} +import org.apache.spark.mllib.optimization.L1Updater + +val svmAlg = new SVMWithSGD() +svmAlg.optimizer.setNumIterations(200) + .setRegParam(0.1) + .setUpdater(new L1Updater) +val modelL1 = svmAlg.run(parsedData) +{% endhighlight %} + +## Linear Regression +The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint. The +example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. 
We +compute the Mean Squared Error at the end to evaluate +[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit) + +{% highlight scala %} +import org.apache.spark.mllib.regression.LinearRegressionWithSGD +import org.apache.spark.mllib.regression.LabeledPoint + +// Load and parse the data +val data = sc.textFile("mllib/data/ridge-data/lpsa.data") +val parsedData = data.map { line => + val parts = line.split(',') + LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray) +} + +// Building the model +val numIterations = 20 +val model = LinearRegressionWithSGD.train(parsedData, numIterations) + +// Evaluate model on training examples and compute training error +val valuesAndPreds = parsedData.map { point => + val prediction = model.predict(point.features) + (point.label, prediction) +} +val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count +println("training Mean Squared Error = " + MSE) +{% endhighlight %} + + +Similarly you can use RidgeRegressionWithSGD and LassoWithSGD and compare training +[Mean Squared Errors](http://en.wikipedia.org/wiki/Mean_squared_error). + +## Clustering +In the following example after loading and parsing data, we use the KMeans object to cluster the data +into two clusters. The number of desired clusters is passed to the algorithm. We then compute Within +Set Sum of Squared Error (WSSSE). You can reduce this error measure by increasing *k*. In fact the +optimal *k* is usually one where there is an "elbow" in the WSSSE graph. 
+ +{% highlight scala %} +import org.apache.spark.mllib.clustering.KMeans + +// Load and parse the data +val data = sc.textFile("kmeans_data.txt") +val parsedData = data.map( _.split(' ').map(_.toDouble)) + +// Cluster the data into two classes using KMeans +val numIterations = 20 +val numClusters = 2 +val clusters = KMeans.train(parsedData, numClusters, numIterations) + +// Evaluate clustering by computing Within Set Sum of Squared Errors +val WSSSE = clusters.computeCost(parsedData) +println("Within Set Sum of Squared Errors = " + WSSSE) +{% endhighlight %} + + +## Collaborative Filtering +In the following example we load rating data. Each row consists of a user, a product and a rating. +We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation +model by measuring the Mean Squared Error of rating prediction. + +{% highlight scala %} +import org.apache.spark.mllib.recommendation.ALS +import org.apache.spark.mllib.recommendation.Rating + +// Load and parse the data +val data = sc.textFile("mllib/data/als/test.data") +val ratings = data.map(_.split(',') match { + case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble) +}) + +// Build the recommendation model using ALS +val numIterations = 20 +val model = ALS.train(ratings, 1, 20, 0.01) + +// Evaluate the model on rating data +val ratesAndPreds = ratings.map{ case Rating(user, item, rate) => (rate, model.predict(user, item))} +val MSE = ratesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/ratesAndPreds.count +{% endhighlight %} + +If the rating matrix is derived from other source of information (i.e., it is inferred from +other signals), you can use the trainImplicit method to get better results. + +{% highlight scala %} +val model = ALS.trainImplicit(ratings, 1, 20, 0.01) +{% endhighlight %} + +# Using MLLib in Python +Following examples can be tested in the PySpark shell. 
+ +## Binary Classification +The following example shows how to load a sample dataset, build a logistic regression model, +and make predictions with the resulting model to compute the training error. + +{% highlight python %} +from pyspark.mllib.classification import LogisticRegressionWithSGD +from numpy import array + +# Load and parse the data +data = sc.textFile("mllib/data/sample_svm_data.txt") +parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) + +# Build the model, then pair each label (first column) with its prediction +model = LogisticRegressionWithSGD.train(sc, parsedData) +labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)), + model.predict(point.take(range(1, point.size))))) + +# Evaluate the model on training data +trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count()) +print("Training Error = " + str(trainErr)) +{% endhighlight %} + +## Linear Regression +The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint. The +example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. 
We +compute the Mean Squared Error at the end to evaluate +[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit) + +{% highlight python %} +from pyspark.mllib.regression import LinearRegressionWithSGD +from numpy import array + +# Load and parse the data +data = sc.textFile("mllib/data/ridge-data/lpsa.data") +parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')])) + +# Build the model +model = LinearRegressionWithSGD.train(sc, parsedData) + +# Evaluate the model on training data +valuesAndPreds = parsedData.map(lambda point: (point.item(0), + model.predict(point.take(range(1, point.size))))) +MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count() +print("Mean Squared Error = " + str(MSE)) +{% endhighlight %} + + +## Clustering +In the following example after loading and parsing data, we use the KMeans object to cluster the data +into two clusters. The number of desired clusters is passed to the algorithm. We then compute Within +Set Sum of Squared Error (WSSSE). You can reduce this error measure by increasing *k*. In fact the +optimal *k* is usually one where there is an "elbow" in the WSSSE graph. 
+ +{% highlight python %} +from pyspark.mllib.clustering import KMeans +from numpy import array +from math import sqrt + +# Load and parse the data +data = sc.textFile("kmeans_data.txt") +parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) + +# Build the model (cluster the data) +clusters = KMeans.train(sc, parsedData, 2, maxIterations=10, + runs=30, initialization_mode="random") + +# Evaluate clustering by computing Within Set Sum of Squared Errors +def error(point): + center = clusters.centers[clusters.predict(point)] + return sqrt(sum([x**2 for x in (point - center)])) + +WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y) +print("Within Set Sum of Squared Error = " + str(WSSSE)) +{% endhighlight %} + +Similarly you can use RidgeRegressionWithSGD and LassoWithSGD and compare training Mean Squared +Errors. + +## Collaborative Filtering +In the following example we load rating data. Each row consists of a user, a product and a rating. +We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation +model by measuring the Mean Squared Error of rating prediction. + +{% highlight python %} +from pyspark.mllib.recommendation import ALS +from numpy import array + +# Load and parse the data +data = sc.textFile("mllib/data/als/test.data") +ratings = data.map(lambda line: array([float(x) for x in line.split(',')])) + +# Build the recommendation model using Alternating Least Squares +model = ALS.train(sc, ratings, 1, 20) + +# Evaluate the model on training data +ratesAndPreds = ratings.map(lambda p: (p[2], model.predict(int(p[0]), int(p[1])))) +MSE = ratesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/ratesAndPreds.count() +print("Mean Squared Error = " + str(MSE)) + +{% endhighlight %} + +If the rating matrix is derived from another source of information (i.e., it is inferred from other +signals), you can use the trainImplicit method to get better results. 
+ +{% highlight python %} +# Build the recommendation model using Alternating Least Squares based on implicit ratings +model = ALS.trainImplicit(sc, ratings, 1, 20) +{% endhighlight %}
\ No newline at end of file
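
Review note on the evaluation metrics used in the added examples. The sketch below is plain Python with lists standing in for RDDs, so it runs without a Spark context. The helper names (`mean_squared_error`, `wssse`, `sum_of_distances`) and the toy data are hypothetical and are not part of MLlib. It illustrates two points: the Mean Squared Error is the average of squared differences over (value, prediction) pairs, and a Within Set Sum of Squared Errors adds up squared distances to the nearest center. The Python clustering snippet in this patch takes a square root per point, so the quantity it prints is a sum of Euclidean distances rather than of squared errors.

```python
from math import sqrt


def mean_squared_error(pairs):
    """Average of (value - prediction)^2 over (value, prediction) pairs."""
    pairs = list(pairs)
    return sum((v - p) ** 2 for v, p in pairs) / float(len(pairs))


def squared_distance(point, center):
    """Squared Euclidean distance between two equal-length sequences."""
    return sum((x - c) ** 2 for x, c in zip(point, center))


def closest_center(point, centers):
    """Center with the smallest squared distance to the point."""
    return min(centers, key=lambda c: squared_distance(point, c))


def wssse(points, centers):
    """Within Set Sum of Squared Errors: no square root is taken."""
    return sum(squared_distance(p, closest_center(p, centers)) for p in points)


def sum_of_distances(points, centers):
    """Sum of Euclidean distances: what a per-point sqrt produces instead."""
    return sum(sqrt(squared_distance(p, closest_center(p, centers)))
               for p in points)


# Toy data (hypothetical, for illustration only)
pairs = [(1.0, 1.5), (2.0, 2.0), (3.0, 1.0)]
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0)]
centers = [(0.0, 0.0), (10.0, 10.0)]

print("Mean Squared Error = " + str(mean_squared_error(pairs)))
print("WSSSE = " + str(wssse(points, centers)))
print("Sum of distances = " + str(sum_of_distances(points, centers)))
```

Unpacking each pair inside the generator expression, rather than in a `lambda (v, p): ...` signature, also keeps the sketch valid on Python 3, where tuple parameters in function signatures are no longer accepted.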