author     Reynold Xin <rxin@apache.org>    2014-07-29 01:12:44 -0700
committer  Reynold Xin <rxin@apache.org>    2014-07-29 01:12:44 -0700
commit     96ba04bbf917bcb971dd0d8cd1e1766dbe9366e8 (patch)
tree       019b2d4a48ee41a40feb85aa8c11f2f374d6fefa /core
parent     92ef02626e793ea853cced4cbfee316f0b748ed7 (diff)
[SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort.
The pull request includes two changes:

1. Removes SortOrder, introduced by SPARK-2125. The key ordering already carries the sort direction, since an Ordering can be reversed. This mirrors Java's Comparator interface: rarely does an API accept both a Comparator and a separate SortOrder.

2. Replaces the sortWith call in HashShuffleReader with an in-place quick sort.

Author: Reynold Xin <rxin@apache.org>

Closes #1631 from rxin/sortOrder and squashes the following commits:

c9d37e1 [Reynold Xin] [SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort.
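A minimal Scala sketch, independent of Spark, of the point behind change 1: an Ordering already encodes its direction, because reversing it yields the descending comparator, so a separate SortOrder enum adds nothing.

    // Plain Scala; no Spark required.
    val asc  = Ordering.Int                 // ascending comparator
    val desc = asc.reverse                  // same comparator, direction flipped
    val xs   = Array(3, 1, 2)
    println(xs.sorted(asc).mkString(","))   // prints 1,2,3
    println(xs.sorted(desc).mkString(","))  // prints 3,2,1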
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Dependency.scala                     |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala        |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala                | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala | 25
4 files changed, 18 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index f010c03223..09a6057123 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -19,7 +19,6 @@ package org.apache.spark
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.SortOrder.SortOrder
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle
@@ -63,8 +62,7 @@ class ShuffleDependency[K, V, C](
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
- val mapSideCombine: Boolean = false,
- val sortOrder: Option[SortOrder] = None)
+ val mapSideCombine: Boolean = false)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
val shuffleId: Int = rdd.context.newShuffleId()
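With sortOrder gone from ShuffleDependency, the direction of a sorted shuffle travels inside keyOrdering. A hedged construction sketch; `prev` and `part` are placeholder arguments, not part of this diff, and the leading positional parameters are assumed to keep their existing order (rdd, then partitioner):

    import org.apache.spark.{Partitioner, ShuffleDependency}
    import org.apache.spark.rdd.RDD

    // Descending order is requested by reversing the key Ordering itself,
    // replacing the old sortOrder = Some(SortOrder.DESCENDING) parameter.
    def descendingDependency[K, V](prev: RDD[Product2[K, V]], part: Partitioner)
                                  (implicit ord: Ordering[K]): ShuffleDependency[K, V, V] =
      new ShuffleDependency[K, V, V](
        prev, part,
        serializer = None,
        keyOrdering = Some(ord.reverse),
        aggregator = None,
        mapSideCombine = false)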
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index afd7075f68..d85f962783 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -58,12 +58,6 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V, P](self, part)
- .setKeyOrdering(ordering)
- .setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING)
+ .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
}
-
-private[spark] object SortOrder extends Enumeration {
- type SortOrder = Value
- val ASCENDING, DESCENDING = Value
-}
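The user-facing behavior of sortByKey is unchanged; a descending sort now simply rides on the reversed Ordering. A usage sketch, assuming an existing SparkContext named `sc` and this era's implicit conversions:

    import org.apache.spark.SparkContext._  // brings sortByKey onto pair RDDs

    val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
    pairs.sortByKey().collect()                  // Array((a,1), (b,2), (c,3))
    pairs.sortByKey(ascending = false).collect() // Array((c,3), (b,2), (a,1))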
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index da4a8c3dc2..bf02f68d0d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -21,7 +21,6 @@ import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.SortOrder.SortOrder
import org.apache.spark.serializer.Serializer
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -52,8 +51,6 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
private var mapSideCombine: Boolean = false
- private var sortOrder: Option[SortOrder] = None
-
/** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
this.serializer = Option(serializer)
@@ -78,15 +75,8 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
this
}
- /** Set sort order for RDD's sorting. */
- def setSortOrder(sortOrder: SortOrder): ShuffledRDD[K, V, C, P] = {
- this.sortOrder = Option(sortOrder)
- this
- }
-
override def getDependencies: Seq[Dependency[_]] = {
- List(new ShuffleDependency(prev, part, serializer,
- keyOrdering, aggregator, mapSideCombine, sortOrder))
+ List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
override val partitioner = Some(part)
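After this change the fluent setters on ShuffledRDD no longer include setSortOrder; the direction is fixed when the key Ordering is set. A hedged wiring sketch; `prev` and `part` are placeholders, not part of this diff:

    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.{RDD, ShuffledRDD}

    // Build a descending shuffle purely through the key Ordering.
    def descendingShuffle[K, V](prev: RDD[(K, V)], part: Partitioner)
                               (implicit ord: Ordering[K]) =
      new ShuffledRDD[K, V, V, (K, V)](prev, part)
        .setKeyOrdering(ord.reverse) // was: setKeyOrdering(ord) plus setSortOrder(DESCENDING)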
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index 76cdb8f4f8..c8059496a1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle.hash
import org.apache.spark.{InterruptibleIterator, TaskContext}
-import org.apache.spark.rdd.SortOrder
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
@@ -51,16 +50,22 @@ class HashShuffleReader[K, C](
iter
}
- val sortedIter = for (sortOrder <- dep.sortOrder; ordering <- dep.keyOrdering) yield {
- val buf = aggregatedIter.toArray
- if (sortOrder == SortOrder.ASCENDING) {
- buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
- } else {
- buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
- }
+ // Sort the output if there is a sort ordering defined.
+ dep.keyOrdering match {
+ case Some(keyOrd: Ordering[K]) =>
+ // Define a Comparator for the whole record based on the key Ordering.
+ val cmp = new Ordering[Product2[K, C]] {
+ override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int = {
+ keyOrd.compare(o1._1, o2._1)
+ }
+ }
+ val sortBuffer: Array[Product2[K, C]] = aggregatedIter.toArray
+ // TODO: do external sort.
+ scala.util.Sorting.quickSort(sortBuffer)(cmp)
+ sortBuffer.iterator
+ case None =>
+ aggregatedIter
}
-
- sortedIter.getOrElse(aggregatedIter)
}
/** Close this reader */
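The new read path is easy to exercise in isolation. A self-contained sketch in plain Scala (no Spark) of the same pattern: lift the key Ordering to whole records, quick-sort the buffer in place, and hand back an iterator.

    import scala.util.Sorting

    object InPlaceRecordSort {
      // Mirrors HashShuffleReader: sort (key, value) records by key, in place.
      def sortRecords[K, C](records: Iterator[Product2[K, C]],
                            keyOrdering: Option[Ordering[K]]): Iterator[Product2[K, C]] =
        keyOrdering match {
          case Some(keyOrd) =>
            // Lift the key Ordering to an Ordering over whole records.
            val cmp = new Ordering[Product2[K, C]] {
              override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int =
                keyOrd.compare(o1._1, o2._1)
            }
            val buf: Array[Product2[K, C]] = records.toArray
            Sorting.quickSort(buf)(cmp)  // in-place; no per-comparison copies
            buf.iterator
          case None => records
        }

      def main(args: Array[String]): Unit = {
        val recs: Iterator[Product2[Int, String]] = Iterator(3 -> "c", 1 -> "a", 2 -> "b")
        // Descending output falls out of reversing the key Ordering.
        println(sortRecords(recs, Some(Ordering.Int.reverse)).toList)
        // List((3,c), (2,b), (1,a))
      }
    }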