 python/pyspark/mllib/classification.py | 13
 python/pyspark/mllib/feature.py        | 31
 python/pyspark/mllib/random.py         | 45
 python/pyspark/mllib/recommendation.py |  6
 python/pyspark/mllib/regression.py     | 15
 python/pyspark/mllib/stat.py           | 16
 python/pyspark/mllib/util.py           | 11
 7 files changed, 105 insertions(+), 32 deletions(-)
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 297a2bf37d..5d90dddb5d 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -62,6 +62,7 @@ class LogisticRegressionModel(LinearModel):
"""
def predict(self, x):
+ x = _convert_to_vector(x)
margin = self.weights.dot(x) + self._intercept
if margin > 0:
prob = 1 / (1 + exp(-margin))
@@ -79,7 +80,7 @@ class LogisticRegressionWithSGD(object):
"""
Train a logistic regression model on the given data.
- :param data: The training data.
+ :param data: The training data, an RDD of LabeledPoint.
:param iterations: The number of iterations (default: 100).
:param step: The step parameter used in SGD
(default: 1.0).
@@ -136,6 +137,7 @@ class SVMModel(LinearModel):
"""
def predict(self, x):
+ x = _convert_to_vector(x)
margin = self.weights.dot(x) + self.intercept
return 1 if margin >= 0 else 0
@@ -148,7 +150,7 @@ class SVMWithSGD(object):
"""
Train a support vector machine on the given data.
- :param data: The training data.
+ :param data: The training data, an RDD of LabeledPoint.
:param iterations: The number of iterations (default: 100).
:param step: The step parameter used in SGD
(default: 1.0).
@@ -233,11 +235,12 @@ class NaiveBayes(object):
classification. By making every vector a 0-1 vector, it can also be
used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
- :param data: RDD of NumPy vectors, one per element, where the first
- coordinate is the label and the rest is the feature vector
- (e.g. a count vector).
+ :param data: RDD of LabeledPoint.
:param lambda_: The smoothing parameter
"""
+ first = data.first()
+ if not isinstance(first, LabeledPoint):
+ raise ValueError("`data` should be an RDD of LabeledPoint")
labels, pi, theta = callMLlibFunc("trainNaiveBayes", data, lambda_)
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
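A rough usage sketch of how the classification changes above behave, assuming a live SparkContext `sc` and made-up data:

    from pyspark.mllib.classification import LogisticRegressionWithSGD, NaiveBayes
    from pyspark.mllib.regression import LabeledPoint

    # Training data must be an RDD of LabeledPoint; NaiveBayes.train now raises
    # a ValueError if the first element is anything else.
    data = sc.parallelize([LabeledPoint(0.0, [0.0, 1.0]),
                           LabeledPoint(1.0, [1.0, 0.0])])
    lrm = LogisticRegressionWithSGD.train(data, iterations=10)

    # predict() converts its argument with _convert_to_vector, so a plain
    # Python list (or NumPy array / SparseVector) works directly.
    lrm.predict([1.0, 0.0])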
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index 44bf6f269d..9ec28079ae 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -25,7 +25,7 @@ from py4j.protocol import Py4JJavaError
from pyspark import RDD, SparkContext
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.linalg import Vectors, _convert_to_vector
__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
@@ -81,12 +81,16 @@ class Normalizer(VectorTransformer):
"""
Applies unit length normalization on a vector.
- :param vector: vector to be normalized.
+ :param vector: vector or RDD of vector to be normalized.
:return: normalized vector. If the norm of the input is zero, it
will return the input vector.
"""
sc = SparkContext._active_spark_context
assert sc is not None, "SparkContext should be initialized first"
+ if isinstance(vector, RDD):
+ vector = vector.map(_convert_to_vector)
+ else:
+ vector = _convert_to_vector(vector)
return callMLlibFunc("normalizeVector", self.p, vector)
@@ -95,8 +99,12 @@ class JavaVectorTransformer(JavaModelWrapper, VectorTransformer):
Wrapper for the model in JVM
"""
- def transform(self, dataset):
- return self.call("transform", dataset)
+ def transform(self, vector):
+ if isinstance(vector, RDD):
+ vector = vector.map(_convert_to_vector)
+ else:
+ vector = _convert_to_vector(vector)
+ return self.call("transform", vector)
class StandardScalerModel(JavaVectorTransformer):
@@ -109,7 +117,7 @@ class StandardScalerModel(JavaVectorTransformer):
"""
Applies standardization transformation on a vector.
- :param vector: Vector to be standardized.
+ :param vector: Vector or RDD of Vector to be standardized.
:return: Standardized vector. If the variance of a column is zero,
it will return default `0.0` for the column with zero variance.
"""
@@ -154,6 +162,7 @@ class StandardScaler(object):
the transformation model.
:return: a StandardScalarModel
"""
+ dataset = dataset.map(_convert_to_vector)
jmodel = callMLlibFunc("fitStandardScaler", self.withMean, self.withStd, dataset)
return StandardScalerModel(jmodel)
@@ -211,6 +220,8 @@ class IDFModel(JavaVectorTransformer):
:param dataset: an RDD of term frequency vectors
:return: an RDD of TF-IDF vectors
"""
+ if not isinstance(dataset, RDD):
+ raise TypeError("dataset should be an RDD of term frequency vectors")
return JavaVectorTransformer.transform(self, dataset)
@@ -255,7 +266,9 @@ class IDF(object):
:param dataset: an RDD of term frequency vectors
"""
- jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset)
+ if not isinstance(dataset, RDD):
+ raise TypeError("dataset should be an RDD of term frequency vectors")
+ jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset.map(_convert_to_vector))
return IDFModel(jmodel)
@@ -287,6 +300,8 @@ class Word2VecModel(JavaVectorTransformer):
Note: local use only
"""
+ if not isinstance(word, basestring):
+ word = _convert_to_vector(word)
words, similarity = self.call("findSynonyms", word, num)
return zip(words, similarity)
@@ -374,9 +389,11 @@ class Word2Vec(object):
"""
Computes the vector representation of each word in vocabulary.
- :param data: training data. RDD of subtype of Iterable[String]
+ :param data: training data. RDD of list of string
:return: Word2VecModel instance
"""
+ if not isinstance(data, RDD):
+ raise TypeError("data should be an RDD of list of string")
jmodel = callMLlibFunc("trainWord2Vec", data, int(self.vectorSize),
float(self.learningRate), int(self.numPartitions),
int(self.numIterations), long(self.seed))
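A minimal sketch of the new single-vector vs. RDD handling in the feature transformers, with illustrative values only and a live SparkContext `sc` assumed:

    from pyspark.mllib.feature import Normalizer, IDF

    nor = Normalizer(p=2.0)
    # A single vector is passed through _convert_to_vector before the JVM call...
    nor.transform([3.0, 4.0])
    # ...while an RDD of vectors is mapped element-wise first.
    nor.transform(sc.parallelize([[3.0, 4.0], [0.0, 1.0]]))

    # IDF.fit now insists on an RDD of term-frequency vectors; a plain list
    # would raise the new TypeError.
    idf_model = IDF().fit(sc.parallelize([[1.0, 2.0], [0.0, 1.0]]))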
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 7eebfc6bcd..cb4304f921 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -52,6 +52,12 @@ class RandomRDDs(object):
C{RandomRDDs.uniformRDD(sc, n, p, seed)\
.map(lambda v: a + (b - a) * v)}
+ :param sc: SparkContext used to create the RDD.
+ :param size: Size of the RDD.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of float comprised of i.i.d. samples ~ `U(0.0, 1.0)`.
+
>>> x = RandomRDDs.uniformRDD(sc, 100).collect()
>>> len(x)
100
@@ -76,6 +82,12 @@ class RandomRDDs(object):
C{RandomRDDs.normal(sc, n, p, seed)\
.map(lambda v: mean + sigma * v)}
+ :param sc: SparkContext used to create the RDD.
+ :param size: Size of the RDD.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).
+
>>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
>>> stats = x.stats()
>>> stats.count()
@@ -93,6 +105,13 @@ class RandomRDDs(object):
Generates an RDD comprised of i.i.d. samples from the Poisson
distribution with the input mean.
+ :param sc: SparkContext used to create the RDD.
+ :param mean: Mean, or lambda, for the Poisson distribution.
+ :param size: Size of the RDD.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of float comprised of i.i.d. samples ~ Pois(mean).
+
>>> mean = 100.0
>>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L)
>>> stats = x.stats()
@@ -104,7 +123,7 @@ class RandomRDDs(object):
>>> abs(stats.stdev() - sqrt(mean)) < 0.5
True
"""
- return callMLlibFunc("poissonRDD", sc._jsc, mean, size, numPartitions, seed)
+ return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)
@staticmethod
@toArray
@@ -113,6 +132,13 @@ class RandomRDDs(object):
Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the uniform distribution U(0.0, 1.0).
+ :param sc: SparkContext used to create the RDD.
+ :param numRows: Number of Vectors in the RDD.
+ :param numCols: Number of elements in each Vector.
+ :param numPartitions: Number of partitions in the RDD.
+ :param seed: Seed for the RNG that generates the seed for the generator in each partition.
+ :return: RDD of Vector with vectors containing i.i.d. samples ~ `U(0.0, 1.0)`.
+
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
>>> mat.shape
@@ -131,6 +157,13 @@ class RandomRDDs(object):
Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the standard normal distribution.
+ :param sc: SparkContext used to create the RDD.
+ :param numRows: Number of Vectors in the RDD.
+ :param numCols: Number of elements in each Vector.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
+
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
>>> mat.shape
@@ -149,6 +182,14 @@ class RandomRDDs(object):
Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the Poisson distribution with the input mean.
+ :param sc: SparkContext used to create the RDD.
+ :param mean: Mean, or lambda, for the Poisson distribution.
+ :param numRows: Number of Vectors in the RDD.
+ :param numCols: Number of elements in each Vector.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean).
+
>>> import numpy as np
>>> mean = 100.0
>>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
@@ -161,7 +202,7 @@ class RandomRDDs(object):
>>> abs(mat.std() - sqrt(mean)) < 0.5
True
"""
- return callMLlibFunc("poissonVectorRDD", sc._jsc, mean, numRows, numCols,
+ return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
numPartitions, seed)
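A short sketch of the mean handling that the float() casts above address, using made-up parameters and assuming a live SparkContext `sc`:

    from pyspark.mllib.random import RandomRDDs

    # mean is now wrapped in float(), so an integer literal is accepted the
    # same way 100.0 is, for both the scalar and the vector variants.
    rdd = RandomRDDs.poissonRDD(sc, 100, 1000, seed=1L)
    mat = RandomRDDs.poissonVectorRDD(sc, 100, numRows=10, numCols=5, seed=1L)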
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index e26b152e0c..41bbd9a779 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -32,7 +32,7 @@ class Rating(object):
return Rating, (self.user, self.product, self.rating)
def __repr__(self):
- return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating)
+ return "Rating(%d, %d, %s)" % (self.user, self.product, self.rating)
class MatrixFactorizationModel(JavaModelWrapper):
@@ -51,7 +51,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
>>> testset = sc.parallelize([(1, 2), (1, 1)])
>>> model = ALS.train(ratings, 1, seed=10)
>>> model.predictAll(testset).collect()
- [Rating(1, 1, 1), Rating(1, 2, 1)]
+ [Rating(1, 1, 1.0471...), Rating(1, 2, 1.9679...)]
>>> model = ALS.train(ratings, 4, seed=10)
>>> model.userFeatures().collect()
@@ -79,7 +79,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
0.4473...
"""
def predict(self, user, product):
- return self._java_model.predict(user, product)
+ return self._java_model.predict(int(user), int(product))
def predictAll(self, user_product):
assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
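A small usage sketch of the predict() casts, with hypothetical ratings and a live SparkContext `sc` assumed:

    import numpy as np
    from pyspark.mllib.recommendation import ALS, Rating

    ratings = sc.parallelize([Rating(1, 1, 5.0), Rating(1, 2, 1.0),
                              Rating(2, 1, 2.0)])
    model = ALS.train(ratings, 1, seed=10)
    # user and product are wrapped in int(), so NumPy integer types
    # (e.g. np.int64 keys coming out of another RDD) are accepted.
    model.predict(np.int64(1), np.int64(2))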
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 43c1a2fc10..66e25a48df 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -36,7 +36,7 @@ class LabeledPoint(object):
"""
def __init__(self, label, features):
- self.label = label
+ self.label = float(label)
self.features = _convert_to_vector(features)
def __reduce__(self):
@@ -46,7 +46,7 @@ class LabeledPoint(object):
return "(" + ",".join((str(self.label), str(self.features))) + ")"
def __repr__(self):
- return "LabeledPoint(" + ",".join((repr(self.label), repr(self.features))) + ")"
+ return "LabeledPoint(%s, %s)" % (self.label, self.features)
class LinearModel(object):
@@ -55,7 +55,7 @@ class LinearModel(object):
def __init__(self, weights, intercept):
self._coeff = _convert_to_vector(weights)
- self._intercept = intercept
+ self._intercept = float(intercept)
@property
def weights(self):
@@ -66,7 +66,7 @@ class LinearModel(object):
return self._intercept
def __repr__(self):
- return "(weights=%s, intercept=%s)" % (self._coeff, self._intercept)
+ return "(weights=%s, intercept=%r)" % (self._coeff, self._intercept)
class LinearRegressionModelBase(LinearModel):
@@ -85,6 +85,7 @@ class LinearRegressionModelBase(LinearModel):
Predict the value of the dependent variable given a vector x
containing values for the independent variables.
"""
+ x = _convert_to_vector(x)
return self.weights.dot(x) + self.intercept
@@ -124,6 +125,9 @@ class LinearRegressionModel(LinearRegressionModelBase):
# return the result of a call to the appropriate JVM stub.
# _regression_train_wrapper is responsible for setup and error checking.
def _regression_train_wrapper(train_func, modelClass, data, initial_weights):
+ first = data.first()
+ if not isinstance(first, LabeledPoint):
+ raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first)
initial_weights = initial_weights or [0.0] * len(data.first().features)
weights, intercept = train_func(_to_java_object_rdd(data, cache=True),
_convert_to_vector(initial_weights))
@@ -264,7 +268,8 @@ class RidgeRegressionWithSGD(object):
def _test():
import doctest
from pyspark import SparkContext
- globs = globals().copy()
+ import pyspark.mllib.regression
+ globs = pyspark.mllib.regression.__dict__.copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
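A brief sketch of the regression coercions above, again with made-up data and a live SparkContext `sc` assumed:

    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

    # The label is coerced to float, so an int label prints as 1.0.
    lp = LabeledPoint(1, [1.0, 0.0])
    print lp                     # (1.0,[1.0,0.0])

    # _regression_train_wrapper now rejects anything but an RDD of LabeledPoint,
    # and the model's predict() converts plain lists with _convert_to_vector.
    data = sc.parallelize([LabeledPoint(0.0, [0.0]), LabeledPoint(2.0, [2.0])])
    lrm = LinearRegressionWithSGD.train(data, iterations=10)
    lrm.predict([1.5])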
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index 0700f8a8e5..1980f5b03f 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -22,6 +22,7 @@ Python package for statistical functions in MLlib.
from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import Matrix, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint
__all__ = ['MultivariateStatisticalSummary', 'ChiSqTestResult', 'Statistics']
@@ -107,6 +108,11 @@ class Statistics(object):
"""
Computes column-wise summary statistics for the input RDD[Vector].
+ :param rdd: an RDD[Vector] for which column-wise summary statistics
+ are to be computed.
+ :return: :class:`MultivariateStatisticalSummary` object containing
+ column-wise summary statistics.
+
>>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
... Vectors.dense([4, 5, 0, 3]),
@@ -140,6 +146,13 @@ class Statistics(object):
                 to specify the method to be used for single RDD input.
If two RDDs of floats are passed in, a single float is returned.
+ :param x: an RDD of vector for which the correlation matrix is to be computed,
+ or an RDD of float of the same cardinality as y when y is specified.
+ :param y: an RDD of float of the same cardinality as x.
+ :param method: String specifying the method to use for computing correlation.
+ Supported: `pearson` (default), `spearman`
+ :return: Correlation matrix comparing columns in x.
+
>>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
>>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
>>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
@@ -242,7 +255,6 @@ class Statistics(object):
>>> print round(chi.statistic, 4)
21.9958
- >>> from pyspark.mllib.regression import LabeledPoint
>>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
... LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
... LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
@@ -257,6 +269,8 @@ class Statistics(object):
1.5
"""
if isinstance(observed, RDD):
+ if not isinstance(observed.first(), LabeledPoint):
+ raise ValueError("observed should be an RDD of LabeledPoint")
jmodels = callMLlibFunc("chiSqTest", observed)
return [ChiSqTestResult(m) for m in jmodels]
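A minimal sketch of the documented Statistics entry points and the new chiSqTest check, using illustrative data and a live SparkContext `sc`:

    from pyspark.mllib.stat import Statistics
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.linalg import Vectors

    x = sc.parallelize([1.0, 0.0, -2.0])
    y = sc.parallelize([4.0, 5.0, 3.0])
    Statistics.corr(x, y, method="pearson")   # two RDDs of float -> single float

    # When given an RDD, chiSqTest now checks that the elements are LabeledPoint.
    obs = sc.parallelize([LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
                          LabeledPoint(1.0, Vectors.dense([1.5, 30.0]))])
    results = Statistics.chiSqTest(obs)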
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 96aef8f510..4ed978b454 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -161,15 +161,8 @@ class MLUtils(object):
>>> tempFile = NamedTemporaryFile(delete=True)
>>> tempFile.close()
>>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
- >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
- >>> type(loaded[0]) == LabeledPoint
- True
- >>> print examples[0]
- (1.1,(3,[0,2],[-1.23,4.56e-07]))
- >>> type(examples[1]) == LabeledPoint
- True
- >>> print examples[1]
- (0.0,[1.01,2.02,3.03])
+ >>> MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
+ [LabeledPoint(1.1, (3,[0,2],[-1.23,4.56e-07])), LabeledPoint(0.0, [1.01,2.02,3.03])]
"""
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)