diff options
Diffstat (limited to 'core/src/main/scala/spark/broadcast/TreeBroadcast.scala')
-rw-r--r-- | core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 20 |
1 files changed, 11 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index 5bd40a40e3..fa676e9064 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -292,15 +292,17 @@ extends Broadcast[T](id) with Logging with Serializable { clientSocket = serverSocket.accept } catch { case e: Exception => { - logError("GuideMultipleRequests Timeout.") - // Stop broadcast if at least one worker has connected and - // everyone connected so far are done. - // Comparing with listOfSources.size - 1, because the Guide itself - // is included - if (listOfSources.size > 1 && - setOfCompletedSources.size == listOfSources.size - 1) { - stopBroadcast = true + // everyone connected so far are done. Comparing with + // listOfSources.size - 1, because it includes the Guide itself + listOfSources.synchronized { + setOfCompletedSources.synchronized { + if (listOfSources.size > 1 && + setOfCompletedSources.size == listOfSources.size - 1) { + stopBroadcast = true + logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.") + } + } } } } @@ -492,7 +494,7 @@ extends Broadcast[T](id) with Logging with Serializable { serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout) clientSocket = serverSocket.accept } catch { - case e: Exception => logError("ServeMultipleRequests Timeout.") + case e: Exception => { } } if (clientSocket != null) { |