From 0d824b0442707b6842ef19a589cb8a2af997273b Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 4 Jan 2016 17:53:21 +0000 Subject: Add Spark 1.6.0 docs --- site/docs/1.6.0/ml-classification-regression.html | 2595 +++++++++++++++++++++ 1 file changed, 2595 insertions(+) create mode 100644 site/docs/1.6.0/ml-classification-regression.html (limited to 'site/docs/1.6.0/ml-classification-regression.html') diff --git a/site/docs/1.6.0/ml-classification-regression.html b/site/docs/1.6.0/ml-classification-regression.html new file mode 100644 index 000000000..0b8a9b418 --- /dev/null +++ b/site/docs/1.6.0/ml-classification-regression.html @@ -0,0 +1,2595 @@ + + + + + + + + + + Classification and regression - spark.ml - Spark 1.6.0 Documentation + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + + + +
+ +

Classification and regression - spark.ml

+ + +

\[ +\newcommand{\R}{\mathbb{R}} +\newcommand{\E}{\mathbb{E}} +\newcommand{\x}{\mathbf{x}} +\newcommand{\y}{\mathbf{y}} +\newcommand{\wv}{\mathbf{w}} +\newcommand{\av}{\mathbf{\alpha}} +\newcommand{\bv}{\mathbf{b}} +\newcommand{\N}{\mathbb{N}} +\newcommand{\id}{\mathbf{I}} +\newcommand{\ind}{\mathbf{1}} +\newcommand{\0}{\mathbf{0}} +\newcommand{\unit}{\mathbf{e}} +\newcommand{\one}{\mathbf{1}} +\newcommand{\zero}{\mathbf{0}} +\]

+ +

Table of Contents

+ + + +

In spark.ml, we implement popular linear methods such as logistic +regression and linear least squares with $L_1$ or $L_2$ regularization. +Refer to the linear methods in mllib for +details about implementation and tuning. We also include a DataFrame API for Elastic +net, a hybrid +of $L_1$ and $L_2$ regularization proposed in Zou et al, Regularization +and variable selection via the elastic +net. +Mathematically, it is defined as a convex combination of the $L_1$ and +the $L_2$ regularization terms: +\[ +\alpha \left( \lambda \|\wv\|_1 \right) + (1-\alpha) \left( \frac{\lambda}{2}\|\wv\|_2^2 \right) , \alpha \in [0, 1], \lambda \geq 0 +\] +By setting $\alpha$ properly, elastic net contains both $L_1$ and $L_2$ +regularization as special cases. For example, if a linear +regression model is +trained with the elastic net parameter $\alpha$ set to $1$, it is +equivalent to a +Lasso model. +On the other hand, if $\alpha$ is set to $0$, the trained model reduces +to a ridge +regression model. +We implement Pipelines API for both linear regression and logistic +regression with elastic net regularization.

+ +

Classification

+ +

Logistic regression

+ +

Logistic regression is a popular method to predict a binary response. It is a special case of Generalized Linear models that predicts the probability of the outcome. +For more background and more details about the implementation, refer to the documentation of the logistic regression in spark.mllib.

+ +
+

The current implementation of logistic regression in spark.ml only supports binary classes. Support for multiclass regression will be added in the future.

+
+ +

Example

+ +

The following example shows how to train a logistic regression model +with elastic net regularization. elasticNetParam corresponds to +$\alpha$ and regParam corresponds to $\lambda$.

+ +
+ +
+
import org.apache.spark.ml.classification.LogisticRegression
+
+// Load training data
+val training = sqlCtx.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+val lr = new LogisticRegression()
+  .setMaxIter(10)
+  .setRegParam(0.3)
+  .setElasticNetParam(0.8)
+
+// Fit the model
+val lrModel = lr.fit(training)
+
+// Print the coefficients and intercept for logistic regression
+println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionWithElasticNetExample.scala" in the Spark repo.
+
+ +
+
import org.apache.spark.ml.classification.LogisticRegression;
+import org.apache.spark.ml.classification.LogisticRegressionModel;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+// Load training data
+DataFrame training = sqlContext.read().format("libsvm")
+  .load("data/mllib/sample_libsvm_data.txt");
+
+LogisticRegression lr = new LogisticRegression()
+  .setMaxIter(10)
+  .setRegParam(0.3)
+  .setElasticNetParam(0.8);
+
+// Fit the model
+LogisticRegressionModel lrModel = lr.fit(training);
+
+// Print the coefficients and intercept for logistic regression
+System.out.println("Coefficients: "
+  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionWithElasticNetExample.java" in the Spark repo.
+
+ +
+
from pyspark.ml.classification import LogisticRegression
+
+# Load training data
+training = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
+
+# Fit the model
+lrModel = lr.fit(training)
+
+# Print the coefficients and intercept for logistic regression
+print("Coefficients: " + str(lrModel.coefficients))
+print("Intercept: " + str(lrModel.intercept))
+
+
Find full example code at "examples/src/main/python/ml/logistic_regression_with_elastic_net.py" in the Spark repo.
+
+ +
+ +

The spark.ml implementation of logistic regression also supports +extracting a summary of the model over the training set. Note that the +predictions and metrics which are stored as DataFrame in +BinaryLogisticRegressionSummary are annotated @transient and hence +only available on the driver.

+ +
+ +
+ +

LogisticRegressionTrainingSummary +provides a summary for a +LogisticRegressionModel. +Currently, only binary classification is supported and the +summary must be explicitly cast to +BinaryLogisticRegressionTrainingSummary. +This will likely change when multiclass classification is supported.

+ +

Continuing the earlier example:

+ +
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}
+
+// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
+// example
+val trainingSummary = lrModel.summary
+
+// Obtain the objective per iteration.
+val objectiveHistory = trainingSummary.objectiveHistory
+objectiveHistory.foreach(loss => println(loss))
+
+// Obtain the metrics useful to judge performance on test data.
+// We cast the summary to a BinaryLogisticRegressionSummary since the problem is a
+// binary classification problem.
+val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]
+
+// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
+val roc = binarySummary.roc
+roc.show()
+println(binarySummary.areaUnderROC)
+
+// Set the model threshold to maximize F-Measure
+val fMeasure = binarySummary.fMeasureByThreshold
+val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
+val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
+  .select("threshold").head().getDouble(0)
+lrModel.setThreshold(bestThreshold)
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionSummaryExample.scala" in the Spark repo.
+
+ +
+

LogisticRegressionTrainingSummary +provides a summary for a +LogisticRegressionModel. +Currently, only binary classification is supported and the +summary must be explicitly cast to +BinaryLogisticRegressionTrainingSummary. +This will likely change when multiclass classification is supported.

+ +

Continuing the earlier example:

+ +
import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary;
+import org.apache.spark.ml.classification.LogisticRegression;
+import org.apache.spark.ml.classification.LogisticRegressionModel;
+import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.functions;
+
+// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
+// example
+LogisticRegressionTrainingSummary trainingSummary = lrModel.summary();
+
+// Obtain the loss per iteration.
+double[] objectiveHistory = trainingSummary.objectiveHistory();
+for (double lossPerIteration : objectiveHistory) {
+  System.out.println(lossPerIteration);
+}
+
+// Obtain the metrics useful to judge performance on test data.
+// We cast the summary to a BinaryLogisticRegressionSummary since the problem is a binary
+// classification problem.
+BinaryLogisticRegressionSummary binarySummary =
+  (BinaryLogisticRegressionSummary) trainingSummary;
+
+// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
+DataFrame roc = binarySummary.roc();
+roc.show();
+roc.select("FPR").show();
+System.out.println(binarySummary.areaUnderROC());
+
+// Get the threshold corresponding to the maximum F-Measure and rerun LogisticRegression with
+// this selected threshold.
+DataFrame fMeasure = binarySummary.fMeasureByThreshold();
+double maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0);
+double bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure))
+  .select("threshold").head().getDouble(0);
+lrModel.setThreshold(bestThreshold);
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java" in the Spark repo.
+
+ + +
+

Logistic regression model summary is not yet supported in Python.

+
+ +
+ +

Decision tree classifier

+ +

Decision trees are a popular family of classification and regression methods. +More information about the spark.ml implementation can be found further in the section on decision trees.

+ +

Example

+ +

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. +We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the Decision Tree algorithm can recognize.

+ +
+
+ +

More details on parameters can be found in the Scala API documentation.

