diff options
author | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2013-10-16 13:37:58 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2013-10-16 21:33:33 -0700 |
commit | a8d0981832ba71415a35c16cdc2bedb98bbfcdb9 (patch) | |
tree | 54a584caf8a0e47e5defe1da306f17c59a05a93a | |
parent | feb45d391f8d09c120d7d43e72e96e9bf9784fa0 (diff) | |
download | spark-a8d0981832ba71415a35c16cdc2bedb98bbfcdb9.tar.gz spark-a8d0981832ba71415a35c16cdc2bedb98bbfcdb9.tar.bz2 spark-a8d0981832ba71415a35c16cdc2bedb98bbfcdb9.zip |
Fixes for the new BlockId naming convention.
-rw-r--r-- | core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 12 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockId.scala | 9 |
2 files changed, 14 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index c174804e9a..3341401c8a 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -23,7 +23,7 @@ import scala.math import scala.util.Random import org.apache.spark._ -import org.apache.spark.storage.{BlockManager, StorageLevel} +import org.apache.spark.storage.{BroadcastBlockId, BroadcastHelperBlockId, StorageLevel} import org.apache.spark.util.Utils @@ -32,7 +32,7 @@ extends Broadcast[T](id) with Logging with Serializable { def value = value_ - def broadcastId = BlockManager.toBroadcastId(id) + def broadcastId = BroadcastBlockId(id) TorrentBroadcast.synchronized { SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false) @@ -55,7 +55,7 @@ extends Broadcast[T](id) with Logging with Serializable { hasBlocks = tInfo.totalBlocks // Store meta-info - val metaId = broadcastId + "_meta" + val metaId = BroadcastHelperBlockId(broadcastId, "meta") val metaInfo = TorrentInfo(null, totalBlocks, totalBytes) TorrentBroadcast.synchronized { SparkEnv.get.blockManager.putSingle( @@ -64,7 +64,7 @@ extends Broadcast[T](id) with Logging with Serializable { // Store individual pieces for (i <- 0 until totalBlocks) { - val pieceId = broadcastId + "_piece_" + i + val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + i) TorrentBroadcast.synchronized { SparkEnv.get.blockManager.putSingle( pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, true) @@ -117,7 +117,7 @@ extends Broadcast[T](id) with Logging with Serializable { def receiveBroadcast(variableID: Long): Boolean = { // Receive meta-info - val metaId = broadcastId + "_meta" + val metaId = BroadcastHelperBlockId(broadcastId, "meta") var attemptId = 10 while (attemptId > 0 && totalBlocks == -1) { TorrentBroadcast.synchronized { @@ -141,7 +141,7 @@ extends Broadcast[T](id) with Logging with Serializable { // Receive actual blocks val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList) for (pid <- recvOrder) { - val pieceId = broadcastId + "_piece_" + pid + val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + pid) TorrentBroadcast.synchronized { SparkEnv.get.blockManager.getSingle(pieceId) match { case Some(x) => diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index c7efc67a4a..7156d855d8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -32,7 +32,7 @@ private[spark] sealed abstract class BlockId { def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None def isRDD = isInstanceOf[RDDBlockId] def isShuffle = isInstanceOf[ShuffleBlockId] - def isBroadcast = isInstanceOf[BroadcastBlockId] + def isBroadcast = isInstanceOf[BroadcastBlockId] || isInstanceOf[BroadcastHelperBlockId] override def toString = name override def hashCode = name.hashCode @@ -55,6 +55,10 @@ private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId { def name = "broadcast_" + broadcastId } +private[spark] case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId { + def name = broadcastId.name + "_" + hType +} + private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId { def name = "taskresult_" + taskId } @@ -72,6 +76,7 @@ private[spark] object BlockId { val RDD = "rdd_([0-9]+)_([0-9]+)".r val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r val BROADCAST = "broadcast_([0-9]+)".r + val BROADCAST_HELPER = "broadcast_([0-9]+)_([A-Za-z0-9]+)".r val TASKRESULT = "taskresult_([0-9]+)".r val STREAM = "input-([0-9]+)-([0-9]+)".r val TEST = "test_(.*)".r @@ -84,6 +89,8 @@ private[spark] object BlockId { ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) case BROADCAST(broadcastId) => BroadcastBlockId(broadcastId.toLong) + case BROADCAST_HELPER(broadcastId, hType) => + BroadcastHelperBlockId(BroadcastBlockId(broadcastId.toLong), hType) case TASKRESULT(taskId) => TaskResultBlockId(taskId.toLong) case STREAM(streamId, uniqueId) => |