aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-04-21 16:48:51 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-21 16:48:51 -0700
commite2b5647ab92eb478b3f7b36a0ce6faf83e24c0e5 (patch)
tree4ba34de912caf16f572a42c9cc033aeb2bb9bb4a /sql
parent6d1e4c4a65541cbf78284005de1776dc49efa9f4 (diff)
downloadspark-e2b5647ab92eb478b3f7b36a0ce6faf83e24c0e5.tar.gz
spark-e2b5647ab92eb478b3f7b36a0ce6faf83e24c0e5.tar.bz2
spark-e2b5647ab92eb478b3f7b36a0ce6faf83e24c0e5.zip
[SPARK-14724] Use radix sort for shuffles and sort operator when possible
## What changes were proposed in this pull request? Spark currently uses TimSort for all in-memory sorts, including sorts done for shuffle. One low-hanging fruit is to use radix sort when possible (e.g. sorting by integer keys). This PR adds a radix sort implementation to the unsafe sort package and switches shuffles and sorts to use it when possible. The current implementation does not have special support for null values, so we cannot radix-sort `LongType`. I will address this in a follow-up PR. ## How was this patch tested? Unit tests, enabling radix sort on existing tests. Microbenchmark results: ``` Running benchmark: radix sort 25000000 Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 3.13.0-44-generic Intel(R) Core(TM) i7-4600U CPU 2.10GHz radix sort 25000000: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- reference TimSort key prefix array 15546 / 15859 1.6 621.9 1.0X reference Arrays.sort 2416 / 2446 10.3 96.6 6.4X radix sort one byte 133 / 137 188.4 5.3 117.2X radix sort two bytes 255 / 258 98.2 10.2 61.1X radix sort eight bytes 991 / 997 25.2 39.6 15.7X radix sort key prefix array 1540 / 1563 16.2 61.6 10.1X ``` I also ran a mix of the supported TPCDS queries and compared TimSort vs RadixSort metrics. The overall benchmark ran ~10% faster with radix sort on. In the breakdown below, the radix-enabled sort phases averaged about 20x faster than TimSort, however sorting is only a small fraction of the overall runtime. About half of the TPCDS queries were able to take advantage of radix sort. ``` TPCDS on master: 2499s real time, 8185s executor - 1171s in TimSort, avg 267 MB/s (note the /s accounting is weird here since dataSize counts the record sizes too) TPCDS with radix enabled: 2294s real time, 7391s executor - 596s in TimSort, avg 254 MB/s - 26s in radix sort, avg 4.2 GB/s ``` cc davies rxin Author: Eric Liang <ekl@databricks.com> Closes #12490 from ericl/sort-benchmark.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala3
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala17
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala26
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala10
8 files changed, 64 insertions, 10 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 7784345a7a..8d9906da7e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -59,7 +59,8 @@ public final class UnsafeExternalRowSorter {
Ordering<InternalRow> ordering,
PrefixComparator prefixComparator,
PrefixComputer prefixComputer,
- long pageSizeBytes) throws IOException {
+ long pageSizeBytes,
+ boolean canUseRadixSort) throws IOException {
this.schema = schema;
this.prefixComputer = prefixComputer;
final SparkEnv sparkEnv = SparkEnv.get();
@@ -72,7 +73,8 @@ public final class UnsafeExternalRowSorter {
new RowComparator(ordering, schema.length()),
prefixComparator,
/* initialSize */ 4096,
- pageSizeBytes
+ pageSizeBytes,
+ canUseRadixSort
);
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index e0c3b22a3c..42a8be6b1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -84,8 +84,7 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression {
case DateType | TimestampType =>
(Long.MinValue, s"(long) $input")
case FloatType | DoubleType =>
- (DoublePrefixComparator.computePrefix(Double.NegativeInfinity),
- s"$DoublePrefixCmp.computePrefix((double)$input)")
+ (0L, s"$DoublePrefixCmp.computePrefix((double)$input)")
case StringType => (0L, s"$input.getPrefix()")
case BinaryType => (0L, s"$BinaryPrefixCmp.computePrefix($input)")
case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 8132bba04c..b6499e35b5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -85,13 +85,15 @@ public final class UnsafeKVExternalSorter {
recordComparator,
prefixComparator,
/* initialSize */ 4096,
- pageSizeBytes);
+ pageSizeBytes,
+ keySchema.length() == 1 && SortPrefixUtils.canSortFullyWithPrefix(keySchema.apply(0)));
} else {
// During spilling, the array in map will not be used, so we can borrow that and use it
// as the underline array for in-memory sorter (it's always large enough).
// Since we will not grow the array, it's fine to pass `null` as consumer.
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
- null, taskMemoryManager, recordComparator, prefixComparator, map.getArray());
+ null, taskMemoryManager, recordComparator, prefixComparator, map.getArray(),
+ false /* TODO(ekl) we can only radix sort if the BytesToBytes load factor is <= 0.5 */);
// We cannot use the destructive iterator here because we are reusing the existing memory
// pages in BytesToBytesMap to hold records during sorting.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 80255fafbe..6f9baa2d33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, GenerateUnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.unsafe.sort.RadixSort;
/**
* Performs (external) sorting.
@@ -48,7 +50,10 @@ case class Sort(
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
+ private val enableRadixSort = sqlContext.conf.enableRadixSort
+
override private[sql] lazy val metrics = Map(
+ "sortTime" -> SQLMetrics.createLongMetric(sparkContext, "sort time"),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
@@ -59,6 +64,9 @@ case class Sort(
val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
+ val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
+ SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
+
// The generator for prefix
val prefixProjection = UnsafeProjection.create(Seq(SortPrefix(boundSortExpression)))
val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
@@ -69,7 +77,8 @@ case class Sort(
val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
val sorter = new UnsafeExternalRowSorter(
- schema, ordering, prefixComparator, prefixComputer, pageSize)
+ schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
+
if (testSpillFrequency > 0) {
sorter.setTestSpillFrequency(testSpillFrequency)
}
@@ -139,11 +148,15 @@ case class Sort(
val dataSize = metricTerm(ctx, "dataSize")
val spillSize = metricTerm(ctx, "spillSize")
val spillSizeBefore = ctx.freshName("spillSizeBefore")
+ val startTime = ctx.freshName("startTime")
+ val sortTime = metricTerm(ctx, "sortTime")
s"""
| if ($needToSort) {
| $addToSorter();
- | Long $spillSizeBefore = $metrics.memoryBytesSpilled();
+ | long $spillSizeBefore = $metrics.memoryBytesSpilled();
+ | long $startTime = System.nanoTime();
| $sortedIterator = $sorterVariable.sort();
+ | $sortTime.add(System.nanoTime() - $startTime);
| $dataSize.add($sorterVariable.getPeakMemoryUsage());
| $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
| $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index 909f124d2c..1a5ff5fcec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -66,6 +66,32 @@ object SortPrefixUtils {
}
/**
+ * Returns whether the specified SortOrder can be satisfied with a radix sort on the prefix.
+ */
+ def canSortFullyWithPrefix(sortOrder: SortOrder): Boolean = {
+ sortOrder.dataType match {
+ // TODO(ekl) long-type is problematic because it's null prefix representation collides with
+ // the lowest possible long value. Handle this special case outside radix sort.
+ case LongType if sortOrder.nullable =>
+ false
+ case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType |
+ TimestampType | FloatType | DoubleType =>
+ true
+ case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
+ true
+ case _ =>
+ false
+ }
+ }
+
+ /**
+ * Returns whether the fully sorting on the specified key field is possible with radix sort.
+ */
+ def canSortFullyWithPrefix(field: StructField): Boolean = {
+ canSortFullyWithPrefix(SortOrder(BoundReference(0, field.dataType, field.nullable), Ascending))
+ }
+
+ /**
* Creates the prefix computer for the first field in the given schema, in ascending order.
*/
def createPrefixGenerator(schema: StructType): UnsafeExternalRowSorter.PrefixComputer = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 85ce388de0..a46d0e0ba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -344,7 +344,8 @@ case class Window(
null,
null,
1024,
- SparkEnv.get.memoryManager.pageSizeBytes)
+ SparkEnv.get.memoryManager.pageSizeBytes,
+ false)
rows.foreach { r =>
sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index edb4c5a16f..b1de52b5f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -46,7 +46,8 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
null,
null,
1024,
- SparkEnv.get.memoryManager.pageSizeBytes)
+ SparkEnv.get.memoryManager.pageSizeBytes,
+ false)
val partition = split.asInstanceOf[CartesianPartition]
for (y <- rdd2.iterator(partition.s2, context)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6e7c1bc133..a4e82d80f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -95,6 +95,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val RADIX_SORT_ENABLED = SQLConfigBuilder("spark.sql.sort.enableRadixSort")
+ .internal()
+ .doc("When true, enable use of radix sort when possible. Radix sort is much faster but " +
+ "requires additional memory to be reserved up-front. The memory overhead may be " +
+ "significant when sorting very small rows (up to 50% more in this case).")
+ .booleanConf
+ .createWithDefault(true)
+
val AUTO_BROADCASTJOIN_THRESHOLD = SQLConfigBuilder("spark.sql.autoBroadcastJoinThreshold")
.doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
"nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
@@ -584,6 +592,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
+ def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
+
def defaultSizeInBytes: Long =
getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L)