aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/broadcast/TreeBroadcast.scala')
-rw-r--r--core/src/main/scala/spark/broadcast/TreeBroadcast.scala20
1 files changed, 11 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 5bd40a40e3..fa676e9064 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -292,15 +292,17 @@ extends Broadcast[T](id) with Logging with Serializable {
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
- logError("GuideMultipleRequests Timeout.")
-
// Stop broadcast if at least one worker has connected and
- // everyone connected so far are done.
- // Comparing with listOfSources.size - 1, because the Guide itself
- // is included
- if (listOfSources.size > 1 &&
- setOfCompletedSources.size == listOfSources.size - 1) {
- stopBroadcast = true
+ // everyone connected so far are done. Comparing with
+ // listOfSources.size - 1, because it includes the Guide itself
+ listOfSources.synchronized {
+ setOfCompletedSources.synchronized {
+ if (listOfSources.size > 1 &&
+ setOfCompletedSources.size == listOfSources.size - 1) {
+ stopBroadcast = true
+ logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
+ }
+ }
}
}
}
@@ -492,7 +494,7 @@ extends Broadcast[T](id) with Logging with Serializable {
serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout)
clientSocket = serverSocket.accept
} catch {
- case e: Exception => logError("ServeMultipleRequests Timeout.")
+ case e: Exception => { }
}
if (clientSocket != null) {