aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-06-12 16:38:37 -0700
committerShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-06-12 16:38:37 -0700
commit5da4287b1dbfb8cfcec9c915926d8a8755bd52b2 (patch)
tree5b0d5e79fed362bfc67fcc56345ee17102b8423f /core/src/test/scala
parent5e9a9317c5ef4c549994989bae7826dcb37d1a3b (diff)
parentac480fd977e0de97bcfe646e39feadbd239c1c29 (diff)
downloadspark-5da4287b1dbfb8cfcec9c915926d8a8755bd52b2.tar.gz
spark-5da4287b1dbfb8cfcec9c915926d8a8755bd52b2.tar.bz2
spark-5da4287b1dbfb8cfcec9c915926d8a8755bd52b2.zip
Merge branch 'netty-dbg' of github.com:shivaram/spark into netty-dbg
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala26
1 files changed, 26 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index b967016cf7..33b02fff80 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -367,6 +367,32 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
assert(nonEmptyBlocks.size <= 4)
}
+ test("zero sized blocks without kryo") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+ // 10 partitions from 4 keys
+ val NUM_BLOCKS = 10
+ val a = sc.parallelize(1 to 4, NUM_BLOCKS)
+ val b = a.map(x => (x, x*2))
+
+ // NOTE: The default Java serializer doesn't create zero-sized blocks.
+ // So, use Kryo
+ val c = new ShuffledRDD(b, new HashPartitioner(10))
+
+ val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
+ assert(c.count === 4)
+
+ val blockSizes = (0 until NUM_BLOCKS).flatMap { id =>
+ val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
+ statuses.map(x => x._2)
+ }
+ val nonEmptyBlocks = blockSizes.filter(x => x > 0)
+
+ // We should have at most 4 non-zero sized partitions
+ assert(nonEmptyBlocks.size <= 4)
+ }
+
}
object ShuffleSuite {