path: root/python
author      Davies Liu <davies@databricks.com>      2014-11-10 22:26:16 -0800
committer   Xiangrui Meng <meng@databricks.com>     2014-11-10 22:26:16 -0800
commit      65083e93ddd552b7d3e4eb09f87c091ef2ae83a2 (patch)
tree        38c82101a74f8dd099f062b574e6b7781b4801d8 /python
parent      3c07b8f08240bafcdff5d174989fb433f4bc80b6 (diff)
download    spark-65083e93ddd552b7d3e4eb09f87c091ef2ae83a2.tar.gz
            spark-65083e93ddd552b7d3e4eb09f87c091ef2ae83a2.tar.bz2
            spark-65083e93ddd552b7d3e4eb09f87c091ef2ae83a2.zip
[SPARK-4324] [PySpark] [MLlib] support numpy.array for all MLlib API
This PR checks all of the existing Python MLlib APIs to make sure that numpy.array is supported as a Vector (and as an RDD of numpy.array). It also improves some docstrings and doctests.

cc mateiz mengxr

Author: Davies Liu <davies@databricks.com>

Closes #3189 from davies/numpy and squashes the following commits:

d5057c4 [Davies Liu] fix tests
6987611 [Davies Liu] support numpy.array for all MLlib API
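As a rough illustration of what the patch enables (a usage sketch, not part of the diff; it assumes a local SparkContext and the PySpark 1.x MLlib API), a plain numpy.array can now be passed anywhere a Vector is expected:

import numpy as np
from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext("local", "numpy-vectors")
# Training data given as numpy arrays instead of MLlib Vectors.
points = sc.parallelize([
    LabeledPoint(0.0, np.array([0.0, 1.0])),
    LabeledPoint(1.0, np.array([1.0, 0.0])),
])
model = LogisticRegressionWithSGD.train(points, iterations=10)
print(model.predict(np.array([1.0, 0.0])))  # predict() also accepts numpy.array
sc.stop()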
Diffstat (limited to 'python')
-rw-r--r--   python/pyspark/mllib/classification.py   13
-rw-r--r--   python/pyspark/mllib/feature.py          31
-rw-r--r--   python/pyspark/mllib/random.py           45
-rw-r--r--   python/pyspark/mllib/recommendation.py    6
-rw-r--r--   python/pyspark/mllib/regression.py       15
-rw-r--r--   python/pyspark/mllib/stat.py             16
-rw-r--r--   python/pyspark/mllib/util.py             11
7 files changed, 105 insertions, 32 deletions
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 297a2bf37d..5d90dddb5d 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -62,6 +62,7 @@ class LogisticRegressionModel(LinearModel):
"""
def predict(self, x):
+ x = _convert_to_vector(x)
margin = self.weights.dot(x) + self._intercept
if margin > 0:
prob = 1 / (1 + exp(-margin))
@@ -79,7 +80,7 @@ class LogisticRegressionWithSGD(object):
"""
Train a logistic regression model on the given data.
- :param data: The training data.
+ :param data: The training data, an RDD of LabeledPoint.
:param iterations: The number of iterations (default: 100).
:param step: The step parameter used in SGD
(default: 1.0).
@@ -136,6 +137,7 @@ class SVMModel(LinearModel):
"""
def predict(self, x):
+ x = _convert_to_vector(x)
margin = self.weights.dot(x) + self.intercept
return 1 if margin >= 0 else 0
@@ -148,7 +150,7 @@ class SVMWithSGD(object):
"""
Train a support vector machine on the given data.
- :param data: The training data.
+ :param data: The training data, an RDD of LabeledPoint.
:param iterations: The number of iterations (default: 100).
:param step: The step parameter used in SGD
(default: 1.0).
@@ -233,11 +235,12 @@ class NaiveBayes(object):
classification. By making every vector a 0-1 vector, it can also be
used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
- :param data: RDD of NumPy vectors, one per element, where the first
- coordinate is the label and the rest is the feature vector
- (e.g. a count vector).
+ :param data: RDD of LabeledPoint.
:param lambda_: The smoothing parameter
"""
+ first = data.first()
+ if not isinstance(first, LabeledPoint):
+ raise ValueError("`data` should be an RDD of LabeledPoint")
labels, pi, theta = callMLlibFunc("trainNaiveBayes", data, lambda_)
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
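For context, a small sketch of the new validation behavior in classification.py (illustrative only, assuming an active SparkContext `sc`):

from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LabeledPoint

# An RDD of LabeledPoint trains as before.
good = sc.parallelize([LabeledPoint(0.0, [1.0, 0.0]), LabeledPoint(1.0, [0.0, 1.0])])
model = NaiveBayes.train(good)

# Anything else is now rejected up front with a clear error.
bad = sc.parallelize([[1.0, 0.0], [0.0, 1.0]])
try:
    NaiveBayes.train(bad)
except ValueError as e:
    print(e)  # `data` should be an RDD of LabeledPoint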
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index 44bf6f269d..9ec28079ae 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -25,7 +25,7 @@ from py4j.protocol import Py4JJavaError
from pyspark import RDD, SparkContext
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.linalg import Vectors, _convert_to_vector
__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
@@ -81,12 +81,16 @@ class Normalizer(VectorTransformer):
"""
Applies unit length normalization on a vector.
- :param vector: vector to be normalized.
+ :param vector: vector or RDD of vector to be normalized.
:return: normalized vector. If the norm of the input is zero, it
will return the input vector.
"""
sc = SparkContext._active_spark_context
assert sc is not None, "SparkContext should be initialized first"
+ if isinstance(vector, RDD):
+ vector = vector.map(_convert_to_vector)
+ else:
+ vector = _convert_to_vector(vector)
return callMLlibFunc("normalizeVector", self.p, vector)
@@ -95,8 +99,12 @@ class JavaVectorTransformer(JavaModelWrapper, VectorTransformer):
Wrapper for the model in JVM
"""
- def transform(self, dataset):
- return self.call("transform", dataset)
+ def transform(self, vector):
+ if isinstance(vector, RDD):
+ vector = vector.map(_convert_to_vector)
+ else:
+ vector = _convert_to_vector(vector)
+ return self.call("transform", vector)
class StandardScalerModel(JavaVectorTransformer):
@@ -109,7 +117,7 @@ class StandardScalerModel(JavaVectorTransformer):
"""
Applies standardization transformation on a vector.
- :param vector: Vector to be standardized.
+ :param vector: Vector or RDD of Vector to be standardized.
:return: Standardized vector. If the variance of a column is zero,
it will return default `0.0` for the column with zero variance.
"""
@@ -154,6 +162,7 @@ class StandardScaler(object):
the transformation model.
:return: a StandardScalerModel
"""
+ dataset = dataset.map(_convert_to_vector)
jmodel = callMLlibFunc("fitStandardScaler", self.withMean, self.withStd, dataset)
return StandardScalerModel(jmodel)
@@ -211,6 +220,8 @@ class IDFModel(JavaVectorTransformer):
:param dataset: an RDD of term frequency vectors
:return: an RDD of TF-IDF vectors
"""
+ if not isinstance(dataset, RDD):
+ raise TypeError("dataset should be an RDD of term frequency vectors")
return JavaVectorTransformer.transform(self, dataset)
@@ -255,7 +266,9 @@ class IDF(object):
:param dataset: an RDD of term frequency vectors
"""
- jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset)
+ if not isinstance(dataset, RDD):
+ raise TypeError("dataset should be an RDD of term frequency vectors")
+ jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset.map(_convert_to_vector))
return IDFModel(jmodel)
@@ -287,6 +300,8 @@ class Word2VecModel(JavaVectorTransformer):
Note: local use only
"""
+ if not isinstance(word, basestring):
+ word = _convert_to_vector(word)
words, similarity = self.call("findSynonyms", word, num)
return zip(words, similarity)
@@ -374,9 +389,11 @@ class Word2Vec(object):
"""
Computes the vector representation of each word in vocabulary.
- :param data: training data. RDD of subtype of Iterable[String]
+ :param data: training data. RDD of list of string
:return: Word2VecModel instance
"""
+ if not isinstance(data, RDD):
+ raise TypeError("data should be an RDD of list of string")
jmodel = callMLlibFunc("trainWord2Vec", data, int(self.vectorSize),
float(self.learningRate), int(self.numPartitions),
int(self.numIterations), long(self.seed))
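A usage sketch of the feature.py change (not part of the patch; assumes an active SparkContext `sc`): Normalizer.transform and the other vector transformers now accept either a single vector, including a numpy.array, or an RDD of vectors.

import numpy as np
from pyspark.mllib.feature import Normalizer

nor = Normalizer(p=2.0)
# Single vector: the numpy.array is converted to an MLlib vector internally.
print(nor.transform(np.array([3.0, 4.0])))   # unit-norm vector
# RDD of vectors: each element is converted before the JVM call.
vecs = sc.parallelize([np.array([3.0, 4.0]), np.array([0.0, 5.0])])
print(nor.transform(vecs).collect())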
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 7eebfc6bcd..cb4304f921 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -52,6 +52,12 @@ class RandomRDDs(object):
C{RandomRDDs.uniformRDD(sc, n, p, seed)\
.map(lambda v: a + (b - a) * v)}
+ :param sc: SparkContext used to create the RDD.
+ :param size: Size of the RDD.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of float comprised of i.i.d. samples ~ `U(0.0, 1.0)`.
+
>>> x = RandomRDDs.uniformRDD(sc, 100).collect()
>>> len(x)
100
@@ -76,6 +82,12 @@ class RandomRDDs(object):
C{RandomRDDs.normal(sc, n, p, seed)\
.map(lambda v: mean + sigma * v)}
+ :param sc: SparkContext used to create the RDD.
+ :param size: Size of the RDD.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).
+
>>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
>>> stats = x.stats()
>>> stats.count()
@@ -93,6 +105,13 @@ class RandomRDDs(object):
Generates an RDD comprised of i.i.d. samples from the Poisson
distribution with the input mean.
+ :param sc: SparkContext used to create the RDD.
+ :param mean: Mean, or lambda, for the Poisson distribution.
+ :param size: Size of the RDD.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of float comprised of i.i.d. samples ~ Pois(mean).
+
>>> mean = 100.0
>>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L)
>>> stats = x.stats()
@@ -104,7 +123,7 @@ class RandomRDDs(object):
>>> abs(stats.stdev() - sqrt(mean)) < 0.5
True
"""
- return callMLlibFunc("poissonRDD", sc._jsc, mean, size, numPartitions, seed)
+ return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)
@staticmethod
@toArray
@@ -113,6 +132,13 @@ class RandomRDDs(object):
Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the uniform distribution U(0.0, 1.0).
+ :param sc: SparkContext used to create the RDD.
+ :param numRows: Number of Vectors in the RDD.
+ :param numCols: Number of elements in each Vector.
+ :param numPartitions: Number of partitions in the RDD.
+ :param seed: Seed for the RNG that generates the seed for the generator in each partition.
+ :return: RDD of Vector with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.
+
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
>>> mat.shape
@@ -131,6 +157,13 @@ class RandomRDDs(object):
Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the standard normal distribution.
+ :param sc: SparkContext used to create the RDD.
+ :param numRows: Number of Vectors in the RDD.
+ :param numCols: Number of elements in each Vector.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
+
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
>>> mat.shape
@@ -149,6 +182,14 @@ class RandomRDDs(object):
Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the Poisson distribution with the input mean.
+ :param sc: SparkContext used to create the RDD.
+ :param mean: Mean, or lambda, for the Poisson distribution.
+ :param numRows: Number of Vectors in the RDD.
+ :param numCols: Number of elements in each Vector.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`)
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean).
+
>>> import numpy as np
>>> mean = 100.0
>>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
@@ -161,7 +202,7 @@ class RandomRDDs(object):
>>> abs(mat.std() - sqrt(mean)) < 0.5
True
"""
- return callMLlibFunc("poissonVectorRDD", sc._jsc, mean, numRows, numCols,
+ return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
numPartitions, seed)
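A brief sketch of the random.py change (illustrative, assuming an active SparkContext `sc`): the Poisson mean is now coerced to float before the JVM call, so a plain Python int works as well.

from pyspark.mllib.random import RandomRDDs

rdd = RandomRDDs.poissonRDD(sc, 100, 1000, seed=1)  # integer mean is accepted
print(abs(rdd.mean() - 100.0) < 5.0)                # True with high probability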
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index e26b152e0c..41bbd9a779 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -32,7 +32,7 @@ class Rating(object):
return Rating, (self.user, self.product, self.rating)
def __repr__(self):
- return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating)
+ return "Rating(%d, %d, %s)" % (self.user, self.product, self.rating)
class MatrixFactorizationModel(JavaModelWrapper):
@@ -51,7 +51,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
>>> testset = sc.parallelize([(1, 2), (1, 1)])
>>> model = ALS.train(ratings, 1, seed=10)
>>> model.predictAll(testset).collect()
- [Rating(1, 1, 1), Rating(1, 2, 1)]
+ [Rating(1, 1, 1.0471...), Rating(1, 2, 1.9679...)]
>>> model = ALS.train(ratings, 4, seed=10)
>>> model.userFeatures().collect()
@@ -79,7 +79,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
0.4473...
"""
def predict(self, user, product):
- return self._java_model.predict(user, product)
+ return self._java_model.predict(int(user), int(product))
def predictAll(self, user_product):
assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
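A sketch of the recommendation.py change (not part of the patch; assumes an active SparkContext `sc`): predict() now casts the user and product ids to int, so numpy integer types are accepted too.

import numpy as np
from pyspark.mllib.recommendation import ALS, Rating

ratings = sc.parallelize([Rating(1, 1, 5.0), Rating(1, 2, 1.0), Rating(2, 1, 2.0)])
model = ALS.train(ratings, 1, seed=10)
print(model.predict(np.int64(1), np.int64(2)))  # ids are cast to int internally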
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 43c1a2fc10..66e25a48df 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -36,7 +36,7 @@ class LabeledPoint(object):
"""
def __init__(self, label, features):
- self.label = label
+ self.label = float(label)
self.features = _convert_to_vector(features)
def __reduce__(self):
@@ -46,7 +46,7 @@ class LabeledPoint(object):
return "(" + ",".join((str(self.label), str(self.features))) + ")"
def __repr__(self):
- return "LabeledPoint(" + ",".join((repr(self.label), repr(self.features))) + ")"
+ return "LabeledPoint(%s, %s)" % (self.label, self.features)
class LinearModel(object):
@@ -55,7 +55,7 @@ class LinearModel(object):
def __init__(self, weights, intercept):
self._coeff = _convert_to_vector(weights)
- self._intercept = intercept
+ self._intercept = float(intercept)
@property
def weights(self):
@@ -66,7 +66,7 @@ class LinearModel(object):
return self._intercept
def __repr__(self):
- return "(weights=%s, intercept=%s)" % (self._coeff, self._intercept)
+ return "(weights=%s, intercept=%r)" % (self._coeff, self._intercept)
class LinearRegressionModelBase(LinearModel):
@@ -85,6 +85,7 @@ class LinearRegressionModelBase(LinearModel):
Predict the value of the dependent variable given a vector x
containing values for the independent variables.
"""
+ x = _convert_to_vector(x)
return self.weights.dot(x) + self.intercept
@@ -124,6 +125,9 @@ class LinearRegressionModel(LinearRegressionModelBase):
# return the result of a call to the appropriate JVM stub.
# _regression_train_wrapper is responsible for setup and error checking.
def _regression_train_wrapper(train_func, modelClass, data, initial_weights):
+ first = data.first()
+ if not isinstance(first, LabeledPoint):
+ raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first)
initial_weights = initial_weights or [0.0] * len(data.first().features)
weights, intercept = train_func(_to_java_object_rdd(data, cache=True),
_convert_to_vector(initial_weights))
@@ -264,7 +268,8 @@ class RidgeRegressionWithSGD(object):
def _test():
import doctest
from pyspark import SparkContext
- globs = globals().copy()
+ import pyspark.mllib.regression
+ globs = pyspark.mllib.regression.__dict__.copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
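A short sketch of the regression.py changes (illustrative only, assuming an active SparkContext `sc`): labels are stored as floats, and training fails fast when the input RDD does not contain LabeledPoint.

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

lp = LabeledPoint(1, [1.0, 2.0])
print(lp.label)  # 1.0 -- the label is coerced to float

# Training now rejects RDDs whose elements are not LabeledPoint.
try:
    LinearRegressionWithSGD.train(sc.parallelize([[1.0, 2.0]]))
except ValueError as e:
    print(e)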
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index 0700f8a8e5..1980f5b03f 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -22,6 +22,7 @@ Python package for statistical functions in MLlib.
from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import Matrix, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint
__all__ = ['MultivariateStatisticalSummary', 'ChiSqTestResult', 'Statistics']
@@ -107,6 +108,11 @@ class Statistics(object):
"""
Computes column-wise summary statistics for the input RDD[Vector].
+ :param rdd: an RDD[Vector] for which column-wise summary statistics
+ are to be computed.
+ :return: :class:`MultivariateStatisticalSummary` object containing
+ column-wise summary statistics.
+
>>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
... Vectors.dense([4, 5, 0, 3]),
@@ -140,6 +146,13 @@ class Statistics(object):
to specify the method to be used for single RDD input.
If two RDDs of floats are passed in, a single float is returned.
+ :param x: an RDD of vector for which the correlation matrix is to be computed,
+ or an RDD of float of the same cardinality as y when y is specified.
+ :param y: an RDD of float of the same cardinality as x.
+ :param method: String specifying the method to use for computing correlation.
+ Supported: `pearson` (default), `spearman`
+ :return: Correlation matrix comparing columns in x.
+
>>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
>>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
>>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
@@ -242,7 +255,6 @@ class Statistics(object):
>>> print round(chi.statistic, 4)
21.9958
- >>> from pyspark.mllib.regression import LabeledPoint
>>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
... LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
... LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
@@ -257,6 +269,8 @@ class Statistics(object):
1.5
"""
if isinstance(observed, RDD):
+ if not isinstance(observed.first(), LabeledPoint):
+ raise ValueError("observed should be an RDD of LabeledPoint")
jmodels = callMLlibFunc("chiSqTest", observed)
return [ChiSqTestResult(m) for m in jmodels]
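A usage sketch for the stat.py side of the change (not part of the patch; assumes an active SparkContext `sc`): colStats works directly on an RDD of numpy arrays, since rows are converted to MLlib vectors.

import numpy as np
from pyspark.mllib.stat import Statistics

rows = sc.parallelize([np.array([1.0, 2.0]), np.array([3.0, 4.0])])
summary = Statistics.colStats(rows)
print(summary.mean())      # column-wise means
print(summary.variance())  # column-wise variances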
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 96aef8f510..4ed978b454 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -161,15 +161,8 @@ class MLUtils(object):
>>> tempFile = NamedTemporaryFile(delete=True)
>>> tempFile.close()
>>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
- >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
- >>> type(loaded[0]) == LabeledPoint
- True
- >>> print examples[0]
- (1.1,(3,[0,2],[-1.23,4.56e-07]))
- >>> type(examples[1]) == LabeledPoint
- True
- >>> print examples[1]
- (0.0,[1.01,2.02,3.03])
+ >>> MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
+ [LabeledPoint(1.1, (3,[0,2],[-1.23,4.56e-07])), LabeledPoint(0.0, [1.01,2.02,3.03])]
"""
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)