-rw-r--r--  bagel/pom.xml | 2
-rw-r--r--  bagel/src/main/scala/spark/bagel/Bagel.scala | 6
-rwxr-xr-x  bin/spark-daemon.sh | 10
-rwxr-xr-x  bin/spark-daemons.sh | 2
-rwxr-xr-x  bin/start-master.sh | 2
-rwxr-xr-x  bin/start-slave.sh | 2
-rwxr-xr-x  bin/start-slaves.sh | 11
-rwxr-xr-x  bin/stop-master.sh | 2
-rwxr-xr-x  bin/stop-slaves.sh | 12
-rwxr-xr-x  conf/spark-env.sh.template | 1
-rw-r--r--  core/pom.xml | 2
-rw-r--r--  core/src/main/scala/spark/RDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/SequenceFileRDDFunctions.scala | 22
-rw-r--r--  core/src/main/scala/spark/rdd/CheckpointRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 4
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/SubtractedRDD.scala | 2
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedRDD.scala | 20
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 3
-rw-r--r--  docs/_config.yml | 6
-rw-r--r--  docs/building-with-maven.md | 4
-rw-r--r--  docs/index.md | 2
-rw-r--r--  docs/quick-start.md | 4
-rw-r--r--  docs/streaming-programming-guide.md | 4
-rwxr-xr-x  ec2/spark_ec2.py | 21
-rw-r--r--  examples/pom.xml | 4
-rw-r--r--  examples/src/main/scala/spark/examples/LocalKMeans.scala | 138
-rw-r--r--  examples/src/main/scala/spark/examples/MultiBroadcastTest.scala | 6
-rw-r--r--  examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala | 4
-rw-r--r--  examples/src/main/scala/spark/examples/SkewedGroupByTest.scala | 4
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala | 2
-rw-r--r--  pom.xml | 2
-rw-r--r--  project/build.properties | 2
-rw-r--r--  project/plugins.sbt | 8
-rw-r--r--  python/examples/transitive_closure.py | 2
-rw-r--r--  python/pyspark/rdd.py | 2
-rw-r--r--  repl-bin/pom.xml | 2
-rw-r--r--  repl/pom.xml | 2
-rw-r--r--  repl/src/main/scala/spark/repl/SparkILoop.scala | 2
-rwxr-xr-x  run | 14
-rw-r--r--  run2.cmd | 3
-rwxr-xr-x  sbt/sbt | 2
-rw-r--r--  streaming/pom.xml | 2
43 files changed, 195 insertions, 156 deletions
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 89282161ea..b83a0ef6c0 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index e10c03f6ba..5ecdd7d004 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -7,8 +7,7 @@ import scala.collection.mutable.ArrayBuffer
import storage.StorageLevel
object Bagel extends Logging {
-
- val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY
+ val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
/**
* Runs a Bagel program.
@@ -63,8 +62,9 @@ object Bagel extends Logging {
val combinedMsgs = msgs.combineByKey(
combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
+ val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
val (processed, numMsgs, numActiveVerts) =
- comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel)
+ comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
val timeTaken = System.currentTimeMillis - startTime
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
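Note on the superstep_ copy above: superstep is a mutable loop variable, and a Scala closure captures the variable itself rather than its value at that moment, so a closure referencing superstep directly would observe whatever the var holds when the closure is eventually serialized and run. Copying it into a local val pins the value for that iteration. A minimal sketch of the behaviour, using only plain Scala (names here are illustrative, not from the Spark sources):

    import scala.collection.mutable.ArrayBuffer

    var step = 0
    val thunks = new ArrayBuffer[() => Int]
    while (step < 3) {
      val step_ = step          // stable, read-only copy for the closure to capture
      thunks += (() => step_)   // capturing `step` directly would make every thunk return 3
      step += 1
    }
    println(thunks.map(f => f()))   // ArrayBuffer(0, 1, 2)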
diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh
index 0c584055c7..8ee3ec481f 100755
--- a/bin/spark-daemon.sh
+++ b/bin/spark-daemon.sh
@@ -30,7 +30,7 @@
# SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
##
-usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <args...>"
+usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>"
# if no args specified, show usage
if [ $# -le 1 ]; then
@@ -48,6 +48,8 @@ startStop=$1
shift
command=$1
shift
+instance=$1
+shift
spark_rotate_log ()
{
@@ -92,10 +94,10 @@ if [ "$SPARK_PID_DIR" = "" ]; then
fi
# some variables
-export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.log
+export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.log
export SPARK_ROOT_LOGGER="INFO,DRFA"
-log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.out
-pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command.pid
+log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out
+pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid
# Set default scheduling priority
if [ "$SPARK_NICENESS" = "" ]; then
diff --git a/bin/spark-daemons.sh b/bin/spark-daemons.sh
index 4f9719ee80..0619097e4d 100755
--- a/bin/spark-daemons.sh
+++ b/bin/spark-daemons.sh
@@ -2,7 +2,7 @@
# Run a Spark command on all slave hosts.
-usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command args..."
+usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..."
# if no args specified, show usage
if [ $# -le 1 ]; then
diff --git a/bin/start-master.sh b/bin/start-master.sh
index 87feb261fe..83a3e1f3dc 100755
--- a/bin/start-master.sh
+++ b/bin/start-master.sh
@@ -32,4 +32,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then
fi
fi
-"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
+"$bin"/spark-daemon.sh start spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
diff --git a/bin/start-slave.sh b/bin/start-slave.sh
index 45a0cf7a6b..616c76e4ee 100755
--- a/bin/start-slave.sh
+++ b/bin/start-slave.sh
@@ -11,4 +11,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then
fi
fi
-"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
+"$bin"/spark-daemon.sh start spark.deploy.worker.Worker "$@"
diff --git a/bin/start-slaves.sh b/bin/start-slaves.sh
index 390247ca4a..4e05224190 100755
--- a/bin/start-slaves.sh
+++ b/bin/start-slaves.sh
@@ -21,4 +21,13 @@ fi
echo "Master IP: $SPARK_MASTER_IP"
# Launch the slaves
-exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+ exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" 1 spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+else
+ if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
+ SPARK_WORKER_WEBUI_PORT=8081
+ fi
+ for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+ "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" $(( $i + 1 )) spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i ))
+ done
+fi
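The loop above numbers worker instances from 1 and offsets each instance's web UI port by the loop index, so multiple workers on the same slave do not collide. For example, with settings like these in conf/spark-env.sh (illustrative values):

    SPARK_WORKER_INSTANCES=3
    # SPARK_WORKER_WEBUI_PORT left unset, so the script falls back to 8081

start-slaves.sh launches workers 1, 2 and 3 on every slave with web UI ports 8081, 8082 and 8083, and the matching change to stop-slaves.sh below stops them by the same instance numbers.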
diff --git a/bin/stop-master.sh b/bin/stop-master.sh
index f75167dd2c..172ee5891d 100755
--- a/bin/stop-master.sh
+++ b/bin/stop-master.sh
@@ -7,4 +7,4 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
-"$bin"/spark-daemon.sh stop spark.deploy.master.Master
\ No newline at end of file
+"$bin"/spark-daemon.sh stop spark.deploy.master.Master 1
diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh
index 21c9ebf324..fbfc594472 100755
--- a/bin/stop-slaves.sh
+++ b/bin/stop-slaves.sh
@@ -7,4 +7,14 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
-"$bin"/spark-daemons.sh stop spark.deploy.worker.Worker
\ No newline at end of file
+if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
+ . "${SPARK_CONF_DIR}/spark-env.sh"
+fi
+
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+ "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker 1
+else
+ for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+ "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker $(( $i + 1 ))
+ done
+fi
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 6d71ec5691..37565ca827 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -12,6 +12,7 @@
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
+# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes to be spawned on every slave machine
#
# Finally, Spark also relies on the following variables, but these can be set
# on just the *master* (i.e. in your driver program), and will automatically
diff --git a/core/pom.xml b/core/pom.xml
index 7f65ce5c00..da26d674ec 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 33dc7627a3..ccd9d0364a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -366,7 +366,7 @@ abstract class RDD[T: ClassManifest](
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- @deprecated("use mapPartitionsWithIndex")
+ @deprecated("use mapPartitionsWithIndex", "0.7.0")
def mapPartitionsWithSplit[U: ClassManifest](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] =
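The extra argument above uses the two-parameter form of Scala's @deprecated annotation, which records a message plus the version in which the deprecation began, presumably to address the compiler's complaint about the older single-argument form under the newer Scala version this change moves to. A small sketch of the form, with purely illustrative method names:

    @deprecated("use newName instead", "0.7.0")
    def oldName(x: Int): Int = newName(x)

    def newName(x: Int): Int = x * 2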
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 6b4a11d6d3..518034e07b 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -36,17 +36,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
self: RDD[(K, V)])
extends Logging
with Serializable {
-
+
private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
- if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
+ if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
classManifest[T].erasure
} else {
// We get the type of the Writable class by looking at the apply method which converts
// from T to Writable. Since we have two apply methods we filter out the one which
- // is of the form "java.lang.Object apply(java.lang.Object)"
+ // is not of the form "java.lang.Object apply(java.lang.Object)"
implicitly[T => Writable].getClass.getDeclaredMethods().filter(
- m => m.getReturnType().toString != "java.lang.Object" &&
+ m => m.getReturnType().toString != "class java.lang.Object" &&
m.getName() == "apply")(0).getReturnType
}
@@ -69,17 +69,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
val valueClass = getWritableClass[V]
val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-
- logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+
+ logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
if (!convertKey && !convertValue) {
- self.saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (!convertKey && convertValue) {
- self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (convertKey && !convertValue) {
- self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (convertKey && convertValue) {
- self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
- }
+ self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+ }
}
}
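The comparison string changes because java.lang.Class.toString prefixes the kind of the type ("class " or "interface "), so the apply method returning Object stringifies as "class java.lang.Object", not "java.lang.Object"; with the old string the inequality held for every method, so the return-type check excluded nothing. A quick illustration of the toString behaviour, relying only on the JDK:

    val c: Class[_] = classOf[Object]
    println(c.toString)  // prints "class java.lang.Object"
    println(c.getName)   // prints "java.lang.Object" (no "class " prefix)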
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 79d00edee7..43ee39c993 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -131,6 +131,6 @@ private[spark] object CheckpointRDD extends Logging {
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
- fs.delete(path)
+ fs.delete(path, true)
}
}
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 9213513e80..a6235491ca 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -29,7 +29,7 @@ private[spark] case class NarrowCoGroupSplitDep(
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
private[spark]
-class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep])
+class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
extends Partition with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
@@ -88,7 +88,7 @@ class CoGroupedRDD[K](
case _ =>
new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
}
- }.toList)
+ }.toArray)
}
array
}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 51f02409b6..4e33b7dd5c 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -16,7 +16,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* @tparam V the value class.
*/
class ShuffledRDD[K, V](
- prev: RDD[(K, V)],
+ @transient prev: RDD[(K, V)],
part: Partitioner)
extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part))) {
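Marking prev as @transient keeps the parent RDD out of the ShuffledRDD's serialized form; the ShuffleDependency built in the constructor already carries the lineage, so the parameter is only needed at construction time. A minimal sketch of what @transient does under plain Java serialization (class and field names are illustrative, not Spark's):

    import java.io._

    class Parent(val payload: Array[Byte]) extends Serializable
    class Child(@transient val parent: Parent, val tag: String) extends Serializable

    def roundTrip[T <: AnyRef](obj: T): T = {
      val buf = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buf)
      out.writeObject(obj)
      out.close()
      new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray)).readObject().asInstanceOf[T]
    }

    val c = roundTrip(new Child(new Parent(new Array[Byte](1 << 20)), "shuffled"))
    println(c.tag)             // "shuffled" survives the round trip
    println(c.parent == null)  // true: the @transient field was skipped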
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 0a02561062..481e03b349 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -56,7 +56,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
case _ =>
new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
}
- }.toList)
+ }.toArray)
}
array
}
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index e80ec17aa5..35b0e06785 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
@transient rdd2: RDD[U]
) extends Partition {
- var split1 = rdd1.partitions(idx)
- var split2 = rdd1.partitions(idx)
+ var partition1 = rdd1.partitions(idx)
+ var partition2 = rdd2.partitions(idx)
override val index: Int = idx
- def splits = (split1, split2)
+ def partitions = (partition1, partition2)
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
- // Update the reference to parent split at the time of task serialization
- split1 = rdd1.partitions(idx)
- split2 = rdd2.partitions(idx)
+ // Update the reference to parent partition at the time of task serialization
+ partition1 = rdd1.partitions(idx)
+ partition2 = rdd2.partitions(idx)
oos.defaultWriteObject()
}
}
@@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = {
- val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
- rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context))
+ val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+ rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context))
}
override def getPreferredLocations(s: Partition): Seq[String] = {
- val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
- rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2))
+ val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+ rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
}
override def clearDependencies() {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 3ce1e6e257..9b64f95df8 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -121,7 +121,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
val toRemove = new HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
- logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+ logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
}
}
diff --git a/docs/_config.yml b/docs/_config.yml
index f99d5bb376..5c135a0242 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -3,8 +3,8 @@ markdown: kramdown
# These allow the documentation to be updated with nerw releases
# of Spark, Scala, and Mesos.
-SPARK_VERSION: 0.7.1-SNAPSHOT
-SPARK_VERSION_SHORT: 0.7.1
-SCALA_VERSION: 2.9.2
+SPARK_VERSION: 0.8.0-SNAPSHOT
+SPARK_VERSION_SHORT: 0.8.0
+SCALA_VERSION: 2.9.3
MESOS_VERSION: 0.9.0-incubating
SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index c2eeafd07a..04cd79d039 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -42,10 +42,10 @@ To run a specific test suite:
You might run into the following errors if you're using a vanilla installation of Maven:
- [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+ [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
[ERROR] PermGen space -> [Help 1]
- [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+ [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
[ERROR] Java heap space -> [Help 1]
To fix these, you can do the following:
diff --git a/docs/index.md b/docs/index.md
index 51d505e1fa..0c4add45dc 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -18,7 +18,7 @@ or you will need to set the `SCALA_HOME` environment variable to point
to where you've installed Scala. Scala must also be accessible through one
of these methods on slave nodes on your cluster.
-Spark uses [Simple Build Tool](https://github.com/harrah/xsbt/wiki), which is bundled with it. To compile the code, go into the top-level Spark directory and run
+Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run
sbt/sbt package
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 5c80d2ed3a..2d961b29cb 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -53,8 +53,8 @@ scala> textFile.filter(line => line.contains("Spark")).count() // How many lines
res3: Long = 15
{% endhighlight %}
-## Transformations
-RDD transformations can be used for more complex computations. Let's say we want to find the line with the most words:
+## More On RDD Operations
+RDD actions and transformations can be used for more complex computations. Let's say we want to find the line with the most words:
{% highlight scala %}
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index b30699cf3d..f5788dc467 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -83,7 +83,7 @@ DStreams support many of the transformations available on normal Spark RDD's:
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
<td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together all the values of each key in the RDDs of the source DStream. <br />
- <b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
+ <b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
</td>
</tr>
<tr>
@@ -132,7 +132,7 @@ Spark Streaming features windowed computations, which allow you to apply transfo
<td> <b>groupByKeyAndWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>])
</td>
<td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together values of each key over batches in a sliding window. <br />
-<b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td>
+<b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td>
</tr>
<tr>
<td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>]) </td>
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 571d27fde6..9f2daad2b6 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -540,11 +540,24 @@ def scp(host, opts, local_file, dest_file):
(opts.identity_file, local_file, opts.user, host, dest_file), shell=True)
-# Run a command on a host through ssh, throwing an exception if ssh fails
+# Run a command on a host through ssh, retrying up to two times
+# and then throwing an exception if ssh continues to fail.
def ssh(host, opts, command):
- subprocess.check_call(
- "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
- (opts.identity_file, opts.user, host, command), shell=True)
+ tries = 0
+ while True:
+ try:
+ return subprocess.check_call(
+ "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
+ (opts.identity_file, opts.user, host, command), shell=True)
+ except subprocess.CalledProcessError as e:
+ if (tries > 2):
+ raise e
+ print "Error connecting to host {0}, sleeping 30".format(e)
+ time.sleep(30)
+ tries = tries + 1
+
+
+
# Gets a list of zones to launch instances in
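The rewritten ssh helper above wraps the call in a retry loop, sleeping 30 seconds between attempts, and only re-raises the CalledProcessError once the retries are exhausted. The same retry-then-rethrow shape, sketched here in Scala rather than the script's Python, with the bounds and the wrapped action purely illustrative:

    def withRetries[T](maxRetries: Int, delayMs: Long)(body: => T): T = {
      var tries = 0
      while (true) {
        try {
          return body
        } catch {
          case e: Exception =>
            if (tries >= maxRetries) throw e
            println("Attempt failed (" + e + "); sleeping " + delayMs + " ms before retrying")
            Thread.sleep(delayMs)
            tries += 1
        }
      }
      throw new IllegalStateException("unreachable")
    }

    // e.g. withRetries(maxRetries = 2, delayMs = 30000) { runSshCommand() }  // runSshCommand is hypothetical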
diff --git a/examples/pom.xml b/examples/pom.xml
index 9594257ad4..c42d2bcdb9 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -22,7 +22,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_2.9.2</artifactId>
- <version>0.1.8</version>
+ <version>0.1.11</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
index b07e799cef..4849f216fb 100644
--- a/examples/src/main/scala/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -10,73 +10,73 @@ import scala.collection.mutable.HashSet
* K-means clustering.
*/
object LocalKMeans {
- val N = 1000
- val R = 1000 // Scaling factor
- val D = 10
- val K = 10
- val convergeDist = 0.001
- val rand = new Random(42)
-
- def generateData = {
- def generatePoint(i: Int) = {
- Vector(D, _ => rand.nextDouble * R)
- }
- Array.tabulate(N)(generatePoint)
- }
-
- def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
- var index = 0
- var bestIndex = 0
- var closest = Double.PositiveInfinity
-
- for (i <- 1 to centers.size) {
- val vCurr = centers.get(i).get
- val tempDist = p.squaredDist(vCurr)
- if (tempDist < closest) {
- closest = tempDist
- bestIndex = i
- }
- }
-
- return bestIndex
- }
-
- def main(args: Array[String]) {
- val data = generateData
- var points = new HashSet[Vector]
- var kPoints = new HashMap[Int, Vector]
- var tempDist = 1.0
-
- while (points.size < K) {
- points.add(data(rand.nextInt(N)))
- }
-
- val iter = points.iterator
- for (i <- 1 to points.size) {
- kPoints.put(i, iter.next())
- }
-
- println("Initial centers: " + kPoints)
-
- while(tempDist > convergeDist) {
- var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
-
- var mappings = closest.groupBy[Int] (x => x._1)
-
- var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
-
- var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
-
- tempDist = 0.0
- for (mapping <- newPoints) {
- tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
- }
-
- for (newP <- newPoints) {
- kPoints.put(newP._1, newP._2)
- }
- }
-
- println("Final centers: " + kPoints)
- }
+ val N = 1000
+ val R = 1000 // Scaling factor
+ val D = 10
+ val K = 10
+ val convergeDist = 0.001
+ val rand = new Random(42)
+
+ def generateData = {
+ def generatePoint(i: Int) = {
+ Vector(D, _ => rand.nextDouble * R)
+ }
+ Array.tabulate(N)(generatePoint)
+ }
+
+ def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+ var index = 0
+ var bestIndex = 0
+ var closest = Double.PositiveInfinity
+
+ for (i <- 1 to centers.size) {
+ val vCurr = centers.get(i).get
+ val tempDist = p.squaredDist(vCurr)
+ if (tempDist < closest) {
+ closest = tempDist
+ bestIndex = i
+ }
+ }
+
+ return bestIndex
+ }
+
+ def main(args: Array[String]) {
+ val data = generateData
+ var points = new HashSet[Vector]
+ var kPoints = new HashMap[Int, Vector]
+ var tempDist = 1.0
+
+ while (points.size < K) {
+ points.add(data(rand.nextInt(N)))
+ }
+
+ val iter = points.iterator
+ for (i <- 1 to points.size) {
+ kPoints.put(i, iter.next())
+ }
+
+ println("Initial centers: " + kPoints)
+
+ while(tempDist > convergeDist) {
+ var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+
+ var mappings = closest.groupBy[Int] (x => x._1)
+
+ var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
+
+ var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
+
+ tempDist = 0.0
+ for (mapping <- newPoints) {
+ tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
+ }
+
+ for (newP <- newPoints) {
+ kPoints.put(newP._1, newP._2)
+ }
+ }
+
+ println("Final centers: " + kPoints)
+ }
}
diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
index 92cd81c487..a0aaf60918 100644
--- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
@@ -8,7 +8,7 @@ object MultiBroadcastTest {
System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
System.exit(1)
}
-
+
val sc = new SparkContext(args(0), "Broadcast Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
@@ -19,7 +19,7 @@ object MultiBroadcastTest {
for (i <- 0 until arr1.length) {
arr1(i) = i
}
-
+
var arr2 = new Array[Int](num)
for (i <- 0 until arr2.length) {
arr2(i) = i
@@ -30,7 +30,7 @@ object MultiBroadcastTest {
sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size + barr2.value.size)
}
-
+
System.exit(0)
}
}
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
index 0d17bda004..461b84a2c6 100644
--- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -11,7 +11,7 @@ object SimpleSkewedGroupByTest {
"[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
System.exit(1)
}
-
+
var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
var valSize = if (args.length > 3) args(3).toInt else 1000
@@ -20,7 +20,7 @@ object SimpleSkewedGroupByTest {
val sc = new SparkContext(args(0), "GroupBy Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
-
+
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var result = new Array[(Int, Array[Byte])](numKVPairs)
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
index 83be3fc27b..435675f9de 100644
--- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -10,7 +10,7 @@ object SkewedGroupByTest {
System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
}
-
+
var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
var valSize = if (args.length > 3) args(3).toInt else 1000
@@ -18,7 +18,7 @@ object SkewedGroupByTest {
val sc = new SparkContext(args(0), "GroupBy Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
-
+
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 483aae452b..a9642100e3 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -49,7 +49,7 @@ object TwitterAlgebirdCMS {
val users = stream.map(status => status.getUser.getId)
- val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
+ val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
var globalCMS = cms.zero
val mm = new MapMonoid[Long, Int]()
var globalExact = Map[Long, Int]()
diff --git a/pom.xml b/pom.xml
index 12e310a038..c3323ffad0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.8.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Spark Project Parent POM</name>
<url>http://spark-project.org/</url>
diff --git a/project/build.properties b/project/build.properties
index d4287112c6..9b860e23c5 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=0.11.3
+sbt.version=0.12.3
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4d0e696a11..d4f2442872 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,13 +4,13 @@ resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/release
resolvers += "Spray Repository" at "http://repo.spray.cc/"
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")
-addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1")
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
-addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0")
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
-addSbtPlugin("cc.spray" %% "sbt-twirl" % "0.5.2")
+addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1")
// For Sonatype publishing
//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py
index 73f7f8fbaf..7f85a1008e 100644
--- a/python/examples/transitive_closure.py
+++ b/python/examples/transitive_closure.py
@@ -24,7 +24,7 @@ if __name__ == "__main__":
"Usage: PythonTC <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonTC")
- slices = sys.argv[2] if len(sys.argv) > 2 else 2
+ slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
tc = sc.parallelize(generateGraph(), slices).cache()
# Linear transitive closure: each round grows paths by one edge,
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 172ed85fab..a9fec17a9d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -399,7 +399,7 @@ class RDD(object):
>>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
>>> from fileinput import input
>>> from glob import glob
- >>> ''.join(input(glob(tempFile.name + "/part-0000*")))
+ >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
"""
def func(split, iterator):
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index 46f38c2772..7a7280313e 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/repl/pom.xml b/repl/pom.xml
index 1f885673f4..038da5d988 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index cd7b5128b2..23556dbc8f 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
- /___/ .__/\_,_/_/ /_/\_\ version 0.7.1
+ /___/ .__/\_,_/_/ /_/\_\ version 0.8.0
/_/
""")
import Properties._
diff --git a/run b/run
index 2c29cc4a66..756f8703f2 100755
--- a/run
+++ b/run
@@ -1,6 +1,6 @@
#!/bin/bash
-SCALA_VERSION=2.9.2
+SCALA_VERSION=2.9.3
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
@@ -22,6 +22,7 @@ fi
# values for that; it doesn't need a lot
if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
+ SPARK_DAEMON_JAVA_OPTS+=" -Dspark.akka.logLifecycleEvents=true"
SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default
fi
@@ -46,14 +47,15 @@ case "$1" in
esac
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
- if [ `command -v scala` ]; then
- RUNNER="scala"
+ if [ "$SCALA_HOME" ]; then
+ RUNNER="${SCALA_HOME}/bin/scala"
else
- if [ -z "$SCALA_HOME" ]; then
- echo "SCALA_HOME is not set" >&2
+ if [ `command -v scala` ]; then
+ RUNNER="scala"
+ else
+ echo "SCALA_HOME is not set and scala is not in PATH" >&2
exit 1
fi
- RUNNER="${SCALA_HOME}/bin/scala"
fi
else
if [ `command -v java` ]; then
diff --git a/run2.cmd b/run2.cmd
index cb20a4b7a2..d2d4807971 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -1,6 +1,6 @@
@echo off
-set SCALA_VERSION=2.9.2
+set SCALA_VERSION=2.9.3
rem Figure out where the Spark framework is installed
set FWDIR=%~dp0
@@ -21,6 +21,7 @@ set RUNNING_DAEMON=0
if "%1"=="spark.deploy.master.Master" set RUNNING_DAEMON=1
if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1
if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m
+set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY%
if "%RUNNING_DAEMON%"=="1" set SPARK_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%
diff --git a/sbt/sbt b/sbt/sbt
index 8f426d18e8..850c58e1e9 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -5,4 +5,4 @@ if [ "$MESOS_HOME" != "" ]; then
fi
export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd)
export SPARK_TESTING=1 # To put test classes on classpath
-java -Xmx1200M -XX:MaxPermSize=250m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx1200m -XX:MaxPermSize=250m -XX:ReservedCodeCacheSize=128m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
diff --git a/streaming/pom.xml b/streaming/pom.xml
index fc2e211a42..08ff3e2ae1 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>