aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-02-16 13:38:42 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-02-16 13:38:42 -0600
commit924f47dd11dd9b44211372bc7d7960066e26f682 (patch)
tree7713e3aba3eb6b456a4a7e24463554a64eb43c07 /core/src/main
parentbeb7ab870858541d736033cc3c6fad4dad657aa3 (diff)
downloadspark-924f47dd11dd9b44211372bc7d7960066e26f682.tar.gz
spark-924f47dd11dd9b44211372bc7d7960066e26f682.tar.bz2
spark-924f47dd11dd9b44211372bc7d7960066e26f682.zip
Add RDD.subtract.
Instead of reusing the cogroup primitive, this adds a SubtractedRDD that knows it only needs to keep rdd1's values (per split) in memory.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/RDD.scala21
-rw-r--r--core/src/main/scala/spark/rdd/SubtractedRDD.scala108
2 files changed, 129 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index f6e927a989..a4c51a0115 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -30,6 +30,7 @@ import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithSplitRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
+import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
import spark.storage.StorageLevel
@@ -384,6 +385,26 @@ abstract class RDD[T: ClassManifest](
}
/**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/split size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: RDD[T]): RDD[T] =
+ subtract(other, partitioner.getOrElse(new HashPartitioner(splits.size)))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: RDD[T], numSplits: Int): RDD[T] =
+ subtract(other, new HashPartitioner(numSplits))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: RDD[T], p: Partitioner): RDD[T] = new SubtractedRDD[T](this, other, p)
+
+ /**
* Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
new file mode 100644
index 0000000000..244874e4e0
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -0,0 +1,108 @@
+package spark.rdd
+
+import java.util.{HashSet => JHashSet}
+import scala.collection.JavaConversions._
+import spark.RDD
+import spark.Partitioner
+import spark.Dependency
+import spark.TaskContext
+import spark.Split
+import spark.SparkEnv
+import spark.ShuffleDependency
+import spark.OneToOneDependency
+
+/**
+ * An optimized version of cogroup for set difference/subtraction.
+ *
+ * It is possible to implement this operation with just `cogroup`, but
+ * that is less efficient because all of the entries from `rdd2`, for
+ * both matching and non-matching values in `rdd1`, are kept in the
+ * JHashMap until the end.
+ *
+ * With this implementation, only the entries from `rdd1` are kept in-memory,
+ * and the entries from `rdd2` are essentially streamed, as we only need to
+ * touch each once to decide if the value needs to be removed.
+ *
+ * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
+ * you can use `rdd1`'s partitioner/split size and not worry about running
+ * out of memory because of the size of `rdd2`.
+ */
+private[spark] class SubtractedRDD[T: ClassManifest](
+ @transient var rdd1: RDD[T],
+ @transient var rdd2: RDD[T],
+ part: Partitioner) extends RDD[T](rdd1.context, Nil) {
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ Seq(rdd1, rdd2).map { rdd =>
+ if (rdd.partitioner == Some(part)) {
+ logInfo("Adding one-to-one dependency with " + rdd)
+ new OneToOneDependency(rdd)
+ } else {
+ logInfo("Adding shuffle dependency with " + rdd)
+ val mapSideCombinedRDD = rdd.mapPartitions(i => {
+ val set = new JHashSet[T]()
+ while (i.hasNext) {
+ set.add(i.next)
+ }
+ set.iterator
+ }, true)
+ // ShuffleDependency requires a tuple (k, v), which it will partition by k.
+ // We need this to partition to map to the same place as the k for
+ // OneToOneDependency, which means:
+ // - for already-tupled RDD[(A, B)], into getPartition(a)
+ // - for non-tupled RDD[C], into getPartition(c)
+ val part2 = new Partitioner() {
+ def numPartitions = part.numPartitions
+ def getPartition(key: Any) = key match {
+ case (k, v) => part.getPartition(k)
+ case k => part.getPartition(k)
+ }
+ }
+ new ShuffleDependency(mapSideCombinedRDD.map((_, null)), part2)
+ }
+ }
+ }
+
+ override def getSplits: Array[Split] = {
+ val array = new Array[Split](part.numPartitions)
+ for (i <- 0 until array.size) {
+ // Each CoGroupSplit will dependend on rdd1 and rdd2
+ array(i) = new CoGroupSplit(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
+ dependencies(j) match {
+ case s: ShuffleDependency[_, _] =>
+ new ShuffleCoGroupSplitDep(s.shuffleId)
+ case _ =>
+ new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
+ }
+ }.toList)
+ }
+ array
+ }
+
+ override val partitioner = Some(part)
+
+ override def compute(s: Split, context: TaskContext): Iterator[T] = {
+ val split = s.asInstanceOf[CoGroupSplit]
+ val set = new JHashSet[T]
+ def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match {
+ case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
+ for (k <- rdd.iterator(itsSplit, context))
+ op(k.asInstanceOf[T])
+ case ShuffleCoGroupSplitDep(shuffleId) =>
+ for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, split.index))
+ op(k.asInstanceOf[T])
+ }
+ // the first dep is rdd1; add all keys to the set
+ integrate(split.deps(0), set.add)
+ // the second dep is rdd2; remove all of its keys from the set
+ integrate(split.deps(1), set.remove)
+ set.iterator
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdd1 = null
+ rdd2 = null
+ }
+
+} \ No newline at end of file