+ +
import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.classification.DecisionTreeClassifier
+import org.apache.spark.ml.classification.DecisionTreeClassificationModel
+import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}
+import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
+
+// Load the data stored in LIBSVM format as a DataFrame.
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+// Index labels, adding metadata to the label column.
+// Fit on whole dataset to include all labels in index.
+val labelIndexer = new StringIndexer()
+  .setInputCol("label")
+  .setOutputCol("indexedLabel")
+  .fit(data)
+// Automatically identify categorical features, and index them.
+val featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous
+  .fit(data)
+
+// Split the data into training and test sets (30% held out for testing)
+val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
+
+// Train a DecisionTree model.
+val dt = new DecisionTreeClassifier()
+  .setLabelCol("indexedLabel")
+  .setFeaturesCol("indexedFeatures")
+
+// Convert indexed labels back to original labels.
+val labelConverter = new IndexToString()
+  .setInputCol("prediction")
+  .setOutputCol("predictedLabel")
+  .setLabels(labelIndexer.labels)
+
+// Chain indexers and tree in a Pipeline
+val pipeline = new Pipeline()
+  .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
+
+// Train model.  This also runs the indexers.
+val model = pipeline.fit(trainingData)
+
+// Make predictions.
+val predictions = model.transform(testData)
+
+// Select example rows to display.
+predictions.select("predictedLabel", "label", "features").show(5)
+
+// Select (prediction, true label) and compute test error
+val evaluator = new MulticlassClassificationEvaluator()
+  .setLabelCol("indexedLabel")
+  .setPredictionCol("prediction")
+  .setMetricName("precision")
+val accuracy = evaluator.evaluate(predictions)
+println("Test Error = " + (1.0 - accuracy))
+
+val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
+println("Learned classification tree model:\n" + treeModel.toDebugString)
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeClassificationExample.scala" in the Spark repo.
+ +
+ +
+ +

More details on parameters can be found in the Java API documentation.

+ +
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.ml.Pipeline;
+import org.apache.spark.ml.PipelineModel;
+import org.apache.spark.ml.PipelineStage;
+import org.apache.spark.ml.classification.DecisionTreeClassifier;
+import org.apache.spark.ml.classification.DecisionTreeClassificationModel;
+import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
+import org.apache.spark.ml.feature.*;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+// Load the data stored in LIBSVM format as a DataFrame.
+DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
+
+// Index labels, adding metadata to the label column.
+// Fit on whole dataset to include all labels in index.
+StringIndexerModel labelIndexer = new StringIndexer()
+  .setInputCol("label")
+  .setOutputCol("indexedLabel")
+  .fit(data);
+
+// Automatically identify categorical features, and index them.
+VectorIndexerModel featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous
+  .fit(data);
+
+// Split the data into training and test sets (30% held out for testing)
+DataFrame[] splits = data.randomSplit(new double[]{0.7, 0.3});
+DataFrame trainingData = splits[0];
+DataFrame testData = splits[1];
+
+// Train a DecisionTree model.
+DecisionTreeClassifier dt = new DecisionTreeClassifier()
+  .setLabelCol("indexedLabel")
+  .setFeaturesCol("indexedFeatures");
+
+// Convert indexed labels back to original labels.
+IndexToString labelConverter = new IndexToString()
+  .setInputCol("prediction")
+  .setOutputCol("predictedLabel")
+  .setLabels(labelIndexer.labels());
+
+// Chain indexers and tree in a Pipeline
+Pipeline pipeline = new Pipeline()
+  .setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, labelConverter});
+
+// Train model.  This also runs the indexers.
+PipelineModel model = pipeline.fit(trainingData);
+
+// Make predictions.
+DataFrame predictions = model.transform(testData);
+
+// Select example rows to display.
+predictions.select("predictedLabel", "label", "features").show(5);
+
+// Select (prediction, true label) and compute test error
+MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
+  .setLabelCol("indexedLabel")
+  .setPredictionCol("prediction")
+  .setMetricName("precision");
+double accuracy = evaluator.evaluate(predictions);
+System.out.println("Test Error = " + (1.0 - accuracy));
+
+DecisionTreeClassificationModel treeModel =
+  (DecisionTreeClassificationModel) (model.stages()[2]);
+System.out.println("Learned classification tree model:\n" + treeModel.toDebugString());
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java" in the Spark repo.
+ +
+ +
+ +

More details on parameters can be found in the Python API documentation.

+ +
from pyspark import SparkContext, SQLContext
+from pyspark.ml import Pipeline
+from pyspark.ml.classification import DecisionTreeClassifier
+from pyspark.ml.feature import StringIndexer, VectorIndexer
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+
+# Load the data stored in LIBSVM format as a DataFrame.
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+# Index labels, adding metadata to the label column.
+# Fit on whole dataset to include all labels in index.
+labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
+# Automatically identify categorical features, and index them.
+# We specify maxCategories so features with > 4 distinct values are treated as continuous.
+featureIndexer =\
+    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
+
+# Split the data into training and test sets (30% held out for testing)
+(trainingData, testData) = data.randomSplit([0.7, 0.3])
+
+# Train a DecisionTree model.
+dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
+
+# Chain indexers and tree in a Pipeline
+pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
+
+# Train model.  This also runs the indexers.
+model = pipeline.fit(trainingData)
+
+# Make predictions.
+predictions = model.transform(testData)
+
+# Select example rows to display.
+predictions.select("prediction", "indexedLabel", "features").show(5)
+
+# Select (prediction, true label) and compute test error
+evaluator = MulticlassClassificationEvaluator(
+    labelCol="indexedLabel", predictionCol="prediction", metricName="precision")
+accuracy = evaluator.evaluate(predictions)
+print("Test Error = %g " % (1.0 - accuracy))
+
+treeModel = model.stages[2]
+# summary only
+print(treeModel)
+
+
Find full example code at "examples/src/main/python/ml/decision_tree_classification_example.py" in the Spark repo.
+ +
+ +
+ +

Random forest classifier

+ +

Random forests are a popular family of classification and regression methods. +More information about the spark.ml implementation can be found further in the section on random forests.

+ +

Example

+ +

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. +We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the tree-based algorithms can recognize.

+ +
+
+ +

Refer to the Scala API docs for more details.

+ +
import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
+import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
+import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
+
+// Load and parse the data file, converting it to a DataFrame.
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+// Index labels, adding metadata to the label column.
+// Fit on whole dataset to include all labels in index.
+val labelIndexer = new StringIndexer()
+  .setInputCol("label")
+  .setOutputCol("indexedLabel")
+  .fit(data)
+// Automatically identify categorical features, and index them.
+// Set maxCategories so features with > 4 distinct values are treated as continuous.
+val featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data)
+
+// Split the data into training and test sets (30% held out for testing)
+val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
+
+// Train a RandomForest model.
+val rf = new RandomForestClassifier()
+  .setLabelCol("indexedLabel")
+  .setFeaturesCol("indexedFeatures")
+  .setNumTrees(10)
+
+// Convert indexed labels back to original labels.
+val labelConverter = new IndexToString()
+  .setInputCol("prediction")
+  .setOutputCol("predictedLabel")
+  .setLabels(labelIndexer.labels)
+
+// Chain indexers and forest in a Pipeline
+val pipeline = new Pipeline()
+  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
+
+// Train model.  This also runs the indexers.
+val model = pipeline.fit(trainingData)
+
+// Make predictions.
+val predictions = model.transform(testData)
+
+// Select example rows to display.
+predictions.select("predictedLabel", "label", "features").show(5)
+
+// Select (prediction, true label) and compute test error
+val evaluator = new MulticlassClassificationEvaluator()
+  .setLabelCol("indexedLabel")
+  .setPredictionCol("prediction")
+  .setMetricName("precision")
+val accuracy = evaluator.evaluate(predictions)
+println("Test Error = " + (1.0 - accuracy))
+
+val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
+println("Learned classification forest model:\n" + rfModel.toDebugString)
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/RandomForestClassifierExample.scala" in the Spark repo.
+
+ +
+ +

Refer to the Java API docs for more details.

