author    Patrick Wendell <pwendell@gmail.com>  2013-08-24 14:50:58 -0700
committer Patrick Wendell <pwendell@gmail.com>  2013-08-24 14:50:58 -0700
commit    4879685910a1ee9f314fce1efe8d3ed879f3e64c (patch)
tree      3aef0af8b939a2b25b13b8b16fae006ba8b111fc
parent    c02585ea130045ef27e579172ac2acc71bc8da63 (diff)
parent    d282c1ebbbe1aebbd409c06efedf95fb77833c35 (diff)
Merge remote-tracking branch 'mesos/master' into ec2-updates
-rw-r--r--  README.md                                                                   |  45
-rw-r--r--  core/pom.xml                                                                |   4
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                                |  17
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala                            | 315
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala                      |  40
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                                    |  63
-rw-r--r--  docs/_plugins/copy_api_dirs.rb                                              |   2
-rw-r--r--  docs/building-with-maven.md                                                 |  35
-rw-r--r--  docs/configuration.md                                                       |   2
-rw-r--r--  docs/running-on-yarn.md                                                     |  20
-rw-r--r--  docs/streaming-custom-receivers.md                                          |  51
-rw-r--r--  docs/streaming-programming-guide.md                                         |   3
-rwxr-xr-x  make-distribution.sh                                                        |   2
-rw-r--r--  mllib/src/main/scala/spark/mllib/recommendation/ALS.scala                   |  32
-rw-r--r--  pom.xml                                                                     |  28
-rw-r--r--  project/SparkBuild.scala                                                    |  32
-rw-r--r--  project/plugins.sbt                                                         |   4
-rw-r--r--  python/pyspark/rdd.py                                                       |  60
-rw-r--r--  python/pyspark/statcounter.py                                               | 109
-rw-r--r--  repl/pom.xml                                                                |   6
-rwxr-xr-x  sbt/sbt                                                                     |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala             |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala  |   6
-rw-r--r--  streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala      |   2
24 files changed, 762 insertions, 120 deletions
diff --git a/README.md b/README.md
index 1dd96a0a4a..e5f527b84a 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,7 @@ Spark requires Scala 2.9.3 (Scala 2.10 is not yet supported). The project is
built using Simple Build Tool (SBT), which is packaged with it. To build
Spark and its example programs, run:
- sbt/sbt package
+ sbt/sbt package assembly
Spark also supports building using Maven. If you would like to build using Maven,
see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html)
@@ -43,10 +43,47 @@ locally with one thread, or "local[N]" to run locally with N threads.
## A Note About Hadoop Versions
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
-storage systems. Because the HDFS API has changed in different versions of
+storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.
-You can change the version by setting the `HADOOP_VERSION` variable at the top
-of `project/SparkBuild.scala`, then rebuilding Spark.
+You can change the version by setting the `SPARK_HADOOP_VERSION` environment variable
+when building Spark.
+
+For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop
+versions without YARN, use:
+
+ # Apache Hadoop 1.2.1
+ $ SPARK_HADOOP_VERSION=1.2.1 sbt/sbt package assembly
+
+ # Cloudera CDH 4.2.0 with MapReduce v1
+ $ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt package assembly
+
+For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
+with YARN, also set `SPARK_WITH_YARN=true`:
+
+ # Apache Hadoop 2.0.5-alpha
+ $ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_WITH_YARN=true sbt/sbt package assembly
+
+ # Cloudera CDH 4.2.0 with MapReduce v2
+ $ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_WITH_YARN=true sbt/sbt package assembly
+
+For convenience, these variables may also be set through the `conf/spark-env.sh` file
+described below.
+
+When developing a Spark application, specify the Hadoop version by adding the
+"hadoop-client" artifact to your project's dependencies. For example, if you're
+using Hadoop 1.2.1 and build your application using SBT, add this entry to
+`libraryDependencies`:
+
+ "org.apache.hadoop" % "hadoop-client" % "1.2.1"
+
+If your project is built with Maven, add this to your POM file's `<dependencies>` section:
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <!-- the brackets are needed to tell Maven that this is a hard dependency on version "1.2.1" exactly -->
+ <version>[1.2.1]</version>
+ </dependency>
## Configuration
diff --git a/core/pom.xml b/core/pom.xml
index 6627a87de1..3d70a19584 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -53,6 +53,10 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index fdd2dfa810..23dfbcd604 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -56,8 +56,7 @@ import spark.deploy.LocalSparkCluster
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
OrderedRDDFunctions}
-import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
- SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler._
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
@@ -65,6 +64,10 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
import spark.ui.SparkUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
+import scala.Some
+import spark.scheduler.StageInfo
+import spark.storage.RDDInfo
+import spark.storage.StorageStatus
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -615,6 +618,16 @@ class SparkContext(
}
/**
+   * Gets the locality information associated with a given partition of a particular RDD.
+   * @param rdd the RDD of interest
+   * @param partition the partition whose preferred locations should be looked up
+   * @return list of preferred locations for the partition
+   */
+  private[spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
+ dagScheduler.getPreferredLocs(rdd, partition)
+ }
+
+ /**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI.
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 2b5bf18541..e612d026b2 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -17,53 +17,76 @@
package spark.rdd
-import spark.{Dependency, OneToOneDependency, NarrowDependency, RDD, Partition, TaskContext}
+import spark._
import java.io.{ObjectOutputStream, IOException}
+import scala.collection.mutable
+import scala.Some
+import scala.collection.mutable.ArrayBuffer
-private[spark] case class CoalescedRDDPartition(
- index: Int,
- @transient rdd: RDD[_],
- parentsIndices: Array[Int]
- ) extends Partition {
+/**
+ * Class that captures a coalesced RDD by essentially keeping track of parent partitions
+ * @param index of this coalesced partition
+ * @param rdd which it belongs to
+ * @param parentsIndices list of indices in the parent that have been coalesced into this partition
+ * @param preferredLocation the preferred location for this partition
+ */
+case class CoalescedRDDPartition(
+ index: Int,
+ @transient rdd: RDD[_],
+ parentsIndices: Array[Int],
+ @transient preferredLocation: String = ""
+ ) extends Partition {
var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
- // Update the reference to parent split at the time of task serialization
+ // Update the reference to parent partition at the time of task serialization
parents = parentsIndices.map(rdd.partitions(_))
oos.defaultWriteObject()
}
+
+ /**
+   * Computes how many of the parent partitions have getPreferredLocation
+ * as one of their preferredLocations
+ * @return locality of this coalesced partition between 0 and 1
+ */
+ def localFraction: Double = {
+ val loc = parents.count(p =>
+ rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
+
+ if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
+ }
}
/**
- * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
- * this RDD computes one or more of the parent ones. Will produce exactly `maxPartitions` if the
- * parent had more than this many partitions, or fewer if the parent had fewer.
- *
- * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
- * or to avoid having a large number of small tasks when processing a directory with many files.
+ * Represents a coalesced RDD that has fewer partitions than its parent RDD.
+ * This class uses the PartitionCoalescer class to find a good partitioning of the parent RDD
+ * so that each new partition has roughly the same number of parent partitions and so that
+ * the preferred location of each new partition overlaps with as many preferred locations of
+ * its parent partitions as possible.
+ * @param prev RDD to be coalesced
+ * @param maxPartitions number of desired partitions in the coalesced RDD
+ * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
class CoalescedRDD[T: ClassManifest](
- @transient var prev: RDD[T],
- maxPartitions: Int)
+ @transient var prev: RDD[T],
+ maxPartitions: Int,
+ balanceSlack: Double = 0.10)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
override def getPartitions: Array[Partition] = {
- val prevSplits = prev.partitions
- if (prevSplits.length < maxPartitions) {
- prevSplits.map(_.index).map{idx => new CoalescedRDDPartition(idx, prev, Array(idx)) }
- } else {
- (0 until maxPartitions).map { i =>
- val rangeStart = ((i.toLong * prevSplits.length) / maxPartitions).toInt
- val rangeEnd = (((i.toLong + 1) * prevSplits.length) / maxPartitions).toInt
- new CoalescedRDDPartition(i, prev, (rangeStart until rangeEnd).toArray)
- }.toArray
+ val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
+
+ pc.run().zipWithIndex.map {
+ case (pg, i) =>
+ val ids = pg.arr.map(_.index).toArray
+ new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
}
- override def compute(split: Partition, context: TaskContext): Iterator[T] = {
- split.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentSplit =>
- firstParent[T].iterator(parentSplit, context)
+ override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
+ partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
+ firstParent[T].iterator(parentPartition, context)
}
}
@@ -78,4 +101,242 @@ class CoalescedRDD[T: ClassManifest](
super.clearDependencies()
prev = null
}
+
+ /**
+   * Returns the preferred machine for the partition. If the partition is of type
+   * CoalescedRDDPartition, then the preferred machine is one that most parent partitions prefer too.
+   * @param partition the partition to look up
+   * @return the machine most preferred by the partition
+ */
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+ }
+}
+
+/**
+ * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
+ * this RDD computes one or more of the parent ones. It will produce exactly `maxPartitions` if the
+ * parent had more than maxPartitions, or fewer if the parent had fewer.
+ *
+ * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
+ * or to avoid having a large number of small tasks when processing a directory with many files.
+ *
+ * If there is no locality information (no preferredLocations) in the parent, then the coalescing
+ * is very simple: it groups parent partitions that are adjacent in the array into chunks.
+ * If there is locality information, it proceeds to pack them with the following four goals:
+ *
+ * (1) Balance the groups so they roughly have the same number of parent partitions
+ * (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
+ * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
+ * (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
+ *
+ * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
+ * We assume the final number of desired partitions is small, e.g. less than 1000.
+ *
+ * The algorithm tries to assign unique preferred machines to each partition. If the number of
+ * desired partitions is greater than the number of preferred machines (can happen), it needs to
+ * start picking duplicate preferred machines. This is determined using coupon collector estimation
+ * (2n log(n)). The load balancing is done using randomized balls-into-bins with the power of two choices, plus one twist:
+ * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two
+ * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions
+ * according to locality. (contact alig for questions)
+ *
+ */
+
+private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+
+ def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
+ def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
+ if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
+
+ val rnd = new scala.util.Random(7919) // keep this class deterministic
+
+ // each element of groupArr represents one coalesced partition
+ val groupArr = ArrayBuffer[PartitionGroup]()
+
+ // hash used to check whether some machine is already in groupArr
+ val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]()
+
+ // hash used for the first maxPartitions (to avoid duplicates)
+ val initialHash = mutable.Set[Partition]()
+
+ // determines the tradeoff between load-balancing the partitions sizes and their locality
+ // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
+ val slack = (balanceSlack * prev.partitions.size).toInt
+
+  var noLocality = true // true if no preferredLocations exist for the parent RDD
+
+ // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+ def currPrefLocs(part: Partition): Seq[String] = {
+ prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
+ }
+
+ // this class just keeps iterating and rotating infinitely over the partitions of the RDD
+ // next() returns the next preferred machine that a partition is replicated on
+ // the rotator first goes through the first replica copy of each partition, then second, third
+  // the iterator's return type is a tuple: (replicaString, partition)
+ class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
+
+ var it: Iterator[(String, Partition)] = resetIterator()
+
+ override val isEmpty = !it.hasNext
+
+ // initializes/resets to start iterating from the beginning
+ def resetIterator() = {
+ val iterators = (0 to 2).map( x =>
+ prev.partitions.iterator.flatMap(p => {
+ if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
+ } )
+ )
+ iterators.reduceLeft((x, y) => x ++ y)
+ }
+
+ // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
+ def hasNext(): Boolean = { !isEmpty }
+
+ // return the next preferredLocation of some partition of the RDD
+ def next(): (String, Partition) = {
+ if (it.hasNext)
+ it.next()
+ else {
+ it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
+ it.next()
+ }
+ }
+ }
+
+ /**
+ * Sorts and gets the least element of the list associated with key in groupHash
+ * The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
+   * @param key string representing the preferred machine (host) of a partition group
+ * @return Option of PartitionGroup that has least elements for key
+ */
+ def getLeastGroupHash(key: String): Option[PartitionGroup] = {
+ groupHash.get(key).map(_.sortWith(compare).head)
+ }
+
+ def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
+ if (!initialHash.contains(part)) {
+ pgroup.arr += part // already assign this element
+ initialHash += part // needed to avoid assigning partitions to multiple buckets
+ true
+ } else { false }
+ }
+
+ /**
+ * Initializes targetLen partition groups and assigns a preferredLocation
+ * This uses coupon collector to estimate how many preferredLocations it must rotate through
+ * until it has seen most of the preferred locations (2 * n log(n))
+ * @param targetLen
+ */
+ def setupGroups(targetLen: Int) {
+ val rotIt = new LocationIterator(prev)
+
+ // deal with empty case, just create targetLen partition groups with no preferred location
+ if (!rotIt.hasNext()) {
+ (1 to targetLen).foreach(x => groupArr += PartitionGroup())
+ return
+ }
+
+ noLocality = false
+
+ // number of iterations needed to be certain that we've seen most preferred locations
+ val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
+ var numCreated = 0
+ var tries = 0
+
+ // rotate through until either targetLen unique/distinct preferred locations have been created
+ // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
+ // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
+ while (numCreated < targetLen && tries < expectedCoupons2) {
+ tries += 1
+ val (nxt_replica, nxt_part) = rotIt.next()
+ if (!groupHash.contains(nxt_replica)) {
+ val pgroup = PartitionGroup(nxt_replica)
+ groupArr += pgroup
+ addPartToPGroup(nxt_part, pgroup)
+ groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
+ numCreated += 1
+ }
+ }
+
+ while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates
+ var (nxt_replica, nxt_part) = rotIt.next()
+ val pgroup = PartitionGroup(nxt_replica)
+ groupArr += pgroup
+ groupHash.get(nxt_replica).get += pgroup
+ var tries = 0
+ while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
+ nxt_part = rotIt.next()._2
+ tries += 1
+ }
+ numCreated += 1
+ }
+
+ }
+
+ /**
+ * Takes a parent RDD partition and decides which of the partition groups to put it in
+ * Takes locality into account, but also uses power of 2 choices to load balance
+   * It strikes a balance between the two using the balanceSlack variable
+ * @param p partition (ball to be thrown)
+ * @return partition group (bin to be put in)
+ */
+ def pickBin(p: Partition): PartitionGroup = {
+ val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
+ val prefPart = if (pref == Nil) None else pref.head
+
+ val r1 = rnd.nextInt(groupArr.size)
+ val r2 = rnd.nextInt(groupArr.size)
+ val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
+    if (prefPart == None) // if no preferred locations, just use basic power of two
+ return minPowerOfTwo
+
+ val prefPartActual = prefPart.get
+
+ if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows
+ return minPowerOfTwo // prefer balance over locality
+ else {
+ return prefPartActual // prefer locality over balance
+ }
+ }
+
+ def throwBalls() {
+ if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
+ if (maxPartitions > groupArr.size) { // just return prev.partitions
+ for ((p,i) <- prev.partitions.zipWithIndex) {
+ groupArr(i).arr += p
+ }
+ } else { // no locality available, then simply split partitions based on positions in array
+ for(i <- 0 until maxPartitions) {
+ val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
+ val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
+ (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
+ }
+ }
+ } else {
+ for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
+ pickBin(p).arr += p
+ }
+ }
+ }
+
+ def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
+
+ /**
+ * Runs the packing algorithm and returns an array of PartitionGroups that if possible are
+ * load balanced and grouped by locality
+ * @return array of partition groups
+ */
+ def run(): Array[PartitionGroup] = {
+ setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
+ throwBalls() // assign partitions (balls) to each group (bins)
+ getPartitions
+ }
+}
+
+private[spark] case class PartitionGroup(prefLoc: String = "") {
+ var arr = mutable.ArrayBuffer[Partition]()
+
+ def size = arr.size
}
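
To make the binning strategy described in the PartitionCoalescer comment above more concrete, the following is a minimal, standalone Scala sketch of the power of two choices rule combined with a locality slack. It is only an illustration of the idea, not part of the patch: the Group and PowerOfTwoDemo names, the host names, and the partition counts are made up, and each partition is given a single random preferred host.

    import scala.collection.mutable.ArrayBuffer
    import scala.util.Random

    // Toy model of the binning step above: each "bin" is a group of parent
    // partition indices together with its preferred host.
    case class Group(prefLoc: String) {
      val arr = ArrayBuffer[Int]()
      def size = arr.size
    }

    object PowerOfTwoDemo {
      def main(args: Array[String]) {
        val rnd = new Random(7919)                 // fixed seed, deterministic like the coalescer
        val numParts = 1000                        // hypothetical number of parent partitions
        val hosts = (0 until 5).map(i => "m" + i)  // hypothetical machines
        val groups = hosts.map(h => Group(h)).toArray
        val slack = (0.10 * numParts).toInt        // balanceSlack * number of parent partitions

        for (p <- 0 until numParts) {
          // pretend each partition has exactly one preferred host
          val prefGroup = groups.find(_.prefLoc == hosts(rnd.nextInt(hosts.size)))
          // power of two choices: sample two groups and keep the less loaded one
          val a = groups(rnd.nextInt(groups.length))
          val b = groups(rnd.nextInt(groups.length))
          val minOfTwo = if (a.size < b.size) a else b
          // prefer locality unless the preferred group is already "slack" bigger than the sample
          val target = prefGroup match {
            case Some(g) if g.size < minOfTwo.size + slack => g
            case _ => minOfTwo
          }
          target.arr += p
        }
        groups.foreach(g => println(g.prefLoc + ": " + g.size + " partitions"))
      }
    }
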
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9402f18a0f..7275bd346a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -435,23 +435,24 @@ class DAGScheduler(
if (event != null) {
logDebug("Got event of type " + event.getClass.getName)
}
-
- if (event != null) {
- if (processEvent(event)) {
- return
+    this.synchronized { // needed in case other threads make calls into methods of this class
+ if (event != null) {
+ if (processEvent(event)) {
+ return
+ }
}
- }
- val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
- // Periodically resubmit failed stages if some map output fetches have failed and we have
- // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
- // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
- // the same time, so we want to make sure we've identified all the reduce tasks that depend
- // on the failed node.
- if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
- resubmitFailedStages()
- } else {
- submitWaitingStages()
+ val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
+ // Periodically resubmit failed stages if some map output fetches have failed and we have
+ // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
+ // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
+ // the same time, so we want to make sure we've identified all the reduce tasks that depend
+ // on the failed node.
+ if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
+ resubmitFailedStages()
+ } else {
+ submitWaitingStages()
+ }
}
}
}
@@ -789,7 +790,14 @@ class DAGScheduler(
visitedRdds.contains(target.rdd)
}
- private def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
+ /**
+ * Synchronized method that might be called from other threads.
+   * @param rdd the RDD whose partitions are to be looked at
+   * @param partition the partition to look up locality information for
+ * @return list of machines that are preferred by the partition
+ */
+ private[spark]
+ def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
if (!cached.isEmpty) {
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 75778de1cc..e306952bbd 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -22,7 +22,8 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
+import spark.rdd._
+import scala.collection.parallel.mutable
class RDDSuite extends FunSuite with SharedSparkContext {
@@ -173,6 +174,66 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
}
+ test("cogrouped RDDs with locality") {
+ val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
+ val coal3 = data3.coalesce(3)
+ val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+ assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
+
+ // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
+ val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i+2)).map{ j => "m" + (j%6)})))
+ val coalesced1 = data.coalesce(3)
+ assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
+
+ val splits = coalesced1.glom().collect().map(_.toList).toList
+ assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
+
+ assert(splits.forall(_.length >= 1) === true, "Some partitions were empty")
+
+ // If we try to coalesce into more partitions than the original RDD, it should just
+ // keep the original number of partitions.
+ val coalesced4 = data.coalesce(20)
+ val listOfLists = coalesced4.glom().collect().map(_.toList).toList
+ val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
+ assert( sortedList === (1 to 9).
+ map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
+ }
+
+ test("cogrouped RDDs with locality, large scale (10K partitions)") {
+ // large scale experiment
+ import collection.mutable
+ val rnd = scala.util.Random
+ val partitions = 10000
+ val numMachines = 50
+ val machines = mutable.ListBuffer[String]()
+ (1 to numMachines).foreach(machines += "m"+_)
+
+ val blocks = (1 to partitions).map(i =>
+ { (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) } )
+
+ val data2 = sc.makeRDD(blocks)
+ val coalesced2 = data2.coalesce(numMachines*2)
+
+ // test that you get over 90% locality in each group
+ val minLocality = coalesced2.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
+ .foldLeft(1.)((perc, loc) => math.min(perc,loc))
+ assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
+
+ // test that the groups are load balanced with 100 +/- 20 elements in each
+ val maxImbalance = coalesced2.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
+ .foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
+ assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
+
+ val data3 = sc.makeRDD(blocks).map(i => i*2) // derived RDD to test *current* pref locs
+ val coalesced3 = data3.coalesce(numMachines*2)
+ val minLocality2 = coalesced3.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
+ .foldLeft(1.)((perc, loc) => math.min(perc,loc))
+ assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
+ (minLocality2*100.).toInt + "%")
+ }
test("zipped RDDs") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb
index 217254c59f..c574ea7f5c 100644
--- a/docs/_plugins/copy_api_dirs.rb
+++ b/docs/_plugins/copy_api_dirs.rb
@@ -18,7 +18,7 @@
require 'fileutils'
include FileUtils
-if ENV['SKIP_API'] != '1'
+if not (ENV['SKIP_API'] == '1' or ENV['SKIP_SCALADOC'] == '1')
# Build Scaladoc for Java/Scala
projects = ["core", "examples", "repl", "bagel", "streaming", "mllib"]
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index 04cd79d039..a9f2cb8a7a 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -8,22 +8,26 @@ title: Building Spark with Maven
Building Spark using Maven requires Maven 3 (the build process is tested with Maven 3.0.4) and Java 1.6 or newer.
-Building with Maven requires that a Hadoop profile be specified explicitly at the command line, there is no default. There are two profiles to choose from, one for building for Hadoop 1 or Hadoop 2.
+## Specifying the Hadoop version ##
-for Hadoop 1 (using 0.20.205.0) use:
+To enable support for HDFS and other Hadoop-supported storage systems, specify the exact Hadoop version by setting the "hadoop.version" property. If unset, Spark will build against Hadoop 1.0.4 by default.
- $ mvn -Phadoop1 clean install
+For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions without YARN, use:
+ # Apache Hadoop 1.2.1
+ $ mvn -Dhadoop.version=1.2.1 clean install
-for Hadoop 2 (using 2.0.0-mr1-cdh4.1.1) use:
+ # Cloudera CDH 4.2.0 with MapReduce v1
+ $ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 clean install
- $ mvn -Phadoop2 clean install
+For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, enable the "hadoop2-yarn" profile:
-It uses the scala-maven-plugin which supports incremental and continuous compilation. E.g.
+ # Apache Hadoop 2.0.5-alpha
+ $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha clean install
- $ mvn -Phadoop2 scala:cc
+ # Cloudera CDH 4.2.0 with MapReduce v2
+ $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 clean install
-…should run continuous compilation (i.e. wait for changes). However, this has not been tested extensively.
## Spark Tests in Maven ##
@@ -31,11 +35,11 @@ Tests are run by default via the scalatest-maven-plugin. With this you can do th
Skip test execution (but not compilation):
- $ mvn -DskipTests -Phadoop2 clean install
+ $ mvn -Dhadoop.version=... -DskipTests clean install
To run a specific test suite:
- $ mvn -Phadoop2 -Dsuites=spark.repl.ReplSuite test
+ $ mvn -Dhadoop.version=... -Dsuites=spark.repl.ReplSuite test
## Setting up JVM Memory Usage Via Maven ##
@@ -53,6 +57,15 @@ To fix these, you can do the following:
export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=128M"
+## Continuous Compilation ##
+
+We use the scala-maven-plugin which supports incremental and continuous compilation. E.g.
+
+ $ mvn scala:cc
+
+…should run continuous compilation (i.e. wait for changes). However, this has not been tested extensively.
+
+
## Using With IntelliJ IDEA ##
This setup works fine in IntelliJ IDEA 11.1.4. After opening the project via the pom.xml file in the project root folder, you only need to activate either the hadoop1 or hadoop2 profile in the "Maven Properties" popout. We have not tried Eclipse/Scala IDE with this.
@@ -61,6 +74,6 @@ This setup works fine in IntelliJ IDEA 11.1.4. After opening the project via the
It includes support for building a Debian package containing a 'fat-jar' which includes the repl, the examples and bagel. This can be created by specifying the deb profile:
- $ mvn -Phadoop2,deb clean install
+ $ mvn -Pdeb clean install
The debian package can then be found under repl/target. We added the short commit hash to the file name so that we can distinguish individual packages build for SNAPSHOT versions.
diff --git a/docs/configuration.md b/docs/configuration.md
index dff08a06f5..b125eeb03c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -146,7 +146,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td>spark.ui.port</td>
- <td>33000</td>
+ <td>3030</td>
<td>
Port for your application's dashboard, which shows memory and workload data
</td>
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 9c2cedfd88..6bada9bdd7 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -6,7 +6,7 @@ title: Launching Spark on YARN
Experimental support for running over a [YARN (Hadoop
NextGen)](http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-yarn/hadoop-yarn-site/YARN.html)
cluster was added to Spark in version 0.6.0. This was merged into master as part of 0.7 effort.
-To build spark core with YARN support, please use the hadoop2-yarn profile.
+To build spark with YARN support, please use the hadoop2-yarn profile.
Ex: mvn -Phadoop2-yarn clean install
# Building spark core consolidated jar.
@@ -15,18 +15,12 @@ We need a consolidated spark core jar (which bundles all the required dependenci
This can be built either through sbt or via maven.
- Building spark assembled jar via sbt.
- It is a manual process of enabling it in project/SparkBuild.scala.
-Please comment out the
- HADOOP_VERSION, HADOOP_MAJOR_VERSION and HADOOP_YARN
-variables before the line 'For Hadoop 2 YARN support'
-Next, uncomment the subsequent 3 variable declaration lines (for these three variables) which enable hadoop yarn support.
+Enable YARN support by setting `SPARK_WITH_YARN=true` when invoking sbt:
-Assembly of the jar Ex:
-
- ./sbt/sbt clean assembly
+ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_WITH_YARN=true ./sbt/sbt clean assembly
The assembled jar would typically be something like :
-`./core/target/spark-core-assembly-0.8.0-SNAPSHOT.jar`
+`./yarn/target/spark-yarn-assembly-0.8.0-SNAPSHOT.jar`
- Building spark assembled jar via Maven.
@@ -34,16 +28,16 @@ The assembled jar would typically be something like :
Something like this. Ex:
- mvn -Phadoop2-yarn clean package -DskipTests=true
+ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha clean package -DskipTests=true
This will build the shaded (consolidated) jar. Typically something like :
-`./repl-bin/target/spark-repl-bin-<VERSION>-shaded-hadoop2-yarn.jar`
+`./yarn/target/spark-yarn-bin-<VERSION>-shaded.jar`
# Preparations
-- Building spark core assembled jar (see above).
+- Building spark-yarn assembly (see above).
- Your application code must be packaged into a separate JAR file.
If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}` file can be generated by running `sbt/sbt package`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we are assuming here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the jar generated by the sbt package command will obviously be different.
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 5476c00d02..dfa343bf94 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -7,10 +7,45 @@ A "Spark Streaming" receiver can be a simple network stream, streams of messages
This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
+### Write a simple receiver
-## A quick and naive walk-through
+This starts with implementing [NetworkReceiver](#References)
-### Write a simple receiver
+The following is a simple socket text-stream receiver.
+
+{% highlight scala %}
+
+ class SocketTextStreamReceiver(host: String,
+ port: Int
+ ) extends NetworkReceiver[String] {
+
+ protected lazy val blocksGenerator: BlockGenerator =
+ new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
+
+ protected def onStart() = {
+ blocksGenerator.start()
+ val socket = new Socket(host, port)
+ val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+ var data: String = dataInputStream.readLine()
+ while (data != null) {
+ blocksGenerator += data
+ data = dataInputStream.readLine()
+ }
+ }
+
+ protected def onStop() {
+ blocksGenerator.stop()
+ }
+
+ }
+
+{% endhighlight %}
+
+
+All we did here was extend NetworkReceiver and call the blocksGenerator's API method (i.e. +=) to push our blocks of data. Please refer to the scaladocs of NetworkReceiver for more details.
+
+
+### An Actor as Receiver.
This starts with implementing [Actor](#References)
@@ -46,7 +81,16 @@ All we did here is mixed in trait Receiver and called pushBlock api method to pu
{% endhighlight %}
-* Plug-in the actor configuration into the spark streaming context and create a DStream.
+* Plug the custom receiver into the Spark streaming context and create a DStream.
+
+{% highlight scala %}
+
+ val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
+ "localhost", 8445))
+
+{% endhighlight %}
+
+* OR plug the actor in as a receiver into the Spark streaming context and create a DStream.
{% highlight scala %}
@@ -99,3 +143,4 @@ _A more comprehensive example is provided in the spark streaming examples_
## References
1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
+2.[NetworkReceiver](http://spark-project.org/docs/latest/api/streaming/index.html#spark.streaming.dstream.NetworkReceiver)
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 8cd1b0cd66..a74c17bdb7 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -301,6 +301,9 @@ dstream.checkpoint(checkpointInterval) // checkpointInterval must be a multiple
For DStreams that must be checkpointed (that is, DStreams created by `updateStateByKey` and `reduceByKeyAndWindow` with inverse function), the checkpoint interval of the DStream is by default set to a multiple of the DStream's sliding interval such that its at least 10 seconds.
+## Customizing Receiver
+Spark comes with built-in support for the most common usage scenarios, where the input stream source is either a network socket stream or one of a few supported message queues. Apart from that, it is also possible to supply your own custom receiver via a convenient API. Find more details in the [Custom Receiver Guide](streaming-custom-receivers.html).
+
# Performance Tuning
Getting the best performance of a Spark Streaming application on a cluster requires a bit of tuning. This section explains a number of the parameters and configurations that can be tuned to improve the performance of your application. At a high level, you need to consider two things:
<ol>
diff --git a/make-distribution.sh b/make-distribution.sh
index 55dc22b992..70aff418c7 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -46,7 +46,7 @@ export TERM=dumb # Prevents color codes in SBT output
VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z0-9.-]*\).*/\1/')
# Initialize defaults
-SPARK_HADOOP_VERSION=1.2.1
+SPARK_HADOOP_VERSION=1.0.4
SPARK_WITH_YARN=false
MAKE_TGZ=false
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 6c71dc1f32..dbfbf59975 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -123,10 +123,28 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
- // Initialize user and product factors randomly
- val seed = new Random().nextInt()
- var users = userOutLinks.mapValues(_.elementIds.map(u => randomFactor(rank, seed ^ u)))
- var products = productOutLinks.mapValues(_.elementIds.map(p => randomFactor(rank, seed ^ ~p)))
+ // Initialize user and product factors randomly, but use a deterministic seed for each partition
+ // so that fault recovery works
+ val seedGen = new Random()
+ val seed1 = seedGen.nextInt()
+ val seed2 = seedGen.nextInt()
+ // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable
+ def hash(x: Int): Int = {
+ val r = x ^ (x >>> 20) ^ (x >>> 12)
+ r ^ (r >>> 7) ^ (r >>> 4)
+ }
+ var users = userOutLinks.mapPartitionsWithIndex { (index, itr) =>
+ val rand = new Random(hash(seed1 ^ index))
+ itr.map { case (x, y) =>
+ (x, y.elementIds.map(_ => randomFactor(rank, rand)))
+ }
+ }
+ var products = productOutLinks.mapPartitionsWithIndex { (index, itr) =>
+ val rand = new Random(hash(seed2 ^ index))
+ itr.map { case (x, y) =>
+ (x, y.elementIds.map(_ => randomFactor(rank, rand)))
+ }
+ }
for (iter <- 0 until iterations) {
// perform ALS update
@@ -213,11 +231,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
}
/**
- * Make a random factor vector with the given seed.
- * TODO: Initialize things using mapPartitionsWithIndex to make it faster?
+   * Make a random factor vector with the given random number generator.
*/
- private def randomFactor(rank: Int, seed: Int): Array[Double] = {
- val rand = new Random(seed)
+ private def randomFactor(rank: Int, rand: Random): Array[Double] = {
Array.fill(rank)(rand.nextDouble)
}
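
The ALS change above replaces a single global seed with deterministic per-partition seeds, so that a partition recomputed after a failure regenerates exactly the same random factors. The following standalone Scala sketch illustrates that seeding scheme using the same bit-mixing hash as the patch; SeedingSketch and the partition and rank values are illustrative only and do not use Spark.

    import scala.util.Random

    object SeedingSketch {
      // Same bit-mixing trick as the patch: spread the bits of an Int so that
      // consecutive partition indices do not yield correlated seeds.
      def hash(x: Int): Int = {
        val r = x ^ (x >>> 20) ^ (x >>> 12)
        r ^ (r >>> 7) ^ (r >>> 4)
      }

      def main(args: Array[String]) {
        val globalSeed = new Random().nextInt()    // drawn once per run, as seed1/seed2 are
        val numPartitions = 4                      // hypothetical values for the sketch
        val rank = 3

        // One RNG per partition, seeded deterministically from (globalSeed, partition index),
        // so a recomputed partition regenerates exactly the same factor vectors.
        val factorsByPartition = (0 until numPartitions).map { index =>
          val rand = new Random(hash(globalSeed ^ index))
          Array.fill(rank)(rand.nextDouble)
        }
        factorsByPartition.zipWithIndex.foreach { case (f, i) =>
          println("partition " + i + ": " + f.mkString(", "))
        }
      }
    }
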
diff --git a/pom.xml b/pom.xml
index fc0b314070..de883e2abc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,10 +71,10 @@
<java.version>1.5</java.version>
<scala.version>2.9.3</scala.version>
<mesos.version>0.12.1</mesos.version>
- <akka.version>2.0.3</akka.version>
+ <akka.version>2.0.5</akka.version>
<slf4j.version>1.7.2</slf4j.version>
<log4j.version>1.2.17</log4j.version>
- <hadoop.version>1.2.1</hadoop.version>
+ <hadoop.version>1.0.4</hadoop.version>
<!-- <hadoop.version>2.0.0-mr1-cdh4.1.2</hadoop.version> -->
<PermGen>64m</PermGen>
@@ -157,12 +157,17 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>7.5.3.v20111011</version>
+ <version>7.6.8.v20121106</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>11.0.1</version>
+ <version>14.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>1.3.9</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -318,7 +323,7 @@
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
- <version>0.8</version>
+ <version>0.9</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -437,6 +442,7 @@
<args>
<arg>-unchecked</arg>
<arg>-optimise</arg>
+ <arg>-deprecation</arg>
</args>
<jvmArgs>
<jvmArg>-Xms64m</jvmArg>
@@ -579,8 +585,8 @@
<properties>
<hadoop.major.version>2</hadoop.major.version>
<!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
- <!-- <yarn.version>0.23.7</yarn.version> -->
- <yarn.version>2.0.5-alpha</yarn.version>
+ <!-- <hadoop.version>0.23.7</hadoop.version> -->
+ <hadoop.version>2.0.5-alpha</hadoop.version>
</properties>
<modules>
@@ -607,7 +613,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
- <version>${yarn.version}</version>
+ <version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
@@ -638,7 +644,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
- <version>${yarn.version}</version>
+ <version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
@@ -669,7 +675,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
- <version>${yarn.version}</version>
+ <version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
@@ -700,7 +706,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
- <version>${yarn.version}</version>
+ <version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 831bfbed78..fbeae27707 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -24,11 +24,10 @@ import AssemblyKeys._
//import com.jsuereth.pgp.sbtplugin.PgpKeys._
object SparkBuild extends Build {
- // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
- // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
- // Note that these variables can be set through the environment variables
- // SPARK_HADOOP_VERSION and SPARK_WITH_YARN.
- val DEFAULT_HADOOP_VERSION = "1.2.1"
+ // Hadoop version to build against. For example, "1.0.4" for Apache releases, or
+ // "2.0.0-mr1-cdh4.2.0" for Cloudera Hadoop. Note that these variables can be set
+ // through the environment variables SPARK_HADOOP_VERSION and SPARK_WITH_YARN.
+ val DEFAULT_HADOOP_VERSION = "1.0.4"
val DEFAULT_WITH_YARN = false
// HBase version; set as appropriate.
@@ -58,14 +57,14 @@ object SparkBuild extends Build {
// Allows build configuration to be set through environment variables
lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)
- lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match {
+ lazy val isYarnEnabled = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match {
case None => DEFAULT_WITH_YARN
case Some(v) => v.toBoolean
}
// Conditionally include the yarn sub-project
- lazy val maybeYarn = if(isYarnMode) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
- lazy val maybeYarnRef = if(isYarnMode) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
+ lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
+ lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarnRef
def sharedSettings = Defaults.defaultSettings ++ Seq(
@@ -134,7 +133,6 @@ object SparkBuild extends Build {
*/
libraryDependencies ++= Seq(
- "io.netty" % "netty" % "3.5.3.Final",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
@@ -165,17 +163,16 @@ object SparkBuild extends Build {
name := "spark-core",
resolvers ++= Seq(
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
- "Spray Repository" at "http://repo.spray.cc/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
),
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "14.0.1",
"com.google.code.findbugs" % "jsr305" % "1.3.9",
- "log4j" % "log4j" % "1.2.16",
+ "log4j" % "log4j" % "1.2.17",
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
- "commons-daemon" % "commons-daemon" % "1.0.10",
+ "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
"com.ning" % "compress-lzf" % "0.8.4",
"org.xerial.snappy" % "snappy-java" % "1.0.5",
"org.ow2.asm" % "asm" % "4.0",
@@ -256,7 +253,14 @@ object SparkBuild extends Build {
) ++ assemblySettings ++ extraAssemblySettings
def yarnSettings = sharedSettings ++ Seq(
- name := "spark-yarn",
+ name := "spark-yarn"
+ ) ++ extraYarnSettings ++ assemblySettings ++ extraAssemblySettings
+
+ // Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
+ // if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
+ def extraYarnSettings = if(isYarnEnabled) yarnEnabledSettings else Seq()
+
+ def yarnEnabledSettings = Seq(
libraryDependencies ++= Seq(
// Exclude rule required for all ?
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
@@ -264,7 +268,7 @@ object SparkBuild extends Build {
"org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm)
)
- ) ++ assemblySettings ++ extraAssemblySettings
+ )
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
mergeStrategy in assembly := {
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 1b0f879b94..783b40d4f5 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -6,9 +6,9 @@ resolvers += "Spray Repository" at "http://repo.spray.cc/"
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")
-addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
-addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1")
// For Sonatype publishing
//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 99f5967a8e..1e9b3bb5c0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -31,6 +31,7 @@ from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
read_from_pickle_file
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
+from pyspark.statcounter import StatCounter
from py4j.java_collections import ListConverter, MapConverter
@@ -357,6 +358,63 @@ class RDD(object):
3
"""
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
+
+ def stats(self):
+ """
+ Return a L{StatCounter} object that captures the mean, variance
+ and count of the RDD's elements in one operation.
+ """
+ def redFunc(left_counter, right_counter):
+ return left_counter.mergeStats(right_counter)
+
+ return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
+
+ def mean(self):
+ """
+ Compute the mean of this RDD's elements.
+
+ >>> sc.parallelize([1, 2, 3]).mean()
+ 2.0
+ """
+ return self.stats().mean()
+
+ def variance(self):
+ """
+ Compute the variance of this RDD's elements.
+
+ >>> sc.parallelize([1, 2, 3]).variance()
+ 0.666...
+ """
+ return self.stats().variance()
+
+ def stdev(self):
+ """
+ Compute the standard deviation of this RDD's elements.
+
+ >>> sc.parallelize([1, 2, 3]).stdev()
+ 0.816...
+ """
+ return self.stats().stdev()
+
+ def sampleStdev(self):
+ """
+ Compute the sample standard deviation of this RDD's elements (which corrects for bias in
+ estimating the standard deviation by dividing by N-1 instead of N).
+
+ >>> sc.parallelize([1, 2, 3]).sampleStdev()
+ 1.0
+ """
+ return self.stats().sampleStdev()
+
+ def sampleVariance(self):
+ """
+ Compute the sample variance of this RDD's elements (which corrects for bias in
+ estimating the variance by dividing by N-1 instead of N).
+
+ >>> sc.parallelize([1, 2, 3]).sampleVariance()
+ 1.0
+ """
+ return self.stats().sampleVariance()
def countByValue(self):
"""
@@ -777,7 +835,7 @@ def _test():
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
- (failure_count, test_count) = doctest.testmod(globs=globs)
+ (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
new file mode 100644
index 0000000000..8e1cbd4ad9
--- /dev/null
+++ b/python/pyspark/statcounter.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is ported from spark/util/StatCounter.scala
+
+import copy
+import math
+
+class StatCounter(object):
+
+ def __init__(self, values=[]):
+ self.n = 0L # Running count of our values
+ self.mu = 0.0 # Running mean of our values
+ self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2)
+
+ for v in values:
+ self.merge(v)
+
+ # Add a value into this StatCounter, updating the internal statistics.
+ def merge(self, value):
+ delta = value - self.mu
+ self.n += 1
+ self.mu += delta / self.n
+ self.m2 += delta * (value - self.mu)
+ return self
+
+ # Merge another StatCounter into this one, adding up the internal statistics.
+ def mergeStats(self, other):
+ if not isinstance(other, StatCounter):
+ raise Exception("Can only merge Statcounters!")
+
+ if other is self: # reference equality holds
+ self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order
+ else:
+ if self.n == 0:
+ self.mu = other.mu
+ self.m2 = other.m2
+ self.n = other.n
+ elif other.n != 0:
+ delta = other.mu - self.mu
+ if other.n * 10 < self.n:
+ self.mu = self.mu + (delta * other.n) / (self.n + other.n)
+ elif self.n * 10 < other.n:
+ self.mu = other.mu - (delta * self.n) / (self.n + other.n)
+ else:
+ self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)
+
+ self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
+ self.n += other.n
+ return self
+
+ # Clone this StatCounter
+ def copy(self):
+ return copy.deepcopy(self)
+
+ def count(self):
+ return self.n
+
+ def mean(self):
+ return self.mu
+
+ def sum(self):
+ return self.n * self.mu
+
+ # Return the variance of the values.
+ def variance(self):
+ if self.n == 0:
+ return float('nan')
+ else:
+ return self.m2 / self.n
+
+ #
+ # Return the sample variance, which corrects for bias in estimating the variance by dividing
+ # by N-1 instead of N.
+ #
+ def sampleVariance(self):
+ if self.n <= 1:
+ return float('nan')
+ else:
+ return self.m2 / (self.n - 1)
+
+ # Return the standard deviation of the values.
+ def stdev(self):
+ return math.sqrt(self.variance())
+
+ #
+ # Return the sample standard deviation of the values, which corrects for bias in estimating the
+ # variance by dividing by N-1 instead of N.
+ #
+ def sampleStdev(self):
+ return math.sqrt(self.sampleVariance())
+
+ def __repr__(self):
+ return "(count: %s, mean: %s, stdev: %s)" % (self.count(), self.mean(), self.stdev())
+
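
As its header notes, statcounter.py is a port of spark/util/StatCounter.scala: a single-pass (Welford-style) update per value plus a merge formula that lets partial results from different partitions be combined, which is what `rdd.stats()` relies on in its `reduce()`. The following is a minimal, standalone Scala sketch of the same idea; the RunningStats name is illustrative, and the merge uses only the simple weighted-mean form, omitting the extra branches the real StatCounter keeps for numerical stability when the two sides are very unbalanced.

    // Standalone sketch of a running-statistics accumulator (illustrative name).
    class RunningStats {
      private var n = 0L     // running count
      private var mu = 0.0   // running mean
      private var m2 = 0.0   // running sum of squared deviations from the mean

      // Welford-style update for a single value
      def merge(value: Double): RunningStats = {
        val delta = value - mu
        n += 1
        mu += delta / n
        m2 += delta * (value - mu)
        this
      }

      // Merge two partial results (as done when reducing per-partition counters);
      // simplified to the plain weighted mean, without the unbalanced-merge branches
      def mergeStats(other: RunningStats): RunningStats = {
        if (n == 0) {
          n = other.n; mu = other.mu; m2 = other.m2
        } else if (other.n != 0) {
          val delta = other.mu - mu
          mu = (mu * n + other.mu * other.n) / (n + other.n)
          m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
          n += other.n
        }
        this
      }

      def count: Long = n
      def mean: Double = mu
      def variance: Double = if (n == 0) Double.NaN else m2 / n
      def stdev: Double = math.sqrt(variance)
    }

    object RunningStatsDemo {
      def main(args: Array[String]) {
        // Simulate two partitions whose partial stats are merged, as rdd.stats() does in reduce()
        val a = new RunningStats
        List(1.0, 2.0).foreach(a.merge)
        val b = new RunningStats
        List(3.0).foreach(b.merge)
        val s = a.mergeStats(b)
        // expected output: count=3 mean=2.0 variance=0.666...
        println("count=" + s.count + " mean=" + s.mean + " variance=" + s.variance)
      }
    }
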
diff --git a/repl/pom.xml b/repl/pom.xml
index 5bc9a99c5c..f800664cff 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -49,6 +49,12 @@
<scope>runtime</scope>
</dependency>
<dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-mllib</artifactId>
+ <version>${project.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
diff --git a/sbt/sbt b/sbt/sbt
index 397895276c..a79f60d087 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -25,4 +25,4 @@ fi
export SPARK_HOME=$(cd "$(dirname $0)/.." 2>&1 >/dev/null ; pwd)
export SPARK_TESTING=1 # To put test classes on classpath
-java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=128m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ffd656227d..62c95b573a 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -183,6 +183,7 @@ class StreamingContext private (
/**
* Create an input stream with any arbitrary user implemented network receiver.
+ * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of NetworkReceiver
*/
def networkStream[T: ClassManifest](
@@ -195,6 +196,7 @@ class StreamingContext private (
/**
* Create an input stream with any arbitrary user implemented actor receiver.
+ * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel RDD storage level. Defaults to memory-only.
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 344b41c4d0..1db0a69a2f 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -145,8 +145,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
}
/**
- * Stops the receiver and reports to exception to the tracker.
- * This should be called whenever an exception has happened on any thread
+ * Stops the receiver and reports the exception to the tracker.
+ * This should be called whenever an exception is to be handled on any thread
* of the receiver.
*/
protected def stopOnError(e: Exception) {
@@ -202,7 +202,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
}
/**
- * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
+ * Batches objects created by a [[spark.streaming.dstream.NetworkReceiver]] and puts them into
* appropriately named blocks at regular intervals. This class starts two threads,
* one to periodically start a new batch and prepare the previous batch of as a block,
* the other to push the blocks into the block manager.
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index 2d9937eab8..abeeff11b9 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -45,6 +45,8 @@ object ReceiverSupervisorStrategy {
* A receiver trait to be mixed in with your Actor to gain access to
* pushBlock API.
*
+ * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
+ *
* @example {{{
* class MyActor extends Actor with Receiver{
* def receive {