From ad7a9c5a3695f1f83cbdfa96d435003939582c8a Mon Sep 17 00:00:00 2001
From: Mosharaf Chowdhury
Date: Tue, 12 Oct 2010 12:55:43 -0700
Subject: Minor cleanup in Broadcast.scala. Changed BroadcastTest.scala to have
 multiple broadcasts.

---
 conf/java-opts                     |   1 +
 conf/spark-env.sh                  |   2 +-
 src/examples/BroadcastTest.scala   |  18 +++--
 src/examples/SparkPi.scala         |   2 +-
 src/scala/spark/Broadcast.scala    | 148 +++++++++++++++++++------------------
 src/scala/spark/SparkContext.scala |   2 +-
 third_party/mesos.jar              | Bin 34562 -> 33618 bytes
 7 files changed, 90 insertions(+), 83 deletions(-)

diff --git a/conf/java-opts b/conf/java-opts
index e69de29bb2..b61e8163b5 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -0,0 +1 @@
+-Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=1024 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false
diff --git a/conf/spark-env.sh b/conf/spark-env.sh
index 6852b23a34..77f9cb69b9 100755
--- a/conf/spark-env.sh
+++ b/conf/spark-env.sh
@@ -10,4 +10,4 @@
 # be in the same format as the JVM's -Xmx option, e.g. 300m or 1g).
 # - SPARK_LIBRARY_PATH, to add extra search paths for native libraries.
 
-
+MESOS_HOME=/home/mosharaf/Work/mesos
diff --git a/src/examples/BroadcastTest.scala b/src/examples/BroadcastTest.scala
index 7764013413..40c2be8f6d 100644
--- a/src/examples/BroadcastTest.scala
+++ b/src/examples/BroadcastTest.scala
@@ -10,15 +10,19 @@ object BroadcastTest {
     val slices = if (args.length > 1) args(1).toInt else 2
     val num = if (args.length > 2) args(2).toInt else 1000000
 
-    var arr = new Array[Int](num)
-    for (i <- 0 until arr.length)
-      arr(i) = i
+    var arr1 = new Array[Int](num)
+    for (i <- 0 until arr1.length)
+      arr1(i) = i
 
-    val barr = spark.broadcast(arr)
+//    var arr2 = new Array[Int](num * 2)
+//    for (i <- 0 until arr2.length)
+//      arr2(i) = i
+
+    val barr1 = spark.broadcast(arr1)
+//    val barr2 = spark.broadcast(arr2)
     spark.parallelize(1 to 10, slices).foreach {
-      println("in task: barr = " + barr)
-      i => println(barr.value.size)
+//      i => println(barr1.value.size + barr2.value.size)
+      i => println(barr1.value.size)
     }
   }
 }
-
diff --git a/src/examples/SparkPi.scala b/src/examples/SparkPi.scala
index 07311908ee..f055614125 100644
--- a/src/examples/SparkPi.scala
+++ b/src/examples/SparkPi.scala
@@ -5,7 +5,7 @@ import SparkContext._
 object SparkPi {
   def main(args: Array[String]) {
     if (args.length == 0) {
-      System.err.println("Usage: SparkLR <host> [<slices>]")
+      System.err.println("Usage: SparkPi <host> [<slices>]")
       System.exit(1)
     }
     val spark = new SparkContext(args(0), "SparkPi")
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index fc37635983..105aa3483e 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -29,7 +29,6 @@ trait BroadcastRecipe {
   override def toString = "spark.Broadcast(" + uuid + ")"
 }
 
-// TODO: Should think about storing in HDFS in the future
 // TODO: Right, now no parallelization between multiple broadcasts
 @serializable
 class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
@@ -66,6 +65,7 @@ extends BroadcastRecipe with Logging {
     out.close
   }
 
+  // Called by Java when deserializing an object
   private def readObject (in: ObjectInputStream) {
     in.defaultReadObject
     BroadcastCS.synchronized {
@@ -74,7 +74,7 @@ extends BroadcastRecipe with Logging {
         value_ = cachedVal.asInstanceOf[T]
       } else {
         // Only a single worker (the first one) in the same node can ever be
-        // here. The rest will always get the value ready
+        // here. The rest will always get the value ready.
 
         val start = System.nanoTime
         val retByteArray = BroadcastCS.receiveBroadcast (uuid)
@@ -223,7 +223,7 @@ private object Broadcast {
       // Initialization for CentralizedHDFSBroadcast
       BroadcastCH.initialize
       // Initialization for ChainedStreamingBroadcast
-      //BroadcastCS.initialize (isMaster)
+      BroadcastCS.initialize (isMaster)
 
       initialized = true
     }
@@ -276,7 +276,7 @@ private object BroadcastCS extends Logging {
     maxRetryCount_ =
       System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
     serverSocketTimout_ =
-      System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt
+      System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt
     dualMode_ =
       System.getProperty ("spark.broadcast.dualMode", "false").toBoolean
 
@@ -288,10 +288,10 @@ private object BroadcastCS extends Logging {
       guideMR.start
       logInfo("GuideMultipleRequests started")
     }
+
     serveMR = new ServeMultipleRequests
     serveMR.setDaemon (true)
-    serveMR.start
-
+    serveMR.start
     logInfo("ServeMultipleRequests started")
 
     logInfo("BroadcastCS object has been initialized")
@@ -369,8 +369,7 @@ private object BroadcastCS extends Logging {
         totalBlocksLock.synchronized {
           totalBlocksLock.notifyAll
         }
-        totalBytes = sourceInfo.totalBytes
-
+        totalBytes = sourceInfo.totalBytes
         logInfo("Received SourceInfo from Master:" + sourceInfo +
           " My Port: " + listenPort)
         retByteArray = receiveSingleTransmission (sourceInfo)
@@ -415,7 +414,7 @@ private object BroadcastCS extends Logging {
       new ObjectInputStream (clientSocketToSource.getInputStream)
 
     logInfo("Inside receiveSingleTransmission")
-    logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
+    logInfo("totalBlocks: " + totalBlocks + " " + "hasBlocks: " + hasBlocks)
     retByteArray = new Array[Byte] (totalBytes)
     for (i <- 0 until totalBlocks) {
       val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
@@ -437,74 +436,78 @@ private object BroadcastCS extends Logging {
       }
     } finally {
       if (oisSource != null) { oisSource.close }
-      if (oosSource != null) {
-        oosSource.close
-      }
+      if (oosSource != null) { oosSource.close }
       if (clientSocketToSource != null) { clientSocketToSource.close }
     }
 
     return retByteArray
   }
 
-  class TrackMultipleValues extends Thread with Logging {
-    override def run = {
-      var threadPool = Executors.newCachedThreadPool
-      var serverSocket: ServerSocket = null
-
-      serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
-      logInfo("TrackMultipleVariables" + serverSocket + " " + listenPort)
-
-      var keepAccepting = true
-      try {
-        while (true) {
-          var clientSocket: Socket = null
-          try {
-            serverSocket.setSoTimeout (serverSocketTimout)
-            clientSocket = serverSocket.accept
-          } catch {
-            case e: Exception => {
-              logInfo("TrackMultipleValues Timeout. Stopping listening...")
-              keepAccepting = false
-            }
-          }
-          logInfo("TrackMultipleValues:Got new request:" + clientSocket)
-          if (clientSocket != null) {
-            try {
-              threadPool.execute (new Runnable {
-                def run = {
-                  val oos = new ObjectOutputStream (clientSocket.getOutputStream)
-                  val ois = new ObjectInputStream (clientSocket.getInputStream)
-                  try {
-                    val variableUUID = ois.readObject.asInstanceOf[UUID]
-                    var contactPort = 0
-                    // TODO: Add logic and data structures to find out UUID->port
-                    // mapping. 0 = missed the broadcast, read from HDFS; <0 =
-                    // Haven't started yet, wait & retry; >0 = Read from this port
-                    oos.writeObject (contactPort)
-                  } catch {
-                    case e: Exception => { }
-                  } finally {
-                    ois.close
-                    oos.close
-                    clientSocket.close
-                  }
-                }
-              })
-            } catch {
-              // In failure, close the socket here; else, the thread will close it
-              case ioe: IOException => clientSocket.close
-            }
-          }
-        }
-      } finally {
-        serverSocket.close
-      }
-    }
-  }
+//  class TrackMultipleValues extends Thread with Logging {
+//    override def run = {
+//      var threadPool = Executors.newCachedThreadPool
+//      var serverSocket: ServerSocket = null
+//
+//      serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
+//      logInfo("TrackMultipleVariables" + serverSocket + " " + listenPort)
+//
+//      var keepAccepting = true
+//      try {
+//        while (keepAccepting) {
+//          var clientSocket: Socket = null
+//          try {
+//            serverSocket.setSoTimeout (serverSocketTimout)
+//            clientSocket = serverSocket.accept
+//          } catch {
+//            case e: Exception => {
+//              logInfo("TrackMultipleValues Timeout. Stopping listening...")
+//              keepAccepting = false
+//            }
+//          }
+//          logInfo("TrackMultipleValues:Got new request:" + clientSocket)
+//          if (clientSocket != null) {
+//            try {
+//              threadPool.execute (new Runnable {
+//                def run = {
+//                  val oos = new ObjectOutputStream (clientSocket.getOutputStream)
+//                  val ois = new ObjectInputStream (clientSocket.getInputStream)
+//                  try {
+//                    val variableUUID = ois.readObject.asInstanceOf[UUID]
+//                    var contactPort = 0
+//                    // TODO: Add logic and data structures to find out UUID->port
+//                    // mapping. 0 = missed the broadcast, read from HDFS; <0 =
+//                    // Haven't started yet, wait & retry; >0 = Read from this port
+//                    oos.writeObject (contactPort)
+//                  } catch {
+//                    case e: Exception => { }
+//                  } finally {
+//                    ois.close
+//                    oos.close
+//                    clientSocket.close
+//                  }
+//                }
+//              })
+//            } catch {
+//              // In failure, close the socket here; else, the thread will close it
+//              case ioe: IOException => clientSocket.close
+//            }
+//          }
+//        }
+//      } finally {
+//        serverSocket.close
+//      }
+//    }
+//  }
+//
+//  class TrackSingleValue {
+//
+//  }
+
+//  public static ExecutorService newCachedThreadPool() {
+//    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+//      new SynchronousQueue());
+//  }
 
-  class TrackSingleValue {
-
-  }
 
   class GuideMultipleRequests extends Thread with Logging {
     override def run = {
@@ -661,7 +664,7 @@ private object BroadcastCS extends Logging {
       while (keepAccepting) {
         var clientSocket: Socket = null
         try {
-          serverSocket.setSoTimeout (serverSocketTimout)
+          serverSocket.setSoTimeout (serverSocketTimout)
           clientSocket = serverSocket.accept
         } catch {
           case e: Exception => {
@@ -731,8 +734,7 @@ private object BroadcastCS extends Logging {
             logInfo("Send block: " + i + " " + arrayOfBlocks(i))
           }
         }
-      }
-
+      }
     }
   }
 
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 62e49271bf..90bea8921a 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -20,7 +20,7 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
 
   // TODO: Keep around a weak hash map of values to Cached versions?
   def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local)
-  //def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
+  // def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
 
   def textFile(path: String) = new HdfsTextFile(this, path)
 
diff --git a/third_party/mesos.jar b/third_party/mesos.jar
index 1852cf8fd0..60d299c8af 100644
Binary files a/third_party/mesos.jar and b/third_party/mesos.jar differ
-- 
cgit v1.2.3
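
Editor's note: the pattern this patch exercises is that the driver broadcasts a read-only value once and tasks dereference it with .value, so each task closure captures only the small broadcast handle rather than the whole array. Below is a minimal driver sketch against the 2010-era API shown in the diffs above (SparkContext(master, frameworkName), spark.broadcast, spark.parallelize); it is not part of the commit. The "local" master string and the object name BroadcastSketch are illustrative assumptions, and note that with this patch as committed spark.broadcast still returns a CentralizedHDFSBroadcast, since the ChainedStreamingBroadcast line in SparkContext.scala stays commented out.

    // Hypothetical usage sketch, not part of the commit. Assumes the
    // spark.broadcast.* system properties from conf/java-opts above are
    // passed on the JVM command line.
    import spark.SparkContext

    object BroadcastSketch {
      def main(args: Array[String]) {
        val spark = new SparkContext("local", "BroadcastSketch")

        // Build a large read-only array on the driver.
        val arr = new Array[Int](1000000)
        for (i <- 0 until arr.length)
          arr(i) = i

        // Broadcast it once; with this patch this is a CentralizedHDFSBroadcast.
        val barr = spark.broadcast(arr)

        // Tasks capture only the broadcast handle and pull the payload
        // with .value on the worker.
        spark.parallelize(1 to 10, 2).foreach {
          i => println(barr.value.size)
        }
      }
    }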