From c94e9cc54aebddc20cc4ab13ca106781c7298642 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 16 Sep 2012 20:46:46 -0700 Subject: Add Java Programming Guide; fix broken doc links. --- docs/bagel-programming-guide.md | 2 +- docs/index.md | 2 +- docs/java-programming-guide.md | 170 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 171 insertions(+), 3 deletions(-) diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md index b133376a97..0c925c176c 100644 --- a/docs/bagel-programming-guide.md +++ b/docs/bagel-programming-guide.md @@ -19,7 +19,7 @@ To write a Bagel application, you will need to add Spark, its dependencies, and ## Programming Model -Bagel operates on a graph represented as a [distributed dataset]({{HOME_PATH}}programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages. +Bagel operates on a graph represented as a [distributed dataset]({{HOME_PATH}}scala-programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages. For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to. diff --git a/docs/index.md b/docs/index.md index 3df638f629..69d55e505e 100644 --- a/docs/index.md +++ b/docs/index.md @@ -54,7 +54,7 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`). # Where to Go from Here -* [Spark Programming Guide]({{HOME_PATH}}programming-guide.html): how to get started using Spark, and details on the API +* [Spark Programming Guide]({{HOME_PATH}}scala-programming-guide.html): how to get started using Spark, and details on the API * [Running Spark on Amazon EC2]({{HOME_PATH}}ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes * [Running Spark on Mesos]({{HOME_PATH}}running-on-mesos.html): instructions on how to deploy to a private cluster * [Running Spark on YARN]({{HOME_PATH}}running-on-yarn.html): instructions on how to run Spark on top of a YARN cluster diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md index e3f644d748..c63448a965 100644 --- a/docs/java-programming-guide.md +++ b/docs/java-programming-guide.md @@ -2,4 +2,172 @@ layout: global title: Java Programming Guide --- -TODO: Write Java programming guide! + +The Spark Java API +([spark.api.java]({{HOME_PATH}}api/core/index.html#spark.api.java.package)) defines +[`JavaSparkContext`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaSparkContext) and +[`JavaRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaRDD) classes, +which support +the same methods as their Scala counterparts but take Java functions and return +Java data and collection types.
+ +Because the Java API is similar to the Scala API, this programming guide only +covers Java-specific features; +the [Scala Programming Guide]({{HOME_PATH}}scala-programming-guide.html) +provides a more general introduction to Spark concepts and should be read +first. + + +# Key differences in the Java API +There are a few key differences between the Java and Scala APIs: + +* Java does not support anonymous or first-class functions, so functions must + be implemented by extending the + [`spark.api.java.function.Function`]({{HOME_PATH}}api/core/index.html#spark.api.java.function.Function), + [`Function2`]({{HOME_PATH}}api/core/index.html#spark.api.java.function.Function2), etc. + classes. +* To maintain type safety, the Java API defines specialized Function and RDD + classes for key-value pairs and doubles. +* RDD methods like `collect` and `countByKey` return Java collection types, + such as `java.util.List` and `java.util.Map`. + + +## RDD Classes +Spark defines additional operations on RDDs of doubles and key-value pairs, such +as `stdev` and `join`. + +In the Scala API, these methods are automatically added using Scala's +[implicit conversions](http://www.scala-lang.org/node/130) mechanism. + +In the Java API, the extra methods are defined in +[`JavaDoubleRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaDoubleRDD) and +[`JavaPairRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaPairRDD) +classes. RDD methods like `map` are overloaded by specialized `PairFunction` +and `DoubleFunction` classes, allowing them to return RDDs of the appropriate +types. Common methods like `filter` and `sample` are implemented by +each specialized RDD class, so filtering a `PairRDD` returns a new `PairRDD`, +etc. (this achieves the "same-result-type" principle used by the [Scala collections +framework](http://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html)). + +## Function Classes + +The following table lists the function classes used by the Java API. Each +class has a single abstract method, `call()`, that must be implemented. +
+| Class | Function Type |
+| ----- | ------------- |
+| `Function<T, R>` | `T -> R` |
+| `DoubleFunction<T>` | `T -> Double` |
+| `PairFunction<T, K, V>` | `T -> Tuple2<K, V>` |
+| `FlatMapFunction<T, R>` | `T -> Iterable<R>` |
+| `DoubleFlatMapFunction<T>` | `T -> Iterable<Double>` |
+| `PairFlatMapFunction<T, K, V>` | `T -> Iterable<Tuple2<K, V>>` |
+| `Function2<T1, T2, R>` | `T1, T2 -> R` (function of two arguments) |
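+
+For instance, a `DoubleFunction` can be used with `map` to obtain a `JavaDoubleRDD` and compute statistics on it. The sketch below is illustrative only: it assumes a `JavaRDD<String>` named `lines` (as created in the word count example later in this guide) and the `stdev` operation mentioned above; exact method signatures may vary.
+
+{% highlight java %}
+// Mapping with a DoubleFunction yields a JavaDoubleRDD,
+// which adds numeric operations such as stdev().
+JavaDoubleRDD lineLengths = lines.map(
+  new DoubleFunction<String>() {
+    public Double call(String s) {
+      return (double) s.length();
+    }
+  }
+);
+Double stdev = lineLengths.stdev();
+{% endhighlight %}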
+ +# Other Features +The Java API supports other Spark features, including +[accumulators]({{HOME_PATH}}scala-programming-guide.html#accumulators), +[broadcast variables]({{HOME_PATH}}scala-programming-guide.html#broadcast_variables), and +[caching]({{HOME_PATH}}scala-programming-guide.html#caching). + +# Example + +As an example, we will implement word count using the Java API. + +{% highlight java %} +import spark.api.java.*; +import spark.api.java.function.*; + +JavaSparkContext sc = new JavaSparkContext(...); +JavaRDD<String> lines = sc.textFile("hdfs://..."); +JavaRDD<String> words = lines.flatMap( + new FlatMapFunction<String, String>() { + public Iterable<String> call(String s) { + return Arrays.asList(s.split(" ")); + } + } +); +{% endhighlight %} + +The word count program starts by creating a `JavaSparkContext`, which accepts +the same parameters as its Scala counterpart. `JavaSparkContext` supports the +same data loading methods as the regular `SparkContext`; here, `textFile` +loads lines from text files stored in HDFS. + +To split the lines into words, we use `flatMap` to split each line on +whitespace. `flatMap` is passed a `FlatMapFunction` that accepts a string and +returns a `java.lang.Iterable` of strings. + +Here, the `FlatMapFunction` was created inline; another option is to subclass +`FlatMapFunction` and pass an instance to `flatMap`: + +{% highlight java %} +class Split extends FlatMapFunction<String, String> { + public Iterable<String> call(String s) { + return Arrays.asList(s.split(" ")); + } +} +JavaRDD<String> words = lines.flatMap(new Split()); +{% endhighlight %} + +Continuing with the word count example, we map each word to a `(word, 1)` pair: + +{% highlight java %} +import scala.Tuple2; +JavaPairRDD<String, Integer> ones = words.map( + new PairFunction<String, String, Integer>() { + public Tuple2<String, Integer> call(String s) { + return new Tuple2(s, 1); + } + } +); +{% endhighlight %} + +Note that `map` was passed a `PairFunction<String, String, Integer>` and +returned a `JavaPairRDD<String, Integer>`. + + + +To finish the word count program, we will use `reduceByKey` to count the +occurrences of each word: + +{% highlight java %} +JavaPairRDD<String, Integer> counts = ones.reduceByKey( + new Function2<Integer, Integer, Integer>() { + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + } +); +{% endhighlight %} + +Here, `reduceByKey` is passed a `Function2`, which implements a function with +two arguments. The resulting `JavaPairRDD` contains `(word, count)` pairs. + +In this example, we explicitly showed each intermediate RDD. It is also +possible to chain the RDD transformations, so the word count example could also +be written as: + +{% highlight java %} +JavaPairRDD<String, Integer> counts = lines.flatMap( + ... + ).map( + ... + ).reduceByKey( + ... + ); +{% endhighlight %} +There is no performance difference between these approaches; the choice is +a matter of style. + + +# Where to go from here +Spark includes several sample jobs using the Java API in +`examples/src/main/java`. You can run them by passing the class name to the +`run` script included in Spark -- for example, `./run +spark.examples.JavaWordCount`. Each example program prints usage help when run +without any arguments. -- cgit v1.2.3 From ca64d16a2dc383cccfd32c92cb2134163727bd9c Mon Sep 17 00:00:00 2001 From: Denny Date: Mon, 17 Sep 2012 10:08:37 -0700 Subject: When a file is downloaded, make it executable. That's necessary for scripts (e.g.
in Shark) --- core/src/main/scala/spark/Utils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 07aa18e540..5a3f8bde43 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -163,6 +163,8 @@ object Utils extends Logging { logInfo("Untarring " + filename) Utils.execute(Seq("tar", "-xf", filename), targetDir) } + // Make the file executable - That's necessary for scripts + FileUtil.chmod(filename, "a+x") } /** -- cgit v1.2.3 From 397d3816e1ee0351cd2814dc081f368fe45094c0 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 19 Sep 2012 12:31:45 -0700 Subject: Separated ShuffledRDD into multiple classes: RepartitionShuffledRDD, ShuffledSortedRDD, and ShuffledAggregatedRDD. --- core/src/main/scala/spark/Aggregator.scala | 4 +- core/src/main/scala/spark/PairRDDFunctions.scala | 87 ++++++++++++---------- core/src/main/scala/spark/ShuffledRDD.scala | 72 ++++++++++++++++-- .../scala/spark/scheduler/ShuffleMapTask.scala | 9 ++- core/src/test/scala/spark/ShuffleSuite.scala | 18 ++--- 5 files changed, 128 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index 6516bea157..b0daa70cfd 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -9,9 +9,9 @@ package spark * known as map-side aggregations. When set to false, * mergeCombiners function is not used. */ -class Aggregator[K, V, C] ( +case class Aggregator[K, V, C] ( val createCombiner: V => C, val mergeValue: (C, V) => C, val mergeCombiners: (C, C) => C, val mapSideCombine: Boolean = true) - extends Serializable + diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 64018f8c6b..aa1d00c63c 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -1,11 +1,10 @@ package spark import java.io.EOFException -import java.net.URL import java.io.ObjectInputStream +import java.net.URL +import java.util.{Date, HashMap => JHashMap} import java.util.concurrent.atomic.AtomicLong -import java.util.{HashMap => JHashMap} -import java.util.Date import java.text.SimpleDateFormat import scala.collection.Map @@ -50,9 +49,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, - partitioner: Partitioner): RDD[(K, C)] = { - val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) - new ShuffledRDD(self, aggregator, partitioner) + partitioner: Partitioner, + mapSideCombine: Boolean = true): RDD[(K, C)] = { + val aggregator = + if (mapSideCombine) { + new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) + } else { + // Don't apply map-side combiner. + // A sanity check to make sure mergeCombiners is not defined. 
+ assert(mergeCombiners == null) + new Aggregator[K, V, C](createCombiner, mergeValue, null, false) + } + new ShuffledAggregatedRDD(self, aggregator, partitioner) } def combineByKey[C](createCombiner: V => C, @@ -65,7 +73,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { combineByKey[V]((v: V) => v, func, func, partitioner) } - + def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = { def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = { val map = new JHashMap[K, V] @@ -116,13 +124,24 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( groupByKey(new HashPartitioner(numSplits)) } - def partitionBy(partitioner: Partitioner): RDD[(K, V)] = { - def createCombiner(v: V) = ArrayBuffer(v) - def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v - def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 - val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, partitioner) - bufs.flatMapValues(buf => buf) + /** + * Repartition the RDD using the specified partitioner. If mapSideCombine is + * true, Spark will group values of the same key together on the map side + * before the repartitioning. If a large number of duplicated keys are + * expected, and the size of the keys are large, mapSideCombine should be set + * to true. + */ + def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = { + if (mapSideCombine) { + def createCombiner(v: V) = ArrayBuffer(v) + def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v + def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 + val bufs = combineByKey[ArrayBuffer[V]]( + createCombiner _, mergeValue _, mergeCombiners _, partitioner) + bufs.flatMapValues(buf => buf) + } else { + new RepartitionShuffledRDD(self, partitioner) + } } def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { @@ -194,17 +213,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( } def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) - + def mapValues[U](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MappedValuesRDD(self, cleanF) } - + def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { val cleanF = self.context.clean(f) new FlatMappedValuesRDD(self, cleanF) } - + def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { val cg = new CoGroupedRDD[K]( Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), @@ -215,12 +234,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) } } - + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { val cg = new CoGroupedRDD[K]( Seq(self.asInstanceOf[RDD[(_, _)]], - other1.asInstanceOf[RDD[(_, _)]], + other1.asInstanceOf[RDD[(_, _)]], other2.asInstanceOf[RDD[(_, _)]]), partitioner) val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) @@ -289,7 +308,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) } - + def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { saveAsNewAPIHadoopFile(path, 
getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) } @@ -363,7 +382,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf)) saveAsHadoopDataset(conf) } - + def saveAsHadoopDataset(conf: JobConf) { val outputFormatClass = conf.getOutputFormat val keyClass = conf.getOutputKeyClass @@ -377,7 +396,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( if (valueClass == null) { throw new SparkException("Output value class not set") } - + logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")") val writer = new HadoopWriter(conf) @@ -390,14 +409,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( writer.setup(context.stageId, context.splitId, attemptNumber) writer.open() - + var count = 0 while(iter.hasNext) { val record = iter.next count += 1 writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) } - + writer.close() writer.commit() } @@ -413,28 +432,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( self: RDD[(K, V)]) - extends Logging + extends Logging with Serializable { def sortByKey(ascending: Boolean = true): RDD[(K,V)] = { - val rangePartitionedRDD = self.partitionBy(new RangePartitioner(self.splits.size, self, ascending)) - new SortedRDD(rangePartitionedRDD, ascending) + new ShuffledSortedRDD(self, ascending) } } -class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean) - extends RDD[(K, V)](prev.context) { - - override def splits = prev.splits - override val partitioner = prev.partitioner - override val dependencies = List(new OneToOneDependency(prev)) - - override def compute(split: Split) = { - prev.iterator(split).toArray - .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator - } -} - class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) { override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) @@ -444,7 +449,7 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)] class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]) extends RDD[(K, U)](prev.context) { - + override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala index 3616d8e47e..a7346060b3 100644 --- a/core/src/main/scala/spark/ShuffledRDD.scala +++ b/core/src/main/scala/spark/ShuffledRDD.scala @@ -1,29 +1,89 @@ package spark +import scala.collection.mutable.ArrayBuffer import java.util.{HashMap => JHashMap} + class ShuffledRDDSplit(val idx: Int) extends Split { override val index = idx override def hashCode(): Int = idx } -class ShuffledRDD[K, V, C]( + +/** + * The resulting RDD from a shuffle (e.g. repartitioning of data). 
+ */ +abstract class ShuffledRDD[K, V, C]( @transient parent: RDD[(K, V)], aggregator: Aggregator[K, V, C], - part : Partitioner) + part : Partitioner) extends RDD[(K, C)](parent.context) { - //override val partitioner = Some(part) + override val partitioner = Some(part) - + @transient val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) override def splits = splits_ - + override def preferredLocations(split: Split) = Nil - + val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part) override val dependencies = List(dep) +} + + +/** + * Repartition a key-value pair RDD. + */ +class RepartitionShuffledRDD[K, V]( + @transient parent: RDD[(K, V)], + part : Partitioner) + extends ShuffledRDD[K, V, V]( + parent, + Aggregator[K, V, V](null, null, null, false), + part) { + + override def compute(split: Split): Iterator[(K, V)] = { + val buf = new ArrayBuffer[(K, V)] + val fetcher = SparkEnv.get.shuffleFetcher + def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) } + fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer) + buf.iterator + } +} + + +/** + * A sort-based shuffle (that doesn't apply aggregation). It does so by first + * repartitioning the RDD by range, and then sort within each range. + */ +class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V]( + @transient parent: RDD[(K, V)], + ascending: Boolean) + extends RepartitionShuffledRDD[K, V]( + parent, + new RangePartitioner(parent.splits.size, parent, ascending)) { + + override def compute(split: Split): Iterator[(K, V)] = { + // By separating this from RepartitionShuffledRDD, we avoided a + // buf.iterator.toArray call, thus avoiding building up the buffer twice. + val buf = new ArrayBuffer[(K, V)] + def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) } + SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer) + buf.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator + } +} + + +/** + * The resulting RDD from shuffle and running (hash-based) aggregation. + */ +class ShuffledAggregatedRDD[K, V, C]( + @transient parent: RDD[(K, V)], + aggregator: Aggregator[K, V, C], + part : Partitioner) + extends ShuffledRDD[K, V, C](parent, aggregator, part) { override def compute(split: Split): Iterator[(K, C)] = { val combiners = new JHashMap[K, C] diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 3bcc588015..745aa0c939 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -44,7 +44,8 @@ object ShuffleMapTask { } // Since both the JarSet and FileSet have the same format this is used for both. 
- def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = { + def serializeFileSet( + set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = { val old = cache.get(stageId) if (old != null) { return old @@ -59,7 +60,6 @@ object ShuffleMapTask { } } - def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = { synchronized { val loader = Thread.currentThread.getContextClassLoader @@ -113,7 +113,8 @@ class ShuffleMapTask( out.writeInt(bytes.length) out.write(bytes) - val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache) + val fileSetBytes = ShuffleMapTask.serializeFileSet( + fileSet, stageId, ShuffleMapTask.fileSetCache) out.writeInt(fileSetBytes.length) out.write(fileSetBytes) val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache) @@ -172,7 +173,7 @@ class ShuffleMapTask( buckets.map(_.iterator) } else { // No combiners (no map-side aggregation). Simply partition the map output. - val buckets = Array.tabulate(numOutputSplits)(_ => new ArrayBuffer[(Any, Any)]) + val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)]) for (elem <- rdd.iterator(split)) { val pair = elem.asInstanceOf[(Any, Any)] val bucketId = partitioner.getPartition(pair._1) diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index f622c413f7..9d7e2591f1 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -15,16 +15,16 @@ import scala.collection.mutable.ArrayBuffer import SparkContext._ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { - + var sc: SparkContext = _ - + after { if (sc != null) { sc.stop() sc = null } } - + test("groupByKey") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) @@ -57,7 +57,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { val valuesFor2 = groups.find(_._1 == 2).get._2 assert(valuesFor2.toList.sorted === List(1)) } - + test("groupByKey with many output partitions") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) @@ -187,7 +187,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { (4, (ArrayBuffer(), ArrayBuffer('w'))) )) } - + test("zero-partition RDD") { sc = new SparkContext("local", "test") val emptyDir = Files.createTempDir() @@ -195,7 +195,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { assert(file.splits.size == 0) assert(file.collect().toList === Nil) // Test that a shuffle on the file works, because this used to be a bug - assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) } test("map-side combine") { @@ -212,7 +212,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { _+_, _+_, false) - val shuffledRdd = new ShuffledRDD( + val shuffledRdd = new ShuffledAggregatedRDD( pairs, aggregator, new HashPartitioner(2)) assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1))) @@ -220,7 +220,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { // not see an exception because mergeCombine should not have been called. 
val aggregatorWithException = new Aggregator[Int, Int, Int]( (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false) - val shuffledRdd1 = new ShuffledRDD( + val shuffledRdd1 = new ShuffledAggregatedRDD( pairs, aggregatorWithException, new HashPartitioner(2)) assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1))) @@ -228,7 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { // expect to see an exception thrown. val aggregatorWithException1 = new Aggregator[Int, Int, Int]( (v: Int) => v, _+_, ShuffleSuite.mergeCombineException) - val shuffledRdd2 = new ShuffledRDD( + val shuffledRdd2 = new ShuffledAggregatedRDD( pairs, aggregatorWithException1, new HashPartitioner(2)) evaluating { shuffledRdd2.collect() } should produce [SparkException] } -- cgit v1.2.3 From 6b5980da796e0204a7735a31fb454f312bc9daac Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 19 Sep 2012 15:36:16 -0700 Subject: Set a limited number of retry in standalone deploy mode. --- .../main/scala/spark/deploy/master/JobInfo.scala | 9 ++++++ .../main/scala/spark/deploy/master/JobState.scala | 2 ++ .../main/scala/spark/deploy/master/Master.scala | 37 ++++++++++++++-------- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala index 31d48b82b9..4c81a1b447 100644 --- a/core/src/main/scala/spark/deploy/master/JobInfo.scala +++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala @@ -31,4 +31,13 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va } def coresLeft: Int = desc.cores - coresGranted + + private var _retryCount = 0 + + def retryCount = _retryCount + + def incrementRetryCount = { + _retryCount += 1 + _retryCount + } } diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala index 50b0c6f95b..8d458ac39c 100644 --- a/core/src/main/scala/spark/deploy/master/JobState.scala +++ b/core/src/main/scala/spark/deploy/master/JobState.scala @@ -4,4 +4,6 @@ object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") type JobState = Value val WAITING, RUNNING, FINISHED, FAILED = Value + + val MAX_NUM_RETRY = 10 } diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index c98dddea7b..5cc73633ab 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -1,19 +1,18 @@ package spark.deploy.master -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} - import akka.actor._ -import spark.{Logging, Utils} -import spark.util.AkkaUtils +import akka.actor.Terminated +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} + import java.text.SimpleDateFormat import java.util.Date -import akka.remote.RemoteClientLifeCycleEvent + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + import spark.deploy._ -import akka.remote.RemoteClientShutdown -import akka.remote.RemoteClientDisconnected -import spark.deploy.RegisterWorker -import spark.deploy.RegisterWorkerFailed -import akka.actor.Terminated +import spark.{Logging, SparkException, Utils} +import spark.util.AkkaUtils + class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs @@ -81,12 +80,22 @@ class Master(ip: String, 
port: Int, webUiPort: Int) extends Actor with Logging { exec.state = state exec.job.actor ! ExecutorUpdated(execId, state, message) if (ExecutorState.isFinished(state)) { + val jobInfo = idToJob(jobId) // Remove this executor from the worker and job logInfo("Removing executor " + exec.fullId + " because it is " + state) - idToJob(jobId).removeExecutor(exec) + jobInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) - // TODO: the worker would probably want to restart the executor a few times - schedule() + + // Only retry certain number of times so we don't go into an infinite loop. + if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) { + schedule() + } else { + val e = new SparkException("Job %s wth ID %s failed %d times.".format( + jobInfo.desc.name, jobInfo.id, jobInfo.retryCount)) + logError(e.getMessage, e) + throw e + //System.exit(1) + } } } case None => @@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { addressToWorker.get(address).foreach(removeWorker) addressToJob.get(address).foreach(removeJob) } - + case RequestMasterState => { sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList) } -- cgit v1.2.3 From 8feb5caacd87d72dc15a45ec3c5aea3592de80cb Mon Sep 17 00:00:00 2001 From: root Date: Fri, 21 Sep 2012 05:13:19 +0000 Subject: Fixed an issue with ordering of classloader setup that was causing Java deserializer to break --- core/src/main/scala/spark/executor/Executor.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 8f975c52d4..9999b6ba80 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -38,6 +38,10 @@ class Executor extends Logging { System.setProperty(key, value) } + // Create our ClassLoader and set it on this thread + urlClassLoader = createClassLoader() + Thread.currentThread.setContextClassLoader(urlClassLoader) + // Initialize Spark environment (using system properties read above) env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false) SparkEnv.set(env) @@ -45,11 +49,6 @@ class Executor extends Logging { // Start worker thread pool threadPool = new ThreadPoolExecutor( 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) - - // Create our ClassLoader and set it on this thread - urlClassLoader = createClassLoader() - Thread.currentThread.setContextClassLoader(urlClassLoader) - } def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { -- cgit v1.2.3 From a642051ade11d10d7ab26a44bdf7acc743fefc97 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 21 Sep 2012 05:42:21 +0000 Subject: Fixed a performance bug in BlockManager that was creating garbage when returning deserialized, in-memory RDDs. 
--- core/src/main/scala/spark/storage/BlockStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index 09287faba0..febb0c0240 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -131,7 +131,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long) return None } if (entry.deserialized) { - return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].toIterator) + return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) } else { return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate())) } -- cgit v1.2.3 From 6d28dde37041a7794e4da17bb92ca387e693a96a Mon Sep 17 00:00:00 2001 From: root Date: Fri, 21 Sep 2012 06:02:55 +0000 Subject: Rename our toIterator method into asIterator to prevent confusion with the Scala collection one, which often *copies* a collection. --- core/src/main/scala/spark/Serializer.scala | 4 ++-- core/src/main/scala/spark/storage/BlockManager.scala | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala index 61a70beaf1..5f26bd2a7b 100644 --- a/core/src/main/scala/spark/Serializer.scala +++ b/core/src/main/scala/spark/Serializer.scala @@ -43,7 +43,7 @@ trait SerializerInstance { def deserializeMany(buffer: ByteBuffer): Iterator[Any] = { // Default implementation uses deserializeStream buffer.rewind() - deserializeStream(new ByteBufferInputStream(buffer)).toIterator + deserializeStream(new ByteBufferInputStream(buffer)).asIterator } } @@ -74,7 +74,7 @@ trait DeserializationStream { * Read the elements of this stream through an iterator. This can only be called once, as * reading each element will consume data from the input source. */ - def toIterator: Iterator[Any] = new Iterator[Any] { + def asIterator: Iterator[Any] = new Iterator[Any] { var gotNext = false var finished = false var nextValue: Any = null diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 3a51f6bd96..550c937ac4 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -614,10 +614,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = { - /*serializer.newInstance().deserializeMany(bytes)*/ - val ser = serializer.newInstance() bytes.rewind() - return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator + val ser = serializer.newInstance() + return ser.deserializeStream(new ByteBufferInputStream(bytes)).asIterator } def stop() { -- cgit v1.2.3 From afb7ccc83820be287abbd6328b3294367fab2c72 Mon Sep 17 00:00:00 2001 From: Denny Date: Fri, 21 Sep 2012 10:58:13 -0700 Subject: HTTP File server fixes. 
--- core/src/main/scala/spark/HttpFileServer.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala index e6ad4dd28e..05ca846c85 100644 --- a/core/src/main/scala/spark/HttpFileServer.scala +++ b/core/src/main/scala/spark/HttpFileServer.scala @@ -20,7 +20,7 @@ class HttpFileServer extends Logging { fileDir.mkdir() jarDir.mkdir() logInfo("HTTP File server directory is " + baseDir) - httpServer = new HttpServer(fileDir) + httpServer = new HttpServer(baseDir) httpServer.start() serverUri = httpServer.uri } @@ -30,11 +30,13 @@ class HttpFileServer extends Logging { } def addFile(file: File) : String = { - return addFileToDir(file, fileDir) + addFileToDir(file, fileDir) + return serverUri + "/files/" + file.getName } def addJar(file: File) : String = { - return addFileToDir(file, jarDir) + addFileToDir(file, jarDir) + return serverUri + "/jars/" + file.getName } def addFileToDir(file: File, dir: File) : String = { -- cgit v1.2.3 From e41cab04cafdb610f108a03a9e062d422f2eedd8 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 23 Sep 2012 05:56:44 +0000 Subject: Avoid creating an extra buffer when saving a stream of values as DISK_ONLY --- core/src/main/scala/spark/storage/BlockStore.scala | 44 ++++++++++++---------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index febb0c0240..d505df66a7 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -1,6 +1,6 @@ package spark.storage -import java.io.{File, RandomAccessFile} +import java.io.{File, FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode import java.util.{LinkedHashMap, UUID} @@ -8,12 +8,14 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} import scala.collection.mutable.ArrayBuffer +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream + import spark.{Utils, Logging, Serializer, SizeEstimator} /** * Abstract class to store blocks */ -abstract class BlockStore(blockManager: BlockManager) extends Logging { +abstract class BlockStore(val blockManager: BlockManager) extends Logging { initLogging() def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) @@ -217,25 +219,28 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Attempting to put block " + blockId) val startTime = System.currentTimeMillis val file = createFile(blockId) - if (file != null) { - val channel = new RandomAccessFile(file, "rw").getChannel() - val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit) - buffer.put(bytes) - channel.close() - val finishTime = System.currentTimeMillis - logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( - blockId, bytes.limit, (finishTime - startTime))) - } else { - logError("File not created for block " + blockId) - } + val channel = new RandomAccessFile(file, "rw").getChannel() + val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit) + buffer.put(bytes) + channel.close() + val finishTime = System.currentTimeMillis + logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( + blockId, bytes.limit, (finishTime - startTime))) } def putValues(blockId: String, values: Iterator[Any], level: StorageLevel) - : Either[Iterator[Any], ByteBuffer] = { - val bytes = dataSerialize(values) - 
logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes") - putBytes(blockId, bytes, level) - return Right(bytes) + : Either[Iterator[Any], ByteBuffer] = { + + logDebug("Attempting to write values for block " + blockId) + val file = createFile(blockId) + val fileOut = new FastBufferedOutputStream(new FileOutputStream(file)) + val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) + objOut.writeAll(values) + objOut.close() + + // Return a byte buffer for the contents of the file + val channel = new RandomAccessFile(file, "rw").getChannel() + Right(channel.map(MapMode.READ_WRITE, 0, channel.size())) } def getBytes(blockId: String): Option[ByteBuffer] = { @@ -267,8 +272,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) newFile.getParentFile.mkdirs() return newFile } else { - logError("File for block " + blockId + " already exists on disk, " + file) - return null + throw new Exception("File for block " + blockId + " already exists on disk, " + file) } } -- cgit v1.2.3 From 107a5ca879aab6331b03c9080f0c0a782f060589 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 23 Sep 2012 06:06:12 +0000 Subject: Make default number of parallel fetches slightly smaller since it doesn't seem to hurt performance much and it will cause slightly less GC. --- core/src/main/scala/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 550c937ac4..15748b70d5 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -631,7 +631,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m object BlockManager { def getNumParallelFetchesFromSystemProperties(): Int = { - System.getProperty("spark.blockManager.parallelFetches", "8").toInt + System.getProperty("spark.blockManager.parallelFetches", "4").toInt } def getMaxMemoryFromSystemProperties(): Long = { -- cgit v1.2.3 From 1f539aa473981ec47b253dfe86c84ed2bcf286ba Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 24 Sep 2012 14:12:48 -0700 Subject: Update Scala version dependency to 2.9.2 --- project/SparkBuild.scala | 2 +- run | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 203001954a..9440b0f112 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -22,7 +22,7 @@ object SparkBuild extends Build { def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", version := "0.6.0-SNAPSHOT", - scalaVersion := "2.9.1", + scalaVersion := "2.9.2", scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, diff --git a/run b/run index 2946a04d3f..5f640789ff 100755 --- a/run +++ b/run @@ -1,6 +1,6 @@ #!/bin/bash -SCALA_VERSION=2.9.1 +SCALA_VERSION=2.9.2 # Figure out where the Scala framework is installed FWDIR="$(cd `dirname $0`; pwd)" -- cgit v1.2.3 From 35cc9f13e96c9b19a032b54e69160eea47589fc9 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 24 Sep 2012 14:17:10 -0700 Subject: Update Akka to 2.0.3 --- project/SparkBuild.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/project/SparkBuild.scala b/project/SparkBuild.scala index 9440b0f112..0247b46de4 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -61,9 +61,9 @@ object SparkBuild extends Build { "asm" % "asm-all" % "3.3.1", "com.google.protobuf" % "protobuf-java" % "2.4.1", "de.javakaffee" % "kryo-serializers" % "0.9", - "com.typesafe.akka" % "akka-actor" % "2.0.2", - "com.typesafe.akka" % "akka-remote" % "2.0.2", - "com.typesafe.akka" % "akka-slf4j" % "2.0.2", + "com.typesafe.akka" % "akka-actor" % "2.0.3", + "com.typesafe.akka" % "akka-remote" % "2.0.3", + "com.typesafe.akka" % "akka-slf4j" % "2.0.3", "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", "cc.spray" % "spray-can" % "1.0-M2.1", -- cgit v1.2.3 From 6eeb379cf86b25975456369cc3de50a41a648b69 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 24 Sep 2012 15:39:58 -0700 Subject: Fix some test issues --- core/src/main/scala/spark/SparkEnv.scala | 2 ++ core/src/test/scala/spark/DistributedSuite.scala | 2 +- core/src/test/scala/spark/FileServerSuite.scala | 24 +++++++++++++----------- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 7473b40aa3..6ffae8e85f 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -40,6 +40,8 @@ class SparkEnv ( blockManager.stop() blockManager.master.stop() actorSystem.shutdown() + // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit + Thread.sleep(100) actorSystem.awaitTermination() // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit Thread.sleep(100) diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index b7b8a79327..93b876d205 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -18,7 +18,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter val clusterUrl = "local-cluster[2,1,512]" - var sc: SparkContext = _ + @transient var sc: SparkContext = _ after { if (sc != null) { diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala index 500af1eb90..fd7a7bd589 100644 --- a/core/src/test/scala/spark/FileServerSuite.scala +++ b/core/src/test/scala/spark/FileServerSuite.scala @@ -3,14 +3,14 @@ package spark import com.google.common.io.Files import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter -import java.io.{File, PrintWriter} +import java.io.{File, PrintWriter, FileReader, BufferedReader} import SparkContext._ class FileServerSuite extends FunSuite with BeforeAndAfter { - var sc: SparkContext = _ - var tmpFile : File = _ - var testJarFile : File = _ + @transient var sc: SparkContext = _ + @transient var tmpFile : File = _ + @transient var testJarFile : File = _ before { // Create a sample text file @@ -38,7 +38,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { sc.addFile(tmpFile.toString) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) val result = sc.parallelize(testData).reduceByKey { - val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile)) + val in = new BufferedReader(new FileReader("FileServerSuite.txt")) val fileVal = in.readLine().toInt in.close() _ * fileVal + _ * fileVal @@ -53,7 +53,9 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { sc.addJar(sampleJarFile) 
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int]) + val fac = Thread.currentThread.getContextClassLoader() + .loadClass("org.uncommons.maths.Maths") + .getDeclaredMethod("factorial", classOf[Int]) val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt a + b @@ -66,7 +68,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { sc.addFile(tmpFile.toString) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) val result = sc.parallelize(testData).reduceByKey { - val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile)) + val in = new BufferedReader(new FileReader("FileServerSuite.txt")) val fileVal = in.readLine().toInt in.close() _ * fileVal + _ * fileVal @@ -75,19 +77,19 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { assert(result.toSet === Set((1,200), (2,300), (3,500))) } - test ("Dynamically adding JARS on a standalone cluster") { sc = new SparkContext("local-cluster[1,1,512]", "test") val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile() sc.addJar(sampleJarFile) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int]) + val fac = Thread.currentThread.getContextClassLoader() + .loadClass("org.uncommons.maths.Maths") + .getDeclaredMethod("factorial", classOf[Int]) val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt a + b }.collect() assert(result.toSet === Set((1,2), (2,7), (3,121))) } - -} \ No newline at end of file +} -- cgit v1.2.3 From 39215357af1488dda9ea3b61466d2da377280e6f Mon Sep 17 00:00:00 2001 From: Ravi Pandya Date: Mon, 24 Sep 2012 15:43:19 -0700 Subject: Windows command scripts for sbt and run --- .../scala/spark/deploy/worker/ExecutorRunner.scala | 3 +- run.cmd | 2 + run2.cmd | 68 ++++++++++++++++++++++ sbt/sbt.cmd | 5 ++ 4 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 run.cmd create mode 100644 run2.cmd create mode 100644 sbt/sbt.cmd diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 7043361020..e2a9df275a 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -75,7 +75,8 @@ class ExecutorRunner( def buildCommandSeq(): Seq[String] = { val command = jobDesc.command - val runScript = new File(sparkHome, "run").getCanonicalPath + val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run"; + val runScript = new File(sparkHome, script).getCanonicalPath Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables) } diff --git a/run.cmd b/run.cmd new file mode 100644 index 0000000000..f78a4350e1 --- /dev/null +++ b/run.cmd @@ -0,0 +1,2 @@ +@echo off +cmd /V /E /C call %~dp0run2.cmd %* \ No newline at end of file diff --git a/run2.cmd b/run2.cmd new file mode 100644 index 0000000000..9fc4d5054b --- 
/dev/null +++ b/run2.cmd @@ -0,0 +1,68 @@ +@echo off + +set SCALA_VERSION=2.9.1 + +rem Figure out where the Spark framework is installed +set FWDIR=%~dp0 + +rem Export this as SPARK_HOME +set SPARK_HOME=%FWDIR% + +rem Load environment variables from conf\spark-env.cmd, if it exists +if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd" + +rem Check that SCALA_HOME has been specified +if not "x%SCALA_HOME%"=="x" goto scala_exists + echo "SCALA_HOME is not set" + goto exit +:scala_exists + +rem If the user specifies a Mesos JAR, put it before our included one on the classpath +set MESOS_CLASSPATH= +if not "x%MESOS_JAR%"=="x" set MESOS_CLASSPATH=%MESOS_JAR% + +rem Figure out how much memory to use per executor and set it as an environment +rem variable so that our process sees it and can report it to Mesos +if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m + +rem Set JAVA_OPTS to be able to load native libraries and to set heap size +set JAVA_OPTS=%SPARK_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM% +rem Load extra JAVA_OPTS from conf/java-opts, if it exists +if exist "%FWDIR%conf\java-opts.cmd" call "%FWDIR%conf\java-opts.cmd" + +set CORE_DIR=%FWDIR%core +set REPL_DIR=%FWDIR%repl +set EXAMPLES_DIR=%FWDIR%examples +set BAGEL_DIR=%FWDIR%bagel + +rem Build up classpath +set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes +set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources +set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes +for /R "%CORE_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j +for /R "%FWDIR%\lib_managed\jars" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j +for /R "%FWDIR%\lib_managed\bundles" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j +for /R "%REPL_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j +set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes + +rem Figure out whether to run our class with java or with the scala launcher. +rem In most cases, we'd prefer to execute our process with java because scala +rem creates a shell script as the parent of its Java process, which makes it +rem hard to kill the child with stuff like Process.destroy(). However, for +rem the Spark shell, the wrapper is necessary to properly reset the terminal +rem when we exit, so we allow it to set a variable to launch with scala. +if "%SPARK_LAUNCH_WITH_SCALA%" NEQ 1 goto java_runner + set RUNNER=%SCALA_HOME%\bin\scala + # Java options will be passed to scala as JAVA_OPTS + set EXTRA_ARGS= + goto run_spark +:java_runner + set CLASSPATH=%CLASSPATH%;%SCALA_HOME%\lib\scala-library.jar;%SCALA_HOME%\lib\scala-compiler.jar;%SCALA_HOME%\lib\jline.jar + set RUNNER=java + if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java + rem The JVM doesn't read JAVA_OPTS by default so we need to pass it in + set EXTRA_ARGS=%JAVA_OPTS% +:run_spark + +%RUNNER% -cp "%CLASSPATH%" %EXTRA_ARGS% %* +:exit \ No newline at end of file diff --git a/sbt/sbt.cmd b/sbt/sbt.cmd new file mode 100644 index 0000000000..b5f13a87e4 --- /dev/null +++ b/sbt/sbt.cmd @@ -0,0 +1,5 @@ +rem @echo off +set EXTRA_ARGS= +if not "%MESOS_HOME%x"=="x" set EXTRA_ARGS=-Djava.library.path=%MESOS_HOME%\lib\java +set SPARK_HOME=%~dp0.. 
+java -Xmx1200M -XX:MaxPermSize=200m %EXTRA_ARGS% -jar %SPARK_HOME%\sbt\sbt-launch-*.jar "%*" -- cgit v1.2.3 From afd8fc0c664d2ec946fa10ffb6f05803ac113c8c Mon Sep 17 00:00:00 2001 From: Ravi Pandya Date: Mon, 24 Sep 2012 15:46:03 -0700 Subject: Echo off --- sbt/sbt.cmd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sbt/sbt.cmd b/sbt/sbt.cmd index b5f13a87e4..6b289ab447 100644 --- a/sbt/sbt.cmd +++ b/sbt/sbt.cmd @@ -1,4 +1,4 @@ -rem @echo off +@echo off set EXTRA_ARGS= if not "%MESOS_HOME%x"=="x" set EXTRA_ARGS=-Djava.library.path=%MESOS_HOME%\lib\java set SPARK_HOME=%~dp0.. -- cgit v1.2.3 From 348bcbca1f12fb4c042c85af792b14426d46e9bd Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 24 Sep 2012 16:56:27 -0700 Subject: Added a method to RDD to expose the ClassManifest. --- core/src/main/scala/spark/RDD.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index d28f3593fe..b413ee21c9 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -68,6 +68,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def preferredLocations(split: Split): Seq[String] = Nil def context = sc + + def manifest: ClassManifest[T] = classManifest[T] // Get a unique ID for this RDD val id = sc.newRddId() -- cgit v1.2.3 From 7a4cd9286172d4a16d2911649a5bf9ea216ca980 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 24 Sep 2012 23:42:33 -0700 Subject: Renamed RDD.manifest to RDD.elementClassManifest --- core/src/main/scala/spark/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index b413ee21c9..efe248896a 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -69,7 +69,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def context = sc - def manifest: ClassManifest[T] = classManifest[T] + def elementClassManifest: ClassManifest[T] = classManifest[T] // Get a unique ID for this RDD val id = sc.newRddId() -- cgit v1.2.3 From 36349a2fb6a4d05bbfdce43e579988ca1c813551 Mon Sep 17 00:00:00 2001 From: Ravi Pandya Date: Tue, 25 Sep 2012 07:26:29 -0700 Subject: Add spark-shell.cmd --- run.cmd | 2 +- spark-shell.cmd | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 spark-shell.cmd diff --git a/run.cmd b/run.cmd index f78a4350e1..cc5605f8a9 100644 --- a/run.cmd +++ b/run.cmd @@ -1,2 +1,2 @@ @echo off -cmd /V /E /C call %~dp0run2.cmd %* \ No newline at end of file +cmd /V /E /C %~dp0run2.cmd %* diff --git a/spark-shell.cmd b/spark-shell.cmd new file mode 100644 index 0000000000..34697d52d7 --- /dev/null +++ b/spark-shell.cmd @@ -0,0 +1,4 @@ +@echo off +set FWDIR=%~dp0 +set SPARK_LAUNCH_WITH_SCALA=1 +cmd /V /E /C %FWDIR%run2.cmd spark.repl.Main %* -- cgit v1.2.3