author    Josh Rosen <joshrosen@apache.org>  2014-01-28 19:50:26 -0800
committer Josh Rosen <joshrosen@apache.org>  2014-01-28 20:20:08 -0800
commit    1381fc72f7a34f690a98ab72cec8ffb61e0e564d (patch)
tree      8ae129c4b291b4b5589a77b919f508c4535fbf2c /core
parent    84670f2715392859624df290c1b52eb4ed4a9cb1 (diff)
Switch from MUTF8 to UTF8 in PySpark serializers.
This fixes SPARK-1043, a bug introduced in 0.9.0 where PySpark couldn't serialize strings > 64kB. This fix was written by @tyro89 and @bouk in #512. This commit squashes and rebases their pull request in order to fix some merge conflicts.
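Background: java.io.DataOutputStream.writeUTF encodes strings as modified UTF-8 (MUTF8) with an unsigned 16-bit length prefix, so any string whose encoding exceeds 65535 bytes is rejected with a UTFDataFormatException. A minimal Scala sketch of the failure mode (the literal and stream names here are illustrative only):

    import java.io.{ByteArrayOutputStream, DataOutputStream, UTFDataFormatException}

    val out = new DataOutputStream(new ByteArrayOutputStream)
    try {
      // MUTF8's 2-byte length prefix caps encoded strings at 65535 bytes.
      out.writeUTF("a" * 100000)
    } catch {
      case e: UTFDataFormatException => println("rejected by writeUTF: " + e)
    }

The fix below replaces writeUTF with an explicit 4-byte length prefix followed by raw UTF-8 bytes, which has no such limit.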
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala      | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala | 35
2 files changed, 48 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 70516bde8b..9cbd26b607 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -64,7 +64,7 @@ private[spark] class PythonRDD[T: ClassTag](
// Partition index
dataOut.writeInt(split.index)
// sparkFilesDir
- dataOut.writeUTF(SparkFiles.getRootDirectory)
+ PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
// Broadcast variables
dataOut.writeInt(broadcastVars.length)
for (broadcast <- broadcastVars) {
@@ -74,7 +74,9 @@ private[spark] class PythonRDD[T: ClassTag](
}
// Python includes (*.zip and *.egg files)
dataOut.writeInt(pythonIncludes.length)
- pythonIncludes.foreach(dataOut.writeUTF)
+ for (include <- pythonIncludes) {
+ PythonRDD.writeUTF(include, dataOut)
+ }
dataOut.flush()
// Serialized command:
dataOut.writeInt(command.length)
@@ -228,7 +230,7 @@ private[spark] object PythonRDD {
}
case string: String =>
newIter.asInstanceOf[Iterator[String]].foreach { str =>
- dataOut.writeUTF(str)
+ writeUTF(str, dataOut)
}
case pair: Tuple2[_, _] =>
pair._1 match {
@@ -241,8 +243,8 @@ private[spark] object PythonRDD {
}
case stringPair: String =>
newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
- dataOut.writeUTF(pair._1)
- dataOut.writeUTF(pair._2)
+ writeUTF(pair._1, dataOut)
+ writeUTF(pair._2, dataOut)
}
case other =>
throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
@@ -253,6 +255,12 @@ private[spark] object PythonRDD {
}
}
+ def writeUTF(str: String, dataOut: DataOutputStream) {
+ val bytes = str.getBytes("UTF-8")
+ dataOut.writeInt(bytes.length)
+ dataOut.write(bytes)
+ }
+
def writeToFile[T](items: java.util.Iterator[T], filename: String) {
import scala.collection.JavaConverters._
writeToFile(items.asScala, filename)
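The new PythonRDD.writeUTF frames each string as a 4-byte big-endian length followed by the raw UTF-8 bytes. A minimal round-trip sketch of that framing, assuming the reader side simply mirrors it with a DataInputStream (the real consumer is the PySpark worker, which is not shown in this diff):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    val bos = new ByteArrayOutputStream
    val out = new DataOutputStream(bos)
    val payload = "a" * 100000
    val bytes = payload.getBytes("UTF-8")
    out.writeInt(bytes.length)   // 4-byte big-endian length prefix
    out.write(bytes)             // raw UTF-8 payload, no 64 kB cap

    val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    val read = new Array[Byte](in.readInt())
    in.readFully(read)
    assert(new String(read, "UTF-8") == payload)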
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
new file mode 100644
index 0000000000..1bebfe5ec8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.api.python.PythonRDD
+
+import java.io.{ByteArrayOutputStream, DataOutputStream}
+
+class PythonRDDSuite extends FunSuite {
+
+ test("Writing large strings to the worker") {
+ val input: List[String] = List("a"*100000)
+ val buffer = new DataOutputStream(new ByteArrayOutputStream)
+ PythonRDD.writeIteratorToStream(input.iterator, buffer)
+ }
+
+}
+
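The test above passes simply because writeIteratorToStream no longer throws for a 100,000-character string; it does not inspect the bytes written. A hypothetical, slightly stronger assertion on the frame layout (not part of this commit):

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    val bos = new ByteArrayOutputStream
    PythonRDD.writeUTF("a" * 100000, new DataOutputStream(bos))
    // 4-byte length prefix plus one UTF-8 byte per ASCII character.
    assert(bos.toByteArray.length == 4 + 100000)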