aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala33
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala33
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala31
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala3
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala3
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala47
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala121
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala2
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala60
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala25
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala39
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala30
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala42
-rw-r--r--python/pyspark/mllib/_common.py72
-rw-r--r--python/pyspark/mllib/linalg.py34
-rw-r--r--python/pyspark/mllib/regression.py5
-rw-r--r--python/pyspark/mllib/util.py69
18 files changed, 579 insertions, 72 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
index 9832bec90d..b3cc361154 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
@@ -99,7 +99,7 @@ object DecisionTreeRunner {
val sc = new SparkContext(conf)
// Load training data and cache it.
- val examples = MLUtils.loadLabeledData(sc, params.input).cache()
+ val examples = MLUtils.loadLabeledPoints(sc, params.input).cache()
val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 7c65b0d475..c44173793b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -20,12 +20,13 @@ package org.apache.spark.mllib.api.python
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
/**
@@ -41,7 +42,7 @@ class PythonMLLibAPI extends Serializable {
private val DENSE_MATRIX_MAGIC: Byte = 3
private val LABELED_POINT_MAGIC: Byte = 4
- private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+ private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
require(bytes.length - offset >= 5, "Byte array too short")
val magic = bytes(offset)
if (magic == DENSE_VECTOR_MAGIC) {
@@ -116,7 +117,7 @@ class PythonMLLibAPI extends Serializable {
bytes
}
- private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
+ private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
case s: SparseVector =>
serializeSparseVector(s)
case _ =>
@@ -167,7 +168,18 @@ class PythonMLLibAPI extends Serializable {
bytes
}
- private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
+ private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
+ val fb = serializeDoubleVector(p.features)
+ val bytes = new Array[Byte](1 + 8 + fb.length)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.put(LABELED_POINT_MAGIC)
+ bb.putDouble(p.label)
+ bb.put(fb)
+ bytes
+ }
+
+ private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
require(bytes.length >= 9, "Byte array too short")
val magic = bytes(0)
if (magic != LABELED_POINT_MAGIC) {
@@ -179,6 +191,19 @@ class PythonMLLibAPI extends Serializable {
LabeledPoint(label, deserializeDoubleVector(bytes, 9))
}
+ /**
+ * Loads and serializes labeled points saved with `RDD#saveAsTextFile`.
+ * @param jsc Java SparkContext
+ * @param path file or directory path in any Hadoop-supported file system URI
+ * @param minPartitions min number of partitions
+ * @return serialized labeled points stored in a JavaRDD of byte array
+ */
+ def loadLabeledPoints(
+ jsc: JavaSparkContext,
+ path: String,
+ minPartitions: Int): JavaRDD[Array[Byte]] =
+ MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD()
+
private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
dataBytesJRDD: JavaRDD[Array[Byte]],
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 84d223908c..c818a0b9c3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -17,13 +17,16 @@
package org.apache.spark.mllib.linalg
-import java.lang.{Iterable => JavaIterable, Integer => JavaInteger, Double => JavaDouble}
+import java.lang.{Double => JavaDouble, Integer => JavaInteger, Iterable => JavaIterable}
import java.util.Arrays
import scala.annotation.varargs
import scala.collection.JavaConverters._
-import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
+
+import org.apache.spark.mllib.util.NumericParser
+import org.apache.spark.SparkException
/**
* Represents a numeric vector, whose index type is Int and value type is Double.
@@ -125,6 +128,25 @@ object Vectors {
}
/**
+ * Parses a string resulted from `Vector#toString` into
+ * an [[org.apache.spark.mllib.linalg.Vector]].
+ */
+ def parse(s: String): Vector = {
+ parseNumeric(NumericParser.parse(s))
+ }
+
+ private[mllib] def parseNumeric(any: Any): Vector = {
+ any match {
+ case values: Array[Double] =>
+ Vectors.dense(values)
+ case Seq(size: Double, indices: Array[Double], values: Array[Double]) =>
+ Vectors.sparse(size.toInt, indices.map(_.toInt), values)
+ case other =>
+ throw new SparkException(s"Cannot parse $other.")
+ }
+ }
+
+ /**
* Creates a vector instance from a breeze vector.
*/
private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = {
@@ -175,9 +197,10 @@ class SparseVector(
val indices: Array[Int],
val values: Array[Double]) extends Vector {
- override def toString: String = {
- "(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
- }
+ require(indices.length == values.length)
+
+ override def toString: String =
+ "(%s,%s,%s)".format(size, indices.mkString("[", ",", "]"), values.mkString("[", ",", "]"))
override def toArray: Array[Double] = {
val data = new Array[Double](size)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 3deab1ab78..62a03af4a9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -17,7 +17,9 @@
package org.apache.spark.mllib.regression
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.util.NumericParser
+import org.apache.spark.SparkException
/**
* Class that represents the features and labels of a data point.
@@ -27,6 +29,31 @@ import org.apache.spark.mllib.linalg.Vector
*/
case class LabeledPoint(label: Double, features: Vector) {
override def toString: String = {
- "LabeledPoint(%s, %s)".format(label, features)
+ "(%s,%s)".format(label, features)
+ }
+}
+
+/**
+ * Parser for [[org.apache.spark.mllib.regression.LabeledPoint]].
+ */
+private[mllib] object LabeledPointParser {
+ /**
+ * Parses a string resulted from `LabeledPoint#toString` into
+ * an [[org.apache.spark.mllib.regression.LabeledPoint]].
+ */
+ def parse(s: String): LabeledPoint = {
+ if (s.startsWith("(")) {
+ NumericParser.parse(s) match {
+ case Seq(label: Double, numeric: Any) =>
+ LabeledPoint(label, Vectors.parseNumeric(numeric))
+ case other =>
+ throw new SparkException(s"Cannot parse $other.")
+ }
+ } else { // dense format used before v1.0
+ val parts = s.split(',')
+ val label = java.lang.Double.parseDouble(parts(0))
+ val features = Vectors.dense(parts(1).trim().split(' ').map(java.lang.Double.parseDouble))
+ LabeledPoint(label, features)
+ }
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
index c8e160d00c..69299c2198 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
@@ -129,7 +129,8 @@ object LinearDataGenerator {
val sc = new SparkContext(sparkMaster, "LinearDataGenerator")
val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts)
- MLUtils.saveLabeledData(data, outputPath)
+ data.saveAsTextFile(outputPath)
+
sc.stop()
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
index c82cd8fd46..9d802678c4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -79,7 +79,8 @@ object LogisticRegressionDataGenerator {
val sc = new SparkContext(sparkMaster, "LogisticRegressionDataGenerator")
val data = generateLogisticRDD(sc, nexamples, nfeatures, eps, parts)
- MLUtils.saveLabeledData(data, outputPath)
+ data.saveAsTextFile(outputPath)
+
sc.stop()
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index e598b6cb17..aaf92a1a88 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
import org.apache.spark.util.random.BernoulliSampler
-import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.storage.StorageLevel
@@ -180,7 +180,39 @@ object MLUtils {
}
/**
- * :: Experimental ::
+ * Loads vectors saved using `RDD[Vector].saveAsTextFile`.
+ * @param sc Spark context
+ * @param path file or directory path in any Hadoop-supported file system URI
+ * @param minPartitions min number of partitions
+ * @return vectors stored as an RDD[Vector]
+ */
+ def loadVectors(sc: SparkContext, path: String, minPartitions: Int): RDD[Vector] =
+ sc.textFile(path, minPartitions).map(Vectors.parse)
+
+ /**
+ * Loads vectors saved using `RDD[Vector].saveAsTextFile` with the default number of partitions.
+ */
+ def loadVectors(sc: SparkContext, path: String): RDD[Vector] =
+ sc.textFile(path, sc.defaultMinPartitions).map(Vectors.parse)
+
+ /**
+ * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile`.
+ * @param sc Spark context
+ * @param path file or directory path in any Hadoop-supported file system URI
+ * @param minPartitions min number of partitions
+ * @return labeled points stored as an RDD[LabeledPoint]
+ */
+ def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] =
+ sc.textFile(path, minPartitions).map(LabeledPointParser.parse)
+
+ /**
+ * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of
+ * partitions.
+ */
+ def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] =
+ loadLabeledPoints(sc, dir, sc.defaultMinPartitions)
+
+ /**
* Load labeled data from a file. The data format used here is
* <L>, <f1> <f2> ...
* where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
@@ -189,8 +221,11 @@ object MLUtils {
* @param dir Directory to the input data files.
* @return An RDD of LabeledPoint. Each labeled point has two elements: the first element is
* the label, and the second element represents the feature values (an array of Double).
+ *
+ * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and
+ * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading.
*/
- @Experimental
+ @deprecated("Should use MLUtils.loadLabeledPoints instead.", "1.0.1")
def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
sc.textFile(dir).map { line =>
val parts = line.split(',')
@@ -201,15 +236,17 @@ object MLUtils {
}
/**
- * :: Experimental ::
* Save labeled data to a file. The data format used here is
* <L>, <f1> <f2> ...
* where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
*
* @param data An RDD of LabeledPoints containing data to be saved.
* @param dir Directory to save the data.
+ *
+ * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and
+ * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading.
*/
- @Experimental
+ @deprecated("Should use RDD[LabeledPoint].saveAsTextFile instead.", "1.0.1")
def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" "))
dataStr.saveAsTextFile(dir)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala
new file mode 100644
index 0000000000..f7cba6c6cb
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import java.util.StringTokenizer
+
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+
+import org.apache.spark.SparkException
+
+/**
+ * Simple parser for a numeric structure consisting of three types:
+ *
+ * - number: a double in Java's floating number format
+ * - array: an array of numbers stored as `[v0,v1,...,vn]`
+ * - tuple: a list of numbers, arrays, or tuples stored as `(...)`
+ */
+private[mllib] object NumericParser {
+
+ /** Parses a string into a Double, an Array[Double], or a Seq[Any]. */
+ def parse(s: String): Any = {
+ val tokenizer = new StringTokenizer(s, "()[],", true)
+ if (tokenizer.hasMoreTokens()) {
+ val token = tokenizer.nextToken()
+ if (token == "(") {
+ parseTuple(tokenizer)
+ } else if (token == "[") {
+ parseArray(tokenizer)
+ } else {
+ // expecting a number
+ parseDouble(token)
+ }
+ } else {
+ throw new SparkException(s"Cannot find any token from the input string.")
+ }
+ }
+
+ private def parseArray(tokenizer: StringTokenizer): Array[Double] = {
+ val values = ArrayBuffer.empty[Double]
+ var parsing = true
+ var allowComma = false
+ var token: String = null
+ while (parsing && tokenizer.hasMoreTokens()) {
+ token = tokenizer.nextToken()
+ if (token == "]") {
+ parsing = false
+ } else if (token == ",") {
+ if (allowComma) {
+ allowComma = false
+ } else {
+ throw new SparkException("Found a ',' at a wrong position.")
+ }
+ } else {
+ // expecting a number
+ values.append(parseDouble(token))
+ allowComma = true
+ }
+ }
+ if (parsing) {
+ throw new SparkException(s"An array must end with ']'.")
+ }
+ values.toArray
+ }
+
+ private def parseTuple(tokenizer: StringTokenizer): Seq[_] = {
+ val items = ListBuffer.empty[Any]
+ var parsing = true
+ var allowComma = false
+ var token: String = null
+ while (parsing && tokenizer.hasMoreTokens()) {
+ token = tokenizer.nextToken()
+ if (token == "(") {
+ items.append(parseTuple(tokenizer))
+ allowComma = true
+ } else if (token == "[") {
+ items.append(parseArray(tokenizer))
+ allowComma = true
+ } else if (token == ",") {
+ if (allowComma) {
+ allowComma = false
+ } else {
+ throw new SparkException("Found a ',' at a wrong position.")
+ }
+ } else if (token == ")") {
+ parsing = false
+ } else {
+ // expecting a number
+ items.append(parseDouble(token))
+ allowComma = true
+ }
+ }
+ if (parsing) {
+ throw new SparkException(s"A tuple must end with ')'.")
+ }
+ items
+ }
+
+ private def parseDouble(s: String): Double = {
+ try {
+ java.lang.Double.parseDouble(s)
+ } catch {
+ case e: Throwable =>
+ throw new SparkException(s"Cannot parse a double from: $s", e)
+ }
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index ba8190b0e0..7db97e6bac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -65,7 +65,7 @@ object SVMDataGenerator {
LabeledPoint(y, Vectors.dense(x))
}
- MLUtils.saveLabeledData(data, outputPath)
+ data.saveAsTextFile(outputPath)
sc.stop()
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
new file mode 100644
index 0000000000..642843f902
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
+
+class PythonMLLibAPISuite extends FunSuite {
+ val py = new PythonMLLibAPI
+
+ test("vector serialization") {
+ val vectors = Seq(
+ Vectors.dense(Array.empty[Double]),
+ Vectors.dense(0.0),
+ Vectors.dense(0.0, -2.0),
+ Vectors.sparse(0, Array.empty[Int], Array.empty[Double]),
+ Vectors.sparse(1, Array.empty[Int], Array.empty[Double]),
+ Vectors.sparse(2, Array(1), Array(-2.0)))
+ vectors.foreach { v =>
+ val bytes = py.serializeDoubleVector(v)
+ val u = py.deserializeDoubleVector(bytes)
+ assert(u.getClass === v.getClass)
+ assert(u === v)
+ }
+ }
+
+ test("labeled point serialization") {
+ val points = Seq(
+ LabeledPoint(0.0, Vectors.dense(Array.empty[Double])),
+ LabeledPoint(1.0, Vectors.dense(0.0)),
+ LabeledPoint(-0.5, Vectors.dense(0.0, -2.0)),
+ LabeledPoint(0.0, Vectors.sparse(0, Array.empty[Int], Array.empty[Double])),
+ LabeledPoint(1.0, Vectors.sparse(1, Array.empty[Int], Array.empty[Double])),
+ LabeledPoint(-0.5, Vectors.sparse(2, Array(1), Array(-2.0))))
+ points.foreach { p =>
+ val bytes = py.serializeLabeledPoint(p)
+ val q = py.deserializeLabeledPoint(bytes)
+ assert(q.label === p.label)
+ assert(q.features.getClass === p.features.getClass)
+ assert(q.features === p.features)
+ }
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index cfe8a27fcb..7972ceea1f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.mllib.linalg
import org.scalatest.FunSuite
+import org.apache.spark.SparkException
+
class VectorsSuite extends FunSuite {
val arr = Array(0.1, 0.0, 0.3, 0.4)
@@ -100,4 +102,27 @@ class VectorsSuite extends FunSuite {
assert(vec2(6) === 4.0)
assert(vec2(7) === 0.0)
}
+
+ test("parse vectors") {
+ val vectors = Seq(
+ Vectors.dense(Array.empty[Double]),
+ Vectors.dense(1.0),
+ Vectors.dense(1.0E6, 0.0, -2.0e-7),
+ Vectors.sparse(0, Array.empty[Int], Array.empty[Double]),
+ Vectors.sparse(1, Array(0), Array(1.0)),
+ Vectors.sparse(3, Array(0, 2), Array(1.0, -2.0)))
+ vectors.foreach { v =>
+ val v1 = Vectors.parse(v.toString)
+ assert(v.getClass === v1.getClass)
+ assert(v === v1)
+ }
+
+ val malformatted = Seq("1", "[1,,]", "[1,2b]", "(1,[1,2])", "([1],[2.0,1.0])")
+ malformatted.foreach { s =>
+ intercept[SparkException] {
+ Vectors.parse(s)
+ println(s"Didn't detect malformatted string $s.")
+ }
+ }
+ }
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
new file mode 100644
index 0000000000..d9308aaba6
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.Vectors
+
+class LabeledPointSuite extends FunSuite {
+
+ test("parse labeled points") {
+ val points = Seq(
+ LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
+ LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))))
+ points.foreach { p =>
+ assert(p === LabeledPointParser.parse(p.toString))
+ }
+ }
+
+ test("parse labeled points with v0.9 format") {
+ val point = LabeledPointParser.parse("1.0,1.0 0.0 -2.0")
+ assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0)))
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 3d05fb6898..c14870fb96 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -160,5 +160,33 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
}
}
-}
+ test("loadVectors") {
+ val vectors = sc.parallelize(Seq(
+ Vectors.dense(1.0, 2.0),
+ Vectors.sparse(2, Array(1), Array(-1.0)),
+ Vectors.dense(0.0, 1.0)
+ ), 2)
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "vectors")
+ val path = outputDir.toURI.toString
+ vectors.saveAsTextFile(path)
+ val loaded = loadVectors(sc, path)
+ assert(vectors.collect().toSet === loaded.collect().toSet)
+ Utils.deleteRecursively(tempDir)
+ }
+ test("loadLabeledPoints") {
+ val points = sc.parallelize(Seq(
+ LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
+ LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))),
+ LabeledPoint(1.0, Vectors.dense(0.0, 1.0))
+ ), 2)
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "points")
+ val path = outputDir.toURI.toString
+ points.saveAsTextFile(path)
+ val loaded = loadLabeledPoints(sc, path)
+ assert(points.collect().toSet === loaded.collect().toSet)
+ Utils.deleteRecursively(tempDir)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala
new file mode 100644
index 0000000000..f68fb95eac
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkException
+
+class NumericParserSuite extends FunSuite {
+
+ test("parser") {
+ val s = "((1.0,2e3),-4,[5e-6,7.0E8],+9)"
+ val parsed = NumericParser.parse(s).asInstanceOf[Seq[_]]
+ assert(parsed(0).asInstanceOf[Seq[_]] === Seq(1.0, 2.0e3))
+ assert(parsed(1).asInstanceOf[Double] === -4.0)
+ assert(parsed(2).asInstanceOf[Array[Double]] === Array(5.0e-6, 7.0e8))
+ assert(parsed(3).asInstanceOf[Double] === 9.0)
+
+ val malformatted = Seq("a", "[1,,]", "0.123.4", "1 2", "3+4")
+ malformatted.foreach { s =>
+ intercept[SparkException] {
+ NumericParser.parse(s)
+ println(s"Didn't detect malformatted string $s.")
+ }
+ }
+ }
+}
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 802a27a8da..a411a5d591 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -22,6 +22,7 @@ from pyspark import SparkContext, RDD
from pyspark.mllib.linalg import SparseVector
from pyspark.serializers import Serializer
+
"""
Common utilities shared throughout MLlib, primarily for dealing with
different data types. These include:
@@ -147,7 +148,7 @@ def _serialize_sparse_vector(v):
return ba
-def _deserialize_double_vector(ba):
+def _deserialize_double_vector(ba, offset=0):
"""Deserialize a double vector from a mutually understood format.
>>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
@@ -160,43 +161,46 @@ def _deserialize_double_vector(ba):
if type(ba) != bytearray:
raise TypeError("_deserialize_double_vector called on a %s; "
"wanted bytearray" % type(ba))
- if len(ba) < 5:
+ nb = len(ba) - offset
+ if nb < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
- "which is too short" % len(ba))
- if ba[0] == DENSE_VECTOR_MAGIC:
- return _deserialize_dense_vector(ba)
- elif ba[0] == SPARSE_VECTOR_MAGIC:
- return _deserialize_sparse_vector(ba)
+ "which is too short" % nb)
+ if ba[offset] == DENSE_VECTOR_MAGIC:
+ return _deserialize_dense_vector(ba, offset)
+ elif ba[offset] == SPARSE_VECTOR_MAGIC:
+ return _deserialize_sparse_vector(ba, offset)
else:
raise TypeError("_deserialize_double_vector called on bytearray "
"with wrong magic")
-def _deserialize_dense_vector(ba):
+def _deserialize_dense_vector(ba, offset=0):
"""Deserialize a dense vector into a numpy array."""
- if len(ba) < 5:
+ nb = len(ba) - offset
+ if nb < 5:
raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
- "which is too short" % len(ba))
- length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
- if len(ba) != 8 * length + 5:
+ "which is too short" % nb)
+ length = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=int32)[0]
+ if nb < 8 * length + 5:
raise TypeError("_deserialize_dense_vector called on bytearray "
"with wrong length")
- return _deserialize_numpy_array([length], ba, 5)
+ return _deserialize_numpy_array([length], ba, offset + 5)
-def _deserialize_sparse_vector(ba):
+def _deserialize_sparse_vector(ba, offset=0):
"""Deserialize a sparse vector into a MLlib SparseVector object."""
- if len(ba) < 9:
+ nb = len(ba) - offset
+ if nb < 9:
raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
- "which is too short" % len(ba))
- header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ "which is too short" % nb)
+ header = ndarray(shape=[2], buffer=ba, offset=offset + 1, dtype=int32)
size = header[0]
nonzeros = header[1]
- if len(ba) != 9 + 12 * nonzeros:
+ if nb < 9 + 12 * nonzeros:
raise TypeError("_deserialize_sparse_vector called on bytearray "
"with wrong length")
- indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32)
- values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64)
+ indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32)
+ values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64)
return SparseVector(int(size), indices, values)
@@ -243,7 +247,23 @@ def _deserialize_double_matrix(ba):
def _serialize_labeled_point(p):
- """Serialize a LabeledPoint with a features vector of any type."""
+ """
+ Serialize a LabeledPoint with a features vector of any type.
+
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> dp0 = LabeledPoint(0.5, array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]))
+ >>> dp1 = _deserialize_labeled_point(_serialize_labeled_point(dp0))
+ >>> dp1.label == dp0.label
+ True
+ >>> array_equal(dp1.features, dp0.features)
+ True
+ >>> sp0 = LabeledPoint(0.0, SparseVector(4, [1, 3], [3.0, 5.5]))
+ >>> sp1 = _deserialize_labeled_point(_serialize_labeled_point(sp0))
+ >>> sp1.label == sp1.label
+ True
+ >>> sp1.features == sp0.features
+ True
+ """
from pyspark.mllib.regression import LabeledPoint
serialized_features = _serialize_double_vector(p.features)
header = bytearray(9)
@@ -252,6 +272,16 @@ def _serialize_labeled_point(p):
header_float[0] = p.label
return header + serialized_features
+def _deserialize_labeled_point(ba, offset=0):
+ """Deserialize a LabeledPoint from a mutually understood format."""
+ from pyspark.mllib.regression import LabeledPoint
+ if type(ba) != bytearray:
+ raise TypeError("Expecting a bytearray but got %s" % type(ba))
+ if ba[offset] != LABELED_POINT_MAGIC:
+ raise TypeError("Expecting magic number %d but got %d" % (LABELED_POINT_MAGIC, ba[0]))
+ label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0]
+ features = _deserialize_double_vector(ba, offset + 9)
+ return LabeledPoint(label, features)
def _copyto(array, buffer, offset, shape, dtype):
"""
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 2766842720..db39ed0acd 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -43,11 +43,11 @@ class SparseVector(object):
or two sorted lists containing indices and values.
>>> print SparseVector(4, {1: 1.0, 3: 5.5})
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
>>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
>>> print SparseVector(4, [1, 3], [1.0, 5.5])
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
"""
self.size = int(size)
assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
@@ -160,10 +160,9 @@ class SparseVector(object):
return result
def __str__(self):
- inds = self.indices
- vals = self.values
- entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
- return "[" + entries + "]"
+ inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
+ vals = "[" + ",".join([str(v) for v in self.values]) + "]"
+ return "(" + ",".join((str(self.size), inds, vals)) + ")"
def __repr__(self):
inds = self.indices
@@ -213,11 +212,11 @@ class Vectors(object):
or two sorted lists containing indices and values.
>>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
>>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
>>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
"""
return SparseVector(size, *args)
@@ -232,6 +231,21 @@ class Vectors(object):
"""
return array(elements, dtype=float64)
+ @staticmethod
+ def stringify(vector):
+ """
+ Converts a vector into a string, which can be recognized by
+ Vectors.parse().
+
+ >>> Vectors.stringify(Vectors.sparse(2, [1], [1.0]))
+ '(2,[1],[1.0])'
+ >>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
+ '[0.0,1.0]'
+ """
+ if type(vector) == SparseVector:
+ return str(vector)
+ else:
+ return "[" + ",".join([str(v) for v in vector]) + "]"
def _test():
import doctest
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index bc7de6d2e8..b84bc531de 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -23,7 +23,7 @@ from pyspark.mllib._common import \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
_linear_predictor_typecheck, _have_scipy, _scipy_issparse
-from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.linalg import SparseVector, Vectors
class LabeledPoint(object):
@@ -44,6 +44,9 @@ class LabeledPoint(object):
else:
raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+ def __str__(self):
+ return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")"
+
class LinearModel(object):
"""A linear model that has a vector of coefficients and an intercept."""
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 0e5f4520b9..e24c144f45 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -19,7 +19,10 @@ import numpy as np
from pyspark.mllib.linalg import Vectors, SparseVector
from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib._common import _convert_vector
+from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point
+from pyspark.rdd import RDD
+from pyspark.serializers import NoOpSerializer
+
class MLUtils:
@@ -105,24 +108,18 @@ class MLUtils:
>>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
>>> multiclass_examples = MLUtils.loadLibSVMFile(sc, tempFile.name, True).collect()
>>> tempFile.close()
- >>> examples[0].label
- 1.0
- >>> examples[0].features.size
- 6
- >>> print examples[0].features
- [0: 1.0, 2: 2.0, 4: 3.0]
- >>> examples[1].label
- 0.0
- >>> examples[1].features.size
- 6
- >>> print examples[1].features
- []
- >>> examples[2].label
- 0.0
- >>> examples[2].features.size
- 6
- >>> print examples[2].features
- [1: 4.0, 3: 5.0, 5: 6.0]
+ >>> type(examples[0]) == LabeledPoint
+ True
+ >>> print examples[0]
+ (1.0,(6,[0,2,4],[1.0,2.0,3.0]))
+ >>> type(examples[1]) == LabeledPoint
+ True
+ >>> print examples[1]
+ (0.0,(6,[],[]))
+ >>> type(examples[2]) == LabeledPoint
+ True
+ >>> print examples[2]
+ (0.0,(6,[1,3,5],[4.0,5.0,6.0]))
>>> multiclass_examples[1].label
-1.0
"""
@@ -158,6 +155,40 @@ class MLUtils:
lines.saveAsTextFile(dir)
+ @staticmethod
+ def loadLabeledPoints(sc, path, minPartitions=None):
+ """
+ Load labeled points saved using RDD.saveAsTextFile.
+
+ @param sc: Spark context
+ @param path: file or directory path in any Hadoop-supported file
+ system URI
+ @param minPartitions: min number of partitions
+ @return: labeled data stored as an RDD of LabeledPoint
+
+ >>> from tempfile import NamedTemporaryFile
+ >>> from pyspark.mllib.util import MLUtils
+ >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \
+ LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+ >>> tempFile = NamedTemporaryFile(delete=True)
+ >>> tempFile.close()
+ >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
+ >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
+ >>> type(loaded[0]) == LabeledPoint
+ True
+ >>> print examples[0]
+ (1.1,(3,[0,2],[-1.23,4.56e-07]))
+ >>> type(examples[1]) == LabeledPoint
+ True
+ >>> print examples[1]
+ (0.0,[1.01,2.02,3.03])
+ """
+ minPartitions = minPartitions or min(sc.defaultParallelism, 2)
+ jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
+ serialized = RDD(jSerialized, sc, NoOpSerializer())
+ return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes)))
+
+
def _test():
import doctest
from pyspark.context import SparkContext