diff options
4 files changed, 13 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index 016dc00fb0..da48ad2784 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -722,7 +722,6 @@ extends Broadcast[T] with Logging with Serializable { guidePortLock.synchronized { guidePortLock.notifyAll() } try { - // Don't stop until there is a copy in HDFS while (!stopBroadcast) { var clientSocket: Socket = null try { @@ -730,14 +729,13 @@ extends Broadcast[T] with Logging with Serializable { clientSocket = serverSocket.accept() } catch { case e: Exception => { - logError("GuideMultipleRequests Timeout.") - // Stop broadcast if at least one worker has connected and // everyone connected so far are done. Comparing with // listOfSources.size - 1, because it includes the Guide itself if (listOfSources.size > 1 && setOfCompletedSources.size == listOfSources.size - 1) { stopBroadcast = true + logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.") } } } @@ -918,9 +916,7 @@ extends Broadcast[T] with Logging with Serializable { serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout) clientSocket = serverSocket.accept() } catch { - case e: Exception => { - logError("ServeMultipleRequests Timeout.") - } + case e: Exception => { } } if (clientSocket != null) { logDebug("Serve: Accepted new client connection:" + clientSocket) diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala index d5f5b22461..a74677a777 100644 --- a/core/src/main/scala/spark/broadcast/MultiTracker.scala +++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala @@ -229,7 +229,7 @@ extends Logging { var oosTracker: ObjectOutputStream = null var oisTracker: ObjectInputStream = null - var gInfo: SourceInfo = SourceInfo("", SourceInfo.TxOverGoToDefault) + var gInfo: SourceInfo = SourceInfo("", SourceInfo.TxNotStartedRetry) var retriesLeft = MultiTracker.MaxRetryCount do { diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala index f90385fd47..bf4027794d 100644 --- a/core/src/main/scala/spark/broadcast/SourceInfo.scala +++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala @@ -27,9 +27,10 @@ extends Comparable[SourceInfo] with Logging { * Helper Object of SourceInfo for its constants */ object SourceInfo { - // Constants for special values of listenPort + // Broadcast has not started yet! Should never happen. val TxNotStartedRetry = -1 - val TxOverGoToDefault = 0 + // Broadcast has already finished. Try default mechanism. + val TxOverGoToDefault = -3 // Other constants val StopBroadcast = -2 val UnusedParam = 0 diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala index 4a560131e3..21de82cd87 100644 --- a/examples/src/main/scala/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala @@ -17,9 +17,13 @@ object BroadcastTest { for (i <- 0 until arr1.length) arr1(i) = i - val barr1 = spark.broadcast(arr1) - spark.parallelize(1 to 10, slices).foreach { - i => println(barr1.value.size) + for (i <- 0 until 2) { + println("Iteration " + i) + println("===========") + val barr1 = spark.broadcast(arr1) + spark.parallelize(1 to 10, slices).foreach { + i => println(barr1.value.size) + } } System.exit(0) |