diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/mllib/_common.py | 72 | ||||
-rw-r--r-- | python/pyspark/mllib/linalg.py | 34 | ||||
-rw-r--r-- | python/pyspark/mllib/regression.py | 5 | ||||
-rw-r--r-- | python/pyspark/mllib/util.py | 69 |
4 files changed, 129 insertions, 51 deletions
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index 802a27a8da..a411a5d591 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -22,6 +22,7 @@ from pyspark import SparkContext, RDD from pyspark.mllib.linalg import SparseVector from pyspark.serializers import Serializer + """ Common utilities shared throughout MLlib, primarily for dealing with different data types. These include: @@ -147,7 +148,7 @@ def _serialize_sparse_vector(v): return ba -def _deserialize_double_vector(ba): +def _deserialize_double_vector(ba, offset=0): """Deserialize a double vector from a mutually understood format. >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]) @@ -160,43 +161,46 @@ def _deserialize_double_vector(ba): if type(ba) != bytearray: raise TypeError("_deserialize_double_vector called on a %s; " "wanted bytearray" % type(ba)) - if len(ba) < 5: + nb = len(ba) - offset + if nb < 5: raise TypeError("_deserialize_double_vector called on a %d-byte array, " - "which is too short" % len(ba)) - if ba[0] == DENSE_VECTOR_MAGIC: - return _deserialize_dense_vector(ba) - elif ba[0] == SPARSE_VECTOR_MAGIC: - return _deserialize_sparse_vector(ba) + "which is too short" % nb) + if ba[offset] == DENSE_VECTOR_MAGIC: + return _deserialize_dense_vector(ba, offset) + elif ba[offset] == SPARSE_VECTOR_MAGIC: + return _deserialize_sparse_vector(ba, offset) else: raise TypeError("_deserialize_double_vector called on bytearray " "with wrong magic") -def _deserialize_dense_vector(ba): +def _deserialize_dense_vector(ba, offset=0): """Deserialize a dense vector into a numpy array.""" - if len(ba) < 5: + nb = len(ba) - offset + if nb < 5: raise TypeError("_deserialize_dense_vector called on a %d-byte array, " - "which is too short" % len(ba)) - length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0] - if len(ba) != 8 * length + 5: + "which is too short" % nb) + length = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=int32)[0] + if nb < 8 * length + 5: raise TypeError("_deserialize_dense_vector called on bytearray " "with wrong length") - return _deserialize_numpy_array([length], ba, 5) + return _deserialize_numpy_array([length], ba, offset + 5) -def _deserialize_sparse_vector(ba): +def _deserialize_sparse_vector(ba, offset=0): """Deserialize a sparse vector into a MLlib SparseVector object.""" - if len(ba) < 9: + nb = len(ba) - offset + if nb < 9: raise TypeError("_deserialize_sparse_vector called on a %d-byte array, " - "which is too short" % len(ba)) - header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32) + "which is too short" % nb) + header = ndarray(shape=[2], buffer=ba, offset=offset + 1, dtype=int32) size = header[0] nonzeros = header[1] - if len(ba) != 9 + 12 * nonzeros: + if nb < 9 + 12 * nonzeros: raise TypeError("_deserialize_sparse_vector called on bytearray " "with wrong length") - indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32) - values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64) + indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32) + values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64) return SparseVector(int(size), indices, values) @@ -243,7 +247,23 @@ def _deserialize_double_matrix(ba): def _serialize_labeled_point(p): - """Serialize a LabeledPoint with a features vector of any type.""" + """ + Serialize a LabeledPoint with a features vector of any type. + + >>> from pyspark.mllib.regression import LabeledPoint + >>> dp0 = LabeledPoint(0.5, array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])) + >>> dp1 = _deserialize_labeled_point(_serialize_labeled_point(dp0)) + >>> dp1.label == dp0.label + True + >>> array_equal(dp1.features, dp0.features) + True + >>> sp0 = LabeledPoint(0.0, SparseVector(4, [1, 3], [3.0, 5.5])) + >>> sp1 = _deserialize_labeled_point(_serialize_labeled_point(sp0)) + >>> sp1.label == sp1.label + True + >>> sp1.features == sp0.features + True + """ from pyspark.mllib.regression import LabeledPoint serialized_features = _serialize_double_vector(p.features) header = bytearray(9) @@ -252,6 +272,16 @@ def _serialize_labeled_point(p): header_float[0] = p.label return header + serialized_features +def _deserialize_labeled_point(ba, offset=0): + """Deserialize a LabeledPoint from a mutually understood format.""" + from pyspark.mllib.regression import LabeledPoint + if type(ba) != bytearray: + raise TypeError("Expecting a bytearray but got %s" % type(ba)) + if ba[offset] != LABELED_POINT_MAGIC: + raise TypeError("Expecting magic number %d but got %d" % (LABELED_POINT_MAGIC, ba[0])) + label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0] + features = _deserialize_double_vector(ba, offset + 9) + return LabeledPoint(label, features) def _copyto(array, buffer, offset, shape, dtype): """ diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 2766842720..db39ed0acd 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -43,11 +43,11 @@ class SparseVector(object): or two sorted lists containing indices and values. >>> print SparseVector(4, {1: 1.0, 3: 5.5}) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print SparseVector(4, [(1, 1.0), (3, 5.5)]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print SparseVector(4, [1, 3], [1.0, 5.5]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) """ self.size = int(size) assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments" @@ -160,10 +160,9 @@ class SparseVector(object): return result def __str__(self): - inds = self.indices - vals = self.values - entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) - return "[" + entries + "]" + inds = "[" + ",".join([str(i) for i in self.indices]) + "]" + vals = "[" + ",".join([str(v) for v in self.values]) + "]" + return "(" + ",".join((str(self.size), inds, vals)) + ")" def __repr__(self): inds = self.indices @@ -213,11 +212,11 @@ class Vectors(object): or two sorted lists containing indices and values. >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5}) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) """ return SparseVector(size, *args) @@ -232,6 +231,21 @@ class Vectors(object): """ return array(elements, dtype=float64) + @staticmethod + def stringify(vector): + """ + Converts a vector into a string, which can be recognized by + Vectors.parse(). + + >>> Vectors.stringify(Vectors.sparse(2, [1], [1.0])) + '(2,[1],[1.0])' + >>> Vectors.stringify(Vectors.dense([0.0, 1.0])) + '[0.0,1.0]' + """ + if type(vector) == SparseVector: + return str(vector) + else: + return "[" + ",".join([str(v) for v in vector]) + "]" def _test(): import doctest diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index bc7de6d2e8..b84bc531de 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -23,7 +23,7 @@ from pyspark.mllib._common import \ _serialize_double_vector, _deserialize_double_vector, \ _get_initial_weights, _serialize_rating, _regression_train_wrapper, \ _linear_predictor_typecheck, _have_scipy, _scipy_issparse -from pyspark.mllib.linalg import SparseVector +from pyspark.mllib.linalg import SparseVector, Vectors class LabeledPoint(object): @@ -44,6 +44,9 @@ class LabeledPoint(object): else: raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix") + def __str__(self): + return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")" + class LinearModel(object): """A linear model that has a vector of coefficients and an intercept.""" diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 0e5f4520b9..e24c144f45 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -19,7 +19,10 @@ import numpy as np from pyspark.mllib.linalg import Vectors, SparseVector from pyspark.mllib.regression import LabeledPoint -from pyspark.mllib._common import _convert_vector +from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point +from pyspark.rdd import RDD +from pyspark.serializers import NoOpSerializer + class MLUtils: @@ -105,24 +108,18 @@ class MLUtils: >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect() >>> multiclass_examples = MLUtils.loadLibSVMFile(sc, tempFile.name, True).collect() >>> tempFile.close() - >>> examples[0].label - 1.0 - >>> examples[0].features.size - 6 - >>> print examples[0].features - [0: 1.0, 2: 2.0, 4: 3.0] - >>> examples[1].label - 0.0 - >>> examples[1].features.size - 6 - >>> print examples[1].features - [] - >>> examples[2].label - 0.0 - >>> examples[2].features.size - 6 - >>> print examples[2].features - [1: 4.0, 3: 5.0, 5: 6.0] + >>> type(examples[0]) == LabeledPoint + True + >>> print examples[0] + (1.0,(6,[0,2,4],[1.0,2.0,3.0])) + >>> type(examples[1]) == LabeledPoint + True + >>> print examples[1] + (0.0,(6,[],[])) + >>> type(examples[2]) == LabeledPoint + True + >>> print examples[2] + (0.0,(6,[1,3,5],[4.0,5.0,6.0])) >>> multiclass_examples[1].label -1.0 """ @@ -158,6 +155,40 @@ class MLUtils: lines.saveAsTextFile(dir) + @staticmethod + def loadLabeledPoints(sc, path, minPartitions=None): + """ + Load labeled points saved using RDD.saveAsTextFile. + + @param sc: Spark context + @param path: file or directory path in any Hadoop-supported file + system URI + @param minPartitions: min number of partitions + @return: labeled data stored as an RDD of LabeledPoint + + >>> from tempfile import NamedTemporaryFile + >>> from pyspark.mllib.util import MLUtils + >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \ + LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))] + >>> tempFile = NamedTemporaryFile(delete=True) + >>> tempFile.close() + >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name) + >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect() + >>> type(loaded[0]) == LabeledPoint + True + >>> print examples[0] + (1.1,(3,[0,2],[-1.23,4.56e-07])) + >>> type(examples[1]) == LabeledPoint + True + >>> print examples[1] + (0.0,[1.01,2.02,3.03]) + """ + minPartitions = minPartitions or min(sc.defaultParallelism, 2) + jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions) + serialized = RDD(jSerialized, sc, NoOpSerializer()) + return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes))) + + def _test(): import doctest from pyspark.context import SparkContext |