aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorWeichenXu <WeichenXu123@outlook.com>2016-10-22 11:59:28 -0700
committerReynold Xin <rxin@databricks.com>2016-10-22 11:59:28 -0700
commit4f1dcd3dce270268b42fbe59409790364fa5c5df (patch)
tree7ba81fa5e5a4f7ca5cacd6c1bab81a2c86975e93 /core/src
parent5fa9f8795a71e08bcbef5975ba8c072db5be8866 (diff)
downloadspark-4f1dcd3dce270268b42fbe59409790364fa5c5df.tar.gz
spark-4f1dcd3dce270268b42fbe59409790364fa5c5df.tar.bz2
spark-4f1dcd3dce270268b42fbe59409790364fa5c5df.zip
[SPARK-18051][SPARK CORE] fix bug of custom PartitionCoalescer causing serialization exception
## What changes were proposed in this pull request? Add a require check in `CoalescedRDD` to make sure the passed-in `partitionCoalescer` is `Serializable`, and update the documentation for the `RDD.coalesce` API. ## How was this patch tested? Manual. (Test code is in JIRA [SPARK-18051].) Author: WeichenXu <WeichenXu123@outlook.com> Closes #15587 from WeichenXu123/fix_coalescer_bug.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala3
2 files changed, 6 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 9c198a61f3..2cba1febe8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -80,6 +80,10 @@ private[spark] class CoalescedRDD[T: ClassTag](
require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
s"Number of partitions ($maxPartitions) must be positive.")
+ if (partitionCoalescer.isDefined) {
+ require(partitionCoalescer.get.isInstanceOf[Serializable],
+ "The partition coalescer passed in must be serializable.")
+ }
override def getPartitions: Array[Partition] = {
val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index be119578d2..db535de9e9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -432,7 +432,8 @@ abstract class RDD[T: ClassTag](
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
- * data distributed using a hash partitioner.
+ * data distributed using a hash partitioner. The optional partition coalescer
+ * passed in must be serializable.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)