author     Patrick Wendell <pwendell@gmail.com>    2014-05-13 23:24:51 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-05-13 23:24:51 -0700
commit     7bb9a521f35eb19576c6cc2da3fd385910270e46 (patch)
tree       c07844990d8d4e99eb25e89a38a12af9bdf9ac64 /core
parent     c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f (diff)
Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition"
This reverts commit 92cebada09a7e5a00ab48bcb350a9462949c33eb.
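
The reverted partitioner filled each partition up to a fixed number of keys, then spilled over one key per partition in round-robin order. Below is a minimal standalone Scala sketch of that allocation scheme, independent of Spark; the BoundedAllocator and BoundedAllocatorDemo names are hypothetical and only illustrate the behavior, not the committed implementation.

// Illustrative sketch only: reproduces the allocation scheme of the reverted
// BoundaryPartitioner without depending on Spark.
class BoundedAllocator(val numPartitions: Int, val boundary: Int = 1000) {
  private val counts = new Array[Int](numPartitions)  // keys per partition so far
  private var current = 0                             // partition being filled

  // Partition for the next key: stay on the current partition until it holds
  // `boundary` keys, then advance round-robin.
  def next(): Int = {
    val partition = current
    counts(partition) += 1
    if (counts(current) >= boundary) {
      current = (current + 1) % numPartitions
    }
    partition
  }
}

object BoundedAllocatorDemo {
  def main(args: Array[String]): Unit = {
    val alloc = new BoundedAllocator(numPartitions = 3, boundary = 500)
    // 2000 keys: 1-500 land in partition 0, 501-1000 in partition 1,
    // 1001-1500 in partition 2, then the last 500 cycle one at a time.
    val assignments = (1 to 2000).map(_ => alloc.next())
    val histogram = assignments.groupBy(identity).map { case (p, ks) => (p, ks.size) }
    println(histogram.toSeq.sorted)  // partition -> count: (0,667), (1,667), (2,666)
  }
}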
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala        | 61 -
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitioningSuite.scala  | 34 -
2 files changed, 0 insertions(+), 95 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 6274796061..9155159cf6 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,64 +156,3 @@ class RangePartitioner[K : Ordering : ClassTag, V](
false
}
}
-
-/**
- * A [[org.apache.spark.Partitioner]] that fills each partition with up to `boundary`
- * keys (default 1000). Once every partition holds `boundary` elements, the
- * partitioner allocates one element per partition in round-robin order, so the
- * smaller partitions end up at most one key behind the larger ones.
- */
-class BoundaryPartitioner[K : Ordering : ClassTag, V](
- partitions: Int,
- @transient rdd: RDD[_ <: Product2[K,V]],
- private val boundary: Int = 1000)
- extends Partitioner {
-
-  // Tracks the number of keys assigned to each partition:
-  // counts(0) is the key count for partition 0, and so on.
-  private val counts: Array[Int] = new Array[Int](numPartitions)
-
- def numPartitions = math.abs(partitions)
-
-  /*
-   * Ideally this would be derived from the number of partitions and the total key
-   * count, but we do not call count() on the RDD here, to avoid triggering an
-   * action. Users can call count() themselves and pass in an appropriate boundary.
-   */
- def keysPerPartition = boundary
-
- var currPartition = 0
-
-  /*
-   * Keep assigning keys to the current partition until it reaches the per-partition
-   * bound, then start allocating to the next partition.
-   *
-   * NOTE: Say we have 2000 keys and the user asks for 3 partitions with 500 passed
-   * in as the boundary. The first 500 keys go to P1, keys 501-1000 go to P2, and
-   * keys 1001-1500 go to P3. After that, keys are allocated one partition at a
-   * time: key 1501 goes to P1, 1502 to P2, 1503 to P3, and so on.
-   */
- def getPartition(key: Any): Int = {
- val partition = currPartition
- counts(partition) = counts(partition) + 1
-    /*
-     * Since we fill one partition completely before moving to the next (which helps
-     * preserve key order), some partitions can end up empty in certain cases: with
-     * 3 partitions and 500 keys per partition, an RDD of only 700 keys leaves one
-     * partition entirely empty.
-     */
-    if (counts(currPartition) >= keysPerPartition)
-      currPartition = (currPartition + 1) % numPartitions
- partition
- }
-
- override def equals(other: Any): Boolean = other match {
- case r: BoundaryPartitioner[_,_] =>
- (r.counts.sameElements(counts) && r.boundary == boundary
- && r.currPartition == currPartition)
- case _ =>
- false
- }
-}
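
For reference, this is roughly how the reverted class was exercised while it existed. It is a sketch that compiles only against the pre-revert tree; the BoundaryPartitionerDemo object is hypothetical scaffolding, and the SparkContext._ import supplies the pair-RDD implicits of this Spark era.

// Sketch: exercising the now-reverted BoundaryPartitioner (pre-revert tree only).
import org.apache.spark.{BoundaryPartitioner, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits (partitionBy)

object BoundaryPartitionerDemo {
  def run(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(1 to 2000).map(x => (x, x))
    // 4 partitions, at most 500 keys each before round-robin spill-over.
    val partitioner = new BoundaryPartitioner(4, rdd, 500)
    val partitioned = rdd.partitionBy(partitioner)
    println(partitioned.partitions.length)  // 4
  }
}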
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7d40395803..7c30626a0c 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,40 +66,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMethodTester
assert(descendingP4 != p4)
}
- test("BoundaryPartitioner equality") {
- // Make an RDD where all the elements are the same so that the partition range bounds
- // are deterministically all the same.
- val rdd = sc.parallelize(1.to(4000)).map(x => (x, x))
-
- val p2 = new BoundaryPartitioner(2, rdd, 1000)
- val p4 = new BoundaryPartitioner(4, rdd, 1000)
- val anotherP4 = new BoundaryPartitioner(4, rdd)
-
- assert(p2 === p2)
- assert(p4 === p4)
- assert(p2 != p4)
- assert(p4 != p2)
- assert(p4 === anotherP4)
- assert(anotherP4 === p4)
- }
-
- test("BoundaryPartitioner getPartition") {
- val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
- val partitioner = new BoundaryPartitioner(4, rdd, 500)
- 1.to(2000).map { element => {
- val partition = partitioner.getPartition(element)
- if (element <= 500) {
- assert(partition === 0)
- } else if (element > 501 && element <= 1000) {
- assert(partition === 1)
- } else if (element > 1001 && element <= 1500) {
- assert(partition === 2)
- } else if (element > 1501 && element <= 2000) {
- assert(partition === 3)
- }
- }}
- }
-
test("RangePartitioner getPartition") {
val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
// We have different behaviour of getPartition for partitions with less than 1000 and more than