path: root/core
author Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> 2012-07-08 11:57:00 -0700
committer Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> 2012-07-08 11:57:00 -0700
commit 178bb29f05158ec69dbbd1d5a12352e0241959a6 (patch)
tree 3bf396e58c32406d49efd9cdc131897061e74f3f /core
parent 5d1a887bed8423bd6c25660910d18d91880e01fe (diff)
Removed Chained and Dfs broadcast implementations
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/broadcast/ChainedBroadcast.scala  794
-rw-r--r--  core/src/main/scala/spark/broadcast/DfsBroadcast.scala  135
2 files changed, 0 insertions, 929 deletions
diff --git a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
deleted file mode 100644
index 43290c241f..0000000000
--- a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
+++ /dev/null
@@ -1,794 +0,0 @@
-package spark.broadcast
-
-import java.io._
-import java.net._
-import java.util.{Comparator, PriorityQueue, Random, UUID}
-
-import scala.collection.mutable.{Map, Set}
-import scala.math
-
-import spark._
-
-class ChainedBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging with Serializable {
-
- def value = value_
-
- ChainedBroadcast.synchronized {
- ChainedBroadcast.values.put(uuid, 0, value_)
- }
-
- @transient var arrayOfBlocks: Array[BroadcastBlock] = null
- @transient var totalBytes = -1
- @transient var totalBlocks = -1
- @transient var hasBlocks = 0
- // CHANGED: BlockSize in the Broadcast object is expected to change over time
- @transient var blockSize = Broadcast.BlockSize
-
- @transient var listenPortLock = new Object
- @transient var guidePortLock = new Object
- @transient var totalBlocksLock = new Object
- @transient var hasBlocksLock = new Object
-
- @transient var pqOfSources = new PriorityQueue[SourceInfo]
-
- @transient var serveMR: ServeMultipleRequests = null
- @transient var guideMR: GuideMultipleRequests = null
-
- @transient var hostAddress = Utils.localIpAddress
- @transient var listenPort = -1
- @transient var guidePort = -1
-
- @transient var hasCopyInHDFS = false
- @transient var stopBroadcast = false
-
- // Must call this after all the variables have been created/initialized
- if (!isLocal) {
- sendBroadcast
- }
-
- def sendBroadcast() {
- logInfo("Local host address: " + hostAddress)
-
- // Store a persistent copy in HDFS
- // TODO: Turned OFF for now
- // val out = new ObjectOutputStream(DfsBroadcast.openFileForWriting(uuid))
- // out.writeObject(value_)
- // out.close()
- // TODO: Fix this at some point
- hasCopyInHDFS = true
-
- // Create a variableInfo object and store it in valueInfos
- var variableInfo = Broadcast.blockifyObject(value_)
-
- guideMR = new GuideMultipleRequests
- guideMR.setDaemon(true)
- guideMR.start()
- logInfo("GuideMultipleRequests started...")
-
- serveMR = new ServeMultipleRequests
- serveMR.setDaemon(true)
- serveMR.start()
- logInfo("ServeMultipleRequests started...")
-
- // Prepare the value being broadcasted
- // TODO: Refactoring and clean-up required here
- arrayOfBlocks = variableInfo.arrayOfBlocks
- totalBytes = variableInfo.totalBytes
- totalBlocks = variableInfo.totalBlocks
- hasBlocks = variableInfo.totalBlocks
-
- while (listenPort == -1) {
- listenPortLock.synchronized {
- listenPortLock.wait()
- }
- }
-
- pqOfSources = new PriorityQueue[SourceInfo]
- val masterSource =
- SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes, blockSize)
- pqOfSources.add(masterSource)
-
- // Register with the Tracker
- while (guidePort == -1) {
- guidePortLock.synchronized {
- guidePortLock.wait()
- }
- }
- ChainedBroadcast.registerValue(uuid, guidePort)
- }
-
- private def readObject(in: ObjectInputStream) {
- in.defaultReadObject()
- ChainedBroadcast.synchronized {
- val cachedVal = ChainedBroadcast.values.get(uuid, 0)
- if (cachedVal != null) {
- value_ = cachedVal.asInstanceOf[T]
- } else {
- // Initializing everything because Master will only send null/0 values
- initializeSlaveVariables
-
- logInfo("Local host address: " + hostAddress)
-
- serveMR = new ServeMultipleRequests
- serveMR.setDaemon(true)
- serveMR.start()
- logInfo("ServeMultipleRequests started...")
-
- val start = System.nanoTime
-
- val receptionSucceeded = receiveBroadcast(uuid)
- // If it does not succeed, then fall back to the HDFS copy
- if (receptionSucceeded) {
- value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
- ChainedBroadcast.values.put(uuid, 0, value_)
- } else {
- val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
- value_ = fileIn.readObject.asInstanceOf[T]
- ChainedBroadcast.values.put(uuid, 0, value_)
- fileIn.close()
- }
-
- val time = (System.nanoTime - start) / 1e9
- logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
- }
- }
- }
-
- private def initializeSlaveVariables() {
- arrayOfBlocks = null
- totalBytes = -1
- totalBlocks = -1
- hasBlocks = 0
- blockSize = -1
-
- listenPortLock = new Object
- totalBlocksLock = new Object
- hasBlocksLock = new Object
-
- serveMR = null
-
- hostAddress = Utils.localIpAddress
- listenPort = -1
-
- stopBroadcast = false
- }
-
- def getMasterListenPort(variableUUID: UUID): Int = {
- var clientSocketToTracker: Socket = null
- var oosTracker: ObjectOutputStream = null
- var oisTracker: ObjectInputStream = null
-
- var masterListenPort: Int = SourceInfo.TxOverGoToHDFS
-
- var retriesLeft = Broadcast.MaxRetryCount
- do {
- try {
- // Connect to the tracker to find out the guide
- clientSocketToTracker =
- new Socket(Broadcast.MasterHostAddress, Broadcast.MasterTrackerPort)
- oosTracker =
- new ObjectOutputStream(clientSocketToTracker.getOutputStream)
- oosTracker.flush()
- oisTracker =
- new ObjectInputStream(clientSocketToTracker.getInputStream)
-
- // Send UUID and receive masterListenPort
- oosTracker.writeObject(uuid)
- oosTracker.flush()
- masterListenPort = oisTracker.readObject.asInstanceOf[Int]
- } catch {
- case e: Exception => {
- logInfo("getMasterListenPort had a " + e)
- }
- } finally {
- if (oisTracker != null) {
- oisTracker.close()
- }
- if (oosTracker != null) {
- oosTracker.close()
- }
- if (clientSocketToTracker != null) {
- clientSocketToTracker.close()
- }
- }
- retriesLeft -= 1
-
- Thread.sleep(ChainedBroadcast.ranGen.nextInt(
- Broadcast.MaxKnockInterval - Broadcast.MinKnockInterval) +
- Broadcast.MinKnockInterval)
-
- } while (retriesLeft > 0 && masterListenPort == SourceInfo.TxNotStartedRetry)
-
- logInfo("Got this guidePort from Tracker: " + masterListenPort)
- return masterListenPort
- }
-
- def receiveBroadcast(variableUUID: UUID): Boolean = {
- val masterListenPort = getMasterListenPort(variableUUID)
-
- if (masterListenPort == SourceInfo.TxOverGoToHDFS ||
- masterListenPort == SourceInfo.TxNotStartedRetry) {
- // TODO: SourceInfo.TxNotStartedRetry is not really in use because we go
- // to HDFS anyway when receiveBroadcast returns false
- return false
- }
-
- // Wait until hostAddress and listenPort are created by the
- // ServeMultipleRequests thread
- while (listenPort == -1) {
- listenPortLock.synchronized {
- listenPortLock.wait()
- }
- }
-
- var clientSocketToMaster: Socket = null
- var oosMaster: ObjectOutputStream = null
- var oisMaster: ObjectInputStream = null
-
- // Connect and receive broadcast from the specified source, retrying the
- // specified number of times in case of failures
- var retriesLeft = Broadcast.MaxRetryCount
- do {
- // Connect to Master and send this worker's Information
- clientSocketToMaster =
- new Socket(Broadcast.MasterHostAddress, masterListenPort)
- // TODO: Guiding object connection is reusable
- oosMaster =
- new ObjectOutputStream(clientSocketToMaster.getOutputStream)
- oosMaster.flush()
- oisMaster =
- new ObjectInputStream(clientSocketToMaster.getInputStream)
-
- logInfo("Connected to Master's guiding object")
-
- // Send local source information
- oosMaster.writeObject(SourceInfo(hostAddress, listenPort))
- oosMaster.flush()
-
- // Receive source information from Master
- var sourceInfo = oisMaster.readObject.asInstanceOf[SourceInfo]
- totalBlocks = sourceInfo.totalBlocks
- arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
- totalBlocksLock.synchronized {
- totalBlocksLock.notifyAll()
- }
- totalBytes = sourceInfo.totalBytes
-
- logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
-
- val start = System.nanoTime
- val receptionSucceeded = receiveSingleTransmission(sourceInfo)
- val time = (System.nanoTime - start) / 1e9
-
- // Update some statistics in sourceInfo. The Master will use them later
- if (!receptionSucceeded) {
- sourceInfo.receptionFailed = true
- }
-
- // Send back statistics to the Master
- oosMaster.writeObject(sourceInfo)
-
- if (oisMaster != null) {
- oisMaster.close()
- }
- if (oosMaster != null) {
- oosMaster.close()
- }
- if (clientSocketToMaster != null) {
- clientSocketToMaster.close()
- }
-
- retriesLeft -= 1
- } while (retriesLeft > 0 && hasBlocks < totalBlocks)
-
- return (hasBlocks == totalBlocks)
- }
-
- // Tries to receive the broadcast from the source and returns a Boolean status.
- // This may be called multiple times to retry a defined number of times.
- private def receiveSingleTransmission(sourceInfo: SourceInfo): Boolean = {
- var clientSocketToSource: Socket = null
- var oosSource: ObjectOutputStream = null
- var oisSource: ObjectInputStream = null
-
- var receptionSucceeded = false
- try {
- // Connect to the source to get the object itself
- clientSocketToSource =
- new Socket(sourceInfo.hostAddress, sourceInfo.listenPort)
- oosSource =
- new ObjectOutputStream(clientSocketToSource.getOutputStream)
- oosSource.flush()
- oisSource =
- new ObjectInputStream(clientSocketToSource.getInputStream)
-
- logInfo("Inside receiveSingleTransmission")
- logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
-
- // Send the range
- oosSource.writeObject((hasBlocks, totalBlocks))
- oosSource.flush()
-
- for (i <- hasBlocks until totalBlocks) {
- val recvStartTime = System.currentTimeMillis
- val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
- val receptionTime = (System.currentTimeMillis - recvStartTime)
-
- logInfo("Received block: " + bcBlock.blockID + " from " + sourceInfo + " in " + receptionTime + " millis.")
-
- arrayOfBlocks(hasBlocks) = bcBlock
- hasBlocks += 1
- // Set to true if at least one block is received
- receptionSucceeded = true
- hasBlocksLock.synchronized {
- hasBlocksLock.notifyAll()
- }
- }
- } catch {
- case e: Exception => {
- logInfo("receiveSingleTransmission had a " + e)
- }
- } finally {
- if (oisSource != null) {
- oisSource.close()
- }
- if (oosSource != null) {
- oosSource.close()
- }
- if (clientSocketToSource != null) {
- clientSocketToSource.close()
- }
- }
-
- return receptionSucceeded
- }
-
- class GuideMultipleRequests
- extends Thread with Logging {
- // Keep track of sources that have completed reception
- private var setOfCompletedSources = Set[SourceInfo]()
-
- override def run() {
- var threadPool = Utils.newDaemonCachedThreadPool()
- var serverSocket: ServerSocket = null
-
- serverSocket = new ServerSocket(0)
- guidePort = serverSocket.getLocalPort
- logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
-
- guidePortLock.synchronized {
- guidePortLock.notifyAll()
- }
-
- try {
- // Don't stop until there is a copy in HDFS
- while (!stopBroadcast || !hasCopyInHDFS) {
- var clientSocket: Socket = null
- try {
- serverSocket.setSoTimeout(Broadcast.ServerSocketTimeout)
- clientSocket = serverSocket.accept
- } catch {
- case e: Exception => {
- logInfo("GuideMultipleRequests Timeout.")
-
- // Stop broadcast if at least one worker has connected and
- // everyone connected so far is done. Comparing with
- // pqOfSources.size - 1, because it includes the Guide itself
- if (pqOfSources.size > 1 &&
- setOfCompletedSources.size == pqOfSources.size - 1) {
- stopBroadcast = true
- }
- }
- }
- if (clientSocket != null) {
- logInfo("Guide: Accepted new client connection: " + clientSocket)
- try {
- threadPool.execute(new GuideSingleRequest(clientSocket))
- } catch {
- // On failure, close the socket here; otherwise, the thread will close it
- case ioe: IOException => clientSocket.close()
- }
- }
- }
-
- logInfo("Sending stopBroadcast notifications...")
- sendStopBroadcastNotifications
-
- ChainedBroadcast.unregisterValue(uuid)
- } finally {
- if (serverSocket != null) {
- logInfo("GuideMultipleRequests now stopping...")
- serverSocket.close()
- }
- }
-
- // Shutdown the thread pool
- threadPool.shutdown()
- }
-
- private def sendStopBroadcastNotifications() {
- pqOfSources.synchronized {
- var pqIter = pqOfSources.iterator
- while (pqIter.hasNext) {
- var sourceInfo = pqIter.next
-
- var guideSocketToSource: Socket = null
- var gosSource: ObjectOutputStream = null
- var gisSource: ObjectInputStream = null
-
- try {
- // Connect to the source
- guideSocketToSource =
- new Socket(sourceInfo.hostAddress, sourceInfo.listenPort)
- gosSource =
- new ObjectOutputStream(guideSocketToSource.getOutputStream)
- gosSource.flush()
- gisSource =
- new ObjectInputStream(guideSocketToSource.getInputStream)
-
- // Send the stopBroadcast signal as the range (StopBroadcast, StopBroadcast)
- gosSource.writeObject((SourceInfo.StopBroadcast,
- SourceInfo.StopBroadcast))
- gosSource.flush()
- } catch {
- case e: Exception => {
- logInfo("sendStopBroadcastNotifications had a " + e)
- }
- } finally {
- if (gisSource != null) {
- gisSource.close()
- }
- if (gosSource != null) {
- gosSource.close()
- }
- if (guideSocketToSource != null) {
- guideSocketToSource.close()
- }
- }
- }
- }
- }
-
- class GuideSingleRequest(val clientSocket: Socket)
- extends Thread with Logging {
- private val oos = new ObjectOutputStream(clientSocket.getOutputStream)
- oos.flush()
- private val ois = new ObjectInputStream(clientSocket.getInputStream)
-
- private var selectedSourceInfo: SourceInfo = null
- private var thisWorkerInfo: SourceInfo = null
-
- override def run() {
- try {
- logInfo("new GuideSingleRequest is running")
- // The connecting worker sends the hostAddress and listenPort it will
- // be listening on. Other fields are invalid (SourceInfo.UnusedParam)
- var sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
-
- pqOfSources.synchronized {
- // Select a suitable source and send it back to the worker
- selectedSourceInfo = selectSuitableSource(sourceInfo)
- logInfo("Sending selectedSourceInfo: " + selectedSourceInfo)
- oos.writeObject(selectedSourceInfo)
- oos.flush()
-
- // Add this new (if it can finish) source to the PQ of sources
- thisWorkerInfo = SourceInfo(sourceInfo.hostAddress,
- sourceInfo.listenPort, totalBlocks, totalBytes, blockSize)
- logInfo("Adding possible new source to pqOfSources: " + thisWorkerInfo)
- pqOfSources.add(thisWorkerInfo)
- }
-
- // Wait until the whole transfer is done. Then receive and update source
- // statistics in pqOfSources
- sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
-
- pqOfSources.synchronized {
- // This should work since SourceInfo is a case class
- assert(pqOfSources.contains(selectedSourceInfo))
-
- // Remove first
- pqOfSources.remove(selectedSourceInfo)
- // TODO: Removing a source based on just one failure notification!
-
- // Update sourceInfo and put it back in, IF reception succeeded
- if (!sourceInfo.receptionFailed) {
- // Add thisWorkerInfo to sources that have completed reception
- setOfCompletedSources.synchronized {
- setOfCompletedSources += thisWorkerInfo
- }
-
- selectedSourceInfo.currentLeechers -= 1
-
- // Put it back
- pqOfSources.add(selectedSourceInfo)
- }
- }
- } catch {
- // If something went wrong, e.g., the worker at the other end died,
- // then close everything up
- case e: Exception => {
- // Assume that the exception was caused by a receiver worker failure.
- // Remove the failed worker from pqOfSources and update the leecher count of
- // the corresponding source worker
- pqOfSources.synchronized {
- if (selectedSourceInfo != null) {
- // Remove first
- pqOfSources.remove(selectedSourceInfo)
- // Update leecher count and put it back in
- selectedSourceInfo.currentLeechers -= 1
- pqOfSources.add(selectedSourceInfo)
- }
-
- // Remove thisWorkerInfo
- if (pqOfSources != null) {
- pqOfSources.remove(thisWorkerInfo)
- }
- }
- }
- } finally {
- ois.close()
- oos.close()
- clientSocket.close()
- }
- }
-
- // FIXME: Caller must have a synchronized block on pqOfSources
- // FIXME: If a worker fails to get the broadcasted variable from a source and
- // comes back to Master, this function might choose the worker itself as a
- // source, creating a dependency cycle (this worker was put into pqOfSources
- // as a streaming source when it first arrived). The length of this cycle can
- // be arbitrarily long.
- private def selectSuitableSource(skipSourceInfo: SourceInfo): SourceInfo = {
- // Select one based on the ordering strategy (e.g., fewest leechers, etc.)
- // poll removes the head element from the PQ
- var selectedSource = pqOfSources.poll
- assert(selectedSource != null)
- // Update leecher count
- selectedSource.currentLeechers += 1
- // Add it back and then return
- pqOfSources.add(selectedSource)
- return selectedSource
- }
- }
- }
-
- class ServeMultipleRequests
- extends Thread with Logging {
- override def run() {
- var threadPool = Utils.newDaemonCachedThreadPool()
- var serverSocket: ServerSocket = null
-
- serverSocket = new ServerSocket(0)
- listenPort = serverSocket.getLocalPort
- logInfo("ServeMultipleRequests started with " + serverSocket)
-
- listenPortLock.synchronized {
- listenPortLock.notifyAll()
- }
-
- try {
- while (!stopBroadcast) {
- var clientSocket: Socket = null
- try {
- serverSocket.setSoTimeout(Broadcast.ServerSocketTimeout)
- clientSocket = serverSocket.accept
- } catch {
- case e: Exception => {
- logInfo("ServeMultipleRequests Timeout.")
- }
- }
- if (clientSocket != null) {
- logInfo("Serve: Accepted new client connection: " + clientSocket)
- try {
- threadPool.execute(new ServeSingleRequest(clientSocket))
- } catch {
- // On failure, close the socket here; otherwise, the thread will close it
- case ioe: IOException => clientSocket.close()
- }
- }
- }
- } finally {
- if (serverSocket != null) {
- logInfo("ServeMultipleRequests now stopping...")
- serverSocket.close()
- }
- }
-
- // Shutdown the thread pool
- threadPool.shutdown()
- }
-
- class ServeSingleRequest(val clientSocket: Socket)
- extends Thread with Logging {
- private val oos = new ObjectOutputStream(clientSocket.getOutputStream)
- oos.flush()
- private val ois = new ObjectInputStream(clientSocket.getInputStream)
-
- private var sendFrom = 0
- private var sendUntil = totalBlocks
-
- override def run() {
- try {
- logInfo("new ServeSingleRequest is running")
-
- // Receive range to send
- var rangeToSend = ois.readObject.asInstanceOf[(Int, Int)]
- sendFrom = rangeToSend._1
- sendUntil = rangeToSend._2
-
- if (sendFrom == SourceInfo.StopBroadcast &&
- sendUntil == SourceInfo.StopBroadcast) {
- stopBroadcast = true
- } else {
- // Carry on
- sendObject
- }
- } catch {
- // If something went wrong, e.g., the worker at the other end died,
- // then close everything up
- case e: Exception => {
- logInfo("ServeSingleRequest had a " + e)
- }
- } finally {
- logInfo("ServeSingleRequest is closing streams and sockets")
- ois.close()
- oos.close()
- clientSocket.close()
- }
- }
-
- private def sendObject() {
- // Wait until the SourceInfo has been received from the Master
- while (totalBlocks == -1) {
- totalBlocksLock.synchronized {
- totalBlocksLock.wait()
- }
- }
-
- for (i <- sendFrom until sendUntil) {
- while (i == hasBlocks) {
- hasBlocksLock.synchronized {
- hasBlocksLock.wait()
- }
- }
- try {
- oos.writeObject(arrayOfBlocks(i))
- oos.flush()
- } catch {
- case e: Exception => {
- logInfo("sendObject had a " + e)
- }
- }
- logInfo("Sent block: " + i + " to " + clientSocket)
- }
- }
- }
- }
-}
-
-class ChainedBroadcastFactory
-extends BroadcastFactory {
- def initialize(isMaster: Boolean) {
- ChainedBroadcast.initialize(isMaster)
- }
- def newBroadcast[T](value_ : T, isLocal: Boolean) = {
- new ChainedBroadcast[T](value_, isLocal)
- }
-}
-
-private object ChainedBroadcast
-extends Logging {
- val values = SparkEnv.get.cache.newKeySpace()
-
- var valueToGuidePortMap = Map[UUID, Int]()
-
- // Random number generator
- var ranGen = new Random
-
- private var initialized = false
- private var isMaster_ = false
-
- private var trackMV: TrackMultipleValues = null
-
- def initialize(isMaster__ : Boolean) {
- synchronized {
- if (!initialized) {
- isMaster_ = isMaster__
-
- if (isMaster) {
- trackMV = new TrackMultipleValues
- trackMV.setDaemon(true)
- trackMV.start()
- // TODO: Logging the following line prevents the Spark framework ID from
- // being logged, because it calls logInfo before log4j is initialized
- logInfo("TrackMultipleValues started...")
- }
-
- // Initialize DfsBroadcast to be used for broadcast variable persistence
- DfsBroadcast.initialize
-
- initialized = true
- }
- }
- }
-
- def isMaster = isMaster_
-
- def registerValue(uuid: UUID, guidePort: Int) {
- valueToGuidePortMap.synchronized {
- valueToGuidePortMap += (uuid -> guidePort)
- logInfo("New value registered with the Tracker " + valueToGuidePortMap)
- }
- }
-
- def unregisterValue(uuid: UUID) {
- valueToGuidePortMap.synchronized {
- valueToGuidePortMap(uuid) = SourceInfo.TxOverGoToHDFS
- logInfo("Value unregistered from the Tracker " + valueToGuidePortMap)
- }
- }
-
- class TrackMultipleValues
- extends Thread with Logging {
- override def run() {
- var threadPool = Utils.newDaemonCachedThreadPool()
- var serverSocket: ServerSocket = null
-
- serverSocket = new ServerSocket(Broadcast.MasterTrackerPort)
- logInfo("TrackMultipleValues" + serverSocket)
-
- try {
- while (true) {
- var clientSocket: Socket = null
- try {
- serverSocket.setSoTimeout(Broadcast.TrackerSocketTimeout)
- clientSocket = serverSocket.accept
- } catch {
- case e: Exception => {
- logInfo("TrackMultipleValues Timeout. Stopping listening...")
- }
- }
-
- if (clientSocket != null) {
- try {
- threadPool.execute(new Thread {
- override def run() {
- val oos = new ObjectOutputStream(clientSocket.getOutputStream)
- oos.flush()
- val ois = new ObjectInputStream(clientSocket.getInputStream)
- try {
- val uuid = ois.readObject.asInstanceOf[UUID]
- var guidePort =
- if (valueToGuidePortMap.contains(uuid)) {
- valueToGuidePortMap(uuid)
- } else SourceInfo.TxNotStartedRetry
- logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + guidePort)
- oos.writeObject(guidePort)
- } catch {
- case e: Exception => {
- logInfo("TrackMultipleValues had a " + e)
- }
- } finally {
- ois.close()
- oos.close()
- clientSocket.close()
- }
- }
- })
- } catch {
- // On failure, close the socket here; otherwise, the client thread will close it
- case ioe: IOException => clientSocket.close()
- }
- }
- }
- } finally {
- serverSocket.close()
- }
-
- // Shutdown the thread pool
- threadPool.shutdown()
- }
- }
-}
diff --git a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
deleted file mode 100644
index d18dfb8963..0000000000
--- a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-package spark.broadcast
-
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
-import java.io._
-import java.net._
-import java.util.UUID
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
-
-import spark._
-
-class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging with Serializable {
-
- def value = value_
-
- DfsBroadcast.synchronized {
- DfsBroadcast.values.put(uuid, 0, value_)
- }
-
- if (!isLocal) {
- sendBroadcast
- }
-
- def sendBroadcast() {
- val out = new ObjectOutputStream(DfsBroadcast.openFileForWriting(uuid))
- out.writeObject(value_)
- out.close()
- }
-
- // Called by JVM when deserializing an object
- private def readObject(in: ObjectInputStream) {
- in.defaultReadObject()
- DfsBroadcast.synchronized {
- val cachedVal = DfsBroadcast.values.get(uuid, 0)
- if (cachedVal != null) {
- value_ = cachedVal.asInstanceOf[T]
- } else {
- logInfo( "Started reading Broadcasted variable " + uuid)
- val start = System.nanoTime
-
- val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
- value_ = fileIn.readObject.asInstanceOf[T]
- DfsBroadcast.values.put(uuid, 0, value_)
- fileIn.close()
-
- val time = (System.nanoTime - start) / 1e9
- logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
- }
- }
- }
-}
-
-class DfsBroadcastFactory
-extends BroadcastFactory {
- def initialize(isMaster: Boolean) {
- DfsBroadcast.initialize
- }
- def newBroadcast[T](value_ : T, isLocal: Boolean) =
- new DfsBroadcast[T](value_, isLocal)
-}
-
-private object DfsBroadcast
-extends Logging {
- val values = SparkEnv.get.cache.newKeySpace()
-
- private var initialized = false
-
- private var fileSystem: FileSystem = null
- private var workDir: String = null
- private var compress: Boolean = false
- private var bufferSize: Int = 65536
-
- def initialize() {
- synchronized {
- if (!initialized) {
- bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- val dfs = System.getProperty("spark.dfs", "file:///")
- if (!dfs.startsWith("file://")) {
- val conf = new Configuration()
- conf.setInt("io.file.buffer.size", bufferSize)
- val rep = System.getProperty("spark.dfs.replication", "3").toInt
- conf.setInt("dfs.replication", rep)
- fileSystem = FileSystem.get(new URI(dfs), conf)
- }
- workDir = System.getProperty("spark.dfs.workDir", "/tmp")
- compress = System.getProperty("spark.compress", "false").toBoolean
-
- initialized = true
- }
- }
- }
-
- private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid)
-
- def openFileForReading(uuid: UUID): InputStream = {
- val fileStream = if (fileSystem != null) {
- fileSystem.open(getPath(uuid))
- } else {
- // Local filesystem
- new FileInputStream(getPath(uuid).toString)
- }
-
- if (compress) {
- // LZF stream does its own buffering
- new LZFInputStream(fileStream)
- } else if (fileSystem == null) {
- new BufferedInputStream(fileStream, bufferSize)
- } else {
- // Hadoop streams do their own buffering
- fileStream
- }
- }
-
- def openFileForWriting(uuid: UUID): OutputStream = {
- val fileStream = if (fileSystem != null) {
- fileSystem.create(getPath(uuid))
- } else {
- // Local filesystem
- new FileOutputStream(getPath(uuid).toString)
- }
-
- if (compress) {
- // LZF stream does its own buffering
- new LZFOutputStream(fileStream)
- } else if (fileSystem == null) {
- new BufferedOutputStream(fileStream, bufferSize)
- } else {
- // Hadoop streams do their own buffering
- fileStream
- }
- }
-}