+ +
import org.apache.spark.ml.Pipeline;
+import org.apache.spark.ml.PipelineModel;
+import org.apache.spark.ml.PipelineStage;
+import org.apache.spark.ml.classification.RandomForestClassificationModel;
+import org.apache.spark.ml.classification.RandomForestClassifier;
+import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
+import org.apache.spark.ml.feature.*;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+// Load and parse the data file, converting it to a DataFrame.
+DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
+
+// Index labels, adding metadata to the label column.
+// Fit on whole dataset to include all labels in index.
+StringIndexerModel labelIndexer = new StringIndexer()
+  .setInputCol("label")
+  .setOutputCol("indexedLabel")
+  .fit(data);
+// Automatically identify categorical features, and index them.
+// Set maxCategories so features with > 4 distinct values are treated as continuous.
+VectorIndexerModel featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data);
+
+// Split the data into training and test sets (30% held out for testing)
+DataFrame[] splits = data.randomSplit(new double[] {0.7, 0.3});
+DataFrame trainingData = splits[0];
+DataFrame testData = splits[1];
+
+// Train a RandomForest model.
+RandomForestClassifier rf = new RandomForestClassifier()
+  .setLabelCol("indexedLabel")
+  .setFeaturesCol("indexedFeatures");
+
+// Convert indexed labels back to original labels.
+IndexToString labelConverter = new IndexToString()
+  .setInputCol("prediction")
+  .setOutputCol("predictedLabel")
+  .setLabels(labelIndexer.labels());
+
+// Chain indexers and forest in a Pipeline
+Pipeline pipeline = new Pipeline()
+  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, rf, labelConverter});
+
+// Train model. This also runs the indexers.
+PipelineModel model = pipeline.fit(trainingData);
+
+// Make predictions.
+DataFrame predictions = model.transform(testData);
+
+// Select example rows to display.
+predictions.select("predictedLabel", "label", "features").show(5);
+
+// Select (prediction, true label) and compute test error
+MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
+  .setLabelCol("indexedLabel")
+  .setPredictionCol("prediction")
+  .setMetricName("precision");
+double accuracy = evaluator.evaluate(predictions);
+System.out.println("Test Error = " + (1.0 - accuracy));
+
+RandomForestClassificationModel rfModel = (RandomForestClassificationModel)(model.stages()[2]);
+System.out.println("Learned classification forest model:\n" + rfModel.toDebugString());
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestClassifierExample.java" in the Spark repo.
+
+ +
+ +

Refer to the Python API docs for more details.

+ +
from pyspark.ml import Pipeline
+from pyspark.ml.classification import RandomForestClassifier
+from pyspark.ml.feature import StringIndexer, VectorIndexer
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+
+# Load and parse the data file, converting it to a DataFrame.
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+# Index labels, adding metadata to the label column.
+# Fit on whole dataset to include all labels in index.
+labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
+# Automatically identify categorical features, and index them.
+# Set maxCategories so features with > 4 distinct values are treated as continuous.
+featureIndexer =\
+    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
+
+# Split the data into training and test sets (30% held out for testing)
+(trainingData, testData) = data.randomSplit([0.7, 0.3])
+
+# Train a RandomForest model.
+rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
+
+# Chain indexers and forest in a Pipeline
+pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])
+
+# Train model.  This also runs the indexers.
+model = pipeline.fit(trainingData)
+
+# Make predictions.
+predictions = model.transform(testData)
+
+# Select example rows to display.
+predictions.select("prediction", "indexedLabel", "features").show(5)
+
+# Select (prediction, true label) and compute test error
+evaluator = MulticlassClassificationEvaluator(
+    labelCol="indexedLabel", predictionCol="prediction", metricName="precision")
+accuracy = evaluator.evaluate(predictions)
+print("Test Error = %g" % (1.0 - accuracy))
+
+rfModel = model.stages[2]
+print(rfModel)  # summary only
+
+
Find full example code at "examples/src/main/python/ml/random_forest_classifier_example.py" in the Spark repo.
+
+
+ +

Gradient-boosted tree classifier

+ +

Gradient-boosted trees (GBTs) are a popular classification and regression method using ensembles of decision trees. +More information about the spark.ml implementation can be found further in the section on GBTs.

+ +

Example

+ +

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. +We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the tree-based algorithms can recognize.

+ +
+
+ +

Refer to the Scala API docs for more details.

+ +
import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
+import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
+import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
+
+// Load and parse the data file, converting it to a DataFrame.
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+// Index labels, adding metadata to the label column.
+// Fit on whole dataset to include all labels in index.
+val labelIndexer = new StringIndexer()
+  .setInputCol("label")
+  .setOutputCol("indexedLabel")
+  .fit(data)
+// Automatically identify categorical features, and index them.
+// Set maxCategories so features with > 4 distinct values are treated as continuous.
+val featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data)
+
+// Split the data into training and test sets (30% held out for testing)
+val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
+
+// Train a GBT model.
+val gbt = new GBTClassifier()
+  .setLabelCol("indexedLabel")
+  .setFeaturesCol("indexedFeatures")
+  .setMaxIter(10)
+
+// Convert indexed labels back to original labels.
+val labelConverter = new IndexToString()
+  .setInputCol("prediction")
+  .setOutputCol("predictedLabel")
+  .setLabels(labelIndexer.labels)
+
+// Chain indexers and GBT in a Pipeline
+val pipeline = new Pipeline()
+  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))
+
+// Train model.  This also runs the indexers.
+val model = pipeline.fit(trainingData)
+
+// Make predictions.
+val predictions = model.transform(testData)
+
+// Select example rows to display.
+predictions.select("predictedLabel", "label", "features").show(5)
+
+// Select (prediction, true label) and compute test error
+val evaluator = new MulticlassClassificationEvaluator()
+  .setLabelCol("indexedLabel")
+  .setPredictionCol("prediction")
+  .setMetricName("precision")
+val accuracy = evaluator.evaluate(predictions)
+println("Test Error = " + (1.0 - accuracy))
+
+val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
+println("Learned classification GBT model:\n" + gbtModel.toDebugString)
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeClassifierExample.scala" in the Spark repo.
+
+ +
+ +

Refer to the Java API docs for more details.

+ +
import org.apache.spark.ml.Pipeline;
+import org.apache.spark.ml.PipelineModel;
+import org.apache.spark.ml.PipelineStage;
+import org.apache.spark.ml.classification.GBTClassificationModel;
+import org.apache.spark.ml.classification.GBTClassifier;
+import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
+import org.apache.spark.ml.feature.*;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+// Load and parse the data file, converting it to a DataFrame.
+DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
+
+// Index labels, adding metadata to the label column.
+// Fit on whole dataset to include all labels in index.
+StringIndexerModel labelIndexer = new StringIndexer()
+  .setInputCol("label")
+  .setOutputCol("indexedLabel")
+  .fit(data);
+// Automatically identify categorical features, and index them.
+// Set maxCategories so features with > 4 distinct values are treated as continuous.
+VectorIndexerModel featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data);
+
+// Split the data into training and test sets (30% held out for testing)
+DataFrame[] splits = data.randomSplit(new double[] {0.7, 0.3});
+DataFrame trainingData = splits[0];
+DataFrame testData = splits[1];
+
+// Train a GBT model.
+GBTClassifier gbt = new GBTClassifier()
+  .setLabelCol("indexedLabel")
+  .setFeaturesCol("indexedFeatures")
+  .setMaxIter(10);
+
+// Convert indexed labels back to original labels.
+IndexToString labelConverter = new IndexToString()
+  .setInputCol("prediction")
+  .setOutputCol("predictedLabel")
+  .setLabels(labelIndexer.labels());
+
+// Chain indexers and GBT in a Pipeline
+Pipeline pipeline = new Pipeline()
+  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, gbt, labelConverter});
+
+// Train model.  This also runs the indexers.
+PipelineModel model = pipeline.fit(trainingData);
+
+// Make predictions.
+DataFrame predictions = model.transform(testData);
+
+// Select example rows to display.
+predictions.select("predictedLabel", "label", "features").show(5);
+
+// Select (prediction, true label) and compute test error
+MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
+  .setLabelCol("indexedLabel")
+  .setPredictionCol("prediction")
+  .setMetricName("precision");
+double accuracy = evaluator.evaluate(predictions);
+System.out.println("Test Error = " + (1.0 - accuracy));
+
+GBTClassificationModel gbtModel = (GBTClassificationModel)(model.stages()[2]);
+System.out.println("Learned classification GBT model:\n" + gbtModel.toDebugString());
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeClassifierExample.java" in the Spark repo.
+
+ +
+ +

Refer to the Python API docs for more details.

