From 63fe22558791e6a511eb1f48efb88e2afdf77659 Mon Sep 17 00:00:00 2001
From: Stephen Haberman
Date: Wed, 13 Mar 2013 17:17:34 -0500
Subject: Simplify SubtractedRDD in preparation for subtractByKey.

---
 core/src/main/scala/spark/PairRDDFunctions.scala  |  2 +
 core/src/main/scala/spark/RDD.scala               | 24 ++++++-
 core/src/main/scala/spark/rdd/SubtractedRDD.scala | 78 ++++++++++-------------
 3 files changed, 58 insertions(+), 46 deletions(-)

(limited to 'core/src')

diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index e7408e4352..1bd1741a71 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -639,6 +639,8 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
       }
     }, true)
   }
+
+  // def subtractByKey(other: RDD[K]): RDD[(K,V)] = subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
 }
 
 private[spark]
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 584efa8adf..3451136fd4 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -408,8 +408,24 @@ abstract class RDD[T: ClassManifest](
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
    * RDD will be <= us.
    */
-  def subtract(other: RDD[T]): RDD[T] =
-    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
+  def subtract(other: RDD[T]): RDD[T] = {
+    // If we do have a partitioner, our T is really (K, V), and we'll need to
+    // unwrap the (T, null) that subtract does to get back to the K
+    val rdd = subtract(other, partitioner match {
+      case None => new HashPartitioner(partitions.size)
+      case Some(p) => new Partitioner() {
+        override def numPartitions = p.numPartitions
+        override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
+      }
+    })
+    // Hacky, but if we did have a partitioner, we can keep using it
+    new RDD[T](rdd) {
+      override def getPartitions = rdd.partitions
+      override def getDependencies = rdd.dependencies
+      override def compute(split: Partition, context: TaskContext) = rdd.compute(split, context)
+      override val partitioner = RDD.this.partitioner
+    }
+  }
 
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
@@ -420,7 +436,9 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
    */
-  def subtract(other: RDD[T], p: Partitioner): RDD[T] = new SubtractedRDD[T](this, other, p)
+  def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
+    new SubtractedRDD[T, Any](this.map((_, null)), other.map((_, null)), p).keys
+  }
 
   /**
    * Reduces the elements of this RDD using the specified commutative and associative binary operator.
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 43ec90cac5..1bc84f7e1e 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -1,7 +1,8 @@
 package spark.rdd
 
-import java.util.{HashSet => JHashSet}
+import java.util.{HashMap => JHashMap}
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 import spark.RDD
 import spark.Partitioner
 import spark.Dependency
@@ -27,39 +28,20 @@ import spark.OneToOneDependency
  * you can use `rdd1`'s partitioner/partition size and not worry about running
  * out of memory because of the size of `rdd2`.
  */
-private[spark] class SubtractedRDD[T: ClassManifest](
-    @transient var rdd1: RDD[T],
-    @transient var rdd2: RDD[T],
-    part: Partitioner) extends RDD[T](rdd1.context, Nil) {
+private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest](
+    @transient var rdd1: RDD[(K, V)],
+    @transient var rdd2: RDD[(K, V)],
+    part: Partitioner) extends RDD[(K, V)](rdd1.context, Nil) {
 
   override def getDependencies: Seq[Dependency[_]] = {
     Seq(rdd1, rdd2).map { rdd =>
-      if (rdd.partitioner == Some(part)) {
-        logInfo("Adding one-to-one dependency with " + rdd)
-        new OneToOneDependency(rdd)
-      } else {
-        logInfo("Adding shuffle dependency with " + rdd)
-        val mapSideCombinedRDD = rdd.mapPartitions(i => {
-          val set = new JHashSet[T]()
-          while (i.hasNext) {
-            set.add(i.next)
-          }
-          set.iterator
-        }, true)
-        // ShuffleDependency requires a tuple (k, v), which it will partition by k.
-        // We need this to partition to map to the same place as the k for
-        // OneToOneDependency, which means:
-        // - for already-tupled RDD[(A, B)], into getPartition(a)
-        // - for non-tupled RDD[C], into getPartition(c)
-        val part2 = new Partitioner() {
-          def numPartitions = part.numPartitions
-          def getPartition(key: Any) = key match {
-            case (k, v) => part.getPartition(k)
-            case k => part.getPartition(k)
-          }
-        }
-        new ShuffleDependency(mapSideCombinedRDD.map((_, null)), part2)
-      }
+	if (rdd.partitioner == Some(part)) {
+	  logInfo("Adding one-to-one dependency with " + rdd)
+	  new OneToOneDependency(rdd)
+	} else {
+	  logInfo("Adding shuffle dependency with " + rdd)
+	  new ShuffleDependency(rdd, part)
+	}
     }
   }
@@ -81,22 +63,32 @@ private[spark] class SubtractedRDD[T: ClassManifest](
 
   override val partitioner = Some(part)
 
-  override def compute(p: Partition, context: TaskContext): Iterator[T] = {
+  override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
     val partition = p.asInstanceOf[CoGroupPartition]
-    val set = new JHashSet[T]
-    def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match {
+    val map = new JHashMap[K, ArrayBuffer[V]]
+    def getSeq(k: K): ArrayBuffer[V] = {
+      val seq = map.get(k)
+      if (seq != null) {
+        seq
+      } else {
+        val seq = new ArrayBuffer[V]()
+        map.put(k, seq)
+        seq
+      }
+    }
+    def integrate(dep: CoGroupSplitDep, op: ((K, V)) => Unit) = dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
-        for (k <- rdd.iterator(itsSplit, context))
-          op(k.asInstanceOf[T])
+        for (t <- rdd.iterator(itsSplit, context))
+          op(t.asInstanceOf[(K, V)])
       case ShuffleCoGroupSplitDep(shuffleId) =>
-        for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index, context.taskMetrics))
-          op(k.asInstanceOf[T])
+        for (t <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index, context.taskMetrics))
+          op(t.asInstanceOf[(K, V)])
     }
-    // the first dep is rdd1; add all keys to the set
-    integrate(partition.deps(0), set.add)
-    // the second dep is rdd2; remove all of its keys from the set
-    integrate(partition.deps(1), set.remove)
-    set.iterator
+    // the first dep is rdd1; add all values to the map
+    integrate(partition.deps(0), t => getSeq(t._1) += t._2)
+    // the second dep is rdd2; remove all of its keys
+    integrate(partition.deps(1), t => map.remove(t._1) )
+    map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
   }
 
   override def clearDependencies() {
--
cgit v1.2.3
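The trick this first commit sets up: subtract() now fakes each element into a (T, null) pair so SubtractedRDD can treat everything as key/value data, and an existing partitioner of T therefore has to be wrapped so it routes those pairs by their first component. Below is a minimal standalone sketch of that "de-tupling" idea; it is not part of the patch, and Partitioner/HashPartitioner are stubbed locally rather than taken from spark.*:

    object DetuplingPartitionerSketch {
      // Local stand-ins for spark.Partitioner and spark.HashPartitioner,
      // just enough to show the routing behavior.
      trait Partitioner {
        def numPartitions: Int
        def getPartition(key: Any): Int
      }

      class HashPartitioner(val numPartitions: Int) extends Partitioner {
        def getPartition(key: Any): Int = {
          val mod = key.hashCode % numPartitions
          if (mod < 0) mod + numPartitions else mod  // keep partition ids non-negative
        }
      }

      // Wrap a partitioner of K so it places (K, V) pairs by their key,
      // mirroring the anonymous Partitioner built inside subtract() above.
      def detuple(p: Partitioner): Partitioner = new Partitioner {
        def numPartitions = p.numPartitions
        def getPartition(key: Any) = p.getPartition(key.asInstanceOf[(Any, _)]._1)
      }

      def main(args: Array[String]): Unit = {
        val p = new HashPartitioner(4)
        val p2 = detuple(p)
        // ("a", null) must land where the bare key "a" lands, so data already
        // partitioned by p stays put once it is faked into pairs.
        assert(p2.getPartition(("a", null)) == p.getPartition("a"))
        println("pair follows its key to partition " + p.getPartition("a"))
      }
    }

The same wrapper reappears in the next commit as p2, where its comments also explain why the resulting RDD loses its partitioner after .keys.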
From 4632c45af16dbdbf9e959fb9cac7f5f4a8d44357 Mon Sep 17 00:00:00 2001
From: Stephen Haberman
Date: Thu, 14 Mar 2013 10:35:34 -0500
Subject: Finished subtractByKey.

---
 core/src/main/scala/spark/PairRDDFunctions.scala  | 19 ++++++++++--
 core/src/main/scala/spark/RDD.scala               | 36 +++++++++++------------
 core/src/main/scala/spark/rdd/SubtractedRDD.scala |  2 +-
 core/src/test/scala/spark/ShuffleSuite.scala      | 28 +++++++++++++++++-
 4 files changed, 62 insertions(+), 23 deletions(-)

(limited to 'core/src')

diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 1bd1741a71..47b9c6962f 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -440,6 +440,23 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
 
+  /**
+   * Return an RDD with the pairs from `this` whose keys are not in `other`.
+   *
+   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+   * RDD will be <= us.
+   */
+  def subtractByKey(other: RDD[(K, V)]): RDD[(K, V)] =
+    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
+
+  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+  def subtractByKey(other: RDD[(K, V)], numPartitions: Int): RDD[(K, V)] =
+    subtractByKey(other, new HashPartitioner(numPartitions))
+
+  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+  def subtractByKey(other: RDD[(K, V)], p: Partitioner): RDD[(K, V)] =
+    new SubtractedRDD[K, V](self, other, p)
+
   /**
    * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
    * RDD has a known partitioner by only searching the partition that the key maps to.
@@ -639,8 +656,6 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
       }
     }, true)
   }
-
-  // def subtractByKey(other: RDD[K]): RDD[(K,V)] = subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
 }
 
 private[spark]
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3451136fd4..9bd8a0f98d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -408,24 +408,8 @@ abstract class RDD[T: ClassManifest](
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
    * RDD will be <= us.
    */
-  def subtract(other: RDD[T]): RDD[T] = {
-    // If we do have a partitioner, our T is really (K, V), and we'll need to
-    // unwrap the (T, null) that subtract does to get back to the K
-    val rdd = subtract(other, partitioner match {
-      case None => new HashPartitioner(partitions.size)
-      case Some(p) => new Partitioner() {
-        override def numPartitions = p.numPartitions
-        override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
-      }
-    })
-    // Hacky, but if we did have a partitioner, we can keep using it
-    new RDD[T](rdd) {
-      override def getPartitions = rdd.partitions
-      override def getDependencies = rdd.dependencies
-      override def compute(split: Partition, context: TaskContext) = rdd.compute(split, context)
-      override val partitioner = RDD.this.partitioner
-    }
-  }
+  def subtract(other: RDD[T]): RDD[T] =
+    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
 
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
@@ -437,7 +421,21 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
    */
   def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
-    new SubtractedRDD[T, Any](this.map((_, null)), other.map((_, null)), p).keys
+    if (partitioner == Some(p)) {
+      // Our partitioner knows how to handle T (which, since we have a partitioner, is
+      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
+      val p2 = new Partitioner() {
+        override def numPartitions = p.numPartitions
+        override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
+      }
+      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
+      // anyway, and when calling .keys, will not have a partitioner set, even though
+      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
+      // partitioned by the right/real keys (e.g. p).
+      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
+    } else {
+      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
+    }
   }
 
   /**
    * Reduces the elements of this RDD using the specified commutative and associative binary operator.
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 1bc84f7e1e..90488f13cc 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -87,7 +87,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest](
     // the first dep is rdd1; add all values to the map
     integrate(partition.deps(0), t => getSeq(t._1) += t._2)
     // the second dep is rdd2; remove all of its keys
-    integrate(partition.deps(1), t => map.remove(t._1) )
+    integrate(partition.deps(1), t => map.remove(t._1))
     map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
   }
 
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 8411291b2c..731c45cca2 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -272,13 +272,39 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     }
     // partitionBy so we have a narrow dependency
     val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
-    println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList)
     // more partitions/no partitioner so a shuffle dependency
     val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
     val c = a.subtract(b)
     assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+    // Ideally we could keep the original partitioner...
+    assert(c.partitioner === None)
+  }
+
+  test("subtractByKey") {
+    sc = new SparkContext("local", "test")
+    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
+    val b = sc.parallelize(Array((2, "bb"), (3, "cc"), (4, "dd")), 4)
+    val c = a.subtractByKey(b)
+    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+    assert(c.partitions.size === a.partitions.size)
+  }
+
+  test("subtractByKey with narrow dependency") {
+    sc = new SparkContext("local", "test")
+    // use a deterministic partitioner
+    val p = new Partitioner() {
+      def numPartitions = 5
+      def getPartition(key: Any) = key.asInstanceOf[Int]
+    }
+    // partitionBy so we have a narrow dependency
+    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+    // more partitions/no partitioner so a shuffle dependency
+    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+    val c = a.subtractByKey(b)
+    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
     assert(c.partitioner.get === p)
   }
 
 }
 
 object ShuffleSuite {
--
cgit v1.2.3
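Per partition, the new compute() boils down to: build a multimap of the first parent's pairs, delete every key seen in the second parent, and flatten what is left. The sketch below replays that logic on plain Scala collections; it is not part of the patch and the names are invented for illustration:

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    object SubtractByKeySketch {
      def subtractByKey[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, V)] = {
        val map = new HashMap[K, ArrayBuffer[V]]
        // first input: keep every value, so duplicate pairs survive
        for ((k, v) <- left) map.getOrElseUpdate(k, new ArrayBuffer[V]) += v
        // second input: a key's mere presence removes it; its values never matter
        for ((k, _) <- right) map.remove(k)
        map.iterator.flatMap { case (k, vs) => vs.iterator.map((k, _)) }.toSeq
      }

      def main(args: Array[String]): Unit = {
        val a = Seq((1, "a"), (1, "a"), (2, "b"), (3, "c"))
        val b = Seq((2, "bb"), (3, "cc"), (4, "dd"))
        // prints List((1,a), (1,a)), matching the "subtractByKey" test above
        println(subtractByKey(a, b))
      }
    }

The switch from JHashSet to JHashMap[K, ArrayBuffer[V]] is exactly what lets duplicate values for a kept key survive, which the test pins down with its two (1, "a") pairs.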
From 7d8bb4df3a5f8078cd4e86cef5e3b0b728afd2bc Mon Sep 17 00:00:00 2001
From: Stephen Haberman
Date: Thu, 14 Mar 2013 14:44:15 -0500
Subject: Allow subtractByKey's other argument to have a different value type.

---
 core/src/main/scala/spark/PairRDDFunctions.scala  | 8 ++++----
 core/src/main/scala/spark/rdd/SubtractedRDD.scala | 6 +++---
 core/src/test/scala/spark/ShuffleSuite.scala      | 2 +-
 3 files changed, 8 insertions(+), 8 deletions(-)

(limited to 'core/src')

diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 47b9c6962f..3d1b1ca268 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -446,16 +446,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
    * RDD will be <= us.
    */
-  def subtractByKey(other: RDD[(K, V)]): RDD[(K, V)] =
+  def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
     subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
 
   /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
-  def subtractByKey(other: RDD[(K, V)], numPartitions: Int): RDD[(K, V)] =
+  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
     subtractByKey(other, new HashPartitioner(numPartitions))
 
   /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
-  def subtractByKey(other: RDD[(K, V)], p: Partitioner): RDD[(K, V)] =
-    new SubtractedRDD[K, V](self, other, p)
+  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
+    new SubtractedRDD[K, V, W](self, other, p)
 
   /**
    * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 90488f13cc..2f8ff9bb34 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -28,9 +28,9 @@ import spark.OneToOneDependency
  * you can use `rdd1`'s partitioner/partition size and not worry about running
  * out of memory because of the size of `rdd2`.
  */
-private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest](
+private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
     @transient var rdd1: RDD[(K, V)],
-    @transient var rdd2: RDD[(K, V)],
+    @transient var rdd2: RDD[(K, W)],
     part: Partitioner) extends RDD[(K, V)](rdd1.context, Nil) {
 
   override def getDependencies: Seq[Dependency[_]] = {
@@ -40,7 +40,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest](
 	  new OneToOneDependency(rdd)
 	} else {
 	  logInfo("Adding shuffle dependency with " + rdd)
-	  new ShuffleDependency(rdd, part)
+	  new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part)
 	}
     }
   }
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 731c45cca2..2b2a90defa 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -283,7 +283,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
   test("subtractByKey") {
     sc = new SparkContext("local", "test")
     val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
-    val b = sc.parallelize(Array((2, "bb"), (3, "cc"), (4, "dd")), 4)
+    val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
     val c = a.subtractByKey(b)
     assert(c.collect().toSet === Set((1, "a"), (1, "a")))
     assert(c.partitions.size === a.partitions.size)
--
cgit v1.2.3
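The point of this commit is purely type-level: the subtracted side only contributes keys, so its value type can be anything. A tiny sketch of why the signature needs the extra W parameter; this is not from the patch, and Pairs is a hypothetical in-memory stand-in for RDD[(K, V)]:

    object ValueTypeSketch {
      // Hypothetical stand-in for RDD[(K, V)]; only the generic signature
      // matters here, not distribution.
      class Pairs[K, V](val data: Seq[(K, V)]) {
        // With the pre-commit signature, `other` had to be Pairs[K, V];
        // widening it to Pairs[K, W] admits any value type on that side.
        def subtractByKey[W](other: Pairs[K, W]): Pairs[K, V] = {
          val removed = other.data.map(_._1).toSet
          new Pairs(data.filterNot { case (k, _) => removed(k) })
        }
      }

      def main(args: Array[String]): Unit = {
        val a = new Pairs(Seq((1, "a"), (1, "a"), (2, "b"), (3, "c")))  // V = String
        val b = new Pairs(Seq((2, 20), (3, 30), (4, 40)))               // W = Int
        println(a.subtractByKey(b).data)  // List((1,a), (1,a))
      }
    }

Inside SubtractedRDD the only cost is the rdd.asInstanceOf[RDD[(K, Any)]] cast when building the ShuffleDependency, which is harmless because the shuffle partitions purely by key.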
From 7786881f47699d8e463d911468ed9d43079948a8 Mon Sep 17 00:00:00 2001
From: Stephen Haberman
Date: Thu, 14 Mar 2013 14:57:12 -0500
Subject: Fix tabs that snuck in.

---
 core/src/main/scala/spark/rdd/SubtractedRDD.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

(limited to 'core/src')

diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 2f8ff9bb34..0a02561062 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -35,13 +35,13 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
 
   override def getDependencies: Seq[Dependency[_]] = {
     Seq(rdd1, rdd2).map { rdd =>
-	if (rdd.partitioner == Some(part)) {
-	  logInfo("Adding one-to-one dependency with " + rdd)
-	  new OneToOneDependency(rdd)
-	} else {
-	  logInfo("Adding shuffle dependency with " + rdd)
-	  new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part)
-	}
+      if (rdd.partitioner == Some(part)) {
+        logInfo("Adding one-to-one dependency with " + rdd)
+        new OneToOneDependency(rdd)
+      } else {
+        logInfo("Adding shuffle dependency with " + rdd)
+        new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part)
+      }
     }
   }
 
@@ -97,4 +97,4 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
     rdd2 = null
   }
 
-}
\ No newline at end of file
+}
--
cgit v1.2.3
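Taken together, the series ends with the API the ShuffleSuite tests exercise. A usage sketch against the 2013-era Spark API shown in these patches (assumes a local SparkContext; result ordering is partition-dependent):

    val sc = new SparkContext("local", "subtract-example")

    val pairs = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
    val toRemove = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
    // keys 2 and 3 are dropped wholesale; both (1, "a") duplicates survive
    pairs.subtractByKey(toRemove).collect()

    // the element-wise variant from RDD.scala compares whole elements instead
    val xs = sc.parallelize(Array(1, 2, 3, 4), 2)
    val ys = sc.parallelize(Array(3, 4, 5), 2)
    xs.subtract(ys).collect()  // Array(1, 2), in some partition order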