author    Dmitriy Lyubimov <dlyubimov@apache.org>  2013-07-29 18:53:37 -0700
committer Dmitriy Lyubimov <dlyubimov@apache.org>  2013-07-30 11:04:11 -0700
commit    1bca91633eaff0c44d60682622f57eebca09c4ff (patch)
tree      34f557a3da6cdf37dddcc5d2fb63229af1966925
parent    8e5cd041bbf7f802794b8f6e960f702fb59e5863 (diff)
+ bug fixes;
test added

Conflicts:
	core/src/test/scala/spark/KryoSerializerSuite.scala
-rw-r--r--  core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala  | 20
-rw-r--r--  core/src/test/scala/spark/KryoSerializerSuite.scala         |  8
2 files changed, 20 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
index 104257ac07..dfeb7be077 100644
--- a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
@@ -21,14 +21,18 @@ import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
import spark._
-import java.io.{ObjectInput, ObjectOutput, Externalizable}
+import java.io._
import java.nio.ByteBuffer
+import scala.Serializable
private[spark] class ParallelCollectionPartition[T: ClassManifest](
var rddId: Long,
var slice: Int,
var values: Seq[T])
- extends Partition with Externalizable {
+ extends Partition with Serializable {
+
+ // for externalization
+ def this() = this(0, 0, null)
def iterator: Iterator[T] = values.iterator
@@ -41,7 +45,8 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
override def index: Int = slice
- override def writeExternal(out: ObjectOutput) {
+ @throws(classOf[IOException])
+ private def writeObject(out: ObjectOutputStream): Unit = {
out.writeLong(rddId)
out.writeInt(slice)
out.writeInt(values.size)
@@ -59,22 +64,23 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
})
}
- override def readExternal(in: ObjectInput) {
+ @throws(classOf[IOException])
+ private def readObject(in: ObjectInputStream): Unit = {
rddId = in.readLong()
slice = in.readInt()
val s = in.readInt()
val ser = SparkEnv.get.serializer.newInstance()
var bb = ByteBuffer.allocate(1024)
- values = (0 until s).map({
+ values = (0 until s).map(i => {
val s = in.readInt()
if (bb.capacity() < s) {
bb = ByteBuffer.allocate(s)
} else {
bb.clear()
}
- in.readFully(bb.array())
+ in.readFully(bb.array(), 0, s)
bb.limit(s)
- ser.deserialize(bb)
+ ser.deserialize(bb): T
}).toSeq
}
}
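The hunks above swap Externalizable for plain java.io.Serializable with private writeObject/readObject hooks, and fix a buffer bug: on a recycled ByteBuffer whose capacity exceeds the current element, readFully(bb.array()) would block trying to fill the whole backing array and consume bytes belonging to the next element, whereas readFully(bb.array(), 0, s) reads exactly s bytes. The ": T" ascription pins the element type returned by the generic deserialize call so the mapped collection infers Seq[T]. Below is a minimal, self-contained sketch of the same pattern; the class and payload type are invented for illustration, this is not the Spark class itself.

import java.io._
import java.nio.ByteBuffer

// Sketch only (not Spark code): a Serializable holder that hand-rolls its
// wire format through the private writeObject/readObject hooks of Java
// serialization, reusing one buffer across elements as the patch does.
class BufferedPartition(var values: Seq[Array[Byte]]) extends Serializable {

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeInt(values.size)
    values.foreach { bytes =>
      out.writeInt(bytes.length) // length prefix, read back below
      out.write(bytes)
    }
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    val n = in.readInt()
    var bb = ByteBuffer.allocate(1024) // recycled across elements
    values = (0 until n).map { _ =>
      val len = in.readInt()
      if (bb.capacity() < len) bb = ByteBuffer.allocate(len) else bb.clear()
      // Read exactly `len` bytes. The recycled array is often larger than
      // the payload, so an unbounded readFully(bb.array()) would wait for
      // capacity bytes and swallow the next element's data: the bug fixed
      // by readFully(bb.array(), 0, s) in the patch.
      in.readFully(bb.array(), 0, len)
      bb.array().slice(0, len)
    }.toSeq
  }
}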
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 793b0b66c4..5ae7a427cc 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.FunSuite
import com.esotericsoftware.kryo._
import SparkContext._
-import spark.test.{ClassWithoutNoArgConstructor, MyRegistrator}
+import spark.test._
class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("basic types") {
@@ -134,6 +134,12 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
assert(control == result.toSeq)
}
+ test("kryo with parallelize") {
+ val control = 1 :: 2 :: Nil
+ val result = sc.parallelize(control.map(new ClassWithoutNoArgConstructor(_))).map(_.x).collect()
+ assert (control == result.toSeq)
+ }
+
override def beforeAll() {
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)