author      Davies Liu <davies.liu@gmail.com>       2014-09-19 15:01:11 -0700
committer   Xiangrui Meng <meng@databricks.com>     2014-09-19 15:01:11 -0700
commit      fce5e251d636c788cda91345867e0294280c074d (patch)
tree        4bded23a826bcfeb02deef73bd735cf0a05d4ee7 /python/pyspark
parent      a03e5b81e91d9d792b6a2e01d1505394ea303dd8 (diff)
download    spark-fce5e251d636c788cda91345867e0294280c074d.tar.gz
            spark-fce5e251d636c788cda91345867e0294280c074d.tar.bz2
            spark-fce5e251d636c788cda91345867e0294280c074d.zip
[SPARK-3491] [MLlib] [PySpark] use pickle to serialize data in MLlib
Currently, we serialize the data passed between the JVM and Python case by case, manually; this cannot scale to support so many APIs in MLlib. This patch addresses the problem by serializing the data with the pickle protocol, using the Pyrolite library to serialize/deserialize in the JVM. The pickle protocol can easily be extended to support custom classes. All the modules are refactored to use this protocol.

Known issues: there will be some performance regression (both CPU and memory; the serialized data is larger).

Author: Davies Liu <davies.liu@gmail.com>

Closes #2378 from davies/pickle_mllib and squashes the following commits:

dffbba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into pickle_mllib
810f97f [Davies Liu] fix equal of matrix
032cd62 [Davies Liu] add more type check and conversion for user_product
bd738ab [Davies Liu] address comments
e431377 [Davies Liu] fix cache of rdd, refactor
19d0967 [Davies Liu] refactor Picklers
2511e76 [Davies Liu] cleanup
1fccf1a [Davies Liu] address comments
a2cc855 [Davies Liu] fix tests
9ceff73 [Davies Liu] test size of serialized Rating
44e0551 [Davies Liu] fix cache
a379a81 [Davies Liu] fix pickle array in python2.7
df625c7 [Davies Liu] Merge commit '154d141' into pickle_mllib
154d141 [Davies Liu] fix autobatchedpickler
44736d7 [Davies Liu] speed up pickling array in Python 2.7
e1d1bfc [Davies Liu] refactor
708dc02 [Davies Liu] fix tests
9dcfb63 [Davies Liu] fix style
88034f0 [Davies Liu] rafactor, address comments
46a501e [Davies Liu] choose batch size automatically
df19464 [Davies Liu] memorize the module and class name during pickleing
f3506c5 [Davies Liu] Merge branch 'master' into pickle_mllib
722dd96 [Davies Liu] cleanup _common.py
0ee1525 [Davies Liu] remove outdated tests
b02e34f [Davies Liu] remove _common.py
84c721d [Davies Liu] Merge branch 'master' into pickle_mllib
4d7963e [Davies Liu] remove muanlly serialization
6d26b03 [Davies Liu] fix tests
c383544 [Davies Liu] classification
f2a0856 [Davies Liu] mllib/regression
d9f691f [Davies Liu] mllib/util
cccb8b1 [Davies Liu] mllib/tree
8fe166a [Davies Liu] Merge branch 'pickle' into pickle_mllib
aa2287e [Davies Liu] random
f1544c4 [Davies Liu] refactor clustering
52d1350 [Davies Liu] use new protocol in mllib/stat
b30ef35 [Davies Liu] use pickle to serialize data for mllib/recommendation
f44f771 [Davies Liu] enable tests about array
3908f5c [Davies Liu] Merge branch 'master' into pickle
c77c87b [Davies Liu] cleanup debugging code
60e4e2f [Davies Liu] support unpickle array.array for Python 2.6
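As background for the approach above: the patch makes the Python-side classes picklable by giving them a __reduce__ method, so the same byte stream can be rebuilt by Pyrolite on the JVM and by CPython on the Python side. Below is a minimal, illustrative sketch of that pattern (not code from this patch), using a Rating-like class similar to the one added to recommendation.py:

    import pickle

    class Rating(object):
        """Roughly mirrors the Rating class this patch adds to mllib.recommendation."""
        def __init__(self, user, product, rating):
            self.user = int(user)
            self.product = int(product)
            self.rating = float(rating)

        def __reduce__(self):
            # Tell pickle how to rebuild the object: a callable plus its arguments.
            # Pyrolite can use the same recipe to construct the JVM-side object.
            return Rating, (self.user, self.product, self.rating)

    # Round trip through binary pickle protocol 2; on the JVM the equivalent of
    # the loads() step is performed by Pyrolite rather than CPython.
    payload = pickle.dumps([Rating(1, 2, 5.0), Rating(2, 1, 1.0)], 2)
    ratings = pickle.loads(payload)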
Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/context.py                   1
-rw-r--r--  python/pyspark/mllib/_common.py           562
-rw-r--r--  python/pyspark/mllib/classification.py     61
-rw-r--r--  python/pyspark/mllib/clustering.py         38
-rw-r--r--  python/pyspark/mllib/linalg.py            256
-rw-r--r--  python/pyspark/mllib/random.py             54
-rw-r--r--  python/pyspark/mllib/recommendation.py     69
-rw-r--r--  python/pyspark/mllib/regression.py        105
-rw-r--r--  python/pyspark/mllib/stat.py               63
-rw-r--r--  python/pyspark/mllib/tests.py              99
-rw-r--r--  python/pyspark/mllib/tree.py              167
-rw-r--r--  python/pyspark/mllib/util.py               43
-rw-r--r--  python/pyspark/rdd.py                      10
-rw-r--r--  python/pyspark/serializers.py              36
14 files changed, 649 insertions, 915 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a17f2c1203..064a24bff5 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -211,6 +211,7 @@ class SparkContext(object):
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
SparkContext._jvm.SerDeUtil.initialize()
+ SparkContext._jvm.SerDe.initialize()
if instance:
if (SparkContext._active_spark_context and
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
deleted file mode 100644
index 68f6033616..0000000000
--- a/python/pyspark/mllib/_common.py
+++ /dev/null
@@ -1,562 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import struct
-import sys
-import numpy
-from numpy import ndarray, float64, int64, int32, array_equal, array
-from pyspark import SparkContext, RDD
-from pyspark.mllib.linalg import SparseVector
-from pyspark.serializers import FramedSerializer
-
-
-"""
-Common utilities shared throughout MLlib, primarily for dealing with
-different data types. These include:
-- Serialization utilities to / from byte arrays that Java can handle
-- Serializers for other data types, like ALS Rating objects
-- Common methods for linear models
-- Methods to deal with the different vector types we support, such as
- SparseVector and scipy.sparse matrices.
-"""
-
-
-# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
-# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
-
-_have_scipy = False
-_scipy_issparse = None
-try:
- import scipy.sparse
- _have_scipy = True
- _scipy_issparse = scipy.sparse.issparse
-except:
- # No SciPy in environment, but that's okay
- pass
-
-
-# Serialization functions to and from Scala. These use the following formats, understood
-# by the PythonMLLibAPI class in Scala:
-#
-# Dense double vector format:
-#
-# [1-byte 1] [4-byte length] [length*8 bytes of data]
-#
-# Sparse double vector format:
-#
-# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] \
-# [nonzeros*8 bytes of values]
-#
-# Double matrix format:
-#
-# [1-byte 3] [4-byte rows] [4-byte cols] [rows*cols*8 bytes of data]
-#
-# LabeledPoint format:
-#
-# [1-byte 4] [8-byte label] [dense or sparse vector]
-#
-# This is all in machine-endian. That means that the Java interpreter and the
-# Python interpreter must agree on what endian the machine is.
-
-
-DENSE_VECTOR_MAGIC = 1
-SPARSE_VECTOR_MAGIC = 2
-DENSE_MATRIX_MAGIC = 3
-LABELED_POINT_MAGIC = 4
-
-
-# Workaround for SPARK-2954: before Python 2.7, struct.unpack couldn't unpack bytearray()s.
-if sys.version_info[:2] <= (2, 6):
- def _unpack(fmt, string):
- return struct.unpack(fmt, buffer(string))
-else:
- _unpack = struct.unpack
-
-
-def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
- """
- Deserialize a numpy array of the given type from an offset in
- bytearray ba, assigning it the given shape.
-
- >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
- >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
- True
- >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
- >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
- True
- >>> x = array([1, 2, 3], dtype=int32)
- >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype=int32))
- True
- """
- ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
- return ar.copy()
-
-
-def _serialize_double(d):
- """
- Serialize a double (float or numpy.float64) into a mutually understood format.
- """
- if type(d) == float or type(d) == float64 or type(d) == int or type(d) == long:
- d = float64(d)
- ba = bytearray(8)
- _copyto(d, buffer=ba, offset=0, shape=[1], dtype=float64)
- return ba
- else:
- raise TypeError("_serialize_double called on non-float input")
-
-
-def _serialize_double_vector(v):
- """
- Serialize a double vector into a mutually understood format.
-
- Note: we currently do not use a magic byte for double for storage
- efficiency. This should be reconsidered when we add Ser/De for other
- 8-byte types (e.g. Long), for safety. The corresponding deserializer,
- _deserialize_double, needs to be modified as well if the serialization
- scheme changes.
-
- >>> x = array([1,2,3])
- >>> y = _deserialize_double_vector(_serialize_double_vector(x))
- >>> array_equal(y, array([1.0, 2.0, 3.0]))
- True
- """
- v = _convert_vector(v)
- if type(v) == ndarray:
- return _serialize_dense_vector(v)
- elif type(v) == SparseVector:
- return _serialize_sparse_vector(v)
- else:
- raise TypeError("_serialize_double_vector called on a %s; "
- "wanted ndarray or SparseVector" % type(v))
-
-
-def _serialize_dense_vector(v):
- """Serialize a dense vector given as a NumPy array."""
- if v.ndim != 1:
- raise TypeError("_serialize_double_vector called on a %ddarray; "
- "wanted a 1darray" % v.ndim)
- if v.dtype != float64:
- if numpy.issubdtype(v.dtype, numpy.complex):
- raise TypeError("_serialize_double_vector called on an ndarray of %s; "
- "wanted ndarray of float64" % v.dtype)
- v = v.astype(float64)
- length = v.shape[0]
- ba = bytearray(5 + 8 * length)
- ba[0] = DENSE_VECTOR_MAGIC
- length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)
- length_bytes[0] = length
- _copyto(v, buffer=ba, offset=5, shape=[length], dtype=float64)
- return ba
-
-
-def _serialize_sparse_vector(v):
- """Serialize a pyspark.mllib.linalg.SparseVector."""
- nonzeros = len(v.indices)
- ba = bytearray(9 + 12 * nonzeros)
- ba[0] = SPARSE_VECTOR_MAGIC
- header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
- header[0] = v.size
- header[1] = nonzeros
- _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32)
- values_offset = 9 + 4 * nonzeros
- _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64)
- return ba
-
-
-def _deserialize_double(ba, offset=0):
- """Deserialize a double from a mutually understood format.
-
- >>> import sys
- >>> _deserialize_double(_serialize_double(123.0)) == 123.0
- True
- >>> _deserialize_double(_serialize_double(float64(0.0))) == 0.0
- True
- >>> _deserialize_double(_serialize_double(1)) == 1.0
- True
- >>> _deserialize_double(_serialize_double(1L)) == 1.0
- True
- >>> x = sys.float_info.max
- >>> _deserialize_double(_serialize_double(sys.float_info.max)) == x
- True
- >>> y = float64(sys.float_info.max)
- >>> _deserialize_double(_serialize_double(sys.float_info.max)) == y
- True
- """
- if type(ba) != bytearray:
- raise TypeError("_deserialize_double called on a %s; wanted bytearray" % type(ba))
- if len(ba) - offset != 8:
- raise TypeError("_deserialize_double called on a %d-byte array; wanted 8 bytes." % nb)
- return _unpack("d", ba[offset:])[0]
-
-
-def _deserialize_double_vector(ba, offset=0):
- """Deserialize a double vector from a mutually understood format.
-
- >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
- >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
- True
- >>> s = SparseVector(4, [1, 3], [3.0, 5.5])
- >>> s == _deserialize_double_vector(_serialize_double_vector(s))
- True
- """
- if type(ba) != bytearray:
- raise TypeError("_deserialize_double_vector called on a %s; "
- "wanted bytearray" % type(ba))
- nb = len(ba) - offset
- if nb < 5:
- raise TypeError("_deserialize_double_vector called on a %d-byte array, "
- "which is too short" % nb)
- if ba[offset] == DENSE_VECTOR_MAGIC:
- return _deserialize_dense_vector(ba, offset)
- elif ba[offset] == SPARSE_VECTOR_MAGIC:
- return _deserialize_sparse_vector(ba, offset)
- else:
- raise TypeError("_deserialize_double_vector called on bytearray "
- "with wrong magic")
-
-
-def _deserialize_dense_vector(ba, offset=0):
- """Deserialize a dense vector into a numpy array."""
- nb = len(ba) - offset
- if nb < 5:
- raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
- "which is too short" % nb)
- length = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=int32)[0]
- if nb < 8 * length + 5:
- raise TypeError("_deserialize_dense_vector called on bytearray "
- "with wrong length")
- return _deserialize_numpy_array([length], ba, offset + 5)
-
-
-def _deserialize_sparse_vector(ba, offset=0):
- """Deserialize a sparse vector into a MLlib SparseVector object."""
- nb = len(ba) - offset
- if nb < 9:
- raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
- "which is too short" % nb)
- header = ndarray(shape=[2], buffer=ba, offset=offset + 1, dtype=int32)
- size = header[0]
- nonzeros = header[1]
- if nb < 9 + 12 * nonzeros:
- raise TypeError("_deserialize_sparse_vector called on bytearray "
- "with wrong length")
- indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32)
- values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64)
- return SparseVector(int(size), indices, values)
-
-
-def _serialize_double_matrix(m):
- """Serialize a double matrix into a mutually understood format."""
- if (type(m) == ndarray and m.ndim == 2):
- if m.dtype != float64:
- if numpy.issubdtype(m.dtype, numpy.complex):
- raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
- "wanted ndarray of float64" % m.dtype)
- m = m.astype(float64)
- rows = m.shape[0]
- cols = m.shape[1]
- ba = bytearray(9 + 8 * rows * cols)
- ba[0] = DENSE_MATRIX_MAGIC
- lengths = ndarray(shape=[3], buffer=ba, offset=1, dtype=int32)
- lengths[0] = rows
- lengths[1] = cols
- _copyto(m, buffer=ba, offset=9, shape=[rows, cols], dtype=float64)
- return ba
- else:
- raise TypeError("_serialize_double_matrix called on a "
- "non-double-matrix")
-
-
-def _deserialize_double_matrix(ba):
- """Deserialize a double matrix from a mutually understood format."""
- if type(ba) != bytearray:
- raise TypeError("_deserialize_double_matrix called on a %s; "
- "wanted bytearray" % type(ba))
- if len(ba) < 9:
- raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
- "which is too short" % len(ba))
- if ba[0] != DENSE_MATRIX_MAGIC:
- raise TypeError("_deserialize_double_matrix called on bytearray "
- "with wrong magic")
- lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
- rows = lengths[0]
- cols = lengths[1]
- if (len(ba) != 8 * rows * cols + 9):
- raise TypeError("_deserialize_double_matrix called on bytearray "
- "with wrong length")
- return _deserialize_numpy_array([rows, cols], ba, 9)
-
-
-def _serialize_labeled_point(p):
- """
- Serialize a LabeledPoint with a features vector of any type.
-
- >>> from pyspark.mllib.regression import LabeledPoint
- >>> dp0 = LabeledPoint(0.5, array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]))
- >>> dp1 = _deserialize_labeled_point(_serialize_labeled_point(dp0))
- >>> dp1.label == dp0.label
- True
- >>> array_equal(dp1.features, dp0.features)
- True
- >>> sp0 = LabeledPoint(0.0, SparseVector(4, [1, 3], [3.0, 5.5]))
- >>> sp1 = _deserialize_labeled_point(_serialize_labeled_point(sp0))
- >>> sp1.label == sp1.label
- True
- >>> sp1.features == sp0.features
- True
- """
- from pyspark.mllib.regression import LabeledPoint
- serialized_features = _serialize_double_vector(p.features)
- header = bytearray(9)
- header[0] = LABELED_POINT_MAGIC
- header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64)
- header_float[0] = p.label
- return header + serialized_features
-
-
-def _deserialize_labeled_point(ba, offset=0):
- """Deserialize a LabeledPoint from a mutually understood format."""
- from pyspark.mllib.regression import LabeledPoint
- if type(ba) != bytearray:
- raise TypeError("Expecting a bytearray but got %s" % type(ba))
- if ba[offset] != LABELED_POINT_MAGIC:
- raise TypeError("Expecting magic number %d but got %d" % (LABELED_POINT_MAGIC, ba[0]))
- label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0]
- features = _deserialize_double_vector(ba, offset + 9)
- return LabeledPoint(label, features)
-
-
-def _copyto(array, buffer, offset, shape, dtype):
- """
- Copy the contents of a vector to a destination bytearray at the
- given offset.
-
- TODO: In the future this could use numpy.copyto on NumPy 1.7+, but
- we should benchmark that to see whether it provides a benefit.
- """
- temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C')
- temp_array[...] = array
-
-
-def _get_unmangled_rdd(data, serializer, cache=True):
- """
- :param cache: If True, the serialized RDD is cached. (default = True)
- WARNING: Users should unpersist() this later!
- """
- dataBytes = data.map(serializer)
- dataBytes._bypass_serializer = True
- if cache:
- dataBytes.cache()
- return dataBytes
-
-
-def _get_unmangled_double_vector_rdd(data, cache=True):
- """
- Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of
- _serialized_double_vectors.
- :param cache: If True, the serialized RDD is cached. (default = True)
- WARNING: Users should unpersist() this later!
- """
- return _get_unmangled_rdd(data, _serialize_double_vector, cache)
-
-
-def _get_unmangled_labeled_point_rdd(data, cache=True):
- """
- Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points.
- :param cache: If True, the serialized RDD is cached. (default = True)
- WARNING: Users should unpersist() this later!
- """
- return _get_unmangled_rdd(data, _serialize_labeled_point, cache)
-
-
-# Common functions for dealing with and training linear models
-
-def _linear_predictor_typecheck(x, coeffs):
- """
- Check that x is a one-dimensional vector of the right shape.
- This is a temporary hackaround until we actually implement bulk predict.
- """
- x = _convert_vector(x)
- if type(x) == ndarray:
- if x.ndim == 1:
- if x.shape != coeffs.shape:
- raise RuntimeError("Got array of %d elements; wanted %d" % (
- numpy.shape(x)[0], coeffs.shape[0]))
- else:
- raise RuntimeError("Bulk predict not yet supported.")
- elif type(x) == SparseVector:
- if x.size != coeffs.shape[0]:
- raise RuntimeError("Got sparse vector of size %d; wanted %d" % (
- x.size, coeffs.shape[0]))
- elif isinstance(x, RDD):
- raise RuntimeError("Bulk predict not yet supported.")
- else:
- raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
-
-
-# If we weren't given initial weights, take a zero vector of the appropriate
-# length.
-def _get_initial_weights(initial_weights, data):
- if initial_weights is None:
- initial_weights = _convert_vector(data.first().features)
- if type(initial_weights) == ndarray:
- if initial_weights.ndim != 1:
- raise TypeError("At least one data element has "
- + initial_weights.ndim + " dimensions, which is not 1")
- initial_weights = numpy.zeros([initial_weights.shape[0]])
- elif type(initial_weights) == SparseVector:
- initial_weights = numpy.zeros([initial_weights.size])
- return initial_weights
-
-
-# train_func should take two parameters, namely data and initial_weights, and
-# return the result of a call to the appropriate JVM stub.
-# _regression_train_wrapper is responsible for setup and error checking.
-def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
- initial_weights = _get_initial_weights(initial_weights, data)
- dataBytes = _get_unmangled_labeled_point_rdd(data)
- ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
- if len(ans) != 2:
- raise RuntimeError("JVM call result had unexpected length")
- elif type(ans[0]) != bytearray:
- raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]).__name__ + " which is not bytearray")
- elif type(ans[1]) != float:
- raise RuntimeError("JVM call result had second element of type "
- + type(ans[0]).__name__ + " which is not float")
- return klass(_deserialize_double_vector(ans[0]), ans[1])
-
-
-# Functions for serializing ALS Rating objects and tuples
-
-def _serialize_rating(r):
- ba = bytearray(16)
- intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
- doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
- intpart[0], intpart[1], doublepart[0] = r
- return ba
-
-
-class RatingDeserializer(FramedSerializer):
-
- def loads(self, string):
- res = ndarray(shape=(3, ), buffer=string, dtype=float64, offset=4)
- return int(res[0]), int(res[1]), res[2]
-
- def load_stream(self, stream):
- while True:
- try:
- yield self._read_with_length(stream)
- except struct.error:
- return
- except EOFError:
- return
-
-
-def _serialize_tuple(t):
- ba = bytearray(8)
- intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
- intpart[0], intpart[1] = t
- return ba
-
-
-# Vector math functions that support all of our vector types
-
-def _convert_vector(vec):
- """
- Convert a vector to a format we support internally. This does
- the following:
-
- * For dense NumPy vectors (ndarray), returns them as is
- * For our SparseVector class, returns that as is
- * For Python lists, converts them to NumPy vectors
- * For scipy.sparse.*_matrix column vectors, converts them to
- our own SparseVector type.
-
- This should be called before passing any data to our algorithms
- or attempting to serialize it to Java.
- """
- if type(vec) == ndarray or type(vec) == SparseVector:
- return vec
- elif type(vec) == list:
- return array(vec, dtype=float64)
- elif _have_scipy:
- if _scipy_issparse(vec):
- assert vec.shape[1] == 1, "Expected column vector"
- csc = vec.tocsc()
- return SparseVector(vec.shape[0], csc.indices, csc.data)
- raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix")
-
-
-def _squared_distance(v1, v2):
- """
- Squared distance of two NumPy or sparse vectors.
-
- >>> dense1 = array([1., 2.])
- >>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
- >>> dense2 = array([2., 1.])
- >>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
- >>> _squared_distance(dense1, dense2)
- 2.0
- >>> _squared_distance(dense1, sparse2)
- 2.0
- >>> _squared_distance(sparse1, dense2)
- 2.0
- >>> _squared_distance(sparse1, sparse2)
- 2.0
- """
- v1 = _convert_vector(v1)
- v2 = _convert_vector(v2)
- if type(v1) == ndarray and type(v2) == ndarray:
- diff = v1 - v2
- return numpy.dot(diff, diff)
- elif type(v1) == ndarray:
- return v2.squared_distance(v1)
- else:
- return v1.squared_distance(v2)
-
-
-def _dot(vec, target):
- """
- Compute the dot product of a vector of the types we support
- (Numpy array, list, SparseVector, or SciPy sparse) and a target
- NumPy array that is either 1- or 2-dimensional. Equivalent to
- calling numpy.dot of the two vectors, but for SciPy ones, we
- have to transpose them because they're column vectors.
- """
- if type(vec) == ndarray:
- return numpy.dot(vec, target)
- elif type(vec) == SparseVector:
- return vec.dot(target)
- elif type(vec) == list:
- return numpy.dot(_convert_vector(vec), target)
- else:
- return vec.transpose().dot(target)[0]
-
-
-def _test():
- import doctest
- globs = globals().copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
- (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
- globs['sc'].stop()
- if failure_count:
- exit(-1)
-
-
-if __name__ == "__main__":
- _test()
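For contrast with the pickle-based scheme, the comments in the deleted _common.py above document the hand-rolled binary formats being removed, e.g. a dense double vector encoded as [1-byte magic 1][4-byte length][length*8 bytes of doubles] in machine endianness. The following is a small sketch of what reading and writing that format by hand looks like (an illustration of the removed scheme, not code from Spark):

    import struct

    DENSE_VECTOR_MAGIC = 1

    def pack_dense_vector(values):
        # [1-byte magic] [4-byte int32 length] [length * 8-byte float64], native byte order
        header = struct.pack("=Bi", DENSE_VECTOR_MAGIC, len(values))
        return header + struct.pack("=%dd" % len(values), *values)

    def unpack_dense_vector(ba):
        magic, length = struct.unpack_from("=Bi", ba, 0)
        assert magic == DENSE_VECTOR_MAGIC
        return list(struct.unpack_from("=%dd" % length, ba, 5))

    assert unpack_dense_vector(pack_dense_vector([1.0, 2.0, 3.0])) == [1.0, 2.0, 3.0]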
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 71ab46b61d..ac142fb49a 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -15,19 +15,14 @@
# limitations under the License.
#
+from math import exp
+
import numpy
+from numpy import array
-from numpy import array, shape
-from pyspark import SparkContext
-from pyspark.mllib._common import \
- _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
- _serialize_double_matrix, _deserialize_double_matrix, \
- _serialize_double_vector, _deserialize_double_vector, \
- _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
-from pyspark.mllib.linalg import SparseVector
-from pyspark.mllib.regression import LabeledPoint, LinearModel
-from math import exp, log
+from pyspark import SparkContext, PickleSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
__all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'SVMModel',
@@ -67,8 +62,7 @@ class LogisticRegressionModel(LinearModel):
"""
def predict(self, x):
- _linear_predictor_typecheck(x, self._coeff)
- margin = _dot(x, self._coeff) + self._intercept
+ margin = self.weights.dot(x) + self._intercept
if margin > 0:
prob = 1 / (1 + exp(-margin))
else:
@@ -81,7 +75,7 @@ class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
- initialWeights=None, regParam=1.0, regType=None, intercept=False):
+ initialWeights=None, regParam=1.0, regType="none", intercept=False):
"""
Train a logistic regression model on the given data.
@@ -106,11 +100,12 @@ class LogisticRegressionWithSGD(object):
are activated or not).
"""
sc = data.context
- if regType is None:
- regType = "none"
- train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
- d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
- return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
+
+ def train(jdata, i):
+ return sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
+ jdata, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
+
+ return _regression_train_wrapper(sc, train, LogisticRegressionModel, data,
initialWeights)
@@ -141,8 +136,7 @@ class SVMModel(LinearModel):
"""
def predict(self, x):
- _linear_predictor_typecheck(x, self._coeff)
- margin = _dot(x, self._coeff) + self._intercept
+ margin = self.weights.dot(x) + self.intercept
return 1 if margin >= 0 else 0
@@ -150,7 +144,7 @@ class SVMWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
- miniBatchFraction=1.0, initialWeights=None, regType=None, intercept=False):
+ miniBatchFraction=1.0, initialWeights=None, regType="none", intercept=False):
"""
Train a support vector machine on the given data.
@@ -175,11 +169,12 @@ class SVMWithSGD(object):
are activated or not).
"""
sc = data.context
- if regType is None:
- regType = "none"
- train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
- d._jrdd, iterations, step, regParam, miniBatchFraction, i, regType, intercept)
- return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)
+
+ def train(jrdd, i):
+ return sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
+ jrdd, iterations, step, regParam, miniBatchFraction, i, regType, intercept)
+
+ return _regression_train_wrapper(sc, train, SVMModel, data, initialWeights)
class NaiveBayesModel(object):
@@ -220,7 +215,8 @@ class NaiveBayesModel(object):
def predict(self, x):
"""Return the most likely class for a data vector x"""
- return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]
+ x = _convert_to_vector(x)
+ return self.labels[numpy.argmax(self.pi + x.dot(self.theta.transpose()))]
class NaiveBayes(object):
@@ -242,12 +238,9 @@ class NaiveBayes(object):
@param lambda_: The smoothing parameter
"""
sc = data.context
- dataBytes = _get_unmangled_labeled_point_rdd(data)
- ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
- return NaiveBayesModel(
- _deserialize_double_vector(ans[0]),
- _deserialize_double_vector(ans[1]),
- _deserialize_double_matrix(ans[2]))
+ jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(data._to_java_object_rdd(), lambda_)
+ labels, pi, theta = PickleSerializer().loads(str(sc._jvm.SerDe.dumps(jlist)))
+ return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
def _test():
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index f3e952a1d8..12c5602271 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -15,15 +15,9 @@
# limitations under the License.
#
-from numpy import array, dot
-from math import sqrt
from pyspark import SparkContext
-from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
- _serialize_double_matrix, _deserialize_double_matrix, \
- _serialize_double_vector, _deserialize_double_vector, \
- _get_initial_weights, _serialize_rating, _regression_train_wrapper
-from pyspark.mllib.linalg import SparseVector
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
__all__ = ['KMeansModel', 'KMeans']
@@ -32,6 +26,7 @@ class KMeansModel(object):
"""A clustering model derived from the k-means method.
+ >>> from numpy import array
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
>>> model = KMeans.train(
... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
@@ -71,8 +66,9 @@ class KMeansModel(object):
"""Find the cluster to which x belongs in this model."""
best = 0
best_distance = float("inf")
- for i in range(0, len(self.centers)):
- distance = _squared_distance(x, self.centers[i])
+ x = _convert_to_vector(x)
+ for i in xrange(len(self.centers)):
+ distance = x.squared_distance(self.centers[i])
if distance < best_distance:
best = i
best_distance = distance
@@ -82,19 +78,17 @@ class KMeansModel(object):
class KMeans(object):
@classmethod
- def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
+ def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"):
"""Train a k-means clustering model."""
- sc = data.context
- dataBytes = _get_unmangled_double_vector_rdd(data)
- ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(
- dataBytes._jrdd, k, maxIterations, runs, initializationMode)
- if len(ans) != 1:
- raise RuntimeError("JVM call result had unexpected length")
- elif type(ans[0]) != bytearray:
- raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]) + " which is not bytearray")
- matrix = _deserialize_double_matrix(ans[0])
- return KMeansModel([row for row in matrix])
+ sc = rdd.context
+ ser = PickleSerializer()
+ # cache serialized data to avoid objects over head in JVM
+ cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
+ model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
+ cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode)
+ bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
+ centers = ser.loads(str(bytes))
+ return KMeansModel([c.toArray() for c in centers])
def _test():
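The rewritten KMeans.train above shows the pattern used throughout this patch: convert inputs with _convert_to_vector, re-serialize the RDD with AutoBatchedSerializer(PickleSerializer()) and cache it before handing it to the JVM, then decode results via SerDe.dumps plus PickleSerializer().loads. The "choose batch size automatically" item in the commit message refers to packing many small objects into each pickle payload and growing the batch while payloads stay small. A simplified, standalone sketch of that batching idea follows (an illustration only, not the actual AutoBatchedSerializer implementation; the 64 KB target is an assumed ballpark):

    import pickle
    from itertools import islice

    def auto_batched_pickles(iterator, start=1, target_bytes=64 * 1024):
        """Yield pickled batches, doubling the batch size while payloads stay
        under target_bytes, so many tiny objects share one pickle payload."""
        batch = start
        it = iter(iterator)
        while True:
            chunk = list(islice(it, batch))
            if not chunk:
                return
            payload = pickle.dumps(chunk, 2)
            yield payload
            if len(payload) < target_bytes:
                batch *= 2  # payloads are still small, so batch more aggressively

    # Example: 10,000 small vectors end up in a handful of payloads rather than 10,000.
    payloads = list(auto_batched_pickles([float(i), float(i)] for i in range(10000)))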
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index e69051c104..0a5dcaac55 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -23,14 +23,148 @@ object from MLlib or pass SciPy C{scipy.sparse} column vectors if
SciPy is available in their environment.
"""
-import numpy
-from numpy import array, array_equal, ndarray, float64, int32
+import sys
+import array
+import copy_reg
+import numpy as np
-__all__ = ['SparseVector', 'Vectors']
+__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
-class SparseVector(object):
+if sys.version_info[:2] == (2, 7):
+ # speed up pickling array in Python 2.7
+ def fast_pickle_array(ar):
+ return array.array, (ar.typecode, ar.tostring())
+ copy_reg.pickle(array.array, fast_pickle_array)
+
+
+# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
+# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
+
+try:
+ import scipy.sparse
+ _have_scipy = True
+except:
+ # No SciPy in environment, but that's okay
+ _have_scipy = False
+
+
+def _convert_to_vector(l):
+ if isinstance(l, Vector):
+ return l
+ elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
+ return DenseVector(l)
+ elif _have_scipy and scipy.sparse.issparse(l):
+ assert l.shape[1] == 1, "Expected column vector"
+ csc = l.tocsc()
+ return SparseVector(l.shape[0], csc.indices, csc.data)
+ else:
+ raise TypeError("Cannot convert type %s into Vector" % type(l))
+
+
+class Vector(object):
+ """
+ Abstract class for DenseVector and SparseVector
+ """
+ def toArray(self):
+ """
+ Convert the vector into an numpy.ndarray
+ :return: numpy.ndarray
+ """
+ raise NotImplementedError
+
+
+class DenseVector(Vector):
+ def __init__(self, ar):
+ if not isinstance(ar, array.array):
+ ar = array.array('d', ar)
+ self.array = ar
+
+ def __reduce__(self):
+ return DenseVector, (self.array,)
+
+ def dot(self, other):
+ """
+ Compute the dot product of two Vectors. We support
+ (Numpy array, list, SparseVector, or SciPy sparse)
+ and a target NumPy array that is either 1- or 2-dimensional.
+ Equivalent to calling numpy.dot of the two vectors.
+
+ >>> dense = DenseVector(array.array('d', [1., 2.]))
+ >>> dense.dot(dense)
+ 5.0
+ >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
+ 4.0
+ >>> dense.dot(range(1, 3))
+ 5.0
+ >>> dense.dot(np.array(range(1, 3)))
+ 5.0
+ """
+ if isinstance(other, SparseVector):
+ return other.dot(self)
+ elif _have_scipy and scipy.sparse.issparse(other):
+ return other.transpose().dot(self.toArray())[0]
+ elif isinstance(other, Vector):
+ return np.dot(self.toArray(), other.toArray())
+ else:
+ return np.dot(self.toArray(), other)
+
+ def squared_distance(self, other):
+ """
+ Squared distance of two Vectors.
+
+ >>> dense1 = DenseVector(array.array('d', [1., 2.]))
+ >>> dense1.squared_distance(dense1)
+ 0.0
+ >>> dense2 = np.array([2., 1.])
+ >>> dense1.squared_distance(dense2)
+ 2.0
+ >>> dense3 = [2., 1.]
+ >>> dense1.squared_distance(dense3)
+ 2.0
+ >>> sparse1 = SparseVector(2, [0, 1], [2., 1.])
+ >>> dense1.squared_distance(sparse1)
+ 2.0
+ """
+ if isinstance(other, SparseVector):
+ return other.squared_distance(self)
+ elif _have_scipy and scipy.sparse.issparse(other):
+ return _convert_to_vector(other).squared_distance(self)
+
+ if isinstance(other, Vector):
+ other = other.toArray()
+ elif not isinstance(other, np.ndarray):
+ other = np.array(other)
+ diff = self.toArray() - other
+ return np.dot(diff, diff)
+
+ def toArray(self):
+ return np.array(self.array)
+
+ def __getitem__(self, item):
+ return self.array[item]
+
+ def __len__(self):
+ return len(self.array)
+
+ def __str__(self):
+ return "[" + ",".join([str(v) for v in self.array]) + "]"
+
+ def __repr__(self):
+ return "DenseVector(%r)" % self.array
+
+ def __eq__(self, other):
+ return isinstance(other, DenseVector) and self.array == other.array
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __getattr__(self, item):
+ return getattr(self.array, item)
+
+
+class SparseVector(Vector):
"""
A simple sparse vector class for passing data to MLlib. Users may
@@ -61,16 +195,19 @@ class SparseVector(object):
if type(pairs) == dict:
pairs = pairs.items()
pairs = sorted(pairs)
- self.indices = array([p[0] for p in pairs], dtype=int32)
- self.values = array([p[1] for p in pairs], dtype=float64)
+ self.indices = array.array('i', [p[0] for p in pairs])
+ self.values = array.array('d', [p[1] for p in pairs])
else:
assert len(args[0]) == len(args[1]), "index and value arrays not same length"
- self.indices = array(args[0], dtype=int32)
- self.values = array(args[1], dtype=float64)
+ self.indices = array.array('i', args[0])
+ self.values = array.array('d', args[1])
for i in xrange(len(self.indices) - 1):
if self.indices[i] >= self.indices[i + 1]:
raise TypeError("indices array must be sorted")
+ def __reduce__(self):
+ return (SparseVector, (self.size, self.indices, self.values))
+
def dot(self, other):
"""
Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
@@ -78,15 +215,15 @@ class SparseVector(object):
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.dot(a)
25.0
- >>> a.dot(array([1., 2., 3., 4.]))
+ >>> a.dot(array.array('d', [1., 2., 3., 4.]))
22.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.dot(b)
0.0
- >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+ >>> a.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]]))
array([ 22., 22.])
"""
- if type(other) == ndarray:
+ if type(other) == np.ndarray:
if other.ndim == 1:
result = 0.0
for i in xrange(len(self.indices)):
@@ -94,10 +231,17 @@ class SparseVector(object):
return result
elif other.ndim == 2:
results = [self.dot(other[:, i]) for i in xrange(other.shape[1])]
- return array(results)
+ return np.array(results)
else:
raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
- else:
+
+ elif type(other) in (array.array, DenseVector):
+ result = 0.0
+ for i in xrange(len(self.indices)):
+ result += self.values[i] * other[self.indices[i]]
+ return result
+
+ elif type(other) is SparseVector:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
@@ -110,6 +254,8 @@ class SparseVector(object):
else:
j += 1
return result
+ else:
+ return self.dot(_convert_to_vector(other))
def squared_distance(self, other):
"""
@@ -118,7 +264,9 @@ class SparseVector(object):
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.squared_distance(a)
0.0
- >>> a.squared_distance(array([1., 2., 3., 4.]))
+ >>> a.squared_distance(array.array('d', [1., 2., 3., 4.]))
+ 11.0
+ >>> a.squared_distance(np.array([1., 2., 3., 4.]))
11.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.squared_distance(b)
@@ -126,22 +274,22 @@ class SparseVector(object):
>>> b.squared_distance(a)
30.0
"""
- if type(other) == ndarray:
- if other.ndim == 1:
- result = 0.0
- j = 0 # index into our own array
- for i in xrange(other.shape[0]):
- if j < len(self.indices) and self.indices[j] == i:
- diff = self.values[j] - other[i]
- result += diff * diff
- j += 1
- else:
- result += other[i] * other[i]
- return result
- else:
+ if type(other) in (list, array.array, DenseVector, np.array, np.ndarray):
+ if type(other) is np.array and other.ndim != 1:
raise Exception("Cannot call squared_distance with %d-dimensional array" %
other.ndim)
- else:
+ result = 0.0
+ j = 0 # index into our own array
+ for i in xrange(len(other)):
+ if j < len(self.indices) and self.indices[j] == i:
+ diff = self.values[j] - other[i]
+ result += diff * diff
+ j += 1
+ else:
+ result += other[i] * other[i]
+ return result
+
+ elif type(other) is SparseVector:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
@@ -163,16 +311,21 @@ class SparseVector(object):
result += other.values[j] * other.values[j]
j += 1
return result
+ else:
+ return self.squared_distance(_convert_to_vector(other))
def toArray(self):
"""
Returns a copy of this SparseVector as a 1-dimensional NumPy array.
"""
- arr = numpy.zeros(self.size)
+ arr = np.zeros((self.size,), dtype=np.float64)
for i in xrange(self.indices.size):
arr[self.indices[i]] = self.values[i]
return arr
+ def __len__(self):
+ return self.size
+
def __str__(self):
inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
vals = "[" + ",".join([str(v) for v in self.values]) + "]"
@@ -198,8 +351,8 @@ class SparseVector(object):
return (isinstance(other, self.__class__)
and other.size == self.size
- and array_equal(other.indices, self.indices)
- and array_equal(other.values, self.values))
+ and other.indices == self.indices
+ and other.values == self.values)
def __ne__(self, other):
return not self.__eq__(other)
@@ -242,9 +395,9 @@ class Vectors(object):
returns a NumPy array.
>>> Vectors.dense([1, 2, 3])
- array([ 1., 2., 3.])
+ DenseVector(array('d', [1.0, 2.0, 3.0]))
"""
- return array(elements, dtype=float64)
+ return DenseVector(elements)
@staticmethod
def stringify(vector):
@@ -257,10 +410,39 @@ class Vectors(object):
>>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
'[0.0,1.0]'
"""
- if type(vector) == SparseVector:
- return str(vector)
- else:
- return "[" + ",".join([str(v) for v in vector]) + "]"
+ return str(vector)
+
+
+class Matrix(object):
+ """ the Matrix """
+ def __init__(self, nRow, nCol):
+ self.nRow = nRow
+ self.nCol = nCol
+
+ def toArray(self):
+ raise NotImplementedError
+
+
+class DenseMatrix(Matrix):
+ def __init__(self, nRow, nCol, values):
+ Matrix.__init__(self, nRow, nCol)
+ assert len(values) == nRow * nCol
+ self.values = values
+
+ def __reduce__(self):
+ return DenseMatrix, (self.nRow, self.nCol, self.values)
+
+ def toArray(self):
+ """
+ Return an numpy.ndarray
+
+ >>> arr = array.array('d', [float(i) for i in range(4)])
+ >>> m = DenseMatrix(2, 2, arr)
+ >>> m.toArray()
+ array([[ 0., 1.],
+ [ 2., 3.]])
+ """
+ return np.ndarray((self.nRow, self.nCol), np.float64, buffer=self.values.tostring())
def _test():
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index d53c95fd59..a787e4dea2 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -19,15 +19,32 @@
Python package for random data generation.
"""
+from functools import wraps
from pyspark.rdd import RDD
-from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
-from pyspark.serializers import NoOpSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer
__all__ = ['RandomRDDs', ]
+def serialize(f):
+ @wraps(f)
+ def func(sc, *a, **kw):
+ jrdd = f(sc, *a, **kw)
+ return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc,
+ BatchedSerializer(PickleSerializer(), 1024))
+ return func
+
+
+def toArray(f):
+ @wraps(f)
+ def func(sc, *a, **kw):
+ rdd = f(sc, *a, **kw)
+ return rdd.map(lambda vec: vec.toArray())
+ return func
+
+
class RandomRDDs(object):
"""
Generator methods for creating RDDs comprised of i.i.d samples from
@@ -35,6 +52,7 @@ class RandomRDDs(object):
"""
@staticmethod
+ @serialize
def uniformRDD(sc, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the
@@ -56,11 +74,10 @@ class RandomRDDs(object):
>>> parts == sc.defaultParallelism
True
"""
- jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
- uniform = RDD(jrdd, sc, NoOpSerializer())
- return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ return sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
@staticmethod
+ @serialize
def normalRDD(sc, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the standard normal
@@ -80,11 +97,10 @@ class RandomRDDs(object):
>>> abs(stats.stdev() - 1.0) < 0.1
True
"""
- jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
- normal = RDD(jrdd, sc, NoOpSerializer())
- return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ return sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
@staticmethod
+ @serialize
def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the Poisson
@@ -101,11 +117,11 @@ class RandomRDDs(object):
>>> abs(stats.stdev() - sqrt(mean)) < 0.5
True
"""
- jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
- poisson = RDD(jrdd, sc, NoOpSerializer())
- return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ return sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
@staticmethod
+ @toArray
+ @serialize
def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -120,12 +136,12 @@ class RandomRDDs(object):
>>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
4
"""
- jrdd = sc._jvm.PythonMLLibAPI() \
+ return sc._jvm.PythonMLLibAPI() \
.uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
- uniform = RDD(jrdd, sc, NoOpSerializer())
- return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
@staticmethod
+ @toArray
+ @serialize
def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -140,12 +156,12 @@ class RandomRDDs(object):
>>> abs(mat.std() - 1.0) < 0.1
True
"""
- jrdd = sc._jvm.PythonMLLibAPI() \
+ return sc._jvm.PythonMLLibAPI() \
.normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
- normal = RDD(jrdd, sc, NoOpSerializer())
- return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
@staticmethod
+ @toArray
+ @serialize
def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -163,10 +179,8 @@ class RandomRDDs(object):
>>> abs(mat.std() - sqrt(mean)) < 0.5
True
"""
- jrdd = sc._jvm.PythonMLLibAPI() \
+ return sc._jvm.PythonMLLibAPI() \
.poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
- poisson = RDD(jrdd, sc, NoOpSerializer())
- return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
def _test():
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 2df23394da..59c1c5ff0c 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -16,17 +16,25 @@
#
from pyspark import SparkContext
-from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
- _serialize_double_matrix, _deserialize_double_matrix, \
- _serialize_double_vector, _deserialize_double_vector, \
- _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _serialize_tuple, RatingDeserializer
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.rdd import RDD
__all__ = ['MatrixFactorizationModel', 'ALS']
+class Rating(object):
+ def __init__(self, user, product, rating):
+ self.user = int(user)
+ self.product = int(product)
+ self.rating = float(rating)
+
+ def __reduce__(self):
+ return Rating, (self.user, self.product, self.rating)
+
+ def __repr__(self):
+ return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating)
+
+
class MatrixFactorizationModel(object):
"""A matrix factorisation model trained by regularized alternating
@@ -39,7 +47,9 @@ class MatrixFactorizationModel(object):
>>> model = ALS.trainImplicit(ratings, 1)
>>> model.predict(2,2) is not None
True
+
>>> testset = sc.parallelize([(1, 2), (1, 1)])
+ >>> model = ALS.train(ratings, 1)
>>> model.predictAll(testset).count() == 2
True
"""
@@ -54,34 +64,61 @@ class MatrixFactorizationModel(object):
def predict(self, user, product):
return self._java_model.predict(user, product)
- def predictAll(self, usersProducts):
- usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
- return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
- self._context, RatingDeserializer())
+ def predictAll(self, user_product):
+ assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
+ first = user_product.first()
+ if isinstance(first, list):
+ user_product = user_product.map(tuple)
+ first = tuple(first)
+ assert type(first) is tuple and len(first) == 2, \
+ "user_product should be RDD of (user, product)"
+ if any(isinstance(x, str) for x in first):
+ user_product = user_product.map(lambda (u, p): (int(x), int(p)))
+ first = tuple(map(int, first))
+ assert all(type(x) is int for x in first), "user and product in user_product shoul be int"
+ sc = self._context
+ tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
+ jresult = self._java_model.predict(tuplerdd).toJavaRDD()
+ return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
+ AutoBatchedSerializer(PickleSerializer()))
class ALS(object):
@classmethod
+ def _prepare(cls, ratings):
+ assert isinstance(ratings, RDD), "ratings should be RDD"
+ first = ratings.first()
+ if not isinstance(first, Rating):
+ if isinstance(first, (tuple, list)):
+ ratings = ratings.map(lambda x: Rating(*x))
+ else:
+ raise ValueError("rating should be RDD of Rating or tuple/list")
+ # serialize them by AutoBatchedSerializer before cache to reduce the
+ # objects overhead in JVM
+ cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
+ return cached._to_java_object_rdd()
+
+ @classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
sc = ratings.context
- ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
- mod = sc._jvm.PythonMLLibAPI().trainALSModel(
- ratingBytes._jrdd, rank, iterations, lambda_, blocks)
+ jrating = cls._prepare(ratings)
+ mod = sc._jvm.PythonMLLibAPI().trainALSModel(jrating, rank, iterations, lambda_, blocks)
return MatrixFactorizationModel(sc, mod)
@classmethod
def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
sc = ratings.context
- ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+ jrating = cls._prepare(ratings)
mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(
- ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
+ jrating, rank, iterations, lambda_, blocks, alpha)
return MatrixFactorizationModel(sc, mod)
def _test():
import doctest
- globs = globals().copy()
+ import pyspark.mllib.recommendation
+ globs = pyspark.mllib.recommendation.__dict__.copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index f572dcfb84..cbdbc09858 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -15,12 +15,12 @@
# limitations under the License.
#
-from numpy import array, ndarray
-from pyspark import SparkContext
-from pyspark.mllib._common import _dot, _regression_train_wrapper, \
- _linear_predictor_typecheck, _have_scipy, _scipy_issparse
-from pyspark.mllib.linalg import SparseVector, Vectors
+import numpy as np
+from numpy import array
+from pyspark import SparkContext
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
__all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel'
'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD']
@@ -38,16 +38,16 @@ class LabeledPoint(object):
def __init__(self, label, features):
self.label = label
- if (type(features) == ndarray or type(features) == SparseVector
- or (_have_scipy and _scipy_issparse(features))):
- self.features = features
- elif type(features) == list:
- self.features = array(features)
- else:
- raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+ self.features = _convert_to_vector(features)
+
+ def __reduce__(self):
+ return (LabeledPoint, (self.label, self.features))
def __str__(self):
- return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")"
+ return "(" + ",".join((str(self.label), str(self.features))) + ")"
+
+ def __repr__(self):
+ return "LabeledPoint(" + ",".join((repr(self.label), repr(self.features))) + ")"
class LinearModel(object):
@@ -55,7 +55,7 @@ class LinearModel(object):
"""A linear model that has a vector of coefficients and an intercept."""
def __init__(self, weights, intercept):
- self._coeff = weights
+ self._coeff = _convert_to_vector(weights)
self._intercept = intercept
@property
@@ -71,18 +71,19 @@ class LinearRegressionModelBase(LinearModel):
"""A linear regression model.
- >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
- >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
+ >>> lrmb = LinearRegressionModelBase(np.array([1.0, 2.0]), 0.1)
+ >>> abs(lrmb.predict(np.array([-1.03, 7.777])) - 14.624) < 1e-6
True
>>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
True
"""
def predict(self, x):
- """Predict the value of the dependent variable given a vector x"""
- """containing values for the independent variables."""
- _linear_predictor_typecheck(x, self._coeff)
- return _dot(x, self._coeff) + self._intercept
+ """
+ Predict the value of the dependent variable given a vector x
+ containing values for the independent variables.
+ """
+ return self.weights.dot(x) + self.intercept
class LinearRegressionModel(LinearRegressionModelBase):
@@ -96,10 +97,10 @@ class LinearRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=np.array([1.0]))
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
- >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -117,11 +118,27 @@ class LinearRegressionModel(LinearRegressionModelBase):
"""
+# train_func should take two parameters, namely data and initial_weights, and
+# return the result of a call to the appropriate JVM stub.
+# _regression_train_wrapper is responsible for setup and error checking.
+def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights):
+ initial_weights = initial_weights or [0.0] * len(data.first().features)
+ ser = PickleSerializer()
+ initial_bytes = bytearray(ser.dumps(_convert_to_vector(initial_weights)))
+ # use AutoBatchedSerializer before cache to reduce the memory
+ # overhead in JVM
+ cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
+ ans = train_func(cached._to_java_object_rdd(), initial_bytes)
+ assert len(ans) == 2, "JVM call result had unexpected length"
+ weights = ser.loads(str(ans[0]))
+ return modelClass(weights, ans[1])
+
+
class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
- initialWeights=None, regParam=1.0, regType=None, intercept=False):
+ initialWeights=None, regParam=1.0, regType="none", intercept=False):
"""
Train a linear regression model on the given data.
@@ -146,11 +163,12 @@ class LinearRegressionWithSGD(object):
are activated or not).
"""
sc = data.context
- if regType is None:
- regType = "none"
- train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
- d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
- return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
+
+ def train(jrdd, i):
+ return sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
+ jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
+
+ return _regression_train_wrapper(sc, train, LinearRegressionModel, data, initialWeights)
class LassoModel(LinearRegressionModelBase):
@@ -166,9 +184,9 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
- >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -179,7 +197,7 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -193,9 +211,11 @@ class LassoWithSGD(object):
miniBatchFraction=1.0, initialWeights=None):
"""Train a Lasso regression model on the given data."""
sc = data.context
- train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
- d._jrdd, iterations, step, regParam, miniBatchFraction, i)
- return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)
+
+ def train(jrdd, i):
+ return sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
+ jrdd, iterations, step, regParam, miniBatchFraction, i)
+ return _regression_train_wrapper(sc, train, LassoModel, data, initialWeights)
class RidgeRegressionModel(LinearRegressionModelBase):
@@ -211,9 +231,9 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
- >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -224,7 +244,7 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -238,9 +258,12 @@ class RidgeRegressionWithSGD(object):
miniBatchFraction=1.0, initialWeights=None):
"""Train a ridge regression model on the given data."""
sc = data.context
- train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
- d._jrdd, iterations, step, regParam, miniBatchFraction, i)
- return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights)
+
+ def train(jrdd, i):
+ return sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
+ jrdd, iterations, step, regParam, miniBatchFraction, i)
+
+ return _regression_train_wrapper(sc, train, RidgeRegressionModel, data, initialWeights)
def _test():
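
The hunks above replace the per-algorithm byte plumbing with a single _regression_train_wrapper: it fills in zero initial weights when none are given, pickles them, re-serializes and caches the input RDD with AutoBatchedSerializer, calls the JVM stub on the Java object RDD, and unpickles the returned (weights, intercept) pair. A minimal sketch of how a further SGD trainer could reuse it, assuming a live SparkContext `sc` as in the doctests; `trainFooModelWithSGD` and `FooModel` are hypothetical names, not part of PythonMLLibAPI:

    from pyspark.mllib.regression import (LinearRegressionModelBase,
                                          _regression_train_wrapper)

    class FooModel(LinearRegressionModelBase):
        """Hypothetical model; predict() is inherited from the base class."""

    class FooWithSGD(object):

        @classmethod
        def train(cls, data, iterations=100, step=1.0, initialWeights=None):
            sc = data.context

            def train_func(jrdd, i):
                # assumed JVM stub; it must return [pickled_weights, intercept]
                return sc._jvm.PythonMLLibAPI().trainFooModelWithSGD(
                    jrdd, iterations, step, i)

            return _regression_train_wrapper(sc, train_func, FooModel,
                                             data, initialWeights)
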
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index 8c726f171c..b9de0909a6 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -19,14 +19,26 @@
Python package for statistical functions in MLlib.
"""
-from pyspark.mllib._common import \
- _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \
- _serialize_double, _deserialize_double_matrix, _deserialize_double_vector
+from functools import wraps
+
+from pyspark import PickleSerializer
__all__ = ['MultivariateStatisticalSummary', 'Statistics']
+def serialize(f):
+ ser = PickleSerializer()
+
+ @wraps(f)
+ def func(self):
+ jvec = f(self)
+ bytes = self._sc._jvm.SerDe.dumps(jvec)
+ return ser.loads(str(bytes)).toArray()
+
+ return func
+
+
class MultivariateStatisticalSummary(object):
"""
@@ -44,33 +56,38 @@ class MultivariateStatisticalSummary(object):
def __del__(self):
self._sc._gateway.detach(self._java_summary)
+ @serialize
def mean(self):
- return _deserialize_double_vector(self._java_summary.mean())
+ return self._java_summary.mean()
+ @serialize
def variance(self):
- return _deserialize_double_vector(self._java_summary.variance())
+ return self._java_summary.variance()
def count(self):
return self._java_summary.count()
+ @serialize
def numNonzeros(self):
- return _deserialize_double_vector(self._java_summary.numNonzeros())
+ return self._java_summary.numNonzeros()
+ @serialize
def max(self):
- return _deserialize_double_vector(self._java_summary.max())
+ return self._java_summary.max()
+ @serialize
def min(self):
- return _deserialize_double_vector(self._java_summary.min())
+ return self._java_summary.min()
class Statistics(object):
@staticmethod
- def colStats(X):
+ def colStats(rdd):
"""
Computes column-wise summary statistics for the input RDD[Vector].
- >>> from linalg import Vectors
+ >>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
... Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8])])
@@ -88,9 +105,9 @@ class Statistics(object):
>>> cStats.min()
array([ 2., 0., 0., -2.])
"""
- sc = X.ctx
- Xser = _get_unmangled_double_vector_rdd(X)
- cStats = sc._jvm.PythonMLLibAPI().colStats(Xser._jrdd)
+ sc = rdd.ctx
+ jrdd = rdd._to_java_object_rdd()
+ cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd)
return MultivariateStatisticalSummary(sc, cStats)
@staticmethod
@@ -117,7 +134,7 @@ class Statistics(object):
>>> from math import isnan
>>> isnan(Statistics.corr(x, zeros))
True
- >>> from linalg import Vectors
+ >>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
>>> pearsonCorr = Statistics.corr(rdd)
@@ -144,18 +161,16 @@ class Statistics(object):
# check if y is used to specify the method name instead.
if type(y) == str:
raise TypeError("Use 'method=' to specify method name.")
+
+ jx = x._to_java_object_rdd()
if not y:
- try:
- Xser = _get_unmangled_double_vector_rdd(x)
- except TypeError:
- raise TypeError("corr called on a single RDD not consisted of Vectors.")
- resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method)
- return _deserialize_double_matrix(resultMat)
+ resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method)
+ bytes = sc._jvm.SerDe.dumps(resultMat)
+ ser = PickleSerializer()
+ return ser.loads(str(bytes)).toArray()
else:
- xSer = _get_unmangled_rdd(x, _serialize_double)
- ySer = _get_unmangled_rdd(y, _serialize_double)
- result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method)
- return result
+ jy = y._to_java_object_rdd()
+ return sc._jvm.PythonMLLibAPI().corr(jx, jy, method)
def _test():
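
The @serialize decorator above replaces the old _deserialize_double_vector helpers: each summary accessor now returns a Java Vector handle, SerDe.dumps pickles it on the JVM side (via Pyrolite), PickleSerializer.loads rebuilds a pyspark Vector locally, and toArray() turns it into a NumPy array. The same round-trip, written out by hand as a rough illustration (assumes a live SparkContext `sc`):

    from pyspark.serializers import PickleSerializer
    from pyspark.mllib.linalg import Vectors

    ser = PickleSerializer()
    v = Vectors.dense([1.0, 2.0, 3.0])

    # Python -> JVM: pickle locally, unpickle in the JVM through Pyrolite
    jvec = sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))

    # JVM -> Python: pickle in the JVM, unpickle locally, convert to NumPy
    arr = ser.loads(str(sc._jvm.SerDe.dumps(jvec))).toArray()
    print arr  # a NumPy array: [ 1.  2.  3.]
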
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 8a851bd35c..f72e88ba6e 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -20,6 +20,8 @@ Fuller unit tests for Python MLlib.
"""
import sys
+import array as pyarray
+
from numpy import array, array_equal
if sys.version_info[:2] <= (2, 6):
@@ -27,9 +29,8 @@ if sys.version_info[:2] <= (2, 6):
else:
import unittest
-from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
- _deserialize_double_vector, _dot, _squared_distance
-from pyspark.mllib.linalg import SparseVector
+from pyspark.serializers import PickleSerializer
+from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
from pyspark.tests import PySparkTestCase
@@ -42,39 +43,52 @@ except:
# No SciPy, but that's okay, we'll skip those tests
pass
+ser = PickleSerializer()
+
+
+def _squared_distance(a, b):
+ if isinstance(a, Vector):
+ return a.squared_distance(b)
+ else:
+ return b.squared_distance(a)
-class VectorTests(unittest.TestCase):
+
+class VectorTests(PySparkTestCase):
+
+ def _test_serialize(self, v):
+ jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
+ nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec)))
+ self.assertEqual(v, nv)
+ vs = [v] * 100
+ jvecs = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs)))
+ nvs = ser.loads(str(self.sc._jvm.SerDe.dumps(jvecs)))
+ self.assertEqual(vs, nvs)
def test_serialize(self):
- sv = SparseVector(4, {1: 1, 3: 2})
- dv = array([1., 2., 3., 4.])
- lst = [1, 2, 3, 4]
- self.assertTrue(sv is _convert_vector(sv))
- self.assertTrue(dv is _convert_vector(dv))
- self.assertTrue(array_equal(dv, _convert_vector(lst)))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(sv)))
- self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv))))
- self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(lst))))
+ self._test_serialize(DenseVector(range(10)))
+ self._test_serialize(DenseVector(array([1., 2., 3., 4.])))
+ self._test_serialize(DenseVector(pyarray.array('d', range(10))))
+ self._test_serialize(SparseVector(4, {1: 1, 3: 2}))
def test_dot(self):
sv = SparseVector(4, {1: 1, 3: 2})
- dv = array([1., 2., 3., 4.])
- lst = [1, 2, 3, 4]
+ dv = DenseVector(array([1., 2., 3., 4.]))
+ lst = DenseVector([1, 2, 3, 4])
mat = array([[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.]])
- self.assertEquals(10.0, _dot(sv, dv))
- self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat)))
- self.assertEquals(30.0, _dot(dv, dv))
- self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat)))
- self.assertEquals(30.0, _dot(lst, dv))
- self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat)))
+ self.assertEquals(10.0, sv.dot(dv))
+ self.assertTrue(array_equal(array([3., 6., 9., 12.]), sv.dot(mat)))
+ self.assertEquals(30.0, dv.dot(dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), dv.dot(mat)))
+ self.assertEquals(30.0, lst.dot(dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), lst.dot(mat)))
def test_squared_distance(self):
sv = SparseVector(4, {1: 1, 3: 2})
- dv = array([1., 2., 3., 4.])
- lst = [4, 3, 2, 1]
+ dv = DenseVector(array([1., 2., 3., 4.]))
+ lst = DenseVector([4, 3, 2, 1])
self.assertEquals(15.0, _squared_distance(sv, dv))
self.assertEquals(25.0, _squared_distance(sv, lst))
self.assertEquals(20.0, _squared_distance(dv, lst))
@@ -198,41 +212,36 @@ class SciPyTests(PySparkTestCase):
lil[1, 0] = 1
lil[3, 0] = 2
sv = SparseVector(4, {1: 1, 3: 2})
- self.assertEquals(sv, _convert_vector(lil))
- self.assertEquals(sv, _convert_vector(lil.tocsc()))
- self.assertEquals(sv, _convert_vector(lil.tocoo()))
- self.assertEquals(sv, _convert_vector(lil.tocsr()))
- self.assertEquals(sv, _convert_vector(lil.todok()))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil)))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok())))
+ self.assertEquals(sv, _convert_to_vector(lil))
+ self.assertEquals(sv, _convert_to_vector(lil.tocsc()))
+ self.assertEquals(sv, _convert_to_vector(lil.tocoo()))
+ self.assertEquals(sv, _convert_to_vector(lil.tocsr()))
+ self.assertEquals(sv, _convert_to_vector(lil.todok()))
+
+ def serialize(l):
+ return ser.loads(ser.dumps(_convert_to_vector(l)))
+ self.assertEquals(sv, serialize(lil))
+ self.assertEquals(sv, serialize(lil.tocsc()))
+ self.assertEquals(sv, serialize(lil.tocsr()))
+ self.assertEquals(sv, serialize(lil.todok()))
def test_dot(self):
from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 1
lil[3, 0] = 2
- dv = array([1., 2., 3., 4.])
- sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
- mat = array([[1., 2., 3., 4.],
- [1., 2., 3., 4.],
- [1., 2., 3., 4.],
- [1., 2., 3., 4.]])
- self.assertEquals(10.0, _dot(lil, dv))
- self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
+ dv = DenseVector(array([1., 2., 3., 4.]))
+ self.assertEquals(10.0, dv.dot(lil))
def test_squared_distance(self):
from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 3
lil[3, 0] = 2
- dv = array([1., 2., 3., 4.])
+ dv = DenseVector(array([1., 2., 3., 4.]))
sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
- self.assertEquals(15.0, _squared_distance(lil, dv))
- self.assertEquals(15.0, _squared_distance(lil, sv))
- self.assertEquals(15.0, _squared_distance(dv, lil))
- self.assertEquals(15.0, _squared_distance(sv, lil))
+ self.assertEquals(15.0, dv.squared_distance(lil))
+ self.assertEquals(15.0, sv.squared_distance(lil))
def scipy_matrix(self, size, values):
"""Create a column SciPy matrix from a dictionary of values"""
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 5b13ab682b..f59a818a6e 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -18,13 +18,9 @@
from py4j.java_collections import MapConverter
from pyspark import SparkContext, RDD
-from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _serialize_double_vector, \
- _deserialize_labeled_point, _get_unmangled_labeled_point_rdd, \
- _deserialize_double
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.mllib.linalg import Vector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
-from pyspark.serializers import NoOpSerializer
-
__all__ = ['DecisionTreeModel', 'DecisionTree']
@@ -55,21 +51,24 @@ class DecisionTreeModel(object):
:param x: Data point (feature vector),
or an RDD of data points (feature vectors).
"""
- pythonAPI = self._sc._jvm.PythonMLLibAPI()
+ SerDe = self._sc._jvm.SerDe
+ ser = PickleSerializer()
if isinstance(x, RDD):
# Bulk prediction
- if x.count() == 0:
+ first = x.take(1)
+ if not first:
return self._sc.parallelize([])
- dataBytes = _get_unmangled_double_vector_rdd(x, cache=False)
- jSerializedPreds = \
- pythonAPI.predictDecisionTreeModel(self._java_model,
- dataBytes._jrdd)
- serializedPreds = RDD(jSerializedPreds, self._sc, NoOpSerializer())
- return serializedPreds.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ if not isinstance(first[0], Vector):
+ x = x.map(_convert_to_vector)
+ jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD()
+ jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred)
+ return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))
+
else:
# Assume x is a single data point.
- x_ = _serialize_double_vector(x)
- return pythonAPI.predictDecisionTreeModel(self._java_model, x_)
+ bytes = bytearray(ser.dumps(_convert_to_vector(x)))
+ vec = self._sc._jvm.SerDe.loads(bytes)
+ return self._java_model.predict(vec)
def numNodes(self):
return self._java_model.numNodes()
@@ -77,7 +76,7 @@ class DecisionTreeModel(object):
def depth(self):
return self._java_model.depth()
- def __str__(self):
+ def __repr__(self):
return self._java_model.toString()
@@ -90,53 +89,24 @@ class DecisionTree(object):
EXPERIMENTAL: This is an experimental API.
It will probably be modified for Spark v1.2.
- Example usage:
-
- >>> from numpy import array
- >>> import sys
- >>> from pyspark.mllib.regression import LabeledPoint
- >>> from pyspark.mllib.tree import DecisionTree
- >>> from pyspark.mllib.linalg import SparseVector
- >>>
- >>> data = [
- ... LabeledPoint(0.0, [0.0]),
- ... LabeledPoint(1.0, [1.0]),
- ... LabeledPoint(1.0, [2.0]),
- ... LabeledPoint(1.0, [3.0])
- ... ]
- >>> categoricalFeaturesInfo = {} # no categorical features
- >>> model = DecisionTree.trainClassifier(sc.parallelize(data), numClasses=2,
- ... categoricalFeaturesInfo=categoricalFeaturesInfo)
- >>> sys.stdout.write(model)
- DecisionTreeModel classifier
- If (feature 0 <= 0.5)
- Predict: 0.0
- Else (feature 0 > 0.5)
- Predict: 1.0
- >>> model.predict(array([1.0])) > 0
- True
- >>> model.predict(array([0.0])) == 0
- True
- >>> sparse_data = [
- ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
- ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
- ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
- ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
- ... ]
- >>>
- >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data),
- ... categoricalFeaturesInfo=categoricalFeaturesInfo)
- >>> model.predict(array([0.0, 1.0])) == 1
- True
- >>> model.predict(array([0.0, 0.0])) == 0
- True
- >>> model.predict(SparseVector(2, {1: 1.0})) == 1
- True
- >>> model.predict(SparseVector(2, {1: 0.0})) == 0
- True
"""
@staticmethod
+ def _train(data, type, numClasses, categoricalFeaturesInfo,
+ impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
+ minInfoGain=0.0):
+ first = data.first()
+ assert isinstance(first, LabeledPoint), "the data should be an RDD of LabeledPoint"
+ sc = data.context
+ jrdd = data._to_java_object_rdd()
+ cfiMap = MapConverter().convert(categoricalFeaturesInfo,
+ sc._gateway._gateway_client)
+ model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
+ jrdd, type, numClasses, cfiMap,
+ impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
+ return DecisionTreeModel(sc, model)
+
+ @staticmethod
def trainClassifier(data, numClasses, categoricalFeaturesInfo,
impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
minInfoGain=0.0):
@@ -159,18 +129,34 @@ class DecisionTree(object):
the parent split
:param minInfoGain: Min info gain required to create a split
:return: DecisionTreeModel
+
+ Example usage:
+
+ >>> from numpy import array
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> from pyspark.mllib.tree import DecisionTree
+ >>> from pyspark.mllib.linalg import SparseVector
+ >>>
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
+ >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
+ >>> print model, # it already has a newline
+ DecisionTreeModel classifier
+ If (feature 0 <= 0.5)
+ Predict: 0.0
+ Else (feature 0 > 0.5)
+ Predict: 1.0
+ >>> model.predict(array([1.0])) > 0
+ True
+ >>> model.predict(array([0.0])) == 0
+ True
"""
- sc = data.context
- dataBytes = _get_unmangled_labeled_point_rdd(data)
- categoricalFeaturesInfoJMap = \
- MapConverter().convert(categoricalFeaturesInfo,
- sc._gateway._gateway_client)
- model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
- dataBytes._jrdd, "classification",
- numClasses, categoricalFeaturesInfoJMap,
- impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
- dataBytes.unpersist()
- return DecisionTreeModel(sc, model)
+ return DecisionTree._train(data, "classification", numClasses, categoricalFeaturesInfo,
+ impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
@staticmethod
def trainRegressor(data, categoricalFeaturesInfo,
@@ -194,18 +180,33 @@ class DecisionTree(object):
the parent split
:param minInfoGain: Min info gain required to create a split
:return: DecisionTreeModel
+
+ Example usage:
+
+ >>> from numpy import array
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> from pyspark.mllib.tree import DecisionTree
+ >>> from pyspark.mllib.linalg import SparseVector
+ >>>
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>>
+ >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
+ >>> model.predict(array([0.0, 1.0])) == 1
+ True
+ >>> model.predict(array([0.0, 0.0])) == 0
+ True
+ >>> model.predict(SparseVector(2, {1: 1.0})) == 1
+ True
+ >>> model.predict(SparseVector(2, {1: 0.0})) == 0
+ True
"""
- sc = data.context
- dataBytes = _get_unmangled_labeled_point_rdd(data)
- categoricalFeaturesInfoJMap = \
- MapConverter().convert(categoricalFeaturesInfo,
- sc._gateway._gateway_client)
- model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
- dataBytes._jrdd, "regression",
- 0, categoricalFeaturesInfoJMap,
- impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
- dataBytes.unpersist()
- return DecisionTreeModel(sc, model)
+ return DecisionTree._train(data, "regression", 0, categoricalFeaturesInfo,
+ impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
def _test():
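
Both trainClassifier and trainRegressor now delegate to the shared _train helper, which converts the RDD of LabeledPoint into a Java object RDD and calls one trainDecisionTreeModel stub, while predict either pickles a single vector or pipes a whole RDD back through PythonRDD.javaToPython. A short end-to-end sketch of the new path, assuming a live SparkContext `sc` as in the doctests:

    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.tree import DecisionTree

    points = [LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0]),
              LabeledPoint(1.0, [2.0]), LabeledPoint(1.0, [3.0])]
    rdd = sc.parallelize(points)

    model = DecisionTree.trainClassifier(rdd, numClasses=2,
                                         categoricalFeaturesInfo={})
    print model.predict([1.0])                                    # single point -> float
    print model.predict(rdd.map(lambda p: p.features)).collect()  # bulk via RDD
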
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 1c7b8c809a..8233d4e81f 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -18,11 +18,10 @@
import numpy as np
import warnings
-from pyspark.mllib.linalg import Vectors, SparseVector
-from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point
from pyspark.rdd import RDD
-from pyspark.serializers import NoOpSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint
class MLUtils(object):
@@ -32,15 +31,12 @@ class MLUtils(object):
"""
@staticmethod
- def _parse_libsvm_line(line, multiclass):
- warnings.warn("deprecated", DeprecationWarning)
- return _parse_libsvm_line(line)
-
- @staticmethod
- def _parse_libsvm_line(line):
+ def _parse_libsvm_line(line, multiclass=None):
"""
Parses a line in LIBSVM format into (label, indices, values).
"""
+ if multiclass is not None:
+ warnings.warn("deprecated", DeprecationWarning)
items = line.split(None)
label = float(items[0])
nnz = len(items) - 1
@@ -55,27 +51,20 @@ class MLUtils(object):
@staticmethod
def _convert_labeled_point_to_libsvm(p):
"""Converts a LabeledPoint to a string in LIBSVM format."""
+ assert isinstance(p, LabeledPoint)
items = [str(p.label)]
- v = _convert_vector(p.features)
- if type(v) == np.ndarray:
- for i in xrange(len(v)):
- items.append(str(i + 1) + ":" + str(v[i]))
- elif type(v) == SparseVector:
+ v = _convert_to_vector(p.features)
+ if isinstance(v, SparseVector):
nnz = len(v.indices)
for i in xrange(nnz):
items.append(str(v.indices[i] + 1) + ":" + str(v.values[i]))
else:
- raise TypeError("_convert_labeled_point_to_libsvm needs either ndarray or SparseVector"
- " but got " % type(v))
+ for i in xrange(len(v)):
+ items.append(str(i + 1) + ":" + str(v[i]))
return " ".join(items)
@staticmethod
- def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=None):
- warnings.warn("deprecated", DeprecationWarning)
- return loadLibSVMFile(sc, path, numFeatures, minPartitions)
-
- @staticmethod
- def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None):
+ def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None, multiclass=None):
"""
Loads labeled data in the LIBSVM format into an RDD of
LabeledPoint. The LIBSVM format is a text-based format used by
@@ -122,6 +111,8 @@ class MLUtils(object):
>>> print examples[2]
(-1.0,(6,[1,3,5],[4.0,5.0,6.0]))
"""
+ if multiclass is not None:
+ warnings.warn("deprecated", DeprecationWarning)
lines = sc.textFile(path, minPartitions)
parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l))
@@ -182,9 +173,9 @@ class MLUtils(object):
(0.0,[1.01,2.02,3.03])
"""
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
- jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
- serialized = RDD(jSerialized, sc, NoOpSerializer())
- return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes)))
+ jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
+ jpyrdd = sc._jvm.PythonRDD.javaToPython(jrdd)
+ return RDD(jpyrdd, sc, BatchedSerializer(PickleSerializer()))
def _test():
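
_parse_libsvm_line and _convert_labeled_point_to_libsvm are near inverses: the parser turns a 1-based "index:value" line into a label plus 0-based indices and values, and the converter writes a LabeledPoint back out with 1-based indices. A small local sketch, no SparkContext needed (the exact output formatting is illustrative, not guaranteed):

    from pyspark.mllib.linalg import SparseVector
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.util import MLUtils

    label, indices, values = MLUtils._parse_libsvm_line("1.0 1:2.5 3:4.0")
    # label is 1.0; indices/values hold [0, 2] and [2.5, 4.0] (0-based)

    p = LabeledPoint(1.0, SparseVector(4, {0: 2.5, 2: 4.0}))
    print MLUtils._convert_labeled_point_to_libsvm(p)  # e.g. "1.0 1:2.5 3:4.0"
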
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b43606b730..8ef233bc80 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -34,7 +34,7 @@ from math import sqrt, log, isinf, isnan
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
- PickleSerializer, pack_long, CompressedSerializer
+ PickleSerializer, pack_long, AutoBatchedSerializer
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@@ -1927,10 +1927,10 @@ class RDD(object):
It will convert each Python object into a Java object by Pyrolite, whether
or not the RDD is serialized in batches.
"""
- if not self._is_pickled():
- self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
- batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
- return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+ rdd = self._reserialize(AutoBatchedSerializer(PickleSerializer())) \
+ if not self._is_pickled() else self
+ is_batch = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(rdd._jrdd, is_batch)
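
The reworked _to_java_object_rdd swaps the fixed 1024-element batches for AutoBatchedSerializer, so batch sizes adapt to object size before the pickled partitions are handed to PythonRDD.pythonToJava. A minimal sketch of the same steps done by hand, assuming a live SparkContext `sc`:

    from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

    rdd = sc.parallelize(range(1000))
    pickled = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    jrdd = sc._jvm.PythonRDD.pythonToJava(pickled._jrdd, True)
    print jrdd.count()  # 1000, counted on the JVM after Pyrolite unpickling
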
def countApprox(self, timeout, confidence=0.95):
"""
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 44ac564283..2672da36c1 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -68,6 +68,7 @@ import sys
import types
import collections
import zlib
+import itertools
from pyspark import cloudpickle
@@ -214,6 +215,41 @@ class BatchedSerializer(Serializer):
return "BatchedSerializer<%s>" % str(self.serializer)
+class AutoBatchedSerializer(BatchedSerializer):
+ """
+ Choose the batch size automatically based on the size of the objects
+ """
+
+ def __init__(self, serializer, bestSize=1 << 20):
+ BatchedSerializer.__init__(self, serializer, -1)
+ self.bestSize = bestSize
+
+ def dump_stream(self, iterator, stream):
+ batch, best = 1, self.bestSize
+ iterator = iter(iterator)
+ while True:
+ vs = list(itertools.islice(iterator, batch))
+ if not vs:
+ break
+
+ bytes = self.serializer.dumps(vs)
+ write_int(len(bytes), stream)
+ stream.write(bytes)
+
+ size = len(bytes)
+ if size < best:
+ batch *= 2
+ elif size > best * 10 and batch > 1:
+ batch /= 2
+
+ def __eq__(self, other):
+ return (isinstance(other, AutoBatchedSerializer) and
+ other.serializer == self.serializer)
+
+ def __str__(self):
+ return "AutoBatchedSerializer<%s>" % str(self.serializer)
+
+
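
AutoBatchedSerializer starts with a batch of one object and doubles it while a serialized batch stays under bestSize (1 MB by default), halving it again if a batch overshoots by more than 10x. A local illustration that walks the framed batches written by dump_stream (no SparkContext needed; the 64 KB target is arbitrary):

    from io import BytesIO
    from pyspark.serializers import AutoBatchedSerializer, PickleSerializer, read_int

    ser = AutoBatchedSerializer(PickleSerializer(), bestSize=1 << 16)
    buf = BytesIO()
    ser.dump_stream(iter(xrange(100000)), buf)

    # Each frame is a 4-byte length followed by one pickled batch; the sizes
    # should grow until they approach bestSize.
    buf.seek(0)
    sizes = []
    while True:
        try:
            length = read_int(buf)
        except EOFError:
            break
        buf.seek(length, 1)  # skip the pickled payload
        sizes.append(length)
    print sizes
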
class CartesianDeserializer(FramedSerializer):
"""