aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/Partitioner.scala4
-rw-r--r--core/src/main/scala/spark/ShuffledRDD.scala6
-rw-r--r--core/src/test/scala/spark/SortingSuite.scala18
3 files changed, 16 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 643541429f..20c31714ae 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -41,9 +41,9 @@ class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
Array()
} else {
val rddSize = rdd.count()
- val maxSampleSize = partitions * 10.0
+ val maxSampleSize = partitions * 20.0
val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
- val rddSample = rdd.sample(true, frac, 1).map(_._1).collect().sortWith(_ < _)
+ val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
if (rddSample.length == 0) {
Array()
} else {
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index a7346060b3..be75890a40 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -71,7 +71,11 @@ class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
val buf = new ArrayBuffer[(K, V)]
def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
- buf.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
+ if (ascending) {
+ buf.sortWith((x, y) => x._1 < y._1).iterator
+ } else {
+ buf.sortWith((x, y) => x._1 > y._1).iterator
+ }
}
}
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index 8fa1442a4d..188a9b564e 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -58,11 +58,11 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
val sorted = sc.parallelize(pairArr, 4).sortByKey()
assert(sorted.collect() === pairArr.sortBy(_._1))
val partitions = sorted.collectPartitions()
- logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
- partitions(0).length should be > 200
- partitions(1).length should be > 200
- partitions(2).length should be > 200
- partitions(3).length should be > 200
+ logInfo("Partition lengths: " + partitions.map(_.length).mkString(", "))
+ partitions(0).length should be > 180
+ partitions(1).length should be > 180
+ partitions(2).length should be > 180
+ partitions(3).length should be > 180
partitions(0).last should be < partitions(1).head
partitions(1).last should be < partitions(2).head
partitions(2).last should be < partitions(3).head
@@ -75,10 +75,10 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
assert(sorted.collect() === pairArr.sortBy(_._1).reverse)
val partitions = sorted.collectPartitions()
logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
- partitions(0).length should be > 200
- partitions(1).length should be > 200
- partitions(2).length should be > 200
- partitions(3).length should be > 200
+ partitions(0).length should be > 180
+ partitions(1).length should be > 180
+ partitions(2).length should be > 180
+ partitions(3).length should be > 180
partitions(0).last should be > partitions(1).head
partitions(1).last should be > partitions(2).head
partitions(2).last should be > partitions(3).head