aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala8
-rw-r--r--core/src/main/scala/spark/broadcast/MultiTracker.scala2
-rw-r--r--core/src/main/scala/spark/broadcast/SourceInfo.scala5
-rw-r--r--examples/src/main/scala/spark/examples/BroadcastTest.scala10
4 files changed, 13 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 016dc00fb0..da48ad2784 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -722,7 +722,6 @@ extends Broadcast[T] with Logging with Serializable {
guidePortLock.synchronized { guidePortLock.notifyAll() }
try {
- // Don't stop until there is a copy in HDFS
while (!stopBroadcast) {
var clientSocket: Socket = null
try {
@@ -730,14 +729,13 @@ extends Broadcast[T] with Logging with Serializable {
clientSocket = serverSocket.accept()
} catch {
case e: Exception => {
- logError("GuideMultipleRequests Timeout.")
-
// Stop broadcast if at least one worker has connected and
// everyone connected so far are done. Comparing with
// listOfSources.size - 1, because it includes the Guide itself
if (listOfSources.size > 1 &&
setOfCompletedSources.size == listOfSources.size - 1) {
stopBroadcast = true
+ logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
}
}
}
@@ -918,9 +916,7 @@ extends Broadcast[T] with Logging with Serializable {
serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout)
clientSocket = serverSocket.accept()
} catch {
- case e: Exception => {
- logError("ServeMultipleRequests Timeout.")
- }
+ case e: Exception => { }
}
if (clientSocket != null) {
logDebug("Serve: Accepted new client connection:" + clientSocket)
diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala
index d5f5b22461..a74677a777 100644
--- a/core/src/main/scala/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala
@@ -229,7 +229,7 @@ extends Logging {
var oosTracker: ObjectOutputStream = null
var oisTracker: ObjectInputStream = null
- var gInfo: SourceInfo = SourceInfo("", SourceInfo.TxOverGoToDefault)
+ var gInfo: SourceInfo = SourceInfo("", SourceInfo.TxNotStartedRetry)
var retriesLeft = MultiTracker.MaxRetryCount
do {
diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala
index f90385fd47..bf4027794d 100644
--- a/core/src/main/scala/spark/broadcast/SourceInfo.scala
+++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala
@@ -27,9 +27,10 @@ extends Comparable[SourceInfo] with Logging {
* Helper Object of SourceInfo for its constants
*/
object SourceInfo {
- // Constants for special values of listenPort
+ // Broadcast has not started yet! Should never happen.
val TxNotStartedRetry = -1
- val TxOverGoToDefault = 0
+ // Broadcast has already finished. Try default mechanism.
+ val TxOverGoToDefault = -3
// Other constants
val StopBroadcast = -2
val UnusedParam = 0
diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala
index 4a560131e3..21de82cd87 100644
--- a/examples/src/main/scala/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -17,9 +17,13 @@ object BroadcastTest {
for (i <- 0 until arr1.length)
arr1(i) = i
- val barr1 = spark.broadcast(arr1)
- spark.parallelize(1 to 10, slices).foreach {
- i => println(barr1.value.size)
+ for (i <- 0 until 2) {
+ println("Iteration " + i)
+ println("===========")
+ val barr1 = spark.broadcast(arr1)
+ spark.parallelize(1 to 10, slices).foreach {
+ i => println(barr1.value.size)
+ }
}
System.exit(0)