aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-26 16:08:22 -0700
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-26 16:08:22 -0700
commitbc1c51817c8d6cca1a87c30eb7dd22805c795d6a (patch)
tree6701406c8af5bedc0afc7044eb399ef706cc9a4b
parentd673be0e066f50a0ed320cbd71bafe575a9e56e4 (diff)
downloadspark-bc1c51817c8d6cca1a87c30eb7dd22805c795d6a.tar.gz
spark-bc1c51817c8d6cca1a87c30eb7dd22805c795d6a.tar.bz2
spark-bc1c51817c8d6cca1a87c30eb7dd22805c795d6a.zip
Beautified code...
-rw-r--r--src/scala/spark/Broadcast.scala70
1 files changed, 49 insertions, 21 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 19bdd38390..f5bc7354cb 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -32,7 +32,9 @@ extends BroadcastRecipe with Logging {
def value = value_
- BroadcastBT.synchronized { BroadcastBT.values.put (uuid, value_) }
+ BroadcastBT.synchronized {
+ BroadcastBT.values.put (uuid, value_)
+ }
@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@transient var totalBytes = -1
@@ -63,7 +65,9 @@ extends BroadcastRecipe with Logging {
@transient var hasCopyInHDFS = false
// Must call this after all the variables have been created/initialized
- if (!local) { sendBroadcast }
+ if (!local) {
+ sendBroadcast
+ }
def sendBroadcast () {
// Store a persistent copy in HDFS
@@ -293,8 +297,9 @@ extends BroadcastRecipe with Logging {
suitableSources.foreach { srcInfo =>
// Removing old copy of srcInfo to be replaced with a new one
// It works because case clases are compared by constructor params
- if (listOfSources.contains(srcInfo))
- { listOfSources = listOfSources - srcInfo }
+ if (listOfSources.contains(srcInfo)) {
+ listOfSources = listOfSources - srcInfo
+ }
listOfSources = listOfSources + srcInfo
}
}
@@ -337,9 +342,15 @@ extends BroadcastRecipe with Logging {
} catch {
case e: Exception => { }
} finally {
- if (oisTracker != null) { oisTracker.close }
- if (oosTracker != null) { oosTracker.close }
- if (clientSocketToTracker != null) { clientSocketToTracker.close }
+ if (oisTracker != null) {
+ oisTracker.close
+ }
+ if (oosTracker != null) {
+ oosTracker.close
+ }
+ if (clientSocketToTracker != null) {
+ clientSocketToTracker.close
+ }
}
retriesLeft -= 1
// TODO: Should wait before retrying. Implement wait function.
@@ -392,7 +403,9 @@ extends BroadcastRecipe with Logging {
// TODO: Must fix this. This might never break if broadcast fails.
// We should be able to break and send false. Also need to kill threads
- while (hasBlocks != totalBlocks) { Thread.sleep(1234) }
+ while (hasBlocks != totalBlocks) {
+ Thread.sleep(1234)
+ }
return true
}
@@ -529,9 +542,15 @@ extends BroadcastRecipe with Logging {
// }
}
} finally {
- if (oisSource != null) { oisSource.close }
- if (oosSource != null) { oosSource.close }
- if (peerSocketToSource != null) { peerSocketToSource.close }
+ if (oisSource != null) {
+ oisSource.close
+ }
+ if (oosSource != null) {
+ oosSource.close
+ }
+ if (peerSocketToSource != null) {
+ peerSocketToSource.close
+ }
// Delete from peersNowTalking
peersNowTalking.synchronized {
@@ -612,8 +631,9 @@ extends BroadcastRecipe with Logging {
listOfSources.synchronized {
// Add this source to the listOfSources
- if (listOfSources.contains(sourceInfo))
- { listOfSources = listOfSources - sourceInfo }
+ if (listOfSources.contains(sourceInfo)) {
+ listOfSources = listOfSources - sourceInfo
+ }
listOfSources = listOfSources + sourceInfo
}
} catch {
@@ -640,8 +660,9 @@ extends BroadcastRecipe with Logging {
var selectedSources = ListBuffer[SourceInfo] ()
listOfSources.synchronized {
do {
- if (listOfSources(curIndex) != skipSourceInfo)
- { selectedSources = selectedSources + listOfSources(curIndex) }
+ if (listOfSources(curIndex) != skipSourceInfo) {
+ selectedSources = selectedSources + listOfSources(curIndex)
+ }
curIndex = (curIndex + 1) % listOfSources.size
} while (curIndex != rollOverIndex &&
selectedSources.size != BroadcastBT.MaxPeersInGuideResponse)
@@ -690,7 +711,9 @@ extends BroadcastRecipe with Logging {
}
}
} finally {
- if (serverSocket != null) { serverSocket.close }
+ if (serverSocket != null) {
+ serverSocket.close
+ }
}
}
@@ -714,8 +737,9 @@ extends BroadcastRecipe with Logging {
// Update listOfSources
listOfSources.synchronized {
- if (listOfSources.contains(rxSourceInfo))
- { listOfSources = listOfSources - rxSourceInfo }
+ if (listOfSources.contains(rxSourceInfo)) {
+ listOfSources = listOfSources - rxSourceInfo
+ }
listOfSources = listOfSources + rxSourceInfo
}
@@ -788,7 +812,7 @@ extends BroadcastRecipe with Logging {
oos.writeObject (arrayOfBlocks(blockIndex))
oos.flush
} catch {
- case e: Exception => { }
+ case e: Exception => { }
}
logInfo ("Sent block: " + blockIndex + " to " + clientSocket)
}
@@ -804,9 +828,13 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
extends BroadcastRecipe with Logging {
def value = value_
- BroadcastCH.synchronized { BroadcastCH.values.put(uuid, value_) }
+ BroadcastCH.synchronized {
+ BroadcastCH.values.put(uuid, value_)
+ }
- if (!local) { sendBroadcast }
+ if (!local) {
+ sendBroadcast
+ }
def sendBroadcast () {
val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))