author     Xiangrui Meng <meng@databricks.com>    2015-08-29 23:57:09 -0700
committer  Xiangrui Meng <meng@databricks.com>    2015-08-29 23:57:09 -0700
commit     ca69fc8efda8a3e5442ffa16692a2b1eb86b7673 (patch)
tree       7afaae97cc20ba1fcd660bf0bffd4f4a10db7662 /docs
parent     905fbe498bdd29116468628e6a2a553c1fd57165 (diff)
download   spark-ca69fc8efda8a3e5442ffa16692a2b1eb86b7673.tar.gz
           spark-ca69fc8efda8a3e5442ffa16692a2b1eb86b7673.tar.bz2
           spark-ca69fc8efda8a3e5442ffa16692a2b1eb86b7673.zip
[SPARK-10331] [MLLIB] Update example code in ml-guide
* The example code was added in 1.2, before `createDataFrame`. This PR switches to `createDataFrame`. Java code still uses JavaBean.
* assume `sqlContext` is available
* fix some minor issues from previous code review jkbradley srowen feynmanliang

Author: Xiangrui Meng <meng@databricks.com>

Closes #8518 from mengxr/SPARK-10331.
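For context, here is a minimal sketch (not part of the commit) of the before/after DataFrame construction pattern this change adopts, assuming a Spark 1.5-era shell where `sc` and `sqlContext` are already in scope; the value names `oldStyle` and `newStyle` are illustrative only:

{% highlight scala %}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import sqlContext.implicits._

// Before: build an RDD of the LabeledPoint case class, then convert it to a DataFrame.
val oldStyle = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0))
)).toDF()

// After: create the DataFrame directly from a local Seq of (label, features) tuples.
val newStyle = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0))
)).toDF("label", "features")
{% endhighlight %}

Either way, the result is a DataFrame with `label` and `features` columns that `LogisticRegression.fit` can consume directly.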
Diffstat (limited to 'docs')
-rw-r--r--  docs/ml-guide.md | 362
1 file changed, 147 insertions, 215 deletions
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index 4ba07542bf..78c93a95c7 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -212,26 +212,18 @@ This example covers the concepts of `Estimator`, `Transformer`, and `Param`.
<div data-lang="scala">
{% highlight scala %}
-import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.Row
-val conf = new SparkConf().setAppName("SimpleParamsExample")
-val sc = new SparkContext(conf)
-val sqlContext = new SQLContext(sc)
-import sqlContext.implicits._
-
-// Prepare training data.
-// We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of case classes
-// into DataFrames, where it uses the case class metadata to infer the schema.
-val training = sc.parallelize(Seq(
- LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
- LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
- LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
- LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
+// Prepare training data from a list of (label, features) tuples.
+val training = sqlContext.createDataFrame(Seq(
+ (1.0, Vectors.dense(0.0, 1.1, 0.1)),
+ (0.0, Vectors.dense(2.0, 1.0, -1.0)),
+ (0.0, Vectors.dense(2.0, 1.3, 1.0)),
+ (1.0, Vectors.dense(0.0, 1.2, -0.5))
+)).toDF("label", "features")
// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
@@ -243,7 +235,7 @@ lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. This uses the parameters stored in lr.
-val model1 = lr.fit(training.toDF)
+val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
@@ -253,8 +245,8 @@ println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
-paramMap.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
-paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
+ .put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
+ .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name
@@ -262,27 +254,27 @@ val paramMapCombined = paramMap ++ paramMap2
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
-val model2 = lr.fit(training.toDF, paramMapCombined)
+val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)
// Prepare test data.
-val test = sc.parallelize(Seq(
- LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
- LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
- LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))
+val test = sqlContext.createDataFrame(Seq(
+ (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
+ (0.0, Vectors.dense(3.0, 2.0, -0.1)),
+ (1.0, Vectors.dense(0.0, 2.2, -1.5))
+)).toDF("label", "features")
// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
-model2.transform(test.toDF)
+model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
-sc.stop()
{% endhighlight %}
</div>
@@ -291,30 +283,23 @@ sc.stop()
import java.util.Arrays;
import java.util.List;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;
-SparkConf conf = new SparkConf().setAppName("JavaSimpleParamsExample");
-JavaSparkContext jsc = new JavaSparkContext(conf);
-SQLContext jsql = new SQLContext(jsc);
-
// Prepare training data.
// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
// into DataFrames, where it uses the bean metadata to infer the schema.
-List<LabeledPoint> localTraining = Arrays.asList(
+DataFrame training = sqlContext.createDataFrame(Arrays.asList(
new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
- new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class);
+ new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
+), LabeledPoint.class);
// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
@@ -334,14 +319,14 @@ LogisticRegressionModel model1 = lr.fit(training);
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
// We may alternatively specify parameters using a ParamMap.
-ParamMap paramMap = new ParamMap();
-paramMap.put(lr.maxIter().w(20)); // Specify 1 Param.
-paramMap.put(lr.maxIter(), 30); // This overwrites the original maxIter.
-paramMap.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
+ParamMap paramMap = new ParamMap()
+ .put(lr.maxIter().w(20)) // Specify 1 Param.
+ .put(lr.maxIter(), 30) // This overwrites the original maxIter.
+ .put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
// One can also combine ParamMaps.
-ParamMap paramMap2 = new ParamMap();
-paramMap2.put(lr.probabilityCol().w("myProbability")); // Change output column name
+ParamMap paramMap2 = new ParamMap()
+ .put(lr.probabilityCol().w("myProbability")); // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
// Now learn a new model using the paramMapCombined parameters.
@@ -350,11 +335,11 @@ LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
// Prepare test documents.
-List<LabeledPoint> localTest = Arrays.asList(
- new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
- new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
- new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class);
+DataFrame test = sqlContext.createDataFrame(Arrays.asList(
+ new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
+ new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
+ new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))
+), LabeledPoint.class);
// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
@@ -366,28 +351,21 @@ for (Row r: results.select("features", "label", "myProbability", "prediction").c
+ ", prediction=" + r.get(3));
}
-jsc.stop();
{% endhighlight %}
</div>
<div data-lang="python">
{% highlight python %}
-from pyspark import SparkContext
-from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params
-from pyspark.sql import Row, SQLContext
-sc = SparkContext(appName="SimpleParamsExample")
-sqlContext = SQLContext(sc)
-
-# Prepare training data.
-# We use LabeledPoint.
-# Spark SQL can convert RDDs of LabeledPoints into DataFrames.
-training = sc.parallelize([LabeledPoint(1.0, [0.0, 1.1, 0.1]),
- LabeledPoint(0.0, [2.0, 1.0, -1.0]),
- LabeledPoint(0.0, [2.0, 1.3, 1.0]),
- LabeledPoint(1.0, [0.0, 1.2, -0.5])])
+# Prepare training data from a list of (label, features) tuples.
+training = sqlContext.createDataFrame([
+ (1.0, Vectors.dense([0.0, 1.1, 0.1])),
+ (0.0, Vectors.dense([2.0, 1.0, -1.0])),
+ (0.0, Vectors.dense([2.0, 1.3, 1.0])),
+ (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
@@ -395,7 +373,7 @@ lr = LogisticRegression(maxIter=10, regParam=0.01)
print "LogisticRegression parameters:\n" + lr.explainParams() + "\n"
# Learn a LogisticRegression model. This uses the parameters stored in lr.
-model1 = lr.fit(training.toDF())
+model1 = lr.fit(training)
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
@@ -416,25 +394,25 @@ paramMapCombined.update(paramMap2)
# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
-model2 = lr.fit(training.toDF(), paramMapCombined)
+model2 = lr.fit(training, paramMapCombined)
print "Model 2 was fit using parameters: "
print model2.extractParamMap()
# Prepare test data
-test = sc.parallelize([LabeledPoint(1.0, [-1.0, 1.5, 1.3]),
- LabeledPoint(0.0, [ 3.0, 2.0, -0.1]),
- LabeledPoint(1.0, [ 0.0, 2.2, -1.5])])
+test = sqlContext.createDataFrame([
+ (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
+ (0.0, Vectors.dense([3.0, 2.0, -0.1])),
+ (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
-prediction = model2.transform(test.toDF())
+prediction = model2.transform(test)
selected = prediction.select("features", "label", "myProbability", "prediction")
for row in selected.collect():
print row
-sc.stop()
{% endhighlight %}
</div>
@@ -448,30 +426,19 @@ This example follows the simple text document `Pipeline` illustrated in the figu
<div data-lang="scala">
{% highlight scala %}
-import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.sql.{Row, SQLContext}
-
-// Labeled and unlabeled instance types.
-// Spark SQL can infer schema from case classes.
-case class LabeledDocument(id: Long, text: String, label: Double)
-case class Document(id: Long, text: String)
+import org.apache.spark.sql.Row
-// Set up contexts. Import implicit conversions to DataFrame from sqlContext.
-val conf = new SparkConf().setAppName("SimpleTextClassificationPipeline")
-val sc = new SparkContext(conf)
-val sqlContext = new SQLContext(sc)
-import sqlContext.implicits._
-
-// Prepare training documents, which are labeled.
-val training = sc.parallelize(Seq(
- LabeledDocument(0L, "a b c d e spark", 1.0),
- LabeledDocument(1L, "b d", 0.0),
- LabeledDocument(2L, "spark f g h", 1.0),
- LabeledDocument(3L, "hadoop mapreduce", 0.0)))
+// Prepare training documents from a list of (id, text, label) tuples.
+val training = sqlContext.createDataFrame(Seq(
+ (0L, "a b c d e spark", 1.0),
+ (1L, "b d", 0.0),
+ (2L, "spark f g h", 1.0),
+ (3L, "hadoop mapreduce", 0.0)
+)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
@@ -488,14 +455,15 @@ val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
-val model = pipeline.fit(training.toDF)
+val model = pipeline.fit(training)
-// Prepare test documents, which are unlabeled.
-val test = sc.parallelize(Seq(
- Document(4L, "spark i j k"),
- Document(5L, "l m n"),
- Document(6L, "mapreduce spark"),
- Document(7L, "apache hadoop")))
+// Prepare test documents, which are unlabeled (id, text) tuples.
+val test = sqlContext.createDataFrame(Seq(
+ (4L, "spark i j k"),
+ (5L, "l m n"),
+ (6L, "mapreduce spark"),
+ (7L, "apache hadoop")
+)).toDF("id", "text")
// Make predictions on test documents.
model.transform(test.toDF)
@@ -505,7 +473,6 @@ model.transform(test.toDF)
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
-sc.stop()
{% endhighlight %}
</div>
@@ -514,8 +481,6 @@ sc.stop()
import java.util.Arrays;
import java.util.List;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
@@ -524,7 +489,6 @@ import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
// Labeled and unlabeled instance types.
// Spark SQL can infer schema from Java Beans.
@@ -556,18 +520,13 @@ public class LabeledDocument extends Document implements Serializable {
public void setLabel(double label) { this.label = label; }
}
-// Set up contexts.
-SparkConf conf = new SparkConf().setAppName("JavaSimpleTextClassificationPipeline");
-JavaSparkContext jsc = new JavaSparkContext(conf);
-SQLContext jsql = new SQLContext(jsc);
-
// Prepare training documents, which are labeled.
-List<LabeledDocument> localTraining = Arrays.asList(
+DataFrame training = sqlContext.createDataFrame(Arrays.asList(
new LabeledDocument(0L, "a b c d e spark", 1.0),
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
- new LabeledDocument(3L, "hadoop mapreduce", 0.0));
-DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);
+ new LabeledDocument(3L, "hadoop mapreduce", 0.0)
+), LabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -587,12 +546,12 @@ Pipeline pipeline = new Pipeline()
PipelineModel model = pipeline.fit(training);
// Prepare test documents, which are unlabeled.
-List<Document> localTest = Arrays.asList(
+DataFrame test = sqlContext.createDataFrame(Arrays.asList(
new Document(4L, "spark i j k"),
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
- new Document(7L, "apache hadoop"));
-DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);
+ new Document(7L, "apache hadoop")
+), Document.class);
// Make predictions on test documents.
DataFrame predictions = model.transform(test);
@@ -601,28 +560,23 @@ for (Row r: predictions.select("id", "text", "probability", "prediction").collec
+ ", prediction=" + r.get(3));
}
-jsc.stop();
{% endhighlight %}
</div>
<div data-lang="python">
{% highlight python %}
-from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
-from pyspark.sql import Row, SQLContext
-
-sc = SparkContext(appName="SimpleTextClassificationPipeline")
-sqlContext = SQLContext(sc)
+from pyspark.sql import Row
-# Prepare training documents, which are labeled.
+# Prepare training documents from a list of (id, text, label) tuples.
LabeledDocument = Row("id", "text", "label")
-training = sc.parallelize([(0L, "a b c d e spark", 1.0),
- (1L, "b d", 0.0),
- (2L, "spark f g h", 1.0),
- (3L, "hadoop mapreduce", 0.0)]) \
- .map(lambda x: LabeledDocument(*x)).toDF()
+training = sqlContext.createDataFrame([
+ (0L, "a b c d e spark", 1.0),
+ (1L, "b d", 0.0),
+ (2L, "spark f g h", 1.0),
+ (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
@@ -633,13 +587,12 @@ pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
-# Prepare test documents, which are unlabeled.
-Document = Row("id", "text")
-test = sc.parallelize([(4L, "spark i j k"),
- (5L, "l m n"),
- (6L, "mapreduce spark"),
- (7L, "apache hadoop")]) \
- .map(lambda x: Document(*x)).toDF()
+# Prepare test documents, which are unlabeled (id, text) tuples.
+test = sqlContext.createDataFrame([
+ (4L, "spark i j k"),
+ (5L, "l m n"),
+ (6L, "mapreduce spark"),
+ (7L, "apache hadoop")], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
@@ -647,7 +600,6 @@ selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
print(row)
-sc.stop()
{% endhighlight %}
</div>
@@ -664,8 +616,8 @@ Currently, `spark.ml` supports model selection using the [`CrossValidator`](api/
The `Evaluator` can be a [`RegressionEvaluator`](api/scala/index.html#org.apache.spark.ml.RegressionEvaluator)
for regression problems, a [`BinaryClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.BinaryClassificationEvaluator)
-for binary data or a [`MultiClassClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.MultiClassClassificationEvaluator)
-for multiclass problems. The default metric used to choose the best `ParamMap` can be overriden by the setMetric
+for binary data, or a [`MultiClassClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.MultiClassClassificationEvaluator)
+for multiclass problems. The default metric used to choose the best `ParamMap` can be overridden by the `setMetric`
method in each of these evaluators.
The `ParamMap` which produces the best evaluation metric (averaged over the `$k$` folds) is selected as the best model.
@@ -684,39 +636,29 @@ However, it is also a well-established method for choosing parameters which is m
<div data-lang="scala">
{% highlight scala %}
-import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.sql.{Row, SQLContext}
-
-// Labeled and unlabeled instance types.
-// Spark SQL can infer schema from case classes.
-case class LabeledDocument(id: Long, text: String, label: Double)
-case class Document(id: Long, text: String)
-
-val conf = new SparkConf().setAppName("CrossValidatorExample")
-val sc = new SparkContext(conf)
-val sqlContext = new SQLContext(sc)
-import sqlContext.implicits._
-
-// Prepare training documents, which are labeled.
-val training = sc.parallelize(Seq(
- LabeledDocument(0L, "a b c d e spark", 1.0),
- LabeledDocument(1L, "b d", 0.0),
- LabeledDocument(2L, "spark f g h", 1.0),
- LabeledDocument(3L, "hadoop mapreduce", 0.0),
- LabeledDocument(4L, "b spark who", 1.0),
- LabeledDocument(5L, "g d a y", 0.0),
- LabeledDocument(6L, "spark fly", 1.0),
- LabeledDocument(7L, "was mapreduce", 0.0),
- LabeledDocument(8L, "e spark program", 1.0),
- LabeledDocument(9L, "a e c l", 0.0),
- LabeledDocument(10L, "spark compile", 1.0),
- LabeledDocument(11L, "hadoop software", 0.0)))
+import org.apache.spark.sql.Row
+
+// Prepare training data from a list of (id, text, label) tuples.
+val training = sqlContext.createDataFrame(Seq(
+ (0L, "a b c d e spark", 1.0),
+ (1L, "b d", 0.0),
+ (2L, "spark f g h", 1.0),
+ (3L, "hadoop mapreduce", 0.0),
+ (4L, "b spark who", 1.0),
+ (5L, "g d a y", 0.0),
+ (6L, "spark fly", 1.0),
+ (7L, "was mapreduce", 0.0),
+ (8L, "e spark program", 1.0),
+ (9L, "a e c l", 0.0),
+ (10L, "spark compile", 1.0),
+ (11L, "hadoop software", 0.0)
+)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
@@ -730,15 +672,6 @@ val lr = new LogisticRegression()
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
-// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
-// This will allow us to jointly choose parameters for all Pipeline stages.
-// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
-// Note that the evaluator here is a BinaryClassificationEvaluator and the default metric
-// used is areaUnderROC.
-val crossval = new CrossValidator()
- .setEstimator(pipeline)
- .setEvaluator(new BinaryClassificationEvaluator)
-
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
@@ -746,28 +679,37 @@ val paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
.addGrid(lr.regParam, Array(0.1, 0.01))
.build()
-crossval.setEstimatorParamMaps(paramGrid)
-crossval.setNumFolds(2) // Use 3+ in practice
+
+// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
+// This will allow us to jointly choose parameters for all Pipeline stages.
+// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
+// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
+// is areaUnderROC.
+val cv = new CrossValidator()
+ .setEstimator(pipeline)
+ .setEvaluator(new BinaryClassificationEvaluator)
+ .setEstimatorParamMaps(paramGrid)
+ .setNumFolds(2) // Use 3+ in practice
// Run cross-validation, and choose the best set of parameters.
-val cvModel = crossval.fit(training.toDF)
+val cvModel = cv.fit(training)
-// Prepare test documents, which are unlabeled.
-val test = sc.parallelize(Seq(
- Document(4L, "spark i j k"),
- Document(5L, "l m n"),
- Document(6L, "mapreduce spark"),
- Document(7L, "apache hadoop")))
+// Prepare test documents, which are unlabeled (id, text) tuples.
+val test = sqlContext.createDataFrame(Seq(
+ (4L, "spark i j k"),
+ (5L, "l m n"),
+ (6L, "mapreduce spark"),
+ (7L, "apache hadoop")
+)).toDF("id", "text")
// Make predictions on test documents. cvModel uses the best model found (lrModel).
-cvModel.transform(test.toDF)
+cvModel.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
- println(s"($id, $text) --> prob=$prob, prediction=$prediction")
-}
+ println(s"($id, $text) --> prob=$prob, prediction=$prediction")
+ }
-sc.stop()
{% endhighlight %}
</div>
@@ -776,8 +718,6 @@ sc.stop()
import java.util.Arrays;
import java.util.List;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
@@ -790,7 +730,6 @@ import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
// Labeled and unlabeled instance types.
// Spark SQL can infer schema from Java Beans.
@@ -822,12 +761,9 @@ public class LabeledDocument extends Document implements Serializable {
public void setLabel(double label) { this.label = label; }
}
-SparkConf conf = new SparkConf().setAppName("JavaCrossValidatorExample");
-JavaSparkContext jsc = new JavaSparkContext(conf);
-SQLContext jsql = new SQLContext(jsc);
// Prepare training documents, which are labeled.
-List<LabeledDocument> localTraining = Arrays.asList(
+DataFrame training = sqlContext.createDataFrame(Arrays.asList(
new LabeledDocument(0L, "a b c d e spark", 1.0),
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
@@ -839,8 +775,8 @@ List<LabeledDocument> localTraining = Arrays.asList(
new LabeledDocument(8L, "e spark program", 1.0),
new LabeledDocument(9L, "a e c l", 0.0),
new LabeledDocument(10L, "spark compile", 1.0),
- new LabeledDocument(11L, "hadoop software", 0.0));
-DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);
+ new LabeledDocument(11L, "hadoop software", 0.0)
+), LabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -856,15 +792,6 @@ LogisticRegression lr = new LogisticRegression()
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
-// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
-// This will allow us to jointly choose parameters for all Pipeline stages.
-// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
-// Note that the evaluator here is a BinaryClassificationEvaluator and the default metric
-// used is areaUnderROC.
-CrossValidator crossval = new CrossValidator()
- .setEstimator(pipeline)
- .setEvaluator(new BinaryClassificationEvaluator());
-
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
@@ -872,19 +799,28 @@ ParamMap[] paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures(), new int[]{10, 100, 1000})
.addGrid(lr.regParam(), new double[]{0.1, 0.01})
.build();
-crossval.setEstimatorParamMaps(paramGrid);
-crossval.setNumFolds(2); // Use 3+ in practice
+
+// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
+// This will allow us to jointly choose parameters for all Pipeline stages.
+// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
+// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
+// is areaUnderROC.
+CrossValidator cv = new CrossValidator()
+ .setEstimator(pipeline)
+ .setEvaluator(new BinaryClassificationEvaluator())
+ .setEstimatorParamMaps(paramGrid)
+ .setNumFolds(2); // Use 3+ in practice
// Run cross-validation, and choose the best set of parameters.
-CrossValidatorModel cvModel = crossval.fit(training);
+CrossValidatorModel cvModel = cv.fit(training);
// Prepare test documents, which are unlabeled.
-List<Document> localTest = Arrays.asList(
+DataFrame test = sqlContext.createDataFrame(Arrays.asList(
new Document(4L, "spark i j k"),
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
- new Document(7L, "apache hadoop"));
-DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);
+ new Document(7L, "apache hadoop")
+), Document.class);
// Make predictions on test documents. cvModel uses the best model found (lrModel).
DataFrame predictions = cvModel.transform(test);
@@ -893,7 +829,6 @@ for (Row r: predictions.select("id", "text", "probability", "prediction").collec
+ ", prediction=" + r.get(3));
}
-jsc.stop();
{% endhighlight %}
</div>
@@ -935,7 +870,7 @@ val lr = new LinearRegression()
// the evaluator.
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
- .addGrid(lr.fitIntercept, Array(true, false))
+ .addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.build()
@@ -945,9 +880,8 @@ val trainValidationSplit = new TrainValidationSplit()
.setEstimator(lr)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
-
-// 80% of the data will be used for training and the remaining 20% for validation.
-trainValidationSplit.setTrainRatio(0.8)
+ // 80% of the data will be used for training and the remaining 20% for validation.
+ .setTrainRatio(0.8)
// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)
@@ -972,12 +906,12 @@ import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.DataFrame;
-DataFrame data = jsql.createDataFrame(
+DataFrame data = sqlContext.createDataFrame(
MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt"),
LabeledPoint.class);
// Prepare training and test data.
-DataFrame[] splits = data.randomSplit(new double [] {0.9, 0.1}, 12345);
+DataFrame[] splits = data.randomSplit(new double[] {0.9, 0.1}, 12345);
DataFrame training = splits[0];
DataFrame test = splits[1];
@@ -997,10 +931,8 @@ ParamMap[] paramGrid = new ParamGridBuilder()
TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
.setEstimator(lr)
.setEvaluator(new RegressionEvaluator())
- .setEstimatorParamMaps(paramGrid);
-
-// 80% of the data will be used for training and the remaining 20% for validation.
-trainValidationSplit.setTrainRatio(0.8);
+ .setEstimatorParamMaps(paramGrid)
+ .setTrainRatio(0.8); // 80% for training and the remaining 20% for validation
// Run train validation split, and choose the best set of parameters.
TrainValidationSplitModel model = trainValidationSplit.fit(training);