aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShivaram Venkataraman <shivaram@cs.berkeley.edu>2014-10-27 08:45:36 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2014-10-27 08:45:36 -0700
commit9aa340a23fd7532f5e72c3352df92ce3e857fc80 (patch)
tree7948a24fbea1636e34cc8a7012295d0ed3e00e5a
parent6377adaf3212b4facb4af644b70b7e99455cef48 (diff)
downloadspark-9aa340a23fd7532f5e72c3352df92ce3e857fc80.tar.gz
spark-9aa340a23fd7532f5e72c3352df92ce3e857fc80.tar.bz2
spark-9aa340a23fd7532f5e72c3352df92ce3e857fc80.zip
[SPARK-4030] Make destroy public for broadcast variables
This change makes the destroy function public for broadcast variables. Motivation for the change is described in https://issues.apache.org/jira/browse/SPARK-4030. This patch also logs where destroy was called from if a broadcast variable is used after destruction. Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu> Closes #2922 from shivaram/broadcast-destroy and squashes the following commits: a11abab [Shivaram Venkataraman] Fix scala style in Utils.scala bed9c9d [Shivaram Venkataraman] Make destroy blocking by default e80c1ab [Shivaram Venkataraman] Make destroy public for broadcast variables Also log where destroy was called from if a broadcast variable is used after destruction.
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala20
3 files changed, 41 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 15fd30e657..87f5cf944e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -20,6 +20,8 @@ package org.apache.spark.broadcast
import java.io.Serializable
import org.apache.spark.SparkException
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
import scala.reflect.ClassTag
@@ -52,7 +54,7 @@ import scala.reflect.ClassTag
* @param id A unique identifier for the broadcast variable.
* @tparam T Type of the data contained in the broadcast variable.
*/
-abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
+abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {
/**
* Flag signifying whether the broadcast variable is valid
@@ -60,6 +62,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
*/
@volatile private var _isValid = true
+ private var _destroySite = ""
+
/** Get the broadcasted value. */
def value: T = {
assertValid()
@@ -84,13 +88,26 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
doUnpersist(blocking)
}
+
+ /**
+ * Destroy all data and metadata related to this broadcast variable. Use this with caution;
+ * once a broadcast variable has been destroyed, it cannot be used again.
+ * This method blocks until destroy has completed
+ */
+ def destroy() {
+ destroy(blocking = true)
+ }
+
/**
* Destroy all data and metadata related to this broadcast variable. Use this with caution;
* once a broadcast variable has been destroyed, it cannot be used again.
+ * @param blocking Whether to block until destroy has completed
*/
private[spark] def destroy(blocking: Boolean) {
assertValid()
_isValid = false
+ _destroySite = Utils.getCallSite().shortForm
+ logInfo("Destroying %s (from %s)".format(toString, _destroySite))
doDestroy(blocking)
}
@@ -124,7 +141,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
/** Check if this broadcast is valid. If not valid, exception is thrown. */
protected def assertValid() {
if (!_isValid) {
- throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
+ throw new SparkException(
+ "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite))
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d722ee5a97..84ed5db8f0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -992,7 +992,8 @@ private[spark] object Utils extends Logging {
private def coreExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the "core" Spark API that we want to skip when
// finding the call site of a method.
- val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+ val SPARK_CORE_CLASS_REGEX =
+ """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r
val SCALA_CLASS_REGEX = """^scala""".r
val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index e096c8c3e9..1014fd62d9 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.broadcast
import scala.util.Random
-import org.scalatest.FunSuite
+import org.scalatest.{Assertions, FunSuite}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
import org.apache.spark.io.SnappyCompressionCodec
@@ -136,6 +136,12 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
test("Unpersisting TorrentBroadcast on executors and driver in distributed mode") {
testUnpersistTorrentBroadcast(distributed = true, removeFromDriver = true)
}
+
+ test("Using broadcast after destroy prints callsite") {
+ sc = new SparkContext("local", "test")
+ testPackage.runCallSiteTest(sc)
+ }
+
/**
* Verify the persistence of state associated with an HttpBroadcast in either local mode or
* local-cluster mode (when distributed = true).
@@ -311,3 +317,15 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
conf
}
}
+
+package object testPackage extends Assertions {
+
+ def runCallSiteTest(sc: SparkContext) {
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val broadcast = sc.broadcast(rdd)
+ broadcast.destroy()
+ val thrown = intercept[SparkException] { broadcast.value }
+ assert(thrown.getMessage.contains("BroadcastSuite.scala"))
+ }
+
+}