From 32c960a01d58b2d66cb60ff9013787ff48538d0c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 5 May 2014 18:32:54 -0700 Subject: [SPARK-1594][MLLIB] Cleaning up MLlib APIs and guide Final pass before the v1.0 release. * Remove `VectorRDDs` * Move `BinaryClassificationMetrics` from `evaluation.binary` to `evaluation` * Change default value of `addIntercept` to false and allow to add intercept in Ridge and Lasso. * Clean `DecisionTree` package doc and test suite. * Mark model constructors `private[spark]` * Rename `loadLibSVMData` to `loadLibSVMFile` and hide `LabelParser` from users. * Add `saveAsLibSVMFile`. * Add `appendBias` to `MLUtils`. Author: Xiangrui Meng Closes #524 from mengxr/mllib-cleaning and squashes the following commits: 295dc8b [Xiangrui Meng] update loadLibSVMFile doc 1977ac1 [Xiangrui Meng] fix doc of appendBias 649fcf0 [Xiangrui Meng] rename loadLibSVMData to loadLibSVMFile; hide LabelParser from user APIs 54b812c [Xiangrui Meng] add appendBias a71e7d0 [Xiangrui Meng] add saveAsLibSVMFile d976295 [Xiangrui Meng] Merge branch 'master' into mllib-cleaning b7e5cec [Xiangrui Meng] remove some experimental annotations and make model constructors private[mllib] 9b02b93 [Xiangrui Meng] minor code style update a593ddc [Xiangrui Meng] fix python tests fc28c18 [Xiangrui Meng] mark more classes experimental f6cbbff [Xiangrui Meng] fix Java tests 0af70b0 [Xiangrui Meng] minor 6e139ef [Xiangrui Meng] Merge branch 'master' into mllib-cleaning 94e6dce [Xiangrui Meng] move BinaryLabelCounter and BinaryConfusionMatrixImpl to evaluation.binary df34907 [Xiangrui Meng] clean DecisionTreeSuite to use LocalSparkContext c81807f [Xiangrui Meng] set the default value of AddIntercept to false 03389c0 [Xiangrui Meng] allow to add intercept in Ridge and Lasso c66c56f [Xiangrui Meng] move tree md to package object doc a2695df [Xiangrui Meng] update guide for BinaryClassificationMetrics 9194f4c [Xiangrui Meng] move BinaryClassificationMetrics one level up 1c1a0e3 [Xiangrui Meng] remove VectorRDDs because it only contains one function that is not necessary for us to maintain (cherry picked from commit 98750a74daf7e2b873da85d2d5067f47e3bbdc4e) Signed-off-by: Matei Zaharia --- .../mllib/classification/LogisticRegression.scala | 7 +- .../spark/mllib/classification/NaiveBayes.scala | 8 +- .../apache/spark/mllib/classification/SVM.scala | 7 +- .../org/apache/spark/mllib/clustering/KMeans.scala | 2 + .../spark/mllib/clustering/KMeansModel.scala | 2 +- .../evaluation/BinaryClassificationMetrics.scala | 146 +++++++++++++++ .../binary/BinaryClassificationMetrics.scala | 204 --------------------- .../evaluation/binary/BinaryConfusionMatrix.scala | 29 +++ .../evaluation/binary/BinaryLabelCounter.scala | 50 +++++ .../mllib/linalg/SingularValueDecomposition.scala | 8 +- .../linalg/distributed/CoordinateMatrix.scala | 2 + .../spark/mllib/linalg/distributed/RowMatrix.scala | 1 + .../spark/mllib/optimization/GradientDescent.scala | 16 +- .../org/apache/spark/mllib/rdd/VectorRDDs.scala | 32 ---- .../recommendation/MatrixFactorizationModel.scala | 10 +- .../regression/GeneralizedLinearAlgorithm.scala | 15 +- .../org/apache/spark/mllib/regression/Lasso.scala | 12 +- .../spark/mllib/regression/LinearRegression.scala | 2 +- .../spark/mllib/regression/RidgeRegression.scala | 12 +- .../scala/org/apache/spark/mllib/tree/README.md | 17 -- .../org/apache/spark/mllib/tree/package.scala | 29 +++ .../org/apache/spark/mllib/util/LabelParsers.scala | 13 +- .../org/apache/spark/mllib/util/MLUtils.scala | 155 +++++++++++----- 
.../JavaLogisticRegressionSuite.java | 6 +- .../spark/mllib/classification/JavaSVMSuite.java | 3 +- .../regression/JavaLinearRegressionSuite.java | 3 +- .../classification/LogisticRegressionSuite.scala | 20 +- .../spark/mllib/classification/SVMSuite.scala | 5 +- .../BinaryClassificationMetricsSuite.scala | 54 ++++++ .../binary/BinaryClassificationMetricsSuite.scala | 55 ------ .../apache/spark/mllib/rdd/VectorRDDsSuite.scala | 33 ---- .../spark/mllib/recommendation/ALSSuite.scala | 1 - .../apache/spark/mllib/regression/LassoSuite.scala | 6 - .../mllib/regression/LinearRegressionSuite.scala | 2 +- .../mllib/regression/RidgeRegressionSuite.scala | 6 - .../spark/mllib/tree/DecisionTreeSuite.scala | 16 +- .../org/apache/spark/mllib/util/MLUtilsSuite.scala | 66 +++++-- 37 files changed, 558 insertions(+), 497 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDs.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/tree/README.md create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/tree/package.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 780e8bae42..90aa8ac998 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.classification +import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.regression._ @@ -29,7 +30,7 @@ import org.apache.spark.rdd.RDD * @param weights Weights computed for every feature. * @param intercept Intercept computed for this model. */ -class LogisticRegressionModel( +class LogisticRegressionModel private[mllib] ( override val weights: Vector, override val intercept: Double) extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable { @@ -37,18 +38,22 @@ class LogisticRegressionModel( private var threshold: Option[Double] = Some(0.5) /** + * :: Experimental :: * Sets the threshold that separates positive predictions from negative predictions. An example * with prediction score greater than or equal to this threshold is identified as an positive, * and negative otherwise. The default value is 0.5. */ + @Experimental def setThreshold(threshold: Double): this.type = { this.threshold = Some(threshold) this } /** + * :: Experimental :: * Clears the threshold so that `predict` will output raw prediction scores. 
*/ + @Experimental def clearThreshold(): this.type = { threshold = None this diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index f6f62ce2de..b6e0c4a80e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -19,7 +19,6 @@ package org.apache.spark.mllib.classification import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum} -import org.apache.spark.annotation.Experimental import org.apache.spark.Logging import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vector @@ -27,7 +26,6 @@ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD /** - * :: Experimental :: * Model for Naive Bayes Classifiers. * * @param labels list of labels @@ -35,8 +33,7 @@ import org.apache.spark.rdd.RDD * @param theta log of class conditional probabilities, whose dimension is C-by-D, * where D is number of features */ -@Experimental -class NaiveBayesModel( +class NaiveBayesModel private[mllib] ( val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]]) extends ClassificationModel with Serializable { @@ -124,6 +121,9 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with } } +/** + * Top-level methods for calling naive Bayes. + */ object NaiveBayes { /** * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala index 81b126717e..e05213536e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.classification +import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.regression._ @@ -29,7 +30,7 @@ import org.apache.spark.rdd.RDD * @param weights Weights computed for every feature. * @param intercept Intercept computed for this model. */ -class SVMModel( +class SVMModel private[mllib] ( override val weights: Vector, override val intercept: Double) extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable { @@ -37,18 +38,22 @@ class SVMModel( private var threshold: Option[Double] = Some(0.0) /** + * :: Experimental :: * Sets the threshold that separates positive predictions from negative predictions. An example * with prediction score greater than or equal to this threshold is identified as an positive, * and negative otherwise. The default value is 0.0. */ + @Experimental def setThreshold(threshold: Double): this.type = { this.threshold = Some(threshold) this } /** + * :: Experimental :: * Clears the threshold so that `predict` will output raw prediction scores. 
*/ + @Experimental def clearThreshold(): this.type = { threshold = None this diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index a64c5d44be..de22fbb6ff 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm} +import org.apache.spark.annotation.Experimental import org.apache.spark.Logging import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.{Vector, Vectors} @@ -81,6 +82,7 @@ class KMeans private ( * this many times with random starting conditions (configured by the initialization mode), then * return the best clustering found over any run. Default: 1. */ + @Experimental def setRuns(runs: Int): KMeans = { if (runs <= 0) { throw new IllegalArgumentException("Number of runs must be positive") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 18abbf2758..ce14b06241 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -24,7 +24,7 @@ import org.apache.spark.mllib.linalg.Vector /** * A clustering model for K-means. Each point belongs to the cluster with the closest center. */ -class KMeansModel(val clusterCenters: Array[Vector]) extends Serializable { +class KMeansModel private[mllib] (val clusterCenters: Array[Vector]) extends Serializable { /** Total number of clusters. */ def k: Int = clusterCenters.length diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala new file mode 100644 index 0000000000..079743742d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation + +import org.apache.spark.annotation.Experimental +import org.apache.spark.Logging +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.evaluation.binary._ +import org.apache.spark.rdd.{RDD, UnionRDD} + +/** + * :: Experimental :: + * Evaluator for binary classification. + * + * @param scoreAndLabels an RDD of (score, label) pairs. 
+ */ +@Experimental +class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) extends Logging { + + /** Unpersist intermediate RDDs used in the computation. */ + def unpersist() { + cumulativeCounts.unpersist() + } + + /** Returns thresholds in descending order. */ + def thresholds(): RDD[Double] = cumulativeCounts.map(_._1) + + /** + * Returns the receiver operating characteristic (ROC) curve, + * which is an RDD of (false positive rate, true positive rate) + * with (0.0, 0.0) prepended and (1.0, 1.0) appended to it. + * @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic + */ + def roc(): RDD[(Double, Double)] = { + val rocCurve = createCurve(FalsePositiveRate, Recall) + val sc = confusions.context + val first = sc.makeRDD(Seq((0.0, 0.0)), 1) + val last = sc.makeRDD(Seq((1.0, 1.0)), 1) + new UnionRDD[(Double, Double)](sc, Seq(first, rocCurve, last)) + } + + /** + * Computes the area under the receiver operating characteristic (ROC) curve. + */ + def areaUnderROC(): Double = AreaUnderCurve.of(roc()) + + /** + * Returns the precision-recall curve, which is an RDD of (recall, precision), + * NOT (precision, recall), with (0.0, 1.0) prepended to it. + * @see http://en.wikipedia.org/wiki/Precision_and_recall + */ + def pr(): RDD[(Double, Double)] = { + val prCurve = createCurve(Recall, Precision) + val sc = confusions.context + val first = sc.makeRDD(Seq((0.0, 1.0)), 1) + first.union(prCurve) + } + + /** + * Computes the area under the precision-recall curve. + */ + def areaUnderPR(): Double = AreaUnderCurve.of(pr()) + + /** + * Returns the (threshold, F-Measure) curve. + * @param beta the beta factor in F-Measure computation. + * @return an RDD of (threshold, F-Measure) pairs. + * @see http://en.wikipedia.org/wiki/F1_score + */ + def fMeasureByThreshold(beta: Double): RDD[(Double, Double)] = createCurve(FMeasure(beta)) + + /** Returns the (threshold, F-Measure) curve with beta = 1.0. */ + def fMeasureByThreshold(): RDD[(Double, Double)] = fMeasureByThreshold(1.0) + + /** Returns the (threshold, precision) curve. */ + def precisionByThreshold(): RDD[(Double, Double)] = createCurve(Precision) + + /** Returns the (threshold, recall) curve. */ + def recallByThreshold(): RDD[(Double, Double)] = createCurve(Recall) + + private lazy val ( + cumulativeCounts: RDD[(Double, BinaryLabelCounter)], + confusions: RDD[(Double, BinaryConfusionMatrix)]) = { + // Create a bin for each distinct score value, count positives and negatives within each bin, + // and then sort by score values in descending order. 
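A minimal sketch of the relocated evaluator (now in `evaluation` rather than `evaluation.binary`), assuming `scoreAndLabels` is an existing `RDD[(Double, Double)]` of (raw score, 0/1 label) pairs:

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()        // area under the ROC curve
val auPR  = metrics.areaUnderPR()         // area under the precision-recall curve
val f1    = metrics.fMeasureByThreshold() // RDD of (threshold, F1) pairs
metrics.unpersist()                       // release the cached cumulative counts
```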
+ val counts = scoreAndLabels.combineByKey( + createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label, + mergeValue = (c: BinaryLabelCounter, label: Double) => c += label, + mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2 + ).sortByKey(ascending = false) + val agg = counts.values.mapPartitions({ iter => + val agg = new BinaryLabelCounter() + iter.foreach(agg += _) + Iterator(agg) + }, preservesPartitioning = true).collect() + val partitionwiseCumulativeCounts = + agg.scanLeft(new BinaryLabelCounter())( + (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c) + val totalCount = partitionwiseCumulativeCounts.last + logInfo(s"Total counts: $totalCount") + val cumulativeCounts = counts.mapPartitionsWithIndex( + (index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => { + val cumCount = partitionwiseCumulativeCounts(index) + iter.map { case (score, c) => + cumCount += c + (score, cumCount.clone()) + } + }, preservesPartitioning = true) + cumulativeCounts.persist() + val confusions = cumulativeCounts.map { case (score, cumCount) => + (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix]) + } + (cumulativeCounts, confusions) + } + + /** Creates a curve of (threshold, metric). */ + private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = { + confusions.map { case (s, c) => + (s, y(c)) + } + } + + /** Creates a curve of (metricX, metricY). */ + private def createCurve( + x: BinaryClassificationMetricComputer, + y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = { + confusions.map { case (_, c) => + (x(c), y(c)) + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala deleted file mode 100644 index ed7b0fc943..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.evaluation.binary - -import org.apache.spark.rdd.{UnionRDD, RDD} -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.evaluation.AreaUnderCurve -import org.apache.spark.Logging - -/** - * Implementation of [[org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrix]]. 
- * - * @param count label counter for labels with scores greater than or equal to the current score - * @param totalCount label counter for all labels - */ -private case class BinaryConfusionMatrixImpl( - count: LabelCounter, - totalCount: LabelCounter) extends BinaryConfusionMatrix with Serializable { - - /** number of true positives */ - override def numTruePositives: Long = count.numPositives - - /** number of false positives */ - override def numFalsePositives: Long = count.numNegatives - - /** number of false negatives */ - override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives - - /** number of true negatives */ - override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives - - /** number of positives */ - override def numPositives: Long = totalCount.numPositives - - /** number of negatives */ - override def numNegatives: Long = totalCount.numNegatives -} - -/** - * Evaluator for binary classification. - * - * @param scoreAndLabels an RDD of (score, label) pairs. - */ -class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) - extends Serializable with Logging { - - private lazy val ( - cumulativeCounts: RDD[(Double, LabelCounter)], - confusions: RDD[(Double, BinaryConfusionMatrix)]) = { - // Create a bin for each distinct score value, count positives and negatives within each bin, - // and then sort by score values in descending order. - val counts = scoreAndLabels.combineByKey( - createCombiner = (label: Double) => new LabelCounter(0L, 0L) += label, - mergeValue = (c: LabelCounter, label: Double) => c += label, - mergeCombiners = (c1: LabelCounter, c2: LabelCounter) => c1 += c2 - ).sortByKey(ascending = false) - val agg = counts.values.mapPartitions({ iter => - val agg = new LabelCounter() - iter.foreach(agg += _) - Iterator(agg) - }, preservesPartitioning = true).collect() - val partitionwiseCumulativeCounts = - agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg.clone() += c) - val totalCount = partitionwiseCumulativeCounts.last - logInfo(s"Total counts: $totalCount") - val cumulativeCounts = counts.mapPartitionsWithIndex( - (index: Int, iter: Iterator[(Double, LabelCounter)]) => { - val cumCount = partitionwiseCumulativeCounts(index) - iter.map { case (score, c) => - cumCount += c - (score, cumCount.clone()) - } - }, preservesPartitioning = true) - cumulativeCounts.persist() - val confusions = cumulativeCounts.map { case (score, cumCount) => - (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix]) - } - (cumulativeCounts, confusions) - } - - /** Unpersist intermediate RDDs used in the computation. */ - def unpersist() { - cumulativeCounts.unpersist() - } - - /** Returns thresholds in descending order. */ - def thresholds(): RDD[Double] = cumulativeCounts.map(_._1) - - /** - * Returns the receiver operating characteristic (ROC) curve, - * which is an RDD of (false positive rate, true positive rate) - * with (0.0, 0.0) prepended and (1.0, 1.0) appended to it. - * @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic - */ - def roc(): RDD[(Double, Double)] = { - val rocCurve = createCurve(FalsePositiveRate, Recall) - val sc = confusions.context - val first = sc.makeRDD(Seq((0.0, 0.0)), 1) - val last = sc.makeRDD(Seq((1.0, 1.0)), 1) - new UnionRDD[(Double, Double)](sc, Seq(first, rocCurve, last)) - } - - /** - * Computes the area under the receiver operating characteristic (ROC) curve. 
- */ - def areaUnderROC(): Double = AreaUnderCurve.of(roc()) - - /** - * Returns the precision-recall curve, which is an RDD of (recall, precision), - * NOT (precision, recall), with (0.0, 1.0) prepended to it. - * @see http://en.wikipedia.org/wiki/Precision_and_recall - */ - def pr(): RDD[(Double, Double)] = { - val prCurve = createCurve(Recall, Precision) - val sc = confusions.context - val first = sc.makeRDD(Seq((0.0, 1.0)), 1) - first.union(prCurve) - } - - /** - * Computes the area under the precision-recall curve. - */ - def areaUnderPR(): Double = AreaUnderCurve.of(pr()) - - /** - * Returns the (threshold, F-Measure) curve. - * @param beta the beta factor in F-Measure computation. - * @return an RDD of (threshold, F-Measure) pairs. - * @see http://en.wikipedia.org/wiki/F1_score - */ - def fMeasureByThreshold(beta: Double): RDD[(Double, Double)] = createCurve(FMeasure(beta)) - - /** Returns the (threshold, F-Measure) curve with beta = 1.0. */ - def fMeasureByThreshold(): RDD[(Double, Double)] = fMeasureByThreshold(1.0) - - /** Returns the (threshold, precision) curve. */ - def precisionByThreshold(): RDD[(Double, Double)] = createCurve(Precision) - - /** Returns the (threshold, recall) curve. */ - def recallByThreshold(): RDD[(Double, Double)] = createCurve(Recall) - - /** Creates a curve of (threshold, metric). */ - private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = { - confusions.map { case (s, c) => - (s, y(c)) - } - } - - /** Creates a curve of (metricX, metricY). */ - private def createCurve( - x: BinaryClassificationMetricComputer, - y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = { - confusions.map { case (_, c) => - (x(c), y(c)) - } - } -} - -/** - * A counter for positives and negatives. - * - * @param numPositives number of positive labels - * @param numNegatives number of negative labels - */ -private class LabelCounter( - var numPositives: Long = 0L, - var numNegatives: Long = 0L) extends Serializable { - - /** Processes a label. */ - def +=(label: Double): LabelCounter = { - // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle - // -1.0 for negative as well. - if (label > 0.5) numPositives += 1L else numNegatives += 1L - this - } - - /** Merges another counter. */ - def +=(other: LabelCounter): LabelCounter = { - numPositives += other.numPositives - numNegatives += other.numNegatives - this - } - - override def clone: LabelCounter = { - new LabelCounter(numPositives, numNegatives) - } - - override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}" -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala index 75a75b2160..559c6ef7e7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala @@ -39,3 +39,32 @@ private[evaluation] trait BinaryConfusionMatrix { /** number of negatives */ def numNegatives: Long = numFalsePositives + numTrueNegatives } + +/** + * Implementation of [[org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrix]]. 
+ * + * @param count label counter for labels with scores greater than or equal to the current score + * @param totalCount label counter for all labels + */ +private[evaluation] case class BinaryConfusionMatrixImpl( + count: BinaryLabelCounter, + totalCount: BinaryLabelCounter) extends BinaryConfusionMatrix { + + /** number of true positives */ + override def numTruePositives: Long = count.numPositives + + /** number of false positives */ + override def numFalsePositives: Long = count.numNegatives + + /** number of false negatives */ + override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives + + /** number of true negatives */ + override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives + + /** number of positives */ + override def numPositives: Long = totalCount.numPositives + + /** number of negatives */ + override def numNegatives: Long = totalCount.numNegatives +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala new file mode 100644 index 0000000000..1e610c2009 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation.binary + +/** + * A counter for positives and negatives. + * + * @param numPositives number of positive labels + * @param numNegatives number of negative labels + */ +private[evaluation] class BinaryLabelCounter( + var numPositives: Long = 0L, + var numNegatives: Long = 0L) extends Serializable { + + /** Processes a label. */ + def +=(label: Double): BinaryLabelCounter = { + // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle + // -1.0 for negative as well. + if (label > 0.5) numPositives += 1L else numNegatives += 1L + this + } + + /** Merges another counter. 
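To make the counters above concrete with illustrative numbers: if `totalCount` is {numPos: 8, numNeg: 12} and the cumulative `count` at some threshold is {numPos: 5, numNeg: 2}, then TP = 5, FP = 2, FN = 8 - 5 = 3, and TN = 12 - 2 = 10, giving precision 5/7 (about 0.71) and recall 5/8 = 0.625 at that threshold.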
*/ + def +=(other: BinaryLabelCounter): BinaryLabelCounter = { + numPositives += other.numPositives + numNegatives += other.numNegatives + this + } + + override def clone: BinaryLabelCounter = { + new BinaryLabelCounter(numPositives, numNegatives) + } + + override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}" +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala index 46b1054574..9669c364ba 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala @@ -17,5 +17,11 @@ package org.apache.spark.mllib.linalg -/** Represents singular value decomposition (SVD) factors. */ +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represents singular value decomposition (SVD) factors. + */ +@Experimental case class SingularValueDecomposition[UType, VType](U: UType, s: Vector, V: VType) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala index 56b8fdcda6..06d8915f3b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala @@ -25,11 +25,13 @@ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vectors /** + * :: Experimental :: * Represents an entry in an distributed matrix. * @param i row index * @param j column index * @param value value of the entry */ +@Experimental case class MatrixEntry(i: Long, j: Long, value: Double) /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 0c0afcd9ec..b10857fe7c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -427,6 +427,7 @@ class RowMatrix( } } +@Experimental object RowMatrix { /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index c75909bac9..7030eeabe4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -21,19 +21,17 @@ import scala.collection.mutable.ArrayBuffer import breeze.linalg.{DenseVector => BDV} -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.annotation.{Experimental, DeveloperApi} import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Vectors, Vector} /** - * :: DeveloperApi :: * Class used to solve an optimization problem using Gradient Descent. * @param gradient Gradient function to be used. * @param updater Updater to be used to update weights after every iteration. 
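With the `GradientDescent` constructor now `private[mllib]`, the optimizer is configured through the `optimizer` member of the concrete algorithms. A hedged sketch, assuming `training` is an existing `RDD[LabeledPoint]`:

```scala
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val lr = new LogisticRegressionWithSGD()
lr.optimizer
  .setStepSize(1.0)
  .setNumIterations(200)
  .setRegParam(0.1)
  .setMiniBatchFraction(0.5) // experimental, see setMiniBatchFraction below
val lrModel = lr.run(training)
```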
*/ -@DeveloperApi -class GradientDescent(private var gradient: Gradient, private var updater: Updater) +class GradientDescent private[mllib] (private var gradient: Gradient, private var updater: Updater) extends Optimizer with Logging { private var stepSize: Double = 1.0 @@ -51,9 +49,11 @@ class GradientDescent(private var gradient: Gradient, private var updater: Updat } /** + * :: Experimental :: * Set fraction of data to be used for each SGD iteration. * Default 1.0 (corresponding to deterministic/classical gradient descent) */ + @Experimental def setMiniBatchFraction(fraction: Double): this.type = { this.miniBatchFraction = fraction this @@ -95,6 +95,14 @@ class GradientDescent(private var gradient: Gradient, private var updater: Updat this } + /** + * :: DeveloperApi :: + * Runs gradient descent on the given training data. + * @param data training data + * @param initialWeights initial weights + * @return solution vector + */ + @DeveloperApi def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = { val (weights, _) = GradientDescent.runMiniBatchSGD( data, diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDs.scala deleted file mode 100644 index 9096d6a1a1..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDs.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.rdd - -import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.{Vectors, Vector} - -/** - * Factory methods for `RDD[Vector]`. - */ -object VectorRDDs { - - /** - * Converts an `RDD[Array[Double]]` to `RDD[Vector]`. - */ - def fromArrayRDD(rdd: RDD[Array[Double]]): RDD[Vector] = rdd.map(v => Vectors.dense(v)) -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 471546cd82..899286d235 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.recommendation -import org.jblas._ +import org.jblas.DoubleMatrix import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.JavaRDD @@ -25,7 +25,6 @@ import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.mllib.api.python.PythonMLLibAPI - /** * Model representing the result of matrix factorization. 
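The removed `VectorRDDs.fromArrayRDD` above was a one-liner; the direct replacement is the same `map`:

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Equivalent of the removed VectorRDDs.fromArrayRDD.
def fromArrayRDD(rdd: RDD[Array[Double]]): RDD[Vector] = rdd.map(v => Vectors.dense(v))
```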
* @@ -35,12 +34,10 @@ import org.apache.spark.mllib.api.python.PythonMLLibAPI * @param productFeatures RDD of tuples where each tuple represents the productId * and the features computed for this product. */ -class MatrixFactorizationModel( +class MatrixFactorizationModel private[mllib] ( val rank: Int, val userFeatures: RDD[(Int, Array[Double])], - val productFeatures: RDD[(Int, Array[Double])]) - extends Serializable -{ + val productFeatures: RDD[(Int, Array[Double])]) extends Serializable { /** Predict the rating of one user for one product. */ def predict(user: Int, product: Int): Double = { val userVector = new DoubleMatrix(userFeatures.lookup(user).head) @@ -76,6 +73,7 @@ class MatrixFactorizationModel( * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product) * @return JavaRDD of serialized Rating objects. */ + @DeveloperApi def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = { val pythonAPI = new PythonMLLibAPI() val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index d969e7aa60..8cca926f1c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -19,13 +19,14 @@ package org.apache.spark.mllib.regression import breeze.linalg.{DenseVector => BDV, SparseVector => BSV} -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.{Logging, SparkException} import org.apache.spark.rdd.RDD import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.linalg.{Vectors, Vector} /** + * :: DeveloperApi :: * GeneralizedLinearModel (GLM) represents a model trained using * GeneralizedLinearAlgorithm. GLMs consist of a weight vector and * an intercept. @@ -33,6 +34,7 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector} * @param weights Weights computed for every feature. * @param intercept Intercept computed for this model. */ +@DeveloperApi abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double) extends Serializable { @@ -72,9 +74,11 @@ abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double } /** + * :: DeveloperApi :: * GeneralizedLinearAlgorithm implements methods to train a Generalized Linear Model (GLM). * This class should be extended with an Optimizer to create a new GLM. */ +@DeveloperApi abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] extends Logging with Serializable { @@ -83,8 +87,8 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] /** The optimizer to solve the problem. */ def optimizer: Optimizer - /** Whether to add intercept (default: true). */ - protected var addIntercept: Boolean = true + /** Whether to add intercept (default: false). */ + protected var addIntercept: Boolean = false protected var validateData: Boolean = true @@ -94,7 +98,8 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] protected def createModel(weights: Vector, intercept: Double): M /** - * Set if the algorithm should add an intercept. Default true. + * Set if the algorithm should add an intercept. Default false. + * We set the default to false because adding the intercept will cause memory allocation. 
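Since `addIntercept` now defaults to false, and Lasso and Ridge no longer reject it, callers that relied on the old default must opt in explicitly. A sketch, assuming `training` is an existing `RDD[LabeledPoint]`:

```scala
import org.apache.spark.mllib.regression.LassoWithSGD

val lasso = new LassoWithSGD()
lasso.setIntercept(true)  // no longer throws; previously unsupported for Lasso/Ridge
val lassoModel = lasso.run(training)
println(s"intercept = ${lassoModel.intercept}")
```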
*/ def setIntercept(addIntercept: Boolean): this.type = { this.addIntercept = addIntercept @@ -102,10 +107,8 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] } /** - * :: Experimental :: * Set if the algorithm should validate data before training. Default true. */ - @Experimental def setValidateData(validateData: Boolean): this.type = { this.validateData = validateData this diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala index 0e6fb1b1ca..a05dfc045f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.regression +import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.optimization._ import org.apache.spark.rdd.RDD @@ -27,7 +28,7 @@ import org.apache.spark.rdd.RDD * @param weights Weights computed for every feature. * @param intercept Intercept computed for this model. */ -class LassoModel( +class LassoModel private[mllib] ( override val weights: Vector, override val intercept: Double) extends GeneralizedLinearModel(weights, intercept) @@ -64,21 +65,12 @@ class LassoWithSGD private ( .setRegParam(regParam) .setMiniBatchFraction(miniBatchFraction) - // We don't want to penalize the intercept, so set this to false. - super.setIntercept(false) - /** * Construct a Lasso object with default parameters: {stepSize: 1.0, numIterations: 100, * regParam: 1.0, miniBatchFraction: 1.0}. */ def this() = this(1.0, 100, 1.0, 1.0) - override def setIntercept(addIntercept: Boolean): this.type = { - // TODO: Support adding intercept. - if (addIntercept) throw new UnsupportedOperationException("Adding intercept is not supported.") - this - } - override protected def createModel(weights: Vector, intercept: Double) = { new LassoModel(weights, intercept) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala index 1532ff90d8..0ebad4eb58 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala @@ -27,7 +27,7 @@ import org.apache.spark.mllib.optimization._ * @param weights Weights computed for every feature. * @param intercept Intercept computed for this model. */ -class LinearRegressionModel( +class LinearRegressionModel private[mllib] ( override val weights: Vector, override val intercept: Double) extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala index 5f7e25a9b8..bd983bac00 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.regression +import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.linalg.Vector @@ -27,7 +28,7 @@ import org.apache.spark.mllib.linalg.Vector * @param weights Weights computed for every feature. * @param intercept Intercept computed for this model. 
*/ -class RidgeRegressionModel( +class RidgeRegressionModel private[mllib] ( override val weights: Vector, override val intercept: Double) extends GeneralizedLinearModel(weights, intercept) @@ -65,21 +66,12 @@ class RidgeRegressionWithSGD private ( .setRegParam(regParam) .setMiniBatchFraction(miniBatchFraction) - // We don't want to penalize the intercept in RidgeRegression, so set this to false. - super.setIntercept(false) - /** * Construct a RidgeRegression object with default parameters: {stepSize: 1.0, numIterations: 100, * regParam: 1.0, miniBatchFraction: 1.0}. */ def this() = this(1.0, 100, 1.0, 1.0) - override def setIntercept(addIntercept: Boolean): this.type = { - // TODO: Support adding intercept. - if (addIntercept) throw new UnsupportedOperationException("Adding intercept is not supported.") - this - } - override protected def createModel(weights: Vector, intercept: Double) = { new RidgeRegressionModel(weights, intercept) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/README.md b/mllib/src/main/scala/org/apache/spark/mllib/tree/README.md deleted file mode 100644 index 0fd71aa973..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/README.md +++ /dev/null @@ -1,17 +0,0 @@ -This package contains the default implementation of the decision tree algorithm. - -The decision tree algorithm supports: -+ Binary classification -+ Regression -+ Information loss calculation with entropy and gini for classification and variance for regression -+ Both continuous and categorical features - -# Tree improvements -+ Node model pruning -+ Printing to dot files - -# Future Ensemble Extensions - -+ Random forests -+ Boosting -+ Extremely randomized trees diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/package.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/package.scala new file mode 100644 index 0000000000..bcaacc1b1f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/package.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib + +/** + * This package contains the default implementation of the decision tree algorithm, which supports: + * - binary classification, + * - regression, + * - information loss calculation with entropy and Gini for classification and + * variance for regression, + * - both continuous and categorical features. 
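A short sketch of the decision tree API described by the package doc above; the `train` overload taking (algo, impurity, maxDepth) is an assumption about the public API and may differ, and `training` is an assumed `RDD[LabeledPoint]` with 0/1 labels:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity.Gini

// Binary classification with Gini impurity and a maximum depth of 4 (assumed overload).
val treeModel = DecisionTree.train(training, Algo.Classification, Gini, 4)
val prediction = treeModel.predict(Vectors.dense(0.0, 1.0))
```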
+ */ +package object tree { +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LabelParsers.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LabelParsers.scala index f7966d3ebb..e25bf18b78 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/LabelParsers.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LabelParsers.scala @@ -18,16 +18,23 @@ package org.apache.spark.mllib.util /** Trait for label parsers. */ -trait LabelParser extends Serializable { +private trait LabelParser extends Serializable { /** Parses a string label into a double label. */ def parse(labelString: String): Double } +/** Factory methods for label parsers. */ +private object LabelParser { + def getInstance(multiclass: Boolean): LabelParser = { + if (multiclass) MulticlassLabelParser else BinaryLabelParser + } +} + /** * Label parser for binary labels, which outputs 1.0 (positive) if the value is greater than 0.5, * or 0.0 (negative) otherwise. So it works with +1/-1 labeling and +1/0 labeling. */ -object BinaryLabelParser extends LabelParser { +private object BinaryLabelParser extends LabelParser { /** Gets the default instance of BinaryLabelParser. */ def getInstance(): LabelParser = this @@ -41,7 +48,7 @@ object BinaryLabelParser extends LabelParser { /** * Label parser for multiclass labels, which converts the input label to double. */ -object MulticlassLabelParser extends LabelParser { +private object MulticlassLabelParser extends LabelParser { /** Gets the default instance of MulticlassLabelParser. */ def getInstance(): LabelParser = this diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 3d6e7e0d5c..e598b6cb17 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -19,16 +19,17 @@ package org.apache.spark.mllib.util import scala.reflect.ClassTag -import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance} +import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV, + squaredDistance => breezeSquaredDistance} import org.apache.spark.annotation.Experimental import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PartitionwiseSampledRDD -import org.apache.spark.SparkContext._ import org.apache.spark.util.random.BernoulliSampler import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.storage.StorageLevel /** * Helper methods to load, save and pre-process data used in ML Lib. @@ -54,13 +55,16 @@ object MLUtils { * * @param sc Spark context * @param path file or directory path in any Hadoop-supported file system URI - * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise + * @param labelParser parser for labels * @param numFeatures number of features, which will be determined from the input data if a - * negative value is given. The default value is -1. - * @param minPartitions min number of partitions, default: sc.defaultMinPartitions + * nonpositive value is given. This is useful when the dataset is already split + * into multiple files and you want to load them separately, because some + * features may not present in certain files, which leads to inconsistent + * feature dimensions. 
+ * @param minPartitions min number of partitions * @return labeled data stored as an RDD[LabeledPoint] */ - def loadLibSVMData( + private def loadLibSVMFile( sc: SparkContext, path: String, labelParser: LabelParser, @@ -68,63 +72,112 @@ object MLUtils { minPartitions: Int): RDD[LabeledPoint] = { val parsed = sc.textFile(path, minPartitions) .map(_.trim) - .filter(!_.isEmpty) - .map(_.split(' ')) + .filter(line => !(line.isEmpty || line.startsWith("#"))) + .map { line => + val items = line.split(' ') + val label = labelParser.parse(items.head) + val (indices, values) = items.tail.map { item => + val indexAndValue = item.split(':') + val index = indexAndValue(0).toInt - 1 // Convert 1-based indices to 0-based. + val value = indexAndValue(1).toDouble + (index, value) + }.unzip + (label, indices.toArray, values.toArray) + } + // Determine number of features. - val d = if (numFeatures >= 0) { + val d = if (numFeatures > 0) { numFeatures } else { - parsed.map { items => - if (items.length > 1) { - items.last.split(':')(0).toInt - } else { - 0 - } - }.reduce(math.max) + parsed.persist(StorageLevel.MEMORY_ONLY) + parsed.map { case (label, indices, values) => + indices.lastOption.getOrElse(0) + }.reduce(math.max) + 1 } - parsed.map { items => - val label = labelParser.parse(items.head) - val (indices, values) = items.tail.map { item => - val indexAndValue = item.split(':') - val index = indexAndValue(0).toInt - 1 - val value = indexAndValue(1).toDouble - (index, value) - }.unzip - LabeledPoint(label, Vectors.sparse(d, indices.toArray, values.toArray)) + + parsed.map { case (label, indices, values) => + LabeledPoint(label, Vectors.sparse(d, indices, values)) } } - // Convenient methods for calling from Java. + // Convenient methods for `loadLibSVMFile`. /** - * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint], - * with number of features determined automatically and the default number of partitions. + * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint]. + * The LIBSVM format is a text-based format used by LIBSVM and LIBLINEAR. + * Each line represents a labeled sparse feature vector using the following format: + * {{{label index1:value1 index2:value2 ...}}} + * where the indices are one-based and in ascending order. + * This method parses each line into a [[org.apache.spark.mllib.regression.LabeledPoint]], + * where the feature indices are converted to zero-based. + * + * @param sc Spark context + * @param path file or directory path in any Hadoop-supported file system URI + * @param multiclass whether the input labels contain more than two classes. If false, any label + * with value greater than 0.5 will be mapped to 1.0, or 0.0 otherwise. So it + * works for both +1/-1 and 1/0 cases. If true, the double value parsed directly + * from the label string will be used as the label value. + * @param numFeatures number of features, which will be determined from the input data if a + * nonpositive value is given. This is useful when the dataset is already split + * into multiple files and you want to load them separately, because some + * features may not present in certain files, which leads to inconsistent + * feature dimensions. 
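For example, the LIBSVM line `1 1:0.5 3:2.0` is parsed into `LabeledPoint(1.0, Vectors.sparse(d, Array(0, 2), Array(0.5, 2.0)))`. A usage sketch of the renamed loader; the paths and feature count are placeholders:

```scala
import org.apache.spark.mllib.util.MLUtils

// Binary labels, feature dimension inferred from the data, default partitioning.
val examples = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")

// Multiclass labels with an explicit feature dimension (useful for pre-split datasets).
val multi = MLUtils.loadLibSVMFile(sc, "data/part-*", multiclass = true, 780)
```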
+ * @param minPartitions min number of partitions + * @return labeled data stored as an RDD[LabeledPoint] */ - def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] = - loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinPartitions) + def loadLibSVMFile( + sc: SparkContext, + path: String, + multiclass: Boolean, + numFeatures: Int, + minPartitions: Int): RDD[LabeledPoint] = + loadLibSVMFile(sc, path, LabelParser.getInstance(multiclass), numFeatures, minPartitions) /** - * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint], - * with the given label parser, number of features determined automatically, - * and the default number of partitions. + * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint], with the default number of + * partitions. */ - def loadLibSVMData( + def loadLibSVMFile( sc: SparkContext, path: String, - labelParser: LabelParser): RDD[LabeledPoint] = - loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinPartitions) + multiclass: Boolean, + numFeatures: Int): RDD[LabeledPoint] = + loadLibSVMFile(sc, path, multiclass, numFeatures, sc.defaultMinPartitions) /** - * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint], - * with the given label parser, number of features specified explicitly, - * and the default number of partitions. + * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint], with the number of features + * determined automatically and the default number of partitions. */ - def loadLibSVMData( + def loadLibSVMFile( sc: SparkContext, path: String, - labelParser: LabelParser, - numFeatures: Int): RDD[LabeledPoint] = - loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinPartitions) + multiclass: Boolean): RDD[LabeledPoint] = + loadLibSVMFile(sc, path, multiclass, -1, sc.defaultMinPartitions) + + /** + * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint], with number of + * features determined automatically and the default number of partitions. + */ + def loadLibSVMFile(sc: SparkContext, path: String): RDD[LabeledPoint] = + loadLibSVMFile(sc, path, multiclass = false, -1, sc.defaultMinPartitions) + + /** + * Save labeled data in LIBSVM format. + * @param data an RDD of LabeledPoint to be saved + * @param dir directory to save the data + * + * @see [[org.apache.spark.mllib.util.MLUtils#loadLibSVMFile]] + */ + def saveAsLibSVMFile(data: RDD[LabeledPoint], dir: String) { + // TODO: allow to specify label precision and feature precision. + val dataStr = data.map { case LabeledPoint(label, features) => + val featureStrings = features.toBreeze.activeIterator.map { case (i, v) => + s"${i + 1}:$v" + } + (Iterator(label) ++ featureStrings).mkString(" ") + } + dataStr.saveAsTextFile(dir) + } /** * :: Experimental :: @@ -163,10 +216,12 @@ object MLUtils { } /** + * :: Experimental :: * Return a k element array of pairs of RDDs with the first element of each pair * containing the training data, a complement of the validation data and the second * element, the validation data, containing a unique 1/kth of the data. Where k=numFolds. */ + @Experimental def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = { val numFoldsF = numFolds.toFloat (1 to numFolds).map { fold => @@ -178,6 +233,18 @@ object MLUtils { }.toArray } + /** + * Returns a new vector with `1.0` (bias) appended to the input vector. 
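Two small sketches of the new helpers above, `saveAsLibSVMFile` and `appendBias`; `examples` is an assumed `RDD[LabeledPoint]` and the output directory is a placeholder:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

// Append a 1.0 bias term: [1.0, 2.0] becomes [1.0, 2.0, 1.0].
val withBias = MLUtils.appendBias(Vectors.dense(1.0, 2.0))

// Write labeled points back out in LIBSVM format (one "label i:v ..." line per point).
MLUtils.saveAsLibSVMFile(examples, "out/libsvm-data")
```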
+ */ + def appendBias(vector: Vector): Vector = { + val vector1 = vector.toBreeze match { + case dv: BDV[Double] => BDV.vertcat(dv, new BDV[Double](Array(1.0))) + case sv: BSV[Double] => BSV.vertcat(sv, new BSV[Double](Array(0), Array(1.0), 1)) + case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass) + } + Vectors.fromBreeze(vector1) + } + /** * Returns the squared Euclidean distance between two vectors. The following formula will be used * if it does not introduce too much numerical error: diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java index e18e3bc6a8..d75d3a6b26 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java @@ -68,6 +68,7 @@ public class JavaLogisticRegressionSuite implements Serializable { LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17); LogisticRegressionWithSGD lrImpl = new LogisticRegressionWithSGD(); + lrImpl.setIntercept(true); lrImpl.optimizer().setStepSize(1.0) .setRegParam(1.0) .setNumIterations(100); @@ -80,8 +81,8 @@ public class JavaLogisticRegressionSuite implements Serializable { @Test public void runLRUsingStaticMethods() { int nPoints = 10000; - double A = 2.0; - double B = -1.5; + double A = 0.0; + double B = -2.5; JavaRDD testRDD = sc.parallelize( LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache(); @@ -92,6 +93,7 @@ public class JavaLogisticRegressionSuite implements Serializable { testRDD.rdd(), 100, 1.0, 1.0); int numAccurate = validatePrediction(validationData, model); + System.out.println(numAccurate); Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); } diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java index 4701a5e545..667f76a1bd 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java @@ -67,6 +67,7 @@ public class JavaSVMSuite implements Serializable { SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17); SVMWithSGD svmSGDImpl = new SVMWithSGD(); + svmSGDImpl.setIntercept(true); svmSGDImpl.optimizer().setStepSize(1.0) .setRegParam(1.0) .setNumIterations(100); @@ -79,7 +80,7 @@ public class JavaSVMSuite implements Serializable { @Test public void runSVMUsingStaticMethods() { int nPoints = 10000; - double A = 2.0; + double A = 0.0; double[] weights = {-1.5, 1.0}; JavaRDD testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A, diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java index 5a4410a632..7151e55351 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java @@ -68,6 +68,7 @@ public class JavaLinearRegressionSuite implements Serializable { LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1); LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD(); + linSGDImpl.setIntercept(true); LinearRegressionModel model = 
linSGDImpl.run(testRDD.rdd()); int numAccurate = validatePrediction(validationData, model); @@ -77,7 +78,7 @@ public class JavaLinearRegressionSuite implements Serializable { @Test public void runLinearRegressionUsingStaticMethods() { int nPoints = 100; - double A = 3.0; + double A = 0.0; double[] weights = {10, 10}; JavaRDD testRDD = sc.parallelize( diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala index 1e03c9df82..4d7b984e3e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala @@ -46,24 +46,14 @@ object LogisticRegressionSuite { val rnd = new Random(seed) val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian()) - // NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1) - val unifRand = new scala.util.Random(45) - val rLogis = (0 until nPoints).map { i => - val u = unifRand.nextDouble() - math.log(u) - math.log(1.0-u) - } - - // y <- A + B*x + rLogis() - // y <- as.numeric(y > 0) - val y: Seq[Int] = (0 until nPoints).map { i => - val yVal = offset + scale * x1(i) + rLogis(i) - if (yVal > 0) 1 else 0 + val y = (0 until nPoints).map { i => + val p = 1.0 / (1.0 + math.exp(-(offset + scale * x1(i)))) + if (rnd.nextDouble() < p) 1.0 else 0.0 } val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(Array(x1(i))))) testData } - } class LogisticRegressionSuite extends FunSuite with LocalSparkContext with ShouldMatchers { @@ -85,7 +75,7 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Shoul val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val lr = new LogisticRegressionWithSGD() + val lr = new LogisticRegressionWithSGD().setIntercept(true) lr.optimizer.setStepSize(10.0).setNumIterations(20) val model = lr.run(testRDD) @@ -118,7 +108,7 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Shoul testRDD.cache() // Use half as many iterations as the previous test. 
- val lr = new LogisticRegressionWithSGD() + val lr = new LogisticRegressionWithSGD().setIntercept(true) lr.optimizer.setStepSize(10.0).setNumIterations(10) val model = lr.run(testRDD, initialWeights) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala index dfacbfeee6..77d6f04b32 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala @@ -69,7 +69,6 @@ class SVMSuite extends FunSuite with LocalSparkContext { assert(numOffPredictions < input.length / 5) } - test("SVM using local random SGD") { val nPoints = 10000 @@ -83,7 +82,7 @@ class SVMSuite extends FunSuite with LocalSparkContext { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val svm = new SVMWithSGD() + val svm = new SVMWithSGD().setIntercept(true) svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100) val model = svm.run(testRDD) @@ -115,7 +114,7 @@ class SVMSuite extends FunSuite with LocalSparkContext { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val svm = new SVMWithSGD() + val svm = new SVMWithSGD().setIntercept(true) svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100) val model = svm.run(testRDD, initialWeights) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala new file mode 100644 index 0000000000..9d16182f9d --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.evaluation + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.util.LocalSparkContext + +class BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext { + test("binary evaluation metrics") { + val scoreAndLabels = sc.parallelize( + Seq((0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)), 2) + val metrics = new BinaryClassificationMetrics(scoreAndLabels) + val threshold = Seq(0.8, 0.6, 0.4, 0.1) + val numTruePositives = Seq(1, 3, 3, 4) + val numFalsePositives = Seq(0, 1, 2, 3) + val numPositives = 4 + val numNegatives = 3 + val precision = numTruePositives.zip(numFalsePositives).map { case (t, f) => + t.toDouble / (t + f) + } + val recall = numTruePositives.map(t => t.toDouble / numPositives) + val fpr = numFalsePositives.map(f => f.toDouble / numNegatives) + val rocCurve = Seq((0.0, 0.0)) ++ fpr.zip(recall) ++ Seq((1.0, 1.0)) + val pr = recall.zip(precision) + val prCurve = Seq((0.0, 1.0)) ++ pr + val f1 = pr.map { case (r, p) => 2.0 * (p * r) / (p + r) } + val f2 = pr.map { case (r, p) => 5.0 * (p * r) / (4.0 * p + r)} + assert(metrics.thresholds().collect().toSeq === threshold) + assert(metrics.roc().collect().toSeq === rocCurve) + assert(metrics.areaUnderROC() === AreaUnderCurve.of(rocCurve)) + assert(metrics.pr().collect().toSeq === prCurve) + assert(metrics.areaUnderPR() === AreaUnderCurve.of(prCurve)) + assert(metrics.fMeasureByThreshold().collect().toSeq === threshold.zip(f1)) + assert(metrics.fMeasureByThreshold(2.0).collect().toSeq === threshold.zip(f2)) + assert(metrics.precisionByThreshold().collect().toSeq === threshold.zip(precision)) + assert(metrics.recallByThreshold().collect().toSeq === threshold.zip(recall)) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala deleted file mode 100644 index 173fdaefab..0000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.mllib.evaluation.binary - -import org.scalatest.FunSuite - -import org.apache.spark.mllib.util.LocalSparkContext -import org.apache.spark.mllib.evaluation.AreaUnderCurve - -class BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext { - test("binary evaluation metrics") { - val scoreAndLabels = sc.parallelize( - Seq((0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)), 2) - val metrics = new BinaryClassificationMetrics(scoreAndLabels) - val threshold = Seq(0.8, 0.6, 0.4, 0.1) - val numTruePositives = Seq(1, 3, 3, 4) - val numFalsePositives = Seq(0, 1, 2, 3) - val numPositives = 4 - val numNegatives = 3 - val precision = numTruePositives.zip(numFalsePositives).map { case (t, f) => - t.toDouble / (t + f) - } - val recall = numTruePositives.map(t => t.toDouble / numPositives) - val fpr = numFalsePositives.map(f => f.toDouble / numNegatives) - val rocCurve = Seq((0.0, 0.0)) ++ fpr.zip(recall) ++ Seq((1.0, 1.0)) - val pr = recall.zip(precision) - val prCurve = Seq((0.0, 1.0)) ++ pr - val f1 = pr.map { case (r, p) => 2.0 * (p * r) / (p + r) } - val f2 = pr.map { case (r, p) => 5.0 * (p * r) / (4.0 * p + r)} - assert(metrics.thresholds().collect().toSeq === threshold) - assert(metrics.roc().collect().toSeq === rocCurve) - assert(metrics.areaUnderROC() === AreaUnderCurve.of(rocCurve)) - assert(metrics.pr().collect().toSeq === prCurve) - assert(metrics.areaUnderPR() === AreaUnderCurve.of(prCurve)) - assert(metrics.fMeasureByThreshold().collect().toSeq === threshold.zip(f1)) - assert(metrics.fMeasureByThreshold(2.0).collect().toSeq === threshold.zip(f2)) - assert(metrics.precisionByThreshold().collect().toSeq === threshold.zip(precision)) - assert(metrics.recallByThreshold().collect().toSeq === threshold.zip(recall)) - } -} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala deleted file mode 100644 index 692f025e95..0000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.mllib.rdd - -import org.scalatest.FunSuite - -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.util.LocalSparkContext - -class VectorRDDsSuite extends FunSuite with LocalSparkContext { - - test("from array rdd") { - val data = Seq(Array(1.0, 2.0), Array(3.0, 4.0)) - val arrayRdd = sc.parallelize(data, 2) - val vectorRdd = VectorRDDs.fromArrayRDD(arrayRdd) - assert(arrayRdd.collect().map(v => Vectors.dense(v)) === vectorRdd.collect()) - } -} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 4dfcd4b52e..2d944f3eb7 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -27,7 +27,6 @@ import org.jblas.DoubleMatrix import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.SparkContext._ -import org.apache.spark.Partitioner object ALSSuite { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala index 6aad9eb84e..bfa42959c8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala @@ -112,10 +112,4 @@ class LassoSuite extends FunSuite with LocalSparkContext { // Test prediction on Array. validatePrediction(validationData.map(row => model.predict(row.features)), validationData) } - - test("do not support intercept") { - intercept[UnsupportedOperationException] { - new LassoWithSGD().setIntercept(true) - } - } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala index 2f7d30708c..7aaad7d7a3 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala @@ -37,7 +37,7 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext { test("linear regression") { val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( 3.0, Array(10.0, 10.0), 100, 42), 2).cache() - val linReg = new LinearRegressionWithSGD() + val linReg = new LinearRegressionWithSGD().setIntercept(true) linReg.optimizer.setNumIterations(1000).setStepSize(1.0) val model = linReg.run(testRDD) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala index f66fc6ea6c..67768e17fb 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala @@ -72,10 +72,4 @@ class RidgeRegressionSuite extends FunSuite with LocalSparkContext { assert(ridgeErr < linearErr, "ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")") } - - test("do not support intercept") { - intercept[UnsupportedOperationException] { - new RidgeRegressionWithSGD().setIntercept(true) - } - } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala index 350130c914..be383aab71 100644 --- 
a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala @@ -17,10 +17,8 @@ package org.apache.spark.mllib.tree -import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite -import org.apache.spark.SparkContext import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Variance} import org.apache.spark.mllib.tree.model.Filter @@ -28,19 +26,9 @@ import org.apache.spark.mllib.tree.configuration.Strategy import org.apache.spark.mllib.tree.configuration.Algo._ import org.apache.spark.mllib.tree.configuration.FeatureType._ import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.LocalSparkContext -class DecisionTreeSuite extends FunSuite with BeforeAndAfterAll { - - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } +class DecisionTreeSuite extends FunSuite with LocalSparkContext { test("split and bin calculation") { val arr = DecisionTreeSuite.generateOrderedLabeledPointsWithLabel1() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 674378a34c..3f64baf6fe 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -19,8 +19,8 @@ package org.apache.spark.mllib.util import java.io.File +import scala.io.Source import scala.math -import scala.util.Random import org.scalatest.FunSuite @@ -29,7 +29,8 @@ import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNor import com.google.common.base.Charsets import com.google.common.io.Files -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors} +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils._ class MLUtilsSuite extends FunSuite with LocalSparkContext { @@ -58,7 +59,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { } } - test("loadLibSVMData") { + test("loadLibSVMFile") { val lines = """ |+1 1:1.0 3:2.0 5:3.0 @@ -70,8 +71,8 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { Files.write(lines, file, Charsets.US_ASCII) val path = tempDir.toURI.toString - val pointsWithNumFeatures = MLUtils.loadLibSVMData(sc, path, BinaryLabelParser, 6).collect() - val pointsWithoutNumFeatures = MLUtils.loadLibSVMData(sc, path).collect() + val pointsWithNumFeatures = loadLibSVMFile(sc, path, multiclass = false, 6).collect() + val pointsWithoutNumFeatures = loadLibSVMFile(sc, path).collect() for (points <- Seq(pointsWithNumFeatures, pointsWithoutNumFeatures)) { assert(points.length === 3) @@ -83,29 +84,54 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { assert(points(2).features === Vectors.sparse(6, Seq((1, 4.0), (3, 5.0), (5, 6.0)))) } - val multiclassPoints = MLUtils.loadLibSVMData(sc, path, MulticlassLabelParser).collect() + val multiclassPoints = loadLibSVMFile(sc, path, multiclass = true).collect() assert(multiclassPoints.length === 3) assert(multiclassPoints(0).label === 1.0) assert(multiclassPoints(1).label === -1.0) assert(multiclassPoints(2).label === -1.0) - try { - file.delete() - tempDir.delete() - } catch { - case t: Throwable => - } + 
deleteQuietly(tempDir) + } + + test("saveAsLibSVMFile") { + val examples = sc.parallelize(Seq( + LabeledPoint(1.1, Vectors.sparse(3, Seq((0, 1.23), (2, 4.56)))), + LabeledPoint(0.0, Vectors.dense(1.01, 2.02, 3.03)) + ), 2) + val tempDir = Files.createTempDir() + val outputDir = new File(tempDir, "output") + MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString) + val lines = outputDir.listFiles() + .filter(_.getName.startsWith("part-")) + .flatMap(Source.fromFile(_).getLines()) + .toSet + val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03") + assert(lines === expected) + deleteQuietly(tempDir) + } + + test("appendBias") { + val sv = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))) + val sv1 = appendBias(sv).asInstanceOf[SparseVector] + assert(sv1.size === 4) + assert(sv1.indices === Array(0, 2, 3)) + assert(sv1.values === Array(1.0, 3.0, 1.0)) + + val dv = Vectors.dense(1.0, 0.0, 3.0) + val dv1 = appendBias(dv).asInstanceOf[DenseVector] + assert(dv1.size === 4) + assert(dv1.values === Array(1.0, 0.0, 3.0, 1.0)) } test("kFold") { val data = sc.parallelize(1 to 100, 2) val collectedData = data.collect().sorted - val twoFoldedRdd = MLUtils.kFold(data, 2, 1) + val twoFoldedRdd = kFold(data, 2, 1) assert(twoFoldedRdd(0)._1.collect().sorted === twoFoldedRdd(1)._2.collect().sorted) assert(twoFoldedRdd(0)._2.collect().sorted === twoFoldedRdd(1)._1.collect().sorted) for (folds <- 2 to 10) { for (seed <- 1 to 5) { - val foldedRdds = MLUtils.kFold(data, folds, seed) + val foldedRdds = kFold(data, folds, seed) assert(foldedRdds.size === folds) foldedRdds.map { case (training, validation) => val result = validation.union(training).collect().sorted @@ -132,4 +158,16 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { } } + /** Delete a file/directory quietly. */ + def deleteQuietly(f: File) { + if (f.isDirectory) { + f.listFiles().foreach(deleteQuietly) + } + try { + f.delete() + } catch { + case _: Throwable => + } + } } + -- cgit v1.2.3
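
A minimal usage sketch of the renamed loader and the new saveAsLibSVMFile helper, matching the MLUtils signatures in the diff above. The object name, local master, and file paths are placeholders for illustration only and are not part of the patch.

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.rdd.RDD

    object LibSVMExample {
      def main(args: Array[String]) {
        // Placeholder master and paths, chosen only to keep the sketch self-contained.
        val sc = new SparkContext("local", "libsvm-example")

        // Binary labels, feature count inferred from the data, default number of partitions.
        val binary: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/sample.libsvm")

        // Multiclass labels with an explicit feature count and partition count.
        val multi: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(
          sc, "data/sample.libsvm", multiclass = true, numFeatures = 6, minPartitions = 4)

        // Write labeled points back out in LIBSVM format (feature indices are 1-based).
        MLUtils.saveAsLibSVMFile(binary, "data/sample-out")

        sc.stop()
      }
    }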
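
The appendBias and kFold utilities exercised in MLUtilsSuite can also be called directly. The sketch below (object name, local master, and toy data are assumptions for illustration) shows the shapes involved: appendBias returns a vector one element longer with a trailing 1.0, and kFold returns (training, validation) pairs that together cover the whole dataset.

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.util.MLUtils

    object BiasAndKFoldExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "mlutils-example")

        // appendBias appends 1.0, so a length-3 input becomes a length-4 vector
        // whose last component is the bias feature.
        val withBias = MLUtils.appendBias(Vectors.dense(1.0, 0.0, 3.0))
        assert(withBias.size == 4)

        // kFold produces k (training, validation) pairs; each record lands in
        // exactly one validation fold, so the two parts always add up to the input.
        val data = sc.parallelize(1 to 100, 2)
        val folds = MLUtils.kFold(data, numFolds = 3, seed = 11)
        folds.foreach { case (training, validation) =>
          assert(training.count() + validation.count() == 100)
        }

        sc.stop()
      }
    }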