aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/broadcast/TreeBroadcast.scala')
-rw-r--r--core/src/main/scala/spark/broadcast/TreeBroadcast.scala62
1 files changed, 31 insertions, 31 deletions
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 374389def5..f5527b6ec9 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -47,7 +47,7 @@ extends Broadcast[T] with Logging with Serializable {
sendBroadcast
}
- def sendBroadcast(): Unit = {
+ def sendBroadcast() {
logInfo("Local host address: " + hostAddress)
// Store a persistent copy in HDFS
@@ -70,25 +70,25 @@ extends Broadcast[T] with Logging with Serializable {
guideMR = new GuideMultipleRequests
guideMR.setDaemon(true)
- guideMR.start
+ guideMR.start()
logInfo("GuideMultipleRequests started...")
// Must always come AFTER guideMR is created
while (guidePort == -1) {
guidePortLock.synchronized {
- guidePortLock.wait
+ guidePortLock.wait()
}
}
serveMR = new ServeMultipleRequests
serveMR.setDaemon(true)
- serveMR.start
+ serveMR.start()
logInfo("ServeMultipleRequests started...")
// Must always come AFTER serveMR is created
while (listenPort == -1) {
listenPortLock.synchronized {
- listenPortLock.wait
+ listenPortLock.wait()
}
}
@@ -101,8 +101,8 @@ extends Broadcast[T] with Logging with Serializable {
TreeBroadcast.registerValue(uuid, guidePort)
}
- private def readObject(in: ObjectInputStream): Unit = {
- in.defaultReadObject
+ private def readObject(in: ObjectInputStream) {
+ in.defaultReadObject()
TreeBroadcast.synchronized {
val cachedVal = TreeBroadcast.values.get(uuid, 0)
if (cachedVal != null) {
@@ -115,7 +115,7 @@ extends Broadcast[T] with Logging with Serializable {
serveMR = new ServeMultipleRequests
serveMR.setDaemon(true)
- serveMR.start
+ serveMR.start()
logInfo("ServeMultipleRequests started...")
val start = System.nanoTime
@@ -138,7 +138,7 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def initializeSlaveVariables: Unit = {
+ private def initializeSlaveVariables() {
arrayOfBlocks = null
totalBytes = -1
totalBlocks = -1
@@ -221,7 +221,7 @@ extends Broadcast[T] with Logging with Serializable {
// ServeMultipleRequests thread
while (listenPort == -1) {
listenPortLock.synchronized {
- listenPortLock.wait
+ listenPortLock.wait()
}
}
@@ -254,7 +254,7 @@ extends Broadcast[T] with Logging with Serializable {
totalBlocks = sourceInfo.totalBlocks
arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
totalBlocksLock.synchronized {
- totalBlocksLock.notifyAll
+ totalBlocksLock.notifyAll()
}
totalBytes = sourceInfo.totalBytes
blockSize = sourceInfo.blockSize
@@ -326,7 +326,7 @@ extends Broadcast[T] with Logging with Serializable {
// Set to true if at least one block is received
receptionSucceeded = true
hasBlocksLock.synchronized {
- hasBlocksLock.notifyAll
+ hasBlocksLock.notifyAll()
}
}
} catch {
@@ -353,7 +353,7 @@ extends Broadcast[T] with Logging with Serializable {
// Keep track of sources that have completed reception
private var setOfCompletedSources = Set[SourceInfo]()
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -362,7 +362,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
guidePortLock.synchronized {
- guidePortLock.notifyAll
+ guidePortLock.notifyAll()
}
try {
@@ -408,10 +408,10 @@ extends Broadcast[T] with Logging with Serializable {
}
// Shutdown the thread pool
- threadPool.shutdown
+ threadPool.shutdown()
}
- private def sendStopBroadcastNotifications: Unit = {
+ private def sendStopBroadcastNotifications() {
listOfSources.synchronized {
var listIter = listOfSources.iterator
while (listIter.hasNext) {
@@ -463,7 +463,7 @@ extends Broadcast[T] with Logging with Serializable {
private var selectedSourceInfo: SourceInfo = null
private var thisWorkerInfo:SourceInfo = null
- override def run: Unit = {
+ override def run() {
try {
logInfo("new GuideSingleRequest is running")
// Connecting worker is sending in its hostAddress and listenPort it will
@@ -569,7 +569,7 @@ extends Broadcast[T] with Logging with Serializable {
class ServeMultipleRequests
extends Thread with Logging {
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -578,7 +578,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("ServeMultipleRequests started with " + serverSocket)
listenPortLock.synchronized {
- listenPortLock.notifyAll
+ listenPortLock.notifyAll()
}
try {
@@ -610,7 +610,7 @@ extends Broadcast[T] with Logging with Serializable {
}
// Shutdown the thread pool
- threadPool.shutdown
+ threadPool.shutdown()
}
class ServeSingleRequest(val clientSocket: Socket)
@@ -622,7 +622,7 @@ extends Broadcast[T] with Logging with Serializable {
private var sendFrom = 0
private var sendUntil = totalBlocks
- override def run: Unit = {
+ override def run() {
try {
logInfo("new ServeSingleRequest is running")
@@ -652,18 +652,18 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def sendObject: Unit = {
+ private def sendObject() {
// Wait till receiving the SourceInfo from Master
while (totalBlocks == -1) {
totalBlocksLock.synchronized {
- totalBlocksLock.wait
+ totalBlocksLock.wait()
}
}
for (i <- sendFrom until sendUntil) {
while (i == hasBlocks) {
hasBlocksLock.synchronized {
- hasBlocksLock.wait
+ hasBlocksLock.wait()
}
}
try {
@@ -704,7 +704,7 @@ extends Logging {
private var MaxDegree_ : Int = 2
- def initialize(isMaster__ : Boolean): Unit = {
+ def initialize(isMaster__ : Boolean) {
synchronized {
if (!initialized) {
isMaster_ = isMaster__
@@ -712,7 +712,7 @@ extends Logging {
if (isMaster) {
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
- trackMV.start
+ trackMV.start()
// TODO: Logging the following line makes the Spark framework ID not
// getting logged, cause it calls logInfo before log4j is initialized
logInfo("TrackMultipleValues started...")
@@ -728,14 +728,14 @@ extends Logging {
def isMaster = isMaster_
- def registerValue(uuid: UUID, guidePort: Int): Unit = {
+ def registerValue(uuid: UUID, guidePort: Int) {
valueToGuidePortMap.synchronized {
valueToGuidePortMap += (uuid -> guidePort)
logInfo("New value registered with the Tracker " + valueToGuidePortMap)
}
}
- def unregisterValue(uuid: UUID): Unit = {
+ def unregisterValue(uuid: UUID) {
valueToGuidePortMap.synchronized {
valueToGuidePortMap(uuid) = SourceInfo.TxOverGoToHDFS
logInfo("Value unregistered from the Tracker " + valueToGuidePortMap)
@@ -744,7 +744,7 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -766,7 +766,7 @@ extends Logging {
if (clientSocket != null) {
try {
threadPool.execute(new Thread {
- override def run: Unit = {
+ override def run() {
val oos = new ObjectOutputStream(clientSocket.getOutputStream)
oos.flush()
val ois = new ObjectInputStream(clientSocket.getInputStream)
@@ -800,7 +800,7 @@ extends Logging {
}
// Shutdown the thread pool
- threadPool.shutdown
+ threadPool.shutdown()
}
}
}