diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-26 16:08:22 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-26 16:08:22 -0700 |
commit | bc1c51817c8d6cca1a87c30eb7dd22805c795d6a (patch) | |
tree | 6701406c8af5bedc0afc7044eb399ef706cc9a4b | |
parent | d673be0e066f50a0ed320cbd71bafe575a9e56e4 (diff) | |
download | spark-bc1c51817c8d6cca1a87c30eb7dd22805c795d6a.tar.gz spark-bc1c51817c8d6cca1a87c30eb7dd22805c795d6a.tar.bz2 spark-bc1c51817c8d6cca1a87c30eb7dd22805c795d6a.zip |
Beautified code...
-rw-r--r-- | src/scala/spark/Broadcast.scala | 70 |
1 files changed, 49 insertions, 21 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 19bdd38390..f5bc7354cb 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -32,7 +32,9 @@ extends BroadcastRecipe with Logging { def value = value_ - BroadcastBT.synchronized { BroadcastBT.values.put (uuid, value_) } + BroadcastBT.synchronized { + BroadcastBT.values.put (uuid, value_) + } @transient var arrayOfBlocks: Array[BroadcastBlock] = null @transient var totalBytes = -1 @@ -63,7 +65,9 @@ extends BroadcastRecipe with Logging { @transient var hasCopyInHDFS = false // Must call this after all the variables have been created/initialized - if (!local) { sendBroadcast } + if (!local) { + sendBroadcast + } def sendBroadcast () { // Store a persistent copy in HDFS @@ -293,8 +297,9 @@ extends BroadcastRecipe with Logging { suitableSources.foreach { srcInfo => // Removing old copy of srcInfo to be replaced with a new one // It works because case clases are compared by constructor params - if (listOfSources.contains(srcInfo)) - { listOfSources = listOfSources - srcInfo } + if (listOfSources.contains(srcInfo)) { + listOfSources = listOfSources - srcInfo + } listOfSources = listOfSources + srcInfo } } @@ -337,9 +342,15 @@ extends BroadcastRecipe with Logging { } catch { case e: Exception => { } } finally { - if (oisTracker != null) { oisTracker.close } - if (oosTracker != null) { oosTracker.close } - if (clientSocketToTracker != null) { clientSocketToTracker.close } + if (oisTracker != null) { + oisTracker.close + } + if (oosTracker != null) { + oosTracker.close + } + if (clientSocketToTracker != null) { + clientSocketToTracker.close + } } retriesLeft -= 1 // TODO: Should wait before retrying. Implement wait function. @@ -392,7 +403,9 @@ extends BroadcastRecipe with Logging { // TODO: Must fix this. This might never break if broadcast fails. // We should be able to break and send false. Also need to kill threads - while (hasBlocks != totalBlocks) { Thread.sleep(1234) } + while (hasBlocks != totalBlocks) { + Thread.sleep(1234) + } return true } @@ -529,9 +542,15 @@ extends BroadcastRecipe with Logging { // } } } finally { - if (oisSource != null) { oisSource.close } - if (oosSource != null) { oosSource.close } - if (peerSocketToSource != null) { peerSocketToSource.close } + if (oisSource != null) { + oisSource.close + } + if (oosSource != null) { + oosSource.close + } + if (peerSocketToSource != null) { + peerSocketToSource.close + } // Delete from peersNowTalking peersNowTalking.synchronized { @@ -612,8 +631,9 @@ extends BroadcastRecipe with Logging { listOfSources.synchronized { // Add this source to the listOfSources - if (listOfSources.contains(sourceInfo)) - { listOfSources = listOfSources - sourceInfo } + if (listOfSources.contains(sourceInfo)) { + listOfSources = listOfSources - sourceInfo + } listOfSources = listOfSources + sourceInfo } } catch { @@ -640,8 +660,9 @@ extends BroadcastRecipe with Logging { var selectedSources = ListBuffer[SourceInfo] () listOfSources.synchronized { do { - if (listOfSources(curIndex) != skipSourceInfo) - { selectedSources = selectedSources + listOfSources(curIndex) } + if (listOfSources(curIndex) != skipSourceInfo) { + selectedSources = selectedSources + listOfSources(curIndex) + } curIndex = (curIndex + 1) % listOfSources.size } while (curIndex != rollOverIndex && selectedSources.size != BroadcastBT.MaxPeersInGuideResponse) @@ -690,7 +711,9 @@ extends BroadcastRecipe with Logging { } } } finally { - if (serverSocket != null) { serverSocket.close } + if (serverSocket != null) { + serverSocket.close + } } } @@ -714,8 +737,9 @@ extends BroadcastRecipe with Logging { // Update listOfSources listOfSources.synchronized { - if (listOfSources.contains(rxSourceInfo)) - { listOfSources = listOfSources - rxSourceInfo } + if (listOfSources.contains(rxSourceInfo)) { + listOfSources = listOfSources - rxSourceInfo + } listOfSources = listOfSources + rxSourceInfo } @@ -788,7 +812,7 @@ extends BroadcastRecipe with Logging { oos.writeObject (arrayOfBlocks(blockIndex)) oos.flush } catch { - case e: Exception => { } + case e: Exception => { } } logInfo ("Sent block: " + blockIndex + " to " + clientSocket) } @@ -804,9 +828,13 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) extends BroadcastRecipe with Logging { def value = value_ - BroadcastCH.synchronized { BroadcastCH.values.put(uuid, value_) } + BroadcastCH.synchronized { + BroadcastCH.values.put(uuid, value_) + } - if (!local) { sendBroadcast } + if (!local) { + sendBroadcast + } def sendBroadcast () { val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid)) |