aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala7
2 files changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9f5a72bae0..5c7ae57b78 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1278,12 +1278,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
assertNotStopped()
- if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
- // This is a warning instead of an exception in order to avoid breaking user programs that
- // might have created RDD broadcast variables but not used them:
- logWarning("Can not directly broadcast RDDs; instead, call collect() and "
- + "broadcast the result (see SPARK-5063)")
- }
+ require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
+ "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
val callSite = getCallSite
logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index f97cfbba32..6657104823 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -130,6 +130,13 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
assert(thrown.getMessage.toLowerCase.contains("stopped"))
}
+ test("Forbid broadcasting RDD directly") {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ val rdd = sc.parallelize(1 to 4)
+ intercept[IllegalArgumentException] { sc.broadcast(rdd) }
+ sc.stop()
+ }
+
/**
* Verify the persistence of state associated with an TorrentBroadcast in a local-cluster.
*