author    Xiangrui Meng <meng@databricks.com>    2014-05-05 18:32:54 -0700
committer Matei Zaharia <matei@databricks.com>   2014-05-05 18:33:05 -0700
commit    32c960a01d58b2d66cb60ff9013787ff48538d0c (patch)
tree      788ea1f1ec8dadec8c19873f330c0f452461b2d0 /mllib
parent    a5f765cabb9f8f7f465ad2cd3d5a64d9ad165928 (diff)
[SPARK-1594][MLLIB] Cleaning up MLlib APIs and guide
Final pass before the v1.0 release.

* Remove `VectorRDDs`.
* Move `BinaryClassificationMetrics` from `evaluation.binary` to `evaluation`.
* Change the default value of `addIntercept` to false and allow adding an intercept in Ridge and Lasso.
* Clean the `DecisionTree` package doc and test suite.
* Mark model constructors `private[spark]`.
* Rename `loadLibSVMData` to `loadLibSVMFile` and hide `LabelParser` from users.
* Add `saveAsLibSVMFile`.
* Add `appendBias` to `MLUtils`.

Author: Xiangrui Meng <meng@databricks.com>

Closes #524 from mengxr/mllib-cleaning and squashes the following commits:

295dc8b [Xiangrui Meng] update loadLibSVMFile doc
1977ac1 [Xiangrui Meng] fix doc of appendBias
649fcf0 [Xiangrui Meng] rename loadLibSVMData to loadLibSVMFile; hide LabelParser from user APIs
54b812c [Xiangrui Meng] add appendBias
a71e7d0 [Xiangrui Meng] add saveAsLibSVMFile
d976295 [Xiangrui Meng] Merge branch 'master' into mllib-cleaning
b7e5cec [Xiangrui Meng] remove some experimental annotations and make model constructors private[mllib]
9b02b93 [Xiangrui Meng] minor code style update
a593ddc [Xiangrui Meng] fix python tests
fc28c18 [Xiangrui Meng] mark more classes experimental
f6cbbff [Xiangrui Meng] fix Java tests
0af70b0 [Xiangrui Meng] minor
6e139ef [Xiangrui Meng] Merge branch 'master' into mllib-cleaning
94e6dce [Xiangrui Meng] move BinaryLabelCounter and BinaryConfusionMatrixImpl to evaluation.binary
df34907 [Xiangrui Meng] clean DecisionTreeSuite to use LocalSparkContext
c81807f [Xiangrui Meng] set the default value of AddIntercept to false
03389c0 [Xiangrui Meng] allow to add intercept in Ridge and Lasso
c66c56f [Xiangrui Meng] move tree md to package object doc
a2695df [Xiangrui Meng] update guide for BinaryClassificationMetrics
9194f4c [Xiangrui Meng] move BinaryClassificationMetrics one level up
1c1a0e3 [Xiangrui Meng] remove VectorRDDs because it only contains one function that is not necessary for us to maintain

(cherry picked from commit 98750a74daf7e2b873da85d2d5067f47e3bbdc4e)
Signed-off-by: Matei Zaharia <matei@databricks.com>
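For readers tracking the rename, a minimal sketch of the new loader call (assuming a live SparkContext `sc`; the input path is illustrative):

    import org.apache.spark.mllib.util.MLUtils
    // Before this commit: MLUtils.loadLibSVMData(sc, "data/sample_libsvm_data.txt")
    // After this commit, the same call becomes:
    val examples = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")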
Diffstat (limited to 'mllib')
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala | 7
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala | 8
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala | 7
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 2
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala | 2
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala (renamed from mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala) | 144
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala | 29
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala | 50
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala | 8
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala | 2
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala | 1
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala | 16
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala | 10
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala | 15
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala | 12
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala | 2
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala | 12
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/tree/README.md | 17
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/tree/package.scala (renamed from mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDs.scala) | 19
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/util/LabelParsers.scala | 13
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala | 155
-rw-r--r-- mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java | 6
-rw-r--r-- mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java | 3
-rw-r--r-- mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java | 3
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala | 20
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala | 5
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala (renamed from mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala) | 3
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala | 33
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala | 1
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala | 6
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala | 2
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala | 6
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala | 16
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala | 66
34 files changed, 381 insertions, 320 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index 780e8bae42..90aa8ac998 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -17,6 +17,7 @@
package org.apache.spark.mllib.classification
+import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
@@ -29,7 +30,7 @@ import org.apache.spark.rdd.RDD
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
-class LogisticRegressionModel(
+class LogisticRegressionModel private[mllib] (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
@@ -37,18 +38,22 @@ class LogisticRegressionModel(
private var threshold: Option[Double] = Some(0.5)
/**
+ * :: Experimental ::
* Sets the threshold that separates positive predictions from negative predictions. An example
* with prediction score greater than or equal to this threshold is identified as positive,
* and negative otherwise. The default value is 0.5.
*/
+ @Experimental
def setThreshold(threshold: Double): this.type = {
this.threshold = Some(threshold)
this
}
/**
+ * :: Experimental ::
* Clears the threshold so that `predict` will output raw prediction scores.
*/
+ @Experimental
def clearThreshold(): this.type = {
threshold = None
this
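A minimal usage sketch of the now-Experimental threshold methods, assuming a trained `model: LogisticRegressionModel` and a hypothetical `testData: RDD[Vector]`:

    model.setThreshold(0.7)                   // scores >= 0.7 classify as 1.0
    val labels = testData.map(model.predict)  // hard 0.0/1.0 labels
    model.clearThreshold()                    // switch to raw scores
    val scores = testData.map(model.predict)  // raw prediction scores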
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index f6f62ce2de..b6e0c4a80e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -19,7 +19,6 @@ package org.apache.spark.mllib.classification
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
-import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vector
@@ -27,7 +26,6 @@ import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
/**
- * :: Experimental ::
* Model for Naive Bayes Classifiers.
*
* @param labels list of labels
@@ -35,8 +33,7 @@ import org.apache.spark.rdd.RDD
* @param theta log of class conditional probabilities, whose dimension is C-by-D,
* where D is number of features
*/
-@Experimental
-class NaiveBayesModel(
+class NaiveBayesModel private[mllib] (
val labels: Array[Double],
val pi: Array[Double],
val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
@@ -124,6 +121,9 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
}
}
+/**
+ * Top-level methods for calling naive Bayes.
+ */
object NaiveBayes {
/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
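As a sketch of this entry point (the RDDs `training` and `test` and the smoothing value are assumptions for illustration):

    import org.apache.spark.mllib.classification.NaiveBayes
    // `training` and `test` are RDD[LabeledPoint]; 1.0 is the smoothing parameter.
    val model = NaiveBayes.train(training, 1.0)
    val predictions = model.predict(test.map(_.features))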
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index 81b126717e..e05213536e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -17,6 +17,7 @@
package org.apache.spark.mllib.classification
+import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
@@ -29,7 +30,7 @@ import org.apache.spark.rdd.RDD
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
-class SVMModel(
+class SVMModel private[mllib] (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
@@ -37,18 +38,22 @@ class SVMModel(
private var threshold: Option[Double] = Some(0.0)
/**
+ * :: Experimental ::
* Sets the threshold that separates positive predictions from negative predictions. An example
* with prediction score greater than or equal to this threshold is identified as positive,
* and negative otherwise. The default value is 0.0.
*/
+ @Experimental
def setThreshold(threshold: Double): this.type = {
this.threshold = Some(threshold)
this
}
/**
+ * :: Experimental ::
* Clears the threshold so that `predict` will output raw prediction scores.
*/
+ @Experimental
def clearThreshold(): this.type = {
threshold = None
this
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index a64c5d44be..de22fbb6ff 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
+import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -81,6 +82,7 @@ class KMeans private (
* this many times with random starting conditions (configured by the initialization mode), then
* return the best clustering found over any run. Default: 1.
*/
+ @Experimental
def setRuns(runs: Int): KMeans = {
if (runs <= 0) {
throw new IllegalArgumentException("Number of runs must be positive")
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 18abbf2758..ce14b06241 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -24,7 +24,7 @@ import org.apache.spark.mllib.linalg.Vector
/**
* A clustering model for K-means. Each point belongs to the cluster with the closest center.
*/
-class KMeansModel(val clusterCenters: Array[Vector]) extends Serializable {
+class KMeansModel private[mllib] (val clusterCenters: Array[Vector]) extends Serializable {
/** Total number of clusters. */
def k: Int = clusterCenters.length
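For context, a minimal clustering sketch against this API (the input `points: RDD[Vector]` and the parameters are illustrative):

    import org.apache.spark.mllib.clustering.KMeans
    val model = KMeans.train(points, 2, 20)      // k = 2 clusters, 20 iterations
    val cluster = model.predict(points.first())  // index of the closest center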
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index ed7b0fc943..079743742d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -15,83 +15,22 @@
* limitations under the License.
*/
-package org.apache.spark.mllib.evaluation.binary
+package org.apache.spark.mllib.evaluation
-import org.apache.spark.rdd.{UnionRDD, RDD}
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.evaluation.AreaUnderCurve
+import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.evaluation.binary._
+import org.apache.spark.rdd.{RDD, UnionRDD}
/**
- * Implementation of [[org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrix]].
- *
- * @param count label counter for labels with scores greater than or equal to the current score
- * @param totalCount label counter for all labels
- */
-private case class BinaryConfusionMatrixImpl(
- count: LabelCounter,
- totalCount: LabelCounter) extends BinaryConfusionMatrix with Serializable {
-
- /** number of true positives */
- override def numTruePositives: Long = count.numPositives
-
- /** number of false positives */
- override def numFalsePositives: Long = count.numNegatives
-
- /** number of false negatives */
- override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives
-
- /** number of true negatives */
- override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives
-
- /** number of positives */
- override def numPositives: Long = totalCount.numPositives
-
- /** number of negatives */
- override def numNegatives: Long = totalCount.numNegatives
-}
-
-/**
+ * :: Experimental ::
* Evaluator for binary classification.
*
* @param scoreAndLabels an RDD of (score, label) pairs.
*/
-class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)])
- extends Serializable with Logging {
-
- private lazy val (
- cumulativeCounts: RDD[(Double, LabelCounter)],
- confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
- // Create a bin for each distinct score value, count positives and negatives within each bin,
- // and then sort by score values in descending order.
- val counts = scoreAndLabels.combineByKey(
- createCombiner = (label: Double) => new LabelCounter(0L, 0L) += label,
- mergeValue = (c: LabelCounter, label: Double) => c += label,
- mergeCombiners = (c1: LabelCounter, c2: LabelCounter) => c1 += c2
- ).sortByKey(ascending = false)
- val agg = counts.values.mapPartitions({ iter =>
- val agg = new LabelCounter()
- iter.foreach(agg += _)
- Iterator(agg)
- }, preservesPartitioning = true).collect()
- val partitionwiseCumulativeCounts =
- agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg.clone() += c)
- val totalCount = partitionwiseCumulativeCounts.last
- logInfo(s"Total counts: $totalCount")
- val cumulativeCounts = counts.mapPartitionsWithIndex(
- (index: Int, iter: Iterator[(Double, LabelCounter)]) => {
- val cumCount = partitionwiseCumulativeCounts(index)
- iter.map { case (score, c) =>
- cumCount += c
- (score, cumCount.clone())
- }
- }, preservesPartitioning = true)
- cumulativeCounts.persist()
- val confusions = cumulativeCounts.map { case (score, cumCount) =>
- (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
- }
- (cumulativeCounts, confusions)
- }
+@Experimental
+class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) extends Logging {
/** Unpersist intermediate RDDs used in the computation. */
def unpersist() {
@@ -154,6 +93,41 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)])
/** Returns the (threshold, recall) curve. */
def recallByThreshold(): RDD[(Double, Double)] = createCurve(Recall)
+ private lazy val (
+ cumulativeCounts: RDD[(Double, BinaryLabelCounter)],
+ confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
+ // Create a bin for each distinct score value, count positives and negatives within each bin,
+ // and then sort by score values in descending order.
+ val counts = scoreAndLabels.combineByKey(
+ createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label,
+ mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
+ mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
+ ).sortByKey(ascending = false)
+ val agg = counts.values.mapPartitions({ iter =>
+ val agg = new BinaryLabelCounter()
+ iter.foreach(agg += _)
+ Iterator(agg)
+ }, preservesPartitioning = true).collect()
+ val partitionwiseCumulativeCounts =
+ agg.scanLeft(new BinaryLabelCounter())(
+ (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)
+ val totalCount = partitionwiseCumulativeCounts.last
+ logInfo(s"Total counts: $totalCount")
+ val cumulativeCounts = counts.mapPartitionsWithIndex(
+ (index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => {
+ val cumCount = partitionwiseCumulativeCounts(index)
+ iter.map { case (score, c) =>
+ cumCount += c
+ (score, cumCount.clone())
+ }
+ }, preservesPartitioning = true)
+ cumulativeCounts.persist()
+ val confusions = cumulativeCounts.map { case (score, cumCount) =>
+ (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
+ }
+ (cumulativeCounts, confusions)
+ }
+
/** Creates a curve of (threshold, metric). */
private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
confusions.map { case (s, c) =>
@@ -170,35 +144,3 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)])
}
}
}
-
-/**
- * A counter for positives and negatives.
- *
- * @param numPositives number of positive labels
- * @param numNegatives number of negative labels
- */
-private class LabelCounter(
- var numPositives: Long = 0L,
- var numNegatives: Long = 0L) extends Serializable {
-
- /** Processes a label. */
- def +=(label: Double): LabelCounter = {
- // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle
- // -1.0 for negative as well.
- if (label > 0.5) numPositives += 1L else numNegatives += 1L
- this
- }
-
- /** Merges another counter. */
- def +=(other: LabelCounter): LabelCounter = {
- numPositives += other.numPositives
- numNegatives += other.numNegatives
- this
- }
-
- override def clone: LabelCounter = {
- new LabelCounter(numPositives, numNegatives)
- }
-
- override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}"
-}
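A usage sketch of the relocated evaluator, assuming a hypothetical `scoreAndLabels: RDD[(Double, Double)]` produced by some classifier:

    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    val precision = metrics.precisionByThreshold()  // RDD[(threshold, precision)]
    val recall = metrics.recallByThreshold()        // RDD[(threshold, recall)]
    metrics.unpersist()  // release the cached intermediate RDDs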
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala
index 75a75b2160..559c6ef7e7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala
@@ -39,3 +39,32 @@ private[evaluation] trait BinaryConfusionMatrix {
/** number of negatives */
def numNegatives: Long = numFalsePositives + numTrueNegatives
}
+
+/**
+ * Implementation of [[org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrix]].
+ *
+ * @param count label counter for labels with scores greater than or equal to the current score
+ * @param totalCount label counter for all labels
+ */
+private[evaluation] case class BinaryConfusionMatrixImpl(
+ count: BinaryLabelCounter,
+ totalCount: BinaryLabelCounter) extends BinaryConfusionMatrix {
+
+ /** number of true positives */
+ override def numTruePositives: Long = count.numPositives
+
+ /** number of false positives */
+ override def numFalsePositives: Long = count.numNegatives
+
+ /** number of false negatives */
+ override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives
+
+ /** number of true negatives */
+ override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives
+
+ /** number of positives */
+ override def numPositives: Long = totalCount.numPositives
+
+ /** number of negatives */
+ override def numNegatives: Long = totalCount.numNegatives
+}
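To make the cumulative-count arithmetic concrete, a small worked sketch (the values are hypothetical, and both classes are private[evaluation], so this illustrates the internal logic rather than a public API):

    // 10 positives and 90 negatives overall; 7 positives and 3 negatives
    // score at or above the current threshold.
    val count = new BinaryLabelCounter(numPositives = 7L, numNegatives = 3L)
    val totalCount = new BinaryLabelCounter(numPositives = 10L, numNegatives = 90L)
    val m = BinaryConfusionMatrixImpl(count, totalCount)
    // m.numTruePositives  == 7
    // m.numFalsePositives == 3
    // m.numFalseNegatives == 10 - 7 == 3
    // m.numTrueNegatives  == 90 - 3 == 87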
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala
new file mode 100644
index 0000000000..1e610c2009
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.evaluation.binary
+
+/**
+ * A counter for positives and negatives.
+ *
+ * @param numPositives number of positive labels
+ * @param numNegatives number of negative labels
+ */
+private[evaluation] class BinaryLabelCounter(
+ var numPositives: Long = 0L,
+ var numNegatives: Long = 0L) extends Serializable {
+
+ /** Processes a label. */
+ def +=(label: Double): BinaryLabelCounter = {
+ // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle
+ // -1.0 for negative as well.
+ if (label > 0.5) numPositives += 1L else numNegatives += 1L
+ this
+ }
+
+ /** Merges another counter. */
+ def +=(other: BinaryLabelCounter): BinaryLabelCounter = {
+ numPositives += other.numPositives
+ numNegatives += other.numNegatives
+ this
+ }
+
+ override def clone: BinaryLabelCounter = {
+ new BinaryLabelCounter(numPositives, numNegatives)
+ }
+
+ override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}"
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala
index 46b1054574..9669c364ba 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala
@@ -17,5 +17,11 @@
package org.apache.spark.mllib.linalg
-/** Represents singular value decomposition (SVD) factors. */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents singular value decomposition (SVD) factors.
+ */
+@Experimental
case class SingularValueDecomposition[UType, VType](U: UType, s: Vector, V: VType)
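A sketch of how these factors are typically produced, assuming the `RowMatrix.computeSVD` method in this release (`rows: RDD[Vector]` and k = 5 are illustrative):

    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    val mat = new RowMatrix(rows)
    val svd = mat.computeSVD(5, computeU = true)  // keep the top 5 singular values
    val s = svd.s  // singular values in descending order
    val u = svd.U  // left singular vectors, as a RowMatrix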
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
index 56b8fdcda6..06d8915f3b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
@@ -25,11 +25,13 @@ import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
/**
+ * :: Experimental ::
* Represents an entry in a distributed matrix.
* @param i row index
* @param j column index
* @param value value of the entry
*/
+@Experimental
case class MatrixEntry(i: Long, j: Long, value: Double)
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 0c0afcd9ec..b10857fe7c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -427,6 +427,7 @@ class RowMatrix(
}
}
+@Experimental
object RowMatrix {
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index c75909bac9..7030eeabe4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -21,19 +21,17 @@ import scala.collection.mutable.ArrayBuffer
import breeze.linalg.{DenseVector => BDV}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vectors, Vector}
/**
- * :: DeveloperApi ::
* Class used to solve an optimization problem using Gradient Descent.
* @param gradient Gradient function to be used.
* @param updater Updater to be used to update weights after every iteration.
*/
-@DeveloperApi
-class GradientDescent(private var gradient: Gradient, private var updater: Updater)
+class GradientDescent private[mllib] (private var gradient: Gradient, private var updater: Updater)
extends Optimizer with Logging {
private var stepSize: Double = 1.0
@@ -51,9 +49,11 @@ class GradientDescent(private var gradient: Gradient, private var updater: Updat
}
/**
+ * :: Experimental ::
* Set fraction of data to be used for each SGD iteration.
* Default 1.0 (corresponding to deterministic/classical gradient descent)
*/
+ @Experimental
def setMiniBatchFraction(fraction: Double): this.type = {
this.miniBatchFraction = fraction
this
@@ -95,6 +95,14 @@ class GradientDescent(private var gradient: Gradient, private var updater: Updat
this
}
+ /**
+ * :: DeveloperApi ::
+ * Runs gradient descent on the given training data.
+ * @param data training data
+ * @param initialWeights initial weights
+ * @return solution vector
+ */
+ @DeveloperApi
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
val (weights, _) = GradientDescent.runMiniBatchSGD(
data,
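The optimizer is usually configured through its owning algorithm; a sketch with illustrative values:

    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    val lr = new LogisticRegressionWithSGD()
    lr.optimizer
      .setStepSize(1.0)
      .setNumIterations(100)
      .setMiniBatchFraction(0.1)  // sample 10% of the data per SGD iteration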
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 471546cd82..899286d235 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -17,7 +17,7 @@
package org.apache.spark.mllib.recommendation
-import org.jblas._
+import org.jblas.DoubleMatrix
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
@@ -25,7 +25,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.api.python.PythonMLLibAPI
-
/**
* Model representing the result of matrix factorization.
*
@@ -35,12 +34,10 @@ import org.apache.spark.mllib.api.python.PythonMLLibAPI
* @param productFeatures RDD of tuples where each tuple represents the productId
* and the features computed for this product.
*/
-class MatrixFactorizationModel(
+class MatrixFactorizationModel private[mllib] (
val rank: Int,
val userFeatures: RDD[(Int, Array[Double])],
- val productFeatures: RDD[(Int, Array[Double])])
- extends Serializable
-{
+ val productFeatures: RDD[(Int, Array[Double])]) extends Serializable {
/** Predict the rating of one user for one product. */
def predict(user: Int, product: Int): Double = {
val userVector = new DoubleMatrix(userFeatures.lookup(user).head)
@@ -76,6 +73,7 @@ class MatrixFactorizationModel(
* @param usersProductsJRDD A JavaRDD with serialized tuples (user, product)
* @return JavaRDD of serialized Rating objects.
*/
+ @DeveloperApi
def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
val pythonAPI = new PythonMLLibAPI()
val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes))
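A minimal scoring sketch, assuming a model produced by `ALS.train` (the `ratings` and `userProducts` RDDs and the ids are illustrative):

    import org.apache.spark.mllib.recommendation.ALS
    val model = ALS.train(ratings, 10, 20)  // rank 10, 20 iterations
    val one = model.predict(1, 42)          // score one (user, product) pair
    val many = model.predict(userProducts)  // RDD[(Int, Int)] => RDD[Rating]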
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index d969e7aa60..8cca926f1c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -19,13 +19,14 @@ package org.apache.spark.mllib.regression
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg.{Vectors, Vector}
/**
+ * :: DeveloperApi ::
* GeneralizedLinearModel (GLM) represents a model trained using
* GeneralizedLinearAlgorithm. GLMs consist of a weight vector and
* an intercept.
@@ -33,6 +34,7 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector}
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
+@DeveloperApi
abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double)
extends Serializable {
@@ -72,9 +74,11 @@ abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double
}
/**
+ * :: DeveloperApi ::
* GeneralizedLinearAlgorithm implements methods to train a Generalized Linear Model (GLM).
* This class should be extended with an Optimizer to create a new GLM.
*/
+@DeveloperApi
abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
extends Logging with Serializable {
@@ -83,8 +87,8 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
/** The optimizer to solve the problem. */
def optimizer: Optimizer
- /** Whether to add intercept (default: true). */
- protected var addIntercept: Boolean = true
+ /** Whether to add intercept (default: false). */
+ protected var addIntercept: Boolean = false
protected var validateData: Boolean = true
@@ -94,7 +98,8 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
protected def createModel(weights: Vector, intercept: Double): M
/**
- * Set if the algorithm should add an intercept. Default true.
+ * Set if the algorithm should add an intercept. Default false.
+ * We set the default to false because adding the intercept will cause memory allocation.
*/
def setIntercept(addIntercept: Boolean): this.type = {
this.addIntercept = addIntercept
@@ -102,10 +107,8 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
}
/**
- * :: Experimental ::
* Set if the algorithm should validate data before training. Default true.
*/
- @Experimental
def setValidateData(validateData: Boolean): this.type = {
this.validateData = validateData
this
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
index 0e6fb1b1ca..a05dfc045f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
@@ -17,6 +17,7 @@
package org.apache.spark.mllib.regression
+import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.rdd.RDD
@@ -27,7 +28,7 @@ import org.apache.spark.rdd.RDD
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
-class LassoModel(
+class LassoModel private[mllib] (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept)
@@ -64,21 +65,12 @@ class LassoWithSGD private (
.setRegParam(regParam)
.setMiniBatchFraction(miniBatchFraction)
- // We don't want to penalize the intercept, so set this to false.
- super.setIntercept(false)
-
/**
* Construct a Lasso object with default parameters: {stepSize: 1.0, numIterations: 100,
* regParam: 1.0, miniBatchFraction: 1.0}.
*/
def this() = this(1.0, 100, 1.0, 1.0)
- override def setIntercept(addIntercept: Boolean): this.type = {
- // TODO: Support adding intercept.
- if (addIntercept) throw new UnsupportedOperationException("Adding intercept is not supported.")
- this
- }
-
override protected def createModel(weights: Vector, intercept: Double) = {
new LassoModel(weights, intercept)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
index 1532ff90d8..0ebad4eb58 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
@@ -27,7 +27,7 @@ import org.apache.spark.mllib.optimization._
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
-class LinearRegressionModel(
+class LinearRegressionModel private[mllib] (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
index 5f7e25a9b8..bd983bac00 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
@@ -17,6 +17,7 @@
package org.apache.spark.mllib.regression
+import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg.Vector
@@ -27,7 +28,7 @@ import org.apache.spark.mllib.linalg.Vector
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
-class RidgeRegressionModel(
+class RidgeRegressionModel private[mllib] (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept)
@@ -65,21 +66,12 @@ class RidgeRegressionWithSGD private (
.setRegParam(regParam)
.setMiniBatchFraction(miniBatchFraction)
- // We don't want to penalize the intercept in RidgeRegression, so set this to false.
- super.setIntercept(false)
-
/**
* Construct a RidgeRegression object with default parameters: {stepSize: 1.0, numIterations: 100,
* regParam: 1.0, miniBatchFraction: 1.0}.
*/
def this() = this(1.0, 100, 1.0, 1.0)
- override def setIntercept(addIntercept: Boolean): this.type = {
- // TODO: Support adding intercept.
- if (addIntercept) throw new UnsupportedOperationException("Adding intercept is not supported.")
- this
- }
-
override protected def createModel(weights: Vector, intercept: Double) = {
new RidgeRegressionModel(weights, intercept)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/README.md b/mllib/src/main/scala/org/apache/spark/mllib/tree/README.md
deleted file mode 100644
index 0fd71aa973..0000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/README.md
+++ /dev/null
@@ -1,17 +0,0 @@
-This package contains the default implementation of the decision tree algorithm.
-
-The decision tree algorithm supports:
-+ Binary classification
-+ Regression
-+ Information loss calculation with entropy and gini for classification and variance for regression
-+ Both continuous and categorical features
-
-# Tree improvements
-+ Node model pruning
-+ Printing to dot files
-
-# Future Ensemble Extensions
-
-+ Random forests
-+ Boosting
-+ Extremely randomized trees
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/package.scala
index 9096d6a1a1..bcaacc1b1f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDs.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/package.scala
@@ -15,18 +15,15 @@
* limitations under the License.
*/
-package org.apache.spark.mllib.rdd
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.linalg.{Vectors, Vector}
+package org.apache.spark.mllib
/**
- * Factory methods for `RDD[Vector]`.
+ * This package contains the default implementation of the decision tree algorithm, which supports:
+ * - binary classification,
+ * - regression,
+ * - information loss calculation with entropy and Gini for classification and
+ * variance for regression,
+ * - both continuous and categorical features.
*/
-object VectorRDDs {
-
- /**
- * Converts an `RDD[Array[Double]]` to `RDD[Vector]`.
- */
- def fromArrayRDD(rdd: RDD[Array[Double]]): RDD[Vector] = rdd.map(v => Vectors.dense(v))
+package object tree {
}
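For orientation, a training sketch against this package, assuming the `DecisionTree.train` entry point and `Strategy` configuration shipped in this release (data and parameters are illustrative):

    import org.apache.spark.mllib.tree.DecisionTree
    import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
    import org.apache.spark.mllib.tree.impurity.Gini
    // `training` is an RDD[LabeledPoint]; binary classification, depth <= 5.
    val strategy = new Strategy(Algo.Classification, Gini, maxDepth = 5)
    val model = DecisionTree.train(training, strategy)
    val pred = model.predict(training.first().features)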
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LabelParsers.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LabelParsers.scala
index f7966d3ebb..e25bf18b78 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LabelParsers.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LabelParsers.scala
@@ -18,16 +18,23 @@
package org.apache.spark.mllib.util
/** Trait for label parsers. */
-trait LabelParser extends Serializable {
+private trait LabelParser extends Serializable {
/** Parses a string label into a double label. */
def parse(labelString: String): Double
}
+/** Factory methods for label parsers. */
+private object LabelParser {
+ def getInstance(multiclass: Boolean): LabelParser = {
+ if (multiclass) MulticlassLabelParser else BinaryLabelParser
+ }
+}
+
/**
* Label parser for binary labels, which outputs 1.0 (positive) if the value is greater than 0.5,
* or 0.0 (negative) otherwise. So it works with +1/-1 labeling and +1/0 labeling.
*/
-object BinaryLabelParser extends LabelParser {
+private object BinaryLabelParser extends LabelParser {
/** Gets the default instance of BinaryLabelParser. */
def getInstance(): LabelParser = this
@@ -41,7 +48,7 @@ object BinaryLabelParser extends LabelParser {
/**
* Label parser for multiclass labels, which converts the input label to double.
*/
-object MulticlassLabelParser extends LabelParser {
+private object MulticlassLabelParser extends LabelParser {
/** Gets the default instance of MulticlassLabelParser. */
def getInstance(): LabelParser = this
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 3d6e7e0d5c..e598b6cb17 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -19,16 +19,17 @@ package org.apache.spark.mllib.util
import scala.reflect.ClassTag
-import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance}
+import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
+ squaredDistance => breezeSquaredDistance}
import org.apache.spark.annotation.Experimental
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
-import org.apache.spark.SparkContext._
import org.apache.spark.util.random.BernoulliSampler
import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.storage.StorageLevel
/**
* Helper methods to load, save and pre-process data used in MLlib.
@@ -54,13 +55,16 @@ object MLUtils {
*
* @param sc Spark context
* @param path file or directory path in any Hadoop-supported file system URI
- * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise
+ * @param labelParser parser for labels
* @param numFeatures number of features, which will be determined from the input data if a
- * negative value is given. The default value is -1.
- * @param minPartitions min number of partitions, default: sc.defaultMinPartitions
+ * nonpositive value is given. This is useful when the dataset is already split
+ * into multiple files and you want to load them separately, because some
+ * features may not be present in certain files, which leads to inconsistent
+ * feature dimensions.
+ * @param minPartitions min number of partitions
* @return labeled data stored as an RDD[LabeledPoint]
*/
- def loadLibSVMData(
+ private def loadLibSVMFile(
sc: SparkContext,
path: String,
labelParser: LabelParser,
@@ -68,63 +72,112 @@ object MLUtils {
minPartitions: Int): RDD[LabeledPoint] = {
val parsed = sc.textFile(path, minPartitions)
.map(_.trim)
- .filter(!_.isEmpty)
- .map(_.split(' '))
+ .filter(line => !(line.isEmpty || line.startsWith("#")))
+ .map { line =>
+ val items = line.split(' ')
+ val label = labelParser.parse(items.head)
+ val (indices, values) = items.tail.map { item =>
+ val indexAndValue = item.split(':')
+ val index = indexAndValue(0).toInt - 1 // Convert 1-based indices to 0-based.
+ val value = indexAndValue(1).toDouble
+ (index, value)
+ }.unzip
+ (label, indices.toArray, values.toArray)
+ }
+
// Determine number of features.
- val d = if (numFeatures >= 0) {
+ val d = if (numFeatures > 0) {
numFeatures
} else {
- parsed.map { items =>
- if (items.length > 1) {
- items.last.split(':')(0).toInt
- } else {
- 0
- }
- }.reduce(math.max)
+ parsed.persist(StorageLevel.MEMORY_ONLY)
+ parsed.map { case (label, indices, values) =>
+ indices.lastOption.getOrElse(0)
+ }.reduce(math.max) + 1
}
- parsed.map { items =>
- val label = labelParser.parse(items.head)
- val (indices, values) = items.tail.map { item =>
- val indexAndValue = item.split(':')
- val index = indexAndValue(0).toInt - 1
- val value = indexAndValue(1).toDouble
- (index, value)
- }.unzip
- LabeledPoint(label, Vectors.sparse(d, indices.toArray, values.toArray))
+
+ parsed.map { case (label, indices, values) =>
+ LabeledPoint(label, Vectors.sparse(d, indices, values))
}
}
- // Convenient methods for calling from Java.
+ // Convenient methods for `loadLibSVMFile`.
/**
- * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint],
- * with number of features determined automatically and the default number of partitions.
+ * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint].
+ * The LIBSVM format is a text-based format used by LIBSVM and LIBLINEAR.
+ * Each line represents a labeled sparse feature vector using the following format:
+ * {{{label index1:value1 index2:value2 ...}}}
+ * where the indices are one-based and in ascending order.
+ * This method parses each line into a [[org.apache.spark.mllib.regression.LabeledPoint]],
+ * where the feature indices are converted to zero-based.
+ *
+ * @param sc Spark context
+ * @param path file or directory path in any Hadoop-supported file system URI
+ * @param multiclass whether the input labels contain more than two classes. If false, any label
+ * with value greater than 0.5 will be mapped to 1.0, and to 0.0 otherwise. So it
+ * works for both +1/-1 and 1/0 cases. If true, the double value parsed directly
+ * from the label string will be used as the label value.
+ * @param numFeatures number of features, which will be determined from the input data if a
+ * nonpositive value is given. This is useful when the dataset is already split
+ * into multiple files and you want to load them separately, because some
+ * features may not be present in certain files, which leads to inconsistent
+ * feature dimensions.
+ * @param minPartitions min number of partitions
+ * @return labeled data stored as an RDD[LabeledPoint]
*/
- def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] =
- loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinPartitions)
+ def loadLibSVMFile(
+ sc: SparkContext,
+ path: String,
+ multiclass: Boolean,
+ numFeatures: Int,
+ minPartitions: Int): RDD[LabeledPoint] =
+ loadLibSVMFile(sc, path, LabelParser.getInstance(multiclass), numFeatures, minPartitions)
/**
- * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
- * with the given label parser, number of features determined automatically,
- * and the default number of partitions.
+ * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint], with the default number of
+ * partitions.
*/
- def loadLibSVMData(
+ def loadLibSVMFile(
sc: SparkContext,
path: String,
- labelParser: LabelParser): RDD[LabeledPoint] =
- loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinPartitions)
+ multiclass: Boolean,
+ numFeatures: Int): RDD[LabeledPoint] =
+ loadLibSVMFile(sc, path, multiclass, numFeatures, sc.defaultMinPartitions)
/**
- * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
- * with the given label parser, number of features specified explicitly,
- * and the default number of partitions.
+ * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint], with the number of features
+ * determined automatically and the default number of partitions.
*/
- def loadLibSVMData(
+ def loadLibSVMFile(
sc: SparkContext,
path: String,
- labelParser: LabelParser,
- numFeatures: Int): RDD[LabeledPoint] =
- loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinPartitions)
+ multiclass: Boolean): RDD[LabeledPoint] =
+ loadLibSVMFile(sc, path, multiclass, -1, sc.defaultMinPartitions)
+
+ /**
+ * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint], with number of
+ * features determined automatically and the default number of partitions.
+ */
+ def loadLibSVMFile(sc: SparkContext, path: String): RDD[LabeledPoint] =
+ loadLibSVMFile(sc, path, multiclass = false, -1, sc.defaultMinPartitions)
+
+ /**
+ * Save labeled data in LIBSVM format.
+ * @param data an RDD of LabeledPoint to be saved
+ * @param dir directory to save the data
+ *
+ * @see [[org.apache.spark.mllib.util.MLUtils#loadLibSVMFile]]
+ */
+ def saveAsLibSVMFile(data: RDD[LabeledPoint], dir: String) {
+ // TODO: allow to specify label precision and feature precision.
+ val dataStr = data.map { case LabeledPoint(label, features) =>
+ val featureStrings = features.toBreeze.activeIterator.map { case (i, v) =>
+ s"${i + 1}:$v"
+ }
+ (Iterator(label) ++ featureStrings).mkString(" ")
+ }
+ dataStr.saveAsTextFile(dir)
+ }
/**
* :: Experimental ::
@@ -163,10 +216,12 @@ object MLUtils {
}
/**
+ * :: Experimental ::
* Return a k-element array of pairs of RDDs, where the first element of each pair contains
* the training data (the complement of the validation data) and the second element contains
* the validation data, a unique 1/kth of the data, with k = numFolds.
*/
+ @Experimental
def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = {
val numFoldsF = numFolds.toFloat
(1 to numFolds).map { fold =>
@@ -179,6 +234,18 @@ object MLUtils {
}
/**
+ * Returns a new vector with `1.0` (bias) appended to the input vector.
+ */
+ def appendBias(vector: Vector): Vector = {
+ val vector1 = vector.toBreeze match {
+ case dv: BDV[Double] => BDV.vertcat(dv, new BDV[Double](Array(1.0)))
+ case sv: BSV[Double] => BSV.vertcat(sv, new BSV[Double](Array(0), Array(1.0), 1))
+ case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+ }
+ Vectors.fromBreeze(vector1)
+ }
+
+ /**
* Returns the squared Euclidean distance between two vectors. The following formula will be used
* if it does not introduce too much numerical error:
* <pre>
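Pulling the new MLUtils helpers together, a sketch (the paths, fold count, and seed are illustrative, and `sc` is a live SparkContext):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.util.MLUtils

    val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")
    MLUtils.saveAsLibSVMFile(data, "out/sample_libsvm")  // round trip back to disk

    val withBias = MLUtils.appendBias(Vectors.dense(1.0, 2.0))
    // withBias == [1.0, 2.0, 1.0]

    for ((training, validation) <- MLUtils.kFold(data, numFolds = 3, seed = 11)) {
      // fit on `training`, evaluate on `validation`
    }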
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java
index e18e3bc6a8..d75d3a6b26 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java
@@ -68,6 +68,7 @@ public class JavaLogisticRegressionSuite implements Serializable {
LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17);
LogisticRegressionWithSGD lrImpl = new LogisticRegressionWithSGD();
+ lrImpl.setIntercept(true);
lrImpl.optimizer().setStepSize(1.0)
.setRegParam(1.0)
.setNumIterations(100);
@@ -80,8 +81,8 @@ public class JavaLogisticRegressionSuite implements Serializable {
@Test
public void runLRUsingStaticMethods() {
int nPoints = 10000;
- double A = 2.0;
- double B = -1.5;
+ double A = 0.0;
+ double B = -2.5;
JavaRDD<LabeledPoint> testRDD = sc.parallelize(
LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache();
@@ -92,6 +93,7 @@ public class JavaLogisticRegressionSuite implements Serializable {
testRDD.rdd(), 100, 1.0, 1.0);
int numAccurate = validatePrediction(validationData, model);
+ System.out.println(numAccurate);
Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
index 4701a5e545..667f76a1bd 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
@@ -67,6 +67,7 @@ public class JavaSVMSuite implements Serializable {
SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17);
SVMWithSGD svmSGDImpl = new SVMWithSGD();
+ svmSGDImpl.setIntercept(true);
svmSGDImpl.optimizer().setStepSize(1.0)
.setRegParam(1.0)
.setNumIterations(100);
@@ -79,7 +80,7 @@ public class JavaSVMSuite implements Serializable {
@Test
public void runSVMUsingStaticMethods() {
int nPoints = 10000;
- double A = 2.0;
+ double A = 0.0;
double[] weights = {-1.5, 1.0};
JavaRDD<LabeledPoint> testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A,
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java
index 5a4410a632..7151e55351 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java
@@ -68,6 +68,7 @@ public class JavaLinearRegressionSuite implements Serializable {
LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1);
LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD();
+ linSGDImpl.setIntercept(true);
LinearRegressionModel model = linSGDImpl.run(testRDD.rdd());
int numAccurate = validatePrediction(validationData, model);
@@ -77,7 +78,7 @@ public class JavaLinearRegressionSuite implements Serializable {
@Test
public void runLinearRegressionUsingStaticMethods() {
int nPoints = 100;
- double A = 3.0;
+ double A = 0.0;
double[] weights = {10, 10};
JavaRDD<LabeledPoint> testRDD = sc.parallelize(
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
index 1e03c9df82..4d7b984e3e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -46,24 +46,14 @@ object LogisticRegressionSuite {
val rnd = new Random(seed)
val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
- // NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1)
- val unifRand = new scala.util.Random(45)
- val rLogis = (0 until nPoints).map { i =>
- val u = unifRand.nextDouble()
- math.log(u) - math.log(1.0-u)
- }
-
- // y <- A + B*x + rLogis()
- // y <- as.numeric(y > 0)
- val y: Seq[Int] = (0 until nPoints).map { i =>
- val yVal = offset + scale * x1(i) + rLogis(i)
- if (yVal > 0) 1 else 0
+ val y = (0 until nPoints).map { i =>
+ val p = 1.0 / (1.0 + math.exp(-(offset + scale * x1(i))))
+ if (rnd.nextDouble() < p) 1.0 else 0.0
}
val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(Array(x1(i)))))
testData
}
-
}
class LogisticRegressionSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
@@ -85,7 +75,7 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Shoul
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val lr = new LogisticRegressionWithSGD()
+ val lr = new LogisticRegressionWithSGD().setIntercept(true)
lr.optimizer.setStepSize(10.0).setNumIterations(20)
val model = lr.run(testRDD)
@@ -118,7 +108,7 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Shoul
testRDD.cache()
// Use half as many iterations as the previous test.
- val lr = new LogisticRegressionWithSGD()
+ val lr = new LogisticRegressionWithSGD().setIntercept(true)
lr.optimizer.setStepSize(10.0).setNumIterations(10)
val model = lr.run(testRDD, initialWeights)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index dfacbfeee6..77d6f04b32 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -69,7 +69,6 @@ class SVMSuite extends FunSuite with LocalSparkContext {
assert(numOffPredictions < input.length / 5)
}
-
test("SVM using local random SGD") {
val nPoints = 10000
@@ -83,7 +82,7 @@ class SVMSuite extends FunSuite with LocalSparkContext {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val svm = new SVMWithSGD()
+ val svm = new SVMWithSGD().setIntercept(true)
svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
val model = svm.run(testRDD)
@@ -115,7 +114,7 @@ class SVMSuite extends FunSuite with LocalSparkContext {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val svm = new SVMWithSGD()
+ val svm = new SVMWithSGD().setIntercept(true)
svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
val model = svm.run(testRDD, initialWeights)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
index 173fdaefab..9d16182f9d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.spark.mllib.evaluation.binary
+package org.apache.spark.mllib.evaluation
import org.scalatest.FunSuite
import org.apache.spark.mllib.util.LocalSparkContext
-import org.apache.spark.mllib.evaluation.AreaUnderCurve
class BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext {
test("binary evaluation metrics") {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala
deleted file mode 100644
index 692f025e95..0000000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.rdd
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.LocalSparkContext
-
-class VectorRDDsSuite extends FunSuite with LocalSparkContext {
-
- test("from array rdd") {
- val data = Seq(Array(1.0, 2.0), Array(3.0, 4.0))
- val arrayRdd = sc.parallelize(data, 2)
- val vectorRdd = VectorRDDs.fromArrayRDD(arrayRdd)
- assert(arrayRdd.collect().map(v => Vectors.dense(v)) === vectorRdd.collect())
- }
-}
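The deleted suite covered VectorRDDs.fromArrayRDD, a thin wrapper over a map. The same result is a one-liner, sketched here under the assumption that arrayRdd is an RDD[Array[Double]] as in the removed test:

    import org.apache.spark.mllib.linalg.Vectors

    // Equivalent to the removed VectorRDDs.fromArrayRDD(arrayRdd).
    val vectorRdd = arrayRdd.map(v => Vectors.dense(v))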
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 4dfcd4b52e..2d944f3eb7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -27,7 +27,6 @@ import org.jblas.DoubleMatrix
import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.SparkContext._
-import org.apache.spark.Partitioner
object ALSSuite {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
index 6aad9eb84e..bfa42959c8 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -112,10 +112,4 @@ class LassoSuite extends FunSuite with LocalSparkContext {
// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
-
- test("do not support intercept") {
- intercept[UnsupportedOperationException] {
- new LassoWithSGD().setIntercept(true)
- }
- }
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
index 2f7d30708c..7aaad7d7a3 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -37,7 +37,7 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
test("linear regression") {
val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput(
3.0, Array(10.0, 10.0), 100, 42), 2).cache()
- val linReg = new LinearRegressionWithSGD()
+ val linReg = new LinearRegressionWithSGD().setIntercept(true)
linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
val model = linReg.run(testRDD)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
index f66fc6ea6c..67768e17fb 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -72,10 +72,4 @@ class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
assert(ridgeErr < linearErr,
"ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")")
}
-
- test("do not support intercept") {
- intercept[UnsupportedOperationException] {
- new RidgeRegressionWithSGD().setIntercept(true)
- }
- }
}
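Lasso and Ridge no longer throw UnsupportedOperationException from setIntercept, which is all the two removed tests checked for. A hedged sketch of the now-supported configuration, assuming trainingData is an RDD[LabeledPoint]:

    val lasso = new LassoWithSGD().setIntercept(true)
    val lassoModel = lasso.run(trainingData)

    val ridge = new RidgeRegressionWithSGD().setIntercept(true)
    val ridgeModel = ridge.run(trainingData)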
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
index 350130c914..be383aab71 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
@@ -17,10 +17,8 @@
package org.apache.spark.mllib.tree
-import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Variance}
import org.apache.spark.mllib.tree.model.Filter
@@ -28,19 +26,9 @@ import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.configuration.FeatureType._
import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.util.LocalSparkContext
-class DecisionTreeSuite extends FunSuite with BeforeAndAfterAll {
-
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class DecisionTreeSuite extends FunSuite with LocalSparkContext {
test("split and bin calculation") {
val arr = DecisionTreeSuite.generateOrderedLabeledPointsWithLabel1()
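DecisionTreeSuite swaps its hand-rolled SparkContext lifecycle for the shared LocalSparkContext mixin used by the other suites. The removed beforeAll/afterAll code suggests the shape of that fixture; a sketch of a trait along those lines (the real LocalSparkContext lives in org.apache.spark.mllib.util and may differ in detail):

    import org.scalatest.{BeforeAndAfterAll, Suite}
    import org.apache.spark.SparkContext

    trait LocalSparkContextSketch extends BeforeAndAfterAll { self: Suite =>
      @transient var sc: SparkContext = _

      override def beforeAll() {
        sc = new SparkContext("local", "test")
        super.beforeAll()
      }

      override def afterAll() {
        if (sc != null) sc.stop()
        System.clearProperty("spark.driver.port")
        super.afterAll()
      }
    }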
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 674378a34c..3f64baf6fe 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -19,8 +19,8 @@ package org.apache.spark.mllib.util
import java.io.File
+import scala.io.Source
import scala.math
-import scala.util.Random
import org.scalatest.FunSuite
@@ -29,7 +29,8 @@ import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNor
import com.google.common.base.Charsets
import com.google.common.io.Files
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
+import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils._
class MLUtilsSuite extends FunSuite with LocalSparkContext {
@@ -58,7 +59,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
}
}
- test("loadLibSVMData") {
+ test("loadLibSVMFile") {
val lines =
"""
|+1 1:1.0 3:2.0 5:3.0
@@ -70,8 +71,8 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
Files.write(lines, file, Charsets.US_ASCII)
val path = tempDir.toURI.toString
- val pointsWithNumFeatures = MLUtils.loadLibSVMData(sc, path, BinaryLabelParser, 6).collect()
- val pointsWithoutNumFeatures = MLUtils.loadLibSVMData(sc, path).collect()
+ val pointsWithNumFeatures = loadLibSVMFile(sc, path, multiclass = false, 6).collect()
+ val pointsWithoutNumFeatures = loadLibSVMFile(sc, path).collect()
for (points <- Seq(pointsWithNumFeatures, pointsWithoutNumFeatures)) {
assert(points.length === 3)
@@ -83,29 +84,54 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
assert(points(2).features === Vectors.sparse(6, Seq((1, 4.0), (3, 5.0), (5, 6.0))))
}
- val multiclassPoints = MLUtils.loadLibSVMData(sc, path, MulticlassLabelParser).collect()
+ val multiclassPoints = loadLibSVMFile(sc, path, multiclass = true).collect()
assert(multiclassPoints.length === 3)
assert(multiclassPoints(0).label === 1.0)
assert(multiclassPoints(1).label === -1.0)
assert(multiclassPoints(2).label === -1.0)
- try {
- file.delete()
- tempDir.delete()
- } catch {
- case t: Throwable =>
- }
+ deleteQuietly(tempDir)
+ }
+
+ test("saveAsLibSVMFile") {
+ val examples = sc.parallelize(Seq(
+ LabeledPoint(1.1, Vectors.sparse(3, Seq((0, 1.23), (2, 4.56)))),
+ LabeledPoint(0.0, Vectors.dense(1.01, 2.02, 3.03))
+ ), 2)
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output")
+ MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString)
+ val lines = outputDir.listFiles()
+ .filter(_.getName.startsWith("part-"))
+ .flatMap(Source.fromFile(_).getLines())
+ .toSet
+ val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
+ assert(lines === expected)
+ deleteQuietly(tempDir)
+ }
+
+ test("appendBias") {
+ val sv = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
+ val sv1 = appendBias(sv).asInstanceOf[SparseVector]
+ assert(sv1.size === 4)
+ assert(sv1.indices === Array(0, 2, 3))
+ assert(sv1.values === Array(1.0, 3.0, 1.0))
+
+ val dv = Vectors.dense(1.0, 0.0, 3.0)
+ val dv1 = appendBias(dv).asInstanceOf[DenseVector]
+ assert(dv1.size === 4)
+ assert(dv1.values === Array(1.0, 0.0, 3.0, 1.0))
}
test("kFold") {
val data = sc.parallelize(1 to 100, 2)
val collectedData = data.collect().sorted
- val twoFoldedRdd = MLUtils.kFold(data, 2, 1)
+ val twoFoldedRdd = kFold(data, 2, 1)
assert(twoFoldedRdd(0)._1.collect().sorted === twoFoldedRdd(1)._2.collect().sorted)
assert(twoFoldedRdd(0)._2.collect().sorted === twoFoldedRdd(1)._1.collect().sorted)
for (folds <- 2 to 10) {
for (seed <- 1 to 5) {
- val foldedRdds = MLUtils.kFold(data, folds, seed)
+ val foldedRdds = kFold(data, folds, seed)
assert(foldedRdds.size === folds)
foldedRdds.map { case (training, validation) =>
val result = validation.union(training).collect().sorted
@@ -132,4 +158,16 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
}
}
+ /** Delete a file/directory quietly. */
+ def deleteQuietly(f: File) {
+ if (f.isDirectory) {
+ f.listFiles().foreach(deleteQuietly)
+ }
+ try {
+ f.delete()
+ } catch {
+ case _: Throwable =>
+ }
+ }
}
+
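Taken together, the MLUtilsSuite changes exercise the renamed loadLibSVMFile, the new saveAsLibSVMFile, and appendBias. A hedged end-to-end sketch based only on the calls visible above (paths are placeholders):

    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.mllib.linalg.Vectors

    // Load LIBSVM data; pass multiclass = true to keep the raw labels.
    val points = MLUtils.loadLibSVMFile(sc, "data/sample.libsvm")

    // Write labeled points back out in LIBSVM format.
    MLUtils.saveAsLibSVMFile(points, "out/libsvm")

    // Append a constant bias feature of 1.0 to a vector.
    val withBias = MLUtils.appendBias(Vectors.dense(1.0, 0.0, 3.0))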