+ +
from pyspark.ml import Pipeline
+from pyspark.ml.classification import GBTClassifier
+from pyspark.ml.feature import StringIndexer, VectorIndexer
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+
+# Load and parse the data file, converting it to a DataFrame.
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+# Index labels, adding metadata to the label column.
+# Fit on whole dataset to include all labels in index.
+labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
+# Automatically identify categorical features, and index them.
+# Set maxCategories so features with > 4 distinct values are treated as continuous.
+featureIndexer =\
+    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
+
+# Split the data into training and test sets (30% held out for testing)
+(trainingData, testData) = data.randomSplit([0.7, 0.3])
+
+# Train a GBT model.
+gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
+
+# Chain indexers and GBT in a Pipeline
+pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
+
+# Train model.  This also runs the indexers.
+model = pipeline.fit(trainingData)
+
+# Make predictions.
+predictions = model.transform(testData)
+
+# Select example rows to display.
+predictions.select("prediction", "indexedLabel", "features").show(5)
+
+# Select (prediction, true label) and compute test error
+evaluator = MulticlassClassificationEvaluator(
+    labelCol="indexedLabel", predictionCol="prediction", metricName="precision")
+accuracy = evaluator.evaluate(predictions)
+print("Test Error = %g" % (1.0 - accuracy))
+
+gbtModel = model.stages[2]
+print(gbtModel)  # summary only
+
+
Find full example code at "examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py" in the Spark repo.
+
+
+ +

Multilayer perceptron classifier

+ +

Multilayer perceptron classifier (MLPC) is a classifier based on the feedforward artificial neural network. +MLPC consists of multiple layers of nodes. +Each layer is fully connected to the next layer in the network. Nodes in the input layer represent the input data. All other nodes maps inputs to the outputs +by performing linear combination of the inputs with the node’s weights $\wv$ and bias $\bv$ and applying an activation function. +It can be written in matrix form for MLPC with $K+1$ layers as follows: +\[ +\mathrm{y}(\x) = \mathrm{f_K}(...\mathrm{f_2}(\wv_2^T\mathrm{f_1}(\wv_1^T \x+b_1)+b_2)...+b_K) +\] +Nodes in intermediate layers use sigmoid (logistic) function: +\[ +\mathrm{f}(z_i) = \frac{1}{1 + e^{-z_i}} +\] +Nodes in the output layer use softmax function: +\[ +\mathrm{f}(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}} +\] +The number of nodes $N$ in the output layer corresponds to the number of classes.

+ +

MLPC employes backpropagation for learning the model. We use logistic loss function for optimization and L-BFGS as optimization routine.

+ +

Example

