From b810a85cdddb247e1a104f4daad905b97222ad85 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Thu, 2 Jan 2014 18:37:40 +0530 Subject: spark-shell -> bin/spark-shell --- docs/mllib-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'docs/mllib-guide.md') diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index c1ff9c417c..95537ef185 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -87,7 +87,7 @@ svmAlg.optimizer.setNumIterations(200) val modelL1 = svmAlg.run(parsedData) {% endhighlight %} -Both of the code snippets above can be executed in `spark-shell` to generate a +Both of the code snippets above can be executed in `bin/spark-shell` to generate a classifier for the provided dataset. Available algorithms for binary classification: -- cgit v1.2.3 From c189c8362caeaa7a0f46af1c8e0d8d37fd171d7b Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Thu, 2 Jan 2014 15:22:20 -0800 Subject: Added Scala and Python examples for mllib --- docs/mllib-guide.md | 313 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 261 insertions(+), 52 deletions(-) (limited to 'docs/mllib-guide.md') diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index c1ff9c417c..e9d3785427 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -39,56 +39,9 @@ underlying gradient descent primitive (described parameter (*regParam*) along with various parameters associated with gradient descent (*stepSize*, *numIterations*, *miniBatchFraction*). -The following code snippet illustrates how to load a sample dataset, execute a -training algorithm on this training data using a static method in the algorithm -object, and make predictions with the resulting model to compute the training -error. - -{% highlight scala %} -import org.apache.spark.SparkContext -import org.apache.spark.mllib.classification.SVMWithSGD -import org.apache.spark.mllib.regression.LabeledPoint - -// Load and parse the data file -val data = sc.textFile("mllib/data/sample_svm_data.txt") -val parsedData = data.map { line => - val parts = line.split(' ') - LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray) -} - -// Run training algorithm -val numIterations = 20 -val model = SVMWithSGD.train(parsedData, numIterations) - -// Evaluate model on training examples and compute training error -val labelAndPreds = parsedData.map { point => - val prediction = model.predict(point.features) - (point.label, prediction) -} -val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count -println("trainError = " + trainErr) -{% endhighlight %} - -The `SVMWithSGD.train()` method by default performs L2 regularization with the -regularization parameter set to 1.0. If we want to configure this algorithm, we -can customize `SVMWithSGD` further by creating a new object directly and -calling setter methods. All other MLlib algorithms support customization in -this way as well. For example, the following code produces an L1 regularized -variant of SVMs with regularization parameter set to 0.1, and runs the training -algorithm for 200 iterations. -{% highlight scala %} -import org.apache.spark.mllib.optimization.L1Updater -val svmAlg = new SVMWithSGD() -svmAlg.optimizer.setNumIterations(200) - .setRegParam(0.1) - .setUpdater(new L1Updater) -val modelL1 = svmAlg.run(parsedData) -{% endhighlight %} -Both of the code snippets above can be executed in `spark-shell` to generate a -classifier for the provided dataset. 
Available algorithms for binary classification:

@@ -121,14 +74,14 @@ of entities with one another based on some notion of similarity. Clustering is
 often used for exploratory analysis and/or as a component of a hierarchical
 supervised learning pipeline (in which distinct classifiers or regression
 models are trained for each cluster). MLlib supports
-[k-means](http://en.wikipedia.org/wiki/K-means_clustering) clustering, arguably
-the most commonly used clustering approach that clusters the data points into
-*k* clusters. The MLlib implementation includes a parallelized
+[k-means](http://en.wikipedia.org/wiki/K-means_clustering) clustering, one of
+the most commonly used clustering algorithms, which clusters the data points into
+a predefined number of clusters. The MLlib implementation includes a parallelized
 variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method
 called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf).
 The implementation in MLlib has the following parameters:
 
-* *k* is the number of clusters.
+* *k* is the number of desired clusters.
 * *maxIterations* is the maximum number of iterations to run.
 * *initializationMode* specifies either random initialization or
 initialization via k-means\|\|.
@@ -169,7 +122,7 @@ the entries in the user-item matrix as *explicit* preferences given by the user
 It is common in many real-world use cases to only have access to *implicit feedback*
 (e.g. views, clicks, purchases, likes, shares etc.). The approach used in MLlib to deal with 
 such data is taken from
-[Collaborative Filtering for Implicit Feedback Datasets](http://research.yahoo.com/pub/2433).
+[Collaborative Filtering for Implicit Feedback Datasets](http://www2.research.att.com/~yifanhu/PUB/cf.pdf).
 Essentially instead of trying to model the matrix of ratings directly, this approach treats the data
 as a combination of binary preferences and *confidence values*. The ratings are then related
 to the level of confidence in observed user preferences, rather than explicit ratings given to items.
@@ -210,3 +163,259 @@ at each iteration.
 Available algorithms for gradient descent:
 
 * [GradientDescent](api/mllib/index.html#org.apache.spark.mllib.optimization.GradientDescent)
+
+# Using MLlib in Scala
+
+The following code snippets can be executed in `bin/spark-shell`.
+
+## Binary Classification
+
+The following code snippet illustrates how to load a sample dataset, execute a
+training algorithm on this training data using a static method in the algorithm
+object, and make predictions with the resulting model to compute the training
+error.
+
+{% highlight scala %}
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.classification.SVMWithSGD
+import org.apache.spark.mllib.regression.LabeledPoint
+
+// Load and parse the data file
+val data = sc.textFile("mllib/data/sample_svm_data.txt")
+val parsedData = data.map { line =>
+  val parts = line.split(' ')
+  LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
+}
+
+// Run training algorithm to build the model
+val numIterations = 20
+val model = SVMWithSGD.train(parsedData, numIterations)
+
+// Evaluate model on training examples and compute training error
+val labelAndPreds = parsedData.map { point =>
+  val prediction = model.predict(point.features)
+  (point.label, prediction)
+}
+val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count
+println("Training Error = " + trainErr)
+{% endhighlight %}
+
+
+The `SVMWithSGD.train()` method by default performs L2 regularization with the
+regularization parameter set to 1.0. If we want to configure this algorithm, we
+can customize `SVMWithSGD` further by creating a new object directly and
+calling setter methods. All other MLlib algorithms support customization in
+this way as well. For example, the following code produces an L1 regularized
+variant of SVMs with regularization parameter set to 0.1, and runs the training
+algorithm for 200 iterations.
+
+{% highlight scala %}
+import org.apache.spark.mllib.optimization.L1Updater
+
+val svmAlg = new SVMWithSGD()
+svmAlg.optimizer.setNumIterations(200)
+  .setRegParam(0.1)
+  .setUpdater(new L1Updater)
+val modelL1 = svmAlg.run(parsedData)
+{% endhighlight %}
+
+## Linear Regression
+The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint. The
+example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We
+compute the Mean Squared Error at the end to evaluate
+[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
+
+{% highlight scala %}
+import org.apache.spark.mllib.regression.LinearRegressionWithSGD
+import org.apache.spark.mllib.regression.LabeledPoint
+
+// Load and parse the data
+val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
+val parsedData = data.map { line =>
+  val parts = line.split(',')
+  LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
+}
+
+// Building the model
+val numIterations = 20
+val model = LinearRegressionWithSGD.train(parsedData, numIterations)
+
+// Evaluate model on training examples and compute training error
+val valuesAndPreds = parsedData.map { point =>
+  val prediction = model.predict(point.features)
+  (point.label, prediction)
+}
+val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count
+println("training Mean Squared Error = " + MSE)
+{% endhighlight %}
+
+
+Similarly you can use RidgeRegressionWithSGD and LassoWithSGD and compare training
+[Mean Squared Errors](http://en.wikipedia.org/wiki/Mean_squared_error).
+
+## Clustering
+In the following example after loading and parsing data, we use the KMeans object to cluster the data
+into two clusters. The number of desired clusters is passed to the algorithm. We then compute Within
+Set Sum of Squared Error (WSSSE). You can reduce this error measure by increasing *k*. In fact the
+optimal *k* is usually one where there is an "elbow" in the WSSSE graph.
+
+{% highlight scala %}
+import org.apache.spark.mllib.clustering.KMeans
+
+// Load and parse the data
+val data = sc.textFile("kmeans_data.txt")
+val parsedData = data.map( _.split(' ').map(_.toDouble))
+
+// Cluster the data into two classes using KMeans
+val numIterations = 20
+val numClusters = 2
+val clusters = KMeans.train(parsedData, numClusters, numIterations)
+
+// Evaluate clustering by computing Within Set Sum of Squared Errors
+val WSSSE = clusters.computeCost(parsedData)
+println("Within Set Sum of Squared Errors = " + WSSSE)
+{% endhighlight %}
+
+
+## Collaborative Filtering
+In the following example we load rating data. Each row consists of a user, a product and a rating.
+We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation
+model by measuring the Mean Squared Error of rating prediction.
+
+{% highlight scala %}
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.Rating
+
+// Load and parse the data
+val data = sc.textFile("mllib/data/als/test.data")
+val ratings = data.map(_.split(',') match {
+    case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
+})
+
+// Build the recommendation model using ALS
+val numIterations = 20
+val model = ALS.train(ratings, 1, 20, 0.01)
+
+// Evaluate the model on rating data
+val ratesAndPreds = ratings.map{ case Rating(user, item, rate) => (rate, model.predict(user, item))}
+val MSE = ratesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/ratesAndPreds.count
+{% endhighlight %}
+
+If the rating matrix is derived from other source of information (i.e., it is inferred from
+other signals), you can use the trainImplicit method to get better results.
+
+{% highlight scala %}
+val model = ALS.trainImplicit(ratings, 1, 20, 0.01)
+{% endhighlight %}
+
+# Using MLlib in Python
+The following examples can be tested in the PySpark shell.
+
+## Binary Classification
+The following example shows how to load a sample dataset, build a logistic regression model,
+and make predictions with the resulting model to compute the training error.
+
+{% highlight python %}
+from pyspark.mllib.classification import LogisticRegressionWithSGD
+from numpy import array
+
+# Load and parse the data
+data = sc.textFile("mllib/data/sample_svm_data.txt")
+parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
+
+# Build the model
+model = LogisticRegressionWithSGD.train(sc, parsedData)
+
+# Evaluate the model on training data
+labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
+        model.predict(point.take(range(1, point.size)))))
+trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
+print("Training Error = " + str(trainErr))
+{% endhighlight %}
+
+## Linear Regression
+The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint. The
+example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. 
We +compute the Mean Squared Error at the end to evaluate +[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit) + +{% highlight python %} +from pyspark.mllib.regression import LinearRegressionWithSGD +from numpy import array + +# Load and parse the data +data = sc.textFile("mllib/data/ridge-data/lpsa.data") +parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')])) + +# Build the model +model = LinearRegressionWithSGD.train(sc, parsedData) + +# Evaluate the model on training data +valuesAndPreds = parsedData.map(lambda point: (point.item(0), + model.predict(point.take(range(1, point.size))))) +MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count() +print("Mean Squared Error = " + str(MSE)) +{% endhighlight %} + + +## Clustering +In the following example after loading and parsing data, we use the KMeans object to cluster the data +into two clusters. The number of desired clusters is passed to the algorithm. We then compute Within +Set Sum of Squared Error (WSSSE). You can reduce this error measure by increasing *k*. In fact the +optimal *k* is usually one where there is an "elbow" in the WSSSE graph. + +{% highlight python %} +from pyspark.mllib.clustering import KMeans +from numpy import array +from math import sqrt + +# Load and parse the data +data = sc.textFile("kmeans_data.txt") +parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) + +# Build the model (cluster the data) +clusters = KMeans.train(sc, parsedData, 2, maxIterations=10, + runs=30, initialization_mode="random") + +# Evaluate clustering by computing Within Set Sum of Squared Errors +def error(point): + center = clusters.centers[clusters.predict(point)] + return sqrt(sum([x**2 for x in (point - center)])) + +WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y) +print("Within Set Sum of Squared Error = " + str(WSSSE)) +{% endhighlight %} + +Similarly you can use RidgeRegressionWithSGD and LassoWithSGD and compare training Mean Squared +Errors. + +## Collaborative Filtering +In the following example we load rating data. Each row consists of a user, a product and a rating. +We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation +model by measuring the Mean Squared Error of rating prediction. + +{% highlight python %} +from pyspark.mllib.recommendation import ALS +from numpy import array + +# Load and parse the data +data = sc.textFile("mllib/data/als/test.data") +ratings = data.map(lambda line: array([float(x) for x in line.split(',')])) + +# Build the recommendation model using Alternating Least Squares +model = ALS.train(sc, ratings, 1, 20) + +# Evaluate the model on training data +ratesAndPreds = ratings.map(lambda p: (p[2], model.predict(int(p[0]), int(p[1])))) +MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count() +print("Mean Squared Error = " + str(MSE)) + +{% endhighlight %} + +If the rating matrix is derived from other source of information (i.e., it is inferred from other +signals), you can use the trainImplicit method to get better results. 
+ +{% highlight python %} +# Build the recommendation model using Alternating Least Squares based on implicit ratings +model = ALS.trainImplicit(sc, ratings, 1, 20) +{% endhighlight %} \ No newline at end of file -- cgit v1.2.3 From 81989e26647ede54e19ef8058846e1bd42c0bfb5 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Thu, 2 Jan 2014 16:22:13 -0800 Subject: Commented the last part of collaborative filtering examples that lead to errors --- docs/mllib-guide.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'docs/mllib-guide.md') diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index e9d3785427..0bebc41137 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -297,8 +297,9 @@ val numIterations = 20 val model = ALS.train(ratings, 1, 20, 0.01) // Evaluate the model on rating data -val ratesAndPreds = ratings.map{ case Rating(user, item, rate) => (rate, model.predict(user, item))} -val MSE = ratesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/ratesAndPreds.count +//val ratesAndPreds = ratings.map{ case Rating(user, item, rate) => (rate, model.predict(user, item))} +//val MSE = ratesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/ratesAndPreds.count +//println("Mean Squared Error = " + MSE) {% endhighlight %} If the rating matrix is derived from other source of information (i.e., it is inferred from @@ -406,9 +407,9 @@ ratings = data.map(lambda line: array([float(x) for x in line.split(',')])) model = ALS.train(sc, ratings, 1, 20) # Evaluate the model on training data -ratesAndPreds = ratings.map(lambda p: (p[2], model.predict(int(p[0]), int(p[1])))) -MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count() -print("Mean Squared Error = " + str(MSE)) +#ratesAndPreds = ratings.map(lambda p: (p[2], model.predict(int(p[0]), int(p[1])))) +#MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count() +#print("Mean Squared Error = " + str(MSE)) {% endhighlight %} -- cgit v1.2.3 From 8b5be0675245e206943574b8c6f6b77018b3561a Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Fri, 3 Jan 2014 16:38:33 -0800 Subject: Added table of contents and minor fixes --- docs/mllib-guide.md | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) (limited to 'docs/mllib-guide.md') diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 0bebc41137..3fd3c91e2a 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -3,6 +3,9 @@ layout: global title: Machine Learning Library (MLlib) --- +* Table of contests +{:toc} + MLlib is a Spark implementation of some common machine learning (ML) functionality, as well associated tests and data generators. 
MLlib currently supports four common types of machine learning problem settings, @@ -297,9 +300,17 @@ val numIterations = 20 val model = ALS.train(ratings, 1, 20, 0.01) // Evaluate the model on rating data -//val ratesAndPreds = ratings.map{ case Rating(user, item, rate) => (rate, model.predict(user, item))} -//val MSE = ratesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/ratesAndPreds.count -//println("Mean Squared Error = " + MSE) +val usersProducts = ratings.map{ case Rating(user, product, rate) => (user, product)} +val predictions = model.predict(usersProducts).map{ + case Rating(user, product, rate) => ((user, product), rate) +} +val ratesAndPreds = ratings.map{ + case Rating(user, product, rate) => ((user, product), rate) +}.join(predictions) +val MSE = ratesAndPreds.map{ + case ((user, product), (r1, r2)) => math.pow((r1- r2), 2) +}.reduce(_ + _)/ratesAndPreds.count +println("Mean Squared Error = " + MSE) {% endhighlight %} If the rating matrix is derived from other source of information (i.e., it is inferred from @@ -393,7 +404,7 @@ Errors. ## Collaborative Filtering In the following example we load rating data. Each row consists of a user, a product and a rating. We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation -model by measuring the Mean Squared Error of rating prediction. +on one example. {% highlight python %} from pyspark.mllib.recommendation import ALS @@ -407,10 +418,7 @@ ratings = data.map(lambda line: array([float(x) for x in line.split(',')])) model = ALS.train(sc, ratings, 1, 20) # Evaluate the model on training data -#ratesAndPreds = ratings.map(lambda p: (p[2], model.predict(int(p[0]), int(p[1])))) -#MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count() -#print("Mean Squared Error = " + str(MSE)) - +print("predicted rating of user {0} for item {1} is {2:.6}".format(1, 2, model.predict(1, 2))) {% endhighlight %} If the rating matrix is derived from other source of information (i.e., it is inferred from other -- cgit v1.2.3 From 150089dae12bbba693db4edbfcea360b443637df Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Mon, 6 Jan 2014 12:43:17 -0800 Subject: Added proper evaluation example for collaborative filtering and fixed typo --- docs/mllib-guide.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'docs/mllib-guide.md') diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 3fd3c91e2a..5f3b676126 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -3,7 +3,7 @@ layout: global title: Machine Learning Library (MLlib) --- -* Table of contests +* Table of contents {:toc} MLlib is a Spark implementation of some common machine learning (ML) @@ -403,8 +403,8 @@ Errors. ## Collaborative Filtering In the following example we load rating data. Each row consists of a user, a product and a rating. -We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation -on one example. +We use the default ALS.train() method which assumes ratings are explicit. We evaluate the +recommendation by measuring the Mean Squared Error of rating prediction. 
{% highlight python %} from pyspark.mllib.recommendation import ALS @@ -418,7 +418,11 @@ ratings = data.map(lambda line: array([float(x) for x in line.split(',')])) model = ALS.train(sc, ratings, 1, 20) # Evaluate the model on training data -print("predicted rating of user {0} for item {1} is {2:.6}".format(1, 2, model.predict(1, 2))) +testdata = ratings.map(lambda p: (int(p[0]), int(p[1]))) +predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2])) +ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions) +MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y)/ratesAndPreds.count() +print("Mean Squared Error = " + str(MSE)) {% endhighlight %} If the rating matrix is derived from other source of information (i.e., it is inferred from other -- cgit v1.2.3
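
One detail the clustering examples in these patches mention but never show is how to choose *k* by looking for the "elbow" in the WSSSE curve. The sketch below is an illustrative addition, not part of the patch series: it reuses the same Scala API and `kmeans_data.txt` file as the clustering example in the guide, assumes it is run inside `bin/spark-shell` (so a `SparkContext` named `sc` already exists), and the candidate range of *k* values and the added `.cache()` call are arbitrary choices made for the illustration.

{% highlight scala %}
// Illustrative sketch (not from the patches above): train one k-means model per
// candidate k and print its Within Set Sum of Squared Errors, so the "elbow" in
// the WSSSE curve can be found by inspection.
import org.apache.spark.mllib.clustering.KMeans

// Same input file and parsing as the clustering example; cache() is added because
// the data is reused once per candidate k.
val data = sc.textFile("kmeans_data.txt")
val parsedData = data.map(_.split(' ').map(_.toDouble)).cache()

val numIterations = 20
for (k <- 1 to 5) {
  val clusters = KMeans.train(parsedData, k, numIterations)
  println("k = " + k + ", WSSSE = " + clusters.computeCost(parsedData))
}
{% endhighlight %}

As the guide notes, a smaller WSSSE is not automatically better, since it keeps decreasing as *k* grows; the value of *k* where the decrease levels off is the usual choice.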