diff options
author | Josh Rosen <joshrosen@apache.org> | 2013-11-05 17:52:39 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2013-11-10 16:45:38 -0800 |
commit | cbb7f04aef2220ece93dea9f3fa98b5db5f270d6 (patch) | |
tree | 5feaed6b6064b81272fcb74b48ee2579e32de4e6 /core | |
parent | 7d68a81a8ed5f49fefb3bd0fa0b9d3835cc7d86e (diff) | |
download | spark-cbb7f04aef2220ece93dea9f3fa98b5db5f270d6.tar.gz spark-cbb7f04aef2220ece93dea9f3fa98b5db5f270d6.tar.bz2 spark-cbb7f04aef2220ece93dea9f3fa98b5db5f270d6.zip |
Add custom serializer support to PySpark.
For now, this only adds MarshalSerializer, but it lays the groundwork
for other supporting custom serializers. Many of these mechanisms
can also be used to support deserialization of different data formats
sent by Java, such as data encoded by MsgPack.
This also fixes a bug in SparkContext.union().
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 23 |
1 files changed, 1 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index eb0b0db0cc..ef9bf4db9b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -221,18 +221,6 @@ private[spark] object PythonRDD { JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism)) } - def writeStringAsPickle(elem: String, dOut: DataOutputStream) { - val s = elem.getBytes("UTF-8") - val length = 2 + 1 + 4 + s.length + 1 - dOut.writeInt(length) - dOut.writeByte(Pickle.PROTO) - dOut.writeByte(Pickle.TWO) - dOut.write(Pickle.BINUNICODE) - dOut.writeInt(Integer.reverseBytes(s.length)) - dOut.write(s) - dOut.writeByte(Pickle.STOP) - } - def writeToStream(elem: Any, dataOut: DataOutputStream) { elem match { case bytes: Array[Byte] => @@ -244,9 +232,7 @@ private[spark] object PythonRDD { dataOut.writeInt(pair._2.length) dataOut.write(pair._2) case str: String => - // Until we've implemented full custom serializer support, we need to return - // strings as Pickles to properly support union() and cartesian(): - writeStringAsPickle(str, dataOut) + dataOut.writeUTF(str) case other => throw new SparkException("Unexpected element type " + other.getClass) } @@ -271,13 +257,6 @@ private[spark] object PythonRDD { } } -private object Pickle { - val PROTO: Byte = 0x80.toByte - val TWO: Byte = 0x02.toByte - val BINUNICODE: Byte = 'X' - val STOP: Byte = '.' -} - private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] { override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8") } |