+ +
+ +
+
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
+import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
+
+// Load the data stored in LIBSVM format as a DataFrame.
+val data = sqlContext.read.format("libsvm")
+  .load("data/mllib/sample_multiclass_classification_data.txt")
+// Split the data into train and test
+val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
+val train = splits(0)
+val test = splits(1)
+// specify layers for the neural network:
+// input layer of size 4 (features), two intermediate of size 5 and 4
+// and output of size 3 (classes)
+val layers = Array[Int](4, 5, 4, 3)
+// create the trainer and set its parameters
+val trainer = new MultilayerPerceptronClassifier()
+  .setLayers(layers)
+  .setBlockSize(128)
+  .setSeed(1234L)
+  .setMaxIter(100)
+// train the model
+val model = trainer.fit(train)
+// compute precision on the test set
+val result = model.transform(test)
+val predictionAndLabels = result.select("prediction", "label")
+val evaluator = new MulticlassClassificationEvaluator()
+  .setMetricName("precision")
+println("Precision:" + evaluator.evaluate(predictionAndLabels))
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/MultilayerPerceptronClassifierExample.scala" in the Spark repo.
+
+ +
+
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel;
+import org.apache.spark.ml.classification.MultilayerPerceptronClassifier;
+import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
+import org.apache.spark.sql.DataFrame;
+
+// Load training data
+String path = "data/mllib/sample_multiclass_classification_data.txt";
+DataFrame dataFrame = jsql.read().format("libsvm").load(path);
+// Split the data into train and test
+DataFrame[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
+DataFrame train = splits[0];
+DataFrame test = splits[1];
+// specify layers for the neural network:
+// input layer of size 4 (features), two intermediate of size 5 and 4
+// and output of size 3 (classes)
+int[] layers = new int[] {4, 5, 4, 3};
+// create the trainer and set its parameters
+MultilayerPerceptronClassifier trainer = new MultilayerPerceptronClassifier()
+  .setLayers(layers)
+  .setBlockSize(128)
+  .setSeed(1234L)
+  .setMaxIter(100);
+// train the model
+MultilayerPerceptronClassificationModel model = trainer.fit(train);
+// compute precision on the test set
+DataFrame result = model.transform(test);
+DataFrame predictionAndLabels = result.select("prediction", "label");
+MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
+  .setMetricName("precision");
+System.out.println("Precision = " + evaluator.evaluate(predictionAndLabels));
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaMultilayerPerceptronClassifierExample.java" in the Spark repo.
+
+ +
+
from pyspark.ml.classification import MultilayerPerceptronClassifier
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+
+# Load training data
+data = sqlContext.read.format("libsvm")\
+    .load("data/mllib/sample_multiclass_classification_data.txt")
+# Split the data into train and test
+splits = data.randomSplit([0.6, 0.4], 1234)
+train = splits[0]
+test = splits[1]
+# specify layers for the neural network:
+# input layer of size 4 (features), two intermediate of size 5 and 4
+# and output of size 3 (classes)
+layers = [4, 5, 4, 3]
+# create the trainer and set its parameters
+trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
+# train the model
+model = trainer.fit(train)
+# compute precision on the test set
+result = model.transform(test)
+predictionAndLabels = result.select("prediction", "label")
+evaluator = MulticlassClassificationEvaluator(metricName="precision")
+print("Precision:" + str(evaluator.evaluate(predictionAndLabels)))
+
+
Find full example code at "examples/src/main/python/ml/multilayer_perceptron_classification.py" in the Spark repo.
+
+ +
+ +

One-vs-Rest classifier (a.k.a. One-vs-All)

+ +

OneVsRest is an example of a machine learning reduction for performing multiclass classification given a base classifier that can perform binary classification efficiently. It is also known as “One-vs-All.”

+ +

OneVsRest is implemented as an Estimator. For the base classifier it takes instances of Classifier and creates a binary classification problem for each of the k classes. The classifier for class i is trained to predict whether the label is i or not, distinguishing class i from all other classes.

+ +

Predictions are done by evaluating each binary classifier and the index of the most confident classifier is output as label.

+ +

Example

+ +

The example below demonstrates how to load the +Iris dataset, parse it as a DataFrame and perform multiclass classification using OneVsRest. The test error is calculated to measure the algorithm accuracy.

+ +
+
+ +

Refer to the Scala API docs for more details.

+ +
import org.apache.spark.examples.mllib.AbstractParams
+import org.apache.spark.ml.classification.{OneVsRest, LogisticRegression}
+import org.apache.spark.ml.util.MetadataUtils
+import org.apache.spark.mllib.evaluation.MulticlassMetrics
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.sql.DataFrame
+
+val inputData = sqlContext.read.format("libsvm").load(params.input)
+// compute the train/test split: if testInput is not provided use part of input.
+val data = params.testInput match {
+  case Some(t) => {
+    // compute the number of features in the training set.
+    val numFeatures = inputData.first().getAs[Vector](1).size
+    val testData = sqlContext.read.option("numFeatures", numFeatures.toString)
+      .format("libsvm").load(t)
+    Array[DataFrame](inputData, testData)
+  }
+  case None => {
+    val f = params.fracTest
+    inputData.randomSplit(Array(1 - f, f), seed = 12345)
+  }
+}
+val Array(train, test) = data.map(_.cache())
+
+// instantiate the base classifier
+val classifier = new LogisticRegression()
+  .setMaxIter(params.maxIter)
+  .setTol(params.tol)
+  .setFitIntercept(params.fitIntercept)
+
+// Set regParam, elasticNetParam if specified in params
+params.regParam.foreach(classifier.setRegParam)
+params.elasticNetParam.foreach(classifier.setElasticNetParam)
+
+// instantiate the One Vs Rest Classifier.
+
+val ovr = new OneVsRest()
+ovr.setClassifier(classifier)
+
+// train the multiclass model.
+val (trainingDuration, ovrModel) = time(ovr.fit(train))
+
+// score the model on test data.
+val (predictionDuration, predictions) = time(ovrModel.transform(test))
+
+// evaluate the model
+val predictionsAndLabels = predictions.select("prediction", "label")
+  .map(row => (row.getDouble(0), row.getDouble(1)))
+
+val metrics = new MulticlassMetrics(predictionsAndLabels)
+
+val confusionMatrix = metrics.confusionMatrix
+
+// compute the false positive rate per label
+val predictionColSchema = predictions.schema("prediction")
+val numClasses = MetadataUtils.getNumClasses(predictionColSchema).get
+val fprs = Range(0, numClasses).map(p => (p, metrics.falsePositiveRate(p.toDouble)))
+
+println(s" Training Time ${trainingDuration} sec\n")
+
+println(s" Prediction Time ${predictionDuration} sec\n")
+
+println(s" Confusion Matrix\n ${confusionMatrix.toString}\n")
+
+println("label\tfpr")
+
+println(fprs.map {case (label, fpr) => label + "\t" + fpr}.mkString("\n"))
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala" in the Spark repo.
+
+ +
+ +

Refer to the Java API docs for more details.

+ +
import org.apache.spark.ml.classification.LogisticRegression;
+import org.apache.spark.ml.classification.OneVsRest;
+import org.apache.spark.ml.classification.OneVsRestModel;
+import org.apache.spark.ml.util.MetadataUtils;
+import org.apache.spark.mllib.evaluation.MulticlassMetrics;
+import org.apache.spark.mllib.linalg.Matrix;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.StructField;
+
+// configure the base classifier
+LogisticRegression classifier = new LogisticRegression()
+  .setMaxIter(params.maxIter)
+  .setTol(params.tol)
+  .setFitIntercept(params.fitIntercept);
+
+if (params.regParam != null) {
+  classifier.setRegParam(params.regParam);
+}
+if (params.elasticNetParam != null) {
+  classifier.setElasticNetParam(params.elasticNetParam);
+}
+
+// instantiate the One Vs Rest Classifier
+OneVsRest ovr = new OneVsRest().setClassifier(classifier);
+
+String input = params.input;
+DataFrame inputData = jsql.read().format("libsvm").load(input);
+DataFrame train;
+DataFrame test;
+
+// compute the train/ test split: if testInput is not provided use part of input
+String testInput = params.testInput;
+if (testInput != null) {
+  train = inputData;
+  // compute the number of features in the training set.
+  int numFeatures = inputData.first().<Vector>getAs(1).size();
+  test = jsql.read().format("libsvm").option("numFeatures",
+    String.valueOf(numFeatures)).load(testInput);
+} else {
+  double f = params.fracTest;
+  DataFrame[] tmp = inputData.randomSplit(new double[]{1 - f, f}, 12345);
+  train = tmp[0];
+  test = tmp[1];
+}
+
+// train the multiclass model
+OneVsRestModel ovrModel = ovr.fit(train.cache());
+
+// score the model on test data
+DataFrame predictions = ovrModel.transform(test.cache())
+  .select("prediction", "label");
+
+// obtain metrics
+MulticlassMetrics metrics = new MulticlassMetrics(predictions);
+StructField predictionColSchema = predictions.schema().apply("prediction");
+Integer numClasses = (Integer) MetadataUtils.getNumClasses(predictionColSchema).get();
+
+// compute the false positive rate per label
+StringBuilder results = new StringBuilder();
+results.append("label\tfpr\n");
+for (int label = 0; label < numClasses; label++) {
+  results.append(label);
+  results.append("\t");
+  results.append(metrics.falsePositiveRate((double) label));
+  results.append("\n");
+}
+
+Matrix confusionMatrix = metrics.confusionMatrix();
+// output the Confusion Matrix
+System.out.println("Confusion Matrix");
+System.out.println(confusionMatrix);
+System.out.println();
+System.out.println(results);
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java" in the Spark repo.
+
+
+ +

Regression

+ +

Linear regression

+ +

The interface for working with linear regression models and model +summaries is similar to the logistic regression case.

+ +

Example

+ +

The following +example demonstrates training an elastic net regularized linear +regression model and extracting model summary statistics.

+ +
+ +
+
import org.apache.spark.ml.regression.LinearRegression
+
+// Load training data
+val training = sqlCtx.read.format("libsvm")
+  .load("data/mllib/sample_linear_regression_data.txt")
+
+val lr = new LinearRegression()
+  .setMaxIter(10)
+  .setRegParam(0.3)
+  .setElasticNetParam(0.8)
+
+// Fit the model
+val lrModel = lr.fit(training)
+
+// Print the coefficients and intercept for linear regression
+println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
+
+// Summarize the model over the training set and print out some metrics
+val trainingSummary = lrModel.summary
+println(s"numIterations: ${trainingSummary.totalIterations}")
+println(s"objectiveHistory: ${trainingSummary.objectiveHistory.toList}")
+trainingSummary.residuals.show()
+println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
+println(s"r2: ${trainingSummary.r2}")
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionWithElasticNetExample.scala" in the Spark repo.
+
+ +
+
import org.apache.spark.ml.regression.LinearRegression;
+import org.apache.spark.ml.regression.LinearRegressionModel;
+import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+// Load training data
+DataFrame training = sqlContext.read().format("libsvm")
+  .load("data/mllib/sample_linear_regression_data.txt");
+
+LinearRegression lr = new LinearRegression()
+  .setMaxIter(10)
+  .setRegParam(0.3)
+  .setElasticNetParam(0.8);
+
+// Fit the model
+LinearRegressionModel lrModel = lr.fit(training);
+
+// Print the coefficients and intercept for linear regression
+System.out.println("Coefficients: "
+  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());
+
+// Summarize the model over the training set and print out some metrics
+LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
+System.out.println("numIterations: " + trainingSummary.totalIterations());
+System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory()));
+trainingSummary.residuals().show();
+System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError());
+System.out.println("r2: " + trainingSummary.r2());
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java" in the Spark repo.
+
+ +
+ +
from pyspark.ml.regression import LinearRegression
+
+# Load training data
+training = sqlContext.read.format("libsvm")\
+    .load("data/mllib/sample_linear_regression_data.txt")
+
+lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
+
+# Fit the model
+lrModel = lr.fit(training)
+
+# Print the coefficients and intercept for linear regression
+print("Coefficients: " + str(lrModel.coefficients))
+print("Intercept: " + str(lrModel.intercept))
+
+
Find full example code at "examples/src/main/python/ml/linear_regression_with_elastic_net.py" in the Spark repo.
+
+ +
+ +

Decision tree regression

+ +

Decision trees are a popular family of classification and regression methods. +More information about the spark.ml implementation can be found further in the section on decision trees.

+ +

Example

+ +

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. +We use a feature transformer to index categorical features, adding metadata to the DataFrame which the Decision Tree algorithm can recognize.

+ +
+
+ +

More details on parameters can be found in the Scala API documentation.

+ +
import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.regression.DecisionTreeRegressor
+import org.apache.spark.ml.regression.DecisionTreeRegressionModel
+import org.apache.spark.ml.feature.VectorIndexer
+import org.apache.spark.ml.evaluation.RegressionEvaluator
+
+// Load the data stored in LIBSVM format as a DataFrame.
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+// Automatically identify categorical features, and index them.
+// Here, we treat features with > 4 distinct values as continuous.
+val featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data)
+
+// Split the data into training and test sets (30% held out for testing)
+val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
+
+// Train a DecisionTree model.
+val dt = new DecisionTreeRegressor()
+  .setLabelCol("label")
+  .setFeaturesCol("indexedFeatures")
+
+// Chain indexer and tree in a Pipeline
+val pipeline = new Pipeline()
+  .setStages(Array(featureIndexer, dt))
+
+// Train model.  This also runs the indexer.
+val model = pipeline.fit(trainingData)
+
+// Make predictions.
+val predictions = model.transform(testData)
+
+// Select example rows to display.
+predictions.select("prediction", "label", "features").show(5)
+
+// Select (prediction, true label) and compute test error
+val evaluator = new RegressionEvaluator()
+  .setLabelCol("label")
+  .setPredictionCol("prediction")
+  .setMetricName("rmse")
+val rmse = evaluator.evaluate(predictions)
+println("Root Mean Squared Error (RMSE) on test data = " + rmse)
+
+val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
+println("Learned regression tree model:\n" + treeModel.toDebugString)
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeRegressionExample.scala" in the Spark repo.
+
+ +
+ +

More details on parameters can be found in the Java API documentation.

