diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala | 7 |
2 files changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9f5a72bae0..5c7ae57b78 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1278,12 +1278,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() - if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) { - // This is a warning instead of an exception in order to avoid breaking user programs that - // might have created RDD broadcast variables but not used them: - logWarning("Can not directly broadcast RDDs; instead, call collect() and " - + "broadcast the result (see SPARK-5063)") - } + require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass), + "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") val bc = env.broadcastManager.newBroadcast[T](value, isLocal) val callSite = getCallSite logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index f97cfbba32..6657104823 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -130,6 +130,13 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext { assert(thrown.getMessage.toLowerCase.contains("stopped")) } + test("Forbid broadcasting RDD directly") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + val rdd = sc.parallelize(1 to 4) + intercept[IllegalArgumentException] { sc.broadcast(rdd) } + sc.stop() + } + /** * Verify the persistence of state associated with an TorrentBroadcast in a local-cluster. * |