diff options
Diffstat (limited to 'docs/ml-guide.md')
-rw-r--r-- | docs/ml-guide.md | 249 |
1 files changed, 155 insertions, 94 deletions
diff --git a/docs/ml-guide.md b/docs/ml-guide.md index 4bf14fba34..da6aef7f14 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -23,13 +23,13 @@ to `spark.ml`. Spark ML standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. This section covers the key concepts introduced by the Spark ML API. -* **[ML Dataset](ml-guide.html#ml-dataset)**: Spark ML uses the [`SchemaRDD`](api/scala/index.html#org.apache.spark.sql.SchemaRDD) from Spark SQL as a dataset which can hold a variety of data types. +* **[ML Dataset](ml-guide.html#ml-dataset)**: Spark ML uses the [`DataFrame`](api/scala/index.html#org.apache.spark.sql.DataFrame) from Spark SQL as a dataset which can hold a variety of data types. E.g., a dataset could have different columns storing text, feature vectors, true labels, and predictions. -* **[`Transformer`](ml-guide.html#transformers)**: A `Transformer` is an algorithm which can transform one `SchemaRDD` into another `SchemaRDD`. +* **[`Transformer`](ml-guide.html#transformers)**: A `Transformer` is an algorithm which can transform one `DataFrame` into another `DataFrame`. E.g., an ML model is a `Transformer` which transforms an RDD with features into an RDD with predictions. -* **[`Estimator`](ml-guide.html#estimators)**: An `Estimator` is an algorithm which can be fit on a `SchemaRDD` to produce a `Transformer`. +* **[`Estimator`](ml-guide.html#estimators)**: An `Estimator` is an algorithm which can be fit on a `DataFrame` to produce a `Transformer`. E.g., a learning algorithm is an `Estimator` which trains on a dataset and produces a model. * **[`Pipeline`](ml-guide.html#pipeline)**: A `Pipeline` chains multiple `Transformer`s and `Estimator`s together to specify an ML workflow. @@ -39,20 +39,20 @@ E.g., a learning algorithm is an `Estimator` which trains on a dataset and produ ## ML Dataset Machine learning can be applied to a wide variety of data types, such as vectors, text, images, and structured data. -Spark ML adopts the [`SchemaRDD`](api/scala/index.html#org.apache.spark.sql.SchemaRDD) from Spark SQL in order to support a variety of data types under a unified Dataset concept. +Spark ML adopts the [`DataFrame`](api/scala/index.html#org.apache.spark.sql.DataFrame) from Spark SQL in order to support a variety of data types under a unified Dataset concept. -`SchemaRDD` supports many basic and structured types; see the [Spark SQL datatype reference](sql-programming-guide.html#spark-sql-datatype-reference) for a list of supported types. -In addition to the types listed in the Spark SQL guide, `SchemaRDD` can use ML [`Vector`](api/scala/index.html#org.apache.spark.mllib.linalg.Vector) types. +`DataFrame` supports many basic and structured types; see the [Spark SQL datatype reference](sql-programming-guide.html#spark-sql-datatype-reference) for a list of supported types. +In addition to the types listed in the Spark SQL guide, `DataFrame` can use ML [`Vector`](api/scala/index.html#org.apache.spark.mllib.linalg.Vector) types. -A `SchemaRDD` can be created either implicitly or explicitly from a regular `RDD`. See the code examples below and the [Spark SQL programming guide](sql-programming-guide.html) for examples. +A `DataFrame` can be created either implicitly or explicitly from a regular `RDD`. See the code examples below and the [Spark SQL programming guide](sql-programming-guide.html) for examples. -Columns in a `SchemaRDD` are named. The code examples below use names such as "text," "features," and "label." +Columns in a `DataFrame` are named. The code examples below use names such as "text," "features," and "label." ## ML Algorithms ### Transformers -A [`Transformer`](api/scala/index.html#org.apache.spark.ml.Transformer) is an abstraction which includes feature transformers and learned models. Technically, a `Transformer` implements a method `transform()` which converts one `SchemaRDD` into another, generally by appending one or more columns. +A [`Transformer`](api/scala/index.html#org.apache.spark.ml.Transformer) is an abstraction which includes feature transformers and learned models. Technically, a `Transformer` implements a method `transform()` which converts one `DataFrame` into another, generally by appending one or more columns. For example: * A feature transformer might take a dataset, read a column (e.g., text), convert it into a new column (e.g., feature vectors), append the new column to the dataset, and output the updated dataset. @@ -60,7 +60,7 @@ For example: ### Estimators -An [`Estimator`](api/scala/index.html#org.apache.spark.ml.Estimator) abstracts the concept of a learning algorithm or any algorithm which fits or trains on data. Technically, an `Estimator` implements a method `fit()` which accepts a `SchemaRDD` and produces a `Transformer`. +An [`Estimator`](api/scala/index.html#org.apache.spark.ml.Estimator) abstracts the concept of a learning algorithm or any algorithm which fits or trains on data. Technically, an `Estimator` implements a method `fit()` which accepts a `DataFrame` and produces a `Transformer`. For example, a learning algorithm such as `LogisticRegression` is an `Estimator`, and calling `fit()` trains a `LogisticRegressionModel`, which is a `Transformer`. ### Properties of ML Algorithms @@ -101,7 +101,7 @@ We illustrate this for the simple text document workflow. The figure below is f Above, the top row represents a `Pipeline` with three stages. The first two (`Tokenizer` and `HashingTF`) are `Transformer`s (blue), and the third (`LogisticRegression`) is an `Estimator` (red). -The bottom row represents data flowing through the pipeline, where cylinders indicate `SchemaRDD`s. +The bottom row represents data flowing through the pipeline, where cylinders indicate `DataFrame`s. The `Pipeline.fit()` method is called on the original dataset which has raw text documents and labels. The `Tokenizer.transform()` method splits the raw text documents into words, adding a new column with words into the dataset. The `HashingTF.transform()` method converts the words column into feature vectors, adding a new column with those vectors to the dataset. @@ -130,7 +130,7 @@ Each stage's `transform()` method updates the dataset and passes it to the next *DAG `Pipeline`s*: A `Pipeline`'s stages are specified as an ordered array. The examples given here are all for linear `Pipeline`s, i.e., `Pipeline`s in which each stage uses data produced by the previous stage. It is possible to create non-linear `Pipeline`s as long as the data flow graph forms a Directed Acyclic Graph (DAG). This graph is currently specified implicitly based on the input and output column names of each stage (generally specified as parameters). If the `Pipeline` forms a DAG, then the stages must be specified in topological order. -*Runtime checking*: Since `Pipeline`s can operate on datasets with varied types, they cannot use compile-time type checking. `Pipeline`s and `PipelineModel`s instead do runtime checking before actually running the `Pipeline`. This type checking is done using the dataset *schema*, a description of the data types of columns in the `SchemaRDD`. +*Runtime checking*: Since `Pipeline`s can operate on datasets with varied types, they cannot use compile-time type checking. `Pipeline`s and `PipelineModel`s instead do runtime checking before actually running the `Pipeline`. This type checking is done using the dataset *schema*, a description of the data types of columns in the `DataFrame`. ## Parameters @@ -171,12 +171,12 @@ import org.apache.spark.sql.{Row, SQLContext} val conf = new SparkConf().setAppName("SimpleParamsExample") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) -import sqlContext._ +import sqlContext.implicits._ // Prepare training data. // We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of case classes -// into SchemaRDDs, where it uses the case class metadata to infer the schema. -val training = sparkContext.parallelize(Seq( +// into DataFrames, where it uses the case class metadata to infer the schema. +val training = sc.parallelize(Seq( LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)), LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)), LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)), @@ -192,7 +192,7 @@ lr.setMaxIter(10) .setRegParam(0.01) // Learn a LogisticRegression model. This uses the parameters stored in lr. -val model1 = lr.fit(training) +val model1 = lr.fit(training.toDF) // Since model1 is a Model (i.e., a Transformer produced by an Estimator), // we can view the parameters it used during fit(). // This prints the parameter (name: value) pairs, where names are unique IDs for this @@ -203,33 +203,35 @@ println("Model 1 was fit using parameters: " + model1.fittingParamMap) // which supports several methods for specifying parameters. val paramMap = ParamMap(lr.maxIter -> 20) paramMap.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter. -paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.5) // Specify multiple Params. +paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params. // One can also combine ParamMaps. -val paramMap2 = ParamMap(lr.scoreCol -> "probability") // Changes output column name. +val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name val paramMapCombined = paramMap ++ paramMap2 // Now learn a new model using the paramMapCombined parameters. // paramMapCombined overrides all parameters set earlier via lr.set* methods. -val model2 = lr.fit(training, paramMapCombined) +val model2 = lr.fit(training.toDF, paramMapCombined) println("Model 2 was fit using parameters: " + model2.fittingParamMap) -// Prepare test documents. -val test = sparkContext.parallelize(Seq( +// Prepare test data. +val test = sc.parallelize(Seq( LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)), LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)), LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)))) -// Make predictions on test documents using the Transformer.transform() method. +// Make predictions on test data using the Transformer.transform() method. // LogisticRegression.transform will only use the 'features' column. -// Note that model2.transform() outputs a 'probability' column instead of the usual 'score' -// column since we renamed the lr.scoreCol parameter previously. -model2.transform(test) - .select('features, 'label, 'probability, 'prediction) +// Note that model2.transform() outputs a 'myProbability' column instead of the usual +// 'probability' column since we renamed the lr.probabilityCol parameter previously. +model2.transform(test.toDF) + .select("features", "label", "myProbability", "prediction") .collect() - .foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) => - println("(" + features + ", " + label + ") -> prob=" + prob + ", prediction=" + prediction) + .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) => + println("($features, $label) -> prob=$prob, prediction=$prediction") } + +sc.stop() {% endhighlight %} </div> @@ -244,23 +246,23 @@ import org.apache.spark.ml.param.ParamMap; import org.apache.spark.ml.classification.LogisticRegression; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.sql.api.java.JavaSchemaRDD; -import org.apache.spark.sql.api.java.Row; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.Row; SparkConf conf = new SparkConf().setAppName("JavaSimpleParamsExample"); JavaSparkContext jsc = new JavaSparkContext(conf); -JavaSQLContext jsql = new JavaSQLContext(jsc); +SQLContext jsql = new SQLContext(jsc); // Prepare training data. -// We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of case classes -// into SchemaRDDs, where it uses the case class metadata to infer the schema. +// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans +// into DataFrames, where it uses the bean metadata to infer the schema. List<LabeledPoint> localTraining = Lists.newArrayList( new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)), new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)), new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)), new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))); -JavaSchemaRDD training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class); +DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class); // Create a LogisticRegression instance. This instance is an Estimator. LogisticRegression lr = new LogisticRegression(); @@ -281,13 +283,13 @@ System.out.println("Model 1 was fit using parameters: " + model1.fittingParamMap // We may alternatively specify parameters using a ParamMap. ParamMap paramMap = new ParamMap(); -paramMap.put(lr.maxIter(), 20); // Specify 1 Param. +paramMap.put(lr.maxIter().w(20)); // Specify 1 Param. paramMap.put(lr.maxIter(), 30); // This overwrites the original maxIter. -paramMap.put(lr.regParam(), 0.1); +paramMap.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params. // One can also combine ParamMaps. ParamMap paramMap2 = new ParamMap(); -paramMap2.put(lr.scoreCol(), "probability"); // Changes output column name. +paramMap2.put(lr.probabilityCol().w("myProbability")); // Change output column name ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2); // Now learn a new model using the paramMapCombined parameters. @@ -300,19 +302,19 @@ List<LabeledPoint> localTest = Lists.newArrayList( new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)), new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)), new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))); -JavaSchemaRDD test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class); +DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class); // Make predictions on test documents using the Transformer.transform() method. // LogisticRegression.transform will only use the 'features' column. -// Note that model2.transform() outputs a 'probability' column instead of the usual 'score' -// column since we renamed the lr.scoreCol parameter previously. -model2.transform(test).registerAsTable("results"); -JavaSchemaRDD results = - jsql.sql("SELECT features, label, probability, prediction FROM results"); -for (Row r: results.collect()) { +// Note that model2.transform() outputs a 'myProbability' column instead of the usual +// 'probability' column since we renamed the lr.probabilityCol parameter previously. +DataFrame results = model2.transform(test); +for (Row r: results.select("features", "label", "myProbability", "prediction").collect()) { System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2) + ", prediction=" + r.get(3)); } + +jsc.stop(); {% endhighlight %} </div> @@ -330,6 +332,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} +import org.apache.spark.mllib.linalg.Vector import org.apache.spark.sql.{Row, SQLContext} // Labeled and unlabeled instance types. @@ -337,14 +340,14 @@ import org.apache.spark.sql.{Row, SQLContext} case class LabeledDocument(id: Long, text: String, label: Double) case class Document(id: Long, text: String) -// Set up contexts. Import implicit conversions to SchemaRDD from sqlContext. +// Set up contexts. Import implicit conversions to DataFrame from sqlContext. val conf = new SparkConf().setAppName("SimpleTextClassificationPipeline") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) -import sqlContext._ +import sqlContext.implicits._ // Prepare training documents, which are labeled. -val training = sparkContext.parallelize(Seq( +val training = sc.parallelize(Seq( LabeledDocument(0L, "a b c d e spark", 1.0), LabeledDocument(1L, "b d", 0.0), LabeledDocument(2L, "spark f g h", 1.0), @@ -365,30 +368,32 @@ val pipeline = new Pipeline() .setStages(Array(tokenizer, hashingTF, lr)) // Fit the pipeline to training documents. -val model = pipeline.fit(training) +val model = pipeline.fit(training.toDF) // Prepare test documents, which are unlabeled. -val test = sparkContext.parallelize(Seq( +val test = sc.parallelize(Seq( Document(4L, "spark i j k"), Document(5L, "l m n"), Document(6L, "mapreduce spark"), Document(7L, "apache hadoop"))) // Make predictions on test documents. -model.transform(test) - .select('id, 'text, 'score, 'prediction) +model.transform(test.toDF) + .select("id", "text", "probability", "prediction") .collect() - .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) => - println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction) + .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => + println("($id, $text) --> prob=$prob, prediction=$prediction") } + +sc.stop() {% endhighlight %} </div> <div data-lang="java"> {% highlight java %} -import java.io.Serializable; import java.util.List; import com.google.common.collect.Lists; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.PipelineModel; @@ -396,10 +401,9 @@ import org.apache.spark.ml.PipelineStage; import org.apache.spark.ml.classification.LogisticRegression; import org.apache.spark.ml.feature.HashingTF; import org.apache.spark.ml.feature.Tokenizer; -import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.sql.api.java.JavaSchemaRDD; -import org.apache.spark.sql.api.java.Row; -import org.apache.spark.SparkConf; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; // Labeled and unlabeled instance types. // Spark SQL can infer schema from Java Beans. @@ -434,7 +438,7 @@ public class LabeledDocument extends Document implements Serializable { // Set up contexts. SparkConf conf = new SparkConf().setAppName("JavaSimpleTextClassificationPipeline"); JavaSparkContext jsc = new JavaSparkContext(conf); -JavaSQLContext jsql = new JavaSQLContext(jsc); +SQLContext jsql = new SQLContext(jsc); // Prepare training documents, which are labeled. List<LabeledDocument> localTraining = Lists.newArrayList( @@ -442,8 +446,7 @@ List<LabeledDocument> localTraining = Lists.newArrayList( new LabeledDocument(1L, "b d", 0.0), new LabeledDocument(2L, "spark f g h", 1.0), new LabeledDocument(3L, "hadoop mapreduce", 0.0)); -JavaSchemaRDD training = - jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class); +DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class); // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. Tokenizer tokenizer = new Tokenizer() @@ -468,16 +471,62 @@ List<Document> localTest = Lists.newArrayList( new Document(5L, "l m n"), new Document(6L, "mapreduce spark"), new Document(7L, "apache hadoop")); -JavaSchemaRDD test = - jsql.createDataFrame(jsc.parallelize(localTest), Document.class); +DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class); // Make predictions on test documents. -model.transform(test).registerAsTable("prediction"); -JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction"); -for (Row r: predictions.collect()) { - System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2) +DataFrame predictions = model.transform(test); +for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) { + System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2) + ", prediction=" + r.get(3)); } + +jsc.stop(); +{% endhighlight %} +</div> + +<div data-lang="python"> +{% highlight python %} +from pyspark import SparkContext +from pyspark.ml import Pipeline +from pyspark.ml.classification import LogisticRegression +from pyspark.ml.feature import HashingTF, Tokenizer +from pyspark.sql import Row, SQLContext + +sc = SparkContext(appName="SimpleTextClassificationPipeline") +sqlCtx = SQLContext(sc) + +# Prepare training documents, which are labeled. +LabeledDocument = Row("id", "text", "label") +training = sc.parallelize([(0L, "a b c d e spark", 1.0), + (1L, "b d", 0.0), + (2L, "spark f g h", 1.0), + (3L, "hadoop mapreduce", 0.0)]) \ + .map(lambda x: LabeledDocument(*x)).toDF() + +# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. +tokenizer = Tokenizer(inputCol="text", outputCol="words") +hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") +lr = LogisticRegression(maxIter=10, regParam=0.01) +pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) + +# Fit the pipeline to training documents. +model = pipeline.fit(training) + +# Prepare test documents, which are unlabeled. +Document = Row("id", "text") +test = sc.parallelize([(4L, "spark i j k"), + (5L, "l m n"), + (6L, "mapreduce spark"), + (7L, "apache hadoop")]) \ + .map(lambda x: Document(*x)).toDF() + +# Make predictions on test documents and print columns of interest. +prediction = model.transform(test) +selected = prediction.select("id", "text", "prediction") +for row in selected.collect(): + print row + +sc.stop() {% endhighlight %} </div> @@ -508,21 +557,21 @@ However, it is also a well-established method for choosing parameters which is m <div data-lang="scala"> {% highlight scala %} import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.SparkContext._ import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator} +import org.apache.spark.mllib.linalg.Vector import org.apache.spark.sql.{Row, SQLContext} val conf = new SparkConf().setAppName("CrossValidatorExample") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) -import sqlContext._ +import sqlContext.implicits._ // Prepare training documents, which are labeled. -val training = sparkContext.parallelize(Seq( +val training = sc.parallelize(Seq( LabeledDocument(0L, "a b c d e spark", 1.0), LabeledDocument(1L, "b d", 0.0), LabeledDocument(2L, "spark f g h", 1.0), @@ -565,24 +614,24 @@ crossval.setEstimatorParamMaps(paramGrid) crossval.setNumFolds(2) // Use 3+ in practice // Run cross-validation, and choose the best set of parameters. -val cvModel = crossval.fit(training) -// Get the best LogisticRegression model (with the best set of parameters from paramGrid). -val lrModel = cvModel.bestModel +val cvModel = crossval.fit(training.toDF) // Prepare test documents, which are unlabeled. -val test = sparkContext.parallelize(Seq( +val test = sc.parallelize(Seq( Document(4L, "spark i j k"), Document(5L, "l m n"), Document(6L, "mapreduce spark"), Document(7L, "apache hadoop"))) // Make predictions on test documents. cvModel uses the best model found (lrModel). -cvModel.transform(test) - .select('id, 'text, 'score, 'prediction) +cvModel.transform(test.toDF) + .select("id", "text", "probability", "prediction") .collect() - .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) => - println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction) + .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => + println(s"($id, $text) --> prob=$prob, prediction=$prediction") } + +sc.stop() {% endhighlight %} </div> @@ -592,7 +641,6 @@ import java.util.List; import com.google.common.collect.Lists; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.ml.Model; import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.PipelineStage; import org.apache.spark.ml.classification.LogisticRegression; @@ -603,13 +651,13 @@ import org.apache.spark.ml.param.ParamMap; import org.apache.spark.ml.tuning.CrossValidator; import org.apache.spark.ml.tuning.CrossValidatorModel; import org.apache.spark.ml.tuning.ParamGridBuilder; -import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.sql.api.java.JavaSchemaRDD; -import org.apache.spark.sql.api.java.Row; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; SparkConf conf = new SparkConf().setAppName("JavaCrossValidatorExample"); JavaSparkContext jsc = new JavaSparkContext(conf); -JavaSQLContext jsql = new JavaSQLContext(jsc); +SQLContext jsql = new SQLContext(jsc); // Prepare training documents, which are labeled. List<LabeledDocument> localTraining = Lists.newArrayList( @@ -625,8 +673,7 @@ List<LabeledDocument> localTraining = Lists.newArrayList( new LabeledDocument(9L, "a e c l", 0.0), new LabeledDocument(10L, "spark compile", 1.0), new LabeledDocument(11L, "hadoop software", 0.0)); -JavaSchemaRDD training = - jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class); +DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class); // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. Tokenizer tokenizer = new Tokenizer() @@ -660,8 +707,6 @@ crossval.setNumFolds(2); // Use 3+ in practice // Run cross-validation, and choose the best set of parameters. CrossValidatorModel cvModel = crossval.fit(training); -// Get the best LogisticRegression model (with the best set of parameters from paramGrid). -Model lrModel = cvModel.bestModel(); // Prepare test documents, which are unlabeled. List<Document> localTest = Lists.newArrayList( @@ -669,15 +714,16 @@ List<Document> localTest = Lists.newArrayList( new Document(5L, "l m n"), new Document(6L, "mapreduce spark"), new Document(7L, "apache hadoop")); -JavaSchemaRDD test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class); +DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class); // Make predictions on test documents. cvModel uses the best model found (lrModel). -cvModel.transform(test).registerAsTable("prediction"); -JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction"); -for (Row r: predictions.collect()) { - System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2) +DataFrame predictions = cvModel.transform(test); +for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) { + System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2) + ", prediction=" + r.get(3)); } + +jsc.stop(); {% endhighlight %} </div> @@ -686,6 +732,21 @@ for (Row r: predictions.collect()) { # Dependencies Spark ML currently depends on MLlib and has the same dependencies. -Please see the [MLlib Dependencies guide](mllib-guide.html#Dependencies) for more info. +Please see the [MLlib Dependencies guide](mllib-guide.html#dependencies) for more info. Spark ML also depends upon Spark SQL, but the relevant parts of Spark SQL do not bring additional dependencies. + +# Migration Guide + +## From 1.2 to 1.3 + +The main API changes are from Spark SQL. We list the most important changes here: + +* The old [SchemaRDD](http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.sql.SchemaRDD) has been replaced with [DataFrame](api/scala/index.html#org.apache.spark.sql.DataFrame) with a somewhat modified API. All algorithms in Spark ML which used to use SchemaRDD now use DataFrame. +* In Spark 1.2, we used implicit conversions from `RDD`s of `LabeledPoint` into `SchemaRDD`s by calling `import sqlContext._` where `sqlContext` was an instance of `SQLContext`. These implicits have been moved, so we now call `import sqlContext.implicits._`. +* Java APIs for SQL have also changed accordingly. Please see the examples above and the [Spark SQL Programming Guide](sql-programming-guide.html) for details. + +Other changes were in `LogisticRegression`: + +* The `scoreCol` output column (with default value "score") was renamed to be `probabilityCol` (with default value "probability"). The type was originally `Double` (for the probability of class 1.0), but it is now `Vector` (for the probability of each class, to support multiclass classification in the future). +* In Spark 1.2, `LogisticRegressionModel` did not include an intercept. In Spark 1.3, it includes an intercept; however, it will always be 0.0 since it uses the default settings for [spark.mllib.LogisticRegressionWithLBFGS](api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS). The option to use an intercept will be added in the future. |