diff options
author | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-08-30 22:26:43 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-08-30 22:26:43 -0700 |
commit | 31ffe8d5284a4cda5eb8d21d3ea042cc5a2dc8b7 (patch) | |
tree | 41d1cb1a06b34eba03ba640f0f9a3c1243f07b02 | |
parent | 38835325454ff3afe2d410583ead37314e1ff49c (diff) | |
download | spark-31ffe8d5284a4cda5eb8d21d3ea042cc5a2dc8b7.tar.gz spark-31ffe8d5284a4cda5eb8d21d3ea042cc5a2dc8b7.tar.bz2 spark-31ffe8d5284a4cda5eb8d21d3ea042cc5a2dc8b7.zip |
Synchronization bug fix in broadcast implementations
-rw-r--r-- | core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala | 18 | ||||
-rw-r--r-- | core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 20 |
2 files changed, 23 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index da48ad2784..0715722dc1 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -307,9 +307,11 @@ extends Broadcast[T] with Logging with Serializable { var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots) while (hasBlocks.get < totalBlocks) { - var numThreadsToCreate = - math.min(listOfSources.size, MultiTracker.MaxChatSlots) - + var numThreadsToCreate = 0 + listOfSources.synchronized { + numThreadsToCreate = math.min(listOfSources.size, MultiTracker.MaxChatSlots) - threadPool.getActiveCount + } while (hasBlocks.get < totalBlocks && numThreadsToCreate > 0) { var peerToTalkTo = pickPeerToTalkToRandom @@ -732,10 +734,14 @@ extends Broadcast[T] with Logging with Serializable { // Stop broadcast if at least one worker has connected and // everyone connected so far are done. Comparing with // listOfSources.size - 1, because it includes the Guide itself - if (listOfSources.size > 1 && - setOfCompletedSources.size == listOfSources.size - 1) { - stopBroadcast = true - logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.") + listOfSources.synchronized { + setOfCompletedSources.synchronized { + if (listOfSources.size > 1 && + setOfCompletedSources.size == listOfSources.size - 1) { + stopBroadcast = true + logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.") + } + } } } } diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index c9e1e67d87..97e7d0120e 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -290,15 +290,17 @@ extends Broadcast[T] with Logging with Serializable { clientSocket = serverSocket.accept } catch { case e: Exception => { - logError("GuideMultipleRequests Timeout.") - // Stop broadcast if at least one worker has connected and - // everyone connected so far are done. - // Comparing with listOfSources.size - 1, because the Guide itself - // is included - if (listOfSources.size > 1 && - setOfCompletedSources.size == listOfSources.size - 1) { - stopBroadcast = true + // everyone connected so far are done. Comparing with + // listOfSources.size - 1, because it includes the Guide itself + listOfSources.synchronized { + setOfCompletedSources.synchronized { + if (listOfSources.size > 1 && + setOfCompletedSources.size == listOfSources.size - 1) { + stopBroadcast = true + logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.") + } + } } } } @@ -490,7 +492,7 @@ extends Broadcast[T] with Logging with Serializable { serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout) clientSocket = serverSocket.accept } catch { - case e: Exception => logError("ServeMultipleRequests Timeout.") + case e: Exception => { } } if (clientSocket != null) { |