From 66135a341d9f8baecc149d13ae5511f14578c395 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sun, 29 Jun 2014 23:00:00 -0700 Subject: [SPARK-2104] Fix task serializing issues when sort with Java non serializable class Details can be seen in [SPARK-2104](https://issues.apache.org/jira/browse/SPARK-2104). This work is based on Reynold's work, add some unit tests to validate the issue. @rxin , would you please take a look at this PR, thanks a lot. Author: jerryshao Closes #1245 from jerryshao/SPARK-2104 and squashes the following commits: c8ee362 [jerryshao] Make field partitions transient 2b41917 [jerryshao] Minor changes 47d763c [jerryshao] Fix task serializing issue when sort with Java non serializable class --- .../test/scala/org/apache/spark/ShuffleSuite.scala | 42 +++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) (limited to 'core/src/test') diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index b40fee7e9a..c4f2f7e34f 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -206,6 +206,42 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext { // substracted rdd return results as Tuple2 results(0) should be ((3, 33)) } + + test("sort with Java non serializable class - Kryo") { + // Use a local cluster with 2 processes to make sure there are both local and remote blocks + val conf = new SparkConf() + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .setAppName("test") + .setMaster("local-cluster[2,1,512]") + sc = new SparkContext(conf) + val a = sc.parallelize(1 to 10, 2) + val b = a.map { x => + (new NonJavaSerializableClass(x), x) + } + // If the Kryo serializer is not used correctly, the shuffle would fail because the + // default Java serializer cannot handle the non serializable class. 
+ val c = b.sortByKey().map(x => x._2) + assert(c.collect() === Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + } + + test("sort with Java non serializable class - Java") { + // Use a local cluster with 2 processes to make sure there are both local and remote blocks + val conf = new SparkConf() + .setAppName("test") + .setMaster("local-cluster[2,1,512]") + sc = new SparkContext(conf) + val a = sc.parallelize(1 to 10, 2) + val b = a.map { x => + (new NonJavaSerializableClass(x), x) + } + // default Java serializer cannot handle the non serializable class. + val thrown = intercept[SparkException] { + b.sortByKey().collect() + } + + assert(thrown.getClass === classOf[SparkException]) + assert(thrown.getMessage.contains("NotSerializableException")) + } } object ShuffleSuite { @@ -215,5 +251,9 @@ object ShuffleSuite { x + y } - class NonJavaSerializableClass(val value: Int) + class NonJavaSerializableClass(val value: Int) extends Comparable[NonJavaSerializableClass] { + override def compareTo(o: NonJavaSerializableClass): Int = { + value - o.value + } + } } -- cgit v1.2.3