-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  110
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala  121
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala  10
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala  2
-rw-r--r--  python/pyspark/context.py  58
-rw-r--r--  python/pyspark/mllib/common.py  2
-rw-r--r--  python/pyspark/mllib/recommendation.py  2
-rw-r--r--  python/pyspark/rdd.py  91
-rw-r--r--  python/pyspark/serializers.py  36
-rw-r--r--  python/pyspark/shuffle.py  7
-rw-r--r--  python/pyspark/sql.py  18
-rw-r--r--  python/pyspark/tests.py  66
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala  10
14 files changed, 201 insertions, 338 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 49dc95f349..5ba66178e2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -61,8 +61,7 @@ private[python] object Converter extends Logging {
* Other objects are passed through without conversion.
*/
private[python] class WritableToJavaConverter(
- conf: Broadcast[SerializableWritable[Configuration]],
- batchSize: Int) extends Converter[Any, Any] {
+ conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -94,8 +93,7 @@ private[python] class WritableToJavaConverter(
map.put(convertWritable(k), convertWritable(v))
}
map
- case w: Writable =>
- if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
+ case w: Writable => WritableUtils.clone(w, conf.value.value)
case other => other
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 61b125ef7c..e94ccdcd47 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -22,12 +22,10 @@ import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.existentials
import com.google.common.base.Charsets.UTF_8
-import net.razorvine.pickle.{Pickler, Unpickler}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
@@ -442,7 +440,7 @@ private[spark] object PythonRDD extends Logging {
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
- new WritableToJavaConverter(confBroadcasted, batchSize))
+ new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
@@ -468,7 +466,7 @@ private[spark] object PythonRDD extends Logging {
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
- new WritableToJavaConverter(confBroadcasted, batchSize))
+ new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
@@ -494,7 +492,7 @@ private[spark] object PythonRDD extends Logging {
None, inputFormatClass, keyClass, valueClass, conf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
- new WritableToJavaConverter(confBroadcasted, batchSize))
+ new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
@@ -537,7 +535,7 @@ private[spark] object PythonRDD extends Logging {
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
- new WritableToJavaConverter(confBroadcasted, batchSize))
+ new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
@@ -563,7 +561,7 @@ private[spark] object PythonRDD extends Logging {
None, inputFormatClass, keyClass, valueClass, conf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
- new WritableToJavaConverter(confBroadcasted, batchSize))
+ new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}
@@ -746,104 +744,6 @@ private[spark] object PythonRDD extends Logging {
converted.saveAsHadoopDataset(new JobConf(conf))
}
}
-
-
- /**
- * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
- */
- @deprecated("PySpark does not use it anymore", "1.1")
- def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
- pyRDD.rdd.mapPartitions { iter =>
- val unpickle = new Unpickler
- SerDeUtil.initialize()
- iter.flatMap { row =>
- unpickle.loads(row) match {
- // in case of objects are pickled in batch mode
- case objs: JArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
- // not in batch mode
- case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
- }
- }
- }
- }
-
- /**
- * Convert an RDD of serialized Python tuple to Array (no recursive conversions).
- * It is only used by pyspark.sql.
- */
- def pythonToJavaArray(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Array[_]] = {
-
- def toArray(obj: Any): Array[_] = {
- obj match {
- case objs: JArrayList[_] =>
- objs.toArray
- case obj if obj.getClass.isArray =>
- obj.asInstanceOf[Array[_]].toArray
- }
- }
-
- pyRDD.rdd.mapPartitions { iter =>
- val unpickle = new Unpickler
- iter.flatMap { row =>
- val obj = unpickle.loads(row)
- if (batched) {
- obj.asInstanceOf[JArrayList[_]].map(toArray)
- } else {
- Seq(toArray(obj))
- }
- }
- }.toJavaRDD()
- }
-
- private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
- private val pickle = new Pickler()
- private var batch = 1
- private val buffer = new mutable.ArrayBuffer[Any]
-
- override def hasNext(): Boolean = iter.hasNext
-
- override def next(): Array[Byte] = {
- while (iter.hasNext && buffer.length < batch) {
- buffer += iter.next()
- }
- val bytes = pickle.dumps(buffer.toArray)
- val size = bytes.length
- // let 1M < size < 10M
- if (size < 1024 * 1024) {
- batch *= 2
- } else if (size > 1024 * 1024 * 10 && batch > 1) {
- batch /= 2
- }
- buffer.clear()
- bytes
- }
- }
-
- /**
- * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
- * PySpark.
- */
- def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
- jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
- }
-
- /**
- * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark.
- */
- def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
- pyRDD.rdd.mapPartitions { iter =>
- SerDeUtil.initialize()
- val unpickle = new Unpickler
- iter.flatMap { row =>
- val obj = unpickle.loads(row)
- if (batched) {
- obj.asInstanceOf[JArrayList[_]].asScala
- } else {
- Seq(obj)
- }
- }
- }.toJavaRDD()
- }
}
private
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index ebdc3533e0..a4153aaa92 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -18,8 +18,13 @@
package org.apache.spark.api.python
import java.nio.ByteOrder
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.spark.api.java.JavaRDD
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.Failure
import scala.util.Try
@@ -89,6 +94,73 @@ private[spark] object SerDeUtil extends Logging {
}
initialize()
+
+ /**
+ * Convert an RDD of Java objects to Array (no recursive conversions).
+ * It is only used by pyspark.sql.
+ */
+ def toJavaArray(jrdd: JavaRDD[Any]): JavaRDD[Array[_]] = {
+ jrdd.rdd.map {
+ case objs: JArrayList[_] =>
+ objs.toArray
+ case obj if obj.getClass.isArray =>
+ obj.asInstanceOf[Array[_]].toArray
+ }.toJavaRDD()
+ }
+
+ /**
+ * Choose batch size based on size of objects
+ */
+ private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+ private val pickle = new Pickler()
+ private var batch = 1
+ private val buffer = new mutable.ArrayBuffer[Any]
+
+ override def hasNext: Boolean = iter.hasNext
+
+ override def next(): Array[Byte] = {
+ while (iter.hasNext && buffer.length < batch) {
+ buffer += iter.next()
+ }
+ val bytes = pickle.dumps(buffer.toArray)
+ val size = bytes.length
+ // let 1M < size < 10M
+ if (size < 1024 * 1024) {
+ batch *= 2
+ } else if (size > 1024 * 1024 * 10 && batch > 1) {
+ batch /= 2
+ }
+ buffer.clear()
+ bytes
+ }
+ }
+
+ /**
+ * Convert an RDD of Java objects to an RDD of serialized Python objects that is usable by
+ * PySpark.
+ */
+ private[spark] def javaToPython(jRDD: JavaRDD[_]): JavaRDD[Array[Byte]] = {
+ jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
+ }
+
+ /**
+ * Convert an RDD of serialized Python objects to an RDD of objects that is usable by PySpark.
+ */
+ def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+ pyRDD.rdd.mapPartitions { iter =>
+ initialize()
+ val unpickle = new Unpickler
+ iter.flatMap { row =>
+ val obj = unpickle.loads(row)
+ if (batched) {
+ obj.asInstanceOf[JArrayList[_]].asScala
+ } else {
+ Seq(obj)
+ }
+ }
+ }.toJavaRDD()
+ }
+
private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
val pickle = new Pickler
val kt = Try {
@@ -128,17 +200,18 @@ private[spark] object SerDeUtil extends Logging {
*/
def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
val (keyFailed, valueFailed) = checkPickle(rdd.first())
+
rdd.mapPartitions { iter =>
- val pickle = new Pickler
val cleaned = iter.map { case (k, v) =>
val key = if (keyFailed) k.toString else k
val value = if (valueFailed) v.toString else v
Array[Any](key, value)
}
- if (batchSize > 1) {
- cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
+ if (batchSize == 0) {
+ new AutoBatchedPickler(cleaned)
} else {
- cleaned.map(pickle.dumps(_))
+ val pickle = new Pickler
+ cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
}
}
}
@@ -146,36 +219,22 @@ private[spark] object SerDeUtil extends Logging {
/**
* Convert an RDD of serialized Python tuple (K, V) to RDD[(K, V)].
*/
- def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batchSerialized: Boolean): RDD[(K, V)] = {
+ def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batched: Boolean): RDD[(K, V)] = {
def isPair(obj: Any): Boolean = {
- Option(obj.getClass.getComponentType).map(!_.isPrimitive).getOrElse(false) &&
+ Option(obj.getClass.getComponentType).exists(!_.isPrimitive) &&
obj.asInstanceOf[Array[_]].length == 2
}
- pyRDD.mapPartitions { iter =>
- initialize()
- val unpickle = new Unpickler
- val unpickled =
- if (batchSerialized) {
- iter.flatMap { batch =>
- unpickle.loads(batch) match {
- case objs: java.util.List[_] => collectionAsScalaIterable(objs)
- case other => throw new SparkException(
- s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
- }
- }
- } else {
- iter.map(unpickle.loads(_))
- }
- unpickled.map {
- case obj if isPair(obj) =>
- // we only accept (K, V)
- val arr = obj.asInstanceOf[Array[_]]
- (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
- case other => throw new SparkException(
- s"RDD element of type ${other.getClass.getName} cannot be used")
- }
+
+ val rdd = pythonToJava(pyRDD, batched).rdd
+ rdd.first match {
+ case obj if isPair(obj) =>
+ // we only accept (K, V)
+ case other => throw new SparkException(
+ s"RDD element of type ${other.getClass.getName} cannot be used")
+ }
+ rdd.map { obj =>
+ val arr = obj.asInstanceOf[Array[_]]
+ (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
}
}
-
}
-
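
Note on the AutoBatchedPickler added above: it doubles the batch while pickled chunks stay under 1 MB and halves it once they exceed 10 MB, so pairRDDToPython with batchSize == 0 converges on chunks of roughly 1-10 MB each. Below is a minimal Python rendering of that policy; the thresholds come from the Scala code in this diff, while the function name and the use of the pickle module are purely illustrative and not part of the patch.

import pickle

def auto_batched_dumps(iterator):
    # Simplified sketch of the doubling/halving policy in AutoBatchedPickler:
    # grow the batch while chunks are smaller than 1 MB, shrink it above 10 MB.
    batch, buf = 1, []
    for obj in iterator:
        buf.append(obj)
        if len(buf) < batch:
            continue
        chunk = pickle.dumps(buf)
        buf = []
        if len(chunk) < 1 << 20:
            batch *= 2
        elif len(chunk) > 10 * (1 << 20) and batch > 1:
            batch //= 2
        yield chunk
    if buf:
        yield pickle.dumps(buf)  # flush the last partial batch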
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index e9ca9166eb..c0cbd28a84 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -176,11 +176,11 @@ object WriteInputFormatTestDataGenerator {
// Create test data for arbitrary custom writable TestWritable
val testClass = Seq(
- ("1", TestWritable("test1", 123, 54.0)),
- ("2", TestWritable("test2", 456, 8762.3)),
- ("1", TestWritable("test3", 123, 423.1)),
- ("3", TestWritable("test56", 456, 423.5)),
- ("2", TestWritable("test2", 123, 5435.2))
+ ("1", TestWritable("test1", 1, 1.0)),
+ ("2", TestWritable("test2", 2, 2.3)),
+ ("3", TestWritable("test3", 3, 3.1)),
+ ("5", TestWritable("test56", 5, 5.5)),
+ ("4", TestWritable("test4", 4, 4.2))
)
val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
rdd.saveAsNewAPIHadoopFile(classPath,
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index acdc67ddc6..65b98a8cee 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -736,7 +736,7 @@ private[spark] object SerDe extends Serializable {
def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
jRDD.rdd.mapPartitions { iter =>
initialize() // let it called in executor
- new PythonRDD.AutoBatchedPickler(iter)
+ new SerDeUtil.AutoBatchedPickler(iter)
}
}
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5f8dcedb1e..a0e4821728 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -63,7 +63,6 @@ class SparkContext(object):
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
- _default_batch_size_for_serialized_input = 10
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -115,9 +114,7 @@ class SparkContext(object):
self._conf = conf or SparkConf(_jvm=self._jvm)
self._batchSize = batchSize # -1 represents an unlimited batch size
self._unbatched_serializer = serializer
- if batchSize == 1:
- self.serializer = self._unbatched_serializer
- elif batchSize == 0:
+ if batchSize == 0:
self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
else:
self.serializer = BatchedSerializer(self._unbatched_serializer,
@@ -305,12 +302,8 @@ class SparkContext(object):
# Make sure we distribute data evenly if it's smaller than self.batchSize
if "__len__" not in dir(c):
c = list(c) # Make it a list so we can compute its length
- batchSize = min(len(c) // numSlices, self._batchSize)
- if batchSize > 1:
- serializer = BatchedSerializer(self._unbatched_serializer,
- batchSize)
- else:
- serializer = self._unbatched_serializer
+ batchSize = max(1, min(len(c) // numSlices, self._batchSize))
+ serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
serializer.dump_stream(c, tempFile)
tempFile.close()
readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
@@ -328,8 +321,7 @@ class SparkContext(object):
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
minPartitions = minPartitions or self.defaultMinPartitions
- return RDD(self._jsc.objectFile(name, minPartitions), self,
- BatchedSerializer(PickleSerializer()))
+ return RDD(self._jsc.objectFile(name, minPartitions), self)
def textFile(self, name, minPartitions=None, use_unicode=True):
"""
@@ -405,7 +397,7 @@ class SparkContext(object):
return jm
def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
- valueConverter=None, minSplits=None, batchSize=None):
+ valueConverter=None, minSplits=None, batchSize=0):
"""
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -427,17 +419,15 @@ class SparkContext(object):
:param minSplits: minimum splits in dataset
(default min(2, sc.defaultParallelism))
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
keyConverter, valueConverter, minSplits, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -458,18 +448,16 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -487,18 +475,16 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -519,18 +505,16 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -548,15 +532,13 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
@@ -836,7 +818,7 @@ def _test():
import doctest
import tempfile
globs = globals().copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[4]', 'PythonTest')
globs['tempdir'] = tempfile.mkdtemp()
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
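
With the context.py changes above, batchSize == 0 is now the default (including for the sequenceFile/hadoop* readers) and means "pick the batch size automatically"; any positive value still forces a fixed batch, and -1 keeps the unlimited-batch behavior inside BatchedSerializer. A sketch of how the constructor branch resolves the serializer follows; choose_serializer is a hypothetical helper, not part of the patch.

from pyspark.serializers import AutoBatchedSerializer, BatchedSerializer, PickleSerializer

def choose_serializer(batchSize, base=PickleSerializer()):
    # Mirrors the SparkContext.__init__ branch in this diff: 0 selects
    # size-based auto batching, any other value a fixed (or unlimited) batch.
    if batchSize == 0:
        return AutoBatchedSerializer(base)
    return BatchedSerializer(base, batchSize)

# choose_serializer(0)   -> AutoBatchedSerializer(PickleSerializer())
# choose_serializer(10)  -> BatchedSerializer(PickleSerializer(), 10)
# choose_serializer(-1)  -> BatchedSerializer(PickleSerializer(), -1)  (unlimited)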
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index 76864d8163..dbe5f698b7 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -96,7 +96,7 @@ def _java2py(sc, r):
if clsName == 'JavaRDD':
jrdd = sc._jvm.SerDe.javaToPython(r)
- return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
+ return RDD(jrdd, sc)
elif isinstance(r, (JavaArray, JavaList)) or clsName in _picklable_classes:
r = sc._jvm.SerDe.dumps(r)
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 6b32af07c9..e8b998414d 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -117,7 +117,7 @@ def _test():
import doctest
import pyspark.mllib.recommendation
globs = pyspark.mllib.recommendation.__dict__.copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[4]', 'PythonTest')
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4f025b9f11..879655dc53 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -120,7 +120,7 @@ class RDD(object):
operated on in parallel.
"""
- def __init__(self, jrdd, ctx, jrdd_deserializer):
+ def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
self._jrdd = jrdd
self.is_cached = False
self.is_checkpointed = False
@@ -129,12 +129,8 @@ class RDD(object):
self._id = jrdd.id()
self._partitionFunc = None
- def _toPickleSerialization(self):
- if (self._jrdd_deserializer == PickleSerializer() or
- self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
- return self
- else:
- return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+ def _pickled(self):
+ return self._reserialize(AutoBatchedSerializer(PickleSerializer()))
def id(self):
"""
@@ -446,12 +442,11 @@ class RDD(object):
def _reserialize(self, serializer=None):
serializer = serializer or self.ctx.serializer
- if self._jrdd_deserializer == serializer:
- return self
- else:
- converted = self.map(lambda x: x, preservesPartitioning=True)
- converted._jrdd_deserializer = serializer
- return converted
+ if self._jrdd_deserializer != serializer:
+ if not isinstance(self, PipelinedRDD):
+ self = self.map(lambda x: x, preservesPartitioning=True)
+ self._jrdd_deserializer = serializer
+ return self
def __add__(self, other):
"""
@@ -1120,9 +1115,8 @@ class RDD(object):
:param valueConverter: (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
keyConverter, valueConverter, True)
def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
@@ -1147,9 +1141,8 @@ class RDD(object):
:param conf: Hadoop job configuration, passed in as a dict (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, True, path,
outputFormatClass,
keyClass, valueClass,
keyConverter, valueConverter, jconf)
@@ -1166,9 +1159,8 @@ class RDD(object):
:param valueConverter: (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
keyConverter, valueConverter, False)
def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
@@ -1195,9 +1187,8 @@ class RDD(object):
:param compressionCodecClass: (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, True, path,
outputFormatClass,
keyClass, valueClass,
keyConverter, valueConverter,
@@ -1215,9 +1206,8 @@ class RDD(object):
:param path: path to sequence file
:param compressionCodecClass: (None by default)
"""
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, True,
path, compressionCodecClass)
def saveAsPickleFile(self, path, batchSize=10):
@@ -1232,8 +1222,11 @@ class RDD(object):
>>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
[1, 2, 'rdd', 'spark']
"""
- self._reserialize(BatchedSerializer(PickleSerializer(),
- batchSize))._jrdd.saveAsObjectFile(path)
+ if batchSize == 0:
+ ser = AutoBatchedSerializer(PickleSerializer())
+ else:
+ ser = BatchedSerializer(PickleSerializer(), batchSize)
+ self._reserialize(ser)._jrdd.saveAsObjectFile(path)
def saveAsTextFile(self, path):
"""
@@ -1774,13 +1767,10 @@ class RDD(object):
>>> x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
"""
- if self.getNumPartitions() != other.getNumPartitions():
- raise ValueError("Can only zip with RDD which has the same number of partitions")
-
def get_batch_size(ser):
if isinstance(ser, BatchedSerializer):
return ser.batchSize
- return 0
+ return 1
def batch_as(rdd, batchSize):
ser = rdd._jrdd_deserializer
@@ -1790,12 +1780,16 @@ class RDD(object):
my_batch = get_batch_size(self._jrdd_deserializer)
other_batch = get_batch_size(other._jrdd_deserializer)
- if my_batch != other_batch:
- # use the greatest batchSize to batch the other one.
- if my_batch > other_batch:
- other = batch_as(other, my_batch)
- else:
- self = batch_as(self, other_batch)
+ # use the smallest batchSize for both of them
+ batchSize = min(my_batch, other_batch)
+ if batchSize <= 0:
+ # auto batched or unlimited
+ batchSize = 100
+ other = batch_as(other, batchSize)
+ self = batch_as(self, batchSize)
+
+ if self.getNumPartitions() != other.getNumPartitions():
+ raise ValueError("Can only zip with RDD which has the same number of partitions")
# There will be an Exception in JVM if there are different number
# of items in each partitions.
@@ -1934,25 +1928,14 @@ class RDD(object):
return values.collect()
- def _is_pickled(self):
- """ Return this RDD is serialized by Pickle or not. """
- der = self._jrdd_deserializer
- if isinstance(der, PickleSerializer):
- return True
- if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
- return True
- return False
-
def _to_java_object_rdd(self):
""" Return an JavaRDD of Object by unpickling
It will convert each Python object into Java object by Pyrolite, whenever the
RDD is serialized in batch or not.
"""
- rdd = self._reserialize(AutoBatchedSerializer(PickleSerializer())) \
- if not self._is_pickled() else self
- is_batch = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
- return self.ctx._jvm.PythonRDD.pythonToJava(rdd._jrdd, is_batch)
+ rdd = self._pickled()
+ return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
def countApprox(self, timeout, confidence=0.95):
"""
@@ -2132,7 +2115,7 @@ def _test():
globs = globals().copy()
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[4]', 'PythonTest')
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
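
The zip() change above drops the old "rebatch to the larger size" strategy: both sides are rebatched to the smaller of the two batch sizes, falling back to a fixed batch of 100 whenever either side is auto-batched (0) or unlimited (-1), and the partition-count check now runs after rebatching. A small sketch of that size resolution; align_batch_sizes is a hypothetical helper mirroring the logic in the diff.

def align_batch_sizes(my_batch, other_batch):
    # Batch sizes as reported by get_batch_size() above: a positive int for a
    # fixed BatchedSerializer, 1 for unbatched serializers, 0 for auto-batched
    # and -1 for unlimited batches.
    batch = min(my_batch, other_batch)
    if batch <= 0:
        batch = 100  # auto-batched or unlimited: fall back to a fixed size
    return batch

# align_batch_sizes(10, 4)  -> 4    (use the smaller fixed batch)
# align_batch_sizes(10, 0)  -> 100  (one side is auto-batched)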
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 904bd9f265..d597cbf94e 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -33,9 +33,8 @@ The serializer is chosen when creating L{SparkContext}:
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.stop()
-By default, PySpark serialize objects in batches; the batch size can be
-controlled through SparkContext's C{batchSize} parameter
-(the default size is 1024 objects):
+PySpark serializes objects in batches; by default, the batch size is chosen based
+on the size of the objects. It is also configurable via SparkContext's C{batchSize} parameter:
>>> sc = SparkContext('local', 'test', batchSize=2)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
@@ -48,16 +47,6 @@ which contains two batches of two objects:
>>> rdd._jrdd.count()
8L
>>> sc.stop()
-
-A batch size of -1 uses an unlimited batch size, and a size of 1 disables
-batching:
-
->>> sc = SparkContext('local', 'test', batchSize=1)
->>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
->>> rdd.glom().collect()
-[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
->>> rdd._jrdd.count()
-16L
"""
import cPickle
@@ -73,7 +62,7 @@ import itertools
from pyspark import cloudpickle
-__all__ = ["PickleSerializer", "MarshalSerializer"]
+__all__ = ["PickleSerializer", "MarshalSerializer", "UTF8Deserializer"]
class SpecialLengths(object):
@@ -113,7 +102,7 @@ class Serializer(object):
return not self.__eq__(other)
def __repr__(self):
- return "<%s object>" % self.__class__.__name__
+ return "%s()" % self.__class__.__name__
def __hash__(self):
return hash(str(self))
@@ -181,6 +170,7 @@ class BatchedSerializer(Serializer):
"""
UNLIMITED_BATCH_SIZE = -1
+ UNKNOWN_BATCH_SIZE = 0
def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
self.serializer = serializer
@@ -213,10 +203,10 @@ class BatchedSerializer(Serializer):
def __eq__(self, other):
return (isinstance(other, BatchedSerializer) and
- other.serializer == self.serializer)
+ other.serializer == self.serializer and other.batchSize == self.batchSize)
def __repr__(self):
- return "BatchedSerializer<%s>" % str(self.serializer)
+ return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
class AutoBatchedSerializer(BatchedSerializer):
@@ -225,7 +215,7 @@ class AutoBatchedSerializer(BatchedSerializer):
"""
def __init__(self, serializer, bestSize=1 << 16):
- BatchedSerializer.__init__(self, serializer, -1)
+ BatchedSerializer.__init__(self, serializer, self.UNKNOWN_BATCH_SIZE)
self.bestSize = bestSize
def dump_stream(self, iterator, stream):
@@ -248,10 +238,10 @@ class AutoBatchedSerializer(BatchedSerializer):
def __eq__(self, other):
return (isinstance(other, AutoBatchedSerializer) and
- other.serializer == self.serializer)
+ other.serializer == self.serializer and other.bestSize == self.bestSize)
def __str__(self):
- return "AutoBatchedSerializer<%s>" % str(self.serializer)
+ return "AutoBatchedSerializer(%s)" % str(self.serializer)
class CartesianDeserializer(FramedSerializer):
@@ -284,7 +274,7 @@ class CartesianDeserializer(FramedSerializer):
self.key_ser == other.key_ser and self.val_ser == other.val_ser)
def __repr__(self):
- return "CartesianDeserializer<%s, %s>" % \
+ return "CartesianDeserializer(%s, %s)" % \
(str(self.key_ser), str(self.val_ser))
@@ -311,7 +301,7 @@ class PairDeserializer(CartesianDeserializer):
self.key_ser == other.key_ser and self.val_ser == other.val_ser)
def __repr__(self):
- return "PairDeserializer<%s, %s>" % (str(self.key_ser), str(self.val_ser))
+ return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser))
class NoOpSerializer(FramedSerializer):
@@ -430,7 +420,7 @@ class MarshalSerializer(FramedSerializer):
class AutoSerializer(FramedSerializer):
"""
- Choose marshal or cPickle as serialization protocol autumatically
+ Choose marshal or cPickle as serialization protocol automatically
"""
def __init__(self):
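
Two behavioral notes on the serializers.py changes above: BatchedSerializer equality now also compares batch sizes, and the repr format switched from angle brackets to a call-like form. A quick illustration (output assumes this patch is applied):

from pyspark.serializers import AutoBatchedSerializer, BatchedSerializer, PickleSerializer

a = BatchedSerializer(PickleSerializer(), 10)
b = BatchedSerializer(PickleSerializer(), 20)
c = AutoBatchedSerializer(PickleSerializer())

print(a == b)       # False: same base serializer, but batch sizes now differ
print(repr(a))      # BatchedSerializer(PickleSerializer(), 10)
print(str(c))       # AutoBatchedSerializer(PickleSerializer())
print(c.batchSize)  # 0, i.e. UNKNOWN_BATCH_SIZE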
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index d57a802e47..5931e923c2 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -25,7 +25,7 @@ import itertools
import random
import pyspark.heapq3 as heapq
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
try:
import psutil
@@ -213,8 +213,7 @@ class ExternalMerger(Merger):
Merger.__init__(self, aggregator)
self.memory_limit = memory_limit
# default serializer is only used for tests
- self.serializer = serializer or \
- BatchedSerializer(PickleSerializer(), 1024)
+ self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
self.localdirs = localdirs or _get_local_dirs(str(id(self)))
# number of partitions when spill data into disks
self.partitions = partitions
@@ -470,7 +469,7 @@ class ExternalSorter(object):
def __init__(self, memory_limit, serializer=None):
self.memory_limit = memory_limit
self.local_dirs = _get_local_dirs("sort")
- self.serializer = serializer or BatchedSerializer(PickleSerializer(), 1024)
+ self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
def _get_path(self, n):
""" Choose one directory for spill by number n """
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index d16c18bc79..e5d62a466c 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -44,7 +44,8 @@ from py4j.protocol import Py4JError
from py4j.java_collections import ListConverter, MapConverter
from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer
+from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
+ CloudPickleSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
@@ -1233,7 +1234,6 @@ class SQLContext(object):
self._sc = sparkContext
self._jsc = self._sc._jsc
self._jvm = self._sc._jvm
- self._pythonToJava = self._jvm.PythonRDD.pythonToJavaArray
self._scala_SQLContext = sqlContext
@property
@@ -1263,8 +1263,8 @@ class SQLContext(object):
"""
func = lambda _, it: imap(lambda x: f(*x), it)
command = (func, None,
- BatchedSerializer(PickleSerializer(), 1024),
- BatchedSerializer(PickleSerializer(), 1024))
+ AutoBatchedSerializer(PickleSerializer()),
+ AutoBatchedSerializer(PickleSerializer()))
ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
if len(pickled_command) > (1 << 20): # 1M
@@ -1443,8 +1443,7 @@ class SQLContext(object):
converter = _python_to_sql_converter(schema)
rdd = rdd.map(converter)
- batched = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
- jrdd = self._pythonToJava(rdd._jrdd, batched)
+ jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
return SchemaRDD(srdd.toJavaSchemaRDD(), self)
@@ -1841,7 +1840,7 @@ class SchemaRDD(RDD):
self.is_checkpointed = False
self.ctx = self.sql_ctx._sc
# the _jrdd is created by javaToPython(), serialized by pickle
- self._jrdd_deserializer = BatchedSerializer(PickleSerializer())
+ self._jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
@property
def _jrdd(self):
@@ -2071,16 +2070,13 @@ class SchemaRDD(RDD):
def _test():
import doctest
- from array import array
from pyspark.context import SparkContext
# let doctest run in pyspark.sql, so DataTypes can be picklable
import pyspark.sql
from pyspark.sql import Row, SQLContext
from pyspark.tests import ExamplePoint, ExamplePointUDT
globs = pyspark.sql.__dict__.copy()
- # The small batch size here ensures that we see multiple batches,
- # even in these small test examples:
- sc = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
globs['sqlCtx'] = SQLContext(sc)
globs['rdd'] = sc.parallelize(
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index e947b09468..7e61b017ef 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -242,7 +242,7 @@ class PySparkTestCase(unittest.TestCase):
def setUp(self):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
- self.sc = SparkContext('local[4]', class_name, batchSize=2)
+ self.sc = SparkContext('local[4]', class_name)
def tearDown(self):
self.sc.stop()
@@ -253,7 +253,7 @@ class ReusedPySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.sc = SparkContext('local[4]', cls.__name__, batchSize=2)
+ cls.sc = SparkContext('local[4]', cls.__name__)
@classmethod
def tearDownClass(cls):
@@ -671,7 +671,7 @@ class ProfilerTests(PySparkTestCase):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
conf = SparkConf().set("spark.python.profile", "true")
- self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf)
+ self.sc = SparkContext('local[4]', class_name, conf=conf)
def test_profiler(self):
@@ -1012,16 +1012,19 @@ class InputFormatTests(ReusedPySparkTestCase):
clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
"org.apache.hadoop.io.Text",
"org.apache.spark.api.python.TestWritable").collect())
- ec = (u'1',
- {u'__class__': u'org.apache.spark.api.python.TestWritable',
- u'double': 54.0, u'int': 123, u'str': u'test1'})
- self.assertEqual(clazz[0], ec)
+ cname = u'org.apache.spark.api.python.TestWritable'
+ ec = [(u'1', {u'__class__': cname, u'double': 1.0, u'int': 1, u'str': u'test1'}),
+ (u'2', {u'__class__': cname, u'double': 2.3, u'int': 2, u'str': u'test2'}),
+ (u'3', {u'__class__': cname, u'double': 3.1, u'int': 3, u'str': u'test3'}),
+ (u'4', {u'__class__': cname, u'double': 4.2, u'int': 4, u'str': u'test4'}),
+ (u'5', {u'__class__': cname, u'double': 5.5, u'int': 5, u'str': u'test56'})]
+ self.assertEqual(clazz, ec)
unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
"org.apache.hadoop.io.Text",
"org.apache.spark.api.python.TestWritable",
- batchSize=1).collect())
- self.assertEqual(unbatched_clazz[0], ec)
+ ).collect())
+ self.assertEqual(unbatched_clazz, ec)
def test_oldhadoop(self):
basepath = self.tempdir.name
@@ -1341,51 +1344,6 @@ class OutputFormatTests(ReusedPySparkTestCase):
result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
self.assertEqual(result5, data)
- def test_unbatched_save_and_read(self):
- basepath = self.tempdir.name
- ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
- self.sc.parallelize(ei, len(ei)).saveAsSequenceFile(
- basepath + "/unbatched/")
-
- unbatched_sequence = sorted(self.sc.sequenceFile(
- basepath + "/unbatched/",
- batchSize=1).collect())
- self.assertEqual(unbatched_sequence, ei)
-
- unbatched_hadoopFile = sorted(self.sc.hadoopFile(
- basepath + "/unbatched/",
- "org.apache.hadoop.mapred.SequenceFileInputFormat",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.Text",
- batchSize=1).collect())
- self.assertEqual(unbatched_hadoopFile, ei)
-
- unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
- basepath + "/unbatched/",
- "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.Text",
- batchSize=1).collect())
- self.assertEqual(unbatched_newAPIHadoopFile, ei)
-
- oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
- unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
- "org.apache.hadoop.mapred.SequenceFileInputFormat",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.Text",
- conf=oldconf,
- batchSize=1).collect())
- self.assertEqual(unbatched_hadoopRDD, ei)
-
- newconf = {"mapred.input.dir": basepath + "/unbatched/"}
- unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
- "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.Text",
- conf=newconf,
- batchSize=1).collect())
- self.assertEqual(unbatched_newAPIHadoopRDD, ei)
-
def test_malformed_RDD(self):
basepath = self.tempdir.name
# non-batch-serialized RDD[[(K, V)]] should be rejected
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 3ee2ea05cf..fbec2f9f4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
import java.util.{List => JList}
+import org.apache.spark.api.python.SerDeUtil
+
import scala.collection.JavaConversions._
import net.razorvine.pickle.Pickler
@@ -385,12 +387,8 @@ class SchemaRDD(
*/
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
val fieldTypes = schema.fields.map(_.dataType)
- this.mapPartitions { iter =>
- val pickle = new Pickler
- iter.map { row =>
- EvaluatePython.rowToArray(row, fieldTypes)
- }.grouped(100).map(batched => pickle.dumps(batched.toArray))
- }
+ val jrdd = this.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
+ SerDeUtil.javaToPython(jrdd)
}
/**
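
End-to-end, the SchemaRDD change above replaces the fixed groups of 100 pickled rows with SerDeUtil.javaToPython's auto-batching, and the matching pyspark change (sql.py above) reads the result with AutoBatchedSerializer(PickleSerializer()). A sketch of how that surfaces on the Python side, assuming a local Spark build of this era with the patch applied:

from pyspark import SparkContext
from pyspark.sql import Row, SQLContext

sc = SparkContext('local[2]', 'auto-batch-demo')
sqlCtx = SQLContext(sc)

srdd = sqlCtx.inferSchema(sc.parallelize([Row(a=i) for i in range(10)]))
print(srdd._jrdd_deserializer)   # AutoBatchedSerializer(PickleSerializer())
print(srdd.collect()[:3])        # [Row(a=0), Row(a=1), Row(a=2)]
sc.stop()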