author    Xiangrui Meng <meng@databricks.com>    2014-06-04 12:56:56 -0700
committer Matei Zaharia <matei@databricks.com>   2014-06-04 12:56:56 -0700
commit    189df165bb7cb8bc8ede48d0e7f8d8b5cd31d299
tree      72f891e5194a7ea17d30bf1eea5e5600198fe8de /python
parent    d341b17c2a0a4fce04045e13fb4a3b0621296320
[SPARK-1752][MLLIB] Standardize text format for vectors and labeled points
We should standardize the text format used to represent vectors and labeled points. The proposed formats are the following:

1. dense vector: `[v0,v1,..]`
2. sparse vector: `(size,[i0,i1],[v0,v1])`
3. labeled point: `(label,vector)`

where "(..)" indicates a tuple and "[...]" indicates an array. `loadLabeledPoints` is added to pyspark's `MLUtils`. I didn't add `loadVectors` to pyspark because `RDD.saveAsTextFile` cannot stringify dense vectors in the proposed format automatically.

`MLUtils#saveLabeledData` and `MLUtils#loadLabeledData` are deprecated. Users should use `RDD#saveAsTextFile` and `MLUtils#loadLabeledPoints` instead. In Scala, `MLUtils#loadLabeledPoints` is compatible with the format used by `MLUtils#loadLabeledData`.

CC: @mateiz, @srowen

Author: Xiangrui Meng <meng@databricks.com>

Closes #685 from mengxr/labeled-io and squashes the following commits:

2d1116a [Xiangrui Meng] make loadLabeledData/saveLabeledData deprecated since 1.0.1
297be75 [Xiangrui Meng] change LabeledPoint.parse to LabeledPointParser.parse to maintain binary compatibility
d6b1473 [Xiangrui Meng] Merge branch 'master' into labeled-io
56746ea [Xiangrui Meng] replace # by .
623a5f0 [Xiangrui Meng] merge master
f06d5ba [Xiangrui Meng] add docs and minor updates
640fe0c [Xiangrui Meng] throw SparkException
5bcfbc4 [Xiangrui Meng] update test to add scientific notations
e86bf38 [Xiangrui Meng] remove NumericTokenizer
050fca4 [Xiangrui Meng] use StringTokenizer
6155b75 [Xiangrui Meng] merge master
f644438 [Xiangrui Meng] remove parse methods based on eval from pyspark
a41675a [Xiangrui Meng] python loadLabeledPoint uses Scala's implementation
ce9a475 [Xiangrui Meng] add deserialize_labeled_point to pyspark with tests
e9fcd49 [Xiangrui Meng] add serializeLabeledPoint and tests
aea4ae3 [Xiangrui Meng] minor updates
810d6df [Xiangrui Meng] update tokenizer/parser implementation
7aac03a [Xiangrui Meng] remove Scala parsers
c1885c1 [Xiangrui Meng] add headers and minor changes
b0c50cb [Xiangrui Meng] add customized parser
d731817 [Xiangrui Meng] style update
63dc396 [Xiangrui Meng] add loadLabeledPoints to pyspark
ea122b5 [Xiangrui Meng] Merge branch 'master' into labeled-io
cd6c78f [Xiangrui Meng] add __str__ and parse to LabeledPoint
a7a178e [Xiangrui Meng] add stringify to pyspark's Vectors
5c2dbfa [Xiangrui Meng] add parse to pyspark's Vectors
7853f88 [Xiangrui Meng] update pyspark's SparseVector.__str__
e761d32 [Xiangrui Meng] make LabeledPoint.parse compatible with the dense format used before v1.0 and deprecate loadLabeledData and saveLabeledData
9e63a02 [Xiangrui Meng] add loadVectors and loadLabeledPoints
19aa523 [Xiangrui Meng] update toString and add parsers for Vectors and LabeledPoint
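For concreteness, a minimal sketch (Python 2, matching the doctests in this patch) of the standardized string forms, using the `Vectors.stringify` helper added below:

    from pyspark.mllib.linalg import Vectors

    print Vectors.stringify(Vectors.dense([1.01, 2.02, 3.03]))      # [1.01,2.02,3.03]
    print Vectors.stringify(Vectors.sparse(4, [1, 3], [1.0, 5.5]))  # (4,[1,3],[1.0,5.5])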
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/mllib/_common.py    | 72
-rw-r--r--  python/pyspark/mllib/linalg.py     | 34
-rw-r--r--  python/pyspark/mllib/regression.py |  5
-rw-r--r--  python/pyspark/mllib/util.py       | 69
4 files changed, 129 insertions(+), 51 deletions(-)
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 802a27a8da..a411a5d591 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -22,6 +22,7 @@ from pyspark import SparkContext, RDD
from pyspark.mllib.linalg import SparseVector
from pyspark.serializers import Serializer
+
"""
Common utilities shared throughout MLlib, primarily for dealing with
different data types. These include:
@@ -147,7 +148,7 @@ def _serialize_sparse_vector(v):
return ba
-def _deserialize_double_vector(ba):
+def _deserialize_double_vector(ba, offset=0):
"""Deserialize a double vector from a mutually understood format.
>>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
@@ -160,43 +161,46 @@ def _deserialize_double_vector(ba):
if type(ba) != bytearray:
raise TypeError("_deserialize_double_vector called on a %s; "
"wanted bytearray" % type(ba))
- if len(ba) < 5:
+ nb = len(ba) - offset
+ if nb < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
- "which is too short" % len(ba))
- if ba[0] == DENSE_VECTOR_MAGIC:
- return _deserialize_dense_vector(ba)
- elif ba[0] == SPARSE_VECTOR_MAGIC:
- return _deserialize_sparse_vector(ba)
+ "which is too short" % nb)
+ if ba[offset] == DENSE_VECTOR_MAGIC:
+ return _deserialize_dense_vector(ba, offset)
+ elif ba[offset] == SPARSE_VECTOR_MAGIC:
+ return _deserialize_sparse_vector(ba, offset)
else:
raise TypeError("_deserialize_double_vector called on bytearray "
"with wrong magic")
-def _deserialize_dense_vector(ba):
+def _deserialize_dense_vector(ba, offset=0):
"""Deserialize a dense vector into a numpy array."""
- if len(ba) < 5:
+ nb = len(ba) - offset
+ if nb < 5:
raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
- "which is too short" % len(ba))
- length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
- if len(ba) != 8 * length + 5:
+ "which is too short" % nb)
+ length = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=int32)[0]
+ if nb < 8 * length + 5:
raise TypeError("_deserialize_dense_vector called on bytearray "
"with wrong length")
- return _deserialize_numpy_array([length], ba, 5)
+ return _deserialize_numpy_array([length], ba, offset + 5)
-def _deserialize_sparse_vector(ba):
+def _deserialize_sparse_vector(ba, offset=0):
"""Deserialize a sparse vector into a MLlib SparseVector object."""
- if len(ba) < 9:
+ nb = len(ba) - offset
+ if nb < 9:
raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
- "which is too short" % len(ba))
- header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ "which is too short" % nb)
+ header = ndarray(shape=[2], buffer=ba, offset=offset + 1, dtype=int32)
size = header[0]
nonzeros = header[1]
- if len(ba) != 9 + 12 * nonzeros:
+ if nb < 9 + 12 * nonzeros:
raise TypeError("_deserialize_sparse_vector called on bytearray "
"with wrong length")
- indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32)
- values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64)
+ indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32)
+ values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64)
return SparseVector(int(size), indices, values)
@@ -243,7 +247,23 @@ def _deserialize_double_matrix(ba):
def _serialize_labeled_point(p):
- """Serialize a LabeledPoint with a features vector of any type."""
+ """
+ Serialize a LabeledPoint with a features vector of any type.
+
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> dp0 = LabeledPoint(0.5, array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]))
+ >>> dp1 = _deserialize_labeled_point(_serialize_labeled_point(dp0))
+ >>> dp1.label == dp0.label
+ True
+ >>> array_equal(dp1.features, dp0.features)
+ True
+ >>> sp0 = LabeledPoint(0.0, SparseVector(4, [1, 3], [3.0, 5.5]))
+ >>> sp1 = _deserialize_labeled_point(_serialize_labeled_point(sp0))
+ >>> sp1.label == sp0.label
+ True
+ >>> sp1.features == sp0.features
+ True
+ """
from pyspark.mllib.regression import LabeledPoint
serialized_features = _serialize_double_vector(p.features)
header = bytearray(9)
@@ -252,6 +272,16 @@ def _serialize_labeled_point(p):
header_float[0] = p.label
return header + serialized_features
+def _deserialize_labeled_point(ba, offset=0):
+ """Deserialize a LabeledPoint from a mutually understood format."""
+ from pyspark.mllib.regression import LabeledPoint
+ if type(ba) != bytearray:
+ raise TypeError("Expecting a bytearray but got %s" % type(ba))
+ if ba[offset] != LABELED_POINT_MAGIC:
+ raise TypeError("Expecting magic number %d but got %d" % (LABELED_POINT_MAGIC, ba[0]))
+ label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0]
+ features = _deserialize_double_vector(ba, offset + 9)
+ return LabeledPoint(label, features)
def _copyto(array, buffer, offset, shape, dtype):
"""
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 2766842720..db39ed0acd 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -43,11 +43,11 @@ class SparseVector(object):
or two sorted lists containing indices and values.
>>> print SparseVector(4, {1: 1.0, 3: 5.5})
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
>>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
>>> print SparseVector(4, [1, 3], [1.0, 5.5])
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
"""
self.size = int(size)
assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
@@ -160,10 +160,9 @@ class SparseVector(object):
return result
def __str__(self):
- inds = self.indices
- vals = self.values
- entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
- return "[" + entries + "]"
+ inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
+ vals = "[" + ",".join([str(v) for v in self.values]) + "]"
+ return "(" + ",".join((str(self.size), inds, vals)) + ")"
def __repr__(self):
inds = self.indices
@@ -213,11 +212,11 @@ class Vectors(object):
or two sorted lists containing indices and values.
>>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
>>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
>>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
- [1: 1.0, 3: 5.5]
+ (4,[1,3],[1.0,5.5])
"""
return SparseVector(size, *args)
@@ -232,6 +231,21 @@ class Vectors(object):
"""
return array(elements, dtype=float64)
+ @staticmethod
+ def stringify(vector):
+ """
+ Converts a vector into a string, which can be recognized by
+ Vectors.parse().
+
+ >>> Vectors.stringify(Vectors.sparse(2, [1], [1.0]))
+ '(2,[1],[1.0])'
+ >>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
+ '[0.0,1.0]'
+ """
+ if type(vector) == SparseVector:
+ return str(vector)
+ else:
+ return "[" + ",".join([str(v) for v in vector]) + "]"
def _test():
import doctest
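The stringify docstring above refers to Vectors.parse(); in this patch the parser lives on the Scala side (pyspark's eval-based parsers were removed, per the commit log). Purely as an illustration of the grammar, a hypothetical pure-Python parser for the two forms could look like this:

    from numpy import array, float64
    from pyspark.mllib.linalg import SparseVector

    def parse_vector(s):
        """Parse '[v0,v1,..]' as a dense vector and '(size,[i..],[v..])'
        as a SparseVector. Hypothetical helper, not part of this patch."""
        s = s.strip()
        if s.startswith("["):
            return array([float(v) for v in s[1:-1].split(",") if v], dtype=float64)
        size, inds, vals = s[1:-1].split(",[")
        inds = inds.rstrip("],")  # drop the '],' left over from the split
        vals = vals.rstrip("])")
        indices = [int(i) for i in inds.split(",") if i]
        values = [float(v) for v in vals.split(",") if v]
        return SparseVector(int(size), indices, values)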
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index bc7de6d2e8..b84bc531de 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -23,7 +23,7 @@ from pyspark.mllib._common import \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
_linear_predictor_typecheck, _have_scipy, _scipy_issparse
-from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.linalg import SparseVector, Vectors
class LabeledPoint(object):
@@ -44,6 +44,9 @@ class LabeledPoint(object):
else:
raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+ def __str__(self):
+ return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")"
+
class LinearModel(object):
"""A linear model that has a vector of coefficients and an intercept."""
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 0e5f4520b9..e24c144f45 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -19,7 +19,10 @@ import numpy as np
from pyspark.mllib.linalg import Vectors, SparseVector
from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib._common import _convert_vector
+from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point
+from pyspark.rdd import RDD
+from pyspark.serializers import NoOpSerializer
+
class MLUtils:
@@ -105,24 +108,18 @@ class MLUtils:
>>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
>>> multiclass_examples = MLUtils.loadLibSVMFile(sc, tempFile.name, True).collect()
>>> tempFile.close()
- >>> examples[0].label
- 1.0
- >>> examples[0].features.size
- 6
- >>> print examples[0].features
- [0: 1.0, 2: 2.0, 4: 3.0]
- >>> examples[1].label
- 0.0
- >>> examples[1].features.size
- 6
- >>> print examples[1].features
- []
- >>> examples[2].label
- 0.0
- >>> examples[2].features.size
- 6
- >>> print examples[2].features
- [1: 4.0, 3: 5.0, 5: 6.0]
+ >>> type(examples[0]) == LabeledPoint
+ True
+ >>> print examples[0]
+ (1.0,(6,[0,2,4],[1.0,2.0,3.0]))
+ >>> type(examples[1]) == LabeledPoint
+ True
+ >>> print examples[1]
+ (0.0,(6,[],[]))
+ >>> type(examples[2]) == LabeledPoint
+ True
+ >>> print examples[2]
+ (0.0,(6,[1,3,5],[4.0,5.0,6.0]))
>>> multiclass_examples[1].label
-1.0
"""
@@ -158,6 +155,40 @@ class MLUtils:
lines.saveAsTextFile(dir)
+ @staticmethod
+ def loadLabeledPoints(sc, path, minPartitions=None):
+ """
+ Load labeled points saved using RDD.saveAsTextFile.
+
+ @param sc: Spark context
+ @param path: file or directory path in any Hadoop-supported file
+ system URI
+ @param minPartitions: min number of partitions
+ @return: labeled data stored as an RDD of LabeledPoint
+
+ >>> from tempfile import NamedTemporaryFile
+ >>> from pyspark.mllib.util import MLUtils
+ >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \
+ LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+ >>> tempFile = NamedTemporaryFile(delete=True)
+ >>> tempFile.close()
+ >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
+ >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
+ >>> type(loaded[0]) == LabeledPoint
+ True
+ >>> print loaded[0]
+ (1.1,(3,[0,2],[-1.23,4.56e-07]))
+ >>> type(loaded[1]) == LabeledPoint
+ True
+ >>> print loaded[1]
+ (0.0,[1.01,2.02,3.03])
+ """
+ minPartitions = minPartitions or min(sc.defaultParallelism, 2)
+ jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
+ serialized = RDD(jSerialized, sc, NoOpSerializer())
+ return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes)))
+
+
def _test():
import doctest
from pyspark.context import SparkContext
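Finally, the migration path described in the commit message, replacing the deprecated saveLabeledData/loadLabeledData pair, is a plain text round trip. A sketch, assuming a live SparkContext sc and a writable placeholder path:

    from pyspark.mllib.util import MLUtils
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.regression import LabeledPoint

    points = sc.parallelize([LabeledPoint(1.0, Vectors.dense([1.0, 2.0]))])
    points.saveAsTextFile("/tmp/points")  # each record saved as '(1.0,[1.0,2.0])'
    loaded = MLUtils.loadLabeledPoints(sc, "/tmp/points")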