aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Wesley Tang <tangmingjun@mininglamp.com> 2016-03-16 16:12:17 +0000
committer Sean Owen <sowen@cloudera.com> 2016-03-16 16:12:17 +0000
commit 5f6bdf97c5d2d78db0ca39703084d8ae377346b8 (patch)
tree 7c1fa312e8d99bec4f3aae393d6ecbbf145bf7ce /core
parent 9412547e7a58c40e7be5acacaf15cc48057cb18a (diff)
download: spark-5f6bdf97c5d2d78db0ca39703084d8ae377346b8.tar.gz
spark-5f6bdf97c5d2d78db0ca39703084d8ae377346b8.tar.bz2
spark-5f6bdf97c5d2d78db0ca39703084d8ae377346b8.zip
[SPARK-13281][CORE] Switch broadcast of RDD to exception from warning
## What changes were proposed in this pull request?

In SparkContext, throw an IllegalArgumentException when trying to broadcast an RDD directly, instead of logging a warning.

## How was this patch tested?

- mvn clean install
- Added a unit test in BroadcastSuite

Author: Wesley Tang <tangmingjun@mininglamp.com>

Closes #11735 from breakdawn/master.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala | 8
-rw-r--r-- core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala | 7
2 files changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9f5a72bae0..5c7ae57b78 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1278,12 +1278,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
assertNotStopped()
- if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
- // This is a warning instead of an exception in order to avoid breaking user programs that
- // might have created RDD broadcast variables but not used them:
- logWarning("Can not directly broadcast RDDs; instead, call collect() and "
- + "broadcast the result (see SPARK-5063)")
- }
+ require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
+ "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
val callSite = getCallSite
logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index f97cfbba32..6657104823 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -130,6 +130,13 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
assert(thrown.getMessage.toLowerCase.contains("stopped"))
}
+ test("Forbid broadcasting RDD directly") {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ val rdd = sc.parallelize(1 to 4)
+ intercept[IllegalArgumentException] { sc.broadcast(rdd) }
+ sc.stop()
+ }
+
/**
* Verify the persistence of state associated with an TorrentBroadcast in a local-cluster.
*