aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2014-08-16 15:13:34 -0700
committerXiangrui Meng <meng@databricks.com>2014-08-16 15:13:34 -0700
commit7e70708a99949549adde00cb6246a9582bbc4929 (patch)
tree2fe8186f0152c80892578fcda8e6d74b5bc5fcae /mllib
parent76fa0eaf515fd6771cdd69422b1259485debcae5 (diff)
downloadspark-7e70708a99949549adde00cb6246a9582bbc4929.tar.gz
spark-7e70708a99949549adde00cb6246a9582bbc4929.tar.bz2
spark-7e70708a99949549adde00cb6246a9582bbc4929.zip
[SPARK-3048][MLLIB] add LabeledPoint.parse and remove loadStreamingLabeledPoints
Move `parse()` from `LabeledPointParser` to `LabeledPoint` and make it public. This breaks binary compatibility only when a user uses synthesized methods like `tupled` and `curried`, which is rare. `LabeledPoint.parse` is more consistent with `Vectors.parse`, which is why `LabeledPointParser` is not preferred. freeman-lab tdas Author: Xiangrui Meng <meng@databricks.com> Closes #1952 from mengxr/labelparser and squashes the following commits: c818fb2 [Xiangrui Meng] merge master ce20e6f [Xiangrui Meng] update mima excludes b386b8d [Xiangrui Meng] fix tests 2436b3d [Xiangrui Meng] add parse() to LabeledPoint
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala17
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala4
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala6
5 files changed, 9 insertions, 22 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 62a03af4a9..17c753c566 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -36,7 +36,7 @@ case class LabeledPoint(label: Double, features: Vector) {
/**
* Parser for [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
-private[mllib] object LabeledPointParser {
+object LabeledPoint {
/**
* Parses a string resulted from `LabeledPoint#toString` into
* an [[org.apache.spark.mllib.regression.LabeledPoint]].
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
index 8851097050..1d11fde247 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
@@ -18,7 +18,7 @@
package org.apache.spark.mllib.regression
import org.apache.spark.annotation.Experimental
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.Vector
/**
* Train or predict a linear regression model on streaming data. Training uses
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index f4cce86a65..ca35100aa9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
import org.apache.spark.util.random.BernoulliSampler
-import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
+import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
@@ -185,7 +185,7 @@ object MLUtils {
* @return labeled points stored as an RDD[LabeledPoint]
*/
def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] =
- sc.textFile(path, minPartitions).map(LabeledPointParser.parse)
+ sc.textFile(path, minPartitions).map(LabeledPoint.parse)
/**
* Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of
@@ -195,19 +195,6 @@ object MLUtils {
loadLabeledPoints(sc, dir, sc.defaultMinPartitions)
/**
- * Loads streaming labeled points from a stream of text files
- * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`.
- * See `StreamingContext.textFileStream` for more details on how to
- * generate a stream from files
- *
- * @param ssc Streaming context
- * @param dir Directory path in any Hadoop-supported file system URI
- * @return Labeled points stored as a DStream[LabeledPoint]
- */
- def loadStreamingLabeledPoints(ssc: StreamingContext, dir: String): DStream[LabeledPoint] =
- ssc.textFileStream(dir).map(LabeledPointParser.parse)
-
- /**
* Load labeled data from a file. The data format used here is
* <L>, <f1> <f2> ...
* where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
index d9308aaba6..110c44a719 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
@@ -28,12 +28,12 @@ class LabeledPointSuite extends FunSuite {
LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))))
points.foreach { p =>
- assert(p === LabeledPointParser.parse(p.toString))
+ assert(p === LabeledPoint.parse(p.toString))
}
}
test("parse labeled points with v0.9 format") {
- val point = LabeledPointParser.parse("1.0,1.0 0.0 -2.0")
+ val point = LabeledPoint.parse("1.0,1.0 0.0 -2.0")
assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0)))
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
index ed21f84472..45e25eecf5 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -26,7 +26,7 @@ import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext, MLUtils}
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils
@@ -55,7 +55,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
val numBatches = 10
val batchDuration = Milliseconds(1000)
val ssc = new StreamingContext(sc, batchDuration)
- val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString)
+ val data = ssc.textFileStream(testDir.toString).map(LabeledPoint.parse)
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(0.0, 0.0))
.setStepSize(0.1)
@@ -97,7 +97,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
val batchDuration = Milliseconds(2000)
val ssc = new StreamingContext(sc, batchDuration)
val numBatches = 5
- val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString)
+ val data = ssc.textFileStream(testDir.toString()).map(LabeledPoint.parse)
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(0.0))
.setStepSize(0.1)