aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Antonio <antonio@interpals.net> 2012-02-13 00:07:39 -0800
committer Antonio <antonio@interpals.net> 2012-02-13 00:07:39 -0800
commit 620798161be67fe0aefbd750211d20e9bbc9daf2 (patch)
tree 9be257871f544e13fee20e80201d38d0839238c7 /core
parent e93f6226658cc18fd29995e72f73c0d9246682d3 (diff)
download spark-620798161be67fe0aefbd750211d20e9bbc9daf2.tar.gz
spark-620798161be67fe0aefbd750211d20e9bbc9daf2.tar.bz2
spark-620798161be67fe0aefbd750211d20e9bbc9daf2.zip
Added fixes to sorting
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/PairRDDFunctions.scala 2
-rw-r--r-- core/src/main/scala/spark/Partitioner.scala 21
2 files changed, 9 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 9d4954670c..295fe81ce6 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -377,7 +377,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
override val partitioner = prev.partitioner
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = {
- prev.iterator(split).toList
+ prev.iterator(split).toArray
.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
}
}
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index ceb9fee956..d1e65edcda 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -36,27 +36,22 @@ class RangePartitioner[K <% Ordered[K],V](partitions: Int, rdd: RDD[(K,V)], asce
val rddSample = rdd.sample(true, frac, 1).collect.toList
.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1)
.map(_._1)
- val bucketSize:Float = rddSample.size / partitions
+ val bucketSize = rddSample.size / partitions
val rangeBounds = rddSample.zipWithIndex.filter(_._2 % bucketSize == 0)
.map(_._1).slice(1, partitions)
def getPartition(key: Any): Int = {
- key match {
- case k:K => {
- val p =
- rangeBounds.zipWithIndex.foldLeft(0) {
- case (part, (bound, index)) =>
- if (k > bound) index + 1 else part
- }
- if (ascending) p else numPartitions-1-p
+ val k = key.asInstanceOf[K]
+ val p = rangeBounds.zipWithIndex.foldLeft(0) {
+ case (part, (bound, index)) =>
+ if (k > bound) index + 1 else part
}
- case _ => 0
- }
+ if (ascending) p else numPartitions-1-p
}
override def equals(other: Any): Boolean = other match {
- case r: RangePartitioner[K,V] =>
- r.numPartitions == numPartitions
+ case r: RangePartitioner[_,_] =>
+ r.numPartitions == numPartitions & r.rangeBounds == rangeBounds
case _ => false
}
}