diff options
-rw-r--r-- | conf/java-opts.template | 1 | ||||
-rwxr-xr-x | conf/spark-env.sh.template | 2 | ||||
-rw-r--r-- | src/examples/BroadcastTest.scala | 18 | ||||
-rw-r--r-- | src/examples/SparkPi.scala | 2 | ||||
-rw-r--r-- | src/scala/spark/Broadcast.scala | 148 | ||||
-rw-r--r-- | third_party/mesos.jar | bin | 34562 -> 33618 bytes |
6 files changed, 89 insertions, 82 deletions
diff --git a/conf/java-opts.template b/conf/java-opts.template index e69de29bb2..b61e8163b5 100644 --- a/conf/java-opts.template +++ b/conf/java-opts.template @@ -0,0 +1 @@ +-Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=1024 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template index 6852b23a34..77f9cb69b9 100755 --- a/conf/spark-env.sh.template +++ b/conf/spark-env.sh.template @@ -10,4 +10,4 @@ # be in the same format as the JVM's -Xmx option, e.g. 300m or 1g). # - SPARK_LIBRARY_PATH, to add extra search paths for native libraries. - +MESOS_HOME=/home/mosharaf/Work/mesos diff --git a/src/examples/BroadcastTest.scala b/src/examples/BroadcastTest.scala index 7764013413..40c2be8f6d 100644 --- a/src/examples/BroadcastTest.scala +++ b/src/examples/BroadcastTest.scala @@ -10,15 +10,19 @@ object BroadcastTest { val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 - var arr = new Array[Int](num) - for (i <- 0 until arr.length) - arr(i) = i + var arr1 = new Array[Int](num) + for (i <- 0 until arr1.length) + arr1(i) = i - val barr = spark.broadcast(arr) +// var arr2 = new Array[Int](num * 2) +// for (i <- 0 until arr2.length) +// arr2(i) = i + + val barr1 = spark.broadcast(arr1) +// val barr2 = spark.broadcast(arr2) spark.parallelize(1 to 10, slices).foreach { - println("in task: barr = " + barr) - i => println(barr.value.size) +// i => println(barr1.value.size + barr2.value.size) + i => println(barr1.value.size) } } } - diff --git a/src/examples/SparkPi.scala b/src/examples/SparkPi.scala index 07311908ee..f055614125 100644 --- a/src/examples/SparkPi.scala +++ b/src/examples/SparkPi.scala @@ -5,7 +5,7 @@ import SparkContext._ object SparkPi { def main(args: Array[String]) { if (args.length == 0) { - System.err.println("Usage: SparkLR <host> [<slices>]") + System.err.println("Usage: SparkPi <host> [<slices>]") System.exit(1) } val spark = new SparkContext(args(0), "SparkPi") diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index fc37635983..105aa3483e 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -29,7 +29,6 @@ trait BroadcastRecipe { override def toString = "spark.Broadcast(" + uuid + ")" } -// TODO: Should think about storing in HDFS in the future // TODO: Right, now no parallelization between multiple broadcasts @serializable class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) @@ -66,6 +65,7 @@ extends BroadcastRecipe with Logging { out.close } + // Called by Java when deserializing an object private def readObject (in: ObjectInputStream) { in.defaultReadObject BroadcastCS.synchronized { @@ -74,7 +74,7 @@ extends BroadcastRecipe with Logging { value_ = cachedVal.asInstanceOf[T] } else { // Only a single worker (the first one) in the same node can ever be - // here. The rest will always get the value ready + // here. The rest will always get the value ready. val start = System.nanoTime val retByteArray = BroadcastCS.receiveBroadcast (uuid) @@ -223,7 +223,7 @@ private object Broadcast { // Initialization for CentralizedHDFSBroadcast BroadcastCH.initialize // Initialization for ChainedStreamingBroadcast - //BroadcastCS.initialize (isMaster) + BroadcastCS.initialize (isMaster) initialized = true } @@ -276,7 +276,7 @@ private object BroadcastCS extends Logging { maxRetryCount_ = System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt serverSocketTimout_ = - System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt + System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt dualMode_ = System.getProperty ("spark.broadcast.dualMode", "false").toBoolean @@ -288,10 +288,10 @@ private object BroadcastCS extends Logging { guideMR.start logInfo("GuideMultipleRequests started") } + serveMR = new ServeMultipleRequests serveMR.setDaemon (true) - serveMR.start - + serveMR.start logInfo("ServeMultipleRequests started") logInfo("BroadcastCS object has been initialized") @@ -369,8 +369,7 @@ private object BroadcastCS extends Logging { totalBlocksLock.synchronized { totalBlocksLock.notifyAll } - totalBytes = sourceInfo.totalBytes - + totalBytes = sourceInfo.totalBytes logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) retByteArray = receiveSingleTransmission (sourceInfo) @@ -415,7 +414,7 @@ private object BroadcastCS extends Logging { new ObjectInputStream (clientSocketToSource.getInputStream) logInfo("Inside receiveSingleTransmission") - logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks) + logInfo("totalBlocks: " + totalBlocks + " " + "hasBlocks: " + hasBlocks) retByteArray = new Array[Byte] (totalBytes) for (i <- 0 until totalBlocks) { val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] @@ -437,74 +436,78 @@ private object BroadcastCS extends Logging { } } finally { if (oisSource != null) { oisSource.close } - if (oosSource != null) { - oosSource.close - } + if (oosSource != null) { oosSource.close } if (clientSocketToSource != null) { clientSocketToSource.close } } return retByteArray } - class TrackMultipleValues extends Thread with Logging { - override def run = { - var threadPool = Executors.newCachedThreadPool - var serverSocket: ServerSocket = null - - serverSocket = new ServerSocket (BroadcastCS.masterListenPort) - logInfo("TrackMultipleVariables" + serverSocket + " " + listenPort) - - var keepAccepting = true - try { - while (true) { - var clientSocket: Socket = null - try { - serverSocket.setSoTimeout (serverSocketTimout) - clientSocket = serverSocket.accept - } catch { - case e: Exception => { - logInfo("TrackMultipleValues Timeout. Stopping listening...") - keepAccepting = false - } - } - logInfo("TrackMultipleValues:Got new request:" + clientSocket) - if (clientSocket != null) { - try { - threadPool.execute (new Runnable { - def run = { - val oos = new ObjectOutputStream (clientSocket.getOutputStream) - val ois = new ObjectInputStream (clientSocket.getInputStream) - try { - val variableUUID = ois.readObject.asInstanceOf[UUID] - var contactPort = 0 - // TODO: Add logic and data structures to find out UUID->port - // mapping. 0 = missed the broadcast, read from HDFS; <0 = - // Haven't started yet, wait & retry; >0 = Read from this port - oos.writeObject (contactPort) - } catch { - case e: Exception => { } - } finally { - ois.close - oos.close - clientSocket.close - } - } - }) - } catch { - // In failure, close the socket here; else, the thread will close it - case ioe: IOException => clientSocket.close - } - } - } - } finally { - serverSocket.close - } - } - } +// class TrackMultipleValues extends Thread with Logging { +// override def run = { +// var threadPool = Executors.newCachedThreadPool +// var serverSocket: ServerSocket = null +// +// serverSocket = new ServerSocket (BroadcastCS.masterListenPort) +// logInfo("TrackMultipleVariables" + serverSocket + " " + listenPort) +// +// var keepAccepting = true +// try { +// while (keepAccepting) { +// var clientSocket: Socket = null +// try { +// serverSocket.setSoTimeout (serverSocketTimout) +// clientSocket = serverSocket.accept +// } catch { +// case e: Exception => { +// logInfo("TrackMultipleValues Timeout. Stopping listening...") +// keepAccepting = false +// } +// } +// logInfo("TrackMultipleValues:Got new request:" + clientSocket) +// if (clientSocket != null) { +// try { +// threadPool.execute (new Runnable { +// def run = { +// val oos = new ObjectOutputStream (clientSocket.getOutputStream) +// val ois = new ObjectInputStream (clientSocket.getInputStream) +// try { +// val variableUUID = ois.readObject.asInstanceOf[UUID] +// var contactPort = 0 +// // TODO: Add logic and data structures to find out UUID->port +// // mapping. 0 = missed the broadcast, read from HDFS; <0 = +// // Haven't started yet, wait & retry; >0 = Read from this port +// oos.writeObject (contactPort) +// } catch { +// case e: Exception => { } +// } finally { +// ois.close +// oos.close +// clientSocket.close +// } +// } +// }) +// } catch { +// // In failure, close the socket here; else, the thread will close it +// case ioe: IOException => clientSocket.close +// } +// } +// } +// } finally { +// serverSocket.close +// } +// } +// } +// +// class TrackSingleValue { +// +// } + +// public static ExecutorService newCachedThreadPool() { +// return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, +// new SynchronousQueue<Runnable>()); +// } - class TrackSingleValue { - - } class GuideMultipleRequests extends Thread with Logging { override def run = { @@ -661,7 +664,7 @@ private object BroadcastCS extends Logging { while (keepAccepting) { var clientSocket: Socket = null try { - serverSocket.setSoTimeout (serverSocketTimout) + serverSocket.setSoTimeout (serverSocketTimout) clientSocket = serverSocket.accept } catch { case e: Exception => { @@ -731,8 +734,7 @@ private object BroadcastCS extends Logging { logInfo("Send block: " + i + " " + arrayOfBlocks(i)) } } - } - + } } } diff --git a/third_party/mesos.jar b/third_party/mesos.jar Binary files differindex 1852cf8fd0..60d299c8af 100644 --- a/third_party/mesos.jar +++ b/third_party/mesos.jar |