author    Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2013-10-16 13:37:58 -0700
committer Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2013-10-16 21:33:33 -0700
commit    a8d0981832ba71415a35c16cdc2bedb98bbfcdb9 (patch)
tree      54a584caf8a0e47e5defe1da306f17c59a05a93a
parent    feb45d391f8d09c120d7d43e72e96e9bf9784fa0 (diff)
Fixes for the new BlockId naming convention.
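Under the new convention, block IDs are typed case classes rather than hand-concatenated strings: keys that used to be built as broadcastId + "_meta" or broadcastId + "_piece_" + i become BroadcastHelperBlockId values that wrap their parent BroadcastBlockId. The following is a minimal standalone sketch of the naming side; the classes are re-declared here purely for illustration (in Spark they live in core/src/main/scala/org/apache/spark/storage/BlockId.scala), and the demo object name is made up.

// Sketch only: simplified re-declarations of the case classes this patch touches.
sealed abstract class BlockId {
  def name: String
  override def toString = name
}

case class BroadcastBlockId(broadcastId: Long) extends BlockId {
  def name = "broadcast_" + broadcastId
}

case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId {
  def name = broadcastId.name + "_" + hType
}

object BlockIdNamingDemo extends App {
  val bid  = BroadcastBlockId(42)                     // name: "broadcast_42"
  val meta = BroadcastHelperBlockId(bid, "meta")      // name: "broadcast_42_meta"
  val p0   = BroadcastHelperBlockId(bid, "piece" + 0) // name: "broadcast_42_piece0"
  Seq(bid, meta, p0).foreach(id => println(id.name))
}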
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala  12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockId.scala               9
2 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index c174804e9a..3341401c8a 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -23,7 +23,7 @@ import scala.math
import scala.util.Random
import org.apache.spark._
-import org.apache.spark.storage.{BlockManager, StorageLevel}
+import org.apache.spark.storage.{BroadcastBlockId, BroadcastHelperBlockId, StorageLevel}
import org.apache.spark.util.Utils
@@ -32,7 +32,7 @@ extends Broadcast[T](id) with Logging with Serializable {
def value = value_
- def broadcastId = BlockManager.toBroadcastId(id)
+ def broadcastId = BroadcastBlockId(id)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false)
@@ -55,7 +55,7 @@ extends Broadcast[T](id) with Logging with Serializable {
hasBlocks = tInfo.totalBlocks
// Store meta-info
- val metaId = broadcastId + "_meta"
+ val metaId = BroadcastHelperBlockId(broadcastId, "meta")
val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
@@ -64,7 +64,7 @@ extends Broadcast[T](id) with Logging with Serializable {
// Store individual pieces
for (i <- 0 until totalBlocks) {
- val pieceId = broadcastId + "_piece_" + i
+ val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + i)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, true)
@@ -117,7 +117,7 @@ extends Broadcast[T](id) with Logging with Serializable {
def receiveBroadcast(variableID: Long): Boolean = {
// Receive meta-info
- val metaId = broadcastId + "_meta"
+ val metaId = BroadcastHelperBlockId(broadcastId, "meta")
var attemptId = 10
while (attemptId > 0 && totalBlocks == -1) {
TorrentBroadcast.synchronized {
@@ -141,7 +141,7 @@ extends Broadcast[T](id) with Logging with Serializable {
// Receive actual blocks
val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
for (pid <- recvOrder) {
- val pieceId = broadcastId + "_piece_" + pid
+ val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + pid)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(pieceId) match {
case Some(x) =>
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index c7efc67a4a..7156d855d8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -32,7 +32,7 @@ private[spark] sealed abstract class BlockId {
def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
def isRDD = isInstanceOf[RDDBlockId]
def isShuffle = isInstanceOf[ShuffleBlockId]
- def isBroadcast = isInstanceOf[BroadcastBlockId]
+ def isBroadcast = isInstanceOf[BroadcastBlockId] || isInstanceOf[BroadcastHelperBlockId]
override def toString = name
override def hashCode = name.hashCode
@@ -55,6 +55,10 @@ private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId {
def name = "broadcast_" + broadcastId
}
+private[spark] case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId {
+ def name = broadcastId.name + "_" + hType
+}
+
private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
def name = "taskresult_" + taskId
}
@@ -72,6 +76,7 @@ private[spark] object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val BROADCAST = "broadcast_([0-9]+)".r
+ val BROADCAST_HELPER = "broadcast_([0-9]+)_([A-Za-z0-9]+)".r
val TASKRESULT = "taskresult_([0-9]+)".r
val STREAM = "input-([0-9]+)-([0-9]+)".r
val TEST = "test_(.*)".r
@@ -84,6 +89,8 @@ private[spark] object BlockId {
ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
case BROADCAST(broadcastId) =>
BroadcastBlockId(broadcastId.toLong)
+ case BROADCAST_HELPER(broadcastId, hType) =>
+ BroadcastHelperBlockId(BroadcastBlockId(broadcastId.toLong), hType)
case TASKRESULT(taskId) =>
TaskResultBlockId(taskId.toLong)
case STREAM(streamId, uniqueId) =>
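On the decode side, BlockId.apply gains the BROADCAST_HELPER pattern shown above. Below is a small standalone sketch of how the two regexes split the name space, assuming the same patterns; the demo object and the describe helper are illustrative additions, not part of the patch. Note that Scala regex extractors only fire on a full-string match, so "broadcast_42_meta" falls through the plain BROADCAST case and is picked up by BROADCAST_HELPER.

object BlockIdParseDemo extends App {
  // Same patterns as in BlockId.scala above, re-declared so this runs standalone.
  val BROADCAST        = "broadcast_([0-9]+)".r
  val BROADCAST_HELPER = "broadcast_([0-9]+)_([A-Za-z0-9]+)".r

  def describe(name: String): String = name match {
    case BROADCAST(broadcastId)               => "broadcast " + broadcastId
    case BROADCAST_HELPER(broadcastId, hType) => "helper '" + hType + "' of broadcast " + broadcastId
    case _                                    => "unrecognized block id"
  }

  Seq("broadcast_42", "broadcast_42_meta", "broadcast_42_piece7").foreach { name =>
    println(name + " -> " + describe(name))
  }
}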