| author | Xiangrui Meng <meng@databricks.com> | 2014-06-04 12:56:56 -0700 |
|---|---|---|
| committer | Matei Zaharia <matei@databricks.com> | 2014-06-04 12:56:56 -0700 |
| commit | 189df165bb7cb8bc8ede48d0e7f8d8b5cd31d299 (patch) | |
| tree | 72f891e5194a7ea17d30bf1eea5e5600198fe8de /mllib/src/main | |
| parent | d341b17c2a0a4fce04045e13fb4a3b0621296320 (diff) | |
[SPARK-1752][MLLIB] Standardize text format for vectors and labeled points
We should standardize the text format used to represent vectors and labeled points. The proposed formats are as follows:
1. dense vector: `[v0,v1,..]`
2. sparse vector: `(size,[i0,i1],[v0,v1])`
3. labeled point: `(label,vector)`
where "(..)" indicates a tuple and "[...]" indicate an array. `loadLabeledPoints` is added to pyspark's `MLUtils`. I didn't add `loadVectors` to pyspark because `RDD.saveAsTextFile` cannot stringify dense vectors in the proposed format automatically.
`MLUtils#saveLabeledData` and `MLUtils#loadLabeledData` are deprecated. Users should use `RDD#saveAsTextFile` and `MLUtils#loadLabeledPoints` instead. In Scala, `MLUtils#loadLabeledPoints` is compatible with the format used by `MLUtils#loadLabeledData`.
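A sketch of the migration path, assuming an existing `RDD[LabeledPoint]` (the `migrate` helper and the `/tmp/points` path are illustrative only):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def migrate(sc: SparkContext, data: RDD[LabeledPoint]): RDD[LabeledPoint] = {
  // Before (deprecated since 1.0.1):
  //   MLUtils.saveLabeledData(data, "/tmp/points")
  //   MLUtils.loadLabeledData(sc, "/tmp/points")

  // After: save as plain text, load with the format-aware parser.
  data.saveAsTextFile("/tmp/points")  // one "(label,vector)" line per point
  MLUtils.loadLabeledPoints(sc, "/tmp/points")
}
```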
CC: @mateiz, @srowen
Author: Xiangrui Meng <meng@databricks.com>
Closes #685 from mengxr/labeled-io and squashes the following commits:
2d1116a [Xiangrui Meng] make loadLabeledData/saveLabeledData deprecated since 1.0.1
297be75 [Xiangrui Meng] change LabeledPoint.parse to LabeledPointParser.parse to maintain binary compatibility
d6b1473 [Xiangrui Meng] Merge branch 'master' into labeled-io
56746ea [Xiangrui Meng] replace # by .
623a5f0 [Xiangrui Meng] merge master
f06d5ba [Xiangrui Meng] add docs and minor updates
640fe0c [Xiangrui Meng] throw SparkException
5bcfbc4 [Xiangrui Meng] update test to add scientific notations
e86bf38 [Xiangrui Meng] remove NumericTokenizer
050fca4 [Xiangrui Meng] use StringTokenizer
6155b75 [Xiangrui Meng] merge master
f644438 [Xiangrui Meng] remove parse methods based on eval from pyspark
a41675a [Xiangrui Meng] python loadLabeledPoint uses Scala's implementation
ce9a475 [Xiangrui Meng] add deserialize_labeled_point to pyspark with tests
e9fcd49 [Xiangrui Meng] add serializeLabeledPoint and tests
aea4ae3 [Xiangrui Meng] minor updates
810d6df [Xiangrui Meng] update tokenizer/parser implementation
7aac03a [Xiangrui Meng] remove Scala parsers
c1885c1 [Xiangrui Meng] add headers and minor changes
b0c50cb [Xiangrui Meng] add customized parser
d731817 [Xiangrui Meng] style update
63dc396 [Xiangrui Meng] add loadLabeledPoints to pyspark
ea122b5 [Xiangrui Meng] Merge branch 'master' into labeled-io
cd6c78f [Xiangrui Meng] add __str__ and parse to LabeledPoint
a7a178e [Xiangrui Meng] add stringify to pyspark's Vectors
5c2dbfa [Xiangrui Meng] add parse to pyspark's Vectors
7853f88 [Xiangrui Meng] update pyspark's SparseVector.__str__
e761d32 [Xiangrui Meng] make LabelPoint.parse compatible with the dense format used before v1.0 and deprecate loadLabeledData and saveLabeledData
9e63a02 [Xiangrui Meng] add loadVectors and loadLabeledPoints
19aa523 [Xiangrui Meng] update toString and add parsers for Vectors and LabeledPoint
Diffstat (limited to 'mllib/src/main')
8 files changed, 254 insertions, 19 deletions
```diff
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 7c65b0d475..c44173793b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -20,12 +20,13 @@ package org.apache.spark.mllib.api.python
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
 import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 
 /**
@@ -41,7 +42,7 @@ class PythonMLLibAPI extends Serializable {
   private val DENSE_MATRIX_MAGIC: Byte = 3
   private val LABELED_POINT_MAGIC: Byte = 4
 
-  private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
     require(bytes.length - offset >= 5, "Byte array too short")
     val magic = bytes(offset)
     if (magic == DENSE_VECTOR_MAGIC) {
@@ -116,7 +117,7 @@ class PythonMLLibAPI extends Serializable {
     bytes
   }
 
-  private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
+  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
     case s: SparseVector =>
       serializeSparseVector(s)
     case _ =>
@@ -167,7 +168,18 @@ class PythonMLLibAPI extends Serializable {
     bytes
   }
 
-  private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
+  private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
+    val fb = serializeDoubleVector(p.features)
+    val bytes = new Array[Byte](1 + 8 + fb.length)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.put(LABELED_POINT_MAGIC)
+    bb.putDouble(p.label)
+    bb.put(fb)
+    bytes
+  }
+
+  private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
     require(bytes.length >= 9, "Byte array too short")
     val magic = bytes(0)
     if (magic != LABELED_POINT_MAGIC) {
@@ -179,6 +191,19 @@ class PythonMLLibAPI extends Serializable {
     LabeledPoint(label, deserializeDoubleVector(bytes, 9))
   }
 
+  /**
+   * Loads and serializes labeled points saved with `RDD#saveAsTextFile`.
+   * @param jsc Java SparkContext
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param minPartitions min number of partitions
+   * @return serialized labeled points stored in a JavaRDD of byte array
+   */
+  def loadLabeledPoints(
+      jsc: JavaSparkContext,
+      path: String,
+      minPartitions: Int): JavaRDD[Array[Byte]] =
+    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD()
+
   private def trainRegressionModel(
       trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
       dataBytesJRDD: JavaRDD[Array[Byte]],
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 84d223908c..c818a0b9c3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -17,13 +17,16 @@
 
 package org.apache.spark.mllib.linalg
 
-import java.lang.{Iterable => JavaIterable, Integer => JavaInteger, Double => JavaDouble}
+import java.lang.{Double => JavaDouble, Integer => JavaInteger, Iterable => JavaIterable}
 import java.util.Arrays
 
 import scala.annotation.varargs
 import scala.collection.JavaConverters._
 
-import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
+
+import org.apache.spark.mllib.util.NumericParser
+import org.apache.spark.SparkException
 
 /**
  * Represents a numeric vector, whose index type is Int and value type is Double.
@@ -125,6 +128,25 @@ object Vectors {
   }
 
   /**
+   * Parses a string resulted from `Vector#toString` into
+   * an [[org.apache.spark.mllib.linalg.Vector]].
+   */
+  def parse(s: String): Vector = {
+    parseNumeric(NumericParser.parse(s))
+  }
+
+  private[mllib] def parseNumeric(any: Any): Vector = {
+    any match {
+      case values: Array[Double] =>
+        Vectors.dense(values)
+      case Seq(size: Double, indices: Array[Double], values: Array[Double]) =>
+        Vectors.sparse(size.toInt, indices.map(_.toInt), values)
+      case other =>
+        throw new SparkException(s"Cannot parse $other.")
+    }
+  }
+
+  /**
    * Creates a vector instance from a breeze vector.
    */
   private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = {
@@ -175,9 +197,10 @@ class SparseVector(
     val indices: Array[Int],
     val values: Array[Double]) extends Vector {
 
-  override def toString: String = {
-    "(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
-  }
+  require(indices.length == values.length)
+
+  override def toString: String =
+    "(%s,%s,%s)".format(size, indices.mkString("[", ",", "]"), values.mkString("[", ",", "]"))
 
   override def toArray: Array[Double] = {
     val data = new Array[Double](size)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 3deab1ab78..62a03af4a9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.mllib.regression
 
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.util.NumericParser
+import org.apache.spark.SparkException
 
 /**
  * Class that represents the features and labels of a data point.
@@ -27,6 +29,31 @@ import org.apache.spark.mllib.linalg.Vector
  */
 case class LabeledPoint(label: Double, features: Vector) {
   override def toString: String = {
-    "LabeledPoint(%s, %s)".format(label, features)
+    "(%s,%s)".format(label, features)
   }
 }
+
+/**
+ * Parser for [[org.apache.spark.mllib.regression.LabeledPoint]].
+ */
+private[mllib] object LabeledPointParser {
+  /**
+   * Parses a string resulted from `LabeledPoint#toString` into
+   * an [[org.apache.spark.mllib.regression.LabeledPoint]].
+   */
+  def parse(s: String): LabeledPoint = {
+    if (s.startsWith("(")) {
+      NumericParser.parse(s) match {
+        case Seq(label: Double, numeric: Any) =>
+          LabeledPoint(label, Vectors.parseNumeric(numeric))
+        case other =>
+          throw new SparkException(s"Cannot parse $other.")
+      }
+    } else { // dense format used before v1.0
+      val parts = s.split(',')
+      val label = java.lang.Double.parseDouble(parts(0))
+      val features = Vectors.dense(parts(1).trim().split(' ').map(java.lang.Double.parseDouble))
+      LabeledPoint(label, features)
+    }
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
index c8e160d00c..69299c2198 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
@@ -129,7 +129,8 @@ object LinearDataGenerator {
     val sc = new SparkContext(sparkMaster, "LinearDataGenerator")
     val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts)
 
-    MLUtils.saveLabeledData(data, outputPath)
+    data.saveAsTextFile(outputPath)
+
     sc.stop()
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
index c82cd8fd46..9d802678c4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -79,7 +79,8 @@ object LogisticRegressionDataGenerator {
     val sc = new SparkContext(sparkMaster, "LogisticRegressionDataGenerator")
     val data = generateLogisticRDD(sc, nexamples, nfeatures, eps, parts)
 
-    MLUtils.saveLabeledData(data, outputPath)
+    data.saveAsTextFile(outputPath)
+
     sc.stop()
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index e598b6cb17..aaf92a1a88 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -27,7 +27,7 @@
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PartitionwiseSampledRDD
 import org.apache.spark.util.random.BernoulliSampler
-import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.storage.StorageLevel
@@ -180,7 +180,39 @@ object MLUtils {
   }
 
   /**
-   * :: Experimental ::
+   * Loads vectors saved using `RDD[Vector].saveAsTextFile`.
+   * @param sc Spark context
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param minPartitions min number of partitions
+   * @return vectors stored as an RDD[Vector]
+   */
+  def loadVectors(sc: SparkContext, path: String, minPartitions: Int): RDD[Vector] =
+    sc.textFile(path, minPartitions).map(Vectors.parse)
+
+  /**
+   * Loads vectors saved using `RDD[Vector].saveAsTextFile` with the default number of partitions.
+   */
+  def loadVectors(sc: SparkContext, path: String): RDD[Vector] =
+    sc.textFile(path, sc.defaultMinPartitions).map(Vectors.parse)
+
+  /**
+   * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile`.
+   * @param sc Spark context
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param minPartitions min number of partitions
+   * @return labeled points stored as an RDD[LabeledPoint]
+   */
+  def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] =
+    sc.textFile(path, minPartitions).map(LabeledPointParser.parse)
+
+  /**
+   * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of
+   * partitions.
+   */
+  def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] =
+    loadLabeledPoints(sc, dir, sc.defaultMinPartitions)
+
+  /**
    * Load labeled data from a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
@@ -189,8 +221,11 @@
    * @param dir Directory to the input data files.
    * @return An RDD of LabeledPoint. Each labeled point has two elements: the first element is
    *         the label, and the second element represents the feature values (an array of Double).
+   *
+   * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and
+   *             [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading.
    */
-  @Experimental
+  @deprecated("Should use MLUtils.loadLabeledPoints instead.", "1.0.1")
   def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
     sc.textFile(dir).map { line =>
       val parts = line.split(',')
@@ -201,15 +236,17 @@
   }
 
   /**
-   * :: Experimental ::
    * Save labeled data to a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
    *
    * @param data An RDD of LabeledPoints containing data to be saved.
    * @param dir Directory to save the data.
+   *
+   * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and
+   *             [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading.
    */
-  @Experimental
+  @deprecated("Should use RDD[LabeledPoint].saveAsTextFile instead.", "1.0.1")
   def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
     val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" "))
     dataStr.saveAsTextFile(dir)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala
new file mode 100644
index 0000000000..f7cba6c6cb
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import java.util.StringTokenizer
+
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+
+import org.apache.spark.SparkException
+
+/**
+ * Simple parser for a numeric structure consisting of three types:
+ *
+ *  - number: a double in Java's floating number format
+ *  - array: an array of numbers stored as `[v0,v1,...,vn]`
+ *  - tuple: a list of numbers, arrays, or tuples stored as `(...)`
+ */
+private[mllib] object NumericParser {
+
+  /** Parses a string into a Double, an Array[Double], or a Seq[Any]. */
+  def parse(s: String): Any = {
+    val tokenizer = new StringTokenizer(s, "()[],", true)
+    if (tokenizer.hasMoreTokens()) {
+      val token = tokenizer.nextToken()
+      if (token == "(") {
+        parseTuple(tokenizer)
+      } else if (token == "[") {
+        parseArray(tokenizer)
+      } else {
+        // expecting a number
+        parseDouble(token)
+      }
+    } else {
+      throw new SparkException(s"Cannot find any token from the input string.")
+    }
+  }
+
+  private def parseArray(tokenizer: StringTokenizer): Array[Double] = {
+    val values = ArrayBuffer.empty[Double]
+    var parsing = true
+    var allowComma = false
+    var token: String = null
+    while (parsing && tokenizer.hasMoreTokens()) {
+      token = tokenizer.nextToken()
+      if (token == "]") {
+        parsing = false
+      } else if (token == ",") {
+        if (allowComma) {
+          allowComma = false
+        } else {
+          throw new SparkException("Found a ',' at a wrong position.")
+        }
+      } else {
+        // expecting a number
+        values.append(parseDouble(token))
+        allowComma = true
+      }
+    }
+    if (parsing) {
+      throw new SparkException(s"An array must end with ']'.")
+    }
+    values.toArray
+  }
+
+  private def parseTuple(tokenizer: StringTokenizer): Seq[_] = {
+    val items = ListBuffer.empty[Any]
+    var parsing = true
+    var allowComma = false
+    var token: String = null
+    while (parsing && tokenizer.hasMoreTokens()) {
+      token = tokenizer.nextToken()
+      if (token == "(") {
+        items.append(parseTuple(tokenizer))
+        allowComma = true
+      } else if (token == "[") {
+        items.append(parseArray(tokenizer))
+        allowComma = true
+      } else if (token == ",") {
+        if (allowComma) {
+          allowComma = false
+        } else {
+          throw new SparkException("Found a ',' at a wrong position.")
+        }
+      } else if (token == ")") {
+        parsing = false
+      } else {
+        // expecting a number
+        items.append(parseDouble(token))
+        allowComma = true
+      }
+    }
+    if (parsing) {
+      throw new SparkException(s"A tuple must end with ')'.")
+    }
+    items
+  }
+
+  private def parseDouble(s: String): Double = {
+    try {
+      java.lang.Double.parseDouble(s)
+    } catch {
+      case e: Throwable =>
+        throw new SparkException(s"Cannot parse a double from: $s", e)
+    }
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index ba8190b0e0..7db97e6bac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -65,7 +65,7 @@ object SVMDataGenerator {
       LabeledPoint(y, Vectors.dense(x))
     }
 
-    MLUtils.saveLabeledData(data, outputPath)
+    data.saveAsTextFile(outputPath)
 
     sc.stop()
   }
```
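Because `LabeledPointParser` is `private[mllib]`, the backward compatibility with the pre-1.0 dense format is exercised through `MLUtils.loadLabeledPoints`. A hedged sketch (the path and the stored lines below are illustrative):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.util.MLUtils

// A saved text file may mix both accepted line formats:
//   (1.0,[2.0,3.0])   <- v1.0 tuple format
//   1.0, 2.0 3.0      <- pre-1.0 dense format, still parsed
// Both lines yield LabeledPoint(1.0, [2.0,3.0]).
def loadMixed(sc: SparkContext): Unit =
  MLUtils.loadLabeledPoints(sc, "/tmp/mixed-points").collect().foreach(println)
```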