about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> 2012-07-08 13:22:12 -0700
committer: Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> 2012-07-08 13:22:12 -0700
commit: c7c5258e255ec7b3e5514ddf0c6edcdf3af412aa (patch)
tree: 9eec312911b437a691d1710b3d5141c342a3cca9
parent: 178bb29f05158ec69dbbd1d5a12352e0241959a6 (diff)
download: spark-c7c5258e255ec7b3e5514ddf0c6edcdf3af412aa.tar.gz
spark-c7c5258e255ec7b3e5514ddf0c6edcdf3af412aa.tar.bz2
spark-c7c5258e255ec7b3e5514ddf0c6edcdf3af412aa.zip
Compiles without Dfs
-rw-r--r-- core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala | 27
-rw-r--r-- core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 24
2 files changed, 6 insertions, 45 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index e009d4e7db..09266b9f60 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -52,7 +52,6 @@ extends Broadcast[T] with Logging with Serializable {
@transient var listenPort = -1
@transient var guidePort = -1
- @transient var hasCopyInHDFS = false
@transient var stopBroadcast = false
// Must call this after all the variables have been created/initialized
@@ -63,14 +62,6 @@ extends Broadcast[T] with Logging with Serializable {
def sendBroadcast() {
logInfo("Local host address: " + hostAddress)
- // Store a persistent copy in HDFS
- // TODO: Turned OFF for now. Related to persistence
- // val out = new ObjectOutputStream(BroadcastCH.openFileForWriting(uuid))
- // out.writeObject(value_)
- // out.close()
- // FIXME: Fix this at some point
- hasCopyInHDFS = true
-
// Create a variableInfo object and store it in valueInfos
var variableInfo = Broadcast.blockifyObject(value_)
@@ -149,16 +140,11 @@ extends Broadcast[T] with Logging with Serializable {
val start = System.nanoTime
val receptionSucceeded = receiveBroadcast(uuid)
- // If does not succeed, then get from HDFS copy
if (receptionSucceeded) {
value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
BitTorrentBroadcast.values.put(uuid, 0, value_)
} else {
- // TODO: This part won't work, cause HDFS writing is turned OFF
- val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
- value_ = fileIn.readObject.asInstanceOf[T]
- BitTorrentBroadcast.values.put(uuid, 0, value_)
- fileIn.close()
+ logError("Reading Broadcasted variable " + uuid + " failed")
}
val time = (System.nanoTime - start) / 1e9
@@ -874,7 +860,7 @@ extends Broadcast[T] with Logging with Serializable {
try {
// Don't stop until there is a copy in HDFS
- while (!stopBroadcast || !hasCopyInHDFS) {
+ while (!stopBroadcast) {
var clientSocket: Socket = null
try {
serverSocket.setSoTimeout(Broadcast.ServerSocketTimeout)
@@ -1221,22 +1207,15 @@ extends Logging {
def initialize(isMaster__ : Boolean) {
synchronized {
if (!initialized) {
-
isMaster_ = isMaster__
-
if (isMaster) {
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
trackMV.start()
// TODO: Logging the following line makes the Spark framework ID not
// getting logged, cause it calls logInfo before log4j is initialized
- logInfo("TrackMultipleValues started...")
+ // logInfo("TrackMultipleValues started...")
}
-
- // Initialize DfsBroadcast to be used for broadcast variable persistence
- // TODO: Think about persistence
- DfsBroadcast.initialize
-
initialized = true
}
}
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index f5527b6ec9..e90360eac4 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -39,7 +39,6 @@ extends Broadcast[T] with Logging with Serializable {
@transient var listenPort = -1
@transient var guidePort = -1
- @transient var hasCopyInHDFS = false
@transient var stopBroadcast = false
// Must call this after all the variables have been created/initialized
@@ -50,14 +49,6 @@ extends Broadcast[T] with Logging with Serializable {
def sendBroadcast() {
logInfo("Local host address: " + hostAddress)
- // Store a persistent copy in HDFS
- // TODO: Turned OFF for now
- // val out = new ObjectOutputStream(DfsBroadcast.openFileForWriting(uuid))
- // out.writeObject(value_)
- // out.close()
- // TODO: Fix this at some point
- hasCopyInHDFS = true
-
// Create a variableInfo object and store it in valueInfos
var variableInfo = Broadcast.blockifyObject(value_)
@@ -121,15 +112,11 @@ extends Broadcast[T] with Logging with Serializable {
val start = System.nanoTime
val receptionSucceeded = receiveBroadcast(uuid)
- // If does not succeed, then get from HDFS copy
if (receptionSucceeded) {
value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
TreeBroadcast.values.put(uuid, 0, value_)
} else {
- val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
- value_ = fileIn.readObject.asInstanceOf[T]
- TreeBroadcast.values.put(uuid, 0, value_)
- fileIn.close()
+ logError("Reading Broadcasted variable " + uuid + " failed")
}
val time = (System.nanoTime - start) / 1e9
@@ -367,7 +354,7 @@ extends Broadcast[T] with Logging with Serializable {
try {
// Don't stop until there is a copy in HDFS
- while (!stopBroadcast || !hasCopyInHDFS) {
+ while (!stopBroadcast) {
var clientSocket: Socket = null
try {
serverSocket.setSoTimeout(Broadcast.ServerSocketTimeout)
@@ -708,19 +695,14 @@ extends Logging {
synchronized {
if (!initialized) {
isMaster_ = isMaster__
-
if (isMaster) {
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
trackMV.start()
// TODO: Logging the following line makes the Spark framework ID not
// getting logged, cause it calls logInfo before log4j is initialized
- logInfo("TrackMultipleValues started...")
+ // logInfo("TrackMultipleValues started...")
}
-
- // Initialize DfsBroadcast to be used for broadcast variable persistence
- DfsBroadcast.initialize
-
initialized = true
}
}