author     Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>  2010-11-26 22:58:35 -0800
committer  Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>  2010-11-26 22:58:35 -0800
commit     98542f81bb30381148759f414f9c2ca679d3bd63 (patch)
tree       886bf3ac7bc2a533c7343044c7ffee56bc866be5
parent     9d0111659bc8f2e186cf28cf968013e121f96605 (diff)
- Removed dual mode
- Removed unused speed-related stuff
- Moved (H)DfsBroadcast to its own file
-rw-r--r--  conf/java-opts                      |   2
-rw-r--r--  src/scala/spark/Broadcast.scala     | 199
-rw-r--r--  src/scala/spark/DfsBroadcast.scala  | 127
-rw-r--r--  src/scala/spark/SparkContext.scala  |   2
4 files changed, 148 insertions, 182 deletions
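Context for the diff below: both broadcast implementations touched by this commit follow the same caching contract. The constructor registers the value in a per-JVM map keyed by the Broadcast uuid, and a custom readObject checks that map before fetching the value from peers or from DFS. A minimal standalone sketch of the pattern (the InMemoryBroadcast name and the plain mutable.Map cache are illustrative, not part of the patch):

    import java.io.ObjectInputStream
    import java.util.UUID
    import scala.collection.mutable

    // Illustrative skeleton of the caching contract shared by
    // ChainedStreamingBroadcast and DfsBroadcast in this commit.
    class InMemoryBroadcast[T](@transient var value_ : T) extends Serializable {
      val uuid = UUID.randomUUID

      InMemoryBroadcast.values.synchronized {
        InMemoryBroadcast.values.put(uuid, value_)
      }

      def value: T = value_

      // Called by the JVM on deserialization: reuse a cached copy when one
      // exists instead of fetching the value again.
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        InMemoryBroadcast.values.synchronized {
          InMemoryBroadcast.values.get(uuid) match {
            case Some(cached) => value_ = cached.asInstanceOf[T]
            case None         => // fetch and cache -- elided in this sketch
          }
        }
      }
    }

    object InMemoryBroadcast {
      val values = mutable.Map[UUID, Any]()
    }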
diff --git a/conf/java-opts b/conf/java-opts
index e8b1255f82..20a2ade45c 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=22222 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000 -Dspark.broadcast.DualMode=false
+-Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=22222 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000
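Each flag in conf/java-opts is a plain -D JVM option, so it surfaces in code as a system property; BroadcastCS.initialize (further down) reads each one with System.getProperty and a default. A minimal sketch of that pattern, with defaults copied from the java-opts line above (this patch does not establish whether BlockSize is bytes or KB; the object name is illustrative):

    // Sketch: how the -Dspark.broadcast.* flags above are consumed in code.
    object BroadcastProps {
      val masterHostAddress: String =
        System.getProperty("spark.broadcast.MasterHostAddress", "127.0.0.1")
      val masterTrackerPort: Int =
        System.getProperty("spark.broadcast.MasterTrackerPort", "22222").toInt
      val blockSize: Int =
        System.getProperty("spark.broadcast.BlockSize", "256").toInt
      val maxRetryCount: Int =
        System.getProperty("spark.broadcast.MaxRetryCount", "2").toInt
      val trackerSocketTimeout: Int =
        System.getProperty("spark.broadcast.TrackerSocketTimeout", "50000").toInt
      val serverSocketTimeout: Int =
        System.getProperty("spark.broadcast.ServerSocketTimeout", "10000").toInt
    }

The dropped -Dspark.broadcast.DualMode=false pairs with the DualMode_ property deleted from BroadcastCS in the next file.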
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 3f002d8aec..08648d2ef4 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -10,11 +10,6 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{Map, Set}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
-
-import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
-
@serializable
trait Broadcast {
val uuid = UUID.randomUUID
@@ -63,12 +58,12 @@ extends Broadcast with Logging {
sendBroadcast
}
- def sendBroadcast () {
+ def sendBroadcast (): Unit = {
logInfo ("Local host address: " + hostAddress)
// Store a persistent copy in HDFS
// TODO: Turned OFF for now
- // val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
+ // val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid))
// out.writeObject (value_)
// out.close
// TODO: Fix this at some point
@@ -104,12 +99,6 @@ extends Broadcast with Logging {
val masterSource_0 =
SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 0)
pqOfSources.add (masterSource_0)
- // Add one more time to have two replicas of any seeds in the PQ
- if (BroadcastCS.DualMode) {
- val masterSource_1 =
- SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 1)
- pqOfSources.add (masterSource_1)
- }
// Register with the Tracker
while (guidePort == -1) {
@@ -120,7 +109,7 @@ extends Broadcast with Logging {
BroadcastCS.registerValue (uuid, guidePort)
}
- private def readObject (in: ObjectInputStream) {
+ private def readObject (in: ObjectInputStream): Unit = {
in.defaultReadObject
BroadcastCS.synchronized {
val cachedVal = BroadcastCS.values.get (uuid)
@@ -145,7 +134,7 @@ extends Broadcast with Logging {
value_ = unBlockifyObject[T]
BroadcastCS.values.put (uuid, value_)
} else {
- val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
+ val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
value_ = fileIn.readObject.asInstanceOf[T]
BroadcastCS.values.put(uuid, value_)
fileIn.close
@@ -157,7 +146,7 @@ extends Broadcast with Logging {
}
}
- private def initializeSlaveVariables = {
+ private def initializeSlaveVariables: Unit = {
arrayOfBlocks = null
totalBytes = -1
totalBlocks = -1
@@ -334,7 +323,6 @@ extends Broadcast with Logging {
if (!receptionSucceeded) {
sourceInfo.receptionFailed = true
}
- sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time
// Send back statistics to the Master
oosMaster.writeObject (sourceInfo)
@@ -419,7 +407,7 @@ extends Broadcast with Logging {
// Keep track of sources that have completed reception
private var setOfCompletedSources = Set[SourceInfo] ()
- override def run = {
+ override def run: Unit = {
var threadPool = BroadcastCS.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
@@ -477,7 +465,7 @@ extends Broadcast with Logging {
threadPool.shutdown
}
- private def sendStopBroadcastNotifications = {
+ private def sendStopBroadcastNotifications: Unit = {
pqOfSources.synchronized {
var pqIter = pqOfSources.iterator
while (pqIter.hasNext) {
@@ -529,7 +517,7 @@ extends Broadcast with Logging {
private var selectedSourceInfo: SourceInfo = null
private var thisWorkerInfo:SourceInfo = null
- override def run = {
+ override def run: Unit = {
try {
logInfo ("new GuideSingleRequest is running")
// Connecting worker is sending in its hostAddress and listenPort it will
@@ -568,20 +556,9 @@ extends Broadcast with Logging {
setOfCompletedSources += thisWorkerInfo
selectedSourceInfo.currentLeechers -= 1
- selectedSourceInfo.MBps = sourceInfo.MBps
// Put it back
pqOfSources.add (selectedSourceInfo)
-
- // Update global source speed statistics
- BroadcastCS.setSourceSpeed (
- sourceInfo.hostAddress, sourceInfo.MBps)
-
- // No need to find and update thisWorkerInfo, but add its replica
- if (BroadcastCS.DualMode) {
- pqOfSources.add (SourceInfo (thisWorkerInfo.hostAddress,
- thisWorkerInfo.listenPort, totalBlocks, totalBytes, 1))
- }
}
}
} catch {
@@ -635,7 +612,7 @@ extends Broadcast with Logging {
class ServeMultipleRequests
extends Thread with Logging {
- override def run = {
+ override def run: Unit = {
var threadPool = BroadcastCS.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
@@ -688,7 +665,7 @@ extends Broadcast with Logging {
private var sendFrom = 0
private var sendUntil = totalBlocks
- override def run = {
+ override def run: Unit = {
try {
logInfo ("new ServeSingleRequest is running")
@@ -719,7 +696,7 @@ extends Broadcast with Logging {
}
}
- private def sendObject = {
+ private def sendObject: Unit = {
// Wait till receiving the SourceInfo from Master
while (totalBlocks == -1) {
totalBlocksLock.synchronized {
@@ -748,49 +725,6 @@ extends Broadcast with Logging {
}
}
-@serializable
-class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
-extends Broadcast with Logging {
-
- def value = value_
-
- BroadcastCH.synchronized {
- BroadcastCH.values.put(uuid, value_)
- }
-
- if (!local) {
- sendBroadcast
- }
-
- def sendBroadcast () {
- val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
- out.writeObject (value_)
- out.close
- }
-
- // Called by Java when deserializing an object
- private def readObject(in: ObjectInputStream) {
- in.defaultReadObject
- BroadcastCH.synchronized {
- val cachedVal = BroadcastCH.values.get(uuid)
- if (cachedVal != null) {
- value_ = cachedVal.asInstanceOf[T]
- } else {
- logInfo( "Started reading Broadcasted variable " + uuid)
- val start = System.nanoTime
-
- val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
- value_ = fileIn.readObject.asInstanceOf[T]
- BroadcastCH.values.put(uuid, value_)
- fileIn.close
-
- val time = (System.nanoTime - start) / 1e9
- logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
- }
- }
- }
-}
-
@serializable
case class SourceInfo (val hostAddress: String, val listenPort: Int,
val totalBlocks: Int, val totalBytes: Int, val replicaID: Int)
@@ -798,7 +732,6 @@ extends Comparable [SourceInfo] with Logging {
var currentLeechers = 0
var receptionFailed = false
- var MBps: Double = BroadcastCS.MaxMBps
var hasBlocks = 0
@@ -831,11 +764,11 @@ extends Logging {
// Will be called by SparkContext or Executor before using Broadcast
// Calls all other initializers here
- def initialize (isMaster: Boolean) {
+ def initialize (isMaster: Boolean): Unit = {
synchronized {
if (!initialized) {
- // Initialization for CentralizedHDFSBroadcast
- BroadcastCH.initialize
+ // Initialization for DfsBroadcast
+ DfsBroadcast.initialize
// Initialization for ChainedStreamingBroadcast
BroadcastCS.initialize (isMaster)
@@ -851,8 +784,6 @@ extends Logging {
var valueToGuidePortMap = Map[UUID, Int] ()
- var sourceToSpeedMap = Map[String, Double] ()
-
// Random number generator
var ranGen = new Random
@@ -867,19 +798,12 @@ extends Logging {
private var TrackerSocketTimeout_ : Int = 50000
private var ServerSocketTimeout_ : Int = 10000
- private var DualMode_ : Boolean = false
-
private var trackMV: TrackMultipleValues = null
- // newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed
- private val ALPHA = 0.7
- // 125.0 MBps = 1 Gbps link
- private val MaxMBps_ = 125.0
-
private var MinKnockInterval_ = 500
private var MaxKnockInterval_ = 999
- def initialize (isMaster__ : Boolean) {
+ def initialize (isMaster__ : Boolean): Unit = {
synchronized {
if (!initialized) {
MasterHostAddress_ =
@@ -901,9 +825,6 @@ extends Logging {
MaxKnockInterval_ =
System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt
- DualMode_ =
- System.getProperty ("spark.broadcast.DualMode", "false").toBoolean
-
isMaster_ = isMaster__
if (isMaster) {
@@ -926,23 +847,19 @@ extends Logging {
def TrackerSocketTimeout = TrackerSocketTimeout_
def ServerSocketTimeout = ServerSocketTimeout_
- def DualMode = DualMode_
-
def isMaster = isMaster_
def MinKnockInterval = MinKnockInterval_
def MaxKnockInterval = MaxKnockInterval_
- def MaxMBps = MaxMBps_
-
- def registerValue (uuid: UUID, guidePort: Int) = {
+ def registerValue (uuid: UUID, guidePort: Int): Unit = {
valueToGuidePortMap.synchronized {
valueToGuidePortMap += (uuid -> guidePort)
logInfo ("New value registered with the Tracker " + valueToGuidePortMap)
}
}
- def unregisterValue (uuid: UUID) = {
+ def unregisterValue (uuid: UUID): Unit = {
valueToGuidePortMap.synchronized {
valueToGuidePortMap (uuid) = SourceInfo.TxOverGoToHDFS
logInfo ("Value unregistered from the Tracker " + valueToGuidePortMap)
@@ -980,23 +897,9 @@ extends Logging {
return threadPool
}
- def getSourceSpeed (hostAddress: String): Double = {
- sourceToSpeedMap.synchronized {
- sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
- }
- }
-
- def setSourceSpeed (hostAddress: String, MBps: Double) = {
- sourceToSpeedMap.synchronized {
- var oldSpeed = sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
- var newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * MBps
- sourceToSpeedMap.update (hostAddress, newSpeed)
- }
- }
-
class TrackMultipleValues
extends Thread with Logging {
- override def run = {
+ override def run: Unit = {
var threadPool = BroadcastCS.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
@@ -1018,7 +921,7 @@ extends Logging {
if (clientSocket != null) {
try {
threadPool.execute (new Thread {
- override def run = {
+ override def run: Unit = {
val oos = new ObjectOutputStream (clientSocket.getOutputStream)
oos.flush
val ois = new ObjectInputStream (clientSocket.getInputStream)
@@ -1056,67 +959,3 @@ extends Logging {
}
}
}
-
-private object BroadcastCH
-extends Logging {
- val values = new MapMaker ().softValues ().makeMap[UUID, Any]
-
- private var initialized = false
-
- private var fileSystem: FileSystem = null
- private var workDir: String = null
- private var compress: Boolean = false
- private var bufferSize: Int = 65536
-
- def initialize () {
- synchronized {
- if (!initialized) {
- bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- val dfs = System.getProperty("spark.dfs", "file:///")
- if (!dfs.startsWith("file://")) {
- val conf = new Configuration()
- conf.setInt("io.file.buffer.size", bufferSize)
- val rep = System.getProperty("spark.dfs.replication", "3").toInt
- conf.setInt("dfs.replication", rep)
- fileSystem = FileSystem.get(new URI(dfs), conf)
- }
- workDir = System.getProperty("spark.dfs.workdir", "/tmp")
- compress = System.getProperty("spark.compress", "false").toBoolean
-
- initialized = true
- }
- }
- }
-
- private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid)
-
- def openFileForReading(uuid: UUID): InputStream = {
- val fileStream = if (fileSystem != null) {
- fileSystem.open(getPath(uuid))
- } else {
- // Local filesystem
- new FileInputStream(getPath(uuid).toString)
- }
- if (compress)
- new LZFInputStream(fileStream) // LZF stream does its own buffering
- else if (fileSystem == null)
- new BufferedInputStream(fileStream, bufferSize)
- else
- fileStream // Hadoop streams do their own buffering
- }
-
- def openFileForWriting(uuid: UUID): OutputStream = {
- val fileStream = if (fileSystem != null) {
- fileSystem.create(getPath(uuid))
- } else {
- // Local filesystem
- new FileOutputStream(getPath(uuid).toString)
- }
- if (compress)
- new LZFOutputStream(fileStream) // LZF stream does its own buffering
- else if (fileSystem == null)
- new BufferedOutputStream(fileStream, bufferSize)
- else
- fileStream // Hadoop streams do their own buffering
- }
-}
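For reference, the "speed-related stuff" this commit deletes was a per-host throughput estimator: after every transfer the observed MBps was folded into an exponentially weighted moving average, newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed, with ALPHA = 0.7 and an optimistic initial value of 125.0 MBps (a 1 Gbps link). Per the commit message these estimates were recorded but never consulted. A standalone sketch of the removed logic (constants from the deleted code; the class name is illustrative):

    import scala.collection.mutable

    // EWMA source-speed tracker equivalent to the getSourceSpeed /
    // setSourceSpeed pair removed from BroadcastCS above.
    class SourceSpeedTracker(alpha: Double = 0.7, maxMBps: Double = 125.0) {
      private val sourceToSpeed = mutable.Map[String, Double]()

      // Unknown hosts start at the link ceiling (125.0 MBps = 1 Gbps).
      def getSpeed(host: String): Double = synchronized {
        sourceToSpeed.getOrElseUpdate(host, maxMBps)
      }

      // newSpeed = alpha * oldSpeed + (1 - alpha) * observed
      def setSpeed(host: String, observedMBps: Double): Unit = synchronized {
        val old = sourceToSpeed.getOrElseUpdate(host, maxMBps)
        sourceToSpeed.update(host, alpha * old + (1 - alpha) * observedMBps)
      }
    }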
diff --git a/src/scala/spark/DfsBroadcast.scala b/src/scala/spark/DfsBroadcast.scala
new file mode 100644
index 0000000000..5be5f98e8c
--- /dev/null
+++ b/src/scala/spark/DfsBroadcast.scala
@@ -0,0 +1,127 @@
+package spark
+
+import com.google.common.collect.MapMaker
+
+import java.io._
+import java.net._
+import java.util.UUID
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
+
+import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+@serializable
+class DfsBroadcast[T](@transient var value_ : T, local: Boolean)
+extends Broadcast with Logging {
+
+ def value = value_
+
+ DfsBroadcast.synchronized {
+ DfsBroadcast.values.put(uuid, value_)
+ }
+
+ if (!local) {
+ sendBroadcast
+ }
+
+ def sendBroadcast (): Unit = {
+ val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid))
+ out.writeObject (value_)
+ out.close
+ }
+
+ // Called by JVM when deserializing an object
+ private def readObject(in: ObjectInputStream): Unit = {
+ in.defaultReadObject
+ DfsBroadcast.synchronized {
+ val cachedVal = DfsBroadcast.values.get(uuid)
+ if (cachedVal != null) {
+ value_ = cachedVal.asInstanceOf[T]
+ } else {
+ logInfo( "Started reading Broadcasted variable " + uuid)
+ val start = System.nanoTime
+
+ val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
+ value_ = fileIn.readObject.asInstanceOf[T]
+ DfsBroadcast.values.put(uuid, value_)
+ fileIn.close
+
+ val time = (System.nanoTime - start) / 1e9
+ logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
+ }
+ }
+ }
+}
+
+private object DfsBroadcast
+extends Logging {
+ val values = new MapMaker ().softValues ().makeMap[UUID, Any]
+
+ private var initialized = false
+
+ private var fileSystem: FileSystem = null
+ private var workDir: String = null
+ private var compress: Boolean = false
+ private var bufferSize: Int = 65536
+
+ def initialize (): Unit = {
+ synchronized {
+ if (!initialized) {
+ bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val dfs = System.getProperty("spark.dfs", "file:///")
+ if (!dfs.startsWith("file://")) {
+ val conf = new Configuration()
+ conf.setInt("io.file.buffer.size", bufferSize)
+ val rep = System.getProperty("spark.dfs.replication", "3").toInt
+ conf.setInt("dfs.replication", rep)
+ fileSystem = FileSystem.get(new URI(dfs), conf)
+ }
+ workDir = System.getProperty("spark.dfs.workdir", "/tmp")
+ compress = System.getProperty("spark.compress", "false").toBoolean
+
+ initialized = true
+ }
+ }
+ }
+
+ private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid)
+
+ def openFileForReading(uuid: UUID): InputStream = {
+ val fileStream = if (fileSystem != null) {
+ fileSystem.open(getPath(uuid))
+ } else {
+ // Local filesystem
+ new FileInputStream(getPath(uuid).toString)
+ }
+
+ if (compress) {
+ // LZF stream does its own buffering
+ new LZFInputStream(fileStream)
+ } else if (fileSystem == null) {
+ new BufferedInputStream(fileStream, bufferSize)
+ } else {
+ // Hadoop streams do their own buffering
+ fileStream
+ }
+ }
+
+ def openFileForWriting(uuid: UUID): OutputStream = {
+ val fileStream = if (fileSystem != null) {
+ fileSystem.create(getPath(uuid))
+ } else {
+ // Local filesystem
+ new FileOutputStream(getPath(uuid).toString)
+ }
+
+ if (compress) {
+ // LZF stream does its own buffering
+ new LZFOutputStream(fileStream)
+ } else if (fileSystem == null) {
+ new BufferedOutputStream(fileStream, bufferSize)
+ } else {
+ // Hadoop streams do their own buffering
+ fileStream
+ }
+ }
+}
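A hedged round trip through the new file's helper object, assuming the local-filesystem defaults (spark.dfs unset, spark.compress=false). The demo object is hypothetical and must live in package spark, because the DfsBroadcast companion object is package-private:

    package spark

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import java.util.UUID

    // Hypothetical demo: write a value to /tmp/broadcast-<uuid> and read it
    // back through DfsBroadcast's openFileForWriting/openFileForReading.
    object DfsBroadcastDemo {
      def main(args: Array[String]): Unit = {
        DfsBroadcast.initialize()
        val uuid = UUID.randomUUID

        val out = new ObjectOutputStream(DfsBroadcast.openFileForWriting(uuid))
        out.writeObject(Array(1, 2, 3))
        out.close()

        // Any JVM with the same spark.dfs settings can now fetch the value.
        val in = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
        val restored = in.readObject().asInstanceOf[Array[Int]]
        in.close()

        println(restored.mkString(","))  // 1,2,3
      }
    }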
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 216ea4c0a9..75efd9d1fb 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -19,7 +19,7 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
new Accumulator(initialValue, param)
// TODO: Keep around a weak hash map of values to Cached versions?
- // def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local)
+ // def broadcast[T](value: T) = new DfsBroadcast(value, local)
def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
def textFile(path: String) = new HdfsTextFile(this, path)
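Because both implementations sit behind the same Broadcast trait, the backend is a one-line switch in SparkContext and driver code never changes. A hypothetical driver-side sketch (the master string and the data are placeholders, not from the patch):

    // Hypothetical usage; behaves the same whichever Broadcast
    // implementation SparkContext.broadcast constructs.
    val sc = new SparkContext("local", "broadcastDemo")
    val table = sc.broadcast(Map("a" -> 1, "b" -> 2))
    // Serialized tasks carry only the small Broadcast stub; the readObject
    // hook restores the Map from the local cache, from peers, or from DFS.
    println(table.value("a"))  // 1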