diff options
author | Antonio <antonio@interpals.net> | 2012-02-13 00:07:39 -0800 |
---|---|---|
committer | Antonio <antonio@interpals.net> | 2012-02-13 00:07:39 -0800 |
commit | 620798161be67fe0aefbd750211d20e9bbc9daf2 (patch) | |
tree | 9be257871f544e13fee20e80201d38d0839238c7 | |
parent | e93f6226658cc18fd29995e72f73c0d9246682d3 (diff) | |
download | spark-620798161be67fe0aefbd750211d20e9bbc9daf2.tar.gz spark-620798161be67fe0aefbd750211d20e9bbc9daf2.tar.bz2 spark-620798161be67fe0aefbd750211d20e9bbc9daf2.zip |
Added fixes to sorting
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/Partitioner.scala | 21 |
2 files changed, 9 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 9d4954670c..295fe81ce6 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -377,7 +377,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( override val partitioner = prev.partitioner override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = { - prev.iterator(split).toList + prev.iterator(split).toArray .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator } } diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala index ceb9fee956..d1e65edcda 100644 --- a/core/src/main/scala/spark/Partitioner.scala +++ b/core/src/main/scala/spark/Partitioner.scala @@ -36,27 +36,22 @@ class RangePartitioner[K <% Ordered[K],V](partitions: Int, rdd: RDD[(K,V)], asce val rddSample = rdd.sample(true, frac, 1).collect.toList .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1) .map(_._1) - val bucketSize:Float = rddSample.size / partitions + val bucketSize = rddSample.size / partitions val rangeBounds = rddSample.zipWithIndex.filter(_._2 % bucketSize == 0) .map(_._1).slice(1, partitions) def getPartition(key: Any): Int = { - key match { - case k:K => { - val p = - rangeBounds.zipWithIndex.foldLeft(0) { - case (part, (bound, index)) => - if (k > bound) index + 1 else part - } - if (ascending) p else numPartitions-1-p + val k = key.asInstanceOf[K] + val p = rangeBounds.zipWithIndex.foldLeft(0) { + case (part, (bound, index)) => + if (k > bound) index + 1 else part } - case _ => 0 - } + if (ascending) p else numPartitions-1-p } override def equals(other: Any): Boolean = other match { - case r: RangePartitioner[K,V] => - r.numPartitions == numPartitions + case r: RangePartitioner[_,_] => + r.numPartitions == numPartitions & r.rangeBounds == rangeBounds case _ => false } } |