aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala10
1 files changed, 6 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index ad1d29a79a..29e0dd26d4 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -89,7 +89,12 @@ extends Broadcast[T](id) with Logging with Serializable {
if (receiveBroadcast(id)) {
value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
- SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false)
+
+ // Store the merged copy in cache so that the next worker doesn't need to rebuild it.
+ // This creates a tradeoff between memory usage and latency.
+ // Storing copy doubles the memory footprint; not storing doubles deserialization cost.
+ SparkEnv.get.blockManager.putSingle(
+ broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false)
// Remove arrayOfBlocks from memory once value_ is on local cache
resetWorkerVariables()
@@ -111,9 +116,6 @@ extends Broadcast[T](id) with Logging with Serializable {
}
def receiveBroadcast(variableID: Long): Boolean = {
- if (totalBlocks > 0 && totalBlocks == hasBlocks)
- return true
-
// Receive meta-info
val metaId = broadcastId + "_meta"
var attemptId = 10