aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala162
1 files changed, 100 insertions, 62 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 2595c15104..2b32546c68 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -17,24 +17,43 @@
package org.apache.spark.broadcast
-import java.io._
+import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
import scala.math
import scala.util.Random
-import org.apache.spark._
-import org.apache.spark.storage.{BroadcastBlockId, BroadcastHelperBlockId, StorageLevel}
+import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
+import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.Utils
+/**
+ * A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
+ * protocol to do a distributed transfer of the broadcasted data to the executors.
+ * The mechanism is as follows. The driver serializes the broadcasted data,
+ * divides it into smaller chunks, and stores them in the BlockManager of the driver.
+ * These chunks are reported to the BlockManagerMaster so that all the executors can
+ * learn the location of those chunks. The first time the broadcast variable (sent as
+ * part of a task) is deserialized at an executor, all the chunks are fetched using
+ * the BlockManager. When all the chunks are fetched (initially from the driver's
+ * BlockManager), they are combined and deserialized to recreate the broadcasted data.
+ * However, the chunks are also stored in the BlockManager and reported to the
+ * BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
+ * multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
+ * made to other executors who already have those chunks, resulting in a distributed
+ * fetching. This prevents the driver from being the bottleneck in sending out multiple
+ * copies of the broadcast data (one per executor) as done by the
+ * [[org.apache.spark.broadcast.HttpBroadcast]].
+ */
private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
-extends Broadcast[T](id) with Logging with Serializable {
+ extends Broadcast[T](id) with Logging with Serializable {
- def value = value_
+ def getValue = value_
- def broadcastId = BroadcastBlockId(id)
+ val broadcastId = BroadcastBlockId(id)
TorrentBroadcast.synchronized {
- SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false)
+ SparkEnv.get.blockManager.putSingle(
+ broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
@transient var arrayOfBlocks: Array[TorrentBlock] = null
@@ -46,32 +65,52 @@ extends Broadcast[T](id) with Logging with Serializable {
sendBroadcast()
}
- def sendBroadcast() {
- var tInfo = TorrentBroadcast.blockifyObject(value_)
+ /**
+ * Remove all persisted state associated with this Torrent broadcast on the executors.
+ */
+ def doUnpersist(blocking: Boolean) {
+ TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
+ }
+
+ /**
+ * Remove all persisted state associated with this Torrent broadcast on the executors
+ * and driver.
+ */
+ def doDestroy(blocking: Boolean) {
+ TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
+ }
+ def sendBroadcast() {
+ val tInfo = TorrentBroadcast.blockifyObject(value_)
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
hasBlocks = tInfo.totalBlocks
// Store meta-info
- val metaId = BroadcastHelperBlockId(broadcastId, "meta")
+ val metaId = BroadcastBlockId(id, "meta")
val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
- metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, true)
+ metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
// Store individual pieces
for (i <- 0 until totalBlocks) {
- val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + i)
+ val pieceId = BroadcastBlockId(id, "piece" + i)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
- pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, true)
+ pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
}
}
- // Called by JVM when deserializing an object
+ /** Used by the JVM when serializing this object. */
+ private def writeObject(out: ObjectOutputStream) {
+ assertValid()
+ out.defaultWriteObject()
+ }
+
+ /** Used by the JVM when deserializing this object. */
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
TorrentBroadcast.synchronized {
@@ -86,18 +125,22 @@ extends Broadcast[T](id) with Logging with Serializable {
// Initialize @transient variables that will receive garbage values from the master.
resetWorkerVariables()
- if (receiveBroadcast(id)) {
+ if (receiveBroadcast()) {
value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
- // Store the merged copy in cache so that the next worker doesn't need to rebuild it.
- // This creates a tradeoff between memory usage and latency.
- // Storing copy doubles the memory footprint; not storing doubles deserialization cost.
+ /* Store the merged copy in cache so that the next worker doesn't need to rebuild it.
+ * This creates a trade-off between memory usage and latency. Storing a copy doubles
+ * the memory footprint; not storing it doubles the deserialization cost. Also,
+ * this does not need to be reported to BlockManagerMaster since other executors
+ * do not need to access this block (they only need to fetch the chunks,
+ * which are reported).
+ */
SparkEnv.get.blockManager.putSingle(
- broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false)
+ broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
// Remove arrayOfBlocks from memory once value_ is on local cache
resetWorkerVariables()
- } else {
+ } else {
logError("Reading broadcast variable " + id + " failed")
}
@@ -114,9 +157,10 @@ extends Broadcast[T](id) with Logging with Serializable {
hasBlocks = 0
}
- def receiveBroadcast(variableID: Long): Boolean = {
- // Receive meta-info
- val metaId = BroadcastHelperBlockId(broadcastId, "meta")
+ def receiveBroadcast(): Boolean = {
+ // Receive meta-info about the size of broadcast data,
+ // the number of chunks it is divided into, etc.
+ val metaId = BroadcastBlockId(id, "meta")
var attemptId = 10
while (attemptId > 0 && totalBlocks == -1) {
TorrentBroadcast.synchronized {
@@ -138,17 +182,21 @@ extends Broadcast[T](id) with Logging with Serializable {
return false
}
- // Receive actual blocks
+ /*
+ * Fetch actual chunks of data. Note that all these chunks are stored in
+ * the BlockManager and reported to the master, so that other executors
+ * can find out and pull the chunks from this executor.
+ */
val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
for (pid <- recvOrder) {
- val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + pid)
+ val pieceId = BroadcastBlockId(id, "piece" + pid)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(pieceId) match {
case Some(x) =>
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
hasBlocks += 1
SparkEnv.get.blockManager.putSingle(
- pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, true)
+ pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
case None =>
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
@@ -156,16 +204,16 @@ extends Broadcast[T](id) with Logging with Serializable {
}
}
- (hasBlocks == totalBlocks)
+ hasBlocks == totalBlocks
}
}
-private object TorrentBroadcast
-extends Logging {
-
+private[spark] object TorrentBroadcast extends Logging {
+ private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
private var initialized = false
private var conf: SparkConf = null
+
def initialize(_isDriver: Boolean, conf: SparkConf) {
TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
@@ -179,39 +227,37 @@ extends Logging {
initialized = false
}
- lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
-
def blockifyObject[T](obj: T): TorrentInfo = {
val byteArray = Utils.serialize[T](obj)
val bais = new ByteArrayInputStream(byteArray)
- var blockNum = (byteArray.length / BLOCK_SIZE)
+ var blockNum = byteArray.length / BLOCK_SIZE
if (byteArray.length % BLOCK_SIZE != 0) {
blockNum += 1
}
- var retVal = new Array[TorrentBlock](blockNum)
- var blockID = 0
+ val blocks = new Array[TorrentBlock](blockNum)
+ var blockId = 0
for (i <- 0 until (byteArray.length, BLOCK_SIZE)) {
val thisBlockSize = math.min(BLOCK_SIZE, byteArray.length - i)
- var tempByteArray = new Array[Byte](thisBlockSize)
- val hasRead = bais.read(tempByteArray, 0, thisBlockSize)
+ val tempByteArray = new Array[Byte](thisBlockSize)
+ bais.read(tempByteArray, 0, thisBlockSize)
- retVal(blockID) = new TorrentBlock(blockID, tempByteArray)
- blockID += 1
+ blocks(blockId) = new TorrentBlock(blockId, tempByteArray)
+ blockId += 1
}
bais.close()
- val tInfo = TorrentInfo(retVal, blockNum, byteArray.length)
- tInfo.hasBlocks = blockNum
-
- tInfo
+ val info = TorrentInfo(blocks, blockNum, byteArray.length)
+ info.hasBlocks = blockNum
+ info
}
- def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock],
- totalBytes: Int,
- totalBlocks: Int): T = {
+ def unBlockifyObject[T](
+ arrayOfBlocks: Array[TorrentBlock],
+ totalBytes: Int,
+ totalBlocks: Int): T = {
val retByteArray = new Array[Byte](totalBytes)
for (i <- 0 until totalBlocks) {
System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
@@ -220,6 +266,13 @@ extends Logging {
Utils.deserialize[T](retByteArray, Thread.currentThread.getContextClassLoader)
}
+ /**
+ * Remove all persisted blocks associated with this torrent broadcast on the executors.
+ * If removeFromDriver is true, also remove these persisted blocks on the driver.
+ */
+ def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
+ SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
+ }
}
private[spark] case class TorrentBlock(
@@ -228,25 +281,10 @@ private[spark] case class TorrentBlock(
extends Serializable
private[spark] case class TorrentInfo(
- @transient arrayOfBlocks : Array[TorrentBlock],
+ @transient arrayOfBlocks: Array[TorrentBlock],
totalBlocks: Int,
totalBytes: Int)
extends Serializable {
@transient var hasBlocks = 0
}
-
-/**
- * A [[BroadcastFactory]] that creates a torrent-based implementation of broadcast.
- */
-class TorrentBroadcastFactory extends BroadcastFactory {
-
- def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
- TorrentBroadcast.initialize(isDriver, conf)
- }
-
- def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
- new TorrentBroadcast[T](value_, isLocal, id)
-
- def stop() { TorrentBroadcast.stop() }
-}