author     Davies Liu <davies.liu@gmail.com>      2014-10-16 14:56:50 -0700
committer  Xiangrui Meng <meng@databricks.com>    2014-10-16 14:56:50 -0700
commit     091d32c52e9d73da95896016c1d920e89858abfa (patch)
tree       904edd29e64b57fa1ab72d3ca37ed2996aa9d1e4 /core
parent     4c589cac4496c6a4bb8485a340bd0641dca13847 (diff)
download   spark-091d32c52e9d73da95896016c1d920e89858abfa.tar.gz
           spark-091d32c52e9d73da95896016c1d920e89858abfa.tar.bz2
           spark-091d32c52e9d73da95896016c1d920e89858abfa.zip
[SPARK-3971] [MLLib] [PySpark] hotfix: Customized pickler should work in cluster mode
Customized picklers must be registered before unpickling, but on executors there is no
way to register them before the tasks run. So we register the picklers inside the tasks
themselves: duplicate javaToPython() and pythonToJava() in MLlib and call
SerDe.initialize() before pickling or unpickling.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2830 from davies/fix_pickle and squashes the following commits:

0c85fb9 [Davies Liu] revert the privacy change
6b94e15 [Davies Liu] use JavaConverters instead of JavaConversions
0f02050 [Davies Liu] hotfix: Customized pickler does not work in cluster
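The fix works because a mapPartitions closure is shipped to and executed on every
executor, so a registration call placed at the top of the closure runs before any row
is unpickled. A minimal sketch of that pattern; the helper name unpickleOnExecutors
and the input RDD are hypothetical and not part of the actual change:

import net.razorvine.pickle.Unpickler
import org.apache.spark.rdd.RDD

object ClosureRegistration {
  // `pickled` stands in for any RDD[Array[Byte]] produced by PySpark.
  // Registering custom constructors on the driver alone is not enough,
  // because each executor unpickles in its own JVM.
  def unpickleOnExecutors(pickled: RDD[Array[Byte]]): RDD[Any] = {
    pickled.mapPartitions { iter =>
      SerDeUtil.initialize() // idempotent; registers constructors once per JVM
      val unpickle = new Unpickler
      iter.map(row => unpickle.loads(row))
    }
  }
}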
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala  | 14
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 4acbdf9d5e..29ca751519 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,6 +23,7 @@ import java.nio.charset.Charset
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.existentials
@@ -746,6 +747,7 @@ private[spark] object PythonRDD extends Logging {
def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
+ SerDeUtil.initialize()
iter.flatMap { row =>
unpickle.loads(row) match {
// in case objects are pickled in batch mode
@@ -785,7 +787,7 @@ private[spark] object PythonRDD extends Logging {
}.toJavaRDD()
}
- private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+ private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
private val pickle = new Pickler()
private var batch = 1
private val buffer = new mutable.ArrayBuffer[Any]
@@ -822,11 +824,12 @@ private[spark] object PythonRDD extends Logging {
*/
def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
pyRDD.rdd.mapPartitions { iter =>
+ SerDeUtil.initialize()
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
if (batched) {
- obj.asInstanceOf[JArrayList[_]]
+ obj.asInstanceOf[JArrayList[_]].asScala
} else {
Seq(obj)
}
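One of the squashed commits swaps the implicit conversions of
scala.collection.JavaConversions for the explicit scala.collection.JavaConverters,
which is what makes the new .asScala call necessary above. A small self-contained
illustration of the difference (the values here are made up for the example):

import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._

object ConvertersExample {
  def main(args: Array[String]): Unit = {
    val javaList = new JArrayList[String]()
    javaList.add("a")
    javaList.add("b")

    // JavaConverters makes the Java-to-Scala conversion visible at the call
    // site; JavaConversions would have applied it silently via an implicit.
    val scalaBuffer: scala.collection.mutable.Buffer[String] = javaList.asScala
    println(scalaBuffer.mkString(", ")) // prints: a, b
  }
}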
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 7903457b17..ebdc3533e0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.RDD
/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
-private[python] object SerDeUtil extends Logging {
+private[spark] object SerDeUtil extends Logging {
// Unpickle array.array generated by Python 2.6
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
// /* Description of types */
@@ -76,9 +76,18 @@ private[python] object SerDeUtil extends Logging {
}
}
+ private var initialized = false
+ // This should be called before trying to unpickle array.array from Python
+ // In cluster mode, this should be called inside the task closure
def initialize() = {
- Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+ synchronized {
+ if (!initialized) {
+ Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+ initialized = true
+ }
+ }
}
+ initialize()
private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
val pickle = new Pickler
@@ -143,6 +152,7 @@ private[python] object SerDeUtil extends Logging {
obj.asInstanceOf[Array[_]].length == 2
}
pyRDD.mapPartitions { iter =>
+ initialize()
val unpickle = new Unpickler
val unpickled =
if (batchSerialized) {
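The guard added to SerDeUtil.initialize() above is a general one-time, thread-safe
registration idiom: many task threads may call it concurrently, but the synchronized
check-and-set ensures the constructors are registered exactly once per JVM. A minimal
generic sketch, with registerAll() as a hypothetical stand-in for the real
registrations:

object OneTimeRegistry {
  private var initialized = false

  // Safe to call from many task threads at once: the synchronized block makes
  // the check-and-set atomic, so registration happens exactly once per JVM.
  def initialize(): Unit = synchronized {
    if (!initialized) {
      registerAll()
      initialized = true
    }
  }

  private def registerAll(): Unit = {
    // e.g. Unpickler.registerConstructor("array", "array", new ArrayConstructor())
  }
}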