author:    Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>  2010-11-26 22:58:35 -0800
committer: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>  2010-11-26 22:58:35 -0800
commit:    98542f81bb30381148759f414f9c2ca679d3bd63 (patch)
tree:      886bf3ac7bc2a533c7343044c7ffee56bc866be5
parent:    9d0111659bc8f2e186cf28cf968013e121f96605 (diff)
- Removed DualMode
- Removed unused speed-related stuff
- Moved (H)DfsBroadcast to its own file
-rw-r--r-- | conf/java-opts                     |   2
-rw-r--r-- | src/scala/spark/Broadcast.scala    | 199
-rw-r--r-- | src/scala/spark/DfsBroadcast.scala | 127
-rw-r--r-- | src/scala/spark/SparkContext.scala |   2

4 files changed, 148 insertions, 182 deletions
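
Beyond the removals, one mechanical change runs through most of the hunks below: Scala's procedure syntax (`def f {...}`, which silently fixes the result type to `Unit`) is replaced by an explicit `: Unit =` return type. A minimal before/after sketch (the `greet` methods are illustrative, not code from this commit):

```scala
// Procedure syntax: no "=", so the result type is implicitly Unit.
// Easy to write by accident when a result was actually intended.
def greet (name: String) {
  println ("Hello, " + name)
}

// Explicit form used throughout this commit: the "=" makes it an
// ordinary method definition and the Unit return type is spelled out.
def greetExplicit (name: String): Unit = {
  println ("Hello, " + name)
}
```

The diff, file by file: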
```diff
diff --git a/conf/java-opts b/conf/java-opts
index e8b1255f82..20a2ade45c 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=22222 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000 -Dspark.broadcast.DualMode=false
+-Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=22222 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000
```
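
These `-D` flags are plain JVM system properties; as the `BroadcastCS.initialize` hunks further down show, each one is read back with `System.getProperty` plus a hard-coded default. A minimal sketch of that pattern with the names from `java-opts` (the local `val`s are illustrative, not code from this commit):

```scala
// Each setting falls back to its default when the -D flag is absent.
val masterHostAddress =
  System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
val masterTrackerPort =
  System.getProperty ("spark.broadcast.MasterTrackerPort", "22222").toInt
val blockSize =
  System.getProperty ("spark.broadcast.BlockSize", "256").toInt
val maxRetryCount =
  System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt
```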
```diff
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 3f002d8aec..08648d2ef4 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -10,11 +10,6 @@
 import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
 
 import scala.collection.mutable.{Map, Set}
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
-
-import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
-
 @serializable
 trait Broadcast {
   val uuid = UUID.randomUUID
@@ -63,12 +58,12 @@ extends Broadcast with Logging {
     sendBroadcast
   }
 
-  def sendBroadcast () {
+  def sendBroadcast (): Unit = {
     logInfo ("Local host address: " + hostAddress)
 
     // Store a persistent copy in HDFS
     // TODO: Turned OFF for now
-    // val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
+    // val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid))
     // out.writeObject (value_)
     // out.close
     // TODO: Fix this at some point
@@ -104,12 +99,6 @@ extends Broadcast with Logging {
     val masterSource_0 =
       SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 0)
     pqOfSources.add (masterSource_0)
 
-    // Add one more time to have two replicas of any seeds in the PQ
-    if (BroadcastCS.DualMode) {
-      val masterSource_1 =
-        SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 1)
-      pqOfSources.add (masterSource_1)
-    }
 
     // Register with the Tracker
     while (guidePort == -1) {
@@ -120,7 +109,7 @@
     BroadcastCS.registerValue (uuid, guidePort)
   }
 
-  private def readObject (in: ObjectInputStream) {
+  private def readObject (in: ObjectInputStream): Unit = {
     in.defaultReadObject
     BroadcastCS.synchronized {
       val cachedVal = BroadcastCS.values.get (uuid)
@@ -145,7 +134,7 @@ extends Broadcast with Logging {
           value_ = unBlockifyObject[T]
           BroadcastCS.values.put (uuid, value_)
         } else {
-          val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
+          val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
           value_ = fileIn.readObject.asInstanceOf[T]
           BroadcastCS.values.put(uuid, value_)
           fileIn.close
```
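
The `readObject` hunks above rely on a standard Java-serialization hook: a `private def readObject(in: ObjectInputStream)` with exactly this signature is invoked by the JVM while an object is being deserialized, which lets a broadcast consult a node-local cache before fetching anything. A stripped-down sketch of the idiom (`ValueCache` and `CachedHandle` are stand-ins, not code from this commit):

```scala
import java.io.ObjectInputStream
import java.util.UUID

import scala.collection.mutable.Map

// Hypothetical node-local cache, standing in for BroadcastCS.values /
// DfsBroadcast.values (which use Guava's MapMaker with soft values).
object ValueCache {
  val values = Map[UUID, Any] ()
}

@serializable
class CachedHandle[T](@transient var value_ : T, val uuid: UUID) {
  // Invoked by the JVM during deserialization because of its exact
  // name and signature -- the hook the broadcast classes rely on.
  private def readObject (in: ObjectInputStream): Unit = {
    in.defaultReadObject   // restores uuid; value_ is @transient, so unset
    ValueCache.synchronized {
      ValueCache.values.get (uuid) match {
        case Some(v) => value_ = v.asInstanceOf[T]  // reuse the local copy
        case None    => // a real implementation fetches and caches here
      }
    }
  }
}
```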
```diff
@@ -157,7 +146,7 @@ extends Broadcast with Logging {
     }
   }
 
-  private def initializeSlaveVariables = {
+  private def initializeSlaveVariables: Unit = {
    arrayOfBlocks = null
    totalBytes = -1
    totalBlocks = -1
@@ -334,7 +323,6 @@ extends Broadcast with Logging {
      if (!receptionSucceeded) {
        sourceInfo.receptionFailed = true
      }
-      sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time
 
      // Send back statistics to the Master
      oosMaster.writeObject (sourceInfo)
@@ -419,7 +407,7 @@
    // Keep track of sources that have completed reception
    private var setOfCompletedSources = Set[SourceInfo] ()
 
-    override def run = {
+    override def run: Unit = {
      var threadPool = BroadcastCS.newDaemonCachedThreadPool
      var serverSocket: ServerSocket = null
@@ -477,7 +465,7 @@ extends Broadcast with Logging {
      threadPool.shutdown
    }
 
-    private def sendStopBroadcastNotifications = {
+    private def sendStopBroadcastNotifications: Unit = {
      pqOfSources.synchronized {
        var pqIter = pqOfSources.iterator
        while (pqIter.hasNext) {
@@ -529,7 +517,7 @@ extends Broadcast with Logging {
      private var selectedSourceInfo: SourceInfo = null
      private var thisWorkerInfo:SourceInfo = null
 
-      override def run = {
+      override def run: Unit = {
        try {
          logInfo ("new GuideSingleRequest is running")
          // Connecting worker is sending in its hostAddress and listenPort it will
@@ -568,20 +556,9 @@
            setOfCompletedSources += thisWorkerInfo
 
            selectedSourceInfo.currentLeechers -= 1
-            selectedSourceInfo.MBps = sourceInfo.MBps
 
            // Put it back
            pqOfSources.add (selectedSourceInfo)
-
-            // Update global source speed statistics
-            BroadcastCS.setSourceSpeed (
-              sourceInfo.hostAddress, sourceInfo.MBps)
-
-            // No need to find and update thisWorkerInfo, but add its replica
-            if (BroadcastCS.DualMode) {
-              pqOfSources.add (SourceInfo (thisWorkerInfo.hostAddress,
-                thisWorkerInfo.listenPort, totalBlocks, totalBytes, 1))
-            }
          }
        }
      } catch {
@@ -635,7 +612,7 @@ extends Broadcast with Logging {
  class ServeMultipleRequests
  extends Thread with Logging {
-    override def run = {
+    override def run: Unit = {
      var threadPool = BroadcastCS.newDaemonCachedThreadPool
      var serverSocket: ServerSocket = null
@@ -688,7 +665,7 @@ extends Broadcast with Logging {
      private var sendFrom = 0
      private var sendUntil = totalBlocks
 
-      override def run = {
+      override def run: Unit = {
        try {
          logInfo ("new ServeSingleRequest is running")
@@ -719,7 +696,7 @@
        }
      }
 
-      private def sendObject = {
+      private def sendObject: Unit = {
        // Wait till receiving the SourceInfo from Master
        while (totalBlocks == -1) {
          totalBlocksLock.synchronized {
@@ -748,49 +725,6 @@
    }
  }
 
-@serializable
-class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
-extends Broadcast with Logging {
-
-  def value = value_
-
-  BroadcastCH.synchronized {
-    BroadcastCH.values.put(uuid, value_)
-  }
-
-  if (!local) {
-    sendBroadcast
-  }
-
-  def sendBroadcast () {
-    val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
-    out.writeObject (value_)
-    out.close
-  }
-
-  // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream) {
-    in.defaultReadObject
-    BroadcastCH.synchronized {
-      val cachedVal = BroadcastCH.values.get(uuid)
-      if (cachedVal != null) {
-        value_ = cachedVal.asInstanceOf[T]
-      } else {
-        logInfo( "Started reading Broadcasted variable " + uuid)
-        val start = System.nanoTime
-
-        val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
-        value_ = fileIn.readObject.asInstanceOf[T]
-        BroadcastCH.values.put(uuid, value_)
-        fileIn.close
-
-        val time = (System.nanoTime - start) / 1e9
-        logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
-      }
-    }
-  }
-}
-
 @serializable
 case class SourceInfo (val hostAddress: String, val listenPort: Int,
   val totalBlocks: Int, val totalBytes: Int, val replicaID: Int)
@@ -798,7 +732,6 @@ extends Comparable [SourceInfo] with Logging {
  var currentLeechers = 0
  var receptionFailed = false
-  var MBps: Double = BroadcastCS.MaxMBps
 
  var hasBlocks = 0
@@ -831,11 +764,11 @@ extends Logging {
 
  // Will be called by SparkContext or Executor before using Broadcast
  // Calls all other initializers here
-  def initialize (isMaster: Boolean) {
+  def initialize (isMaster: Boolean): Unit = {
    synchronized {
      if (!initialized) {
-        // Initialization for CentralizedHDFSBroadcast
-        BroadcastCH.initialize
+        // Initialization for DfsBroadcast
+        DfsBroadcast.initialize
 
        // Initialization for ChainedStreamingBroadcast
        BroadcastCS.initialize (isMaster)
@@ -851,8 +784,6 @@
 
  var valueToGuidePortMap = Map[UUID, Int] ()
 
-  var sourceToSpeedMap = Map[String, Double] ()
-
  // Random number generator
  var ranGen = new Random
@@ -867,19 +798,12 @@
  private var TrackerSocketTimeout_ : Int = 50000
  private var ServerSocketTimeout_ : Int = 10000
 
-  private var DualMode_ : Boolean = false
-
  private var trackMV: TrackMultipleValues = null
 
-  // newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed
-  private val ALPHA = 0.7
-  // 125.0 MBps = 1 Gbps link
-  private val MaxMBps_ = 125.0
-
  private var MinKnockInterval_ = 500
  private var MaxKnockInterval_ = 999
 
-  def initialize (isMaster__ : Boolean) {
+  def initialize (isMaster__ : Boolean): Unit = {
    synchronized {
      if (!initialized) {
        MasterHostAddress_ =
@@ -901,9 +825,6 @@
        MaxKnockInterval_ =
          System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt
 
-        DualMode_ =
-          System.getProperty ("spark.broadcast.DualMode", "false").toBoolean
-
        isMaster_ = isMaster__
 
        if (isMaster) {
@@ -926,23 +847,19 @@
  def TrackerSocketTimeout = TrackerSocketTimeout_
  def ServerSocketTimeout = ServerSocketTimeout_
 
-  def DualMode = DualMode_
-
  def isMaster = isMaster_
 
  def MinKnockInterval = MinKnockInterval_
  def MaxKnockInterval = MaxKnockInterval_
 
-  def MaxMBps = MaxMBps_
-
-  def registerValue (uuid: UUID, guidePort: Int) = {
+  def registerValue (uuid: UUID, guidePort: Int): Unit = {
    valueToGuidePortMap.synchronized {
      valueToGuidePortMap += (uuid -> guidePort)
      logInfo ("New value registered with the Tracker " + valueToGuidePortMap)
    }
  }
 
-  def unregisterValue (uuid: UUID) = {
+  def unregisterValue (uuid: UUID): Unit = {
    valueToGuidePortMap.synchronized {
      valueToGuidePortMap (uuid) = SourceInfo.TxOverGoToHDFS
      logInfo ("Value unregistered from the Tracker " + valueToGuidePortMap)
@@ -980,23 +897,9 @@
    return threadPool
  }
 
-  def getSourceSpeed (hostAddress: String): Double = {
-    sourceToSpeedMap.synchronized {
-      sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
-    }
-  }
-
-  def setSourceSpeed (hostAddress: String, MBps: Double) = {
-    sourceToSpeedMap.synchronized {
-      var oldSpeed = sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
-      var newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * MBps
-      sourceToSpeedMap.update (hostAddress, newSpeed)
-    }
-  }
-
  class TrackMultipleValues
  extends Thread with Logging {
-    override def run = {
+    override def run: Unit = {
      var threadPool = BroadcastCS.newDaemonCachedThreadPool
      var serverSocket: ServerSocket = null
```
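
The `getSourceSpeed`/`setSourceSpeed` pair deleted here maintained an exponentially weighted moving average of each source's throughput, `newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed`, with `ALPHA = 0.7` and every source optimistically seeded at `MaxMBps = 125.0` (a 1 Gbps link). For example, a fresh source that then reported 25 MBps would be re-estimated at 0.7 * 125.0 + 0.3 * 25.0 = 95.0 MBps. Nothing consumed these estimates any longer (the "unused speed-related stuff" of the commit message), so the map, the constants, and both accessors go away together.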
```diff
@@ -1018,7 +921,7 @@
        if (clientSocket != null) {
          try {
            threadPool.execute (new Thread {
-              override def run = {
+              override def run: Unit = {
                val oos = new ObjectOutputStream (clientSocket.getOutputStream)
                oos.flush
                val ois = new ObjectInputStream (clientSocket.getInputStream)
@@ -1056,67 +959,3 @@
      }
    }
  }
-
-private object BroadcastCH
-extends Logging {
-  val values = new MapMaker ().softValues ().makeMap[UUID, Any]
-
-  private var initialized = false
-
-  private var fileSystem: FileSystem = null
-  private var workDir: String = null
-  private var compress: Boolean = false
-  private var bufferSize: Int = 65536
-
-  def initialize () {
-    synchronized {
-      if (!initialized) {
-        bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-        val dfs = System.getProperty("spark.dfs", "file:///")
-        if (!dfs.startsWith("file://")) {
-          val conf = new Configuration()
-          conf.setInt("io.file.buffer.size", bufferSize)
-          val rep = System.getProperty("spark.dfs.replication", "3").toInt
-          conf.setInt("dfs.replication", rep)
-          fileSystem = FileSystem.get(new URI(dfs), conf)
-        }
-        workDir = System.getProperty("spark.dfs.workdir", "/tmp")
-        compress = System.getProperty("spark.compress", "false").toBoolean
-
-        initialized = true
-      }
-    }
-  }
-
-  private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid)
-
-  def openFileForReading(uuid: UUID): InputStream = {
-    val fileStream = if (fileSystem != null) {
-      fileSystem.open(getPath(uuid))
-    } else {
-      // Local filesystem
-      new FileInputStream(getPath(uuid).toString)
-    }
-    if (compress)
-      new LZFInputStream(fileStream) // LZF stream does its own buffering
-    else if (fileSystem == null)
-      new BufferedInputStream(fileStream, bufferSize)
-    else
-      fileStream // Hadoop streams do their own buffering
-  }
-
-  def openFileForWriting(uuid: UUID): OutputStream = {
-    val fileStream = if (fileSystem != null) {
-      fileSystem.create(getPath(uuid))
-    } else {
-      // Local filesystem
-      new FileOutputStream(getPath(uuid).toString)
-    }
-    if (compress)
-      new LZFOutputStream(fileStream) // LZF stream does its own buffering
-    else if (fileSystem == null)
-      new BufferedOutputStream(fileStream, bufferSize)
-    else
-      fileStream // Hadoop streams do their own buffering
-  }
-}
```
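
The `CentralizedHDFSBroadcast` class and `BroadcastCH` object deleted above reappear nearly verbatim in the new file below as `DfsBroadcast` and its private companion object, taking the Hadoop and LZF imports with them. Going by the constructor in the new file, direct usage would presumably look like this (`lookupTable` and `isLocal` are illustrative stand-ins):

```scala
// Serializes lookupTable to a DFS file once; each deserializing worker
// either reuses its soft-referenced cached copy or reads the file back.
val lookupTable = Map("a" -> 1, "b" -> 2)
val shared = new DfsBroadcast(lookupTable, isLocal)
println(shared.value("a"))
```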
```diff
diff --git a/src/scala/spark/DfsBroadcast.scala b/src/scala/spark/DfsBroadcast.scala
new file mode 100644
index 0000000000..5be5f98e8c
--- /dev/null
+++ b/src/scala/spark/DfsBroadcast.scala
@@ -0,0 +1,127 @@
+package spark
+
+import com.google.common.collect.MapMaker
+
+import java.io._
+import java.net._
+import java.util.UUID
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
+
+import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+@serializable
+class DfsBroadcast[T](@transient var value_ : T, local: Boolean)
+extends Broadcast with Logging {
+
+  def value = value_
+
+  DfsBroadcast.synchronized {
+    DfsBroadcast.values.put(uuid, value_)
+  }
+
+  if (!local) {
+    sendBroadcast
+  }
+
+  def sendBroadcast (): Unit = {
+    val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid))
+    out.writeObject (value_)
+    out.close
+  }
+
+  // Called by JVM when deserializing an object
+  private def readObject(in: ObjectInputStream): Unit = {
+    in.defaultReadObject
+    DfsBroadcast.synchronized {
+      val cachedVal = DfsBroadcast.values.get(uuid)
+      if (cachedVal != null) {
+        value_ = cachedVal.asInstanceOf[T]
+      } else {
+        logInfo( "Started reading Broadcasted variable " + uuid)
+        val start = System.nanoTime
+
+        val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
+        value_ = fileIn.readObject.asInstanceOf[T]
+        DfsBroadcast.values.put(uuid, value_)
+        fileIn.close
+
+        val time = (System.nanoTime - start) / 1e9
+        logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
+      }
+    }
+  }
+}
+
+private object DfsBroadcast
+extends Logging {
+  val values = new MapMaker ().softValues ().makeMap[UUID, Any]
+
+  private var initialized = false
+
+  private var fileSystem: FileSystem = null
+  private var workDir: String = null
+  private var compress: Boolean = false
+  private var bufferSize: Int = 65536
+
+  def initialize (): Unit = {
+    synchronized {
+      if (!initialized) {
+        bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+        val dfs = System.getProperty("spark.dfs", "file:///")
+        if (!dfs.startsWith("file://")) {
+          val conf = new Configuration()
+          conf.setInt("io.file.buffer.size", bufferSize)
+          val rep = System.getProperty("spark.dfs.replication", "3").toInt
+          conf.setInt("dfs.replication", rep)
+          fileSystem = FileSystem.get(new URI(dfs), conf)
+        }
+        workDir = System.getProperty("spark.dfs.workdir", "/tmp")
+        compress = System.getProperty("spark.compress", "false").toBoolean
+
+        initialized = true
+      }
+    }
+  }
+
+  private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid)
+
+  def openFileForReading(uuid: UUID): InputStream = {
+    val fileStream = if (fileSystem != null) {
+      fileSystem.open(getPath(uuid))
+    } else {
+      // Local filesystem
+      new FileInputStream(getPath(uuid).toString)
+    }
+
+    if (compress) {
+      // LZF stream does its own buffering
+      new LZFInputStream(fileStream)
+    } else if (fileSystem == null) {
+      new BufferedInputStream(fileStream, bufferSize)
+    } else {
+      // Hadoop streams do their own buffering
+      fileStream
+    }
+  }
+
+  def openFileForWriting(uuid: UUID): OutputStream = {
+    val fileStream = if (fileSystem != null) {
+      fileSystem.create(getPath(uuid))
+    } else {
+      // Local filesystem
+      new FileOutputStream(getPath(uuid).toString)
+    }
+
+    if (compress) {
+      // LZF stream does its own buffering
+      new LZFOutputStream(fileStream)
+    } else if (fileSystem == null) {
+      new BufferedOutputStream(fileStream, bufferSize)
+    } else {
+      // Hadoop streams do their own buffering
+      fileStream
+    }
+  }
+}
```

```diff
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 216ea4c0a9..75efd9d1fb 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -19,7 +19,7 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
     new Accumulator(initialValue, param)
 
   // TODO: Keep around a weak hash map of values to Cached versions?
-  // def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local)
+  // def broadcast[T](value: T) = new DfsBroadcast(value, local)
   def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
 
   def textFile(path: String) = new HdfsTextFile(this, path)
```
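
Two details of the new file are worth noting. First, `openFileForReading`/`openFileForWriting` apply exactly one layer of buffering: LZF streams and Hadoop filesystem streams buffer internally, so only raw local-file streams get an explicit `Buffered(Input|Output)Stream` of `spark.buffer.size` bytes. Second, `SparkContext.broadcast` still hands out a `ChainedStreamingBroadcast`; `DfsBroadcast` is kept only as the commented-out alternative. A hypothetical call site, given the `SparkContext(master, frameworkName)` constructor in the diff:

```scala
val sc = new SparkContext("local", "broadcast-demo")

// Returns a ChainedStreamingBroadcast after this commit. Tasks that
// deserialize `config` hit its readObject hook and either reuse the
// locally cached value or stream the blocks from a peer.
val config = sc.broadcast(Map("threshold" -> 10))
println(config.value("threshold"))
```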