 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala             |  7
 core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala             | 14
 mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 52
 python/pyspark/context.py                                                   |  2
 python/pyspark/mllib/classification.py                                      |  4
 python/pyspark/mllib/clustering.py                                          |  4
 python/pyspark/mllib/feature.py                                             |  5
 python/pyspark/mllib/linalg.py                                              | 13
 python/pyspark/mllib/random.py                                              |  2
 python/pyspark/mllib/recommendation.py                                      |  7
 python/pyspark/mllib/regression.py                                          |  4
 python/pyspark/mllib/stat.py                                                |  7
 python/pyspark/mllib/tree.py                                                |  8
 python/pyspark/mllib/util.py                                                |  6
 14 files changed, 101 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 4acbdf9d5e..29ca751519 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,6 +23,7 @@ import java.nio.charset.Charset
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.existentials
@@ -746,6 +747,7 @@ private[spark] object PythonRDD extends Logging {
def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
+ SerDeUtil.initialize()
iter.flatMap { row =>
unpickle.loads(row) match {
// in case the objects were pickled in batch mode
@@ -785,7 +787,7 @@ private[spark] object PythonRDD extends Logging {
}.toJavaRDD()
}
- private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+ private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
private val pickle = new Pickler()
private var batch = 1
private val buffer = new mutable.ArrayBuffer[Any]
@@ -822,11 +824,12 @@ private[spark] object PythonRDD extends Logging {
*/
def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
pyRDD.rdd.mapPartitions { iter =>
+ SerDeUtil.initialize()
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
if (batched) {
- obj.asInstanceOf[JArrayList[_]]
+ obj.asInstanceOf[JArrayList[_]].asScala
} else {
Seq(obj)
}
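
The `AutoBatchedPickler` above is widened to `private[spark]` so the MLlib `SerDe` object (changed below) can reuse it for `javaToPython`. Its strategy is to keep doubling the batch size while the pickled chunk stays small, which amortizes per-call Pyrolite overhead without producing oversized chunks. A rough pure-Python mirror of that idea; the 64 KB target and the doubling rule here are illustrative assumptions, not values read out of this patch:

```python
import pickle

def auto_batched(iterator, best_size=64 * 1024):
    # Illustrative: grow the batch while pickled chunks stay under best_size.
    batch, buf = 1, []
    for obj in iterator:
        buf.append(obj)
        if len(buf) >= batch:
            data = pickle.dumps(buf)
            if len(data) < best_size:
                batch *= 2  # chunks are still small: batch more per dump
            buf = []
            yield data
    if buf:
        yield pickle.dumps(buf)  # flush the tail batch

if __name__ == "__main__":
    chunks = list(auto_batched("x" * 100 for _ in range(1000)))
    print([len(c) for c in chunks])  # chunk sizes roughly double, then level off
```
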
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 7903457b17..ebdc3533e0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.RDD
/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
-private[python] object SerDeUtil extends Logging {
+private[spark] object SerDeUtil extends Logging {
// Unpickle array.array generated by Python 2.6
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
// /* Description of types */
@@ -76,9 +76,18 @@ private[python] object SerDeUtil extends Logging {
}
}
+ private var initialized = false
+ // This should be called before trying to unpickle array.array from Python.
+ // In cluster mode, call it inside the closure so it runs on the executors.
def initialize() = {
- Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+ synchronized {
+ if (!initialized) {
+ Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+ initialized = true
+ }
+ }
}
+ initialize()
private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
val pickle = new Pickler
@@ -143,6 +152,7 @@ private[python] object SerDeUtil extends Logging {
obj.asInstanceOf[Array[_]].length == 2
}
pyRDD.mapPartitions { iter =>
+ initialize()
val unpickle = new Unpickler
val unpickled =
if (batchSerialized) {
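
The guard around `initialize()` matters because a Scala `object` is instantiated once per JVM: the driver and every executor each hold their own `initialized` flag, so registering the constructor on the driver does nothing for executors. That is why the call is also placed inside the `mapPartitions` closure, where it runs (cheaply, thanks to the flag) in every task. A minimal Python sketch of the same guarded, per-process, thread-safe one-time setup; the names here are illustrative:

```python
import threading

_lock = threading.Lock()
_initialized = False

def initialize():
    """One-time setup that is safe to call from every task."""
    global _initialized
    with _lock:
        if _initialized:
            return  # already done in this process: cheap no-op
        # register pickle constructors here (runs once per process)
        _initialized = True

initialize()
initialize()  # repeated calls are harmless, so closures can call it freely
```
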
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f7251e65e0..9a100170b7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.api.python
import java.io.OutputStream
+import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._
import scala.language.existentials
@@ -27,6 +28,7 @@ import net.razorvine.pickle._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.feature.Word2Vec
@@ -639,13 +641,24 @@ private[spark] object SerDe extends Serializable {
}
}
+ var initialized = false
+ // This should be called before trying to serialize any of the classes above.
+ // In cluster mode, call it inside the closure so it runs on the executors.
def initialize(): Unit = {
- new DenseVectorPickler().register()
- new DenseMatrixPickler().register()
- new SparseVectorPickler().register()
- new LabeledPointPickler().register()
- new RatingPickler().register()
+ SerDeUtil.initialize()
+ synchronized {
+ if (!initialized) {
+ new DenseVectorPickler().register()
+ new DenseMatrixPickler().register()
+ new SparseVectorPickler().register()
+ new LabeledPointPickler().register()
+ new RatingPickler().register()
+ initialized = true
+ }
+ }
}
+ // not called automatically on executors; tasks must call it in their closures
+ initialize()
def dumps(obj: AnyRef): Array[Byte] = {
new Pickler().dumps(obj)
@@ -659,4 +672,33 @@ private[spark] object SerDe extends Serializable {
def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = {
rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int]))
}
+
+ /**
+ * Convert an RDD of Java objects to an RDD of serialized Python objects that
+ * PySpark can consume.
+ */
+ def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
+ jRDD.rdd.mapPartitions { iter =>
+ initialize() // make sure it is called on the executor
+ new PythonRDD.AutoBatchedPickler(iter)
+ }
+ }
+
+ /**
+ * Convert an RDD of serialized Python objects into an RDD of Java objects usable from the JVM.
+ */
+ def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+ pyRDD.rdd.mapPartitions { iter =>
+ initialize() // make sure it is called on the executor
+ val unpickle = new Unpickler
+ iter.flatMap { row =>
+ val obj = unpickle.loads(row)
+ if (batched) {
+ obj.asInstanceOf[JArrayList[_]].asScala
+ } else {
+ Seq(obj)
+ }
+ }
+ }.toJavaRDD()
+ }
}
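
Together, `javaToPython` and `pythonToJava` give MLlib its own bridge in both directions, and each calls `initialize()` inside the partition function so the picklers get registered on whichever executor runs the task. A hedged round-trip sketch from the Python side, using the `_to_java_object_rdd` helper added to `linalg.py` below; it assumes a local build with this patch applied:

```python
from pyspark import SparkContext
from pyspark.rdd import RDD
from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
from pyspark.mllib.linalg import _to_java_object_rdd

sc = SparkContext("local[2]", "serde-roundtrip")
# Python objects -> JVM objects (SerDe.pythonToJava under the hood)
jrdd = _to_java_object_rdd(sc.parallelize([1, 2, 3]))
# JVM objects -> auto-batched pickles -> Python objects
pyrdd = RDD(sc._jvm.SerDe.javaToPython(jrdd), sc,
            AutoBatchedSerializer(PickleSerializer()))
print(pyrdd.collect())  # [1, 2, 3]
sc.stop()
```
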
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 89d2e2e5b4..8d27ccb95f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -215,8 +215,6 @@ class SparkContext(object):
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
- SparkContext._jvm.SerDeUtil.initialize()
- SparkContext._jvm.SerDe.initialize()
if instance:
if (SparkContext._active_spark_context and
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index cd43982191..e295c9d095 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -21,7 +21,7 @@ import numpy
from numpy import array
from pyspark import SparkContext, PickleSerializer
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
@@ -244,7 +244,7 @@ class NaiveBayes(object):
:param lambda_: The smoothing parameter
"""
sc = data.context
- jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(data._to_java_object_rdd(), lambda_)
+ jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(_to_java_object_rdd(data), lambda_)
labels, pi, theta = PickleSerializer().loads(str(sc._jvm.SerDe.dumps(jlist)))
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 12c5602271..5ee7997104 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -17,7 +17,7 @@
from pyspark import SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
__all__ = ['KMeansModel', 'KMeans']
@@ -85,7 +85,7 @@ class KMeans(object):
# cache serialized data to avoid object overhead in the JVM
cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
- cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode)
+ _to_java_object_rdd(cached), k, maxIterations, runs, initializationMode)
bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
centers = ser.loads(str(bytes))
return KMeansModel([c.toArray() for c in centers])
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index f4cbf31b94..b5a3f22c69 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -19,8 +19,7 @@
Python package for feature in MLlib.
"""
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-
-from pyspark.mllib.linalg import _convert_to_vector
+from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
__all__ = ['Word2Vec', 'Word2VecModel']
@@ -176,7 +175,7 @@ class Word2Vec(object):
seed = self.seed
model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
- data._to_java_object_rdd(), vectorSize,
+ _to_java_object_rdd(data), vectorSize,
learningRate, numPartitions, numIterations, seed)
return Word2VecModel(sc, model)
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 24c5480b2f..773d8d3938 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -29,6 +29,8 @@ import copy_reg
import numpy as np
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
+
__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
@@ -50,6 +52,17 @@ except:
_have_scipy = False
+# this will call the MLlib version of pythonToJava()
+def _to_java_object_rdd(rdd):
+ """ Return a JavaRDD of Object by unpickling
+
+ It will convert each Python object into a Java object via Pyrolite, whether
+ the RDD is serialized in batches or not.
+ """
+ rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
+ return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True)
+
+
def _convert_to_vector(l):
if isinstance(l, Vector):
return l
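
With `_to_java_object_rdd` now routed through the MLlib `SerDe.pythonToJava`, the vector and matrix picklers are registered lazily inside the task, so callers no longer need an explicit `SerDe.initialize()` on the driver. A minimal usage sketch, assuming a local `SparkContext`:

```python
from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors, _to_java_object_rdd

sc = SparkContext("local[2]", "to-java-demo")
vectors = sc.parallelize([Vectors.dense([1.0, 2.0]), Vectors.sparse(3, {0: 1.0})])
jrdd = _to_java_object_rdd(vectors)  # a JavaRDD of JVM mllib Vector objects
print(jrdd.count())                  # 2, counted on the JVM side
sc.stop()
```
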
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index a787e4dea2..73baba4ace 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -32,7 +32,7 @@ def serialize(f):
@wraps(f)
def func(sc, *a, **kw):
jrdd = f(sc, *a, **kw)
- return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc,
+ return RDD(sc._jvm.SerDe.javaToPython(jrdd), sc,
BatchedSerializer(PickleSerializer(), 1024))
return func
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 59c1c5ff0c..17f96b8700 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -18,6 +18,7 @@
from pyspark import SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.rdd import RDD
+from pyspark.mllib.linalg import _to_java_object_rdd
__all__ = ['MatrixFactorizationModel', 'ALS']
@@ -77,9 +78,9 @@ class MatrixFactorizationModel(object):
first = tuple(map(int, first))
assert all(type(x) is int for x in first), "user and product in user_product should be int"
sc = self._context
- tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
+ tuplerdd = sc._jvm.SerDe.asTupleRDD(_to_java_object_rdd(user_product).rdd())
jresult = self._java_model.predict(tuplerdd).toJavaRDD()
- return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
+ return RDD(sc._jvm.SerDe.javaToPython(jresult), sc,
AutoBatchedSerializer(PickleSerializer()))
@@ -97,7 +98,7 @@ class ALS(object):
# serialize them with AutoBatchedSerializer before caching to reduce the
# object overhead in the JVM
cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
- return cached._to_java_object_rdd()
+ return _to_java_object_rdd(cached)
@classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 12b322aaae..93e17faf5c 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -19,8 +19,8 @@ import numpy as np
from numpy import array
from pyspark import SparkContext
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
__all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel',
'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD']
@@ -131,7 +131,7 @@ def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights)
# use AutoBatchedSerializer before caching to reduce the memory
# overhead in the JVM
cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
- ans = train_func(cached._to_java_object_rdd(), initial_bytes)
+ ans = train_func(_to_java_object_rdd(cached), initial_bytes)
assert len(ans) == 2, "JVM call result had unexpected length"
weights = ser.loads(str(ans[0]))
return modelClass(weights, ans[1])
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index b9de0909a6..a6019dadf7 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -22,6 +22,7 @@ Python package for statistical functions in MLlib.
from functools import wraps
from pyspark import PickleSerializer
+from pyspark.mllib.linalg import _to_java_object_rdd
__all__ = ['MultivariateStatisticalSummary', 'Statistics']
@@ -106,7 +107,7 @@ class Statistics(object):
array([ 2., 0., 0., -2.])
"""
sc = rdd.ctx
- jrdd = rdd._to_java_object_rdd()
+ jrdd = _to_java_object_rdd(rdd)
cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd)
return MultivariateStatisticalSummary(sc, cStats)
@@ -162,14 +163,14 @@ class Statistics(object):
if type(y) == str:
raise TypeError("Use 'method=' to specify method name.")
- jx = x._to_java_object_rdd()
+ jx = _to_java_object_rdd(x)
if not y:
resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method)
bytes = sc._jvm.SerDe.dumps(resultMat)
ser = PickleSerializer()
return ser.loads(str(bytes)).toArray()
else:
- jy = y._to_java_object_rdd()
+ jy = _to_java_object_rdd(y)
return sc._jvm.PythonMLLibAPI().corr(jx, jy, method)
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 5d7abfb96b..0938eebd3a 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -19,7 +19,7 @@ from py4j.java_collections import MapConverter
from pyspark import SparkContext, RDD
from pyspark.serializers import BatchedSerializer, PickleSerializer
-from pyspark.mllib.linalg import Vector, _convert_to_vector
+from pyspark.mllib.linalg import Vector, _convert_to_vector, _to_java_object_rdd
from pyspark.mllib.regression import LabeledPoint
__all__ = ['DecisionTreeModel', 'DecisionTree']
@@ -61,8 +61,8 @@ class DecisionTreeModel(object):
return self._sc.parallelize([])
if not isinstance(first[0], Vector):
x = x.map(_convert_to_vector)
- jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD()
- jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred)
+ jPred = self._java_model.predict(_to_java_object_rdd(x)).toJavaRDD()
+ jpyrdd = self._sc._jvm.SerDe.javaToPython(jPred)
return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))
else:
@@ -104,7 +104,7 @@ class DecisionTree(object):
first = data.first()
assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
sc = data.context
- jrdd = data._to_java_object_rdd()
+ jrdd = _to_java_object_rdd(data)
cfiMap = MapConverter().convert(categoricalFeaturesInfo,
sc._gateway._gateway_client)
model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 1357fd4fbc..84b39a4861 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -19,7 +19,7 @@ import numpy as np
import warnings
from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
@@ -174,8 +174,8 @@ class MLUtils(object):
"""
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
- jpyrdd = sc._jvm.PythonRDD.javaToPython(jrdd)
- return RDD(jpyrdd, sc, BatchedSerializer(PickleSerializer()))
+ jpyrdd = sc._jvm.SerDe.javaToPython(jrdd)
+ return RDD(jpyrdd, sc, AutoBatchedSerializer(PickleSerializer()))
def _test():
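
`loadLabeledPoints` now goes through `SerDe.javaToPython`, which emits variable-sized (auto-batched) pickle chunks, so the Python side switches from a fixed `BatchedSerializer` to `AutoBatchedSerializer`, whose load path accepts batches of any size. The dump/load symmetry can be exercised with the serializer API alone, no cluster required; a small sketch assuming these classes behave as in this patch's tree:

```python
from io import BytesIO

from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

ser = AutoBatchedSerializer(PickleSerializer())
buf = BytesIO()
ser.dump_stream(iter(range(1000)), buf)  # batch size adapts to pickled size
buf.seek(0)
print(sum(1 for _ in ser.load_stream(buf)))  # 1000 objects round-tripped
```
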