about summary refs log tree commit diff
path: root/core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main')
-rw-r--r-- core/src/main/scala/spark/Partitioner.scala 4
-rw-r--r-- core/src/main/scala/spark/ShuffledRDD.scala 6
2 files changed, 7 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 643541429f..20c31714ae 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -41,9 +41,9 @@ class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
Array()
} else {
val rddSize = rdd.count()
- val maxSampleSize = partitions * 10.0
+ val maxSampleSize = partitions * 20.0
val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
- val rddSample = rdd.sample(true, frac, 1).map(_._1).collect().sortWith(_ < _)
+ val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
if (rddSample.length == 0) {
Array()
} else {
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index a7346060b3..be75890a40 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -71,7 +71,11 @@ class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
val buf = new ArrayBuffer[(K, V)]
def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
- buf.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
+ if (ascending) {
+ buf.sortWith((x, y) => x._1 < y._1).iterator
+ } else {
+ buf.sortWith((x, y) => x._1 > y._1).iterator
+ }
}
}