author     Dmitriy Lyubimov <dlyubimov@apache.org>   2013-07-30 19:00:58 -0700
committer  Dmitriy Lyubimov <dlyubimov@apache.org>   2013-07-30 19:00:58 -0700
commit     f4b4b8836ee48d6ec7bee98f1632228d71360033
tree       4d566e1d4f527e1b76e284f73ed3bc9208a7ce12
parent     abada94ebfb89716ddb1fa1f534c98704983450a
reverting back to one-by-one serialization for parallelize()
-rw-r--r--  core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala  |  47
1 file changed, 30 insertions, 17 deletions
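What "one-by-one serialization" means here: instead of serializing the whole values: Seq[T] as a single blob, writeObject records the element count and then each element as a length-prefixed byte block, and readObject mirrors that while reusing one buffer across elements. Below is a minimal self-contained sketch of that framing; StandInSerializer, writeElements, and readElements are illustrative names (not Spark API), with plain Java serialization standing in for Spark's pluggable serializer.

import java.io._
import java.nio.ByteBuffer

// Hypothetical stand-in for SparkEnv.get.serializer.newInstance(): turns one
// value into a ByteBuffer and back. Backed by plain Java serialization here
// only so the sketch runs on its own.
class StandInSerializer {
  def serialize[T](v: T): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(v)
    oos.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  def deserialize[T](bb: ByteBuffer): T = {
    val bytes = new Array[Byte](bb.remaining())
    bb.get(bytes)
    new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[T]
  }
}

object OneByOneFraming {
  // Write side: element count first, then each element as (length, bytes).
  def writeElements[T](out: DataOutputStream, ser: StandInSerializer, values: Seq[T]): Unit = {
    out.writeInt(values.size)
    values.foreach { v =>
      val bb = ser.serialize(v)
      out.writeInt(bb.remaining())
      out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
    }
  }

  // Read side: mirrors the write side, reusing one buffer while it is large enough.
  def readElements[T](in: DataInputStream, ser: StandInSerializer): Seq[T] = {
    val n = in.readInt()
    var bb: ByteBuffer = null
    (0 until n).map { _ =>
      val len = in.readInt()
      if (bb == null || bb.capacity < len) bb = ByteBuffer.allocate(len) else bb.clear()
      in.readFully(bb.array(), 0, len)
      bb.limit(len)
      ser.deserialize[T](bb)
    }
  }

  def main(args: Array[String]): Unit = {
    val ser = new StandInSerializer
    val bos = new ByteArrayOutputStream()
    writeElements(new DataOutputStream(bos), ser, Seq("a", "bb", "ccc"))
    val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    println(readElements[String](in, ser)) // Vector(a, bb, ccc)
  }
}

The per-element length prefix is what lets the reader size (or reuse) its buffer without ever materializing the whole partition as one serialized blob.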
diff --git a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
index 58074278a7..d58be93ff8 100644
--- a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
@@ -22,15 +22,15 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
import spark._
import java.io._
-import java.nio.ByteBuffer
import scala.Serializable
import akka.serialization.JavaSerializer
+import java.nio.ByteBuffer
private[spark] class ParallelCollectionPartition[T: ClassManifest](
var rddId: Long,
var slice: Int,
var values: Seq[T])
- extends Partition with Serializable {
+ extends Partition with Serializable {
def iterator: Iterator[T] = values.iterator
@@ -47,21 +47,25 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
private def writeObject(out: ObjectOutputStream): Unit = {
val sfactory = SparkEnv.get.serializer
- // treat java serializer with default action rather
- // than going thru serialization,
- // to avoid a separate serialization header.
+
+ // Treat java serializer with default action rather than going thru serialization, to avoid a
+ // separate serialization header.
+
sfactory match {
case js: JavaSerializer => out.defaultWriteObject()
case _ => {
- // for every other serializer, we
- // assume that it would support Seq[T] and
- // do so efficiently.
- val ser = sfactory.newInstance()
+
out.writeLong(rddId)
out.writeInt(slice)
- val bb = ser.serialize(values)
- out.writeInt(bb.remaining())
- Utils.writeByteBuffer(bb, out)
+
+ val ser = sfactory.newInstance()
+
+ out.writeInt(values.size)
+ values.foreach(v => {
+ val bb = ser.serialize(v)
+ out.writeInt(bb.remaining())
+ Utils.writeByteBuffer(bb, out)
+ })
}
}
}
@@ -73,15 +77,24 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
case _ =>
- val ser = sfactory.newInstance()
rddId = in.readLong()
slice = in.readInt()
+ val ser = sfactory.newInstance()
val s = in.readInt()
- val bb = ByteBuffer.allocate(s)
- in.readFully(bb.array(), 0, s)
- bb.limit(s)
- values = ser.deserialize(bb)
+ var bb: ByteBuffer = null
+ values = (0 until s).map(i => {
+ val len = in.readInt()
+ if (bb == null || bb.capacity < len) {
+ bb = ByteBuffer.allocate(len)
+ } else {
+ bb.clear
+ }
+
+ in.readFully(bb.array(), 0, len);
+ bb.limit(len)
+ ser.deserialize(bb): T
+ }).toSeq
}
}
}
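For context on why this path matters: ParallelCollectionPartition instances are created by SparkContext.parallelize, and a partition's values travel through exactly this writeObject/readObject pair whenever the partition is Java-serialized (for example as part of a task sent to a remote executor; local mode may skip the network hop), so the per-element framing governs how the data is encoded when a non-default serializer such as Kryo is configured. A small usage sketch against the contemporary spark package API; the "local" master and the app name are placeholders:

import spark.SparkContext

object ParallelizeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "parallelize-serialization-example")

    // Each of the 4 ParallelCollectionPartitions carries one slice of the
    // range; when such a partition is serialized, its elements go through
    // the length-prefixed, one-by-one path shown in the diff above.
    val rdd = sc.parallelize(1 to 100, 4)
    println(rdd.map(_ * 2).reduce(_ + _)) // 10100

    sc.stop()
  }
}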