author    Josh Rosen <joshrosen@apache.org>    2013-11-05 17:52:39 -0800
committer Josh Rosen <joshrosen@apache.org>    2013-11-10 16:45:38 -0800
commit    cbb7f04aef2220ece93dea9f3fa98b5db5f270d6 (patch)
tree      5feaed6b6064b81272fcb74b48ee2579e32de4e6 /core
parent    7d68a81a8ed5f49fefb3bd0fa0b9d3835cc7d86e (diff)
Add custom serializer support to PySpark.
For now, this only adds MarshalSerializer, but it lays the groundwork for supporting other custom serializers. Many of these mechanisms can also be used to support deserialization of different data formats sent by Java, such as data encoded by MsgPack. This also fixes a bug in SparkContext.union().
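A minimal usage sketch of the new PySpark-facing API, assuming the serializer keyword argument that this change introduces on SparkContext (names per pyspark.serializers; treat the exact signature as an assumption, not a definitive reference):

from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer

# MarshalSerializer uses Python's marshal module: faster than pickle,
# but it supports fewer types (no custom classes, for example).
sc = SparkContext("local", "serializer-demo", serializer=MarshalSerializer())
print(sc.parallelize(range(10)).map(lambda x: x * 2).collect())
sc.stop()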
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 23
1 file changed, 1 insertion, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index eb0b0db0cc..ef9bf4db9b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -221,18 +221,6 @@ private[spark] object PythonRDD {
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
- def writeStringAsPickle(elem: String, dOut: DataOutputStream) {
- val s = elem.getBytes("UTF-8")
- val length = 2 + 1 + 4 + s.length + 1
- dOut.writeInt(length)
- dOut.writeByte(Pickle.PROTO)
- dOut.writeByte(Pickle.TWO)
- dOut.write(Pickle.BINUNICODE)
- dOut.writeInt(Integer.reverseBytes(s.length))
- dOut.write(s)
- dOut.writeByte(Pickle.STOP)
- }
-
def writeToStream(elem: Any, dataOut: DataOutputStream) {
elem match {
case bytes: Array[Byte] =>
@@ -244,9 +232,7 @@ private[spark] object PythonRDD {
dataOut.writeInt(pair._2.length)
dataOut.write(pair._2)
case str: String =>
- // Until we've implemented full custom serializer support, we need to return
- // strings as Pickles to properly support union() and cartesian():
- writeStringAsPickle(str, dataOut)
+ dataOut.writeUTF(str)
case other =>
throw new SparkException("Unexpected element type " + other.getClass)
}
@@ -271,13 +257,6 @@ private[spark] object PythonRDD {
}
}
-private object Pickle {
- val PROTO: Byte = 0x80.toByte
- val TWO: Byte = 0x02.toByte
- val BINUNICODE: Byte = 'X'
- val STOP: Byte = '.'
-}
-
private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
}
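For context on the framing change above: the deleted writeStringAsPickle hand-built a pickle protocol-2 frame (PROTO, TWO, BINUNICODE with a little-endian byte count, STOP) that Python's pickle module can load directly, while the replacement, DataOutputStream.writeUTF, prefixes a 2-byte big-endian length to modified UTF-8 bytes. A sketch of both wire formats from the Python side, assuming ASCII payloads so that modified UTF-8 and standard UTF-8 coincide; read_utf is a hypothetical reader for illustration, not PySpark's actual deserializer, and the outer 4-byte element-length frame the Scala code writes first is omitted:

import io
import pickle
import struct

# The bytes the removed Scala code emitted for the string "foo":
# PROTO (0x80) + protocol 2, BINUNICODE ('X') + 4-byte little-endian
# length (Integer.reverseBytes turned writeInt's big-endian output
# into little-endian), UTF-8 payload, STOP ('.').
frame = b"\x80\x02X" + struct.pack("<I", 3) + b"foo."
assert pickle.loads(frame) == "foo"

# The replacement framing: writeUTF emits a 2-byte big-endian length
# followed by (modified) UTF-8 bytes.
def read_utf(stream):
    (length,) = struct.unpack(">H", stream.read(2))
    return stream.read(length).decode("utf-8")

assert read_utf(io.BytesIO(b"\x00\x03foo")) == "foo"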