author     Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>  2010-11-06 16:52:50 -0700
committer  Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>  2010-11-06 16:52:50 -0700
commit     cbb29fae1a9b8678ffeab3521901d61170a24dd2
tree       ad889e2efa015bc8722f474d44e2da60299f05db
parent     8690be8f5a4d5943b48889863c9e43f23213696d
Updated log outputs for consistency with BT branches.
Code formatting.
-rw-r--r--  conf/java-opts                   |   2
-rw-r--r--  conf/log4j.properties            |   2
-rw-r--r--  src/scala/spark/Broadcast.scala  | 339
3 files changed, 209 insertions(+), 134 deletions(-)
diff --git a/conf/java-opts b/conf/java-opts
index b61e8163b5..0307361a26 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=1024 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false
+-Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=256 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000 -Dspark.broadcast.dualMode=false
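Note that only the two renamed timeout flags above line up with the keys that BroadcastCS.initialize now reads. System.getProperty is case-sensitive, so spark.broadcast.masterHostAddress, spark.broadcast.masterTrackerPort, spark.broadcast.blockSize, spark.broadcast.maxRetryCount and spark.broadcast.dualMode keep their old lower-case spelling here while the Scala changes below read the capitalized spark.broadcast.MasterHostAddress, spark.broadcast.MasterTrackerPort, spark.broadcast.BlockSize, spark.broadcast.MaxRetryCount and spark.broadcast.DualMode, leaving those settings at their built-in defaults. A minimal sketch of how these flags are consumed (the object name BroadcastConfSketch is hypothetical; the keys and defaults are the ones used by BroadcastCS.initialize in this diff):

    object BroadcastConfSketch {
      // Sketch only, not part of this commit: mirrors how BroadcastCS.initialize
      // reads the -D flags above. System.getProperty is case-sensitive, so the
      // key in conf/java-opts must match these names exactly to take effect.
      val MasterHostAddress =
        System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
      val TrackerSocketTimeout =
        System.getProperty ("spark.broadcast.TrackerSocketTimeout", "50000").toInt
      val ServerSocketTimeout =
        System.getProperty ("spark.broadcast.ServerSocketTimeout", "10000").toInt
      val BlockSize =
        System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024
      val DualMode =
        System.getProperty ("spark.broadcast.DualMode", "false").toBoolean
    }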
diff --git a/conf/log4j.properties b/conf/log4j.properties
index d72dbadc39..33774b463d 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -2,7 +2,7 @@
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
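The only functional change here is millisecond precision in the console timestamp (%d{yy/MM/dd HH:mm:ss} becomes %d{yy/MM/dd HH:mm:ss.SSS}). As a rough illustration, and assuming the logger category resolves to the enclosing class name as usual for the Logging trait, a call from Broadcast.scala would now render like the commented lines in this sketch:

    // Sketch only: a logInfo call as used throughout Broadcast.scala.
    logInfo ("ServeMultipleRequests started...")
    // Old pattern: 10/11/06 16:52:50 INFO ChainedStreamingBroadcast: ServeMultipleRequests started...
    // New pattern: 10/11/06 16:52:50.123 INFO ChainedStreamingBroadcast: ServeMultipleRequests started...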
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 0e29c8aa43..b5bcdde21b 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -2,11 +2,11 @@ package spark
import java.io._
import java.net._
-import java.util.{UUID, PriorityQueue, Comparator}
+import java.util.{Comparator, PriorityQueue, UUID}
import com.google.common.collect.MapMaker
-import java.util.concurrent.{Executors, ExecutorService}
+import java.util.concurrent.{Executors, ExecutorService, ThreadPoolExecutor}
import scala.collection.mutable.Map
@@ -39,11 +39,13 @@ trait BroadcastRecipe {
@serializable
class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
- extends BroadcastRecipe with Logging {
+extends BroadcastRecipe with Logging {
def value = value_
- BroadcastCS.synchronized { BroadcastCS.values.put (uuid, value_) }
+ BroadcastCS.synchronized {
+ BroadcastCS.values.put (uuid, value_)
+ }
@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@transient var totalBytes = -1
@@ -68,10 +70,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// Must call this after all the variables have been created/initialized
if (!local) {
- val start = System.nanoTime
sendBroadcast
- val time = (System.nanoTime - start) / 1e9
- logInfo("sendBroadcast took " + time + " s")
}
def sendBroadcast () {
@@ -80,20 +79,21 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
// out.writeObject (value_)
// out.close
+ // TODO: Fix this at some point
hasCopyInHDFS = true
// Create a variableInfo object and store it in valueInfos
- var variableInfo = blockifyObject (value_, BroadcastCS.blockSize)
+ var variableInfo = blockifyObject (value_, BroadcastCS.BlockSize)
guideMR = new GuideMultipleRequests
guideMR.setDaemon (true)
guideMR.start
- logInfo (System.currentTimeMillis + ": " + "GuideMultipleRequests started")
+ logInfo ("GuideMultipleRequests started...")
serveMR = new ServeMultipleRequests
serveMR.setDaemon (true)
serveMR.start
- logInfo (System.currentTimeMillis + ": " + "ServeMultipleRequests started")
+ logInfo ("ServeMultipleRequests started...")
// Prepare the value being broadcasted
// TODO: Refactoring and clean-up required here
@@ -113,7 +113,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 0)
pqOfSources.add (masterSource_0)
// Add one more time to have two replicas of any seeds in the PQ
- if (BroadcastCS.dualMode) {
+ if (BroadcastCS.DualMode) {
val masterSource_1 =
new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 1)
pqOfSources.add (masterSource_1)
@@ -144,7 +144,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
serveMR = new ServeMultipleRequests
serveMR.setDaemon (true)
serveMR.start
- logInfo (System.currentTimeMillis + ": " + "ServeMultipleRequests started")
+ logInfo ("ServeMultipleRequests started...")
val start = System.nanoTime
@@ -161,7 +161,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
val time = (System.nanoTime - start) / 1e9
- logInfo( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
+ logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
}
}
}
@@ -215,7 +215,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
var retByteArray = new Array[Byte] (totalBytes)
for (i <- 0 until totalBlocks) {
System.arraycopy (arrayOfBlocks(i).byteArray, 0, retByteArray,
- i * BroadcastCS.blockSize, arrayOfBlocks(i).byteArray.length)
+ i * BroadcastCS.BlockSize, arrayOfBlocks(i).byteArray.length)
}
byteArrayToObject (retByteArray)
}
@@ -238,12 +238,12 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
var masterListenPort: Int = -1
- var retriesLeft = BroadcastCS.maxRetryCount
+ var retriesLeft = BroadcastCS.MaxRetryCount
do {
try {
// Connect to the tracker to find out the guide
val clientSocketToTracker =
- new Socket(BroadcastCS.masterHostAddress, BroadcastCS.masterTrackerPort)
+ new Socket(BroadcastCS.MasterHostAddress, BroadcastCS.MasterTrackerPort)
val oosTracker =
new ObjectOutputStream (clientSocketToTracker.getOutputStream)
oosTracker.flush
@@ -258,14 +258,20 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// In case of any failure, set masterListenPort = 0 to read from HDFS
case e: Exception => (masterListenPort = 0)
} finally {
- if (oisTracker != null) { oisTracker.close }
- if (oosTracker != null) { oosTracker.close }
- if (clientSocketToTracker != null) { clientSocketToTracker.close }
+ if (oisTracker != null) {
+ oisTracker.close
+ }
+ if (oosTracker != null) {
+ oosTracker.close
+ }
+ if (clientSocketToTracker != null) {
+ clientSocketToTracker.close
+ }
}
retriesLeft -= 1
// TODO: Should wait before retrying
} while (retriesLeft > 0 && masterListenPort < 0)
- logInfo (System.currentTimeMillis + ": " + "Got this guidePort from Tracker: " + masterListenPort)
+ logInfo ("Got this guidePort from Tracker: " + masterListenPort)
return masterListenPort
}
@@ -273,7 +279,9 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// Get masterListenPort for this variable from the Tracker
val masterListenPort = getMasterListenPort (variableUUID)
// If Tracker says that there is no guide for this object, read from HDFS
- if (masterListenPort == 0) { return false }
+ if (masterListenPort == 0) {
+ return false
+ }
// Wait until hostAddress and listenPort are created by the
// ServeMultipleRequests thread
@@ -285,12 +293,12 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// Connect and receive broadcast from the specified source, retrying the
// specified number of times in case of failures
- var retriesLeft = BroadcastCS.maxRetryCount
+ var retriesLeft = BroadcastCS.MaxRetryCount
do {
// Connect to Master and send this worker's Information
val clientSocketToMaster =
- new Socket(BroadcastCS.masterHostAddress, masterListenPort)
- logInfo (System.currentTimeMillis + ": " + "Connected to Master's guiding object")
+ new Socket(BroadcastCS.MasterHostAddress, masterListenPort)
+ logInfo ("Connected to Master's guiding object")
// TODO: Guiding object connection is reusable
val oosMaster =
new ObjectOutputStream (clientSocketToMaster.getOutputStream)
@@ -310,14 +318,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
totalBytes = sourceInfo.totalBytes
- logInfo (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
+ logInfo ("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
val start = System.nanoTime
val receptionSucceeded = receiveSingleTransmission (sourceInfo)
val time = (System.nanoTime - start) / 1e9
// Updating some statistics in sourceInfo. Master will be using them later
- if (!receptionSucceeded) { sourceInfo.receptionFailed = true }
+ if (!receptionSucceeded) {
+ sourceInfo.receptionFailed = true
+ }
sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time
// Send back statistics to the Master
@@ -351,15 +361,20 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
oisSource =
new ObjectInputStream (clientSocketToSource.getInputStream)
- logInfo (System.currentTimeMillis + ": " + "Inside receiveSingleTransmission")
- logInfo (System.currentTimeMillis + ": " + "totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
+ logInfo ("Inside receiveSingleTransmission")
+ logInfo ("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
// Send the range
oosSource.writeObject((hasBlocks, totalBlocks))
oosSource.flush
for (i <- hasBlocks until totalBlocks) {
+ val recvStartTime = System.currentTimeMillis
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
+ val receptionTime = (System.currentTimeMillis - recvStartTime)
+
+ logInfo ("Received block: " + bcBlock.blockID + " from " + sourceInfo + " in " + receptionTime + " millis.")
+
arrayOfBlocks(hasBlocks) = bcBlock
hasBlocks += 1
// Set to true if at least one block is received
@@ -367,23 +382,29 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
hasBlocksLock.synchronized {
hasBlocksLock.notifyAll
}
- logInfo (System.currentTimeMillis + ": " + "Received block: " + i + " " + bcBlock)
}
- logInfo (System.currentTimeMillis + ": " + "After the receive loop")
+ logInfo ("After the receive loop...")
} catch {
case e: Exception => {
- logInfo (System.currentTimeMillis + ": " + "receiveSingleTransmission had a " + e)
+ logInfo ("receiveSingleTransmission had a " + e)
}
} finally {
- if (oisSource != null) { oisSource.close }
- if (oosSource != null) { oosSource.close }
- if (clientSocketToSource != null) { clientSocketToSource.close }
+ if (oisSource != null) {
+ oisSource.close
+ }
+ if (oosSource != null) {
+ oosSource.close
+ }
+ if (clientSocketToSource != null) {
+ clientSocketToSource.close
+ }
}
return receptionSucceeded
}
- class GuideMultipleRequests extends Thread with Logging {
+ class GuideMultipleRequests
+ extends Thread with Logging {
override def run = {
// TODO: Cached threadpool has 60 s keep alive timer
var threadPool = Executors.newCachedThreadPool
@@ -391,7 +412,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
serverSocket = new ServerSocket (0)
guidePort = serverSocket.getLocalPort
- logInfo (System.currentTimeMillis + ": " + "GuideMultipleRequests" + serverSocket + " " + guidePort)
+ logInfo ("GuideMultipleRequests => " + serverSocket + " " + guidePort)
guidePortLock.synchronized {
guidePortLock.notifyAll
@@ -403,16 +424,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
while (keepAccepting || !hasCopyInHDFS) {
var clientSocket: Socket = null
try {
- serverSocket.setSoTimeout (BroadcastCS.serverSocketTimout)
+ serverSocket.setSoTimeout (BroadcastCS.ServerSocketTimeout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
- logInfo ("GuideMultipleRequests Timeout. Stopping listening..." + hasCopyInHDFS)
+ logInfo ("GuideMultipleRequests Timeout.")
keepAccepting = false
}
}
if (clientSocket != null) {
- logInfo (System.currentTimeMillis + ": " + "Guide:Accepted new client connection:" + clientSocket)
+ logInfo ("Guide: Accepted new client connection: " + clientSocket)
try {
threadPool.execute (new GuideSingleRequest (clientSocket))
} catch {
@@ -423,12 +444,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
BroadcastCS.unregisterValue (uuid)
} finally {
- serverSocket.close
+ if (serverSocket != null) {
+ logInfo ("GuideMultipleRequests now stopping...")
+ serverSocket.close
+ }
}
}
class GuideSingleRequest (val clientSocket: Socket)
- extends Runnable with Logging {
+ extends Thread with Logging {
private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
oos.flush
private val ois = new ObjectInputStream (clientSocket.getInputStream)
@@ -436,9 +460,9 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
private var selectedSourceInfo: SourceInfo = null
private var thisWorkerInfo:SourceInfo = null
- def run = {
+ override def run = {
try {
- logInfo (System.currentTimeMillis + ": " + "new GuideSingleRequest is running")
+ logInfo ("new GuideSingleRequest is running")
// Connecting worker is sending in its hostAddress and listenPort it will
// be listening to. ReplicaID is 0 and other fields are invalid (-1)
var sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
@@ -446,14 +470,14 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
pqOfSources.synchronized {
// Select a suitable source and send it back to the worker
selectedSourceInfo = selectSuitableSource (sourceInfo)
- logInfo (System.currentTimeMillis + ": " + "Sending selectedSourceInfo:" + selectedSourceInfo)
+ logInfo ("Sending selectedSourceInfo:" + selectedSourceInfo)
oos.writeObject (selectedSourceInfo)
oos.flush
// Add this new (if it can finish) source to the PQ of sources
thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress,
sourceInfo.listenPort, totalBlocks, totalBytes, 0)
- logInfo (System.currentTimeMillis + ": " + "Adding possible new source to pqOfSources: " + thisWorkerInfo)
+ logInfo ("Adding possible new source to pqOfSources: " + thisWorkerInfo)
pqOfSources.add (thisWorkerInfo)
}
@@ -481,7 +505,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
sourceInfo.hostAddress, sourceInfo.MBps)
// No need to find and update thisWorkerInfo, but add its replica
- if (BroadcastCS.dualMode) {
+ if (BroadcastCS.DualMode) {
pqOfSources.add (new SourceInfo (thisWorkerInfo.hostAddress,
thisWorkerInfo.listenPort, totalBlocks, totalBytes, 1))
}
@@ -504,7 +528,9 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
// Remove thisWorkerInfo
- if (pqOfSources != null) { pqOfSources.remove (thisWorkerInfo) }
+ if (pqOfSources != null) {
+ pqOfSources.remove (thisWorkerInfo)
+ }
}
}
} finally {
@@ -534,14 +560,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
}
- class ServeMultipleRequests extends Thread with Logging {
+ class ServeMultipleRequests
+ extends Thread with Logging {
override def run = {
var threadPool = Executors.newCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
listenPort = serverSocket.getLocalPort
- logInfo (System.currentTimeMillis + ": " + "ServeMultipleRequests" + serverSocket + " " + listenPort)
+ logInfo ("ServeMultipleRequests started with " + serverSocket)
listenPortLock.synchronized {
listenPortLock.notifyAll
@@ -552,16 +579,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
while (keepAccepting) {
var clientSocket: Socket = null
try {
- serverSocket.setSoTimeout (BroadcastCS.serverSocketTimout)
+ serverSocket.setSoTimeout (BroadcastCS.ServerSocketTimeout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
- logInfo ("ServeMultipleRequests Timeout. Stopping listening...")
+ logInfo ("ServeMultipleRequests Timeout.")
keepAccepting = false
}
}
if (clientSocket != null) {
- logInfo (System.currentTimeMillis + ": " + "Serve:Accepted new client connection:" + clientSocket)
+ logInfo ("Serve: Accepted new client connection: " + clientSocket)
try {
threadPool.execute (new ServeSingleRequest (clientSocket))
} catch {
@@ -571,12 +598,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
}
} finally {
- serverSocket.close
+ if (serverSocket != null) {
+ logInfo ("ServeMultipleRequests now stopping...")
+ serverSocket.close
+ }
}
}
class ServeSingleRequest (val clientSocket: Socket)
- extends Runnable with Logging {
+ extends Thread with Logging {
private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
oos.flush
private val ois = new ObjectInputStream (clientSocket.getInputStream)
@@ -584,9 +614,9 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
private var sendFrom = 0
private var sendUntil = totalBlocks
- def run = {
+ override def run = {
try {
- logInfo (System.currentTimeMillis + ": " + "new ServeSingleRequest is running")
+ logInfo ("new ServeSingleRequest is running")
// Receive range to send
var sendRange = ois.readObject.asInstanceOf[(Int, Int)]
@@ -599,10 +629,10 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
case e: Exception => {
- logInfo (System.currentTimeMillis + ": " + "ServeSingleRequest had a " + e)
+ logInfo ("ServeSingleRequest had a " + e)
}
} finally {
- logInfo (System.currentTimeMillis + ": " + "ServeSingleRequest is closing streams and sockets")
+ logInfo ("ServeSingleRequest is closing streams and sockets")
ois.close
oos.close
clientSocket.close
@@ -629,7 +659,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
} catch {
case e: Exception => { }
}
- logInfo (System.currentTimeMillis + ": " + "Send block: " + i + " " + arrayOfBlocks(i))
+ logInfo ("Sent block: " + i + " to " + clientSocket)
}
}
}
@@ -638,13 +668,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
//@serializable
//class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
-// extends BroadcastRecipe with Logging {
+//extends BroadcastRecipe with Logging {
// def value = value_
// BroadcastSS.synchronized { BroadcastSS.values.put (uuid, value_) }
//
-// if (!local) { sendBroadcast }
+// if (!local) {
+// sendBroadcast
+// }
//
// @transient var publishThread: PublishThread = null
// @transient var hasCopyInHDFS = false
@@ -675,7 +707,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// if (receptionSucceeded) {
// value_ = BroadcastSS.values.get(uuid).asInstanceOf[T]
// } else {
-// logInfo (System.currentTimeMillis + ": " + "Reading from HDFS")
+// logInfo ("Reading from HDFS")
// val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
// value_ = fileIn.readObject.asInstanceOf[T]
// BroadcastSS.values.put(uuid, value_)
@@ -683,16 +715,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// }
//
// val time = (System.nanoTime - start) / 1e9
-// logInfo( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
+// logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
// }
// }
// }
//
-// class PublishThread extends Thread with Logging {
+// class PublishThread
+// extends Thread with Logging {
// override def run = {
// // TODO: Put some delay here to give time others to register
// // Thread.sleep (5000)
-// logInfo (System.currentTimeMillis + ": " + "Waited. Now sending...")
+// logInfo ("Waited. Now sending...")
// BroadcastSS.synchronized {
// BroadcastSS.publishVariable[T] (uuid, value)
// }
@@ -702,13 +735,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
@serializable
class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
- extends BroadcastRecipe with Logging {
+extends BroadcastRecipe with Logging {
def value = value_
- BroadcastCH.synchronized { BroadcastCH.values.put(uuid, value_) }
+ BroadcastCH.synchronized {
+ BroadcastCH.values.put(uuid, value_)
+ }
- if (!local) { sendBroadcast }
+ if (!local) {
+ sendBroadcast
+ }
def sendBroadcast () {
val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
@@ -724,7 +761,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
if (cachedVal != null) {
value_ = cachedVal.asInstanceOf[T]
} else {
- logInfo( System.currentTimeMillis + ": " + "Started reading Broadcasted variable " + uuid)
+ logInfo( "Started reading Broadcasted variable " + uuid)
val start = System.nanoTime
val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
@@ -733,7 +770,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
fileIn.close
val time = (System.nanoTime - start) / 1e9
- logInfo( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
+ logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
}
}
}
@@ -742,7 +779,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
@serializable
case class SourceInfo (val hostAddress: String, val listenPort: Int,
val totalBlocks: Int, val totalBytes: Int, val replicaID: Int)
- extends Comparable [SourceInfo] with Logging {
+extends Comparable [SourceInfo] with Logging {
var currentLeechers = 0
var receptionFailed = false
@@ -779,7 +816,8 @@ case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock],
@transient var hasBlocks = 0
}
-private object Broadcast extends Logging {
+private object Broadcast
+extends Logging {
private var initialized = false
// Will be called by SparkContext or Executor before using Broadcast
@@ -801,7 +839,8 @@ private object Broadcast extends Logging {
}
}
-private object BroadcastCS extends Logging {
+private object BroadcastCS
+extends Logging {
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
var valueToGuidePortMap = Map[UUID, Int] ()
@@ -811,12 +850,15 @@ private object BroadcastCS extends Logging {
private var initialized = false
private var isMaster_ = false
- private var masterHostAddress_ = "127.0.0.1"
- private var masterTrackerPort_ : Int = 11111
- private var blockSize_ : Int = 512 * 1024
- private var maxRetryCount_ : Int = 2
- private var serverSocketTimout_ : Int = 50000
- private var dualMode_ : Boolean = false
+ private var MasterHostAddress_ = "127.0.0.1"
+ private var MasterTrackerPort_ : Int = 11111
+ private var BlockSize_ : Int = 512 * 1024
+ private var MaxRetryCount_ : Int = 2
+
+ private var TrackerSocketTimeout_ : Int = 50000
+ private var ServerSocketTimeout_ : Int = 10000
+
+ private var DualMode_ : Boolean = false
private var trackMV: TrackMultipleValues = null
@@ -828,18 +870,22 @@ private object BroadcastCS extends Logging {
def initialize (isMaster__ : Boolean) {
synchronized {
if (!initialized) {
- masterHostAddress_ =
- System.getProperty ("spark.broadcast.masterHostAddress", "127.0.0.1")
- masterTrackerPort_ =
- System.getProperty ("spark.broadcast.masterTrackerPort", "11111").toInt
- blockSize_ =
- System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
- maxRetryCount_ =
- System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
- serverSocketTimout_ =
- System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt
- dualMode_ =
- System.getProperty ("spark.broadcast.dualMode", "false").toBoolean
+ MasterHostAddress_ =
+ System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
+ MasterTrackerPort_ =
+ System.getProperty ("spark.broadcast.MasterTrackerPort", "11111").toInt
+ BlockSize_ =
+ System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024
+ MaxRetryCount_ =
+ System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt
+
+ TrackerSocketTimeout_ =
+ System.getProperty ("spark.broadcast.TrackerSocketTimeout", "50000").toInt
+ ServerSocketTimeout_ =
+ System.getProperty ("spark.broadcast.ServerSocketTimeout", "10000").toInt
+
+ DualMode_ =
+ System.getProperty ("spark.broadcast.DualMode", "false").toBoolean
isMaster_ = isMaster__
@@ -847,7 +893,7 @@ private object BroadcastCS extends Logging {
trackMV = new TrackMultipleValues
trackMV.setDaemon (true)
trackMV.start
- logInfo (System.currentTimeMillis + ": " + "TrackMultipleValues started")
+ logInfo ("TrackMultipleValues started...")
}
initialized = true
@@ -855,12 +901,15 @@ private object BroadcastCS extends Logging {
}
}
- def masterHostAddress = masterHostAddress_
- def masterTrackerPort = masterTrackerPort_
- def blockSize = blockSize_
- def maxRetryCount = maxRetryCount_
- def serverSocketTimout = serverSocketTimout_
- def dualMode = dualMode_
+ def MasterHostAddress = MasterHostAddress_
+ def MasterTrackerPort = MasterTrackerPort_
+ def BlockSize = BlockSize_
+ def MaxRetryCount = MaxRetryCount_
+
+ def TrackerSocketTimeout = TrackerSocketTimeout_
+ def ServerSocketTimeout = ServerSocketTimeout_
+
+ def DualMode = DualMode_
def isMaster = isMaster_
@@ -869,7 +918,7 @@ private object BroadcastCS extends Logging {
def registerValue (uuid: UUID, guidePort: Int) = {
valueToGuidePortMap.synchronized {
valueToGuidePortMap += (uuid -> guidePort)
- logInfo (System.currentTimeMillis + ": " + "New value registered with the Tracker " + valueToGuidePortMap)
+ logInfo ("New value registered with the Tracker " + valueToGuidePortMap)
}
}
@@ -877,7 +926,7 @@ private object BroadcastCS extends Logging {
valueToGuidePortMap.synchronized {
// Set to 0 to make sure that people read it from HDFS
valueToGuidePortMap (uuid) = 0
- logInfo (System.currentTimeMillis + ": " + "Value unregistered from the Tracker " + valueToGuidePortMap)
+ logInfo ("Value unregistered from the Tracker " + valueToGuidePortMap)
}
}
@@ -895,22 +944,23 @@ private object BroadcastCS extends Logging {
}
}
- class TrackMultipleValues extends Thread with Logging {
+ class TrackMultipleValues
+ extends Thread with Logging {
var keepAccepting = true
override def run = {
var threadPool = Executors.newCachedThreadPool
var serverSocket: ServerSocket = null
- serverSocket = new ServerSocket (BroadcastCS.masterTrackerPort)
- logInfo (System.currentTimeMillis + ": " + "TrackMultipleValues" + serverSocket)
+ serverSocket = new ServerSocket (BroadcastCS.MasterTrackerPort)
+ logInfo ("TrackMultipleValues" + serverSocket)
try {
while (keepAccepting) {
var clientSocket: Socket = null
try {
// TODO:
- serverSocket.setSoTimeout (serverSocketTimout)
+ serverSocket.setSoTimeout (ServerSocketTimeout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
@@ -937,7 +987,7 @@ private object BroadcastCS extends Logging {
if (valueToGuidePortMap.contains (uuid)) {
valueToGuidePortMap (uuid)
} else -1
- logInfo (System.currentTimeMillis + ": " + "TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + guidePort)
+ logInfo ("TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + guidePort)
oos.writeObject (guidePort)
} catch {
case e: Exception => { }
@@ -972,7 +1022,7 @@ private object BroadcastCS extends Logging {
// private var masterBootHost_ = "127.0.0.1"
// private var masterBootPort_ : Int = 22222
// private var blockSize_ : Int = 512 * 1024
-// private var maxRetryCount_ : Int = 2
+// private var MaxRetryCount_ : Int = 2
//
// private var masterBootAddress_ : InetSocketAddress = null
// private var localBindPort_ : Int = -1
@@ -995,7 +1045,7 @@ private object BroadcastCS extends Logging {
// synchronized {
// if (!initialized) {
// masterBootHost_ =
-// System.getProperty ("spark.broadcast.masterHostAddress", "127.0.0.1")
+// System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
// masterBootPort_ =
// System.getProperty ("spark.broadcast.masterBootPort", "22222").toInt
//
@@ -1004,8 +1054,8 @@ private object BroadcastCS extends Logging {
//
// blockSize_ =
// System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
-// maxRetryCount_ =
-// System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
+// MaxRetryCount_ =
+// System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt
//
// isMaster_ = isMaster__
//
@@ -1019,21 +1069,27 @@ private object BroadcastCS extends Logging {
//
// def masterBootAddress = masterBootAddress_
// def blockSize = blockSize_
-// def maxRetryCount = maxRetryCount_
+// def MaxRetryCount = MaxRetryCount_
//
// def pEnvironment: Environment = {
-// if (pEnvironment_ == null) { initializeSplitStream }
+// if (pEnvironment_ == null) {
+// initializeSplitStream
+// }
// pEnvironment_
// }
//
// def pastryNode: PastryNode = {
-// if (pastryNode_ == null) { initializeSplitStream }
+// if (pastryNode_ == null) {
+// initializeSplitStream
+// }
// pastryNode_
// }
//
// def localBindPort = {
// if (localBindPort_ == -1) {
-// if (isMaster) { localBindPort_ = masterBootPort_ }
+// if (isMaster) {
+// localBindPort_ = masterBootPort_
+// }
// else {
// // TODO: What's the best way of finding a free port?
// val sSocket = new ServerSocket (0)
@@ -1092,16 +1148,22 @@ private object BroadcastCS extends Logging {
// def receiveVariable[A] (uuid: UUID): Boolean = {
// // TODO: Things will change if out-of-order variable recepetion is supported
//
-// logInfo (System.currentTimeMillis + ": " + "In receiveVariable")
+// logInfo ("In receiveVariable")
//
// // Check in valueBytes
-// if (xferValueBytesToValues[A] (uuid)) { return true }
+// if (xferValueBytesToValues[A] (uuid)) {
+// return true
+// }
//
// // Check if its in progress
-// for (i <- 0 until maxRetryCount) {
-// logInfo (System.currentTimeMillis + ": " + uuid + " " + curUUID)
-// while (uuid == curUUID) { Thread.sleep (100) } // TODO: How long to sleep
-// if (xferValueBytesToValues[A] (uuid)) { return true }
+// for (i <- 0 until MaxRetryCount) {
+// logInfo (uuid + " " + curUUID)
+// while (uuid == curUUID) {
+// Thread.sleep (100) // TODO: How long to sleep
+// }
+// if (xferValueBytesToValues[A] (uuid)) {
+// return true
+// }
//
// // Wait for a while to see if we've reached here before xmission started
// Thread.sleep (100)
@@ -1111,10 +1173,14 @@ private object BroadcastCS extends Logging {
//
// private def xferValueBytesToValues[A] (uuid: UUID): Boolean = {
// var cachedValueBytes: Array[Byte] = null
-// valueBytes.synchronized { cachedValueBytes = valueBytes.get (uuid) }
+// valueBytes.synchronized {
+// cachedValueBytes = valueBytes.get (uuid)
+// }
// if (cachedValueBytes != null) {
// val cachedValue = byteArrayToObject[A] (cachedValueBytes)
-// values.synchronized { values.put (uuid, cachedValue) }
+// values.synchronized {
+// values.put (uuid, cachedValue)
+// }
// return true
// }
// return false
@@ -1138,19 +1204,22 @@ private object BroadcastCS extends Logging {
// private def intToByteArray (value: Int): Array[Byte] = {
// var retVal = new Array[Byte] (4)
-// for (i <- 0 until 4)
+// for (i <- 0 until 4) {
// retVal(i) = (value >> ((4 - 1 - i) * 8)).toByte
+// }
// return retVal
// }
// private def byteArrayToInt (arr: Array[Byte], offset: Int): Int = {
// var retVal = 0
-// for (i <- 0 until 4)
+// for (i <- 0 until 4) {
// retVal += ((arr(i + offset).toInt & 0x000000FF) << ((4 - 1 - i) * 8))
+// }
// return retVal
// }
-// class SSClient (pastryNode: PastryNode) extends SplitStreamClient
+// class SSClient (pastryNode: PastryNode)
+// extends SplitStreamClient
// with Application {
// // Magic bits: 11111100001100100100110000111111
// val magicBits = 0xFC324C3F
@@ -1188,7 +1257,9 @@ private object BroadcastCS extends Logging {
//
// // Subscribing notifies your application when data comes through the tree
// myStripes = myChannel.getStripes
-// for (curStripe <- myStripes) { curStripe.subscribe (this) }
+// for (curStripe <- myStripes) {
+// curStripe.subscribe (this)
+// }
// }
//
// // Part of SplitStreamClient. Called when a published message is received.
@@ -1215,7 +1286,7 @@ private object BroadcastCS extends Logging {
// curBlockBitmap = new Array[Boolean] (curTotalBlocks)
// curArrayOfBytes = new Array[Byte] (curTotalBytes)
//
-// logInfo (System.currentTimeMillis + ": " + curUUID + " " + curTotalBlocks + " " + curTotalBytes)
+// logInfo (curUUID + " " + curTotalBlocks + " " + curTotalBytes)
// }
// case DATA_MSG => {
// val realInfo = byteArrayToObject[(UUID, Int, Array[Byte])] (
@@ -1235,7 +1306,7 @@ private object BroadcastCS extends Logging {
// System.arraycopy (blockData, 0, curArrayOfBytes,
// blockIndex * blockSize, blockData.length)
//
-// logInfo (System.currentTimeMillis + ": " + "Got stuff for: " + blockUUID)
+// logInfo ("Got stuff for: " + blockUUID)
//
// // Done receiving
// if (curHasBlocks == curTotalBlocks) {
@@ -1244,7 +1315,7 @@ private object BroadcastCS extends Logging {
// valueBytes.put (curUUID, curArrayOfBytes)
// }
//
-// logInfo (System.currentTimeMillis + ": " + "Finished reading. Stored in valueBytes")
+// logInfo ("Finished reading. Stored in valueBytes")
//
// // RESET
// curUUID = null
@@ -1300,13 +1371,16 @@ private object BroadcastCS extends Logging {
// myStripes(stripeID).publish (bytesToSend)
// }
-// /* class PublishContent extends Message {
+// /* class PublishContent
+// extends Message {
// def getPriority: Int = { Message.MEDIUM_PRIORITY }
// } */
//
// // Error handling
-// def joinFailed(s: Stripe) = { logInfo ("joinFailed(" + s + ")") }
-
+// def joinFailed(s: Stripe) = {
+// logInfo ("joinFailed(" + s + ")")
+// }
+//
// // Rest of the Application interface. NOT USED.
// def deliver (id: rice.p2p.commonapi.Id, message: Message) = { }
// def forward (message: RouteMessage): Boolean = false
@@ -1314,7 +1388,8 @@ private object BroadcastCS extends Logging {
// }
//}
-private object BroadcastCH extends Logging {
+private object BroadcastCH
+extends Logging {
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
private var initialized = false