diff options
author | Wesley Tang <tangmingjun@mininglamp.com> | 2016-03-16 16:12:17 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-03-16 16:12:17 +0000 |
commit | 5f6bdf97c5d2d78db0ca39703084d8ae377346b8 (patch) | |
tree | 7c1fa312e8d99bec4f3aae393d6ecbbf145bf7ce | |
parent | 9412547e7a58c40e7be5acacaf15cc48057cb18a (diff) | |
download | spark-5f6bdf97c5d2d78db0ca39703084d8ae377346b8.tar.gz spark-5f6bdf97c5d2d78db0ca39703084d8ae377346b8.tar.bz2 spark-5f6bdf97c5d2d78db0ca39703084d8ae377346b8.zip |
[SPARK-13281][CORE] Switch broadcast of RDD to exception from warning
## What changes were proposed in this pull request?
In SparkContext, throw Illegalargumentexception when trying to broadcast rdd directly, instead of logging the warning.
## How was this patch tested?
mvn clean install
Add UT in BroadcastSuite
Author: Wesley Tang <tangmingjun@mininglamp.com>
Closes #11735 from breakdawn/master.
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala | 7 |
2 files changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9f5a72bae0..5c7ae57b78 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1278,12 +1278,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() - if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) { - // This is a warning instead of an exception in order to avoid breaking user programs that - // might have created RDD broadcast variables but not used them: - logWarning("Can not directly broadcast RDDs; instead, call collect() and " - + "broadcast the result (see SPARK-5063)") - } + require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass), + "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") val bc = env.broadcastManager.newBroadcast[T](value, isLocal) val callSite = getCallSite logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index f97cfbba32..6657104823 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -130,6 +130,13 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext { assert(thrown.getMessage.toLowerCase.contains("stopped")) } + test("Forbid broadcasting RDD directly") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + val rdd = sc.parallelize(1 to 4) + intercept[IllegalArgumentException] { sc.broadcast(rdd) } + sc.stop() + } + /** * Verify the persistence of state associated with an TorrentBroadcast in a local-cluster. * |