aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2013-05-24 14:08:37 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2013-05-24 14:08:37 -0700
commit6ea085169d8ba2d09ca9236273d65238b8411f04 (patch)
tree8dc9787db615364ea88ec4133b65b04902395f4c /core
parentdbbedfc535dd92be2d12e4fc03165fffa9db1ead (diff)
downloadspark-6ea085169d8ba2d09ca9236273d65238b8411f04.tar.gz
spark-6ea085169d8ba2d09ca9236273d65238b8411f04.tar.bz2
spark-6ea085169d8ba2d09ca9236273d65238b8411f04.zip
Fixed the bug that shuffle serializer is ignored by the new shuffle
block iterators for local blocks. Also added a unit test for that.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/storage/BlockFetcherIterator.scala2
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala21
2 files changed, 19 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
index 43f835237c..88eed0d8c8 100644
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
@@ -163,7 +163,7 @@ object BlockFetcherIterator {
// these all at once because they will just memory-map some files, so they won't consume
// any memory that might exceed our maxBytesInFlight
for (id <- localBlockIds) {
- getLocal(id) match {
+ getLocalFromDisk(id, serializer) match {
case Some(iter) => {
// Pass 0 as size since it's not in flight
results.put(new FetchResult(id, 0, () => iter))
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 2b2a90defa..fdee7ca384 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -99,7 +99,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val sums = pairs.reduceByKey(_+_, 10).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
}
-
+
test("reduceByKey with partitioner") {
sc = new SparkContext("local", "test")
val p = new Partitioner() {
@@ -272,7 +272,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
}
// partitionBy so we have a narrow dependency
val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
- // more partitions/no partitioner so a shuffle dependency
+ // more partitions/no partitioner so a shuffle dependency
val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
val c = a.subtract(b)
assert(c.collect().toSet === Set((1, "a"), (3, "c")))
@@ -298,18 +298,33 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
}
// partitionBy so we have a narrow dependency
val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
- // more partitions/no partitioner so a shuffle dependency
+ // more partitions/no partitioner so a shuffle dependency
val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
val c = a.subtractByKey(b)
assert(c.collect().toSet === Set((1, "a"), (1, "a")))
assert(c.partitioner.get === p)
}
+ test("shuffle serializer") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[1,2,512]", "test")
+ val a = sc.parallelize(1 to 10, 2)
+ val b = a.map { x =>
+ (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+ }
+ // If the Kryo serializer is not used correctly, the shuffle would fail because the
+ // default Java serializer cannot handle the non serializable class.
+ val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName)
+ assert(c.count === 10)
+ }
}
object ShuffleSuite {
+
def mergeCombineException(x: Int, y: Int): Int = {
throw new SparkException("Exception for map-side combine.")
x + y
}
+
+ class NonJavaSerializableClass(val value: Int)
}