aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-08-30 22:26:43 -0700
committerMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-08-30 22:26:43 -0700
commit31ffe8d5284a4cda5eb8d21d3ea042cc5a2dc8b7 (patch)
tree41d1cb1a06b34eba03ba640f0f9a3c1243f07b02
parent38835325454ff3afe2d410583ead37314e1ff49c (diff)
downloadspark-31ffe8d5284a4cda5eb8d21d3ea042cc5a2dc8b7.tar.gz
spark-31ffe8d5284a4cda5eb8d21d3ea042cc5a2dc8b7.tar.bz2
spark-31ffe8d5284a4cda5eb8d21d3ea042cc5a2dc8b7.zip
Synchronization bug fix in broadcast implementations
-rw-r--r--core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala18
-rw-r--r--core/src/main/scala/spark/broadcast/TreeBroadcast.scala20
2 files changed, 23 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index da48ad2784..0715722dc1 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -307,9 +307,11 @@ extends Broadcast[T] with Logging with Serializable {
var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
while (hasBlocks.get < totalBlocks) {
- var numThreadsToCreate =
- math.min(listOfSources.size, MultiTracker.MaxChatSlots) -
+ var numThreadsToCreate = 0
+ listOfSources.synchronized {
+ numThreadsToCreate = math.min(listOfSources.size, MultiTracker.MaxChatSlots) -
threadPool.getActiveCount
+ }
while (hasBlocks.get < totalBlocks && numThreadsToCreate > 0) {
var peerToTalkTo = pickPeerToTalkToRandom
@@ -732,10 +734,14 @@ extends Broadcast[T] with Logging with Serializable {
// Stop broadcast if at least one worker has connected and
// everyone connected so far are done. Comparing with
// listOfSources.size - 1, because it includes the Guide itself
- if (listOfSources.size > 1 &&
- setOfCompletedSources.size == listOfSources.size - 1) {
- stopBroadcast = true
- logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
+ listOfSources.synchronized {
+ setOfCompletedSources.synchronized {
+ if (listOfSources.size > 1 &&
+ setOfCompletedSources.size == listOfSources.size - 1) {
+ stopBroadcast = true
+ logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
+ }
+ }
}
}
}
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index c9e1e67d87..97e7d0120e 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -290,15 +290,17 @@ extends Broadcast[T] with Logging with Serializable {
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
- logError("GuideMultipleRequests Timeout.")
-
// Stop broadcast if at least one worker has connected and
- // everyone connected so far are done.
- // Comparing with listOfSources.size - 1, because the Guide itself
- // is included
- if (listOfSources.size > 1 &&
- setOfCompletedSources.size == listOfSources.size - 1) {
- stopBroadcast = true
+ // everyone connected so far are done. Comparing with
+ // listOfSources.size - 1, because it includes the Guide itself
+ listOfSources.synchronized {
+ setOfCompletedSources.synchronized {
+ if (listOfSources.size > 1 &&
+ setOfCompletedSources.size == listOfSources.size - 1) {
+ stopBroadcast = true
+ logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
+ }
+ }
}
}
}
@@ -490,7 +492,7 @@ extends Broadcast[T] with Logging with Serializable {
serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout)
clientSocket = serverSocket.accept
} catch {
- case e: Exception => logError("ServeMultipleRequests Timeout.")
+ case e: Exception => { }
}
if (clientSocket != null) {