diff options
author | Josh Rosen <joshrosen@apache.org> | 2014-01-28 19:50:26 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-01-28 20:20:08 -0800 |
commit | 1381fc72f7a34f690a98ab72cec8ffb61e0e564d (patch) | |
tree | 8ae129c4b291b4b5589a77b919f508c4535fbf2c /core/src | |
parent | 84670f2715392859624df290c1b52eb4ed4a9cb1 (diff) | |
download | spark-1381fc72f7a34f690a98ab72cec8ffb61e0e564d.tar.gz spark-1381fc72f7a34f690a98ab72cec8ffb61e0e564d.tar.bz2 spark-1381fc72f7a34f690a98ab72cec8ffb61e0e564d.zip |
Switch from MUTF8 to UTF8 in PySpark serializers.
This fixes SPARK-1043, a bug introduced in 0.9.0
where PySpark couldn't serialize strings > 64kB.
This fix was written by @tyro89 and @bouk in #512.
This commit squashes and rebases their pull request
in order to fix some merge conflicts.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 18 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala | 35 |
2 files changed, 48 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 70516bde8b..9cbd26b607 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -64,7 +64,7 @@ private[spark] class PythonRDD[T: ClassTag]( // Partition index dataOut.writeInt(split.index) // sparkFilesDir - dataOut.writeUTF(SparkFiles.getRootDirectory) + PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut) // Broadcast variables dataOut.writeInt(broadcastVars.length) for (broadcast <- broadcastVars) { @@ -74,7 +74,9 @@ private[spark] class PythonRDD[T: ClassTag]( } // Python includes (*.zip and *.egg files) dataOut.writeInt(pythonIncludes.length) - pythonIncludes.foreach(dataOut.writeUTF) + for (include <- pythonIncludes) { + PythonRDD.writeUTF(include, dataOut) + } dataOut.flush() // Serialized command: dataOut.writeInt(command.length) @@ -228,7 +230,7 @@ private[spark] object PythonRDD { } case string: String => newIter.asInstanceOf[Iterator[String]].foreach { str => - dataOut.writeUTF(str) + writeUTF(str, dataOut) } case pair: Tuple2[_, _] => pair._1 match { @@ -241,8 +243,8 @@ private[spark] object PythonRDD { } case stringPair: String => newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair => - dataOut.writeUTF(pair._1) - dataOut.writeUTF(pair._2) + writeUTF(pair._1, dataOut) + writeUTF(pair._2, dataOut) } case other => throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass) @@ -253,6 +255,12 @@ private[spark] object PythonRDD { } } + def writeUTF(str: String, dataOut: DataOutputStream) { + val bytes = str.getBytes("UTF-8") + dataOut.writeInt(bytes.length) + dataOut.write(bytes) + } + def writeToFile[T](items: java.util.Iterator[T], filename: String) { import scala.collection.JavaConverters._ writeToFile(items.asScala, filename) diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala new file mode 100644 index 0000000000..1bebfe5ec8 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers +import org.apache.spark.api.python.PythonRDD + +import java.io.{ByteArrayOutputStream, DataOutputStream} + +class PythonRDDSuite extends FunSuite { + + test("Writing large strings to the worker") { + val input: List[String] = List("a"*100000) + val buffer = new DataOutputStream(new ByteArrayOutputStream) + PythonRDD.writeIteratorToStream(input.iterator, buffer) + } + +} + |