aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala12
3 files changed, 22 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index f350784378..22d01c47e6 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -207,11 +207,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
- blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
- case Some(x) =>
- releaseLock(broadcastId)
- x.asInstanceOf[T]
-
+ blockManager.getLocalValues(broadcastId) match {
+ case Some(blockResult) =>
+ if (blockResult.data.hasNext) {
+ val x = blockResult.data.next().asInstanceOf[T]
+ releaseLock(broadcastId)
+ x
+ } else {
+ throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
+ }
case None =>
logInfo("Started reading broadcast variable " + id)
val startTimeMs = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index c08275c7e0..fb54dd66a3 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -702,7 +702,7 @@ private[storage] class PartiallyUnrolledIterator[T](
}
override def next(): T = {
- if (unrolled == null) {
+ if (unrolled == null || !unrolled.hasNext) {
rest.next()
} else {
unrolled.next()
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 973676398a..6646068d50 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -137,6 +137,18 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
+ test("Cache broadcast to disk") {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test")
+ .set("spark.memory.useLegacyMode", "true")
+ .set("spark.storage.memoryFraction", "0.0")
+ sc = new SparkContext(conf)
+ val list = List[Int](1, 2, 3, 4)
+ val broadcast = sc.broadcast(list)
+ assert(broadcast.value.sum === 10)
+ }
+
/**
* Verify the persistence of state associated with a TorrentBroadcast in a local-cluster.
*