diff options
Diffstat (limited to 'docs/ml-guide.md')
-rw-r--r-- | docs/ml-guide.md | 362 |
1 files changed, 147 insertions, 215 deletions
diff --git a/docs/ml-guide.md b/docs/ml-guide.md index 4ba07542bf..78c93a95c7 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -212,26 +212,18 @@ This example covers the concepts of `Estimator`, `Transformer`, and `Param`. <div data-lang="scala"> {% highlight scala %} -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.param.ParamMap import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.Row -val conf = new SparkConf().setAppName("SimpleParamsExample") -val sc = new SparkContext(conf) -val sqlContext = new SQLContext(sc) -import sqlContext.implicits._ - -// Prepare training data. -// We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of case classes -// into DataFrames, where it uses the case class metadata to infer the schema. -val training = sc.parallelize(Seq( - LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)), - LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)), - LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)), - LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)))) +// Prepare training data from a list of (label, features) tuples. +val training = sqlContext.createDataFrame(Seq( + (1.0, Vectors.dense(0.0, 1.1, 0.1)), + (0.0, Vectors.dense(2.0, 1.0, -1.0)), + (0.0, Vectors.dense(2.0, 1.3, 1.0)), + (1.0, Vectors.dense(0.0, 1.2, -0.5)) +)).toDF("label", "features") // Create a LogisticRegression instance. This instance is an Estimator. val lr = new LogisticRegression() @@ -243,7 +235,7 @@ lr.setMaxIter(10) .setRegParam(0.01) // Learn a LogisticRegression model. This uses the parameters stored in lr. -val model1 = lr.fit(training.toDF) +val model1 = lr.fit(training) // Since model1 is a Model (i.e., a Transformer produced by an Estimator), // we can view the parameters it used during fit(). // This prints the parameter (name: value) pairs, where names are unique IDs for this @@ -253,8 +245,8 @@ println("Model 1 was fit using parameters: " + model1.parent.extractParamMap) // We may alternatively specify parameters using a ParamMap, // which supports several methods for specifying parameters. val paramMap = ParamMap(lr.maxIter -> 20) -paramMap.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter. -paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params. + .put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter. + .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params. // One can also combine ParamMaps. val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name @@ -262,27 +254,27 @@ val paramMapCombined = paramMap ++ paramMap2 // Now learn a new model using the paramMapCombined parameters. // paramMapCombined overrides all parameters set earlier via lr.set* methods. -val model2 = lr.fit(training.toDF, paramMapCombined) +val model2 = lr.fit(training, paramMapCombined) println("Model 2 was fit using parameters: " + model2.parent.extractParamMap) // Prepare test data. -val test = sc.parallelize(Seq( - LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)), - LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)), - LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)))) +val test = sqlContext.createDataFrame(Seq( + (1.0, Vectors.dense(-1.0, 1.5, 1.3)), + (0.0, Vectors.dense(3.0, 2.0, -0.1)), + (1.0, Vectors.dense(0.0, 2.2, -1.5)) +)).toDF("label", "features") // Make predictions on test data using the Transformer.transform() method. // LogisticRegression.transform will only use the 'features' column. // Note that model2.transform() outputs a 'myProbability' column instead of the usual // 'probability' column since we renamed the lr.probabilityCol parameter previously. -model2.transform(test.toDF) +model2.transform(test) .select("features", "label", "myProbability", "prediction") .collect() .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) => println(s"($features, $label) -> prob=$prob, prediction=$prediction") } -sc.stop() {% endhighlight %} </div> @@ -291,30 +283,23 @@ sc.stop() import java.util.Arrays; import java.util.List; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.ml.classification.LogisticRegressionModel; import org.apache.spark.ml.param.ParamMap; import org.apache.spark.ml.classification.LogisticRegression; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.Row; -SparkConf conf = new SparkConf().setAppName("JavaSimpleParamsExample"); -JavaSparkContext jsc = new JavaSparkContext(conf); -SQLContext jsql = new SQLContext(jsc); - // Prepare training data. // We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans // into DataFrames, where it uses the bean metadata to infer the schema. -List<LabeledPoint> localTraining = Arrays.asList( +DataFrame training = sqlContext.createDataFrame(Arrays.asList( new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)), new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)), new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)), - new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))); -DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class); + new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)) +), LabeledPoint.class); // Create a LogisticRegression instance. This instance is an Estimator. LogisticRegression lr = new LogisticRegression(); @@ -334,14 +319,14 @@ LogisticRegressionModel model1 = lr.fit(training); System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap()); // We may alternatively specify parameters using a ParamMap. -ParamMap paramMap = new ParamMap(); -paramMap.put(lr.maxIter().w(20)); // Specify 1 Param. -paramMap.put(lr.maxIter(), 30); // This overwrites the original maxIter. -paramMap.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params. +ParamMap paramMap = new ParamMap() + .put(lr.maxIter().w(20)) // Specify 1 Param. + .put(lr.maxIter(), 30) // This overwrites the original maxIter. + .put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params. // One can also combine ParamMaps. -ParamMap paramMap2 = new ParamMap(); -paramMap2.put(lr.probabilityCol().w("myProbability")); // Change output column name +ParamMap paramMap2 = new ParamMap() + .put(lr.probabilityCol().w("myProbability")); // Change output column name ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2); // Now learn a new model using the paramMapCombined parameters. @@ -350,11 +335,11 @@ LogisticRegressionModel model2 = lr.fit(training, paramMapCombined); System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap()); // Prepare test documents. -List<LabeledPoint> localTest = Arrays.asList( - new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)), - new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)), - new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))); -DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class); +DataFrame test = sqlContext.createDataFrame(Arrays.asList( + new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)), + new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)), + new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)) +), LabeledPoint.class); // Make predictions on test documents using the Transformer.transform() method. // LogisticRegression.transform will only use the 'features' column. @@ -366,28 +351,21 @@ for (Row r: results.select("features", "label", "myProbability", "prediction").c + ", prediction=" + r.get(3)); } -jsc.stop(); {% endhighlight %} </div> <div data-lang="python"> {% highlight python %} -from pyspark import SparkContext -from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.linalg import Vectors from pyspark.ml.classification import LogisticRegression from pyspark.ml.param import Param, Params -from pyspark.sql import Row, SQLContext -sc = SparkContext(appName="SimpleParamsExample") -sqlContext = SQLContext(sc) - -# Prepare training data. -# We use LabeledPoint. -# Spark SQL can convert RDDs of LabeledPoints into DataFrames. -training = sc.parallelize([LabeledPoint(1.0, [0.0, 1.1, 0.1]), - LabeledPoint(0.0, [2.0, 1.0, -1.0]), - LabeledPoint(0.0, [2.0, 1.3, 1.0]), - LabeledPoint(1.0, [0.0, 1.2, -0.5])]) +# Prepare training data from a list of (label, features) tuples. +training = sqlContext.createDataFrame([ + (1.0, Vectors.dense([0.0, 1.1, 0.1])), + (0.0, Vectors.dense([2.0, 1.0, -1.0])), + (0.0, Vectors.dense([2.0, 1.3, 1.0])), + (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"]) # Create a LogisticRegression instance. This instance is an Estimator. lr = LogisticRegression(maxIter=10, regParam=0.01) @@ -395,7 +373,7 @@ lr = LogisticRegression(maxIter=10, regParam=0.01) print "LogisticRegression parameters:\n" + lr.explainParams() + "\n" # Learn a LogisticRegression model. This uses the parameters stored in lr. -model1 = lr.fit(training.toDF()) +model1 = lr.fit(training) # Since model1 is a Model (i.e., a transformer produced by an Estimator), # we can view the parameters it used during fit(). @@ -416,25 +394,25 @@ paramMapCombined.update(paramMap2) # Now learn a new model using the paramMapCombined parameters. # paramMapCombined overrides all parameters set earlier via lr.set* methods. -model2 = lr.fit(training.toDF(), paramMapCombined) +model2 = lr.fit(training, paramMapCombined) print "Model 2 was fit using parameters: " print model2.extractParamMap() # Prepare test data -test = sc.parallelize([LabeledPoint(1.0, [-1.0, 1.5, 1.3]), - LabeledPoint(0.0, [ 3.0, 2.0, -0.1]), - LabeledPoint(1.0, [ 0.0, 2.2, -1.5])]) +test = sqlContext.createDataFrame([ + (1.0, Vectors.dense([-1.0, 1.5, 1.3])), + (0.0, Vectors.dense([3.0, 2.0, -0.1])), + (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"]) # Make predictions on test data using the Transformer.transform() method. # LogisticRegression.transform will only use the 'features' column. # Note that model2.transform() outputs a "myProbability" column instead of the usual # 'probability' column since we renamed the lr.probabilityCol parameter previously. -prediction = model2.transform(test.toDF()) +prediction = model2.transform(test) selected = prediction.select("features", "label", "myProbability", "prediction") for row in selected.collect(): print row -sc.stop() {% endhighlight %} </div> @@ -448,30 +426,19 @@ This example follows the simple text document `Pipeline` illustrated in the figu <div data-lang="scala"> {% highlight scala %} -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.sql.{Row, SQLContext} - -// Labeled and unlabeled instance types. -// Spark SQL can infer schema from case classes. -case class LabeledDocument(id: Long, text: String, label: Double) -case class Document(id: Long, text: String) +import org.apache.spark.sql.Row -// Set up contexts. Import implicit conversions to DataFrame from sqlContext. -val conf = new SparkConf().setAppName("SimpleTextClassificationPipeline") -val sc = new SparkContext(conf) -val sqlContext = new SQLContext(sc) -import sqlContext.implicits._ - -// Prepare training documents, which are labeled. -val training = sc.parallelize(Seq( - LabeledDocument(0L, "a b c d e spark", 1.0), - LabeledDocument(1L, "b d", 0.0), - LabeledDocument(2L, "spark f g h", 1.0), - LabeledDocument(3L, "hadoop mapreduce", 0.0))) +// Prepare training documents from a list of (id, text, label) tuples. +val training = sqlContext.createDataFrame(Seq( + (0L, "a b c d e spark", 1.0), + (1L, "b d", 0.0), + (2L, "spark f g h", 1.0), + (3L, "hadoop mapreduce", 0.0) +)).toDF("id", "text", "label") // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. val tokenizer = new Tokenizer() @@ -488,14 +455,15 @@ val pipeline = new Pipeline() .setStages(Array(tokenizer, hashingTF, lr)) // Fit the pipeline to training documents. -val model = pipeline.fit(training.toDF) +val model = pipeline.fit(training) -// Prepare test documents, which are unlabeled. -val test = sc.parallelize(Seq( - Document(4L, "spark i j k"), - Document(5L, "l m n"), - Document(6L, "mapreduce spark"), - Document(7L, "apache hadoop"))) +// Prepare test documents, which are unlabeled (id, text) tuples. +val test = sqlContext.createDataFrame(Seq( + (4L, "spark i j k"), + (5L, "l m n"), + (6L, "mapreduce spark"), + (7L, "apache hadoop") +)).toDF("id", "text") // Make predictions on test documents. model.transform(test.toDF) @@ -505,7 +473,6 @@ model.transform(test.toDF) println(s"($id, $text) --> prob=$prob, prediction=$prediction") } -sc.stop() {% endhighlight %} </div> @@ -514,8 +481,6 @@ sc.stop() import java.util.Arrays; import java.util.List; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.PipelineModel; import org.apache.spark.ml.PipelineStage; @@ -524,7 +489,6 @@ import org.apache.spark.ml.feature.HashingTF; import org.apache.spark.ml.feature.Tokenizer; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; // Labeled and unlabeled instance types. // Spark SQL can infer schema from Java Beans. @@ -556,18 +520,13 @@ public class LabeledDocument extends Document implements Serializable { public void setLabel(double label) { this.label = label; } } -// Set up contexts. -SparkConf conf = new SparkConf().setAppName("JavaSimpleTextClassificationPipeline"); -JavaSparkContext jsc = new JavaSparkContext(conf); -SQLContext jsql = new SQLContext(jsc); - // Prepare training documents, which are labeled. -List<LabeledDocument> localTraining = Arrays.asList( +DataFrame training = sqlContext.createDataFrame(Arrays.asList( new LabeledDocument(0L, "a b c d e spark", 1.0), new LabeledDocument(1L, "b d", 0.0), new LabeledDocument(2L, "spark f g h", 1.0), - new LabeledDocument(3L, "hadoop mapreduce", 0.0)); -DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class); + new LabeledDocument(3L, "hadoop mapreduce", 0.0) +), LabeledDocument.class); // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. Tokenizer tokenizer = new Tokenizer() @@ -587,12 +546,12 @@ Pipeline pipeline = new Pipeline() PipelineModel model = pipeline.fit(training); // Prepare test documents, which are unlabeled. -List<Document> localTest = Arrays.asList( +DataFrame test = sqlContext.createDataFrame(Arrays.asList( new Document(4L, "spark i j k"), new Document(5L, "l m n"), new Document(6L, "mapreduce spark"), - new Document(7L, "apache hadoop")); -DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class); + new Document(7L, "apache hadoop") +), Document.class); // Make predictions on test documents. DataFrame predictions = model.transform(test); @@ -601,28 +560,23 @@ for (Row r: predictions.select("id", "text", "probability", "prediction").collec + ", prediction=" + r.get(3)); } -jsc.stop(); {% endhighlight %} </div> <div data-lang="python"> {% highlight python %} -from pyspark import SparkContext from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer -from pyspark.sql import Row, SQLContext - -sc = SparkContext(appName="SimpleTextClassificationPipeline") -sqlContext = SQLContext(sc) +from pyspark.sql import Row -# Prepare training documents, which are labeled. +# Prepare training documents from a list of (id, text, label) tuples. LabeledDocument = Row("id", "text", "label") -training = sc.parallelize([(0L, "a b c d e spark", 1.0), - (1L, "b d", 0.0), - (2L, "spark f g h", 1.0), - (3L, "hadoop mapreduce", 0.0)]) \ - .map(lambda x: LabeledDocument(*x)).toDF() +training = sqlContext.createDataFrame([ + (0L, "a b c d e spark", 1.0), + (1L, "b d", 0.0), + (2L, "spark f g h", 1.0), + (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"]) # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") @@ -633,13 +587,12 @@ pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) # Fit the pipeline to training documents. model = pipeline.fit(training) -# Prepare test documents, which are unlabeled. -Document = Row("id", "text") -test = sc.parallelize([(4L, "spark i j k"), - (5L, "l m n"), - (6L, "mapreduce spark"), - (7L, "apache hadoop")]) \ - .map(lambda x: Document(*x)).toDF() +# Prepare test documents, which are unlabeled (id, text) tuples. +test = sqlContext.createDataFrame([ + (4L, "spark i j k"), + (5L, "l m n"), + (6L, "mapreduce spark"), + (7L, "apache hadoop")], ["id", "text"]) # Make predictions on test documents and print columns of interest. prediction = model.transform(test) @@ -647,7 +600,6 @@ selected = prediction.select("id", "text", "prediction") for row in selected.collect(): print(row) -sc.stop() {% endhighlight %} </div> @@ -664,8 +616,8 @@ Currently, `spark.ml` supports model selection using the [`CrossValidator`](api/ The `Evaluator` can be a [`RegressionEvaluator`](api/scala/index.html#org.apache.spark.ml.RegressionEvaluator) for regression problems, a [`BinaryClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.BinaryClassificationEvaluator) -for binary data or a [`MultiClassClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.MultiClassClassificationEvaluator) -for multiclass problems. The default metric used to choose the best `ParamMap` can be overriden by the setMetric +for binary data, or a [`MultiClassClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.MultiClassClassificationEvaluator) +for multiclass problems. The default metric used to choose the best `ParamMap` can be overriden by the `setMetric` method in each of these evaluators. The `ParamMap` which produces the best evaluation metric (averaged over the `$k$` folds) is selected as the best model. @@ -684,39 +636,29 @@ However, it is also a well-established method for choosing parameters which is m <div data-lang="scala"> {% highlight scala %} -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator} import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.sql.{Row, SQLContext} - -// Labeled and unlabeled instance types. -// Spark SQL can infer schema from case classes. -case class LabeledDocument(id: Long, text: String, label: Double) -case class Document(id: Long, text: String) - -val conf = new SparkConf().setAppName("CrossValidatorExample") -val sc = new SparkContext(conf) -val sqlContext = new SQLContext(sc) -import sqlContext.implicits._ - -// Prepare training documents, which are labeled. -val training = sc.parallelize(Seq( - LabeledDocument(0L, "a b c d e spark", 1.0), - LabeledDocument(1L, "b d", 0.0), - LabeledDocument(2L, "spark f g h", 1.0), - LabeledDocument(3L, "hadoop mapreduce", 0.0), - LabeledDocument(4L, "b spark who", 1.0), - LabeledDocument(5L, "g d a y", 0.0), - LabeledDocument(6L, "spark fly", 1.0), - LabeledDocument(7L, "was mapreduce", 0.0), - LabeledDocument(8L, "e spark program", 1.0), - LabeledDocument(9L, "a e c l", 0.0), - LabeledDocument(10L, "spark compile", 1.0), - LabeledDocument(11L, "hadoop software", 0.0))) +import org.apache.spark.sql.Row + +// Prepare training data from a list of (id, text, label) tuples. +val training = sqlContext.createDataFrame(Seq( + (0L, "a b c d e spark", 1.0), + (1L, "b d", 0.0), + (2L, "spark f g h", 1.0), + (3L, "hadoop mapreduce", 0.0), + (4L, "b spark who", 1.0), + (5L, "g d a y", 0.0), + (6L, "spark fly", 1.0), + (7L, "was mapreduce", 0.0), + (8L, "e spark program", 1.0), + (9L, "a e c l", 0.0), + (10L, "spark compile", 1.0), + (11L, "hadoop software", 0.0) +)).toDF("id", "text", "label") // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. val tokenizer = new Tokenizer() @@ -730,15 +672,6 @@ val lr = new LogisticRegression() val pipeline = new Pipeline() .setStages(Array(tokenizer, hashingTF, lr)) -// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. -// This will allow us to jointly choose parameters for all Pipeline stages. -// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. -// Note that the evaluator here is a BinaryClassificationEvaluator and the default metric -// used is areaUnderROC. -val crossval = new CrossValidator() - .setEstimator(pipeline) - .setEvaluator(new BinaryClassificationEvaluator) - // We use a ParamGridBuilder to construct a grid of parameters to search over. // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from. @@ -746,28 +679,37 @@ val paramGrid = new ParamGridBuilder() .addGrid(hashingTF.numFeatures, Array(10, 100, 1000)) .addGrid(lr.regParam, Array(0.1, 0.01)) .build() -crossval.setEstimatorParamMaps(paramGrid) -crossval.setNumFolds(2) // Use 3+ in practice + +// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. +// This will allow us to jointly choose parameters for all Pipeline stages. +// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. +// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric +// is areaUnderROC. +val cv = new CrossValidator() + .setEstimator(pipeline) + .setEvaluator(new BinaryClassificationEvaluator) + .setEstimatorParamMaps(paramGrid) + .setNumFolds(2) // Use 3+ in practice // Run cross-validation, and choose the best set of parameters. -val cvModel = crossval.fit(training.toDF) +val cvModel = cv.fit(training) -// Prepare test documents, which are unlabeled. -val test = sc.parallelize(Seq( - Document(4L, "spark i j k"), - Document(5L, "l m n"), - Document(6L, "mapreduce spark"), - Document(7L, "apache hadoop"))) +// Prepare test documents, which are unlabeled (id, text) tuples. +val test = sqlContext.createDataFrame(Seq( + (4L, "spark i j k"), + (5L, "l m n"), + (6L, "mapreduce spark"), + (7L, "apache hadoop") +)).toDF("id", "text") // Make predictions on test documents. cvModel uses the best model found (lrModel). -cvModel.transform(test.toDF) +cvModel.transform(test) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => - println(s"($id, $text) --> prob=$prob, prediction=$prediction") -} + println(s"($id, $text) --> prob=$prob, prediction=$prediction") + } -sc.stop() {% endhighlight %} </div> @@ -776,8 +718,6 @@ sc.stop() import java.util.Arrays; import java.util.List; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.PipelineStage; import org.apache.spark.ml.classification.LogisticRegression; @@ -790,7 +730,6 @@ import org.apache.spark.ml.tuning.CrossValidatorModel; import org.apache.spark.ml.tuning.ParamGridBuilder; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; // Labeled and unlabeled instance types. // Spark SQL can infer schema from Java Beans. @@ -822,12 +761,9 @@ public class LabeledDocument extends Document implements Serializable { public void setLabel(double label) { this.label = label; } } -SparkConf conf = new SparkConf().setAppName("JavaCrossValidatorExample"); -JavaSparkContext jsc = new JavaSparkContext(conf); -SQLContext jsql = new SQLContext(jsc); // Prepare training documents, which are labeled. -List<LabeledDocument> localTraining = Arrays.asList( +DataFrame training = sqlContext.createDataFrame(Arrays.asList( new LabeledDocument(0L, "a b c d e spark", 1.0), new LabeledDocument(1L, "b d", 0.0), new LabeledDocument(2L, "spark f g h", 1.0), @@ -839,8 +775,8 @@ List<LabeledDocument> localTraining = Arrays.asList( new LabeledDocument(8L, "e spark program", 1.0), new LabeledDocument(9L, "a e c l", 0.0), new LabeledDocument(10L, "spark compile", 1.0), - new LabeledDocument(11L, "hadoop software", 0.0)); -DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class); + new LabeledDocument(11L, "hadoop software", 0.0) +), LabeledDocument.class); // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. Tokenizer tokenizer = new Tokenizer() @@ -856,15 +792,6 @@ LogisticRegression lr = new LogisticRegression() Pipeline pipeline = new Pipeline() .setStages(new PipelineStage[] {tokenizer, hashingTF, lr}); -// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. -// This will allow us to jointly choose parameters for all Pipeline stages. -// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. -// Note that the evaluator here is a BinaryClassificationEvaluator and the default metric -// used is areaUnderROC. -CrossValidator crossval = new CrossValidator() - .setEstimator(pipeline) - .setEvaluator(new BinaryClassificationEvaluator()); - // We use a ParamGridBuilder to construct a grid of parameters to search over. // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from. @@ -872,19 +799,28 @@ ParamMap[] paramGrid = new ParamGridBuilder() .addGrid(hashingTF.numFeatures(), new int[]{10, 100, 1000}) .addGrid(lr.regParam(), new double[]{0.1, 0.01}) .build(); -crossval.setEstimatorParamMaps(paramGrid); -crossval.setNumFolds(2); // Use 3+ in practice + +// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. +// This will allow us to jointly choose parameters for all Pipeline stages. +// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. +// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric +// is areaUnderROC. +CrossValidator cv = new CrossValidator() + .setEstimator(pipeline) + .setEvaluator(new BinaryClassificationEvaluator()) + .setEstimatorParamMaps(paramGrid) + .setNumFolds(2); // Use 3+ in practice // Run cross-validation, and choose the best set of parameters. -CrossValidatorModel cvModel = crossval.fit(training); +CrossValidatorModel cvModel = cv.fit(training); // Prepare test documents, which are unlabeled. -List<Document> localTest = Arrays.asList( +DataFrame test = sqlContext.createDataFrame(Arrays.asList( new Document(4L, "spark i j k"), new Document(5L, "l m n"), new Document(6L, "mapreduce spark"), - new Document(7L, "apache hadoop")); -DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class); + new Document(7L, "apache hadoop") +), Document.class); // Make predictions on test documents. cvModel uses the best model found (lrModel). DataFrame predictions = cvModel.transform(test); @@ -893,7 +829,6 @@ for (Row r: predictions.select("id", "text", "probability", "prediction").collec + ", prediction=" + r.get(3)); } -jsc.stop(); {% endhighlight %} </div> @@ -935,7 +870,7 @@ val lr = new LinearRegression() // the evaluator. val paramGrid = new ParamGridBuilder() .addGrid(lr.regParam, Array(0.1, 0.01)) - .addGrid(lr.fitIntercept, Array(true, false)) + .addGrid(lr.fitIntercept) .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0)) .build() @@ -945,9 +880,8 @@ val trainValidationSplit = new TrainValidationSplit() .setEstimator(lr) .setEvaluator(new RegressionEvaluator) .setEstimatorParamMaps(paramGrid) - -// 80% of the data will be used for training and the remaining 20% for validation. -trainValidationSplit.setTrainRatio(0.8) + // 80% of the data will be used for training and the remaining 20% for validation. + .setTrainRatio(0.8) // Run train validation split, and choose the best set of parameters. val model = trainValidationSplit.fit(training) @@ -972,12 +906,12 @@ import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.DataFrame; -DataFrame data = jsql.createDataFrame( +DataFrame data = sqlContext.createDataFrame( MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt"), LabeledPoint.class); // Prepare training and test data. -DataFrame[] splits = data.randomSplit(new double [] {0.9, 0.1}, 12345); +DataFrame[] splits = data.randomSplit(new double[] {0.9, 0.1}, 12345); DataFrame training = splits[0]; DataFrame test = splits[1]; @@ -997,10 +931,8 @@ ParamMap[] paramGrid = new ParamGridBuilder() TrainValidationSplit trainValidationSplit = new TrainValidationSplit() .setEstimator(lr) .setEvaluator(new RegressionEvaluator()) - .setEstimatorParamMaps(paramGrid); - -// 80% of the data will be used for training and the remaining 20% for validation. -trainValidationSplit.setTrainRatio(0.8); + .setEstimatorParamMaps(paramGrid) + .setTrainRatio(0.8); // 80% for training and the remaining 20% for validation // Run train validation split, and choose the best set of parameters. TrainValidationSplitModel model = trainValidationSplit.fit(training); |