about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- conf/java-opts 1
-rwxr-xr-x conf/spark-env.sh 2
-rw-r--r-- src/examples/BroadcastTest.scala 18
-rw-r--r-- src/examples/SparkPi.scala 2
-rw-r--r-- src/scala/spark/Broadcast.scala 148
-rw-r--r-- src/scala/spark/SparkContext.scala 2
-rw-r--r-- third_party/mesos.jar bin 34562 -> 33618 bytes
7 files changed, 90 insertions, 83 deletions
diff --git a/conf/java-opts b/conf/java-opts
index e69de29bb2..b61e8163b5 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -0,0 +1 @@
+-Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=1024 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false
diff --git a/conf/spark-env.sh b/conf/spark-env.sh
index 6852b23a34..77f9cb69b9 100755
--- a/conf/spark-env.sh
+++ b/conf/spark-env.sh
@@ -10,4 +10,4 @@
# be in the same format as the JVM's -Xmx option, e.g. 300m or 1g).
# - SPARK_LIBRARY_PATH, to add extra search paths for native libraries.
-
+MESOS_HOME=/home/mosharaf/Work/mesos
diff --git a/src/examples/BroadcastTest.scala b/src/examples/BroadcastTest.scala
index 7764013413..40c2be8f6d 100644
--- a/src/examples/BroadcastTest.scala
+++ b/src/examples/BroadcastTest.scala
@@ -10,15 +10,19 @@ object BroadcastTest {
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
- var arr = new Array[Int](num)
- for (i <- 0 until arr.length)
- arr(i) = i
+ var arr1 = new Array[Int](num)
+ for (i <- 0 until arr1.length)
+ arr1(i) = i
- val barr = spark.broadcast(arr)
+// var arr2 = new Array[Int](num * 2)
+// for (i <- 0 until arr2.length)
+// arr2(i) = i
+
+ val barr1 = spark.broadcast(arr1)
+// val barr2 = spark.broadcast(arr2)
spark.parallelize(1 to 10, slices).foreach {
- println("in task: barr = " + barr)
- i => println(barr.value.size)
+// i => println(barr1.value.size + barr2.value.size)
+ i => println(barr1.value.size)
}
}
}
-
diff --git a/src/examples/SparkPi.scala b/src/examples/SparkPi.scala
index 07311908ee..f055614125 100644
--- a/src/examples/SparkPi.scala
+++ b/src/examples/SparkPi.scala
@@ -5,7 +5,7 @@ import SparkContext._
object SparkPi {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: SparkLR <host> [<slices>]")
+ System.err.println("Usage: SparkPi <host> [<slices>]")
System.exit(1)
}
val spark = new SparkContext(args(0), "SparkPi")
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index fc37635983..105aa3483e 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -29,7 +29,6 @@ trait BroadcastRecipe {
override def toString = "spark.Broadcast(" + uuid + ")"
}
-// TODO: Should think about storing in HDFS in the future
// TODO: Right, now no parallelization between multiple broadcasts
@serializable
class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
@@ -66,6 +65,7 @@ extends BroadcastRecipe with Logging {
out.close
}
+ // Called by Java when deserializing an object
private def readObject (in: ObjectInputStream) {
in.defaultReadObject
BroadcastCS.synchronized {
@@ -74,7 +74,7 @@ extends BroadcastRecipe with Logging {
value_ = cachedVal.asInstanceOf[T]
} else {
// Only a single worker (the first one) in the same node can ever be
- // here. The rest will always get the value ready
+ // here. The rest will always get the value ready.
val start = System.nanoTime
val retByteArray = BroadcastCS.receiveBroadcast (uuid)
@@ -223,7 +223,7 @@ private object Broadcast {
// Initialization for CentralizedHDFSBroadcast
BroadcastCH.initialize
// Initialization for ChainedStreamingBroadcast
- //BroadcastCS.initialize (isMaster)
+ BroadcastCS.initialize (isMaster)
initialized = true
}
@@ -276,7 +276,7 @@ private object BroadcastCS extends Logging {
maxRetryCount_ =
System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
serverSocketTimout_ =
- System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt
+ System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt
dualMode_ =
System.getProperty ("spark.broadcast.dualMode", "false").toBoolean
@@ -288,10 +288,10 @@ private object BroadcastCS extends Logging {
guideMR.start
logInfo("GuideMultipleRequests started")
}
+
serveMR = new ServeMultipleRequests
serveMR.setDaemon (true)
- serveMR.start
-
+ serveMR.start
logInfo("ServeMultipleRequests started")
logInfo("BroadcastCS object has been initialized")
@@ -369,8 +369,7 @@ private object BroadcastCS extends Logging {
totalBlocksLock.synchronized {
totalBlocksLock.notifyAll
}
- totalBytes = sourceInfo.totalBytes
-
+ totalBytes = sourceInfo.totalBytes
logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
retByteArray = receiveSingleTransmission (sourceInfo)
@@ -415,7 +414,7 @@ private object BroadcastCS extends Logging {
new ObjectInputStream (clientSocketToSource.getInputStream)
logInfo("Inside receiveSingleTransmission")
- logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
+ logInfo("totalBlocks: " + totalBlocks + " " + "hasBlocks: " + hasBlocks)
retByteArray = new Array[Byte] (totalBytes)
for (i <- 0 until totalBlocks) {
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
@@ -437,74 +436,78 @@ private object BroadcastCS extends Logging {
}
} finally {
if (oisSource != null) { oisSource.close }
- if (oosSource != null) {
- oosSource.close
- }
+ if (oosSource != null) { oosSource.close }
if (clientSocketToSource != null) { clientSocketToSource.close }
}
return retByteArray
}
- class TrackMultipleValues extends Thread with Logging {
- override def run = {
- var threadPool = Executors.newCachedThreadPool
- var serverSocket: ServerSocket = null
-
- serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
- logInfo("TrackMultipleVariables" + serverSocket + " " + listenPort)
-
- var keepAccepting = true
- try {
- while (true) {
- var clientSocket: Socket = null
- try {
- serverSocket.setSoTimeout (serverSocketTimout)
- clientSocket = serverSocket.accept
- } catch {
- case e: Exception => {
- logInfo("TrackMultipleValues Timeout. Stopping listening...")
- keepAccepting = false
- }
- }
- logInfo("TrackMultipleValues:Got new request:" + clientSocket)
- if (clientSocket != null) {
- try {
- threadPool.execute (new Runnable {
- def run = {
- val oos = new ObjectOutputStream (clientSocket.getOutputStream)
- val ois = new ObjectInputStream (clientSocket.getInputStream)
- try {
- val variableUUID = ois.readObject.asInstanceOf[UUID]
- var contactPort = 0
- // TODO: Add logic and data structures to find out UUID->port
- // mapping. 0 = missed the broadcast, read from HDFS; <0 =
- // Haven't started yet, wait & retry; >0 = Read from this port
- oos.writeObject (contactPort)
- } catch {
- case e: Exception => { }
- } finally {
- ois.close
- oos.close
- clientSocket.close
- }
- }
- })
- } catch {
- // In failure, close the socket here; else, the thread will close it
- case ioe: IOException => clientSocket.close
- }
- }
- }
- } finally {
- serverSocket.close
- }
- }
- }
+// class TrackMultipleValues extends Thread with Logging {
+// override def run = {
+// var threadPool = Executors.newCachedThreadPool
+// var serverSocket: ServerSocket = null
+//
+// serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
+// logInfo("TrackMultipleVariables" + serverSocket + " " + listenPort)
+//
+// var keepAccepting = true
+// try {
+// while (keepAccepting) {
+// var clientSocket: Socket = null
+// try {
+// serverSocket.setSoTimeout (serverSocketTimout)
+// clientSocket = serverSocket.accept
+// } catch {
+// case e: Exception => {
+// logInfo("TrackMultipleValues Timeout. Stopping listening...")
+// keepAccepting = false
+// }
+// }
+// logInfo("TrackMultipleValues:Got new request:" + clientSocket)
+// if (clientSocket != null) {
+// try {
+// threadPool.execute (new Runnable {
+// def run = {
+// val oos = new ObjectOutputStream (clientSocket.getOutputStream)
+// val ois = new ObjectInputStream (clientSocket.getInputStream)
+// try {
+// val variableUUID = ois.readObject.asInstanceOf[UUID]
+// var contactPort = 0
+// // TODO: Add logic and data structures to find out UUID->port
+// // mapping. 0 = missed the broadcast, read from HDFS; <0 =
+// // Haven't started yet, wait & retry; >0 = Read from this port
+// oos.writeObject (contactPort)
+// } catch {
+// case e: Exception => { }
+// } finally {
+// ois.close
+// oos.close
+// clientSocket.close
+// }
+// }
+// })
+// } catch {
+// // In failure, close the socket here; else, the thread will close it
+// case ioe: IOException => clientSocket.close
+// }
+// }
+// }
+// } finally {
+// serverSocket.close
+// }
+// }
+// }
+//
+// class TrackSingleValue {
+//
+// }
+
+// public static ExecutorService newCachedThreadPool() {
+// return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+// new SynchronousQueue<Runnable>());
+// }
- class TrackSingleValue {
-
- }
class GuideMultipleRequests extends Thread with Logging {
override def run = {
@@ -661,7 +664,7 @@ private object BroadcastCS extends Logging {
while (keepAccepting) {
var clientSocket: Socket = null
try {
- serverSocket.setSoTimeout (serverSocketTimout)
+ serverSocket.setSoTimeout (serverSocketTimout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
@@ -731,8 +734,7 @@ private object BroadcastCS extends Logging {
logInfo("Send block: " + i + " " + arrayOfBlocks(i))
}
}
- }
-
+ }
}
}
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 62e49271bf..90bea8921a 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -20,7 +20,7 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
// TODO: Keep around a weak hash map of values to Cached versions?
def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local)
- //def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
+ // def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
def textFile(path: String) = new HdfsTextFile(this, path)
diff --git a/third_party/mesos.jar b/third_party/mesos.jar
index 1852cf8fd0..60d299c8af 100644
--- a/third_party/mesos.jar
+++ b/third_party/mesos.jar
Binary files differ