aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-07-08 14:29:04 -0700
committerMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-07-08 14:29:04 -0700
commit425c2472698ddc58c0979d1181b99ba31d553dfb (patch)
treeb4688962c55bb1ad05637b10d8ac947215f0e573 /core/src
parentc7c5258e255ec7b3e5514ddf0c6edcdf3af412aa (diff)
downloadspark-425c2472698ddc58c0979d1181b99ba31d553dfb.tar.gz
spark-425c2472698ddc58c0979d1181b99ba31d553dfb.tar.bz2
spark-425c2472698ddc58c0979d1181b99ba31d553dfb.zip
Removed some unused stuff
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala18
-rw-r--r--core/src/main/scala/spark/broadcast/Broadcast.scala44
-rw-r--r--core/src/main/scala/spark/broadcast/SourceInfo.scala6
-rw-r--r--core/src/main/scala/spark/broadcast/TreeBroadcast.scala8
4 files changed, 13 insertions, 63 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 09266b9f60..752945e5e6 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -25,8 +25,6 @@ extends Broadcast[T] with Logging with Serializable {
@transient var totalBytes = -1
@transient var totalBlocks = -1
@transient var hasBlocks = new AtomicInteger(0)
- // CHANGED: BlockSize in the Broadcast object is expected to change over time
- @transient var blockSize = Broadcast.BlockSize
// Used ONLY by Master to track how many unique blocks have been sent out
@transient var sentBlocks = new AtomicInteger(0)
@@ -45,9 +43,6 @@ extends Broadcast[T] with Logging with Serializable {
// Used only in Workers
@transient var ttGuide: TalkToGuide = null
- @transient var rxSpeeds = new SpeedTracker
- @transient var txSpeeds = new SpeedTracker
-
@transient var hostAddress = Utils.localIpAddress
@transient var listenPort = -1
@transient var guidePort = -1
@@ -105,7 +100,7 @@ extends Broadcast[T] with Logging with Serializable {
// Must always come AFTER listenPort is created
val masterSource =
- SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes, blockSize)
+ SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes)
hasBlocksBitVector.synchronized {
masterSource.hasBlocksBitVector = hasBlocksBitVector
}
@@ -115,7 +110,7 @@ extends Broadcast[T] with Logging with Serializable {
// Register with the Tracker
registerBroadcast(uuid,
- SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes, blockSize))
+ SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes))
}
private def readObject(in: ObjectInputStream) {
@@ -161,7 +156,6 @@ extends Broadcast[T] with Logging with Serializable {
totalBytes = -1
totalBlocks = -1
hasBlocks = new AtomicInteger(0)
- blockSize = -1
listenPortLock = new Object
totalBlocksLock = new Object
@@ -169,9 +163,6 @@ extends Broadcast[T] with Logging with Serializable {
serveMR = null
ttGuide = null
- rxSpeeds = new SpeedTracker
- txSpeeds = new SpeedTracker
-
hostAddress = Utils.localIpAddress
listenPort = -1
@@ -248,7 +239,7 @@ extends Broadcast[T] with Logging with Serializable {
}
var localSourceInfo = SourceInfo(
- hostAddress, listenPort, totalBlocks, totalBytes, blockSize)
+ hostAddress, listenPort, totalBlocks, totalBytes)
localSourceInfo.hasBlocks = hasBlocks.get
@@ -401,7 +392,6 @@ extends Broadcast[T] with Logging with Serializable {
totalBlocksLock.notifyAll()
}
totalBytes = gInfo.totalBytes
- blockSize = gInfo.blockSize
// Start ttGuide to periodically talk to the Guide
var ttGuide = new TalkToGuide(gInfo)
@@ -674,8 +664,6 @@ extends Broadcast[T] with Logging with Serializable {
hasBlocks.getAndIncrement
}
- rxSpeeds.addDataPoint(peerToTalkTo, receptionTime)
-
// Some block(may NOT be blockToAskFor) has arrived.
// In any case, blockToAskFor is not in request any more
blocksInRequestBitVector.synchronized {
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 4c7ab5f8ec..39798f6740 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -13,7 +13,7 @@ trait Broadcast[T] extends Serializable {
def value: T
// We cannot have an abstract readObject here due to some weird issues with
- // readObject having to be 'private' in sub-classes. Possibly a Scala bug!
+ // readObject having to be 'private' in sub-classes.
override def toString = "spark.Broadcast(" + uuid + ")"
}
@@ -82,8 +82,6 @@ object Broadcast extends Logging with Serializable {
private var MaxKnockInterval_ = System.getProperty(
"spark.broadcast.maxKnockInterval", "999").toInt
- // Load ChainedBroadcast config params
-
// Load TreeBroadcast config params
private var MaxDegree_ = System.getProperty("spark.broadcast.maxDegree", "2").toInt
@@ -114,8 +112,6 @@ object Broadcast extends Logging with Serializable {
def MinKnockInterval = MinKnockInterval_
def MaxKnockInterval = MaxKnockInterval_
- // ChainedBroadcast configs
-
// TreeBroadcast configs
def MaxDegree = MaxDegree_
@@ -187,38 +183,12 @@ object Broadcast extends Logging with Serializable {
}
}
-case class BroadcastBlock (blockID: Int, byteArray: Array[Byte]) extends Serializable
+case class BroadcastBlock (blockID: Int, byteArray: Array[Byte])
+extends Serializable
case class VariableInfo (@transient arrayOfBlocks : Array[BroadcastBlock],
- totalBlocks: Int,
- totalBytes: Int)
- extends Serializable {
-
- @transient
- var hasBlocks = 0
-}
-
-class SpeedTracker extends Serializable {
- // Mapping 'source' to '(totalTime, numBlocks)'
- private var sourceToSpeedMap = Map[SourceInfo, (Long, Int)] ()
-
- def addDataPoint (srcInfo: SourceInfo, timeInMillis: Long) {
- sourceToSpeedMap.synchronized {
- if (!sourceToSpeedMap.contains(srcInfo)) {
- sourceToSpeedMap += (srcInfo -> (timeInMillis, 1))
- } else {
- val tTnB = sourceToSpeedMap (srcInfo)
- sourceToSpeedMap += (srcInfo -> (tTnB._1 + timeInMillis, tTnB._2 + 1))
- }
- }
- }
-
- def getTimePerBlock (srcInfo: SourceInfo): Double = {
- sourceToSpeedMap.synchronized {
- val tTnB = sourceToSpeedMap (srcInfo)
- return tTnB._1 / tTnB._2
- }
- }
-
- override def toString = sourceToSpeedMap.toString
+ totalBlocks: Int,
+ totalBytes: Int)
+extends Serializable {
+ @transient var hasBlocks = 0
}
diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala
index 09907f4ee7..7ed2425680 100644
--- a/core/src/main/scala/spark/broadcast/SourceInfo.scala
+++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala
@@ -6,15 +6,11 @@ import spark._
/**
* Used to keep and pass around information of peers involved in a broadcast
- *
- * CHANGED: Keep track of the blockSize for THIS broadcast variable.
- * Broadcast.BlockSize is expected to be updated across different broadcasts
*/
case class SourceInfo (hostAddress: String,
listenPort: Int,
totalBlocks: Int = SourceInfo.UnusedParam,
- totalBytes: Int = SourceInfo.UnusedParam,
- blockSize: Int = Broadcast.BlockSize)
+ totalBytes: Int = SourceInfo.UnusedParam)
extends Comparable[SourceInfo] with Logging {
var currentLeechers = 0
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index e90360eac4..2851f17627 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -22,8 +22,6 @@ extends Broadcast[T] with Logging with Serializable {
@transient var totalBytes = -1
@transient var totalBlocks = -1
@transient var hasBlocks = 0
- // CHANGED: BlockSize in the Broadcast object is expected to change over time
- @transient var blockSize = Broadcast.BlockSize
@transient var listenPortLock = new Object
@transient var guidePortLock = new Object
@@ -85,7 +83,7 @@ extends Broadcast[T] with Logging with Serializable {
// Must always come AFTER listenPort is created
val masterSource =
- SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes, blockSize)
+ SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes)
listOfSources += masterSource
// Register with the Tracker
@@ -130,7 +128,6 @@ extends Broadcast[T] with Logging with Serializable {
totalBytes = -1
totalBlocks = -1
hasBlocks = 0
- blockSize = -1
listenPortLock = new Object
totalBlocksLock = new Object
@@ -244,7 +241,6 @@ extends Broadcast[T] with Logging with Serializable {
totalBlocksLock.notifyAll()
}
totalBytes = sourceInfo.totalBytes
- blockSize = sourceInfo.blockSize
logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
@@ -466,7 +462,7 @@ extends Broadcast[T] with Logging with Serializable {
// Add this new (if it can finish) source to the list of sources
thisWorkerInfo = SourceInfo(sourceInfo.hostAddress,
- sourceInfo.listenPort, totalBlocks, totalBytes, blockSize)
+ sourceInfo.listenPort, totalBlocks, totalBytes)
logInfo("Adding possible new source to listOfSources: " + thisWorkerInfo)
listOfSources += thisWorkerInfo
}