author    Davies Liu <davies@databricks.com>  2014-10-28 03:50:22 -0700
committer Xiangrui Meng <meng@databricks.com>  2014-10-28 03:50:22 -0700
commit    fae095bc7c4097859af522ced77f09cf6be17691 (patch)
tree      8b4be3f716e8312c372105b4cc5132860bed2412 /python/pyspark/mllib
parent    46c63417c1bb1aea07baf9036cc5b8f1c3781bbe (diff)
download  spark-fae095bc7c4097859af522ced77f09cf6be17691.tar.gz
          spark-fae095bc7c4097859af522ced77f09cf6be17691.tar.bz2
          spark-fae095bc7c4097859af522ced77f09cf6be17691.zip
[SPARK-3961] [MLlib] [PySpark] Python API for mllib.feature
Added completed Python API for MLlib.feature:

    Normalizer
    StandardScalerModel
    StandardScaler
    HashTF
    IDFModel
    IDF

cc mengxr

Author: Davies Liu <davies@databricks.com>
Author: Davies Liu <davies.liu@gmail.com>

Closes #2819 from davies/feature and squashes the following commits:

4f48f48 [Davies Liu] add a note for HashingTF
67f6d21 [Davies Liu] address comments
b628693 [Davies Liu] rollback changes in Word2Vec
efb4f4f [Davies Liu] Merge branch 'master' into feature
806c7c2 [Davies Liu] address comments
3abb8c2 [Davies Liu] address comments
59781b9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into feature
a405ae7 [Davies Liu] fix tests
7a1891a [Davies Liu] fix tests
486795f [Davies Liu] update programming guide, HashTF -> HashingTF
8a50584 [Davies Liu] Python API for mllib.feature
Diffstat (limited to 'python/pyspark/mllib')
-rw-r--r--  python/pyspark/mllib/feature.py  395
-rw-r--r--  python/pyspark/mllib/linalg.py    16
2 files changed, 354 insertions(+), 57 deletions(-)
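
Taken together, the new classes compose into a small TF-IDF pipeline. A minimal usage sketch, assuming a running SparkContext named sc (it mirrors the doctests in the diff below):

>>> from pyspark.mllib.feature import HashingTF, IDF, Normalizer
>>> docs = sc.parallelize(["a a b".split(" "), "b c d".split(" ")])
>>> tf = HashingTF(100).transform(docs)    # hash terms to term-frequency vectors
>>> tfidf = IDF().fit(tf).transform(tf)    # reweight by inverse document frequency
>>> unit = Normalizer(2).transform(tfidf)  # scale each vector to unit L2 norm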
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index b5a3f22c69..324343443e 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -18,59 +18,357 @@
"""
Python package for feature in MLlib.
"""
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+ 'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
+
+
+# Hack to support float('inf') in Py4J
+_old_smart_decode = py4j.protocol.smart_decode
+
+_float_str_mapping = {
+ u'nan': u'NaN',
+ u'inf': u'Infinity',
+ u'-inf': u'-Infinity',
+}
+
+
+def _new_smart_decode(obj):
+ if isinstance(obj, float):
+ s = unicode(obj)
+ return _float_str_mapping.get(s, s)
+ return _old_smart_decode(obj)
+
+py4j.protocol.smart_decode = _new_smart_decode
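
What the monkey-patch above fixes, in one illustration (not part of the patch): Python 2 spells the special floats 'inf' and 'nan', while Java's Double parser expects 'Infinity' and 'NaN', so a value such as Normalizer's p = float('inf') would otherwise fail to cross the Py4J bridge.

>>> unicode(float('inf'))             # what an unpatched smart_decode would send
u'inf'
>>> _new_smart_decode(float('inf'))   # what the Java side can actually parse
u'Infinity'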
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+ 'LinkedList',
+ 'SparseVector',
+ 'DenseVector',
+ 'DenseMatrix',
+ 'Rating',
+ 'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+ """ Convert Python object into Java """
+ if isinstance(a, RDD):
+ a = _to_java_object_rdd(a)
+ elif not isinstance(a, (int, long, float, bool, basestring)):
+ bytes = bytearray(PickleSerializer().dumps(a))
+ a = sc._jvm.SerDe.loads(bytes)
+ return a
+
+
+def _java2py(sc, r):
+ if isinstance(r, JavaObject):
+ clsName = r.getClass().getSimpleName()
+ if clsName in ("RDD", "JavaRDD"):
+ if clsName == "RDD":
+ r = r.toJavaRDD()
+ jrdd = sc._jvm.SerDe.javaToPython(r)
+ return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
-__all__ = ['Word2Vec', 'Word2VecModel']
+ elif clsName in _picklable_classes:
+ r = sc._jvm.SerDe.dumps(r)
+ if isinstance(r, bytearray):
+ r = PickleSerializer().loads(str(r))
+ return r
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+ """ Call Java Function
"""
- class for Word2Vec model
+ args = [_py2java(sc, a) for a in args]
+ return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+ """ Call API in PythonMLLibAPI
"""
- def __init__(self, sc, java_model):
+ api = getattr(sc._jvm.PythonMLLibAPI(), name)
+ return _callJavaFunc(sc, api, *args)
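
The three helpers form a single call path: _py2java pickles the Python arguments (or converts RDDs), the named method on PythonMLLibAPI runs in the JVM, and _java2py unpickles the result. A sketch of the round trip, assuming an active SparkContext (this is exactly the call Normalizer.transform makes below):

>>> sc = SparkContext._active_spark_context
>>> v = Vectors.dense([3.0, 4.0])            # L2 norm is 5.0
>>> _callAPI(sc, "normalizeVector", 2.0, v)  # pickled in, computed in the JVM, unpickled out
DenseVector([0.6, 0.8])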
+
+
+class VectorTransformer(object):
+ """
+ :: DeveloperApi ::
+
+    Base class for transformation of a vector or an RDD of vectors
+ """
+ def transform(self, vector):
+ """
+ Applies transformation on a vector.
+
+ :param vector: vector to be transformed.
+ """
+ raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+ """
+ :: Experimental ::
+
+    Normalizes samples individually to unit L\ :sup:`p`\ norm
+
+    For any 1 <= `p` < float('inf'), normalizes samples using
+    sum(abs(vector) :sup:`p`) :sup:`(1/p)` as norm.
+
+ For `p` = float('inf'), max(abs(vector)) will be used as norm for normalization.
+
+ >>> v = Vectors.dense(range(3))
+ >>> nor = Normalizer(1)
+ >>> nor.transform(v)
+ DenseVector([0.0, 0.3333, 0.6667])
+
+ >>> rdd = sc.parallelize([v])
+ >>> nor.transform(rdd).collect()
+ [DenseVector([0.0, 0.3333, 0.6667])]
+
+ >>> nor2 = Normalizer(float("inf"))
+ >>> nor2.transform(v)
+ DenseVector([0.0, 0.5, 1.0])
+ """
+ def __init__(self, p=2.0):
"""
- :param sc: Spark context
- :param java_model: Handle to Java model object
+ :param p: Normalization in L^p^ space, p = 2 by default.
"""
+        assert p >= 1.0, "p should be greater than or equal to 1.0"
+ self.p = float(p)
+
+ def transform(self, vector):
+ """
+ Applies unit length normalization on a vector.
+
+ :param vector: vector to be normalized.
+ :return: normalized vector. If the norm of the input is zero, it
+ will return the input vector.
+ """
+ sc = SparkContext._active_spark_context
+ assert sc is not None, "SparkContext should be initialized first"
+ return _callAPI(sc, "normalizeVector", self.p, vector)
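
A pure-Python check of the L1 doctest above (illustration only): the L1 norm of [0, 1, 2] is 3, so each component is divided by 3.

>>> v = [0.0, 1.0, 2.0]
>>> norm = sum(abs(x) for x in v)    # L1 norm = 3.0
>>> [round(x / norm, 4) for x in v]
[0.0, 0.3333, 0.6667]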
+
+
+class JavaModelWrapper(VectorTransformer):
+ """
+    Wrapper for a model in the JVM
+ """
+ def __init__(self, sc, java_model):
self._sc = sc
self._java_model = java_model
def __del__(self):
self._sc._gateway.detach(self._java_model)
- def transform(self, word):
+ def transform(self, dataset):
+ return _callJavaFunc(self._sc, self._java_model.transform, dataset)
+
+
+class StandardScalerModel(JavaModelWrapper):
+ """
+ :: Experimental ::
+
+ Represents a StandardScaler model that can transform vectors.
+ """
+ def transform(self, vector):
"""
- :param word: a word
- :return: vector representation of word
+ Applies standardization transformation on a vector.
+
+ :param vector: Vector to be standardized.
+        :return: Standardized vector. If the variance of a column is zero,
+            it will return `0.0` for that column.
+ """
+ return JavaModelWrapper.transform(self, vector)
+
+
+class StandardScaler(object):
+ """
+ :: Experimental ::
+
+ Standardizes features by removing the mean and scaling to unit
+ variance using column summary statistics on the samples in the
+    training set.
+
+    >>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
+ >>> dataset = sc.parallelize(vs)
+ >>> standardizer = StandardScaler(True, True)
+ >>> model = standardizer.fit(dataset)
+ >>> result = model.transform(dataset)
+ >>> for r in result.collect(): r
+ DenseVector([-0.7071, 0.7071, -0.7071])
+ DenseVector([0.7071, -0.7071, 0.7071])
+ """
+ def __init__(self, withMean=False, withStd=True):
+ """
+ :param withMean: False by default. Centers the data with mean
+ before scaling. It will build a dense output, so this
+ does not work on sparse input and will raise an exception.
+ :param withStd: True by default. Scales the data to unit standard
+ deviation.
+ """
+ if not (withMean or withStd):
+ warnings.warn("Both withMean and withStd are false. The model does nothing.")
+ self.withMean = withMean
+ self.withStd = withStd
+
+ def fit(self, dataset):
+ """
+        Computes the mean and variance and stores them as a model to be
+        used for later scaling.
+
+        :param dataset: The data used to compute the mean and variance
+            to build the transformation model.
+        :return: a StandardScalerModel
+ """
+ sc = dataset.context
+ jmodel = _callAPI(sc, "fitStandardScaler", self.withMean, self.withStd, dataset)
+ return StandardScalerModel(sc, jmodel)
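
A hand-check of the doctest output above, for the first column only (note the sample variance with its n - 1 denominator, which is why two samples standardize to exactly +/-0.7071):

>>> import math
>>> col = [-2.0, 3.8]                 # first column of vs
>>> mean = sum(col) / len(col)        # 0.9
>>> var = sum((x - mean) ** 2 for x in col) / (len(col) - 1)   # 16.82
>>> round((col[0] - mean) / math.sqrt(var), 4)
-0.7071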
+
+
+class HashingTF(object):
+ """
+ :: Experimental ::
+
+ Maps a sequence of terms to their term frequencies using the hashing trick.
+
+    Note: the terms must be hashable (they cannot be dict/set/list, etc.).
+
+ >>> htf = HashingTF(100)
+ >>> doc = "a a b b c d".split(" ")
+ >>> htf.transform(doc)
+ SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0})
+ """
+ def __init__(self, numFeatures=1 << 20):
+ """
+ :param numFeatures: number of features (default: 2^20)
+ """
+ self.numFeatures = numFeatures
+
+ def indexOf(self, term):
+ """ Returns the index of the input term. """
+ return hash(term) % self.numFeatures
+
+ def transform(self, document):
+ """
+        Transforms the input document (list of terms) to a term frequency
+        vector, or transforms an RDD of documents to an RDD of term
+        frequency vectors.
+ """
+ if isinstance(document, RDD):
+ return document.map(self.transform)
+
+ freq = {}
+ for term in document:
+ i = self.indexOf(term)
+ freq[i] = freq.get(i, 0) + 1.0
+ return Vectors.sparse(self.numFeatures, freq.items())
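
One design consequence worth noting: distinct terms can hash to the same bucket, in which case their counts are simply summed; nothing beyond the bucket index is kept. The total count survives regardless of collisions (illustration only; the bucket indices themselves depend on Python's hash()):

>>> v = HashingTF(2).transform("a b c d".split(" "))   # only two buckets for four terms
>>> sum(v.toArray())                                   # collisions add up, the total survives
4.0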
+
+
+class IDFModel(JavaModelWrapper):
+ """
+ Represents an IDF model that can transform term frequency vectors.
+ """
+ def transform(self, dataset):
+ """
+ Transforms term frequency (TF) vectors to TF-IDF vectors.
+
+ If `minDocFreq` was set for the IDF calculation,
+ the terms which occur in fewer than `minDocFreq`
+ documents will have an entry of 0.
+
+ :param dataset: an RDD of term frequency vectors
+ :return: an RDD of TF-IDF vectors
+ """
+ return JavaModelWrapper.transform(self, dataset)
+
+
+class IDF(object):
+ """
+ :: Experimental ::
+
+ Inverse document frequency (IDF).
+
+ The standard formulation is used: `idf = log((m + 1) / (d(t) + 1))`,
+ where `m` is the total number of documents and `d(t)` is the number
+ of documents that contain term `t`.
+
+ This implementation supports filtering out terms which do not appear
+ in a minimum number of documents (controlled by the variable `minDocFreq`).
+    For terms that are not in at least `minDocFreq` documents, the IDF is
+    set to 0, resulting in TF-IDF values of 0 for those terms.
+
+ >>> n = 4
+ >>> freqs = [Vectors.sparse(n, (1, 3), (1.0, 2.0)),
+ ... Vectors.dense([0.0, 1.0, 2.0, 3.0]),
+ ... Vectors.sparse(n, [1], [1.0])]
+ >>> data = sc.parallelize(freqs)
+ >>> idf = IDF()
+ >>> model = idf.fit(data)
+ >>> tfidf = model.transform(data)
+ >>> for r in tfidf.collect(): r
+ SparseVector(4, {1: 0.0, 3: 0.5754})
+ DenseVector([0.0, 0.0, 1.3863, 0.863])
+ SparseVector(4, {1: 0.0})
+ """
+ def __init__(self, minDocFreq=0):
+ """
+        :param minDocFreq: minimum number of documents in which a term
+            should appear for filtering
+ """
+ self.minDocFreq = minDocFreq
+
+ def fit(self, dataset):
+ """
+ Computes the inverse document frequency.
+
+ :param dataset: an RDD of term frequency vectors
+ """
+ sc = dataset.context
+ jmodel = _callAPI(sc, "fitIDF", self.minDocFreq, dataset)
+ return IDFModel(sc, jmodel)
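
A hand-check of the doctest numbers above, using idf = log((m + 1) / (d(t) + 1)) with m = 3 documents, and tf * idf for the weights:

>>> import math
>>> round(2.0 * math.log(4.0 / 3.0), 4)   # term 3 in doc 1: tf = 2, d(t) = 2
0.5754
>>> round(2.0 * math.log(4.0 / 2.0), 4)   # term 2 in doc 2: tf = 2, d(t) = 1
1.3863
>>> round(3.0 * math.log(4.0 / 3.0), 4)   # term 3 in doc 2: tf = 3, d(t) = 2
0.863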
+
+
+class Word2VecModel(JavaModelWrapper):
+ """
+    Class for Word2Vec model
+ """
+ def transform(self, word):
+ """
Transforms a word to its vector representation
Note: local use only
+
+ :param word: a word
+ :return: vector representation of word(s)
"""
- # TODO: make transform usable in RDD operations from python side
- result = self._java_model.transform(word)
- return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(result)))
+ try:
+ return _callJavaFunc(self._sc, self._java_model.transform, word)
+ except Py4JJavaError:
+ raise ValueError("%s not found" % word)
- def findSynonyms(self, x, num):
+ def findSynonyms(self, word, num):
"""
- :param x: a word or a vector representation of word
+ Find synonyms of a word
+
+ :param word: a word or a vector representation of word
:param num: number of synonyms to find
:return: array of (word, cosineSimilarity)
- Find synonyms of a word
-
Note: local use only
"""
- # TODO: make findSynonyms usable in RDD operations from python side
- ser = PickleSerializer()
- if type(x) == str:
- jlist = self._java_model.findSynonyms(x, num)
- else:
- bytes = bytearray(ser.dumps(_convert_to_vector(x)))
- vec = self._sc._jvm.SerDe.loads(bytes)
- jlist = self._java_model.findSynonyms(vec, num)
- words, similarity = ser.loads(str(self._sc._jvm.SerDe.dumps(jlist)))
+ words, similarity = _callJavaFunc(self._sc, self._java_model.findSynonyms, word, num)
return zip(words, similarity)
@@ -85,6 +383,7 @@ class Word2Vec(object):
We used the skip-gram model in our implementation and hierarchical
softmax method to train the model. The variable names in the
implementation match the original C implementation.
+
For original C implementation, see https://code.google.com/p/word2vec/
For research papers, see
Efficient Estimation of Word Representations in Vector Space
@@ -95,33 +394,26 @@ class Word2Vec(object):
>>> localDoc = [sentence, sentence]
>>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
>>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
+
>>> syms = model.findSynonyms("a", 2)
- >>> str(syms[0][0])
- 'b'
- >>> str(syms[1][0])
- 'c'
- >>> len(syms)
- 2
+ >>> [s[0] for s in syms]
+ [u'b', u'c']
>>> vec = model.transform("a")
- >>> len(vec)
- 10
>>> syms = model.findSynonyms(vec, 2)
- >>> str(syms[0][0])
- 'b'
- >>> str(syms[1][0])
- 'c'
- >>> len(syms)
- 2
+ >>> [s[0] for s in syms]
+ [u'b', u'c']
"""
def __init__(self):
"""
Construct Word2Vec instance
"""
+        import random  # this can't be at the top because of mllib.random
+
self.vectorSize = 100
self.learningRate = 0.025
self.numPartitions = 1
self.numIterations = 1
- self.seed = 42L
+ self.seed = random.randint(0, sys.maxint)
def setVectorSize(self, vectorSize):
"""
@@ -164,20 +456,13 @@ class Word2Vec(object):
Computes the vector representation of each word in the vocabulary.
:param data: training data. RDD of a subtype of Iterable[String]
- :return: python Word2VecModel instance
+ :return: Word2VecModel instance
"""
sc = data.context
- ser = PickleSerializer()
- vectorSize = self.vectorSize
- learningRate = self.learningRate
- numPartitions = self.numPartitions
- numIterations = self.numIterations
- seed = self.seed
-
- model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
- _to_java_object_rdd(data), vectorSize,
- learningRate, numPartitions, numIterations, seed)
- return Word2VecModel(sc, model)
+ jmodel = _callAPI(sc, "trainWord2Vec", data, int(self.vectorSize),
+ float(self.learningRate), int(self.numPartitions),
+ int(self.numIterations), long(self.seed))
+ return Word2VecModel(sc, jmodel)
def _test():
@@ -191,4 +476,8 @@ def _test():
exit(-1)
if __name__ == "__main__":
+    # remove the current path from the list of search paths, so that the
+    # C{import random} done by an external dependency of pyspark during
+    # doctests does not pick up mllib.random instead
+ import sys
+ sys.path.pop(0)
_test()
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 773d8d3938..1b9bf59624 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -111,6 +111,13 @@ def _vector_size(v):
raise TypeError("Cannot treat type %s as a vector" % type(v))
+def _format_float(f, digits=4):
+ s = str(round(f, digits))
+ if '.' in s:
+ s = s[:s.index('.') + 1 + digits]
+ return s
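
What the new formatter does, in brief: round to four digits, then truncate the string so floating-point repr noise past the fourth decimal place never leaks into vector reprs. A quick illustration:

>>> _format_float(2.0 / 3)
'0.6667'
>>> _format_float(1.0)
'1.0'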
+
+
class Vector(object):
"""
Abstract class for DenseVector and SparseVector
@@ -228,7 +235,7 @@ class DenseVector(Vector):
return "[" + ",".join([str(v) for v in self.array]) + "]"
def __repr__(self):
- return "DenseVector(%r)" % self.array
+ return "DenseVector([%s])" % (', '.join(_format_float(i) for i in self.array))
def __eq__(self, other):
return isinstance(other, DenseVector) and self.array == other.array
@@ -416,7 +423,7 @@ class SparseVector(Vector):
Returns a copy of this SparseVector as a 1-dimensional NumPy array.
"""
arr = np.zeros((self.size,), dtype=np.float64)
- for i in xrange(self.indices.size):
+ for i in xrange(len(self.indices)):
arr[self.indices[i]] = self.values[i]
return arr
@@ -431,7 +438,8 @@ class SparseVector(Vector):
def __repr__(self):
inds = self.indices
vals = self.values
- entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
+ entries = ", ".join(["{0}: {1}".format(inds[i], _format_float(vals[i]))
+ for i in xrange(len(inds))])
return "SparseVector({0}, {{{1}}})".format(self.size, entries)
def __eq__(self, other):
@@ -491,7 +499,7 @@ class Vectors(object):
returns a NumPy array.
>>> Vectors.dense([1, 2, 3])
- DenseVector(array('d', [1.0, 2.0, 3.0]))
+ DenseVector([1.0, 2.0, 3.0])
"""
return DenseVector(elements)