+ +
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.ml.Pipeline;
+import org.apache.spark.ml.PipelineModel;
+import org.apache.spark.ml.PipelineStage;
+import org.apache.spark.ml.evaluation.RegressionEvaluator;
+import org.apache.spark.ml.feature.VectorIndexer;
+import org.apache.spark.ml.feature.VectorIndexerModel;
+import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
+import org.apache.spark.ml.regression.DecisionTreeRegressor;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+// Load the data stored in LIBSVM format as a DataFrame.
+DataFrame data = sqlContext.read().format("libsvm")
+  .load("data/mllib/sample_libsvm_data.txt");
+
+// Automatically identify categorical features, and index them.
+// Set maxCategories so features with > 4 distinct values are treated as continuous.
+VectorIndexerModel featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data);
+
+// Split the data into training and test sets (30% held out for testing)
+DataFrame[] splits = data.randomSplit(new double[]{0.7, 0.3});
+DataFrame trainingData = splits[0];
+DataFrame testData = splits[1];
+
+// Train a DecisionTree model.
+DecisionTreeRegressor dt = new DecisionTreeRegressor()
+  .setFeaturesCol("indexedFeatures");
+
+// Chain indexer and tree in a Pipeline
+Pipeline pipeline = new Pipeline()
+  .setStages(new PipelineStage[]{featureIndexer, dt});
+
+// Train model.  This also runs the indexer.
+PipelineModel model = pipeline.fit(trainingData);
+
+// Make predictions.
+DataFrame predictions = model.transform(testData);
+
+// Select example rows to display.
+predictions.select("label", "features").show(5);
+
+// Select (prediction, true label) and compute test error
+RegressionEvaluator evaluator = new RegressionEvaluator()
+  .setLabelCol("label")
+  .setPredictionCol("prediction")
+  .setMetricName("rmse");
+double rmse = evaluator.evaluate(predictions);
+System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
+
+DecisionTreeRegressionModel treeModel =
+  (DecisionTreeRegressionModel) (model.stages()[1]);
+System.out.println("Learned regression tree model:\n" + treeModel.toDebugString());
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeRegressionExample.java" in the Spark repo.
+
+ +
+ +

More details on parameters can be found in the Python API documentation.

+ +
from pyspark.ml import Pipeline
+from pyspark.ml.regression import DecisionTreeRegressor
+from pyspark.ml.feature import VectorIndexer
+from pyspark.ml.evaluation import RegressionEvaluator
+
+# Load the data stored in LIBSVM format as a DataFrame.
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+# Automatically identify categorical features, and index them.
+# We specify maxCategories so features with > 4 distinct values are treated as continuous.
+featureIndexer =\
+    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
+
+# Split the data into training and test sets (30% held out for testing)
+(trainingData, testData) = data.randomSplit([0.7, 0.3])
+
+# Train a DecisionTree model.
+dt = DecisionTreeRegressor(featuresCol="indexedFeatures")
+
+# Chain indexer and tree in a Pipeline
+pipeline = Pipeline(stages=[featureIndexer, dt])
+
+# Train model.  This also runs the indexer.
+model = pipeline.fit(trainingData)
+
+# Make predictions.
+predictions = model.transform(testData)
+
+# Select example rows to display.
+predictions.select("prediction", "label", "features").show(5)
+
+# Select (prediction, true label) and compute test error
+evaluator = RegressionEvaluator(
+    labelCol="label", predictionCol="prediction", metricName="rmse")
+rmse = evaluator.evaluate(predictions)
+print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
+
+treeModel = model.stages[1]
+# summary only
+print(treeModel)
+
+
Find full example code at "examples/src/main/python/ml/decision_tree_regression_example.py" in the Spark repo.
+
+ +
+ +

Random forest regression

+ +

Random forests are a popular family of classification and regression methods. +More information about the spark.ml implementation can be found further in the section on random forests.

+ +

Example

+ +

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. +We use a feature transformer to index categorical features, adding metadata to the DataFrame which the tree-based algorithms can recognize.

+ +
+
+ +

Refer to the Scala API docs for more details.

+ +
import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.evaluation.RegressionEvaluator
+import org.apache.spark.ml.feature.VectorIndexer
+import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
+
+// Load and parse the data file, converting it to a DataFrame.
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+// Automatically identify categorical features, and index them.
+// Set maxCategories so features with > 4 distinct values are treated as continuous.
+val featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data)
+
+// Split the data into training and test sets (30% held out for testing)
+val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
+
+// Train a RandomForest model.
+val rf = new RandomForestRegressor()
+  .setLabelCol("label")
+  .setFeaturesCol("indexedFeatures")
+
+// Chain indexer and forest in a Pipeline
+val pipeline = new Pipeline()
+  .setStages(Array(featureIndexer, rf))
+
+// Train model.  This also runs the indexer.
+val model = pipeline.fit(trainingData)
+
+// Make predictions.
+val predictions = model.transform(testData)
+
+// Select example rows to display.
+predictions.select("prediction", "label", "features").show(5)
+
+// Select (prediction, true label) and compute test error
+val evaluator = new RegressionEvaluator()
+  .setLabelCol("label")
+  .setPredictionCol("prediction")
+  .setMetricName("rmse")
+val rmse = evaluator.evaluate(predictions)
+println("Root Mean Squared Error (RMSE) on test data = " + rmse)
+
+val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
+println("Learned regression forest model:\n" + rfModel.toDebugString)
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/RandomForestRegressorExample.scala" in the Spark repo.
+
+ +
+ +

Refer to the Java API docs for more details.

+ +
import org.apache.spark.ml.Pipeline;
+import org.apache.spark.ml.PipelineModel;
+import org.apache.spark.ml.PipelineStage;
+import org.apache.spark.ml.evaluation.RegressionEvaluator;
+import org.apache.spark.ml.feature.VectorIndexer;
+import org.apache.spark.ml.feature.VectorIndexerModel;
+import org.apache.spark.ml.regression.RandomForestRegressionModel;
+import org.apache.spark.ml.regression.RandomForestRegressor;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+// Load and parse the data file, converting it to a DataFrame.
+DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
+
+// Automatically identify categorical features, and index them.
+// Set maxCategories so features with > 4 distinct values are treated as continuous.
+VectorIndexerModel featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data);
+
+// Split the data into training and test sets (30% held out for testing)
+DataFrame[] splits = data.randomSplit(new double[] {0.7, 0.3});
+DataFrame trainingData = splits[0];
+DataFrame testData = splits[1];
+
+// Train a RandomForest model.
+RandomForestRegressor rf = new RandomForestRegressor()
+  .setLabelCol("label")
+  .setFeaturesCol("indexedFeatures");
+
+// Chain indexer and forest in a Pipeline
+Pipeline pipeline = new Pipeline()
+  .setStages(new PipelineStage[] {featureIndexer, rf});
+
+// Train model.  This also runs the indexer.
+PipelineModel model = pipeline.fit(trainingData);
+
+// Make predictions.
+DataFrame predictions = model.transform(testData);
+
+// Select example rows to display.
+predictions.select("prediction", "label", "features").show(5);
+
+// Select (prediction, true label) and compute test error
+RegressionEvaluator evaluator = new RegressionEvaluator()
+  .setLabelCol("label")
+  .setPredictionCol("prediction")
+  .setMetricName("rmse");
+double rmse = evaluator.evaluate(predictions);
+System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
+
+RandomForestRegressionModel rfModel = (RandomForestRegressionModel)(model.stages()[1]);
+System.out.println("Learned regression forest model:\n" + rfModel.toDebugString());
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample.java" in the Spark repo.
+
+ +
+ +

Refer to the Python API docs for more details.

+ +
from pyspark.ml import Pipeline
+from pyspark.ml.regression import RandomForestRegressor
+from pyspark.ml.feature import VectorIndexer
+from pyspark.ml.evaluation import RegressionEvaluator
+
+# Load and parse the data file, converting it to a DataFrame.
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+# Automatically identify categorical features, and index them.
+# Set maxCategories so features with > 4 distinct values are treated as continuous.
+featureIndexer =\
+    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
+
+# Split the data into training and test sets (30% held out for testing)
+(trainingData, testData) = data.randomSplit([0.7, 0.3])
+
+# Train a RandomForest model.
+rf = RandomForestRegressor(featuresCol="indexedFeatures")
+
+# Chain indexer and forest in a Pipeline
+pipeline = Pipeline(stages=[featureIndexer, rf])
+
+# Train model.  This also runs the indexer.
+model = pipeline.fit(trainingData)
+
+# Make predictions.
+predictions = model.transform(testData)
+
+# Select example rows to display.
+predictions.select("prediction", "label", "features").show(5)
+
+# Select (prediction, true label) and compute test error
+evaluator = RegressionEvaluator(
+    labelCol="label", predictionCol="prediction", metricName="rmse")
+rmse = evaluator.evaluate(predictions)
+print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
+
+rfModel = model.stages[1]
+print(rfModel)  # summary only
+
+
Find full example code at "examples/src/main/python/ml/random_forest_regressor_example.py" in the Spark repo.
+
+
+ +

