aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2014-06-29 23:00:00 -0700
committerReynold Xin <rxin@apache.org>2014-06-29 23:00:00 -0700
commit66135a341d9f8baecc149d13ae5511f14578c395 (patch)
treea7abe8a4aeb0e33b8c78b53fde6c62dbd08bdf5a /core/src/test
parent7b71a0e09622e09285a9884ebb67b5fb1c5caa53 (diff)
downloadspark-66135a341d9f8baecc149d13ae5511f14578c395.tar.gz
spark-66135a341d9f8baecc149d13ae5511f14578c395.tar.bz2
spark-66135a341d9f8baecc149d13ae5511f14578c395.zip
[SPARK-2104] Fix task serializing issues when sort with Java non serializable class
Details can be see in [SPARK-2104](https://issues.apache.org/jira/browse/SPARK-2104). This work is based on Reynold's work, add some unit tests to validate the issue. @rxin , would you please take a look at this PR, thanks a lot. Author: jerryshao <saisai.shao@intel.com> Closes #1245 from jerryshao/SPARK-2104 and squashes the following commits: c8ee362 [jerryshao] Make field partitions transient 2b41917 [jerryshao] Minor changes 47d763c [jerryshao] Fix task serializing issue when sort with Java non serializable class
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleSuite.scala42
1 files changed, 41 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index b40fee7e9a..c4f2f7e34f 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -206,6 +206,42 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
// substracted rdd return results as Tuple2
results(0) should be ((3, 33))
}
+
+ test("sort with Java non serializable class - Kryo") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ val conf = new SparkConf()
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .setAppName("test")
+ .setMaster("local-cluster[2,1,512]")
+ sc = new SparkContext(conf)
+ val a = sc.parallelize(1 to 10, 2)
+ val b = a.map { x =>
+ (new NonJavaSerializableClass(x), x)
+ }
+ // If the Kryo serializer is not used correctly, the shuffle would fail because the
+ // default Java serializer cannot handle the non serializable class.
+ val c = b.sortByKey().map(x => x._2)
+ assert(c.collect() === Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
+ }
+
+ test("sort with Java non serializable class - Java") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ val conf = new SparkConf()
+ .setAppName("test")
+ .setMaster("local-cluster[2,1,512]")
+ sc = new SparkContext(conf)
+ val a = sc.parallelize(1 to 10, 2)
+ val b = a.map { x =>
+ (new NonJavaSerializableClass(x), x)
+ }
+ // default Java serializer cannot handle the non serializable class.
+ val thrown = intercept[SparkException] {
+ b.sortByKey().collect()
+ }
+
+ assert(thrown.getClass === classOf[SparkException])
+ assert(thrown.getMessage.contains("NotSerializableException"))
+ }
}
object ShuffleSuite {
@@ -215,5 +251,9 @@ object ShuffleSuite {
x + y
}
- class NonJavaSerializableClass(val value: Int)
+ class NonJavaSerializableClass(val value: Int) extends Comparable[NonJavaSerializableClass] {
+ override def compareTo(o: NonJavaSerializableClass): Int = {
+ value - o.value
+ }
+ }
}