aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--core/src/main/scala/spark/RDD.scala21
-rw-r--r--core/src/main/scala/spark/SparkContext.scala2
-rw-r--r--core/src/main/scala/spark/api/java/JavaDoubleRDD.scala23
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala27
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDD.scala20
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala13
-rw-r--r--core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java20
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala13
-rw-r--r--core/src/main/scala/spark/network/Connection.scala24
-rw-r--r--core/src/main/scala/spark/rdd/HadoopRDD.scala17
-rw-r--r--core/src/main/scala/spark/rdd/NewHadoopRDD.scala15
-rw-r--r--core/src/main/scala/spark/rdd/SubtractedRDD.scala108
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala26
-rw-r--r--core/src/main/scala/spark/util/Vector.scala10
-rw-r--r--core/src/test/scala/spark/DistributedSuite.scala21
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala26
-rwxr-xr-xrun20
18 files changed, 361 insertions, 47 deletions
diff --git a/.gitignore b/.gitignore
index 88d7b56181..155e785b01 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,5 @@ log/
spark-tests.log
streaming-tests.log
dependency-reduced-pom.xml
+.ensime
+.ensime_lucene
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index da82dfd10f..9e8eaee756 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -30,6 +30,7 @@ import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithIndexRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
+import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
import spark.storage.StorageLevel
@@ -394,6 +395,26 @@ abstract class RDD[T: ClassManifest](
}
/**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: RDD[T]): RDD[T] =
+ subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
+ subtract(other, new HashPartitioner(numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: RDD[T], p: Partitioner): RDD[T] = new SubtractedRDD[T](this, other, p)
+
+ /**
* Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d39767c3b3..f40bb7935f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -439,7 +439,7 @@ class SparkContext(
}
/**
- * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
+ * Broadcast a read-only variable to the cluster, returning a [[spark.broadcast.Broadcast]] object for
* reading it in distributed functions. The variable will be sent to each cluster only once.
*/
def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index da3cb2cd31..ba00b6a844 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -6,8 +6,8 @@ import spark.api.java.function.{Function => JFunction}
import spark.util.StatCounter
import spark.partial.{BoundedDouble, PartialResult}
import spark.storage.StorageLevel
-
import java.lang.Double
+import spark.Partitioner
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
@@ -58,6 +58,27 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
/**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaDoubleRDD): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaDoubleRDD, numPartitions: Int): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaDoubleRDD, p: Partitioner): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other, p))
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD =
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index df3af3817d..cfbdda88c0 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -59,7 +59,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
- def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
+ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
/**
@@ -102,7 +102,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
- def combineByKey[C](createCombiner: Function[V, C],
+ def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
@@ -182,6 +182,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
/**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
+ fromRDD(rdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaPairRDD[K, V], numPartitions: Int): JavaPairRDD[K, V] =
+ fromRDD(rdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
+ fromRDD(rdd.subtract(other, p))
+
+ /**
* Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
* is true, Spark will group values of the same key together on the map side before the
* repartitioning, to only send each key over the network once. If a large number of
@@ -309,7 +330,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
- def mapValues[U](f: Function[V, U]): JavaPairRDD[K, U] = {
+ def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
fromRDD(rdd.mapValues(f))
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 3ccd6f055e..3016888898 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -55,6 +55,26 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd))
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaRDD[T], numPartitions: Int): JavaRDD[T] =
+ wrapRDD(rdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
+ wrapRDD(rdd.subtract(other, p))
+
}
object JavaRDD {
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 90b45cf875..d884529d7a 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
import com.google.common.base.Optional
-trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] {
+trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
implicit val classManifest: ClassManifest[T]
@@ -82,12 +82,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
}
/**
- * Part of the workaround for SPARK-668; called in PairFlatMapWorkaround.java.
+ * Return a new RDD by first applying a function to all elements of this
+ * RDD, and then flattening the results.
*/
- private[spark] def doFlatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
+ def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -110,8 +111,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]):
- JavaPairRDD[K, V] = {
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+ JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
}
diff --git a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java b/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
deleted file mode 100644
index 68b6fd6622..0000000000
--- a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package spark.api.java;
-
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaRDDLike;
-import spark.api.java.function.PairFlatMapFunction;
-
-import java.io.Serializable;
-
-/**
- * Workaround for SPARK-668.
- */
-class PairFlatMapWorkaround<T> implements Serializable {
- /**
- * Return a new RDD by first applying a function to all elements of this
- * RDD, and then flattening the results.
- */
- public <K, V> JavaPairRDD<K, V> flatMap(PairFlatMapFunction<T, K, V> f) {
- return ((JavaRDDLike <T, ?>) this).doFlatMap(f);
- }
-}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 1cd68a2aa6..b7f167425f 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -33,6 +33,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
+ var firstApp: Option[ApplicationInfo] = None
+
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else ip
@@ -167,7 +169,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// Try to spread out each app among all the nodes, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+ .filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -190,7 +192,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
} else {
// Pack each app into as few nodes as possible until we've assigned all its cores
- for (worker <- workers if worker.coresFree > 0) {
+ for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
@@ -245,6 +247,13 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
idToApp(app.id) = app
actorToApp(driver) = app
addressToApp(driver.path.address) = app
+ if (firstApp == None) {
+ firstApp = Some(app)
+ }
+ val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
+ if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+ logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
+ }
return app
}
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index cd5b7d57f3..d1451bc212 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -198,7 +198,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
outbox.synchronized {
outbox.addMessage(message)
if (channel.isConnected) {
- changeConnectionKeyInterest(SelectionKey.OP_WRITE)
+ changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ)
}
}
}
@@ -219,7 +219,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
def finishConnect() {
try {
channel.finishConnect
- changeConnectionKeyInterest(SelectionKey.OP_WRITE)
+ changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ)
logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
} catch {
case e: Exception => {
@@ -239,8 +239,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
currentBuffers ++= chunk.buffers
}
case None => {
- changeConnectionKeyInterest(0)
- /*key.interestOps(0)*/
+ changeConnectionKeyInterest(SelectionKey.OP_READ)
return
}
}
@@ -267,6 +266,23 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
}
}
}
+
+ override def read() {
+ // We don't expect the other side to send anything; so, we just read to detect an error or EOF.
+ try {
+ val length = channel.read(ByteBuffer.allocate(1))
+ if (length == -1) { // EOF
+ close()
+ } else if (length > 0) {
+ logWarning("Unexpected data read from SendingConnection to " + remoteConnectionManagerId)
+ }
+ } catch {
+ case e: Exception =>
+ logError("Exception while reading SendingConnection to " + remoteConnectionManagerId, e)
+ callOnExceptionCallback(e)
+ close()
+ }
+ }
}
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 8139a2a40c..78097502bc 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,7 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
/**
@@ -42,7 +42,7 @@ class HadoopRDD[K, V](
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
- extends RDD[(K, V)](sc, Nil) {
+ extends RDD[(K, V)](sc, Nil) with Logging {
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -71,7 +71,7 @@ class HadoopRDD[K, V](
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback(() => reader.close())
+ context.addOnCompleteCallback{ () => close() }
val key: K = reader.createKey()
val value: V = reader.createValue()
@@ -88,9 +88,6 @@ class HadoopRDD[K, V](
}
gotNext = true
}
- if (finished) {
- reader.close()
- }
!finished
}
@@ -104,6 +101,14 @@ class HadoopRDD[K, V](
gotNext = false
(key, value)
}
+
+ private def close() {
+ try {
+ reader.close()
+ } catch {
+ case e: Exception => logWarning("Exception in RecordReader.close()", e)
+ }
+ }
}
override def getPreferredLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index ebd4c3f0e2..df2361025c 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -7,7 +7,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
private[spark]
@@ -26,7 +26,8 @@ class NewHadoopRDD[K, V](
valueClass: Class[V],
@transient conf: Configuration)
extends RDD[(K, V)](sc, Nil)
- with HadoopMapReduceUtil {
+ with HadoopMapReduceUtil
+ with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -61,7 +62,7 @@ class NewHadoopRDD[K, V](
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
// Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback(() => reader.close())
+ context.addOnCompleteCallback(() => close())
var havePair = false
var finished = false
@@ -81,6 +82,14 @@ class NewHadoopRDD[K, V](
havePair = false
return (reader.getCurrentKey, reader.getCurrentValue)
}
+
+ private def close() {
+ try {
+ reader.close()
+ } catch {
+ case e: Exception => logWarning("Exception in RecordReader.close()", e)
+ }
+ }
}
override def getPreferredLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
new file mode 100644
index 0000000000..daf9cc993c
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -0,0 +1,108 @@
+package spark.rdd
+
+import java.util.{HashSet => JHashSet}
+import scala.collection.JavaConversions._
+import spark.RDD
+import spark.Partitioner
+import spark.Dependency
+import spark.TaskContext
+import spark.Partition
+import spark.SparkEnv
+import spark.ShuffleDependency
+import spark.OneToOneDependency
+
+/**
+ * An optimized version of cogroup for set difference/subtraction.
+ *
+ * It is possible to implement this operation with just `cogroup`, but
+ * that is less efficient because all of the entries from `rdd2`, for
+ * both matching and non-matching values in `rdd1`, are kept in the
+ * JHashMap until the end.
+ *
+ * With this implementation, only the entries from `rdd1` are kept in-memory,
+ * and the entries from `rdd2` are essentially streamed, as we only need to
+ * touch each once to decide if the value needs to be removed.
+ *
+ * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
+ * you can use `rdd1`'s partitioner/partition size and not worry about running
+ * out of memory because of the size of `rdd2`.
+ */
+private[spark] class SubtractedRDD[T: ClassManifest](
+ @transient var rdd1: RDD[T],
+ @transient var rdd2: RDD[T],
+ part: Partitioner) extends RDD[T](rdd1.context, Nil) {
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ Seq(rdd1, rdd2).map { rdd =>
+ if (rdd.partitioner == Some(part)) {
+ logInfo("Adding one-to-one dependency with " + rdd)
+ new OneToOneDependency(rdd)
+ } else {
+ logInfo("Adding shuffle dependency with " + rdd)
+ val mapSideCombinedRDD = rdd.mapPartitions(i => {
+ val set = new JHashSet[T]()
+ while (i.hasNext) {
+ set.add(i.next)
+ }
+ set.iterator
+ }, true)
+ // ShuffleDependency requires a tuple (k, v), which it will partition by k.
+ // We need this to partition to map to the same place as the k for
+ // OneToOneDependency, which means:
+ // - for already-tupled RDD[(A, B)], into getPartition(a)
+ // - for non-tupled RDD[C], into getPartition(c)
+ val part2 = new Partitioner() {
+ def numPartitions = part.numPartitions
+ def getPartition(key: Any) = key match {
+ case (k, v) => part.getPartition(k)
+ case k => part.getPartition(k)
+ }
+ }
+ new ShuffleDependency(mapSideCombinedRDD.map((_, null)), part2)
+ }
+ }
+ }
+
+ override def getPartitions: Array[Partition] = {
+ val array = new Array[Partition](part.numPartitions)
+ for (i <- 0 until array.size) {
+ // Each CoGroupPartition will depend on rdd1 and rdd2
+ array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
+ dependencies(j) match {
+ case s: ShuffleDependency[_, _] =>
+ new ShuffleCoGroupSplitDep(s.shuffleId)
+ case _ =>
+ new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
+ }
+ }.toList)
+ }
+ array
+ }
+
+ override val partitioner = Some(part)
+
+ override def compute(p: Partition, context: TaskContext): Iterator[T] = {
+ val partition = p.asInstanceOf[CoGroupPartition]
+ val set = new JHashSet[T]
+ def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match {
+ case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
+ for (k <- rdd.iterator(itsSplit, context))
+ op(k.asInstanceOf[T])
+ case ShuffleCoGroupSplitDep(shuffleId) =>
+ for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index))
+ op(k.asInstanceOf[T])
+ }
+ // the first dep is rdd1; add all keys to the set
+ integrate(partition.deps(0), set.add)
+ // the second dep is rdd2; remove all of its keys from the set
+ integrate(partition.deps(1), set.remove)
+ set.iterator
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdd1 = null
+ rdd2 = null
+ }
+
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1e4fbdb874..d9c2f9517b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
import spark.scheduler._
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,6 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// How often to check for speculative tasks
val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+ // Threshold above which we warn user initial TaskSet may be starved
+ val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
val activeTaskSets = new HashMap[String, TaskSetManager]
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -30,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val taskIdToExecutorId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
+ var hasReceivedTask = false
+ var hasLaunchedTask = false
+ val starvationTimer = new Timer(true)
+
// Incrementing Mesos task IDs
val nextTaskId = new AtomicLong(0)
@@ -94,6 +101,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
activeTaskSets(taskSet.id) = manager
activeTaskSetsQueue += manager
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+ if (hasReceivedTask == false) {
+ starvationTimer.scheduleAtFixedRate(new TimerTask() {
+ override def run() {
+ if (!hasLaunchedTask) {
+ logWarning("Initial job has not accepted any resources; " +
+ "check your cluster UI to ensure that workers are registered")
+ } else {
+ this.cancel()
+ }
+ }
+ }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+ }
+ hasReceivedTask = true;
}
backend.reviveOffers()
}
@@ -150,6 +171,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
} while (launchedTask)
}
+ if (tasks.size > 0) {
+ hasLaunchedTask = true
+ }
return tasks
}
}
@@ -235,7 +259,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
override def defaultParallelism() = backend.defaultParallelism()
-
+
// Check for speculatable tasks in all our active jobs.
def checkSpeculatableTasks() {
var shouldRevive = false
diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index 03559751bc..835822edb2 100644
--- a/core/src/main/scala/spark/util/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -11,12 +11,16 @@ class Vector(val elements: Array[Double]) extends Serializable {
return Vector(length, i => this(i) + other(i))
}
+ def add(other: Vector) = this + other
+
def - (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
return Vector(length, i => this(i) - other(i))
}
+ def subtract(other: Vector) = this - other
+
def dot(other: Vector): Double = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
@@ -61,10 +65,16 @@ class Vector(val elements: Array[Double]) extends Serializable {
this
}
+ def addInPlace(other: Vector) = this +=other
+
def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
+ def multiply (d: Double) = this * d
+
def / (d: Double): Vector = this * (1 / d)
+ def divide (d: Double) = this / d
+
def unary_- = this * -1
def sum = elements.reduceLeft(_ + _)
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 0e2585daa4..caa4ba3a37 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -217,6 +217,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(grouped.collect.size === 1)
}
}
+
+ test("recover from node failures with replication") {
+ import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+ DistributedSuite.amMaster = true
+ // Using more than two nodes so we don't have a symmetric communication pattern and might
+ // cache a partially correct list of peers.
+ sc = new SparkContext("local-cluster[3,1,512]", "test")
+ for (i <- 1 to 3) {
+ val data = sc.parallelize(Seq(true, false, false, false), 4)
+ data.persist(StorageLevel.MEMORY_ONLY_2)
+
+ assert(data.count === 4)
+ assert(data.map(markNodeIfIdentity).collect.size === 4)
+ assert(data.map(failOnMarkedIdentity).collect.size === 4)
+
+ // Create a new replicated RDD to make sure that cached peer information doesn't cause
+ // problems.
+ val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
+ assert(data2.count === 2)
+ }
+ }
}
object DistributedSuite {
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 92c3f67416..77e0eab829 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -234,6 +234,32 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
assert(rdd.keys.collect().toList === List(1, 2))
assert(rdd.values.collect().toList === List("a", "b"))
}
+
+ test("subtract") {
+ sc = new SparkContext("local", "test")
+ val a = sc.parallelize(Array(1, 2, 3), 2)
+ val b = sc.parallelize(Array(2, 3, 4), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set(1))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ test("subtract with narrow dependency") {
+ sc = new SparkContext("local", "test")
+ // use a deterministic partitioner
+ val p = new Partitioner() {
+ def numPartitions = 5
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ // partitionBy so we have a narrow dependency
+ val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+ println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList)
+ // more splits/no partitioner so a shuffle dependency
+ val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+ assert(c.partitioner.get === p)
+ }
}
object ShuffleSuite {
diff --git a/run b/run
index 2d8a737e01..6b2d84d48d 100755
--- a/run
+++ b/run
@@ -25,6 +25,26 @@ if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker"
SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default
fi
+
+# Add java opts for master, worker, executor. The opts maybe null
+case "$1" in
+ 'spark.deploy.master.Master')
+ SPARK_JAVA_OPTS+=" $SPARK_MASTER_OPTS"
+ ;;
+ 'spark.deploy.worker.Worker')
+ SPARK_JAVA_OPTS+=" $SPARK_WORKER_OPTS"
+ ;;
+ 'spark.executor.StandaloneExecutorBackend')
+ SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS"
+ ;;
+ 'spark.executor.MesosExecutorBackend')
+ SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS"
+ ;;
+ 'spark.repl.Main')
+ SPARK_JAVA_OPTS+=" $SPARK_REPL_OPTS"
+ ;;
+esac
+
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
if [ `command -v scala` ]; then
RUNNER="scala"