diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala | 14 | ||||
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 52 | ||||
-rw-r--r-- | python/pyspark/context.py | 2 | ||||
-rw-r--r-- | python/pyspark/mllib/classification.py | 4 | ||||
-rw-r--r-- | python/pyspark/mllib/clustering.py | 4 | ||||
-rw-r--r-- | python/pyspark/mllib/feature.py | 5 | ||||
-rw-r--r-- | python/pyspark/mllib/linalg.py | 13 | ||||
-rw-r--r-- | python/pyspark/mllib/random.py | 2 | ||||
-rw-r--r-- | python/pyspark/mllib/recommendation.py | 7 | ||||
-rw-r--r-- | python/pyspark/mllib/regression.py | 4 | ||||
-rw-r--r-- | python/pyspark/mllib/stat.py | 7 | ||||
-rw-r--r-- | python/pyspark/mllib/tree.py | 8 | ||||
-rw-r--r-- | python/pyspark/mllib/util.py | 6 |
14 files changed, 101 insertions, 34 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 4acbdf9d5e..29ca751519 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -23,6 +23,7 @@ import java.nio.charset.Charset import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections} import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.existentials @@ -746,6 +747,7 @@ private[spark] object PythonRDD extends Logging { def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = { pyRDD.rdd.mapPartitions { iter => val unpickle = new Unpickler + SerDeUtil.initialize() iter.flatMap { row => unpickle.loads(row) match { // in case of objects are pickled in batch mode @@ -785,7 +787,7 @@ private[spark] object PythonRDD extends Logging { }.toJavaRDD() } - private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] { + private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] { private val pickle = new Pickler() private var batch = 1 private val buffer = new mutable.ArrayBuffer[Any] @@ -822,11 +824,12 @@ private[spark] object PythonRDD extends Logging { */ def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = { pyRDD.rdd.mapPartitions { iter => + SerDeUtil.initialize() val unpickle = new Unpickler iter.flatMap { row => val obj = unpickle.loads(row) if (batched) { - obj.asInstanceOf[JArrayList[_]] + obj.asInstanceOf[JArrayList[_]].asScala } else { Seq(obj) } diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 7903457b17..ebdc3533e0 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException} import org.apache.spark.rdd.RDD /** Utilities for serialization / deserialization between Python and Java, using Pickle. */ -private[python] object SerDeUtil extends Logging { +private[spark] object SerDeUtil extends Logging { // Unpickle array.array generated by Python 2.6 class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor { // /* Description of types */ @@ -76,9 +76,18 @@ private[python] object SerDeUtil extends Logging { } } + private var initialized = false + // This should be called before trying to unpickle array.array from Python + // In cluster mode, this should be put in closure def initialize() = { - Unpickler.registerConstructor("array", "array", new ArrayConstructor()) + synchronized{ + if (!initialized) { + Unpickler.registerConstructor("array", "array", new ArrayConstructor()) + initialized = true + } + } } + initialize() private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = { val pickle = new Pickler @@ -143,6 +152,7 @@ private[python] object SerDeUtil extends Logging { obj.asInstanceOf[Array[_]].length == 2 } pyRDD.mapPartitions { iter => + initialize() val unpickle = new Unpickler val unpickled = if (batchSerialized) { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f7251e65e0..9a100170b7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.api.python import java.io.OutputStream +import java.util.{ArrayList => JArrayList} import scala.collection.JavaConverters._ import scala.language.existentials @@ -27,6 +28,7 @@ import net.razorvine.pickle._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.api.python.{PythonRDD, SerDeUtil} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ import org.apache.spark.mllib.feature.Word2Vec @@ -639,13 +641,24 @@ private[spark] object SerDe extends Serializable { } } + var initialized = false + // This should be called before trying to serialize any above classes + // In cluster mode, this should be put in the closure def initialize(): Unit = { - new DenseVectorPickler().register() - new DenseMatrixPickler().register() - new SparseVectorPickler().register() - new LabeledPointPickler().register() - new RatingPickler().register() + SerDeUtil.initialize() + synchronized { + if (!initialized) { + new DenseVectorPickler().register() + new DenseMatrixPickler().register() + new SparseVectorPickler().register() + new LabeledPointPickler().register() + new RatingPickler().register() + initialized = true + } + } } + // will not called in Executor automatically + initialize() def dumps(obj: AnyRef): Array[Byte] = { new Pickler().dumps(obj) @@ -659,4 +672,33 @@ private[spark] object SerDe extends Serializable { def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = { rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int])) } + + /** + * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by + * PySpark. + */ + def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = { + jRDD.rdd.mapPartitions { iter => + initialize() // let it called in executor + new PythonRDD.AutoBatchedPickler(iter) + } + } + + /** + * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark. + */ + def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = { + pyRDD.rdd.mapPartitions { iter => + initialize() // let it called in executor + val unpickle = new Unpickler + iter.flatMap { row => + val obj = unpickle.loads(row) + if (batched) { + obj.asInstanceOf[JArrayList[_]].asScala + } else { + Seq(obj) + } + } + }.toJavaRDD() + } } diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 89d2e2e5b4..8d27ccb95f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -215,8 +215,6 @@ class SparkContext(object): SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile - SparkContext._jvm.SerDeUtil.initialize() - SparkContext._jvm.SerDe.initialize() if instance: if (SparkContext._active_spark_context and diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index cd43982191..e295c9d095 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -21,7 +21,7 @@ import numpy from numpy import array from pyspark import SparkContext, PickleSerializer -from pyspark.mllib.linalg import SparseVector, _convert_to_vector +from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper @@ -244,7 +244,7 @@ class NaiveBayes(object): :param lambda_: The smoothing parameter """ sc = data.context - jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(data._to_java_object_rdd(), lambda_) + jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(_to_java_object_rdd(data), lambda_) labels, pi, theta = PickleSerializer().loads(str(sc._jvm.SerDe.dumps(jlist))) return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta)) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 12c5602271..5ee7997104 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -17,7 +17,7 @@ from pyspark import SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer -from pyspark.mllib.linalg import SparseVector, _convert_to_vector +from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd __all__ = ['KMeansModel', 'KMeans'] @@ -85,7 +85,7 @@ class KMeans(object): # cache serialized data to avoid objects over head in JVM cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache() model = sc._jvm.PythonMLLibAPI().trainKMeansModel( - cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode) + _to_java_object_rdd(cached), k, maxIterations, runs, initializationMode) bytes = sc._jvm.SerDe.dumps(model.clusterCenters()) centers = ser.loads(str(bytes)) return KMeansModel([c.toArray() for c in centers]) diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index f4cbf31b94..b5a3f22c69 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -19,8 +19,7 @@ Python package for feature in MLlib. """ from pyspark.serializers import PickleSerializer, AutoBatchedSerializer - -from pyspark.mllib.linalg import _convert_to_vector +from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd __all__ = ['Word2Vec', 'Word2VecModel'] @@ -176,7 +175,7 @@ class Word2Vec(object): seed = self.seed model = sc._jvm.PythonMLLibAPI().trainWord2Vec( - data._to_java_object_rdd(), vectorSize, + _to_java_object_rdd(data), vectorSize, learningRate, numPartitions, numIterations, seed) return Word2VecModel(sc, model) diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 24c5480b2f..773d8d3938 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -29,6 +29,8 @@ import copy_reg import numpy as np +from pyspark.serializers import AutoBatchedSerializer, PickleSerializer + __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors'] @@ -50,6 +52,17 @@ except: _have_scipy = False +# this will call the MLlib version of pythonToJava() +def _to_java_object_rdd(rdd): + """ Return an JavaRDD of Object by unpickling + + It will convert each Python object into Java object by Pyrolite, whenever the + RDD is serialized in batch or not. + """ + rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer())) + return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True) + + def _convert_to_vector(l): if isinstance(l, Vector): return l diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index a787e4dea2..73baba4ace 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -32,7 +32,7 @@ def serialize(f): @wraps(f) def func(sc, *a, **kw): jrdd = f(sc, *a, **kw) - return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc, + return RDD(sc._jvm.SerDe.javaToPython(jrdd), sc, BatchedSerializer(PickleSerializer(), 1024)) return func diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 59c1c5ff0c..17f96b8700 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -18,6 +18,7 @@ from pyspark import SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer from pyspark.rdd import RDD +from pyspark.mllib.linalg import _to_java_object_rdd __all__ = ['MatrixFactorizationModel', 'ALS'] @@ -77,9 +78,9 @@ class MatrixFactorizationModel(object): first = tuple(map(int, first)) assert all(type(x) is int for x in first), "user and product in user_product shoul be int" sc = self._context - tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd()) + tuplerdd = sc._jvm.SerDe.asTupleRDD(_to_java_object_rdd(user_product).rdd()) jresult = self._java_model.predict(tuplerdd).toJavaRDD() - return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc, + return RDD(sc._jvm.SerDe.javaToPython(jresult), sc, AutoBatchedSerializer(PickleSerializer())) @@ -97,7 +98,7 @@ class ALS(object): # serialize them by AutoBatchedSerializer before cache to reduce the # objects overhead in JVM cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache() - return cached._to_java_object_rdd() + return _to_java_object_rdd(cached) @classmethod def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 12b322aaae..93e17faf5c 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -19,8 +19,8 @@ import numpy as np from numpy import array from pyspark import SparkContext -from pyspark.mllib.linalg import SparseVector, _convert_to_vector from pyspark.serializers import PickleSerializer, AutoBatchedSerializer +from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd __all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel', 'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD'] @@ -131,7 +131,7 @@ def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights) # use AutoBatchedSerializer before cache to reduce the memory # overhead in JVM cached = data._reserialize(AutoBatchedSerializer(ser)).cache() - ans = train_func(cached._to_java_object_rdd(), initial_bytes) + ans = train_func(_to_java_object_rdd(cached), initial_bytes) assert len(ans) == 2, "JVM call result had unexpected length" weights = ser.loads(str(ans[0])) return modelClass(weights, ans[1]) diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py index b9de0909a6..a6019dadf7 100644 --- a/python/pyspark/mllib/stat.py +++ b/python/pyspark/mllib/stat.py @@ -22,6 +22,7 @@ Python package for statistical functions in MLlib. from functools import wraps from pyspark import PickleSerializer +from pyspark.mllib.linalg import _to_java_object_rdd __all__ = ['MultivariateStatisticalSummary', 'Statistics'] @@ -106,7 +107,7 @@ class Statistics(object): array([ 2., 0., 0., -2.]) """ sc = rdd.ctx - jrdd = rdd._to_java_object_rdd() + jrdd = _to_java_object_rdd(rdd) cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd) return MultivariateStatisticalSummary(sc, cStats) @@ -162,14 +163,14 @@ class Statistics(object): if type(y) == str: raise TypeError("Use 'method=' to specify method name.") - jx = x._to_java_object_rdd() + jx = _to_java_object_rdd(x) if not y: resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method) bytes = sc._jvm.SerDe.dumps(resultMat) ser = PickleSerializer() return ser.loads(str(bytes)).toArray() else: - jy = y._to_java_object_rdd() + jy = _to_java_object_rdd(y) return sc._jvm.PythonMLLibAPI().corr(jx, jy, method) diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py index 5d7abfb96b..0938eebd3a 100644 --- a/python/pyspark/mllib/tree.py +++ b/python/pyspark/mllib/tree.py @@ -19,7 +19,7 @@ from py4j.java_collections import MapConverter from pyspark import SparkContext, RDD from pyspark.serializers import BatchedSerializer, PickleSerializer -from pyspark.mllib.linalg import Vector, _convert_to_vector +from pyspark.mllib.linalg import Vector, _convert_to_vector, _to_java_object_rdd from pyspark.mllib.regression import LabeledPoint __all__ = ['DecisionTreeModel', 'DecisionTree'] @@ -61,8 +61,8 @@ class DecisionTreeModel(object): return self._sc.parallelize([]) if not isinstance(first[0], Vector): x = x.map(_convert_to_vector) - jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD() - jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred) + jPred = self._java_model.predict(_to_java_object_rdd(x)).toJavaRDD() + jpyrdd = self._sc._jvm.SerDe.javaToPython(jPred) return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024)) else: @@ -104,7 +104,7 @@ class DecisionTree(object): first = data.first() assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint" sc = data.context - jrdd = data._to_java_object_rdd() + jrdd = _to_java_object_rdd(data) cfiMap = MapConverter().convert(categoricalFeaturesInfo, sc._gateway._gateway_client) model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel( diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 1357fd4fbc..84b39a4861 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -19,7 +19,7 @@ import numpy as np import warnings from pyspark.rdd import RDD -from pyspark.serializers import BatchedSerializer, PickleSerializer +from pyspark.serializers import AutoBatchedSerializer, PickleSerializer from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector from pyspark.mllib.regression import LabeledPoint @@ -174,8 +174,8 @@ class MLUtils(object): """ minPartitions = minPartitions or min(sc.defaultParallelism, 2) jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions) - jpyrdd = sc._jvm.PythonRDD.javaToPython(jrdd) - return RDD(jpyrdd, sc, BatchedSerializer(PickleSerializer())) + jpyrdd = sc._jvm.SerDe.javaToPython(jrdd) + return RDD(jpyrdd, sc, AutoBatchedSerializer(PickleSerializer())) def _test(): |