Gradient-boosted tree regression

+ +

Gradient-boosted trees (GBTs) are a popular regression method using ensembles of decision trees. +More information about the spark.ml implementation can be found further in the section on GBTs.

+ +

Example

+ +

Note: For this example dataset, GBTRegressor actually only needs 1 iteration, but that will not +be true in general.

+ +
+
+ +

Refer to the Scala API docs for more details.

+ +
import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.evaluation.RegressionEvaluator
+import org.apache.spark.ml.feature.VectorIndexer
+import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
+
+// Load and parse the data file, converting it to a DataFrame.
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+// Automatically identify categorical features, and index them.
+// Set maxCategories so features with > 4 distinct values are treated as continuous.
+val featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data)
+
+// Split the data into training and test sets (30% held out for testing)
+val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
+
+// Train a GBT model.
+val gbt = new GBTRegressor()
+  .setLabelCol("label")
+  .setFeaturesCol("indexedFeatures")
+  .setMaxIter(10)
+
+// Chain indexer and GBT in a Pipeline
+val pipeline = new Pipeline()
+  .setStages(Array(featureIndexer, gbt))
+
+// Train model.  This also runs the indexer.
+val model = pipeline.fit(trainingData)
+
+// Make predictions.
+val predictions = model.transform(testData)
+
+// Select example rows to display.
+predictions.select("prediction", "label", "features").show(5)
+
+// Select (prediction, true label) and compute test error
+val evaluator = new RegressionEvaluator()
+  .setLabelCol("label")
+  .setPredictionCol("prediction")
+  .setMetricName("rmse")
+val rmse = evaluator.evaluate(predictions)
+println("Root Mean Squared Error (RMSE) on test data = " + rmse)
+
+val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
+println("Learned regression GBT model:\n" + gbtModel.toDebugString)
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeRegressorExample.scala" in the Spark repo.
+
+ +
+ +

Refer to the Java API docs for more details.

+ +
import org.apache.spark.ml.Pipeline;
+import org.apache.spark.ml.PipelineModel;
+import org.apache.spark.ml.PipelineStage;
+import org.apache.spark.ml.evaluation.RegressionEvaluator;
+import org.apache.spark.ml.feature.VectorIndexer;
+import org.apache.spark.ml.feature.VectorIndexerModel;
+import org.apache.spark.ml.regression.GBTRegressionModel;
+import org.apache.spark.ml.regression.GBTRegressor;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+// Load and parse the data file, converting it to a DataFrame.
+DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
+
+// Automatically identify categorical features, and index them.
+// Set maxCategories so features with > 4 distinct values are treated as continuous.
+VectorIndexerModel featureIndexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexedFeatures")
+  .setMaxCategories(4)
+  .fit(data);
+
+// Split the data into training and test sets (30% held out for testing)
+DataFrame[] splits = data.randomSplit(new double[] {0.7, 0.3});
+DataFrame trainingData = splits[0];
+DataFrame testData = splits[1];
+
+// Train a GBT model.
+GBTRegressor gbt = new GBTRegressor()
+  .setLabelCol("label")
+  .setFeaturesCol("indexedFeatures")
+  .setMaxIter(10);
+
+// Chain indexer and GBT in a Pipeline
+Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});
+
+// Train model.  This also runs the indexer.
+PipelineModel model = pipeline.fit(trainingData);
+
+// Make predictions.
+DataFrame predictions = model.transform(testData);
+
+// Select example rows to display.
+predictions.select("prediction", "label", "features").show(5);
+
+// Select (prediction, true label) and compute test error
+RegressionEvaluator evaluator = new RegressionEvaluator()
+  .setLabelCol("label")
+  .setPredictionCol("prediction")
+  .setMetricName("rmse");
+double rmse = evaluator.evaluate(predictions);
+System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
+
+GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
+System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeRegressorExample.java" in the Spark repo.
+
+ +
+ +

Refer to the Python API docs for more details.

+ +
from pyspark.ml import Pipeline
+from pyspark.ml.regression import GBTRegressor
+from pyspark.ml.feature import VectorIndexer
+from pyspark.ml.evaluation import RegressionEvaluator
+
+# Load and parse the data file, converting it to a DataFrame.
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+# Automatically identify categorical features, and index them.
+# Set maxCategories so features with > 4 distinct values are treated as continuous.
+featureIndexer =\
+    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
+
+# Split the data into training and test sets (30% held out for testing)
+(trainingData, testData) = data.randomSplit([0.7, 0.3])
+
+# Train a GBT model.
+gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)
+
+# Chain indexer and GBT in a Pipeline
+pipeline = Pipeline(stages=[featureIndexer, gbt])
+
+# Train model.  This also runs the indexer.
+model = pipeline.fit(trainingData)
+
+# Make predictions.
+predictions = model.transform(testData)
+
+# Select example rows to display.
+predictions.select("prediction", "label", "features").show(5)
+
+# Select (prediction, true label) and compute test error
+evaluator = RegressionEvaluator(
+    labelCol="label", predictionCol="prediction", metricName="rmse")
+rmse = evaluator.evaluate(predictions)
+print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
+
+gbtModel = model.stages[1]
+print(gbtModel)  # summary only
+
+
Find full example code at "examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py" in the Spark repo.
+
+
+ +

Survival regression

+ +

In spark.ml, we implement the Accelerated failure time (AFT) +model which is a parametric survival regression model for censored data. +It describes a model for the log of survival time, so it’s often called +log-linear model for survival analysis. Different from +Proportional hazards model +designed for the same purpose, the AFT model is more easily to parallelize +because each instance contribute to the objective function independently.

+ +

Given the values of the covariates $x^{‘}$, for random lifetime $t_{i}$ of +subjects i = 1, …, n, with possible right-censoring, +the likelihood function under the AFT model is given as: +\[ +L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} +\] +Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not. +Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{‘}\beta}{\sigma}$, the log-likelihood function +assumes the form: +\[ +\iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+\delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] +\] +Where $S_{0}(\epsilon_{i})$ is the baseline survivor function, +and $f_{0}(\epsilon_{i})$ is corresponding density function.

+ +

The most commonly used AFT model is based on the Weibull distribution of the survival time. +The Weibull distribution for lifetime corresponding to extreme value distribution for +log of the lifetime, and the $S_{0}(\epsilon)$ function is: +\[ +S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) +\] +the $f_{0}(\epsilon_{i})$ function is: +\[ +f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) +\] +The log-likelihood function for AFT model with Weibull distribution of lifetime is: +\[ +\iota(\beta,\sigma)= -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] +\] +Due to minimizing the negative log-likelihood equivalent to maximum a posteriori probability, +the loss function we use to optimize is $-\iota(\beta,\sigma)$. +The gradient functions for $\beta$ and $\log\sigma$ respectively are: +\[ +\frac{\partial (-\iota)}{\partial \beta}=\sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} +\] +\[ +\frac{\partial (-\iota)}{\partial (\log\sigma)}=\sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}] +\]

+ +

The AFT model can be formulated as a convex optimization problem, +i.e. the task of finding a minimizer of a convex function $-\iota(\beta,\sigma)$ +that depends coefficients vector $\beta$ and the log of scale parameter $\log\sigma$. +The optimization algorithm underlying the implementation is L-BFGS. +The implementation matches the result from R’s survival function +survreg

+ +

Example

