aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-06-15 23:47:11 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-06-15 23:47:11 -0700
commitf58da6164eaf13dd986a39a40535975096b71b44 (patch)
tree78ccc05ff8b49bd8ec903d0481ecfebaa28ed574
parentb5cf47cda382601e3f1400884415c9a6bd508a13 (diff)
parent4449eb97834ed6191dc0937d255c475191895980 (diff)
downloadspark-f58da6164eaf13dd986a39a40535975096b71b44.tar.gz
spark-f58da6164eaf13dd986a39a40535975096b71b44.tar.bz2
spark-f58da6164eaf13dd986a39a40535975096b71b44.zip
Merge branch 'master' into dev
-rw-r--r--bagel/src/main/scala/spark/bagel/Bagel.scala3
-rw-r--r--bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala1
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala187
-rw-r--r--core/src/main/scala/spark/Partitioner.scala7
-rw-r--r--core/src/main/scala/spark/RDD.scala4
-rw-r--r--core/src/main/scala/spark/Utils.scala6
-rw-r--r--core/src/main/scala/spark/broadcast/Broadcast.scala4
-rw-r--r--core/src/main/scala/spark/broadcast/BroadcastFactory.scala6
-rw-r--r--core/src/main/scala/spark/broadcast/HttpBroadcast.scala111
-rw-r--r--core/src/test/scala/spark/BroadcastSuite.scala23
-rw-r--r--core/src/test/scala/spark/PartitioningSuite.scala101
-rw-r--r--ec2/README4
-rw-r--r--ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh8
-rwxr-xr-xec2/spark-ec220
-rwxr-xr-xec2/spark_ec2.py539
-rw-r--r--ec2/third_party/boto-2.4.1.zipbin0 -> 860195 bytes
-rw-r--r--project/SparkBuild.scala2
-rw-r--r--repl/src/main/scala/spark/repl/SparkILoop.scala2
18 files changed, 926 insertions, 102 deletions
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index 2f57c9c0fd..996ca2a877 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -30,8 +30,7 @@ object Bagel extends Logging {
val aggregated = agg(verts, aggregator)
val combinedMsgs = msgs.combineByKey(
- combiner.createCombiner, combiner.mergeMsg, combiner.mergeCombiners,
- splits, partitioner)
+ combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
val (processed, numMsgs, numActiveVerts) =
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep))
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
index 4c18cb9134..ed8ace3a57 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -106,7 +106,6 @@ object WikipediaPageRankStandalone {
ranks = (contribs.combineByKey((x: Double) => x,
(x: Double, y: Double) => x + y,
(x: Double, y: Double) => x + y,
- numSplits,
partitioner)
.mapValues(sum => a/n + (1-a)*sum))
}
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index ff6764e0a2..270447712b 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -49,7 +49,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
- numSplits: Int,
partitioner: Partitioner): RDD[(K, C)] = {
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
new ShuffledRDD(self, aggregator, partitioner)
@@ -59,12 +58,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numSplits: Int): RDD[(K, C)] = {
- combineByKey(createCombiner, mergeValue, mergeCombiners, numSplits,
- new HashPartitioner(numSplits))
+ combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
}
- def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
- combineByKey[V]((v: V) => v, func, func, numSplits)
+ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
+ combineByKey[V]((v: V) => v, func, func, partitioner)
}
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
@@ -100,13 +98,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
self.map(_._1).countByValueApprox(timeout, confidence)
}
- def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
- val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, numSplits)
- bufs.asInstanceOf[RDD[(K, Seq[V])]]
+ def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
+ reduceByKey(new HashPartitioner(numSplits), func)
}
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
@@ -114,100 +107,90 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner)
+ createCombiner _, mergeValue _, mergeCombiners _, partitioner)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
+ def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
+ groupByKey(new HashPartitioner(numSplits))
+ }
+
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner)
+ createCombiner _, mergeValue _, mergeCombiners _, partitioner)
bufs.flatMapValues(buf => buf)
}
- def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
- val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
- val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
- (vs ++ ws).groupByKey(numSplits).flatMap {
- case (k, seq) => {
- val vbuf = new ArrayBuffer[V]
- val wbuf = new ArrayBuffer[W]
- seq.foreach(_ match {
- case Left(v) => vbuf += v
- case Right(w) => wbuf += w
- })
- for (v <- vbuf; w <- wbuf) yield (k, (v, w))
- }
+ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, ws) =>
+ for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
}
- def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
- val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
- val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
- (vs ++ ws).groupByKey(numSplits).flatMap {
- case (k, seq) => {
- val vbuf = new ArrayBuffer[V]
- val wbuf = new ArrayBuffer[Option[W]]
- seq.foreach(_ match {
- case Left(v) => vbuf += v
- case Right(w) => wbuf += Some(w)
- })
- if (wbuf.isEmpty) {
- wbuf += None
+ def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, ws) =>
+ if (ws.isEmpty) {
+ vs.iterator.map(v => (v, None))
+ } else {
+ for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
}
- for (v <- vbuf; w <- wbuf) yield (k, (v, w))
- }
}
}
- def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
- val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
- val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
- (vs ++ ws).groupByKey(numSplits).flatMap {
- case (k, seq) => {
- val vbuf = new ArrayBuffer[Option[V]]
- val wbuf = new ArrayBuffer[W]
- seq.foreach(_ match {
- case Left(v) => vbuf += Some(v)
- case Right(w) => wbuf += w
- })
- if (vbuf.isEmpty) {
- vbuf += None
+ def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+ : RDD[(K, (Option[V], W))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, ws) =>
+ if (vs.isEmpty) {
+ ws.iterator.map(w => (None, w))
+ } else {
+ for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
}
- for (v <- vbuf; w <- wbuf) yield (k, (v, w))
- }
}
}
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) : RDD[(K, C)] = {
- combineByKey(createCombiner, mergeValue, mergeCombiners, defaultParallelism)
+ combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
- reduceByKey(func, defaultParallelism)
+ reduceByKey(defaultPartitioner(self), func)
}
def groupByKey(): RDD[(K, Seq[V])] = {
- groupByKey(defaultParallelism)
+ groupByKey(defaultPartitioner(self))
}
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
- join(other, defaultParallelism)
+ join(other, defaultPartitioner(self, other))
+ }
+
+ def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
+ join(other, new HashPartitioner(numSplits))
}
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
- leftOuterJoin(other, defaultParallelism)
+ leftOuterJoin(other, defaultPartitioner(self, other))
+ }
+
+ def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
+ leftOuterJoin(other, new HashPartitioner(numSplits))
}
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
- rightOuterJoin(other, defaultParallelism)
+ rightOuterJoin(other, defaultPartitioner(self, other))
}
- def defaultParallelism = self.context.defaultParallelism
+ def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
+ rightOuterJoin(other, new HashPartitioner(numSplits))
+ }
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
@@ -216,42 +199,72 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
new MappedValuesRDD(self, cleanF)
}
- def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
+ def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new FlatMappedValuesRDD(self, cleanF)
}
- def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
- val part = self.partitioner match {
- case Some(p) => p
- case None => new HashPartitioner(defaultParallelism)
- }
+ def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
- part)
- val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(
- classManifest[K],
- Manifests.seqSeqManifest)
+ partitioner)
+ val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
prfs.mapValues {
case Seq(vs, ws) =>
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
}
- def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
- val part = self.partitioner match {
- case Some(p) => p
- case None => new HashPartitioner(defaultParallelism)
- }
- new CoGroupedRDD[K](
+ val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]],
other1.asInstanceOf[RDD[(_, _)]],
other2.asInstanceOf[RDD[(_, _)]]),
- part).map {
- case (k, Seq(vs, w1s, w2s)) =>
- (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
+ partitioner)
+ val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+ prfs.mapValues {
+ case Seq(vs, w1s, w2s) =>
+ (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+ }
+ }
+
+ def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner(self, other))
+ }
+
+ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+ : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+ }
+
+ def cogroup[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, new HashPartitioner(numSplits))
+ }
+
+ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numSplits: Int)
+ : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ cogroup(other1, other2, new HashPartitioner(numSplits))
+ }
+
+ def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner(self, other))
+ }
+
+ def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+ : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+ }
+
+ /**
+ * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
+ * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
+ */
+ def defaultPartitioner(rdds: RDD[_]*): Partitioner = {
+ for (r <- rdds if r.partitioner != None) {
+ return r.partitioner.get
}
+ return new HashPartitioner(self.context.defaultParallelism)
}
def lookup(key: K): Seq[V] = {
@@ -398,6 +411,7 @@ class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean)
override def splits = prev.splits
override val partitioner = prev.partitioner
override val dependencies = List(new OneToOneDependency(prev))
+
override def compute(split: Split) = {
prev.iterator(split).toArray
.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
@@ -411,16 +425,15 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)]
override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
}
-class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => Traversable[U])
+class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override val partitioner = prev.partitioner
+
override def compute(split: Split) = {
- prev.iterator(split).toStream.flatMap {
- case (k, v) => f(v).map(x => (k, x))
- }.iterator
+ prev.iterator(split).flatMap { case (k, v) => f(v).map(x => (k, x)) }
}
}
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 8f3f0f5e15..0e45ebd35c 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -26,8 +26,9 @@ class HashPartitioner(partitions: Int) extends Partitioner {
}
class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
- partitions: Int, rdd: RDD[(K,V)],
- ascending: Boolean = true)
+ partitions: Int,
+ @transient rdd: RDD[(K,V)],
+ private val ascending: Boolean = true)
extends Partitioner {
private val rangeBounds: Array[K] = {
@@ -65,7 +66,7 @@ class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
override def equals(other: Any): Boolean = other match {
case r: RangePartitioner[_,_] =>
- r.rangeBounds.sameElements(rangeBounds)
+ r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
case _ =>
false
}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 22dcc27bad..1191523ccc 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -107,7 +107,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
- def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
+ def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
@@ -361,7 +361,7 @@ class MappedRDD[U: ClassManifest, T: ClassManifest](
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
- f: T => Traversable[U])
+ f: T => TraversableOnce[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 742e60b176..89624eb370 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -76,6 +76,12 @@ object Utils {
}
} catch { case e: IOException => ; }
}
+ // Add a shutdown hook to delete the temp dir when the JVM exits
+ Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
+ override def run() {
+ Utils.deleteRecursively(dir)
+ }
+ })
return dir
}
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index cdf05fe5de..06049749a9 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -33,7 +33,7 @@ object Broadcast extends Logging with Serializable {
def initialize (isMaster__ : Boolean): Unit = synchronized {
if (!initialized) {
val broadcastFactoryClass = System.getProperty(
- "spark.broadcast.factory", "spark.broadcast.DfsBroadcastFactory")
+ "spark.broadcast.factory", "spark.broadcast.HttpBroadcastFactory")
broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
@@ -219,4 +219,4 @@ class SpeedTracker extends Serializable {
}
override def toString = sourceToSpeedMap.toString
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
index 341746d18e..b18908f789 100644
--- a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
@@ -7,6 +7,6 @@ package spark.broadcast
* entire Spark job.
*/
trait BroadcastFactory {
- def initialize (isMaster: Boolean): Unit
- def newBroadcast[T] (value_ : T, isLocal: Boolean): Broadcast[T]
-} \ No newline at end of file
+ def initialize(isMaster: Boolean): Unit
+ def newBroadcast[T](value_ : T, isLocal: Boolean): Broadcast[T]
+}
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
new file mode 100644
index 0000000000..c9f4aaa89a
--- /dev/null
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -0,0 +1,111 @@
+package spark.broadcast
+
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+import java.io._
+import java.net._
+import java.util.UUID
+
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
+import spark._
+
+class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean)
+extends Broadcast[T] with Logging with Serializable {
+
+ def value = value_
+
+ HttpBroadcast.synchronized {
+ HttpBroadcast.values.put(uuid, 0, value_)
+ }
+
+ if (!isLocal) {
+ HttpBroadcast.write(uuid, value_)
+ }
+
+ // Called by JVM when deserializing an object
+ private def readObject(in: ObjectInputStream): Unit = {
+ in.defaultReadObject()
+ HttpBroadcast.synchronized {
+ val cachedVal = HttpBroadcast.values.get(uuid, 0)
+ if (cachedVal != null) {
+ value_ = cachedVal.asInstanceOf[T]
+ } else {
+ logInfo("Started reading broadcast variable " + uuid)
+ val start = System.nanoTime
+ value_ = HttpBroadcast.read[T](uuid)
+ HttpBroadcast.values.put(uuid, 0, value_)
+ val time = (System.nanoTime - start) / 1e9
+ logInfo("Reading broadcast variable " + uuid + " took " + time + " s")
+ }
+ }
+ }
+}
+
+class HttpBroadcastFactory extends BroadcastFactory {
+ def initialize(isMaster: Boolean): Unit = HttpBroadcast.initialize(isMaster)
+ def newBroadcast[T](value_ : T, isLocal: Boolean) = new HttpBroadcast[T](value_, isLocal)
+}
+
+private object HttpBroadcast extends Logging {
+ val values = SparkEnv.get.cache.newKeySpace()
+
+ private var initialized = false
+
+ private var broadcastDir: File = null
+ private var compress: Boolean = false
+ private var bufferSize: Int = 65536
+ private var serverUri: String = null
+ private var server: HttpServer = null
+
+ def initialize(isMaster: Boolean): Unit = {
+ synchronized {
+ if (!initialized) {
+ bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ compress = System.getProperty("spark.compress", "false").toBoolean
+ if (isMaster) {
+ createServer()
+ }
+ serverUri = System.getProperty("spark.httpBroadcast.uri")
+ initialized = true
+ }
+ }
+ }
+
+ private def createServer() {
+ broadcastDir = Utils.createTempDir()
+ server = new HttpServer(broadcastDir)
+ server.start()
+ serverUri = server.uri
+ System.setProperty("spark.httpBroadcast.uri", serverUri)
+ logInfo("Broadcast server started at " + serverUri)
+ }
+
+ def write(uuid: UUID, value: Any) {
+ val file = new File(broadcastDir, "broadcast-" + uuid)
+ val out: OutputStream = if (compress) {
+ new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
+ } else {
+ new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+ }
+ val ser = SparkEnv.get.serializer.newInstance()
+ val serOut = ser.outputStream(out)
+ serOut.writeObject(value)
+ serOut.close()
+ }
+
+ def read[T](uuid: UUID): T = {
+ val url = serverUri + "/broadcast-" + uuid
+ var in = if (compress) {
+ new LZFInputStream(new URL(url).openStream()) // Does its own buffering
+ } else {
+ new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+ }
+ val ser = SparkEnv.get.serializer.newInstance()
+ val serIn = ser.inputStream(in)
+ val obj = serIn.readObject[T]()
+ serIn.close()
+ obj
+ }
+}
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
new file mode 100644
index 0000000000..750703de30
--- /dev/null
+++ b/core/src/test/scala/spark/BroadcastSuite.scala
@@ -0,0 +1,23 @@
+package spark
+
+import org.scalatest.FunSuite
+
+class BroadcastSuite extends FunSuite {
+ test("basic broadcast") {
+ val sc = new SparkContext("local", "test")
+ val list = List(1, 2, 3, 4)
+ val listBroadcast = sc.broadcast(list)
+ val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
+ assert(results.collect.toSet === Set((1, 10), (2, 10)))
+ sc.stop()
+ }
+
+ test("broadcast variables accessed in multiple threads") {
+ val sc = new SparkContext("local[10]", "test")
+ val list = List(1, 2, 3, 4)
+ val listBroadcast = sc.broadcast(list)
+ val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
+ assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
+ sc.stop()
+ }
+}
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
new file mode 100644
index 0000000000..7f7f9493dc
--- /dev/null
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -0,0 +1,101 @@
+package spark
+
+import org.scalatest.FunSuite
+
+import scala.collection.mutable.ArrayBuffer
+
+import SparkContext._
+
+class PartitioningSuite extends FunSuite {
+ test("HashPartitioner equality") {
+ val p2 = new HashPartitioner(2)
+ val p4 = new HashPartitioner(4)
+ val anotherP4 = new HashPartitioner(4)
+ assert(p2 === p2)
+ assert(p4 === p4)
+ assert(p2 != p4)
+ assert(p4 != p2)
+ assert(p4 === anotherP4)
+ assert(anotherP4 === p4)
+ }
+
+ test("RangePartitioner equality") {
+ val sc = new SparkContext("local", "test")
+
+ // Make an RDD where all the elements are the same so that the partition range bounds
+ // are deterministically all the same.
+ val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x))
+
+ val p2 = new RangePartitioner(2, rdd)
+ val p4 = new RangePartitioner(4, rdd)
+ val anotherP4 = new RangePartitioner(4, rdd)
+ val descendingP2 = new RangePartitioner(2, rdd, false)
+ val descendingP4 = new RangePartitioner(4, rdd, false)
+
+ assert(p2 === p2)
+ assert(p4 === p4)
+ assert(p2 != p4)
+ assert(p4 != p2)
+ assert(p4 === anotherP4)
+ assert(anotherP4 === p4)
+ assert(descendingP2 === descendingP2)
+ assert(descendingP4 === descendingP4)
+ assert(descendingP2 != descendingP4)
+ assert(descendingP4 != descendingP2)
+ assert(p2 != descendingP2)
+ assert(p4 != descendingP4)
+ assert(descendingP2 != p2)
+ assert(descendingP4 != p4)
+
+ sc.stop()
+ }
+
+ test("HashPartitioner not equal to RangePartitioner") {
+ val sc = new SparkContext("local", "test")
+ val rdd = sc.parallelize(1 to 10).map(x => (x, x))
+ val rangeP2 = new RangePartitioner(2, rdd)
+ val hashP2 = new HashPartitioner(2)
+ assert(rangeP2 === rangeP2)
+ assert(hashP2 === hashP2)
+ assert(hashP2 != rangeP2)
+ assert(rangeP2 != hashP2)
+ sc.stop()
+ }
+
+ test("partitioner preservation") {
+ val sc = new SparkContext("local", "test")
+
+ val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
+
+ val grouped2 = rdd.groupByKey(2)
+ val grouped4 = rdd.groupByKey(4)
+ val reduced2 = rdd.reduceByKey(_ + _, 2)
+ val reduced4 = rdd.reduceByKey(_ + _, 4)
+
+ assert(rdd.partitioner === None)
+
+ assert(grouped2.partitioner === Some(new HashPartitioner(2)))
+ assert(grouped4.partitioner === Some(new HashPartitioner(4)))
+ assert(reduced2.partitioner === Some(new HashPartitioner(2)))
+ assert(reduced4.partitioner === Some(new HashPartitioner(4)))
+
+ assert(grouped2.groupByKey().partitioner === grouped2.partitioner)
+ assert(grouped2.groupByKey(3).partitioner != grouped2.partitioner)
+ assert(grouped2.groupByKey(2).partitioner === grouped2.partitioner)
+ assert(grouped4.groupByKey().partitioner === grouped4.partitioner)
+ assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner)
+ assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner)
+
+ assert(grouped2.join(grouped4).partitioner === grouped2.partitioner)
+ assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner)
+ assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner)
+ assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner)
+
+ assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
+ assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
+ assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+ assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
+
+ sc.stop()
+ }
+}
diff --git a/ec2/README b/ec2/README
new file mode 100644
index 0000000000..58dc087a81
--- /dev/null
+++ b/ec2/README
@@ -0,0 +1,4 @@
+This folder contains a script, spark-ec2, for launching Spark clusters on
+Amazon EC2. Usage instructions are available online at:
+
+https://github.com/mesos/spark/wiki/Running-Spark-on-Amazon-EC2
diff --git a/ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh b/ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh
new file mode 100644
index 0000000000..1f76e61b2f
--- /dev/null
+++ b/ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+# These variables are automatically filled in by the mesos-ec2 script.
+export MESOS_MASTERS="{{master_list}}"
+export MESOS_SLAVES="{{slave_list}}"
+export MESOS_ZOO_LIST="{{zoo_list}}"
+export MESOS_HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
+export MESOS_MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
diff --git a/ec2/spark-ec2 b/ec2/spark-ec2
new file mode 100755
index 0000000000..2714f19ba3
--- /dev/null
+++ b/ec2/spark-ec2
@@ -0,0 +1,20 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cd "`dirname $0`"
+PYTHONPATH="./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH" python ./spark_ec2.py $@
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
new file mode 100755
index 0000000000..f8a78ac3f8
--- /dev/null
+++ b/ec2/spark_ec2.py
@@ -0,0 +1,539 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import with_statement
+
+import boto
+import logging
+import os
+import random
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+import urllib2
+from optparse import OptionParser
+from sys import stderr
+from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
+
+
+# A static URL from which to figure out the latest Mesos EC2 AMI
+LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.5"
+
+
+# Configure and parse our command-line arguments
+def parse_args():
+ parser = OptionParser(usage="spark-ec2 [options] <action> <cluster_name>"
+ + "\n\n<action> can be: launch, destroy, login, stop, start, get-master",
+ add_help_option=False)
+ parser.add_option("-h", "--help", action="help",
+ help="Show this help message and exit")
+ parser.add_option("-s", "--slaves", type="int", default=1,
+ help="Number of slaves to launch (default: 1)")
+ parser.add_option("-w", "--wait", type="int", default=90,
+ help="Seconds to wait for nodes to start (default: 90)")
+ parser.add_option("-k", "--key-pair",
+ help="Key pair to use on instances")
+ parser.add_option("-i", "--identity-file",
+ help="SSH private key file to use for logging into instances")
+ parser.add_option("-t", "--instance-type", default="m1.large",
+ help="Type of instance to launch (default: m1.large). " +
+ "WARNING: must be 64-bit; small instances won't work")
+ parser.add_option("-m", "--master-instance-type", default="",
+ help="Master instance type (leave empty for same as instance-type)")
+ parser.add_option("-z", "--zone", default="us-east-1b",
+ help="Availability zone to launch instances in")
+ parser.add_option("-a", "--ami", default="latest",
+ help="Amazon Machine Image ID to use, or 'latest' to use latest " +
+ "availabe AMI (default: latest)")
+ parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
+ help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
+ "the given local address (for use with login)")
+ parser.add_option("--resume", action="store_true", default=False,
+ help="Resume installation on a previously launched cluster " +
+ "(for debugging)")
+ parser.add_option("--ebs-vol-size", metavar="SIZE", type="int", default=0,
+ help="Attach a new EBS volume of size SIZE (in GB) to each node as " +
+ "/vol. The volumes will be deleted when the instances terminate. " +
+ "Only possible on EBS-backed AMIs.")
+ parser.add_option("--swap", metavar="SWAP", type="int", default=1024,
+ help="Swap space to set up per node, in MB (default: 1024)")
+ parser.add_option("--spot-price", metavar="PRICE", type="float",
+ help="If specified, launch slaves as spot instances with the given " +
+ "maximum price (in dollars)")
+ (opts, args) = parser.parse_args()
+ if len(args) != 2:
+ parser.print_help()
+ sys.exit(1)
+ (action, cluster_name) = args
+ if opts.identity_file == None and action in ['launch', 'login']:
+ print >> stderr, ("ERROR: The -i or --identity-file argument is " +
+ "required for " + action)
+ sys.exit(1)
+ if os.getenv('AWS_ACCESS_KEY_ID') == None:
+ print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
+ "must be set")
+ sys.exit(1)
+ if os.getenv('AWS_SECRET_ACCESS_KEY') == None:
+ print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
+ "must be set")
+ sys.exit(1)
+ return (opts, action, cluster_name)
+
+
+# Get the EC2 security group of the given name, creating it if it doesn't exist
+def get_or_make_group(conn, name):
+ groups = conn.get_all_security_groups()
+ group = [g for g in groups if g.name == name]
+ if len(group) > 0:
+ return group[0]
+ else:
+ print "Creating security group " + name
+ return conn.create_security_group(name, "Mesos EC2 group")
+
+
+# Wait for a set of launched instances to exit the "pending" state
+# (i.e. either to start running or to fail and be terminated)
+def wait_for_instances(conn, instances):
+ while True:
+ for i in instances:
+ i.update()
+ if len([i for i in instances if i.state == 'pending']) > 0:
+ time.sleep(5)
+ else:
+ return
+
+
+# Check whether a given EC2 instance object is in a state we consider active,
+# i.e. not terminating or terminated. We count both stopping and stopped as
+# active since we can restart stopped clusters.
+def is_active(instance):
+ return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
+
+
+# Launch a cluster of the given name, by setting up its security groups,
+# and then starting new instances in them.
+# Returns a tuple of EC2 reservation objects for the master, slave
+# and zookeeper instances (in that order).
+# Fails if there already instances running in the cluster's groups.
+def launch_cluster(conn, opts, cluster_name):
+ print "Setting up security groups..."
+ master_group = get_or_make_group(conn, cluster_name + "-master")
+ slave_group = get_or_make_group(conn, cluster_name + "-slaves")
+ zoo_group = get_or_make_group(conn, cluster_name + "-zoo")
+ if master_group.rules == []: # Group was just now created
+ master_group.authorize(src_group=master_group)
+ master_group.authorize(src_group=slave_group)
+ master_group.authorize(src_group=zoo_group)
+ master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
+ master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
+ master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
+ master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
+ master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
+ master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
+ if slave_group.rules == []: # Group was just now created
+ slave_group.authorize(src_group=master_group)
+ slave_group.authorize(src_group=slave_group)
+ slave_group.authorize(src_group=zoo_group)
+ slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
+ slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
+ slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
+ slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
+ slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
+ slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
+ if zoo_group.rules == []: # Group was just now created
+ zoo_group.authorize(src_group=master_group)
+ zoo_group.authorize(src_group=slave_group)
+ zoo_group.authorize(src_group=zoo_group)
+ zoo_group.authorize('tcp', 22, 22, '0.0.0.0/0')
+ zoo_group.authorize('tcp', 2181, 2181, '0.0.0.0/0')
+ zoo_group.authorize('tcp', 2888, 2888, '0.0.0.0/0')
+ zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')
+
+ # Check if instances are already running in our groups
+ print "Checking for running cluster..."
+ reservations = conn.get_all_instances()
+ for res in reservations:
+ group_names = [g.id for g in res.groups]
+ if master_group.name in group_names or slave_group.name in group_names or zoo_group.name in group_names:
+ active = [i for i in res.instances if is_active(i)]
+ if len(active) > 0:
+ print >> stderr, ("ERROR: There are already instances running in " +
+ "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
+ sys.exit(1)
+
+ if opts.ami == "latest":
+ # Figure out the latest AMI from our static URL
+ try:
+ opts.ami = urllib2.urlopen(LATEST_AMI_URL).read().strip()
+ print "Latest Spark AMI: " + opts.ami
+ except:
+ print >> stderr, "Could not read " + LATEST_AMI_URL
+
+ print "Launching instances..."
+
+ try:
+ image = conn.get_all_images(image_ids=[opts.ami])[0]
+ except:
+ print >> stderr, "Could not find AMI " + opts.ami
+ sys.exit(1)
+
+ # Create block device mapping so that we can add an EBS volume if asked to
+ block_map = BlockDeviceMapping()
+ if opts.ebs_vol_size > 0:
+ device = EBSBlockDeviceType()
+ device.size = opts.ebs_vol_size
+ device.delete_on_termination = True
+ block_map["/dev/sdv"] = device
+
+ # Launch slaves
+ if opts.spot_price != None:
+ # Launch spot instances with the requested price
+ print ("Requesting %d slaves as spot instances with price $%.3f" %
+ (opts.slaves, opts.spot_price))
+ slave_reqs = conn.request_spot_instances(
+ price = opts.spot_price,
+ image_id = opts.ami,
+ launch_group = "launch-group-%s" % cluster_name,
+ placement = opts.zone,
+ count = opts.slaves,
+ key_name = opts.key_pair,
+ security_groups = [slave_group],
+ instance_type = opts.instance_type,
+ block_device_map = block_map)
+ my_req_ids = [req.id for req in slave_reqs]
+ print "Waiting for spot instances to be granted..."
+ while True:
+ time.sleep(10)
+ reqs = conn.get_all_spot_instance_requests()
+ id_to_req = {}
+ for r in reqs:
+ id_to_req[r.id] = r
+ active = 0
+ instance_ids = []
+ for i in my_req_ids:
+ if id_to_req[i].state == "active":
+ active += 1
+ instance_ids.append(id_to_req[i].instance_id)
+ if active == opts.slaves:
+ print "All %d slaves granted" % opts.slaves
+ reservations = conn.get_all_instances(instance_ids)
+ slave_nodes = []
+ for r in reservations:
+ slave_nodes += r.instances
+ break
+ else:
+ print "%d of %d slaves granted, waiting longer" % (active, opts.slaves)
+ else:
+ # Launch non-spot instances
+ slave_res = image.run(key_name = opts.key_pair,
+ security_groups = [slave_group],
+ instance_type = opts.instance_type,
+ placement = opts.zone,
+ min_count = opts.slaves,
+ max_count = opts.slaves,
+ block_device_map = block_map)
+ slave_nodes = slave_res.instances
+ print "Launched slaves, regid = " + slave_res.id
+
+ # Launch masters
+ master_type = opts.master_instance_type
+ if master_type == "":
+ master_type = opts.instance_type
+ master_res = image.run(key_name = opts.key_pair,
+ security_groups = [master_group],
+ instance_type = master_type,
+ placement = opts.zone,
+ min_count = 1,
+ max_count = 1,
+ block_device_map = block_map)
+ master_nodes = master_res.instances
+ print "Launched master, regid = " + master_res.id
+
+ zoo_nodes = []
+
+ # Return all the instances
+ return (master_nodes, slave_nodes, zoo_nodes)
+
+
+# Get the EC2 instances in an existing cluster if available.
+# Returns a tuple of lists of EC2 instance objects for the masters,
+# slaves and zookeeper nodes (in that order).
+def get_existing_cluster(conn, opts, cluster_name):
+ print "Searching for existing cluster " + cluster_name + "..."
+ reservations = conn.get_all_instances()
+ master_nodes = []
+ slave_nodes = []
+ zoo_nodes = []
+ for res in reservations:
+ active = [i for i in res.instances if is_active(i)]
+ if len(active) > 0:
+ group_names = [g.name for g in res.groups]
+ if group_names == [cluster_name + "-master"]:
+ master_nodes += res.instances
+ elif group_names == [cluster_name + "-slaves"]:
+ slave_nodes += res.instances
+ elif group_names == [cluster_name + "-zoo"]:
+ zoo_nodes += res.instances
+ if master_nodes != [] and slave_nodes != []:
+ print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
+ (len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+ return (master_nodes, slave_nodes, zoo_nodes)
+ else:
+ if master_nodes == [] and slave_nodes != []:
+ print "ERROR: Could not find master in group " + cluster_name + "-master"
+ elif master_nodes != [] and slave_nodes == []:
+ print "ERROR: Could not find slaves in group " + cluster_name + "-slaves"
+ else:
+ print "ERROR: Could not find any existing cluster"
+ sys.exit(1)
+
+
+# Deploy configuration files and run setup scripts on a newly launched
+# or started EC2 cluster.
+def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
+ print "Deploying files to master..."
+ deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, zoo_nodes)
+ master = master_nodes[0].public_dns_name
+ if deploy_ssh_key:
+ print "Copying SSH key %s to master..." % opts.identity_file
+ ssh(master, opts, 'mkdir -p /root/.ssh')
+ scp(master, opts, opts.identity_file, '/root/.ssh/id_rsa')
+ print "Running setup on master..."
+ ssh(master, opts, "chmod u+x mesos-ec2/setup")
+ ssh(master, opts, "mesos-ec2/setup %s %s %s %s" %
+ ("generic", "none", "master", opts.swap))
+ print "Done!"
+
+
+# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
+def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
+ print "Waiting for instances to start up..."
+ time.sleep(5)
+ wait_for_instances(conn, master_nodes)
+ wait_for_instances(conn, slave_nodes)
+ if zoo_nodes != []:
+ wait_for_instances(conn, zoo_nodes)
+ print "Waiting %d more seconds..." % wait_secs
+ time.sleep(wait_secs)
+
+
+# Get number of local disks available for a given EC2 instance type.
+def get_num_disks(instance_type):
+ # From http://docs.amazonwebservices.com/AWSEC2/latest/UserGuide/index.html?InstanceStorage.html
+ disks_by_instance = {
+ "m1.small": 1,
+ "m1.large": 2,
+ "m1.xlarge": 4,
+ "t1.micro": 1,
+ "c1.medium": 1,
+ "c1.xlarge": 4,
+ "m2.xlarge": 1,
+ "m2.2xlarge": 1,
+ "m2.4xlarge": 2,
+ "cc1.4xlarge": 2,
+ "cc2.8xlarge": 4,
+ "cg1.4xlarge": 2
+ }
+ if instance_type in disks_by_instance:
+ return disks_by_instance[instance_type]
+ else:
+ print >> stderr, ("WARNING: Don't know number of disks on instance type %s; assuming 1"
+ % instance_type)
+ return 1
+
+
+# Deploy the configuration file templates in a given local directory to
+# a cluster, filling in any template parameters with information about the
+# cluster (e.g. lists of masters and slaves). Files are only deployed to
+# the first master instance in the cluster, and we expect the setup
+# script to be run on that instance to copy them to other nodes.
+def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
+ active_master = master_nodes[0].public_dns_name
+
+ num_disks = get_num_disks(opts.instance_type)
+ hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
+ mapred_local_dirs = "/mnt/hadoop/mrlocal"
+ if num_disks > 1:
+ for i in range(2, num_disks + 1):
+ hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
+ mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
+
+ if zoo_nodes != []:
+ zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
+ cluster_url = "zoo://" + ",".join(
+ ["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes])
+ else:
+ zoo_list = "NONE"
+ cluster_url = "%s:5050" % active_master
+
+ template_vars = {
+ "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
+ "active_master": active_master,
+ "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
+ "zoo_list": zoo_list,
+ "cluster_url": cluster_url,
+ "hdfs_data_dirs": hdfs_data_dirs,
+ "mapred_local_dirs": mapred_local_dirs
+ }
+
+ # Create a temp directory in which we will place all the files to be
+ # deployed after we substitue template parameters in them
+ tmp_dir = tempfile.mkdtemp()
+ for path, dirs, files in os.walk(root_dir):
+ if path.find(".svn") == -1:
+ dest_dir = os.path.join('/', path[len(root_dir):])
+ local_dir = tmp_dir + dest_dir
+ if not os.path.exists(local_dir):
+ os.makedirs(local_dir)
+ for filename in files:
+ if filename[0] not in '#.~' and filename[-1] != '~':
+ dest_file = os.path.join(dest_dir, filename)
+ local_file = tmp_dir + dest_file
+ with open(os.path.join(path, filename)) as src:
+ with open(local_file, "w") as dest:
+ text = src.read()
+ for key in template_vars:
+ text = text.replace("{{" + key + "}}", template_vars[key])
+ dest.write(text)
+ dest.close()
+ # rsync the whole directory over to the master machine
+ command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
+ "'%s/' 'root@%s:/'") % (opts.identity_file, tmp_dir, active_master))
+ subprocess.check_call(command, shell=True)
+ # Remove the temp directory we created above
+ shutil.rmtree(tmp_dir)
+
+
+# Copy a file to a given host through scp, throwing an exception if scp fails
+def scp(host, opts, local_file, dest_file):
+ subprocess.check_call(
+ "scp -q -o StrictHostKeyChecking=no -i %s '%s' 'root@%s:%s'" %
+ (opts.identity_file, local_file, host, dest_file), shell=True)
+
+
+# Run a command on a host through ssh, throwing an exception if ssh fails
+def ssh(host, opts, command):
+ subprocess.check_call(
+ "ssh -t -o StrictHostKeyChecking=no -i %s root@%s '%s'" %
+ (opts.identity_file, host, command), shell=True)
+
+
+def main():
+ (opts, action, cluster_name) = parse_args()
+ conn = boto.connect_ec2()
+
+ # Select an AZ at random if it was not specified.
+ if opts.zone == "":
+ opts.zone = random.choice(conn.get_all_zones()).name
+
+ if action == "launch":
+ if opts.resume:
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ conn, opts, cluster_name)
+ else:
+ (master_nodes, slave_nodes, zoo_nodes) = launch_cluster(
+ conn, opts, cluster_name)
+ wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
+ setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, True)
+
+ elif action == "destroy":
+ response = raw_input("Are you sure you want to destroy the cluster " +
+ cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
+ "Destroy cluster " + cluster_name + " (y/N): ")
+ if response == "y":
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ conn, opts, cluster_name)
+ print "Terminating master..."
+ for inst in master_nodes:
+ inst.terminate()
+ print "Terminating slaves..."
+ for inst in slave_nodes:
+ inst.terminate()
+ if zoo_nodes != []:
+ print "Terminating zoo..."
+ for inst in zoo_nodes:
+ inst.terminate()
+
+ elif action == "login":
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ conn, opts, cluster_name)
+ master = master_nodes[0].public_dns_name
+ print "Logging into master " + master + "..."
+ proxy_opt = ""
+ if opts.proxy_port != None:
+ proxy_opt = "-D " + opts.proxy_port
+ subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s root@%s" %
+ (opts.identity_file, proxy_opt, master), shell=True)
+
+ elif action == "get-master":
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name)
+ print master_nodes[0].public_dns_name
+
+ elif action == "stop":
+ response = raw_input("Are you sure you want to stop the cluster " +
+ cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
+ "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
+ "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
+ "Stop cluster " + cluster_name + " (y/N): ")
+ if response == "y":
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ conn, opts, cluster_name)
+ print "Stopping master..."
+ for inst in master_nodes:
+ if inst.state not in ["shutting-down", "terminated"]:
+ inst.stop()
+ print "Stopping slaves..."
+ for inst in slave_nodes:
+ if inst.state not in ["shutting-down", "terminated"]:
+ inst.stop()
+ if zoo_nodes != []:
+ print "Stopping zoo..."
+ for inst in zoo_nodes:
+ if inst.state not in ["shutting-down", "terminated"]:
+ inst.stop()
+
+ elif action == "start":
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ conn, opts, cluster_name)
+ print "Starting slaves..."
+ for inst in slave_nodes:
+ if inst.state not in ["shutting-down", "terminated"]:
+ inst.start()
+ print "Starting master..."
+ for inst in master_nodes:
+ if inst.state not in ["shutting-down", "terminated"]:
+ inst.start()
+ if zoo_nodes != []:
+ print "Starting zoo..."
+ for inst in zoo_nodes:
+ if inst.state not in ["shutting-down", "terminated"]:
+ inst.start()
+ wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
+ setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, False)
+
+ else:
+ print >> stderr, "Invalid action: %s" % action
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ logging.basicConfig()
+ main()
diff --git a/ec2/third_party/boto-2.4.1.zip b/ec2/third_party/boto-2.4.1.zip
new file mode 100644
index 0000000000..49886b89ae
--- /dev/null
+++ b/ec2/third_party/boto-2.4.1.zip
Binary files differ
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a2faf7399c..3ce6a086c1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -20,7 +20,7 @@ object SparkBuild extends Build {
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.spark-project",
- version := "0.4-SNAPSHOT",
+ version := "0.5.1-SNAPSHOT",
scalaVersion := "2.9.1",
scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index a4bca02d21..b3af4b1e20 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
- /___/ .__/\_,_/_/ /_/\_\ version 0.4
+ /___/ .__/\_,_/_/ /_/\_\ version 0.5.1-SNAPSHOT
/_/
""")
import Properties._