author    Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2013-05-29 00:52:55 -0700
committer Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2013-05-29 00:52:55 -0700
commit b79b10a6d60a7f1f199e6bddd1243a05c57526ad (patch)
tree   47c4924aaf4c375513f134ec209409f34362abdb /core
parent fbc1ab346867d5c81dc59e4c8d85aeda2f516ce2 (diff)
Flush serializer to fix zero-size kryo blocks bug.
Also convert the local-cluster test case to check for non-zero block sizes
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala |  2
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala      | 22
2 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 57d4dafefc..1829c2f92e 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -59,6 +59,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
// Flush the partial writes, and set valid length to be the length of the entire file.
// Return the number of bytes written for this commit.
override def commit(): Long = {
+ // NOTE: Flush the serializer first and then the compressed/buffered output stream
+ objOut.flush()
bs.flush()
val prevPos = lastValidPosition
lastValidPosition = channel.position()
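
The added flush ordering matters because of how the streams are layered: objOut (the serializer's output stream) buffers serialized records internally, and bs (the compressed/buffered stream) buffers again before bytes reach the file channel. If only bs is flushed, the serializer's buffer may still hold the data, channel.position() does not advance, and commit() records a zero-size block. Below is a minimal sketch of the same layering using plain JDK streams; the names and setup are illustrative, not Spark's actual DiskStore code.

import java.io.{BufferedOutputStream, File, FileOutputStream, ObjectOutputStream}

object FlushOrderSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("flush-order", ".bin")
    val fileOut = new FileOutputStream(file)
    val channel = fileOut.getChannel           // stands in for DiskStore's `channel`
    val bs = new BufferedOutputStream(fileOut) // stands in for the compressed/buffered stream `bs`
    val objOut = new ObjectOutputStream(bs)    // stands in for the serializer stream `objOut`

    objOut.writeObject((1, "payload"))
    // Before any flush, both buffers can still hold the bytes and
    // channel.position() may read 0, i.e. a "zero-size block".
    objOut.flush()  // push the serializer's buffer into bs
    bs.flush()      // push bs into the file, advancing the channel position
    println(s"bytes on disk after both flushes: ${channel.position()}")

    objOut.close()
    file.delete()
  }
}
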
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index a4fe14b9ae..271f4a4e44 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -305,15 +305,27 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
assert(c.partitioner.get === p)
}
- test("shuffle local cluster") {
- // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ test("shuffle non-zero block size") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
+ val NUM_BLOCKS = 3
+
val a = sc.parallelize(1 to 10, 2)
- val b = a.map {
- x => (x, x * 2)
+ val b = a.map { x =>
+ (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
}
- val c = new ShuffledRDD(b, new HashPartitioner(3))
+ // If the Kryo serializer is not used correctly, the shuffle would fail because the
+ // default Java serializer cannot handle the non serializable class.
+ val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS),
+ classOf[spark.KryoSerializer].getName)
+ val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
+
assert(c.count === 10)
+
+ // All blocks must have non-zero size
+ (0 until NUM_BLOCKS).foreach { id =>
+ val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
+ assert(statuses.forall(s => s._2 > 0))
+ }
}
test("shuffle serializer") {