+ +
+ +
+
import org.apache.spark.ml.regression.AFTSurvivalRegression
+import org.apache.spark.mllib.linalg.Vectors
+
+val training = sqlContext.createDataFrame(Seq(
+  (1.218, 1.0, Vectors.dense(1.560, -0.605)),
+  (2.949, 0.0, Vectors.dense(0.346, 2.158)),
+  (3.627, 0.0, Vectors.dense(1.380, 0.231)),
+  (0.273, 1.0, Vectors.dense(0.520, 1.151)),
+  (4.199, 0.0, Vectors.dense(0.795, -0.226))
+)).toDF("label", "censor", "features")
+val quantileProbabilities = Array(0.3, 0.6)
+val aft = new AFTSurvivalRegression()
+  .setQuantileProbabilities(quantileProbabilities)
+  .setQuantilesCol("quantiles")
+
+val model = aft.fit(training)
+
+// Print the coefficients, intercept and scale parameter for AFT survival regression
+println(s"Coefficients: ${model.coefficients} Intercept: " +
+  s"${model.intercept} Scale: ${model.scale}")
+model.transform(training).show(false)
+
+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/AFTSurvivalRegressionExample.scala" in the Spark repo.
+
+ +
+
import java.util.Arrays;
+import java.util.List;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.ml.regression.AFTSurvivalRegression;
+import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
+import org.apache.spark.mllib.linalg.*;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.*;
+
+List<Row> data = Arrays.asList(
+  RowFactory.create(1.218, 1.0, Vectors.dense(1.560, -0.605)),
+  RowFactory.create(2.949, 0.0, Vectors.dense(0.346, 2.158)),
+  RowFactory.create(3.627, 0.0, Vectors.dense(1.380, 0.231)),
+  RowFactory.create(0.273, 1.0, Vectors.dense(0.520, 1.151)),
+  RowFactory.create(4.199, 0.0, Vectors.dense(0.795, -0.226))
+);
+StructType schema = new StructType(new StructField[]{
+  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
+  new StructField("censor", DataTypes.DoubleType, false, Metadata.empty()),
+  new StructField("features", new VectorUDT(), false, Metadata.empty())
+});
+DataFrame training = jsql.createDataFrame(data, schema);
+double[] quantileProbabilities = new double[]{0.3, 0.6};
+AFTSurvivalRegression aft = new AFTSurvivalRegression()
+  .setQuantileProbabilities(quantileProbabilities)
+  .setQuantilesCol("quantiles");
+
+AFTSurvivalRegressionModel model = aft.fit(training);
+
+// Print the coefficients, intercept and scale parameter for AFT survival regression
+System.out.println("Coefficients: " + model.coefficients() + " Intercept: "
+  + model.intercept() + " Scale: " + model.scale());
+model.transform(training).show(false);
+
+
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java" in the Spark repo.
+
+ +
+
from pyspark.ml.regression import AFTSurvivalRegression
+from pyspark.mllib.linalg import Vectors
+
+training = sqlContext.createDataFrame([
+    (1.218, 1.0, Vectors.dense(1.560, -0.605)),
+    (2.949, 0.0, Vectors.dense(0.346, 2.158)),
+    (3.627, 0.0, Vectors.dense(1.380, 0.231)),
+    (0.273, 1.0, Vectors.dense(0.520, 1.151)),
+    (4.199, 0.0, Vectors.dense(0.795, -0.226))], ["label", "censor", "features"])
+quantileProbabilities = [0.3, 0.6]
+aft = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities,
+                            quantilesCol="quantiles")
+
+model = aft.fit(training)
+
+# Print the coefficients, intercept and scale parameter for AFT survival regression
+print("Coefficients: " + str(model.coefficients))
+print("Intercept: " + str(model.intercept))
+print("Scale: " + str(model.scale))
+model.transform(training).show(truncate=False)
+
+
Find full example code at "examples/src/main/python/ml/aft_survival_regression.py" in the Spark repo.
+
+ +
+ +

Decision trees

+ +

Decision trees +and their ensembles are popular methods for the machine learning tasks of +classification and regression. Decision trees are widely used since they are easy to interpret, +handle categorical features, extend to the multiclass classification setting, do not require +feature scaling, and are able to capture non-linearities and feature interactions. Tree ensemble +algorithms such as random forests and boosting are among the top performers for classification and +regression tasks.

+ +

The spark.ml implementation supports decision trees for binary and multiclass classification and for regression, +using both continuous and categorical features. The implementation partitions data by rows, +allowing distributed training with millions or even billions of instances.

+ +

Users can find more information about the decision tree algorithm in the MLlib Decision Tree guide. +The main differences between this API and the original MLlib Decision Tree API are:

+ +
    +
  • support for ML Pipelines
  • +
  • separation of Decision Trees for classification vs. regression
  • +
  • use of DataFrame metadata to distinguish continuous and categorical features
  • +
+ +

The Pipelines API for Decision Trees offers a bit more functionality than the original API. In particular, for classification, users can get the predicted probability of each class (a.k.a. class conditional probabilities).

+ +

Ensembles of trees (Random Forests and Gradient-Boosted Trees) are described below in the Tree ensembles section.

+ +

Inputs and Outputs

+ +

We list the input and output (prediction) column types here. +All output columns are optional; to exclude an output column, set its corresponding Param to an empty string.

+ +

Input Columns

+ + + + + + + + + + + + + + + + + + + + + + + + +
Param nameType(s)DefaultDescription
labelColDouble"label"Label to predict
featuresColVector"features"Feature vector
+ +

Output Columns

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Param nameType(s)DefaultDescriptionNotes
predictionColDouble"prediction"Predicted label
rawPredictionColVector"rawPrediction"Vector of length # classes, with the counts of training instance labels at the tree node which makes the predictionClassification only
probabilityColVector"probability"Vector of length # classes equal to rawPrediction normalized to a multinomial distributionClassification only
+ +

Tree Ensembles

+ +

The DataFrame API supports two major tree ensemble algorithms: Random Forests and Gradient-Boosted Trees (GBTs). +Both use spark.ml decision trees as their base models.

+ +

Users can find more information about ensemble algorithms in the MLlib Ensemble guide.
+In this section, we demonstrate the DataFrame API for ensembles.

+ +

The main differences between this API and the original MLlib ensembles API are:

+ +
    +
  • support for DataFrames and ML Pipelines
  • +
  • separation of classification vs. regression
  • +
  • use of DataFrame metadata to distinguish continuous and categorical features
  • +
  • more functionality for random forests: estimates of feature importance, as well as the predicted probability of each class (a.k.a. class conditional probabilities) for classification.
  • +
+ +

Random Forests

+ +

Random forests +are ensembles of decision trees. +Random forests combine many decision trees in order to reduce the risk of overfitting. +The spark.ml implementation supports random forests for binary and multiclass classification and for regression, +using both continuous and categorical features.

+ +

For more information on the algorithm itself, please see the spark.mllib documentation on random forests.

+ +

Inputs and Outputs

+ +

We list the input and output (prediction) column types here. +All output columns are optional; to exclude an output column, set its corresponding Param to an empty string.

+ +

Input Columns

+ + + + + + + + + + + + + + + + + + + + + + + + +
Param nameType(s)DefaultDescription
labelColDouble"label"Label to predict
featuresColVector"features"Feature vector
+ +

Output Columns (Predictions)

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Param nameType(s)DefaultDescriptionNotes
predictionColDouble"prediction"Predicted label
rawPredictionColVector"rawPrediction"Vector of length # classes, with the counts of training instance labels at the tree node which makes the predictionClassification only
probabilityColVector"probability"Vector of length # classes equal to rawPrediction normalized to a multinomial distributionClassification only
+ +

Gradient-Boosted Trees (GBTs)

+ +

Gradient-Boosted Trees (GBTs) +are ensembles of decision trees. +GBTs iteratively train decision trees in order to minimize a loss function. +The spark.ml implementation supports GBTs for binary classification and for regression, +using both continuous and categorical features.

+ +

For more information on the algorithm itself, please see the spark.mllib documentation on GBTs.

+ +

Inputs and Outputs

+ +

We list the input and output (prediction) column types here. +All output columns are optional; to exclude an output column, set its corresponding Param to an empty string.

+ +

Input Columns

+ + + + + + + + + + + + + + + + + + + + + + + + +
Param nameType(s)DefaultDescription
labelColDouble"label"Label to predict
featuresColVector"features"Feature vector
+ +

Note that GBTClassifier currently only supports binary labels.

+ +

Output Columns (Predictions)

+ + + + + + + + + + + + + + + + + + + + +
Param nameType(s)DefaultDescriptionNotes
predictionColDouble"prediction"Predicted label
+ +

In the future, GBTClassifier will also output columns for rawPrediction and probability, just as RandomForestClassifier does.

+ + + +
+ + +
+ + + + + + + + + + + -- cgit v1.2.3