Diffstat (limited to 'python/pyspark/mllib')
-rw-r--r--  python/pyspark/mllib/classification.py    4
-rw-r--r--  python/pyspark/mllib/clustering.py         4
-rw-r--r--  python/pyspark/mllib/feature.py            5
-rw-r--r--  python/pyspark/mllib/linalg.py            13
-rw-r--r--  python/pyspark/mllib/random.py             2
-rw-r--r--  python/pyspark/mllib/recommendation.py     7
-rw-r--r--  python/pyspark/mllib/regression.py         4
-rw-r--r--  python/pyspark/mllib/stat.py               7
-rw-r--r--  python/pyspark/mllib/tree.py               8
-rw-r--r--  python/pyspark/mllib/util.py               6
10 files changed, 37 insertions, 23 deletions
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index cd43982191..e295c9d095 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -21,7 +21,7 @@ import numpy
from numpy import array
from pyspark import SparkContext, PickleSerializer
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
@@ -244,7 +244,7 @@ class NaiveBayes(object):
:param lambda_: The smoothing parameter
"""
sc = data.context
- jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(data._to_java_object_rdd(), lambda_)
+ jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(_to_java_object_rdd(data), lambda_)
labels, pi, theta = PickleSerializer().loads(str(sc._jvm.SerDe.dumps(jlist)))
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
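A minimal usage sketch of the path patched above, assuming a live SparkContext named sc; the toy dataset is illustrative, not taken from the patch:

    from pyspark.mllib.classification import NaiveBayes
    from pyspark.mllib.regression import LabeledPoint

    # NaiveBayes.train now converts its input with _to_java_object_rdd(data)
    # before calling into the JVM.
    data = sc.parallelize([LabeledPoint(0.0, [0.0, 1.0]),
                           LabeledPoint(1.0, [1.0, 0.0])])
    model = NaiveBayes.train(data, lambda_=1.0)
    model.predict([1.0, 0.0])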
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 12c5602271..5ee7997104 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -17,7 +17,7 @@
from pyspark import SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
__all__ = ['KMeansModel', 'KMeans']
@@ -85,7 +85,7 @@ class KMeans(object):
# cache serialized data to avoid object overhead in the JVM
cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
- cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode)
+ _to_java_object_rdd(cached), k, maxIterations, runs, initializationMode)
bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
centers = ser.loads(str(bytes))
return KMeansModel([c.toArray() for c in centers])
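A similar sketch for the patched KMeans.train call, with illustrative parameter values:

    from pyspark.mllib.clustering import KMeans

    # train() caches an AutoBatchedSerializer-reserialized copy and passes
    # _to_java_object_rdd(cached) to trainKMeansModel, as patched above.
    rdd = sc.parallelize([[0.0, 0.0], [1.0, 1.0], [9.0, 8.0], [8.0, 9.0]])
    model = KMeans.train(rdd, 2, maxIterations=10, runs=1,
                         initializationMode="k-means||")
    model.predict([0.0, 0.0])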
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index f4cbf31b94..b5a3f22c69 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -19,8 +19,7 @@
Python package for feature engineering in MLlib.
"""
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-
-from pyspark.mllib.linalg import _convert_to_vector
+from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
__all__ = ['Word2Vec', 'Word2VecModel']
@@ -176,7 +175,7 @@ class Word2Vec(object):
seed = self.seed
model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
- data._to_java_object_rdd(), vectorSize,
+ _to_java_object_rdd(data), vectorSize,
learningRate, numPartitions, numIterations, seed)
return Word2VecModel(sc, model)
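A sketch of reaching the patched trainWord2Vec call, modeled on this module's own doctest; the repeated sentence keeps word counts above Word2Vec's default vocabulary cutoff:

    from pyspark.mllib.feature import Word2Vec

    # fit() forwards _to_java_object_rdd(data) to the JVM trainer.
    sentence = "a b " * 100 + "a c " * 10
    doc = sc.parallelize([sentence, sentence]).map(lambda line: line.split(" "))
    model = Word2Vec().setVectorSize(10).setSeed(42).fit(doc)
    model.findSynonyms("a", 2)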
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 24c5480b2f..773d8d3938 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -29,6 +29,8 @@ import copy_reg
import numpy as np
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
+
__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
@@ -50,6 +52,17 @@ except:
_have_scipy = False
+# this will call the MLlib version of pythonToJava()
+def _to_java_object_rdd(rdd):
+ """ Return an JavaRDD of Object by unpickling
+
+ It will convert each Python object into Java object by Pyrolite, whenever the
+ RDD is serialized in batch or not.
+ """
+ rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
+ return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True)
+
+
def _convert_to_vector(l):
if isinstance(l, Vector):
return l
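This new module-level helper is the heart of the change: every caller that previously used the RDD method now imports it from linalg. A direct-call sketch, assuming a SparkContext named sc (the helper is private, so this is illustration only):

    from pyspark.mllib.linalg import _to_java_object_rdd

    # Reserializes with AutoBatchedSerializer(PickleSerializer()), then asks
    # SerDe.pythonToJava to unpickle into a JavaRDD of Java objects.
    rdd = sc.parallelize([1, 2, 3])
    jrdd = _to_java_object_rdd(rdd)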
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index a787e4dea2..73baba4ace 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -32,7 +32,7 @@ def serialize(f):
@wraps(f)
def func(sc, *a, **kw):
jrdd = f(sc, *a, **kw)
- return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc,
+ return RDD(sc._jvm.SerDe.javaToPython(jrdd), sc,
BatchedSerializer(PickleSerializer(), 1024))
return func
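With the decorator now deserializing through SerDe.javaToPython, a wrapped generator is used as in this sketch; RandomRDDs.uniformRDD is assumed from the surrounding module:

    from pyspark.mllib.random import RandomRDDs

    # The @serialize-wrapped function returns a Python RDD whose elements
    # were pickled on the JVM side by SerDe.javaToPython.
    u = RandomRDDs.uniformRDD(sc, 100)
    u.count()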
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 59c1c5ff0c..17f96b8700 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -18,6 +18,7 @@
from pyspark import SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.rdd import RDD
+from pyspark.mllib.linalg import _to_java_object_rdd
__all__ = ['MatrixFactorizationModel', 'ALS']
@@ -77,9 +78,9 @@ class MatrixFactorizationModel(object):
first = tuple(map(int, first))
assert all(type(x) is int for x in first), "user and product in user_product should be int"
sc = self._context
- tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
+ tuplerdd = sc._jvm.SerDe.asTupleRDD(_to_java_object_rdd(user_product).rdd())
jresult = self._java_model.predict(tuplerdd).toJavaRDD()
- return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
+ return RDD(sc._jvm.SerDe.javaToPython(jresult), sc,
AutoBatchedSerializer(PickleSerializer()))
@@ -97,7 +98,7 @@ class ALS(object):
# serialize them with AutoBatchedSerializer before caching to reduce
# object overhead in the JVM
cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
- return cached._to_java_object_rdd()
+ return _to_java_object_rdd(cached)
@classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
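An end-to-end sketch touching both patched paths here (rating conversion and bulk prediction); the method name predictAll and the toy ratings are assumptions consistent with this module's API:

    from pyspark.mllib.recommendation import ALS

    # train() caches and converts ratings via _to_java_object_rdd; bulk
    # prediction round-trips results through SerDe.javaToPython.
    ratings = sc.parallelize([(1, 1, 5.0), (1, 2, 1.0), (2, 1, 2.0)])
    model = ALS.train(ratings, rank=10, iterations=5)
    model.predictAll(sc.parallelize([(2, 2)])).collect()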
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 12b322aaae..93e17faf5c 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -19,8 +19,8 @@ import numpy as np
from numpy import array
from pyspark import SparkContext
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
__all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel',
'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD']
@@ -131,7 +131,7 @@ def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights)
# use AutoBatchedSerializer before caching to reduce the memory
# overhead in the JVM
cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
- ans = train_func(cached._to_java_object_rdd(), initial_bytes)
+ ans = train_func(_to_java_object_rdd(cached), initial_bytes)
assert len(ans) == 2, "JVM call result had unexpected length"
weights = ser.loads(str(ans[0]))
return modelClass(weights, ans[1])
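A short sketch of a call that flows through the patched _regression_train_wrapper; the data and defaults are illustrative:

    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

    # The wrapper reserializes and caches the points, then hands
    # _to_java_object_rdd(cached) to the JVM training function.
    data = sc.parallelize([LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0])])
    model = LinearRegressionWithSGD.train(data)
    model.predict([1.0])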
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index b9de0909a6..a6019dadf7 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -22,6 +22,7 @@ Python package for statistical functions in MLlib.
from functools import wraps
from pyspark import PickleSerializer
+from pyspark.mllib.linalg import _to_java_object_rdd
__all__ = ['MultivariateStatisticalSummary', 'Statistics']
@@ -106,7 +107,7 @@ class Statistics(object):
array([ 2., 0., 0., -2.])
"""
sc = rdd.ctx
- jrdd = rdd._to_java_object_rdd()
+ jrdd = _to_java_object_rdd(rdd)
cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd)
return MultivariateStatisticalSummary(sc, cStats)
@@ -162,14 +163,14 @@ class Statistics(object):
if type(y) == str:
raise TypeError("Use 'method=' to specify method name.")
- jx = x._to_java_object_rdd()
+ jx = _to_java_object_rdd(x)
if not y:
resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method)
bytes = sc._jvm.SerDe.dumps(resultMat)
ser = PickleSerializer()
return ser.loads(str(bytes)).toArray()
else:
- jy = y._to_java_object_rdd()
+ jy = _to_java_object_rdd(y)
return sc._jvm.PythonMLLibAPI().corr(jx, jy, method)
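Both Statistics entry points patched above can be exercised as in this sketch, with toy data:

    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.stat import Statistics

    # colStats() and corr() now convert inputs with the shared helper
    # instead of the removed RDD method.
    mat = sc.parallelize([Vectors.dense([1.0, 2.0]), Vectors.dense([3.0, 4.0])])
    Statistics.colStats(mat).mean()
    x = sc.parallelize([1.0, 2.0, 3.0])
    y = sc.parallelize([2.0, 4.0, 6.0])
    Statistics.corr(x, y, method="pearson")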
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 5d7abfb96b..0938eebd3a 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -19,7 +19,7 @@ from py4j.java_collections import MapConverter
from pyspark import SparkContext, RDD
from pyspark.serializers import BatchedSerializer, PickleSerializer
-from pyspark.mllib.linalg import Vector, _convert_to_vector
+from pyspark.mllib.linalg import Vector, _convert_to_vector, _to_java_object_rdd
from pyspark.mllib.regression import LabeledPoint
__all__ = ['DecisionTreeModel', 'DecisionTree']
@@ -61,8 +61,8 @@ class DecisionTreeModel(object):
return self._sc.parallelize([])
if not isinstance(first[0], Vector):
x = x.map(_convert_to_vector)
- jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD()
- jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred)
+ jPred = self._java_model.predict(_to_java_object_rdd(x)).toJavaRDD()
+ jpyrdd = self._sc._jvm.SerDe.javaToPython(jPred)
return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))
else:
@@ -104,7 +104,7 @@ class DecisionTree(object):
first = data.first()
assert isinstance(first, LabeledPoint), "the data should be an RDD of LabeledPoint"
sc = data.context
- jrdd = data._to_java_object_rdd()
+ jrdd = _to_java_object_rdd(data)
cfiMap = MapConverter().convert(categoricalFeaturesInfo,
sc._gateway._gateway_client)
model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
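A hedged sketch for the two patched tree paths (training-data conversion and RDD predict); trainClassifier and the toy data are assumptions consistent with this module:

    from pyspark.mllib.tree import DecisionTree
    from pyspark.mllib.regression import LabeledPoint

    # Training converts data via _to_java_object_rdd; predict() on an RDD
    # now deserializes results through SerDe.javaToPython.
    data = sc.parallelize([LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0])])
    model = DecisionTree.trainClassifier(data, numClasses=2,
                                         categoricalFeaturesInfo={})
    model.predict(sc.parallelize([[1.0]])).collect()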
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 1357fd4fbc..84b39a4861 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -19,7 +19,7 @@ import numpy as np
import warnings
from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
@@ -174,8 +174,8 @@ class MLUtils(object):
"""
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
- jpyrdd = sc._jvm.PythonRDD.javaToPython(jrdd)
- return RDD(jpyrdd, sc, BatchedSerializer(PickleSerializer()))
+ jpyrdd = sc._jvm.SerDe.javaToPython(jrdd)
+ return RDD(jpyrdd, sc, AutoBatchedSerializer(PickleSerializer()))
def _test():
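Finally, a sketch of the loader whose return-path serializer changed above; the file path is a hypothetical placeholder:

    from pyspark.mllib.util import MLUtils

    # The returned RDD is now wrapped with AutoBatchedSerializer on top of
    # SerDe.javaToPython output.
    points = MLUtils.loadLabeledPoints(sc, "data/mllib/sample_points.txt")
    points.first()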