diff options
author | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-08 14:29:04 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-08 14:29:04 -0700 |
commit | 425c2472698ddc58c0979d1181b99ba31d553dfb (patch) | |
tree | b4688962c55bb1ad05637b10d8ac947215f0e573 | |
parent | c7c5258e255ec7b3e5514ddf0c6edcdf3af412aa (diff) | |
download | spark-425c2472698ddc58c0979d1181b99ba31d553dfb.tar.gz spark-425c2472698ddc58c0979d1181b99ba31d553dfb.tar.bz2 spark-425c2472698ddc58c0979d1181b99ba31d553dfb.zip |
Removed some unused stuff
4 files changed, 13 insertions, 63 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index 09266b9f60..752945e5e6 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -25,8 +25,6 @@ extends Broadcast[T] with Logging with Serializable { @transient var totalBytes = -1 @transient var totalBlocks = -1 @transient var hasBlocks = new AtomicInteger(0) - // CHANGED: BlockSize in the Broadcast object is expected to change over time - @transient var blockSize = Broadcast.BlockSize // Used ONLY by Master to track how many unique blocks have been sent out @transient var sentBlocks = new AtomicInteger(0) @@ -45,9 +43,6 @@ extends Broadcast[T] with Logging with Serializable { // Used only in Workers @transient var ttGuide: TalkToGuide = null - @transient var rxSpeeds = new SpeedTracker - @transient var txSpeeds = new SpeedTracker - @transient var hostAddress = Utils.localIpAddress @transient var listenPort = -1 @transient var guidePort = -1 @@ -105,7 +100,7 @@ extends Broadcast[T] with Logging with Serializable { // Must always come AFTER listenPort is created val masterSource = - SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes, blockSize) + SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes) hasBlocksBitVector.synchronized { masterSource.hasBlocksBitVector = hasBlocksBitVector } @@ -115,7 +110,7 @@ extends Broadcast[T] with Logging with Serializable { // Register with the Tracker registerBroadcast(uuid, - SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes, blockSize)) + SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes)) } private def readObject(in: ObjectInputStream) { @@ -161,7 +156,6 @@ extends Broadcast[T] with Logging with Serializable { totalBytes = -1 totalBlocks = -1 hasBlocks = new AtomicInteger(0) - blockSize = -1 listenPortLock = new Object totalBlocksLock = new Object @@ -169,9 +163,6 @@ extends Broadcast[T] with Logging with Serializable { serveMR = null ttGuide = null - rxSpeeds = new SpeedTracker - txSpeeds = new SpeedTracker - hostAddress = Utils.localIpAddress listenPort = -1 @@ -248,7 +239,7 @@ extends Broadcast[T] with Logging with Serializable { } var localSourceInfo = SourceInfo( - hostAddress, listenPort, totalBlocks, totalBytes, blockSize) + hostAddress, listenPort, totalBlocks, totalBytes) localSourceInfo.hasBlocks = hasBlocks.get @@ -401,7 +392,6 @@ extends Broadcast[T] with Logging with Serializable { totalBlocksLock.notifyAll() } totalBytes = gInfo.totalBytes - blockSize = gInfo.blockSize // Start ttGuide to periodically talk to the Guide var ttGuide = new TalkToGuide(gInfo) @@ -674,8 +664,6 @@ extends Broadcast[T] with Logging with Serializable { hasBlocks.getAndIncrement } - rxSpeeds.addDataPoint(peerToTalkTo, receptionTime) - // Some block(may NOT be blockToAskFor) has arrived. // In any case, blockToAskFor is not in request any more blocksInRequestBitVector.synchronized { diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala index 4c7ab5f8ec..39798f6740 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -13,7 +13,7 @@ trait Broadcast[T] extends Serializable { def value: T // We cannot have an abstract readObject here due to some weird issues with - // readObject having to be 'private' in sub-classes. Possibly a Scala bug! + // readObject having to be 'private' in sub-classes. override def toString = "spark.Broadcast(" + uuid + ")" } @@ -82,8 +82,6 @@ object Broadcast extends Logging with Serializable { private var MaxKnockInterval_ = System.getProperty( "spark.broadcast.maxKnockInterval", "999").toInt - // Load ChainedBroadcast config params - // Load TreeBroadcast config params private var MaxDegree_ = System.getProperty("spark.broadcast.maxDegree", "2").toInt @@ -114,8 +112,6 @@ object Broadcast extends Logging with Serializable { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - // ChainedBroadcast configs - // TreeBroadcast configs def MaxDegree = MaxDegree_ @@ -187,38 +183,12 @@ object Broadcast extends Logging with Serializable { } } -case class BroadcastBlock (blockID: Int, byteArray: Array[Byte]) extends Serializable +case class BroadcastBlock (blockID: Int, byteArray: Array[Byte]) +extends Serializable case class VariableInfo (@transient arrayOfBlocks : Array[BroadcastBlock], - totalBlocks: Int, - totalBytes: Int) - extends Serializable { - - @transient - var hasBlocks = 0 -} - -class SpeedTracker extends Serializable { - // Mapping 'source' to '(totalTime, numBlocks)' - private var sourceToSpeedMap = Map[SourceInfo, (Long, Int)] () - - def addDataPoint (srcInfo: SourceInfo, timeInMillis: Long) { - sourceToSpeedMap.synchronized { - if (!sourceToSpeedMap.contains(srcInfo)) { - sourceToSpeedMap += (srcInfo -> (timeInMillis, 1)) - } else { - val tTnB = sourceToSpeedMap (srcInfo) - sourceToSpeedMap += (srcInfo -> (tTnB._1 + timeInMillis, tTnB._2 + 1)) - } - } - } - - def getTimePerBlock (srcInfo: SourceInfo): Double = { - sourceToSpeedMap.synchronized { - val tTnB = sourceToSpeedMap (srcInfo) - return tTnB._1 / tTnB._2 - } - } - - override def toString = sourceToSpeedMap.toString + totalBlocks: Int, + totalBytes: Int) +extends Serializable { + @transient var hasBlocks = 0 } diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala index 09907f4ee7..7ed2425680 100644 --- a/core/src/main/scala/spark/broadcast/SourceInfo.scala +++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala @@ -6,15 +6,11 @@ import spark._ /** * Used to keep and pass around information of peers involved in a broadcast - * - * CHANGED: Keep track of the blockSize for THIS broadcast variable. - * Broadcast.BlockSize is expected to be updated across different broadcasts */ case class SourceInfo (hostAddress: String, listenPort: Int, totalBlocks: Int = SourceInfo.UnusedParam, - totalBytes: Int = SourceInfo.UnusedParam, - blockSize: Int = Broadcast.BlockSize) + totalBytes: Int = SourceInfo.UnusedParam) extends Comparable[SourceInfo] with Logging { var currentLeechers = 0 diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index e90360eac4..2851f17627 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -22,8 +22,6 @@ extends Broadcast[T] with Logging with Serializable { @transient var totalBytes = -1 @transient var totalBlocks = -1 @transient var hasBlocks = 0 - // CHANGED: BlockSize in the Broadcast object is expected to change over time - @transient var blockSize = Broadcast.BlockSize @transient var listenPortLock = new Object @transient var guidePortLock = new Object @@ -85,7 +83,7 @@ extends Broadcast[T] with Logging with Serializable { // Must always come AFTER listenPort is created val masterSource = - SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes, blockSize) + SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes) listOfSources += masterSource // Register with the Tracker @@ -130,7 +128,6 @@ extends Broadcast[T] with Logging with Serializable { totalBytes = -1 totalBlocks = -1 hasBlocks = 0 - blockSize = -1 listenPortLock = new Object totalBlocksLock = new Object @@ -244,7 +241,6 @@ extends Broadcast[T] with Logging with Serializable { totalBlocksLock.notifyAll() } totalBytes = sourceInfo.totalBytes - blockSize = sourceInfo.blockSize logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) @@ -466,7 +462,7 @@ extends Broadcast[T] with Logging with Serializable { // Add this new (if it can finish) source to the list of sources thisWorkerInfo = SourceInfo(sourceInfo.hostAddress, - sourceInfo.listenPort, totalBlocks, totalBytes, blockSize) + sourceInfo.listenPort, totalBlocks, totalBytes) logInfo("Adding possible new source to listOfSources: " + thisWorkerInfo) listOfSources += thisWorkerInfo } |