diff options
-rw-r--r-- | core/src/main/scala/spark/broadcast/ChainedBroadcast.scala | 794 | ||||
-rw-r--r-- | core/src/main/scala/spark/broadcast/DfsBroadcast.scala | 135 |
2 files changed, 0 insertions, 929 deletions
diff --git a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala deleted file mode 100644 index 43290c241f..0000000000 --- a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala +++ /dev/null @@ -1,794 +0,0 @@ -package spark.broadcast - -import java.io._ -import java.net._ -import java.util.{Comparator, PriorityQueue, Random, UUID} - -import scala.collection.mutable.{Map, Set} -import scala.math - -import spark._ - -class ChainedBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging with Serializable { - - def value = value_ - - ChainedBroadcast.synchronized { - ChainedBroadcast.values.put(uuid, 0, value_) - } - - @transient var arrayOfBlocks: Array[BroadcastBlock] = null - @transient var totalBytes = -1 - @transient var totalBlocks = -1 - @transient var hasBlocks = 0 - // CHANGED: BlockSize in the Broadcast object is expected to change over time - @transient var blockSize = Broadcast.BlockSize - - @transient var listenPortLock = new Object - @transient var guidePortLock = new Object - @transient var totalBlocksLock = new Object - @transient var hasBlocksLock = new Object - - @transient var pqOfSources = new PriorityQueue[SourceInfo] - - @transient var serveMR: ServeMultipleRequests = null - @transient var guideMR: GuideMultipleRequests = null - - @transient var hostAddress = Utils.localIpAddress - @transient var listenPort = -1 - @transient var guidePort = -1 - - @transient var hasCopyInHDFS = false - @transient var stopBroadcast = false - - // Must call this after all the variables have been created/initialized - if (!isLocal) { - sendBroadcast - } - - def sendBroadcast() { - logInfo("Local host address: " + hostAddress) - - // Store a persistent copy in HDFS - // TODO: Turned OFF for now - // val out = new ObjectOutputStream(DfsBroadcast.openFileForWriting(uuid)) - // out.writeObject(value_) - // out.close() - // TODO: Fix this at some point - hasCopyInHDFS = true - - // Create a variableInfo object and store it in valueInfos - var variableInfo = Broadcast.blockifyObject(value_) - - guideMR = new GuideMultipleRequests - guideMR.setDaemon(true) - guideMR.start() - logInfo("GuideMultipleRequests started...") - - serveMR = new ServeMultipleRequests - serveMR.setDaemon(true) - serveMR.start() - logInfo("ServeMultipleRequests started...") - - // Prepare the value being broadcasted - // TODO: Refactoring and clean-up required here - arrayOfBlocks = variableInfo.arrayOfBlocks - totalBytes = variableInfo.totalBytes - totalBlocks = variableInfo.totalBlocks - hasBlocks = variableInfo.totalBlocks - - while (listenPort == -1) { - listenPortLock.synchronized { - listenPortLock.wait() - } - } - - pqOfSources = new PriorityQueue[SourceInfo] - val masterSource = - SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes, blockSize) - pqOfSources.add(masterSource) - - // Register with the Tracker - while (guidePort == -1) { - guidePortLock.synchronized { - guidePortLock.wait() - } - } - ChainedBroadcast.registerValue(uuid, guidePort) - } - - private def readObject(in: ObjectInputStream) { - in.defaultReadObject() - ChainedBroadcast.synchronized { - val cachedVal = ChainedBroadcast.values.get(uuid, 0) - if (cachedVal != null) { - value_ = cachedVal.asInstanceOf[T] - } else { - // Initializing everything because Master will only send null/0 values - initializeSlaveVariables - - logInfo("Local host address: " + hostAddress) - - serveMR = new ServeMultipleRequests - serveMR.setDaemon(true) - serveMR.start() - logInfo("ServeMultipleRequests started...") - - val start = System.nanoTime - - val receptionSucceeded = receiveBroadcast(uuid) - // If does not succeed, then get from HDFS copy - if (receptionSucceeded) { - value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) - ChainedBroadcast.values.put(uuid, 0, value_) - } else { - val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid)) - value_ = fileIn.readObject.asInstanceOf[T] - ChainedBroadcast.values.put(uuid, 0, value_) - fileIn.close() - } - - val time =(System.nanoTime - start) / 1e9 - logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s") - } - } - } - - private def initializeSlaveVariables() { - arrayOfBlocks = null - totalBytes = -1 - totalBlocks = -1 - hasBlocks = 0 - blockSize = -1 - - listenPortLock = new Object - totalBlocksLock = new Object - hasBlocksLock = new Object - - serveMR = null - - hostAddress = Utils.localIpAddress - listenPort = -1 - - stopBroadcast = false - } - - def getMasterListenPort(variableUUID: UUID): Int = { - var clientSocketToTracker: Socket = null - var oosTracker: ObjectOutputStream = null - var oisTracker: ObjectInputStream = null - - var masterListenPort: Int = SourceInfo.TxOverGoToHDFS - - var retriesLeft = Broadcast.MaxRetryCount - do { - try { - // Connect to the tracker to find out the guide - clientSocketToTracker = - new Socket(Broadcast.MasterHostAddress, Broadcast.MasterTrackerPort) - oosTracker = - new ObjectOutputStream(clientSocketToTracker.getOutputStream) - oosTracker.flush() - oisTracker = - new ObjectInputStream(clientSocketToTracker.getInputStream) - - // Send UUID and receive masterListenPort - oosTracker.writeObject(uuid) - oosTracker.flush() - masterListenPort = oisTracker.readObject.asInstanceOf[Int] - } catch { - case e: Exception => { - logInfo("getMasterListenPort had a " + e) - } - } finally { - if (oisTracker != null) { - oisTracker.close() - } - if (oosTracker != null) { - oosTracker.close() - } - if (clientSocketToTracker != null) { - clientSocketToTracker.close() - } - } - retriesLeft -= 1 - - Thread.sleep(ChainedBroadcast.ranGen.nextInt( - Broadcast.MaxKnockInterval - Broadcast.MinKnockInterval) + - Broadcast.MinKnockInterval) - - } while (retriesLeft > 0 && masterListenPort == SourceInfo.TxNotStartedRetry) - - logInfo("Got this guidePort from Tracker: " + masterListenPort) - return masterListenPort - } - - def receiveBroadcast(variableUUID: UUID): Boolean = { - val masterListenPort = getMasterListenPort(variableUUID) - - if (masterListenPort == SourceInfo.TxOverGoToHDFS || - masterListenPort == SourceInfo.TxNotStartedRetry) { - // TODO: SourceInfo.TxNotStartedRetry is not really in use because we go - // to HDFS anyway when receiveBroadcast returns false - return false - } - - // Wait until hostAddress and listenPort are created by the - // ServeMultipleRequests thread - while (listenPort == -1) { - listenPortLock.synchronized { - listenPortLock.wait() - } - } - - var clientSocketToMaster: Socket = null - var oosMaster: ObjectOutputStream = null - var oisMaster: ObjectInputStream = null - - // Connect and receive broadcast from the specified source, retrying the - // specified number of times in case of failures - var retriesLeft = Broadcast.MaxRetryCount - do { - // Connect to Master and send this worker's Information - clientSocketToMaster = - new Socket(Broadcast.MasterHostAddress, masterListenPort) - // TODO: Guiding object connection is reusable - oosMaster = - new ObjectOutputStream(clientSocketToMaster.getOutputStream) - oosMaster.flush() - oisMaster = - new ObjectInputStream(clientSocketToMaster.getInputStream) - - logInfo("Connected to Master's guiding object") - - // Send local source information - oosMaster.writeObject(SourceInfo(hostAddress, listenPort)) - oosMaster.flush() - - // Receive source information from Master - var sourceInfo = oisMaster.readObject.asInstanceOf[SourceInfo] - totalBlocks = sourceInfo.totalBlocks - arrayOfBlocks = new Array[BroadcastBlock](totalBlocks) - totalBlocksLock.synchronized { - totalBlocksLock.notifyAll() - } - totalBytes = sourceInfo.totalBytes - - logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) - - val start = System.nanoTime - val receptionSucceeded = receiveSingleTransmission(sourceInfo) - val time =(System.nanoTime - start) / 1e9 - - // Updating some statistics in sourceInfo. Master will be using them later - if (!receptionSucceeded) { - sourceInfo.receptionFailed = true - } - - // Send back statistics to the Master - oosMaster.writeObject(sourceInfo) - - if (oisMaster != null) { - oisMaster.close() - } - if (oosMaster != null) { - oosMaster.close() - } - if (clientSocketToMaster != null) { - clientSocketToMaster.close() - } - - retriesLeft -= 1 - } while (retriesLeft > 0 && hasBlocks < totalBlocks) - - return(hasBlocks == totalBlocks) - } - - // Tries to receive broadcast from the source and returns Boolean status. - // This might be called multiple times to retry a defined number of times. - private def receiveSingleTransmission(sourceInfo: SourceInfo): Boolean = { - var clientSocketToSource: Socket = null - var oosSource: ObjectOutputStream = null - var oisSource: ObjectInputStream = null - - var receptionSucceeded = false - try { - // Connect to the source to get the object itself - clientSocketToSource = - new Socket(sourceInfo.hostAddress, sourceInfo.listenPort) - oosSource = - new ObjectOutputStream(clientSocketToSource.getOutputStream) - oosSource.flush() - oisSource = - new ObjectInputStream(clientSocketToSource.getInputStream) - - logInfo("Inside receiveSingleTransmission") - logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks) - - // Send the range - oosSource.writeObject((hasBlocks, totalBlocks)) - oosSource.flush() - - for (i <- hasBlocks until totalBlocks) { - val recvStartTime = System.currentTimeMillis - val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] - val receptionTime =(System.currentTimeMillis - recvStartTime) - - logInfo("Received block: " + bcBlock.blockID + " from " + sourceInfo + " in " + receptionTime + " millis.") - - arrayOfBlocks(hasBlocks) = bcBlock - hasBlocks += 1 - // Set to true if at least one block is received - receptionSucceeded = true - hasBlocksLock.synchronized { - hasBlocksLock.notifyAll() - } - } - } catch { - case e: Exception => { - logInfo("receiveSingleTransmission had a " + e) - } - } finally { - if (oisSource != null) { - oisSource.close() - } - if (oosSource != null) { - oosSource.close() - } - if (clientSocketToSource != null) { - clientSocketToSource.close() - } - } - - return receptionSucceeded - } - - class GuideMultipleRequests - extends Thread with Logging { - // Keep track of sources that have completed reception - private var setOfCompletedSources = Set[SourceInfo]() - - override def run() { - var threadPool = Utils.newDaemonCachedThreadPool() - var serverSocket: ServerSocket = null - - serverSocket = new ServerSocket(0) - guidePort = serverSocket.getLocalPort - logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort) - - guidePortLock.synchronized { - guidePortLock.notifyAll() - } - - try { - // Don't stop until there is a copy in HDFS - while (!stopBroadcast || !hasCopyInHDFS) { - var clientSocket: Socket = null - try { - serverSocket.setSoTimeout(Broadcast.ServerSocketTimeout) - clientSocket = serverSocket.accept - } catch { - case e: Exception => { - logInfo("GuideMultipleRequests Timeout.") - - // Stop broadcast if at least one worker has connected and - // everyone connected so far are done. Comparing with - // pqOfSources.size - 1, because it includes the Guide itself - if (pqOfSources.size > 1 && - setOfCompletedSources.size == pqOfSources.size - 1) { - stopBroadcast = true - } - } - } - if (clientSocket != null) { - logInfo("Guide: Accepted new client connection: " + clientSocket) - try { - threadPool.execute(new GuideSingleRequest(clientSocket)) - } catch { - // In failure, close the socket here; else, the thread will close it - case ioe: IOException => clientSocket.close() - } - } - } - - logInfo("Sending stopBroadcast notifications...") - sendStopBroadcastNotifications - - ChainedBroadcast.unregisterValue(uuid) - } finally { - if (serverSocket != null) { - logInfo("GuideMultipleRequests now stopping...") - serverSocket.close() - } - } - - // Shutdown the thread pool - threadPool.shutdown() - } - - private def sendStopBroadcastNotifications() { - pqOfSources.synchronized { - var pqIter = pqOfSources.iterator - while (pqIter.hasNext) { - var sourceInfo = pqIter.next - - var guideSocketToSource: Socket = null - var gosSource: ObjectOutputStream = null - var gisSource: ObjectInputStream = null - - try { - // Connect to the source - guideSocketToSource = - new Socket(sourceInfo.hostAddress, sourceInfo.listenPort) - gosSource = - new ObjectOutputStream(guideSocketToSource.getOutputStream) - gosSource.flush() - gisSource = - new ObjectInputStream(guideSocketToSource.getInputStream) - - // Send stopBroadcast signal. Range = SourceInfo.StopBroadcast*2 - gosSource.writeObject((SourceInfo.StopBroadcast, - SourceInfo.StopBroadcast)) - gosSource.flush() - } catch { - case e: Exception => { - logInfo("sendStopBroadcastNotifications had a " + e) - } - } finally { - if (gisSource != null) { - gisSource.close() - } - if (gosSource != null) { - gosSource.close() - } - if (guideSocketToSource != null) { - guideSocketToSource.close() - } - } - } - } - } - - class GuideSingleRequest(val clientSocket: Socket) - extends Thread with Logging { - private val oos = new ObjectOutputStream(clientSocket.getOutputStream) - oos.flush() - private val ois = new ObjectInputStream(clientSocket.getInputStream) - - private var selectedSourceInfo: SourceInfo = null - private var thisWorkerInfo:SourceInfo = null - - override def run() { - try { - logInfo("new GuideSingleRequest is running") - // Connecting worker is sending in its hostAddress and listenPort it will - // be listening to. Other fields are invalid(SourceInfo.UnusedParam) - var sourceInfo = ois.readObject.asInstanceOf[SourceInfo] - - pqOfSources.synchronized { - // Select a suitable source and send it back to the worker - selectedSourceInfo = selectSuitableSource(sourceInfo) - logInfo("Sending selectedSourceInfo: " + selectedSourceInfo) - oos.writeObject(selectedSourceInfo) - oos.flush() - - // Add this new(if it can finish) source to the PQ of sources - thisWorkerInfo = SourceInfo(sourceInfo.hostAddress, - sourceInfo.listenPort, totalBlocks, totalBytes, blockSize) - logInfo("Adding possible new source to pqOfSources: " + thisWorkerInfo) - pqOfSources.add(thisWorkerInfo) - } - - // Wait till the whole transfer is done. Then receive and update source - // statistics in pqOfSources - sourceInfo = ois.readObject.asInstanceOf[SourceInfo] - - pqOfSources.synchronized { - // This should work since SourceInfo is a case class - assert(pqOfSources.contains(selectedSourceInfo)) - - // Remove first - pqOfSources.remove(selectedSourceInfo) - // TODO: Removing a source based on just one failure notification! - - // Update sourceInfo and put it back in, IF reception succeeded - if (!sourceInfo.receptionFailed) { - // Add thisWorkerInfo to sources that have completed reception - setOfCompletedSources.synchronized { - setOfCompletedSources += thisWorkerInfo - } - - selectedSourceInfo.currentLeechers -= 1 - - // Put it back - pqOfSources.add(selectedSourceInfo) - } - } - } catch { - // If something went wrong, e.g., the worker at the other end died etc. - // then close everything up - case e: Exception => { - // Assuming that exception caused due to receiver worker failure. - // Remove failed worker from pqOfSources and update leecherCount of - // corresponding source worker - pqOfSources.synchronized { - if (selectedSourceInfo != null) { - // Remove first - pqOfSources.remove(selectedSourceInfo) - // Update leecher count and put it back in - selectedSourceInfo.currentLeechers -= 1 - pqOfSources.add(selectedSourceInfo) - } - - // Remove thisWorkerInfo - if (pqOfSources != null) { - pqOfSources.remove(thisWorkerInfo) - } - } - } - } finally { - ois.close() - oos.close() - clientSocket.close() - } - } - - // FIXME: Caller must have a synchronized block on pqOfSources - // FIXME: If a worker fails to get the broadcasted variable from a source and - // comes back to Master, this function might choose the worker itself as a - // source tp create a dependency cycle(this worker was put into pqOfSources - // as a streming source when it first arrived). The length of this cycle can - // be arbitrarily long. - private def selectSuitableSource(skipSourceInfo: SourceInfo): SourceInfo = { - // Select one based on the ordering strategy(e.g., least leechers etc.) - // take is a blocking call removing the element from PQ - var selectedSource = pqOfSources.poll - assert(selectedSource != null) - // Update leecher count - selectedSource.currentLeechers += 1 - // Add it back and then return - pqOfSources.add(selectedSource) - return selectedSource - } - } - } - - class ServeMultipleRequests - extends Thread with Logging { - override def run() { - var threadPool = Utils.newDaemonCachedThreadPool() - var serverSocket: ServerSocket = null - - serverSocket = new ServerSocket(0) - listenPort = serverSocket.getLocalPort - logInfo("ServeMultipleRequests started with " + serverSocket) - - listenPortLock.synchronized { - listenPortLock.notifyAll() - } - - try { - while (!stopBroadcast) { - var clientSocket: Socket = null - try { - serverSocket.setSoTimeout(Broadcast.ServerSocketTimeout) - clientSocket = serverSocket.accept - } catch { - case e: Exception => { - logInfo("ServeMultipleRequests Timeout.") - } - } - if (clientSocket != null) { - logInfo("Serve: Accepted new client connection: " + clientSocket) - try { - threadPool.execute(new ServeSingleRequest(clientSocket)) - } catch { - // In failure, close socket here; else, the thread will close it - case ioe: IOException => clientSocket.close() - } - } - } - } finally { - if (serverSocket != null) { - logInfo("ServeMultipleRequests now stopping...") - serverSocket.close() - } - } - - // Shutdown the thread pool - threadPool.shutdown() - } - - class ServeSingleRequest(val clientSocket: Socket) - extends Thread with Logging { - private val oos = new ObjectOutputStream(clientSocket.getOutputStream) - oos.flush() - private val ois = new ObjectInputStream(clientSocket.getInputStream) - - private var sendFrom = 0 - private var sendUntil = totalBlocks - - override def run() { - try { - logInfo("new ServeSingleRequest is running") - - // Receive range to send - var rangeToSend = ois.readObject.asInstanceOf[(Int, Int)] - sendFrom = rangeToSend._1 - sendUntil = rangeToSend._2 - - if (sendFrom == SourceInfo.StopBroadcast && - sendUntil == SourceInfo.StopBroadcast) { - stopBroadcast = true - } else { - // Carry on - sendObject - } - } catch { - // If something went wrong, e.g., the worker at the other end died etc. - // then close everything up - case e: Exception => { - logInfo("ServeSingleRequest had a " + e) - } - } finally { - logInfo("ServeSingleRequest is closing streams and sockets") - ois.close() - oos.close() - clientSocket.close() - } - } - - private def sendObject() { - // Wait till receiving the SourceInfo from Master - while (totalBlocks == -1) { - totalBlocksLock.synchronized { - totalBlocksLock.wait() - } - } - - for (i <- sendFrom until sendUntil) { - while (i == hasBlocks) { - hasBlocksLock.synchronized { - hasBlocksLock.wait() - } - } - try { - oos.writeObject(arrayOfBlocks(i)) - oos.flush() - } catch { - case e: Exception => { - logInfo("sendObject had a " + e) - } - } - logInfo("Sent block: " + i + " to " + clientSocket) - } - } - } - } -} - -class ChainedBroadcastFactory -extends BroadcastFactory { - def initialize(isMaster: Boolean) { - ChainedBroadcast.initialize(isMaster) - } - def newBroadcast[T](value_ : T, isLocal: Boolean) = { - new ChainedBroadcast[T](value_, isLocal) - } -} - -private object ChainedBroadcast -extends Logging { - val values = SparkEnv.get.cache.newKeySpace() - - var valueToGuidePortMap = Map[UUID, Int]() - - // Random number generator - var ranGen = new Random - - private var initialized = false - private var isMaster_ = false - - private var trackMV: TrackMultipleValues = null - - def initialize(isMaster__ : Boolean) { - synchronized { - if (!initialized) { - isMaster_ = isMaster__ - - if (isMaster) { - trackMV = new TrackMultipleValues - trackMV.setDaemon(true) - trackMV.start() - // TODO: Logging the following line makes the Spark framework ID not - // getting logged, cause it calls logInfo before log4j is initialized - logInfo("TrackMultipleValues started...") - } - - // Initialize DfsBroadcast to be used for broadcast variable persistence - DfsBroadcast.initialize - - initialized = true - } - } - } - - def isMaster = isMaster_ - - def registerValue(uuid: UUID, guidePort: Int) { - valueToGuidePortMap.synchronized { - valueToGuidePortMap +=(uuid -> guidePort) - logInfo("New value registered with the Tracker " + valueToGuidePortMap) - } - } - - def unregisterValue(uuid: UUID) { - valueToGuidePortMap.synchronized { - valueToGuidePortMap(uuid) = SourceInfo.TxOverGoToHDFS - logInfo("Value unregistered from the Tracker " + valueToGuidePortMap) - } - } - - class TrackMultipleValues - extends Thread with Logging { - override def run() { - var threadPool = Utils.newDaemonCachedThreadPool() - var serverSocket: ServerSocket = null - - serverSocket = new ServerSocket(Broadcast.MasterTrackerPort) - logInfo("TrackMultipleValues" + serverSocket) - - try { - while (true) { - var clientSocket: Socket = null - try { - serverSocket.setSoTimeout(Broadcast.TrackerSocketTimeout) - clientSocket = serverSocket.accept - } catch { - case e: Exception => { - logInfo("TrackMultipleValues Timeout. Stopping listening...") - } - } - - if (clientSocket != null) { - try { - threadPool.execute(new Thread { - override def run() { - val oos = new ObjectOutputStream(clientSocket.getOutputStream) - oos.flush() - val ois = new ObjectInputStream(clientSocket.getInputStream) - try { - val uuid = ois.readObject.asInstanceOf[UUID] - var guidePort = - if (valueToGuidePortMap.contains(uuid)) { - valueToGuidePortMap(uuid) - } else SourceInfo.TxNotStartedRetry - logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + guidePort) - oos.writeObject(guidePort) - } catch { - case e: Exception => { - logInfo("TrackMultipleValues had a " + e) - } - } finally { - ois.close() - oos.close() - clientSocket.close() - } - } - }) - } catch { - // In failure, close socket here; else, client thread will close - case ioe: IOException => clientSocket.close() - } - } - } - } finally { - serverSocket.close() - } - - // Shutdown the thread pool - threadPool.shutdown() - } - } -} diff --git a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala deleted file mode 100644 index d18dfb8963..0000000000 --- a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala +++ /dev/null @@ -1,135 +0,0 @@ -package spark.broadcast - -import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} - -import java.io._ -import java.net._ -import java.util.UUID - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem} - -import spark._ - -class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging with Serializable { - - def value = value_ - - DfsBroadcast.synchronized { - DfsBroadcast.values.put(uuid, 0, value_) - } - - if (!isLocal) { - sendBroadcast - } - - def sendBroadcast () { - val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid)) - out.writeObject (value_) - out.close() - } - - // Called by JVM when deserializing an object - private def readObject(in: ObjectInputStream) { - in.defaultReadObject() - DfsBroadcast.synchronized { - val cachedVal = DfsBroadcast.values.get(uuid, 0) - if (cachedVal != null) { - value_ = cachedVal.asInstanceOf[T] - } else { - logInfo( "Started reading Broadcasted variable " + uuid) - val start = System.nanoTime - - val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid)) - value_ = fileIn.readObject.asInstanceOf[T] - DfsBroadcast.values.put(uuid, 0, value_) - fileIn.close() - - val time = (System.nanoTime - start) / 1e9 - logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s") - } - } - } -} - -class DfsBroadcastFactory -extends BroadcastFactory { - def initialize (isMaster: Boolean) { - DfsBroadcast.initialize - } - def newBroadcast[T] (value_ : T, isLocal: Boolean) = - new DfsBroadcast[T] (value_, isLocal) -} - -private object DfsBroadcast -extends Logging { - val values = SparkEnv.get.cache.newKeySpace() - - private var initialized = false - - private var fileSystem: FileSystem = null - private var workDir: String = null - private var compress: Boolean = false - private var bufferSize: Int = 65536 - - def initialize() { - synchronized { - if (!initialized) { - bufferSize = System.getProperty("spark.buffer.size", "65536").toInt - val dfs = System.getProperty("spark.dfs", "file:///") - if (!dfs.startsWith("file://")) { - val conf = new Configuration() - conf.setInt("io.file.buffer.size", bufferSize) - val rep = System.getProperty("spark.dfs.replication", "3").toInt - conf.setInt("dfs.replication", rep) - fileSystem = FileSystem.get(new URI(dfs), conf) - } - workDir = System.getProperty("spark.dfs.workDir", "/tmp") - compress = System.getProperty("spark.compress", "false").toBoolean - - initialized = true - } - } - } - - private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid) - - def openFileForReading(uuid: UUID): InputStream = { - val fileStream = if (fileSystem != null) { - fileSystem.open(getPath(uuid)) - } else { - // Local filesystem - new FileInputStream(getPath(uuid).toString) - } - - if (compress) { - // LZF stream does its own buffering - new LZFInputStream(fileStream) - } else if (fileSystem == null) { - new BufferedInputStream(fileStream, bufferSize) - } else { - // Hadoop streams do their own buffering - fileStream - } - } - - def openFileForWriting(uuid: UUID): OutputStream = { - val fileStream = if (fileSystem != null) { - fileSystem.create(getPath(uuid)) - } else { - // Local filesystem - new FileOutputStream(getPath(uuid).toString) - } - - if (compress) { - // LZF stream does its own buffering - new LZFOutputStream(fileStream) - } else if (fileSystem == null) { - new BufferedOutputStream(fileStream, bufferSize) - } else { - // Hadoop streams do their own buffering - fileStream - } - } -} |