aboutsummaryrefslogtreecommitdiff
path: root/docs/ml-guide.md
diff options
context:
space:
mode:
authorJoseph K. Bradley <joseph@databricks.com>2015-02-20 02:31:32 -0800
committerXiangrui Meng <meng@databricks.com>2015-02-20 02:31:32 -0800
commit4a17eedb16343413e5b6f8bb58c6da8952ee7ab6 (patch)
treeac17fd9eed4f42ba3095b148c68d4e78b6afc875 /docs/ml-guide.md
parentd3dfebebce9f76e4433e16d4d6d29fb8fa4d4193 (diff)
downloadspark-4a17eedb16343413e5b6f8bb58c6da8952ee7ab6.tar.gz
spark-4a17eedb16343413e5b6f8bb58c6da8952ee7ab6.tar.bz2
spark-4a17eedb16343413e5b6f8bb58c6da8952ee7ab6.zip
[SPARK-5867] [SPARK-5892] [doc] [ml] [mllib] Doc cleanups for 1.3 release
For SPARK-5867: * The spark.ml programming guide needs to be updated to use the new SQL DataFrame API instead of the old SchemaRDD API. * It should also include Python examples now. For SPARK-5892: * Fix Python docs * Various other cleanups BTW, I accidentally merged this with master. If you want to compile it on your own, use this branch which is based on spark/branch-1.3 and cherry-picks the commits from this PR: [https://github.com/jkbradley/spark/tree/doc-review-1.3-check] CC: mengxr (ML), davies (Python docs) Author: Joseph K. Bradley <joseph@databricks.com> Closes #4675 from jkbradley/doc-review-1.3 and squashes the following commits: f191bb0 [Joseph K. Bradley] small cleanups e786efa [Joseph K. Bradley] small doc corrections 6b1ab4a [Joseph K. Bradley] fixed python lint test 946affa [Joseph K. Bradley] Added sample data for ml.MovieLensALS example. Changed spark.ml Java examples to use DataFrames API instead of sql() da81558 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into doc-review-1.3 629dbf5 [Joseph K. Bradley] Updated based on code review: * made new page for old migration guides * small fixes * moved inherit_doc in python b9df7c4 [Joseph K. Bradley] Small cleanups: toDF to toDF(), adding s for string interpolation 34b067f [Joseph K. Bradley] small doc correction da16aef [Joseph K. Bradley] Fixed python mllib docs 8cce91c [Joseph K. Bradley] GMM: removed old imports, added some doc 695f3f6 [Joseph K. Bradley] partly done trying to fix inherit_doc for class hierarchies in python docs a72c018 [Joseph K. Bradley] made ChiSqTestResult appear in python docs b05a80d [Joseph K. Bradley] organize imports. doc cleanups e572827 [Joseph K. Bradley] updated programming guide for ml and mllib
Diffstat (limited to 'docs/ml-guide.md')
-rw-r--r--docs/ml-guide.md249
1 files changed, 155 insertions, 94 deletions
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index 4bf14fba34..da6aef7f14 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -23,13 +23,13 @@ to `spark.ml`.
Spark ML standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. This section covers the key concepts introduced by the Spark ML API.
-* **[ML Dataset](ml-guide.html#ml-dataset)**: Spark ML uses the [`SchemaRDD`](api/scala/index.html#org.apache.spark.sql.SchemaRDD) from Spark SQL as a dataset which can hold a variety of data types.
+* **[ML Dataset](ml-guide.html#ml-dataset)**: Spark ML uses the [`DataFrame`](api/scala/index.html#org.apache.spark.sql.DataFrame) from Spark SQL as a dataset which can hold a variety of data types.
E.g., a dataset could have different columns storing text, feature vectors, true labels, and predictions.
-* **[`Transformer`](ml-guide.html#transformers)**: A `Transformer` is an algorithm which can transform one `SchemaRDD` into another `SchemaRDD`.
+* **[`Transformer`](ml-guide.html#transformers)**: A `Transformer` is an algorithm which can transform one `DataFrame` into another `DataFrame`.
E.g., an ML model is a `Transformer` which transforms an RDD with features into an RDD with predictions.
-* **[`Estimator`](ml-guide.html#estimators)**: An `Estimator` is an algorithm which can be fit on a `SchemaRDD` to produce a `Transformer`.
+* **[`Estimator`](ml-guide.html#estimators)**: An `Estimator` is an algorithm which can be fit on a `DataFrame` to produce a `Transformer`.
E.g., a learning algorithm is an `Estimator` which trains on a dataset and produces a model.
* **[`Pipeline`](ml-guide.html#pipeline)**: A `Pipeline` chains multiple `Transformer`s and `Estimator`s together to specify an ML workflow.
@@ -39,20 +39,20 @@ E.g., a learning algorithm is an `Estimator` which trains on a dataset and produ
## ML Dataset
Machine learning can be applied to a wide variety of data types, such as vectors, text, images, and structured data.
-Spark ML adopts the [`SchemaRDD`](api/scala/index.html#org.apache.spark.sql.SchemaRDD) from Spark SQL in order to support a variety of data types under a unified Dataset concept.
+Spark ML adopts the [`DataFrame`](api/scala/index.html#org.apache.spark.sql.DataFrame) from Spark SQL in order to support a variety of data types under a unified Dataset concept.
-`SchemaRDD` supports many basic and structured types; see the [Spark SQL datatype reference](sql-programming-guide.html#spark-sql-datatype-reference) for a list of supported types.
-In addition to the types listed in the Spark SQL guide, `SchemaRDD` can use ML [`Vector`](api/scala/index.html#org.apache.spark.mllib.linalg.Vector) types.
+`DataFrame` supports many basic and structured types; see the [Spark SQL datatype reference](sql-programming-guide.html#spark-sql-datatype-reference) for a list of supported types.
+In addition to the types listed in the Spark SQL guide, `DataFrame` can use ML [`Vector`](api/scala/index.html#org.apache.spark.mllib.linalg.Vector) types.
-A `SchemaRDD` can be created either implicitly or explicitly from a regular `RDD`. See the code examples below and the [Spark SQL programming guide](sql-programming-guide.html) for examples.
+A `DataFrame` can be created either implicitly or explicitly from a regular `RDD`. See the code examples below and the [Spark SQL programming guide](sql-programming-guide.html) for examples.
-Columns in a `SchemaRDD` are named. The code examples below use names such as "text," "features," and "label."
+Columns in a `DataFrame` are named. The code examples below use names such as "text," "features," and "label."
## ML Algorithms
### Transformers
-A [`Transformer`](api/scala/index.html#org.apache.spark.ml.Transformer) is an abstraction which includes feature transformers and learned models. Technically, a `Transformer` implements a method `transform()` which converts one `SchemaRDD` into another, generally by appending one or more columns.
+A [`Transformer`](api/scala/index.html#org.apache.spark.ml.Transformer) is an abstraction which includes feature transformers and learned models. Technically, a `Transformer` implements a method `transform()` which converts one `DataFrame` into another, generally by appending one or more columns.
For example:
* A feature transformer might take a dataset, read a column (e.g., text), convert it into a new column (e.g., feature vectors), append the new column to the dataset, and output the updated dataset.
@@ -60,7 +60,7 @@ For example:
### Estimators
-An [`Estimator`](api/scala/index.html#org.apache.spark.ml.Estimator) abstracts the concept of a learning algorithm or any algorithm which fits or trains on data. Technically, an `Estimator` implements a method `fit()` which accepts a `SchemaRDD` and produces a `Transformer`.
+An [`Estimator`](api/scala/index.html#org.apache.spark.ml.Estimator) abstracts the concept of a learning algorithm or any algorithm which fits or trains on data. Technically, an `Estimator` implements a method `fit()` which accepts a `DataFrame` and produces a `Transformer`.
For example, a learning algorithm such as `LogisticRegression` is an `Estimator`, and calling `fit()` trains a `LogisticRegressionModel`, which is a `Transformer`.
### Properties of ML Algorithms
@@ -101,7 +101,7 @@ We illustrate this for the simple text document workflow. The figure below is f
Above, the top row represents a `Pipeline` with three stages.
The first two (`Tokenizer` and `HashingTF`) are `Transformer`s (blue), and the third (`LogisticRegression`) is an `Estimator` (red).
-The bottom row represents data flowing through the pipeline, where cylinders indicate `SchemaRDD`s.
+The bottom row represents data flowing through the pipeline, where cylinders indicate `DataFrame`s.
The `Pipeline.fit()` method is called on the original dataset which has raw text documents and labels.
The `Tokenizer.transform()` method splits the raw text documents into words, adding a new column with words into the dataset.
The `HashingTF.transform()` method converts the words column into feature vectors, adding a new column with those vectors to the dataset.
@@ -130,7 +130,7 @@ Each stage's `transform()` method updates the dataset and passes it to the next
*DAG `Pipeline`s*: A `Pipeline`'s stages are specified as an ordered array. The examples given here are all for linear `Pipeline`s, i.e., `Pipeline`s in which each stage uses data produced by the previous stage. It is possible to create non-linear `Pipeline`s as long as the data flow graph forms a Directed Acyclic Graph (DAG). This graph is currently specified implicitly based on the input and output column names of each stage (generally specified as parameters). If the `Pipeline` forms a DAG, then the stages must be specified in topological order.
-*Runtime checking*: Since `Pipeline`s can operate on datasets with varied types, they cannot use compile-time type checking. `Pipeline`s and `PipelineModel`s instead do runtime checking before actually running the `Pipeline`. This type checking is done using the dataset *schema*, a description of the data types of columns in the `SchemaRDD`.
+*Runtime checking*: Since `Pipeline`s can operate on datasets with varied types, they cannot use compile-time type checking. `Pipeline`s and `PipelineModel`s instead do runtime checking before actually running the `Pipeline`. This type checking is done using the dataset *schema*, a description of the data types of columns in the `DataFrame`.
## Parameters
@@ -171,12 +171,12 @@ import org.apache.spark.sql.{Row, SQLContext}
val conf = new SparkConf().setAppName("SimpleParamsExample")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
-import sqlContext._
+import sqlContext.implicits._
// Prepare training data.
// We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of case classes
-// into SchemaRDDs, where it uses the case class metadata to infer the schema.
-val training = sparkContext.parallelize(Seq(
+// into DataFrames, where it uses the case class metadata to infer the schema.
+val training = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
@@ -192,7 +192,7 @@ lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. This uses the parameters stored in lr.
-val model1 = lr.fit(training)
+val model1 = lr.fit(training.toDF)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
@@ -203,33 +203,35 @@ println("Model 1 was fit using parameters: " + model1.fittingParamMap)
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
paramMap.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
-paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.5) // Specify multiple Params.
+paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
// One can also combine ParamMaps.
-val paramMap2 = ParamMap(lr.scoreCol -> "probability") // Changes output column name.
+val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name
val paramMapCombined = paramMap ++ paramMap2
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
-val model2 = lr.fit(training, paramMapCombined)
+val model2 = lr.fit(training.toDF, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.fittingParamMap)
-// Prepare test documents.
-val test = sparkContext.parallelize(Seq(
+// Prepare test data.
+val test = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))
-// Make predictions on test documents using the Transformer.transform() method.
+// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
-// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
-// column since we renamed the lr.scoreCol parameter previously.
-model2.transform(test)
- .select('features, 'label, 'probability, 'prediction)
+// Note that model2.transform() outputs a 'myProbability' column instead of the usual
+// 'probability' column since we renamed the lr.probabilityCol parameter previously.
+model2.transform(test.toDF)
+ .select("features", "label", "myProbability", "prediction")
.collect()
- .foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) =>
- println("(" + features + ", " + label + ") -> prob=" + prob + ", prediction=" + prediction)
+ .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
+ println("($features, $label) -> prob=$prob, prediction=$prediction")
}
+
+sc.stop()
{% endhighlight %}
</div>
@@ -244,23 +246,23 @@ import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Row;
SparkConf conf = new SparkConf().setAppName("JavaSimpleParamsExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
-JavaSQLContext jsql = new JavaSQLContext(jsc);
+SQLContext jsql = new SQLContext(jsc);
// Prepare training data.
-// We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of case classes
-// into SchemaRDDs, where it uses the case class metadata to infer the schema.
+// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
+// into DataFrames, where it uses the bean metadata to infer the schema.
List<LabeledPoint> localTraining = Lists.newArrayList(
new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-JavaSchemaRDD training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class);
+DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class);
// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
@@ -281,13 +283,13 @@ System.out.println("Model 1 was fit using parameters: " + model1.fittingParamMap
// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap();
-paramMap.put(lr.maxIter(), 20); // Specify 1 Param.
+paramMap.put(lr.maxIter().w(20)); // Specify 1 Param.
paramMap.put(lr.maxIter(), 30); // This overwrites the original maxIter.
-paramMap.put(lr.regParam(), 0.1);
+paramMap.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap();
-paramMap2.put(lr.scoreCol(), "probability"); // Changes output column name.
+paramMap2.put(lr.probabilityCol().w("myProbability")); // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
// Now learn a new model using the paramMapCombined parameters.
@@ -300,19 +302,19 @@ List<LabeledPoint> localTest = Lists.newArrayList(
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-JavaSchemaRDD test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class);
+DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class);
// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
-// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
-// column since we renamed the lr.scoreCol parameter previously.
-model2.transform(test).registerAsTable("results");
-JavaSchemaRDD results =
- jsql.sql("SELECT features, label, probability, prediction FROM results");
-for (Row r: results.collect()) {
+// Note that model2.transform() outputs a 'myProbability' column instead of the usual
+// 'probability' column since we renamed the lr.probabilityCol parameter previously.
+DataFrame results = model2.transform(test);
+for (Row r: results.select("features", "label", "myProbability", "prediction").collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
+
+jsc.stop();
{% endhighlight %}
</div>
@@ -330,6 +332,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.{Row, SQLContext}
// Labeled and unlabeled instance types.
@@ -337,14 +340,14 @@ import org.apache.spark.sql.{Row, SQLContext}
case class LabeledDocument(id: Long, text: String, label: Double)
case class Document(id: Long, text: String)
-// Set up contexts. Import implicit conversions to SchemaRDD from sqlContext.
+// Set up contexts. Import implicit conversions to DataFrame from sqlContext.
val conf = new SparkConf().setAppName("SimpleTextClassificationPipeline")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
-import sqlContext._
+import sqlContext.implicits._
// Prepare training documents, which are labeled.
-val training = sparkContext.parallelize(Seq(
+val training = sc.parallelize(Seq(
LabeledDocument(0L, "a b c d e spark", 1.0),
LabeledDocument(1L, "b d", 0.0),
LabeledDocument(2L, "spark f g h", 1.0),
@@ -365,30 +368,32 @@ val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
-val model = pipeline.fit(training)
+val model = pipeline.fit(training.toDF)
// Prepare test documents, which are unlabeled.
-val test = sparkContext.parallelize(Seq(
+val test = sc.parallelize(Seq(
Document(4L, "spark i j k"),
Document(5L, "l m n"),
Document(6L, "mapreduce spark"),
Document(7L, "apache hadoop")))
// Make predictions on test documents.
-model.transform(test)
- .select('id, 'text, 'score, 'prediction)
+model.transform(test.toDF)
+ .select("id", "text", "probability", "prediction")
.collect()
- .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
- println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)
+ .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
+ println("($id, $text) --> prob=$prob, prediction=$prediction")
}
+
+sc.stop()
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
-import java.io.Serializable;
import java.util.List;
import com.google.common.collect.Lists;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
@@ -396,10 +401,9 @@ import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
-import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
// Labeled and unlabeled instance types.
// Spark SQL can infer schema from Java Beans.
@@ -434,7 +438,7 @@ public class LabeledDocument extends Document implements Serializable {
// Set up contexts.
SparkConf conf = new SparkConf().setAppName("JavaSimpleTextClassificationPipeline");
JavaSparkContext jsc = new JavaSparkContext(conf);
-JavaSQLContext jsql = new JavaSQLContext(jsc);
+SQLContext jsql = new SQLContext(jsc);
// Prepare training documents, which are labeled.
List<LabeledDocument> localTraining = Lists.newArrayList(
@@ -442,8 +446,7 @@ List<LabeledDocument> localTraining = Lists.newArrayList(
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0));
-JavaSchemaRDD training =
- jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);
+DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -468,16 +471,62 @@ List<Document> localTest = Lists.newArrayList(
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
-JavaSchemaRDD test =
- jsql.createDataFrame(jsc.parallelize(localTest), Document.class);
+DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);
// Make predictions on test documents.
-model.transform(test).registerAsTable("prediction");
-JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
-for (Row r: predictions.collect()) {
- System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+DataFrame predictions = model.transform(test);
+for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) {
+ System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
+
+jsc.stop();
+{% endhighlight %}
+</div>
+
+<div data-lang="python">
+{% highlight python %}
+from pyspark import SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.classification import LogisticRegression
+from pyspark.ml.feature import HashingTF, Tokenizer
+from pyspark.sql import Row, SQLContext
+
+sc = SparkContext(appName="SimpleTextClassificationPipeline")
+sqlCtx = SQLContext(sc)
+
+# Prepare training documents, which are labeled.
+LabeledDocument = Row("id", "text", "label")
+training = sc.parallelize([(0L, "a b c d e spark", 1.0),
+ (1L, "b d", 0.0),
+ (2L, "spark f g h", 1.0),
+ (3L, "hadoop mapreduce", 0.0)]) \
+ .map(lambda x: LabeledDocument(*x)).toDF()
+
+# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
+tokenizer = Tokenizer(inputCol="text", outputCol="words")
+hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
+lr = LogisticRegression(maxIter=10, regParam=0.01)
+pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
+
+# Fit the pipeline to training documents.
+model = pipeline.fit(training)
+
+# Prepare test documents, which are unlabeled.
+Document = Row("id", "text")
+test = sc.parallelize([(4L, "spark i j k"),
+ (5L, "l m n"),
+ (6L, "mapreduce spark"),
+ (7L, "apache hadoop")]) \
+ .map(lambda x: Document(*x)).toDF()
+
+# Make predictions on test documents and print columns of interest.
+prediction = model.transform(test)
+selected = prediction.select("id", "text", "prediction")
+for row in selected.collect():
+ print row
+
+sc.stop()
{% endhighlight %}
</div>
@@ -508,21 +557,21 @@ However, it is also a well-established method for choosing parameters which is m
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.{Row, SQLContext}
val conf = new SparkConf().setAppName("CrossValidatorExample")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
-import sqlContext._
+import sqlContext.implicits._
// Prepare training documents, which are labeled.
-val training = sparkContext.parallelize(Seq(
+val training = sc.parallelize(Seq(
LabeledDocument(0L, "a b c d e spark", 1.0),
LabeledDocument(1L, "b d", 0.0),
LabeledDocument(2L, "spark f g h", 1.0),
@@ -565,24 +614,24 @@ crossval.setEstimatorParamMaps(paramGrid)
crossval.setNumFolds(2) // Use 3+ in practice
// Run cross-validation, and choose the best set of parameters.
-val cvModel = crossval.fit(training)
-// Get the best LogisticRegression model (with the best set of parameters from paramGrid).
-val lrModel = cvModel.bestModel
+val cvModel = crossval.fit(training.toDF)
// Prepare test documents, which are unlabeled.
-val test = sparkContext.parallelize(Seq(
+val test = sc.parallelize(Seq(
Document(4L, "spark i j k"),
Document(5L, "l m n"),
Document(6L, "mapreduce spark"),
Document(7L, "apache hadoop")))
// Make predictions on test documents. cvModel uses the best model found (lrModel).
-cvModel.transform(test)
- .select('id, 'text, 'score, 'prediction)
+cvModel.transform(test.toDF)
+ .select("id", "text", "probability", "prediction")
.collect()
- .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
- println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)
+ .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
+ println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
+
+sc.stop()
{% endhighlight %}
</div>
@@ -592,7 +641,6 @@ import java.util.List;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ml.Model;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
@@ -603,13 +651,13 @@ import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
SparkConf conf = new SparkConf().setAppName("JavaCrossValidatorExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
-JavaSQLContext jsql = new JavaSQLContext(jsc);
+SQLContext jsql = new SQLContext(jsc);
// Prepare training documents, which are labeled.
List<LabeledDocument> localTraining = Lists.newArrayList(
@@ -625,8 +673,7 @@ List<LabeledDocument> localTraining = Lists.newArrayList(
new LabeledDocument(9L, "a e c l", 0.0),
new LabeledDocument(10L, "spark compile", 1.0),
new LabeledDocument(11L, "hadoop software", 0.0));
-JavaSchemaRDD training =
- jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);
+DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -660,8 +707,6 @@ crossval.setNumFolds(2); // Use 3+ in practice
// Run cross-validation, and choose the best set of parameters.
CrossValidatorModel cvModel = crossval.fit(training);
-// Get the best LogisticRegression model (with the best set of parameters from paramGrid).
-Model lrModel = cvModel.bestModel();
// Prepare test documents, which are unlabeled.
List<Document> localTest = Lists.newArrayList(
@@ -669,15 +714,16 @@ List<Document> localTest = Lists.newArrayList(
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
-JavaSchemaRDD test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);
+DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);
// Make predictions on test documents. cvModel uses the best model found (lrModel).
-cvModel.transform(test).registerAsTable("prediction");
-JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
-for (Row r: predictions.collect()) {
- System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+DataFrame predictions = cvModel.transform(test);
+for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) {
+ System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
+
+jsc.stop();
{% endhighlight %}
</div>
@@ -686,6 +732,21 @@ for (Row r: predictions.collect()) {
# Dependencies
Spark ML currently depends on MLlib and has the same dependencies.
-Please see the [MLlib Dependencies guide](mllib-guide.html#Dependencies) for more info.
+Please see the [MLlib Dependencies guide](mllib-guide.html#dependencies) for more info.
Spark ML also depends upon Spark SQL, but the relevant parts of Spark SQL do not bring additional dependencies.
+
+# Migration Guide
+
+## From 1.2 to 1.3
+
+The main API changes are from Spark SQL. We list the most important changes here:
+
+* The old [SchemaRDD](http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.sql.SchemaRDD) has been replaced with [DataFrame](api/scala/index.html#org.apache.spark.sql.DataFrame) with a somewhat modified API. All algorithms in Spark ML which used to use SchemaRDD now use DataFrame.
+* In Spark 1.2, we used implicit conversions from `RDD`s of `LabeledPoint` into `SchemaRDD`s by calling `import sqlContext._` where `sqlContext` was an instance of `SQLContext`. These implicits have been moved, so we now call `import sqlContext.implicits._`.
+* Java APIs for SQL have also changed accordingly. Please see the examples above and the [Spark SQL Programming Guide](sql-programming-guide.html) for details.
+
+Other changes were in `LogisticRegression`:
+
+* The `scoreCol` output column (with default value "score") was renamed to be `probabilityCol` (with default value "probability"). The type was originally `Double` (for the probability of class 1.0), but it is now `Vector` (for the probability of each class, to support multiclass classification in the future).
+* In Spark 1.2, `LogisticRegressionModel` did not include an intercept. In Spark 1.3, it includes an intercept; however, it will always be 0.0 since it uses the default settings for [spark.mllib.LogisticRegressionWithLBFGS](api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS). The option to use an intercept will be added in the future.