diff options
author | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-08 13:22:12 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-08 13:22:12 -0700 |
commit | c7c5258e255ec7b3e5514ddf0c6edcdf3af412aa (patch) | |
tree | 9eec312911b437a691d1710b3d5141c342a3cca9 | |
parent | 178bb29f05158ec69dbbd1d5a12352e0241959a6 (diff) | |
download | spark-c7c5258e255ec7b3e5514ddf0c6edcdf3af412aa.tar.gz spark-c7c5258e255ec7b3e5514ddf0c6edcdf3af412aa.tar.bz2 spark-c7c5258e255ec7b3e5514ddf0c6edcdf3af412aa.zip |
Compiles without Dfs
-rw-r--r-- | core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala | 27 | ||||
-rw-r--r-- | core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 24 |
2 files changed, 6 insertions, 45 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index e009d4e7db..09266b9f60 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -52,7 +52,6 @@ extends Broadcast[T] with Logging with Serializable { @transient var listenPort = -1 @transient var guidePort = -1 - @transient var hasCopyInHDFS = false @transient var stopBroadcast = false // Must call this after all the variables have been created/initialized @@ -63,14 +62,6 @@ extends Broadcast[T] with Logging with Serializable { def sendBroadcast() { logInfo("Local host address: " + hostAddress) - // Store a persistent copy in HDFS - // TODO: Turned OFF for now. Related to persistence - // val out = new ObjectOutputStream(BroadcastCH.openFileForWriting(uuid)) - // out.writeObject(value_) - // out.close() - // FIXME: Fix this at some point - hasCopyInHDFS = true - // Create a variableInfo object and store it in valueInfos var variableInfo = Broadcast.blockifyObject(value_) @@ -149,16 +140,11 @@ extends Broadcast[T] with Logging with Serializable { val start = System.nanoTime val receptionSucceeded = receiveBroadcast(uuid) - // If does not succeed, then get from HDFS copy if (receptionSucceeded) { value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) BitTorrentBroadcast.values.put(uuid, 0, value_) } else { - // TODO: This part won't work, cause HDFS writing is turned OFF - val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid)) - value_ = fileIn.readObject.asInstanceOf[T] - BitTorrentBroadcast.values.put(uuid, 0, value_) - fileIn.close() + logError("Reading Broadcasted variable " + uuid + " failed") } val time = (System.nanoTime - start) / 1e9 @@ -874,7 +860,7 @@ extends Broadcast[T] with Logging with Serializable { try { // Don't stop until there is a copy in HDFS - while (!stopBroadcast || !hasCopyInHDFS) { + while (!stopBroadcast) { var clientSocket: Socket = null try { serverSocket.setSoTimeout(Broadcast.ServerSocketTimeout) @@ -1221,22 +1207,15 @@ extends Logging { def initialize(isMaster__ : Boolean) { synchronized { if (!initialized) { - isMaster_ = isMaster__ - if (isMaster) { trackMV = new TrackMultipleValues trackMV.setDaemon(true) trackMV.start() // TODO: Logging the following line makes the Spark framework ID not // getting logged, cause it calls logInfo before log4j is initialized - logInfo("TrackMultipleValues started...") + // logInfo("TrackMultipleValues started...") } - - // Initialize DfsBroadcast to be used for broadcast variable persistence - // TODO: Think about persistence - DfsBroadcast.initialize - initialized = true } } diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index f5527b6ec9..e90360eac4 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -39,7 +39,6 @@ extends Broadcast[T] with Logging with Serializable { @transient var listenPort = -1 @transient var guidePort = -1 - @transient var hasCopyInHDFS = false @transient var stopBroadcast = false // Must call this after all the variables have been created/initialized @@ -50,14 +49,6 @@ extends Broadcast[T] with Logging with Serializable { def sendBroadcast() { logInfo("Local host address: " + hostAddress) - // Store a persistent copy in HDFS - // TODO: Turned OFF for now - // val out = new ObjectOutputStream(DfsBroadcast.openFileForWriting(uuid)) - // out.writeObject(value_) - // out.close() - // TODO: Fix this at some point - hasCopyInHDFS = true - // Create a variableInfo object and store it in valueInfos var variableInfo = Broadcast.blockifyObject(value_) @@ -121,15 +112,11 @@ extends Broadcast[T] with Logging with Serializable { val start = System.nanoTime val receptionSucceeded = receiveBroadcast(uuid) - // If does not succeed, then get from HDFS copy if (receptionSucceeded) { value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) TreeBroadcast.values.put(uuid, 0, value_) } else { - val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid)) - value_ = fileIn.readObject.asInstanceOf[T] - TreeBroadcast.values.put(uuid, 0, value_) - fileIn.close() + logError("Reading Broadcasted variable " + uuid + " failed") } val time = (System.nanoTime - start) / 1e9 @@ -367,7 +354,7 @@ extends Broadcast[T] with Logging with Serializable { try { // Don't stop until there is a copy in HDFS - while (!stopBroadcast || !hasCopyInHDFS) { + while (!stopBroadcast) { var clientSocket: Socket = null try { serverSocket.setSoTimeout(Broadcast.ServerSocketTimeout) @@ -708,19 +695,14 @@ extends Logging { synchronized { if (!initialized) { isMaster_ = isMaster__ - if (isMaster) { trackMV = new TrackMultipleValues trackMV.setDaemon(true) trackMV.start() // TODO: Logging the following line makes the Spark framework ID not // getting logged, cause it calls logInfo before log4j is initialized - logInfo("TrackMultipleValues started...") + // logInfo("TrackMultipleValues started...") } - - // Initialize DfsBroadcast to be used for broadcast variable persistence - DfsBroadcast.initialize - initialized = true } } |