author    | Mridul Muralidharan <mridul@gmail.com> | 2013-04-28 22:44:34 +0530
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-04-28 22:44:34 +0530
commit    | 00c7a37604516a2533e685d5a1ea97275c134c42 (patch)
tree      | 558882332628de995fa389aef7cb7be4cffba3f2
parent    | d09db1c051d255157f38f400fe9301fa438c5f41 (diff)
parent    | afee9024430ef79cc0840a5e5788b60c8c53f9d2 (diff)
Merge branch 'master' of github.com:mridulm/spark
55 files changed, 281 insertions, 197 deletions
diff --git a/bagel/pom.xml b/bagel/pom.xml index 89282161ea..b83a0ef6c0 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -4,7 +4,7 @@ <parent> <groupId>org.spark-project</groupId> <artifactId>spark-parent</artifactId> - <version>0.7.1-SNAPSHOT</version> + <version>0.8.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala index e10c03f6ba..5ecdd7d004 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/spark/bagel/Bagel.scala @@ -7,8 +7,7 @@ import scala.collection.mutable.ArrayBuffer import storage.StorageLevel object Bagel extends Logging { - - val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY + val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK /** * Runs a Bagel program. @@ -63,8 +62,9 @@ object Bagel extends Logging { val combinedMsgs = msgs.combineByKey( combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner) val grouped = combinedMsgs.groupWith(verts) + val superstep_ = superstep // Create a read-only copy of superstep for capture in closure val (processed, numMsgs, numActiveVerts) = - comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel) + comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel) val timeTaken = System.currentTimeMillis - startTime logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000)) diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 25db395c22..a09c978068 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -23,6 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("halting by voting") { diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh index 0c584055c7..8ee3ec481f 100755 --- a/bin/spark-daemon.sh +++ b/bin/spark-daemon.sh @@ -30,7 +30,7 @@ # SPARK_NICENESS The scheduling priority for daemons. Defaults to 0. ## -usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <args...>" +usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>" # if no args specified, show usage if [ $# -le 1 ]; then @@ -48,6 +48,8 @@ startStop=$1 shift command=$1 shift +instance=$1 +shift spark_rotate_log () { @@ -92,10 +94,10 @@ if [ "$SPARK_PID_DIR" = "" ]; then fi # some variables -export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.log +export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.log export SPARK_ROOT_LOGGER="INFO,DRFA" -log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.out -pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command.pid +log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out +pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid # Set default scheduling priority if [ "$SPARK_NICENESS" = "" ]; then diff --git a/bin/spark-daemons.sh b/bin/spark-daemons.sh index 4f9719ee80..0619097e4d 100755 --- a/bin/spark-daemons.sh +++ b/bin/spark-daemons.sh @@ -2,7 +2,7 @@ # Run a Spark command on all slave hosts. 
-usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command args..." +usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..." # if no args specified, show usage if [ $# -le 1 ]; then diff --git a/bin/start-master.sh b/bin/start-master.sh index 87feb261fe..83a3e1f3dc 100755 --- a/bin/start-master.sh +++ b/bin/start-master.sh @@ -32,4 +32,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then fi fi -"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT +"$bin"/spark-daemon.sh start spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT diff --git a/bin/start-slave.sh b/bin/start-slave.sh index 45a0cf7a6b..616c76e4ee 100755 --- a/bin/start-slave.sh +++ b/bin/start-slave.sh @@ -11,4 +11,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then fi fi -"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1 +"$bin"/spark-daemon.sh start spark.deploy.worker.Worker "$@" diff --git a/bin/start-slaves.sh b/bin/start-slaves.sh index 390247ca4a..4e05224190 100755 --- a/bin/start-slaves.sh +++ b/bin/start-slaves.sh @@ -21,4 +21,13 @@ fi echo "Master IP: $SPARK_MASTER_IP" # Launch the slaves -exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT +if [ "$SPARK_WORKER_INSTANCES" = "" ]; then + exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" 1 spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT +else + if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then + SPARK_WORKER_WEBUI_PORT=8081 + fi + for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do + "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" $(( $i + 1 )) spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i )) + done +fi diff --git a/bin/stop-master.sh b/bin/stop-master.sh index f75167dd2c..172ee5891d 100755 --- a/bin/stop-master.sh +++ b/bin/stop-master.sh @@ -7,4 +7,4 @@ bin=`cd "$bin"; pwd` . "$bin/spark-config.sh" -"$bin"/spark-daemon.sh stop spark.deploy.master.Master
\ No newline at end of file +"$bin"/spark-daemon.sh stop spark.deploy.master.Master 1 diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh index 21c9ebf324..fbfc594472 100755 --- a/bin/stop-slaves.sh +++ b/bin/stop-slaves.sh @@ -7,4 +7,14 @@ bin=`cd "$bin"; pwd` . "$bin/spark-config.sh" -"$bin"/spark-daemons.sh stop spark.deploy.worker.Worker
\ No newline at end of file +if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then + . "${SPARK_CONF_DIR}/spark-env.sh" +fi + +if [ "$SPARK_WORKER_INSTANCES" = "" ]; then + "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker 1 +else + for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do + "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker $(( $i + 1 )) + done +fi diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template index 6d71ec5691..37565ca827 100755 --- a/conf/spark-env.sh.template +++ b/conf/spark-env.sh.template @@ -12,6 +12,7 @@ # - SPARK_WORKER_CORES, to set the number of cores to use on this machine # - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g) # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT +# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes to be spawned on every slave machine # # Finally, Spark also relies on the following variables, but these can be set # on just the *master* (i.e. in your driver program), and will automatically diff --git a/core/pom.xml b/core/pom.xml index 7f65ce5c00..da26d674ec 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -4,7 +4,7 @@ <parent> <groupId>org.spark-project</groupId> <artifactId>spark-parent</artifactId> - <version>0.7.1-SNAPSHOT</version> + <version>0.8.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 33dc7627a3..ccd9d0364a 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -366,7 +366,7 @@ abstract class RDD[T: ClassManifest]( * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ - @deprecated("use mapPartitionsWithIndex") + @deprecated("use mapPartitionsWithIndex", "0.7.0") def mapPartitionsWithSplit[U: ClassManifest]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala index d00092e984..57e0405fb4 100644 --- a/core/src/main/scala/spark/RDDCheckpointData.scala +++ b/core/src/main/scala/spark/RDDCheckpointData.scala @@ -1,6 +1,7 @@ package spark import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration import rdd.{CheckpointRDD, CoalescedRDD} import scheduler.{ResultTask, ShuffleMapTask} @@ -62,14 +63,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T]) } } + // Create the output path for the checkpoint + val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id) + val fs = path.getFileSystem(new Configuration()) + if (!fs.mkdirs(path)) { + throw new SparkException("Failed to create checkpoint path " + path) + } + // Save to file, and reload it as an RDD - val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString - rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _) - val newRDD = new CheckpointRDD[T](rdd.context, path) + rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _) + val newRDD = new CheckpointRDD[T](rdd.context, path.toString) // Change the dependencies and partitions of the RDD RDDCheckpointData.synchronized { - cpFile = Some(path) + cpFile = Some(path.toString) cpRDD = Some(newRDD) rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions cpState = Checkpointed diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala 
b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index 6b4a11d6d3..518034e07b 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -36,17 +36,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla self: RDD[(K, V)]) extends Logging with Serializable { - + private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { val c = { - if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { + if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { classManifest[T].erasure } else { // We get the type of the Writable class by looking at the apply method which converts // from T to Writable. Since we have two apply methods we filter out the one which - // is of the form "java.lang.Object apply(java.lang.Object)" + // is not of the form "java.lang.Object apply(java.lang.Object)" implicitly[T => Writable].getClass.getDeclaredMethods().filter( - m => m.getReturnType().toString != "java.lang.Object" && + m => m.getReturnType().toString != "class java.lang.Object" && m.getName() == "apply")(0).getReturnType } @@ -69,17 +69,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla val valueClass = getWritableClass[V] val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass) val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass) - - logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) + + logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) val format = classOf[SequenceFileOutputFormat[Writable, Writable]] if (!convertKey && !convertValue) { - self.saveAsHadoopFile(path, keyClass, valueClass, format) + self.saveAsHadoopFile(path, keyClass, valueClass, format) } else if (!convertKey && convertValue) { - self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) + self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) } else if (convertKey && !convertValue) { - self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format) + self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format) } else if (convertKey && convertValue) { - self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) - } + self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) + } } } diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 79d00edee7..43ee39c993 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -131,6 +131,6 @@ private[spark] object CheckpointRDD extends Logging { val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same") - fs.delete(path) + fs.delete(path, true) } } diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 9213513e80..a6235491ca 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -29,7 
+29,7 @@ private[spark] case class NarrowCoGroupSplitDep( private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep private[spark] -class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep]) +class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) extends Partition with Serializable { override val index: Int = idx override def hashCode(): Int = idx @@ -88,7 +88,7 @@ class CoGroupedRDD[K]( case _ => new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)) } - }.toList) + }.toArray) } array } diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 51f02409b6..4e33b7dd5c 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -16,7 +16,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { * @tparam V the value class. */ class ShuffledRDD[K, V]( - prev: RDD[(K, V)], + @transient prev: RDD[(K, V)], part: Partitioner) extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part))) { diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala index 0a02561062..481e03b349 100644 --- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala @@ -56,7 +56,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM case _ => new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)) } - }.toList) + }.toArray) } array } diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index e80ec17aa5..35b0e06785 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest]( @transient rdd2: RDD[U] ) extends Partition { - var split1 = rdd1.partitions(idx) - var split2 = rdd1.partitions(idx) + var partition1 = rdd1.partitions(idx) + var partition2 = rdd2.partitions(idx) override val index: Int = idx - def splits = (split1, split2) + def partitions = (partition1, partition2) @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { - // Update the reference to parent split at the time of task serialization - split1 = rdd1.partitions(idx) - split2 = rdd2.partitions(idx) + // Update the reference to parent partition at the time of task serialization + partition1 = rdd1.partitions(idx) + partition2 = rdd2.partitions(idx) oos.defaultWriteObject() } } @@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( } override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = { - val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits - rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context)) + val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions + rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context)) } override def getPreferredLocations(s: Partition): Seq[String] = { - val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits - rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2)) + val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions + rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2)) } override def clearDependencies() { diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala 
b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index 3ce1e6e257..9b64f95df8 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -121,7 +121,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { val toRemove = new HashSet[BlockManagerId] for (info <- blockManagerInfo.values) { if (info.lastSeenMs < minSeenTime) { - logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats") + logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " + + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") toRemove += info.blockManagerId } } diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala index ff00dd05dd..76d5258b02 100644 --- a/core/src/test/scala/spark/LocalSparkContext.scala +++ b/core/src/test/scala/spark/LocalSparkContext.scala @@ -27,6 +27,7 @@ object LocalSparkContext { sc.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ @@ -38,4 +39,4 @@ object LocalSparkContext { } } -}
\ No newline at end of file +} diff --git a/docs/_config.yml b/docs/_config.yml index f99d5bb376..5c135a0242 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -3,8 +3,8 @@ markdown: kramdown # These allow the documentation to be updated with nerw releases # of Spark, Scala, and Mesos. -SPARK_VERSION: 0.7.1-SNAPSHOT -SPARK_VERSION_SHORT: 0.7.1 -SCALA_VERSION: 2.9.2 +SPARK_VERSION: 0.8.0-SNAPSHOT +SPARK_VERSION_SHORT: 0.8.0 +SCALA_VERSION: 2.9.3 MESOS_VERSION: 0.9.0-incubating SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md index c2eeafd07a..04cd79d039 100644 --- a/docs/building-with-maven.md +++ b/docs/building-with-maven.md @@ -42,10 +42,10 @@ To run a specific test suite: You might run into the following errors if you're using a vanilla installation of Maven: - [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes... + [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes... [ERROR] PermGen space -> [Help 1] - [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes... + [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes... [ERROR] Java heap space -> [Help 1] To fix these, you can do the following: diff --git a/docs/index.md b/docs/index.md index 51d505e1fa..0c4add45dc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -18,7 +18,7 @@ or you will need to set the `SCALA_HOME` environment variable to point to where you've installed Scala. Scala must also be accessible through one of these methods on slave nodes on your cluster. -Spark uses [Simple Build Tool](https://github.com/harrah/xsbt/wiki), which is bundled with it. To compile the code, go into the top-level Spark directory and run +Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run sbt/sbt package diff --git a/docs/quick-start.md b/docs/quick-start.md index 5c80d2ed3a..335643536a 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -53,8 +53,8 @@ scala> textFile.filter(line => line.contains("Spark")).count() // How many lines res3: Long = 15 {% endhighlight %} -## Transformations -RDD transformations can be used for more complex computations. Let's say we want to find the line with the most words: +## More On RDD Operations +RDD actions and transformations can be used for more complex computations. 
Let's say we want to find the line with the most words: {% highlight scala %} scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) @@ -113,8 +113,8 @@ import SparkContext._ object SimpleJob { def main(args: Array[String]) { - val logFile = "/var/log/syslog" // Should be some file on your system - val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME", + val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system + val sc = new SparkContext("local", "Simple Job", "YOUR_SPARK_HOME", List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar")) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() @@ -124,7 +124,7 @@ object SimpleJob { } {% endhighlight %} -This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the job. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the job, the directory where Spark is installed, and a name for the jar file containing the job's sources. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes. +This job simply counts the number of lines containing 'a' and the number containing 'b' in the Spark README. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the job. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the job, the directory where Spark is installed, and a name for the jar file containing the job's sources. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes. This file depends on the Spark API, so we'll also include an sbt configuration file, `simple.sbt` which explains that Spark is a dependency. This file also adds two repositories which host Spark dependencies: @@ -156,7 +156,7 @@ $ find . $ sbt package $ sbt run ... -Lines with a: 8422, Lines with b: 1836 +Lines with a: 46, Lines with b: 23 {% endhighlight %} This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS. 
@@ -173,7 +173,7 @@ import spark.api.java.function.Function; public class SimpleJob { public static void main(String[] args) { - String logFile = "/var/log/syslog"; // Should be some file on your system + String logFile = "$YOUR_SPARK_HOME/README.md"; // Should be some file on your system JavaSparkContext sc = new JavaSparkContext("local", "Simple Job", "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"}); JavaRDD<String> logData = sc.textFile(logFile).cache(); @@ -191,7 +191,7 @@ public class SimpleJob { } {% endhighlight %} -This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Note that like in the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide](java-programming-guide.html) describes these differences in more detail. +This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide](java-programming-guide.html) describes these differences in more detail. To build the job, we also write a Maven `pom.xml` file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version. @@ -239,7 +239,7 @@ Now, we can execute the job using Maven: $ mvn package $ mvn exec:java -Dexec.mainClass="SimpleJob" ... -Lines with a: 8422, Lines with b: 1836 +Lines with a: 46, Lines with b: 23 {% endhighlight %} This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS. @@ -253,7 +253,7 @@ As an example, we'll create a simple Spark job, `SimpleJob.py`: """SimpleJob.py""" from pyspark import SparkContext -logFile = "/var/log/syslog" # Should be some file on your system +logFile = "$YOUR_SPARK_HOME/README.md" # Should be some file on your system sc = SparkContext("local", "Simple job") logData = sc.textFile(logFile).cache() @@ -265,7 +265,8 @@ print "Lines with a: %i, lines with b: %i" % (numAs, numBs) This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. -Like in the Scala and Java examples, we use a SparkContext to create RDDs. +Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. +As with the Scala and Java examples, we use a SparkContext to create RDDs. We can pass Python functions to Spark, which are automatically serialized along with any variables that they reference. For jobs that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the [Python programming guide](python-programming-guide.html). 
`SimpleJob` is simple enough that we do not need to specify any code dependencies. @@ -276,7 +277,7 @@ We can run this job using the `pyspark` script: $ cd $SPARK_HOME $ ./pyspark SimpleJob.py ... -Lines with a: 8422, Lines with b: 1836 +Lines with a: 46, Lines with b: 23 {% endhighlight python %} This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS. diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index b30699cf3d..f5788dc467 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -83,7 +83,7 @@ DStreams support many of the transformations available on normal Spark RDD's: <tr> <td> <b>groupByKey</b>([<i>numTasks</i>]) </td> <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together all the values of each key in the RDDs of the source DStream. <br /> - <b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks. + <b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks. </td> </tr> <tr> @@ -132,7 +132,7 @@ Spark Streaming features windowed computations, which allow you to apply transfo <td> <b>groupByKeyAndWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>]) </td> <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together values of each key over batches in a sliding window. <br /> -<b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td> +<b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td> </tr> <tr> <td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>]) </td> diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 571d27fde6..9f2daad2b6 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -540,11 +540,24 @@ def scp(host, opts, local_file, dest_file): (opts.identity_file, local_file, opts.user, host, dest_file), shell=True) -# Run a command on a host through ssh, throwing an exception if ssh fails +# Run a command on a host through ssh, retrying up to two times +# and then throwing an exception if ssh continues to fail. 
def ssh(host, opts, command): - subprocess.check_call( - "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % - (opts.identity_file, opts.user, host, command), shell=True) + tries = 0 + while True: + try: + return subprocess.check_call( + "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % + (opts.identity_file, opts.user, host, command), shell=True) + except subprocess.CalledProcessError as e: + if (tries > 2): + raise e + print "Error connecting to host {0}, sleeping 30".format(e) + time.sleep(30) + tries = tries + 1 + + + # Gets a list of zones to launch instances in diff --git a/examples/pom.xml b/examples/pom.xml index 9594257ad4..c42d2bcdb9 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -4,7 +4,7 @@ <parent> <groupId>org.spark-project</groupId> <artifactId>spark-parent</artifactId> - <version>0.7.1-SNAPSHOT</version> + <version>0.8.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -22,7 +22,7 @@ <dependency> <groupId>com.twitter</groupId> <artifactId>algebird-core_2.9.2</artifactId> - <version>0.1.8</version> + <version>0.1.11</version> </dependency> <dependency> <groupId>org.scalatest</groupId> diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala index b07e799cef..4849f216fb 100644 --- a/examples/src/main/scala/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala @@ -10,73 +10,73 @@ import scala.collection.mutable.HashSet * K-means clustering. */ object LocalKMeans { - val N = 1000 - val R = 1000 // Scaling factor - val D = 10 - val K = 10 - val convergeDist = 0.001 - val rand = new Random(42) - - def generateData = { - def generatePoint(i: Int) = { - Vector(D, _ => rand.nextDouble * R) - } - Array.tabulate(N)(generatePoint) - } - - def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = { - var index = 0 - var bestIndex = 0 - var closest = Double.PositiveInfinity - - for (i <- 1 to centers.size) { - val vCurr = centers.get(i).get - val tempDist = p.squaredDist(vCurr) - if (tempDist < closest) { - closest = tempDist - bestIndex = i - } - } - - return bestIndex - } - - def main(args: Array[String]) { - val data = generateData - var points = new HashSet[Vector] - var kPoints = new HashMap[Int, Vector] - var tempDist = 1.0 - - while (points.size < K) { - points.add(data(rand.nextInt(N))) - } - - val iter = points.iterator - for (i <- 1 to points.size) { - kPoints.put(i, iter.next()) - } - - println("Initial centers: " + kPoints) - - while(tempDist > convergeDist) { - var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - - var mappings = closest.groupBy[Int] (x => x._1) - - var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))}) - - var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)} - - tempDist = 0.0 - for (mapping <- newPoints) { - tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2) - } - - for (newP <- newPoints) { - kPoints.put(newP._1, newP._2) - } - } - - println("Final centers: " + kPoints) - } + val N = 1000 + val R = 1000 // Scaling factor + val D = 10 + val K = 10 + val convergeDist = 0.001 + val rand = new Random(42) + + def generateData = { + def generatePoint(i: Int) = { + Vector(D, _ => rand.nextDouble * R) + } + Array.tabulate(N)(generatePoint) + } + + def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = { + var index = 0 + var bestIndex = 0 
+ var closest = Double.PositiveInfinity + + for (i <- 1 to centers.size) { + val vCurr = centers.get(i).get + val tempDist = p.squaredDist(vCurr) + if (tempDist < closest) { + closest = tempDist + bestIndex = i + } + } + + return bestIndex + } + + def main(args: Array[String]) { + val data = generateData + var points = new HashSet[Vector] + var kPoints = new HashMap[Int, Vector] + var tempDist = 1.0 + + while (points.size < K) { + points.add(data(rand.nextInt(N))) + } + + val iter = points.iterator + for (i <- 1 to points.size) { + kPoints.put(i, iter.next()) + } + + println("Initial centers: " + kPoints) + + while(tempDist > convergeDist) { + var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) + + var mappings = closest.groupBy[Int] (x => x._1) + + var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))}) + + var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)} + + tempDist = 0.0 + for (mapping <- newPoints) { + tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2) + } + + for (newP <- newPoints) { + kPoints.put(newP._1, newP._2) + } + } + + println("Final centers: " + kPoints) + } } diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala index 92cd81c487..a0aaf60918 100644 --- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala @@ -8,7 +8,7 @@ object MultiBroadcastTest { System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]") System.exit(1) } - + val sc = new SparkContext(args(0), "Broadcast Test", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) @@ -19,7 +19,7 @@ object MultiBroadcastTest { for (i <- 0 until arr1.length) { arr1(i) = i } - + var arr2 = new Array[Int](num) for (i <- 0 until arr2.length) { arr2(i) = i @@ -30,7 +30,7 @@ object MultiBroadcastTest { sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size + barr2.value.size) } - + System.exit(0) } } diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala index 0d17bda004..461b84a2c6 100644 --- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala @@ -11,7 +11,7 @@ object SimpleSkewedGroupByTest { "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]") System.exit(1) } - + var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 var valSize = if (args.length > 3) args(3).toInt else 1000 @@ -20,7 +20,7 @@ object SimpleSkewedGroupByTest { val sc = new SparkContext(args(0), "GroupBy Test", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - + val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random var result = new Array[(Int, Array[Byte])](numKVPairs) diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala index 83be3fc27b..435675f9de 100644 --- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala @@ -10,7 +10,7 @@ object SkewedGroupByTest { System.err.println("Usage: GroupByTest 
<master> [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) } - + var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 var valSize = if (args.length > 3) args(3).toInt else 1000 @@ -18,7 +18,7 @@ object SkewedGroupByTest { val sc = new SparkContext(args(0), "GroupBy Test", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - + val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala index 483aae452b..a9642100e3 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -49,7 +49,7 @@ object TwitterAlgebirdCMS { val users = stream.map(status => status.getUser.getId) - val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC) + val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) var globalCMS = cms.zero val mm = new MapMonoid[Long, Int]() var globalExact = Map[Long, Int]() @@ -3,7 +3,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>org.spark-project</groupId> <artifactId>spark-parent</artifactId> - <version>0.7.1-SNAPSHOT</version> + <version>0.8.0-SNAPSHOT</version> <packaging>pom</packaging> <name>Spark Project Parent POM</name> <url>http://spark-project.org/</url> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 947ac47f6b..7c004df6fb 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -48,12 +48,13 @@ object SparkBuild extends Build { scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, + retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", transitiveClassifiers in Scope.GlobalScope := Seq("sources"), testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), // Fork new JVMs for tests and set Java options for those fork := true, - javaOptions += "-Xmx1g", + javaOptions += "-Xmx2g", // Shared between both core and streaming. 
resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"), @@ -125,6 +126,9 @@ object SparkBuild extends Build { val slf4jVersion = "1.6.1" + val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson") + val excludeNetty = ExclusionRule(organization = "org.jboss.netty") + def coreSettings = sharedSettings ++ Seq( name := "spark-core", resolvers ++= Seq( @@ -145,33 +149,33 @@ object SparkBuild extends Build { "asm" % "asm-all" % "3.3.1", "com.google.protobuf" % "protobuf-java" % "2.4.1", "de.javakaffee" % "kryo-serializers" % "0.22", - "com.typesafe.akka" % "akka-actor" % "2.0.3", - "com.typesafe.akka" % "akka-remote" % "2.0.3", - "com.typesafe.akka" % "akka-slf4j" % "2.0.3", + "com.typesafe.akka" % "akka-actor" % "2.0.3" excludeAll(excludeNetty), + "com.typesafe.akka" % "akka-remote" % "2.0.3" excludeAll(excludeNetty), + "com.typesafe.akka" % "akka-slf4j" % "2.0.3" excludeAll(excludeNetty), "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", - "cc.spray" % "spray-can" % "1.0-M2.1", - "cc.spray" % "spray-server" % "1.0-M2.1", - "cc.spray" % "spray-json_2.9.2" % "1.1.1", + "cc.spray" % "spray-can" % "1.0-M2.1" excludeAll(excludeNetty), + "cc.spray" % "spray-server" % "1.0-M2.1" excludeAll(excludeNetty), + "cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty), "org.apache.mesos" % "mesos" % "0.9.0-incubating" ) ++ ( if (HADOOP_MAJOR_VERSION == "2") { if (HADOOP_YARN) { Seq( // Exclude rule required for all ? - "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ), - "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ), - "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ), - "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ) + "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) ) } else { Seq( - "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ), - "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ) + "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) ) } } else { - Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ) ) + Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) ) }), unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") { @@ -201,16 +205,17 @@ object SparkBuild extends Build { def streamingSettings = sharedSettings ++ Seq( name := "spark-streaming", libraryDependencies ++= Seq( - "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile", + "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty), "com.github.sgroschupf" % 
"zkclient" % "0.1", - "org.twitter4j" % "twitter4j-stream" % "3.0.3", - "com.typesafe.akka" % "akka-zeromq" % "2.0.3" + "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), + "com.typesafe.akka" % "akka-zeromq" % "2.0.3" excludeAll(excludeNetty) ) ) ++ assemblySettings ++ extraAssemblySettings def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq( mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard + case m if m.toLowerCase.matches("meta-inf/.*\\.sf$") => MergeStrategy.discard case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first } diff --git a/project/build.properties b/project/build.properties index d4287112c6..9b860e23c5 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.11.3 +sbt.version=0.12.3 diff --git a/project/plugins.sbt b/project/plugins.sbt index 4d0e696a11..d4f2442872 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,13 +4,13 @@ resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/release resolvers += "Spray Repository" at "http://repo.spray.cc/" -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5") -addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1") +addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1") -addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0") +addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0") -addSbtPlugin("cc.spray" %% "sbt-twirl" % "0.5.2") +addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1") // For Sonatype publishing //resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns) diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py index 73f7f8fbaf..7f85a1008e 100644 --- a/python/examples/transitive_closure.py +++ b/python/examples/transitive_closure.py @@ -24,7 +24,7 @@ if __name__ == "__main__": "Usage: PythonTC <master> [<slices>]" exit(-1) sc = SparkContext(sys.argv[1], "PythonTC") - slices = sys.argv[2] if len(sys.argv) > 2 else 2 + slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2 tc = sc.parallelize(generateGraph(), slices).cache() # Linear transitive closure: each round grows paths by one edge, diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 172ed85fab..a9fec17a9d 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -399,7 +399,7 @@ class RDD(object): >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) >>> from fileinput import input >>> from glob import glob - >>> ''.join(input(glob(tempFile.name + "/part-0000*"))) + >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' """ def func(split, iterator): diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index 46f38c2772..7a7280313e 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -4,7 +4,7 @@ <parent> <groupId>org.spark-project</groupId> <artifactId>spark-parent</artifactId> - <version>0.7.1-SNAPSHOT</version> + <version>0.8.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/repl/pom.xml b/repl/pom.xml index 1f885673f4..038da5d988 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -4,7 +4,7 @@ <parent> <groupId>org.spark-project</groupId> <artifactId>spark-parent</artifactId> - <version>0.7.1-SNAPSHOT</version> + <version>0.8.0-SNAPSHOT</version> 
<relativePath>../pom.xml</relativePath> </parent> diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala index cd7b5128b2..23556dbc8f 100644 --- a/repl/src/main/scala/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/spark/repl/SparkILoop.scala @@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version 0.7.1 + /___/ .__/\_,_/_/ /_/\_\ version 0.8.0 /_/ """) import Properties._ diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 43559b96d3..1c64f9b98d 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -32,6 +32,7 @@ class ReplSuite extends FunSuite { interp.sparkContext.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") return out.toString } @@ -1,6 +1,6 @@ #!/bin/bash -SCALA_VERSION=2.9.2 +SCALA_VERSION=2.9.3 # Figure out where the Scala framework is installed FWDIR="$(cd `dirname $0`; pwd)" @@ -22,6 +22,7 @@ fi # values for that; it doesn't need a lot if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} + SPARK_DAEMON_JAVA_OPTS+=" -Dspark.akka.logLifecycleEvents=true" SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default fi @@ -46,14 +47,15 @@ case "$1" in esac if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then - if [ `command -v scala` ]; then - RUNNER="scala" + if [ "$SCALA_HOME" ]; then + RUNNER="${SCALA_HOME}/bin/scala" else - if [ -z "$SCALA_HOME" ]; then - echo "SCALA_HOME is not set" >&2 + if [ `command -v scala` ]; then + RUNNER="scala" + else + echo "SCALA_HOME is not set and scala is not in PATH" >&2 exit 1 fi - RUNNER="${SCALA_HOME}/bin/scala" fi else if [ `command -v java` ]; then @@ -1,6 +1,6 @@ @echo off -set SCALA_VERSION=2.9.2 +set SCALA_VERSION=2.9.3 rem Figure out where the Spark framework is installed set FWDIR=%~dp0 @@ -21,6 +21,7 @@ set RUNNING_DAEMON=0 if "%1"=="spark.deploy.master.Master" set RUNNING_DAEMON=1 if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1 if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m +set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY% if "%RUNNING_DAEMON%"=="1" set SPARK_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% @@ -5,4 +5,4 @@ if [ "$MESOS_HOME" != "" ]; then fi export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd) export SPARK_TESTING=1 # To put test classes on classpath -java -Xmx1200M -XX:MaxPermSize=250m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@" +java -Xmx1200m -XX:MaxPermSize=250m -XX:ReservedCodeCacheSize=128m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@" diff --git a/streaming/pom.xml b/streaming/pom.xml index fc2e211a42..08ff3e2ae1 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -4,7 +4,7 @@ <parent> <groupId>org.spark-project</groupId> <artifactId>spark-parent</artifactId> - <version>0.7.1-SNAPSHOT</version> + <version>0.8.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index e303e33e5e..7bd104b8d5 100644 --- 
a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -38,28 +38,43 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) private[streaming] class CheckpointWriter(checkpointDir: String) extends Logging { val file = new Path(checkpointDir, "graph") + // The file to which we actually write - and then "move" to file. + private val writeFile = new Path(file.getParent, file.getName + ".next") + private val bakFile = new Path(file.getParent, file.getName + ".bk") + + @volatile private var stopped = false + val conf = new Configuration() var fs = file.getFileSystem(conf) val maxAttempts = 3 val executor = Executors.newFixedThreadPool(1) + // Removed code which validates whether there is only one CheckpointWriter per path 'file' since + // I did not notice any errors - reintroduce it ? + class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { def run() { var attempts = 0 val startTime = System.currentTimeMillis() while (attempts < maxAttempts) { + if (stopped) { + logInfo("Already stopped, ignore checkpoint attempt for " + file) + return + } attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - if (fs.exists(file)) { - val bkFile = new Path(file.getParent, file.getName + ".bk") - FileUtil.copy(fs, file, fs, bkFile, true, true, conf) - logDebug("Moved existing checkpoint file to " + bkFile) - } - val fos = fs.create(file) + // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast. + val fos = fs.create(writeFile) fos.write(bytes) fos.close() - fos.close() + if (fs.exists(file) && fs.rename(file, bakFile)) { + logDebug("Moved existing checkpoint file to " + bakFile) + } + // paranoia + fs.delete(file, false) + fs.rename(writeFile, file) + val finishTime = System.currentTimeMillis(); logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") @@ -84,6 +99,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { } def stop() { + stopped = true executor.shutdown() } } diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index f673e5be15..e7a3f92bc0 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -74,6 +74,7 @@ object MasterFailureTest extends Logging { val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Long], state: Option[Long]) => { + logInfo("UpdateFunc .. 
state = " + state.getOrElse(0L) + ", values = " + values) Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L)) } st.flatMap(_.split(" ")) @@ -159,6 +160,7 @@ object MasterFailureTest extends Logging { // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) @@ -205,6 +207,7 @@ object MasterFailureTest extends Logging { // (iii) Its not timed out yet System.clearProperty("spark.streaming.clock") System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val startTime = System.currentTimeMillis() while (!killed && !isLastOutputGenerated && !isTimedOut) { @@ -357,13 +360,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) // Write the data to a local file and then move it to the target test directory val localFile = new File(localTestDir, (i+1).toString) val hadoopFile = new Path(testDir, (i+1).toString) + val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 var done = false while (!done && tries < maxTries) { tries += 1 try { - fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) + fs.rename(tempHadoopFile, hadoopFile) done = true } catch { case ioe: IOException => { diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index cf2ed8b1d4..e7352deb81 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -15,6 +15,7 @@ class BasicOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("map") { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index cac86deeaf..607dea77ec 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } var ssc: StreamingContext = null @@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) ssc = new StreamingContext(checkpointDir) System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart @@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] outputStream.output } -}
\ No newline at end of file +} diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index a5fa7ab92d..4529e774e9 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -22,10 +22,12 @@ class FailureSuite extends FunSuite with BeforeAndAfter with Logging { val batchDuration = Milliseconds(1000) before { + logInfo("BEFORE ...") FileUtils.deleteDirectory(new File(directory)) } after { + logInfo("AFTER ...") FileUtils.deleteDirectory(new File(directory)) } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 67dca2ac31..0acb6db6f2 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -41,6 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 1b66f3bda2..80d827706f 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } val largerSlideInput = Seq( |
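Note on the CheckpointWriter change in streaming/src/main/scala/spark/streaming/Checkpoint.scala above: instead of copying the old checkpoint and overwriting it in place, the new code writes each checkpoint to a temporary ".next" file, moves the previous checkpoint aside as ".bk", and only then renames the fresh file into place, so a reader never observes a partially written checkpoint. The following is a minimal, self-contained sketch of that write-then-rename pattern against the Hadoop FileSystem API; the object and method names (CheckpointWriteSketch, write) are illustrative only, not the actual Spark classes, and error handling/retries from the real change are omitted.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object CheckpointWriteSketch {
      def write(checkpointDir: String, bytes: Array[Byte]) {
        val file = new Path(checkpointDir, "graph")
        val writeFile = new Path(file.getParent, file.getName + ".next")
        val bakFile = new Path(file.getParent, file.getName + ".bk")
        val fs = file.getFileSystem(new Configuration())

        // 1. Write the new checkpoint to the temporary ".next" file first.
        val out = fs.create(writeFile)
        try { out.write(bytes) } finally { out.close() }

        // 2. If an old checkpoint exists, move it aside as ".bk" so it survives a failed swap.
        if (fs.exists(file)) {
          fs.rename(file, bakFile)
        }

        // 3. Clear any leftover target (the "paranoia" delete in the original change),
        //    then move the freshly written file into place.
        fs.delete(file, false)
        fs.rename(writeFile, file)
      }
    }

The design point is that the expensive write happens on a side file and the visible checkpoint path is only ever updated by renames, which are fast and effectively atomic on HDFS, so concurrent readers see either the complete old file or the complete new one.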