commit 9514d874f0cf61f1eb4ec4f5f66e053119f769c9
tree ed692e94ed59d559b4b2b16ca8b88045021bbd6b
parent a200e64561c8803731578267df16906f6773cbea
Author:     Reynold Xin <rxin@databricks.com>
AuthorDate: 2015-07-29 20:46:03 -0700
Commit:     Reynold Xin <rxin@databricks.com>
CommitDate: 2015-07-29 20:46:03 -0700
[SPARK-9458] Avoid object allocation in prefix generation.

In our existing sort prefix generation code, we use the expression's eval method to generate the prefix, which results in an object allocation for every prefix. We can instead use the specialized getters available on InternalRow to avoid that allocation. I also removed the FLOAT prefix, opting to convert floats directly to doubles.

Author: Reynold Xin <rxin@databricks.com>

Closes #7763 from rxin/sort-prefix and squashes the following commits:

5dc2f06 [Reynold Xin] [SPARK-9458] Avoid object allocation in prefix generation.
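To make the boxing cost concrete, here is a minimal sketch (illustrative, not part of the patch) of the two styles of prefix computation for a nullable LongType column; Long.MinValue stands in for PrefixComparators.INTEGRAL.NULL_PREFIX, which the diff below shows is Long.MIN_VALUE:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.BoundReference

// Old style: Expression.eval returns Any, so every non-null long value
// is boxed into a java.lang.Long before it can be read back.
def evalPrefix(expr: BoundReference)(row: InternalRow): Long = {
  val v = expr.eval(row)                       // allocates a box per row
  if (v == null) Long.MinValue else v.asInstanceOf[Long]
}

// New style: the specialized InternalRow getter returns a primitive,
// so nothing is allocated on the hot path.
def getterPrefix(pos: Int)(row: InternalRow): Long =
  if (row.isNullAt(pos)) Long.MinValue else row.getLong(pos)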
 core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java        | 16
 core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala | 12
 sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java        |  2
 sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala                  | 51
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala                  |  4
 sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala                             |  5
 sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala         |  2
 sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala          | 10
 8 files changed, 35 insertions(+), 67 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index bf1bc5dffb..5624e067da 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -31,7 +31,6 @@ public class PrefixComparators {
public static final StringPrefixComparator STRING = new StringPrefixComparator();
public static final IntegralPrefixComparator INTEGRAL = new IntegralPrefixComparator();
- public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator();
public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();
public static final class StringPrefixComparator extends PrefixComparator {
@@ -78,21 +77,6 @@ public class PrefixComparators {
public final long NULL_PREFIX = Long.MIN_VALUE;
}
- public static final class FloatPrefixComparator extends PrefixComparator {
- @Override
- public int compare(long aPrefix, long bPrefix) {
- float a = Float.intBitsToFloat((int) aPrefix);
- float b = Float.intBitsToFloat((int) bPrefix);
- return Utils.nanSafeCompareFloats(a, b);
- }
-
- public long computePrefix(float value) {
- return Float.floatToIntBits(value) & 0xffffffffL;
- }
-
- public final long NULL_PREFIX = computePrefix(Float.NEGATIVE_INFINITY);
- }
-
public static final class DoublePrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
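The removed float comparator above decodes the 32-bit float image from the prefix and compares NaN-safely. The surviving DoublePrefixComparator, whose body is truncated in this hunk, presumably follows the same pattern widened to 64 bits. A Scala sketch of that inferred pattern, with java.lang.Double.compare standing in for a NaN-safe compare helper:

// Sketch of the 64-bit analogue of the removed FloatPrefixComparator.
// Inferred from the float code above, not shown in this hunk; the real
// class is Java.
object DoublePrefixSketch {
  def computePrefix(value: Double): Long =
    java.lang.Double.doubleToLongBits(value)

  def compare(aPrefix: Long, bPrefix: Long): Int = {
    val a = java.lang.Double.longBitsToDouble(aPrefix)
    val b = java.lang.Double.longBitsToDouble(bPrefix)
    java.lang.Double.compare(a, b)  // NaN sorts above all other values
  }
}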
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
index dc03e374b5..28fe925945 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
@@ -48,18 +48,6 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
forAll { (s1: String, s2: String) => testPrefixComparison(s1, s2) }
}
- test("float prefix comparator handles NaN properly") {
- val nan1: Float = java.lang.Float.intBitsToFloat(0x7f800001)
- val nan2: Float = java.lang.Float.intBitsToFloat(0x7fffffff)
- assert(nan1.isNaN)
- assert(nan2.isNaN)
- val nan1Prefix = PrefixComparators.FLOAT.computePrefix(nan1)
- val nan2Prefix = PrefixComparators.FLOAT.computePrefix(nan2)
- assert(nan1Prefix === nan2Prefix)
- val floatMaxPrefix = PrefixComparators.FLOAT.computePrefix(Float.MaxValue)
- assert(PrefixComparators.FLOAT.compare(nan1Prefix, floatMaxPrefix) === 1)
- }
-
test("double prefix comparator handles NaNs properly") {
val nan1: Double = java.lang.Double.longBitsToDouble(0x7ff0000000000001L)
val nan2: Double = java.lang.Double.longBitsToDouble(0x7fffffffffffffffL)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 4c3f2c6557..8342833246 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -121,7 +121,7 @@ final class UnsafeExternalRowSorter {
// here in order to prevent memory leaks.
cleanupResources();
}
- return new AbstractScalaRowIterator() {
+ return new AbstractScalaRowIterator<InternalRow>() {
private final int numFields = schema.length();
private UnsafeRow row = new UnsafeRow();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index 2dee3542d6..050d27f146 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, SortOrder}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, PrefixComparator}
@@ -39,57 +39,54 @@ object SortPrefixUtils {
sortOrder.dataType match {
case StringType => PrefixComparators.STRING
case BooleanType | ByteType | ShortType | IntegerType | LongType => PrefixComparators.INTEGRAL
- case FloatType => PrefixComparators.FLOAT
- case DoubleType => PrefixComparators.DOUBLE
+ case FloatType | DoubleType => PrefixComparators.DOUBLE
case _ => NoOpPrefixComparator
}
}
def getPrefixComputer(sortOrder: SortOrder): InternalRow => Long = {
+ val bound = sortOrder.child.asInstanceOf[BoundReference]
+ val pos = bound.ordinal
sortOrder.dataType match {
- case StringType => (row: InternalRow) => {
- PrefixComparators.STRING.computePrefix(sortOrder.child.eval(row).asInstanceOf[UTF8String])
- }
+ case StringType =>
+ (row: InternalRow) => {
+ PrefixComparators.STRING.computePrefix(row.getUTF8String(pos))
+ }
case BooleanType =>
(row: InternalRow) => {
- val exprVal = sortOrder.child.eval(row)
- if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
- else if (sortOrder.child.eval(row).asInstanceOf[Boolean]) 1
+ if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX
+ else if (row.getBoolean(pos)) 1
else 0
}
case ByteType =>
(row: InternalRow) => {
- val exprVal = sortOrder.child.eval(row)
- if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
- else sortOrder.child.eval(row).asInstanceOf[Byte]
+ if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getByte(pos)
}
case ShortType =>
(row: InternalRow) => {
- val exprVal = sortOrder.child.eval(row)
- if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
- else sortOrder.child.eval(row).asInstanceOf[Short]
+ if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getShort(pos)
}
case IntegerType =>
(row: InternalRow) => {
- val exprVal = sortOrder.child.eval(row)
- if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
- else sortOrder.child.eval(row).asInstanceOf[Int]
+ if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getInt(pos)
}
case LongType =>
(row: InternalRow) => {
- val exprVal = sortOrder.child.eval(row)
- if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
- else sortOrder.child.eval(row).asInstanceOf[Long]
+ if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getLong(pos)
}
case FloatType => (row: InternalRow) => {
- val exprVal = sortOrder.child.eval(row)
- if (exprVal == null) PrefixComparators.FLOAT.NULL_PREFIX
- else PrefixComparators.FLOAT.computePrefix(sortOrder.child.eval(row).asInstanceOf[Float])
+ if (row.isNullAt(pos)) {
+ PrefixComparators.DOUBLE.NULL_PREFIX
+ } else {
+ PrefixComparators.DOUBLE.computePrefix(row.getFloat(pos).toDouble)
+ }
}
case DoubleType => (row: InternalRow) => {
- val exprVal = sortOrder.child.eval(row)
- if (exprVal == null) PrefixComparators.DOUBLE.NULL_PREFIX
- else PrefixComparators.DOUBLE.computePrefix(sortOrder.child.eval(row).asInstanceOf[Double])
+ if (row.isNullAt(pos)) {
+ PrefixComparators.DOUBLE.NULL_PREFIX
+ } else {
+ PrefixComparators.DOUBLE.computePrefix(row.getDouble(pos))
+ }
}
case _ => (row: InternalRow) => 0L
}
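Dropping the FLOAT prefix is safe because widening a Float to a Double is exact, so ordering by the widened double matches ordering by the original float. A standalone sanity check of that assumption (illustrative only, not from the patch or its test suite):

// Widening Float -> Double preserves ordering, which is why the
// FloatType branch above can reuse the DOUBLE prefix machinery.
val floats = Seq(Float.NegativeInfinity, -1.5f, -0.0f, 0.0f,
  Float.MinPositiveValue, 1.5f, Float.MaxValue, Float.PositiveInfinity)
val byFloat = floats.sorted                // java.lang.Float.compare order
val byDouble = floats.sortBy(_.toDouble)   // compare after widening
assert(byFloat == byDouble)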
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f3ef066528..4ab2c41f1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -340,8 +340,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
*/
def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
- UnsafeExternalSort.supportsSchema(child.schema)) {
- execution.UnsafeExternalSort(sortExprs, global, child)
+ TungstenSort.supportsSchema(child.schema)) {
+ execution.TungstenSort(sortExprs, global, child)
} else if (sqlContext.conf.externalSortEnabled) {
execution.ExternalSort(sortExprs, global, child)
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index f82208868c..d0ad310062 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -97,7 +97,7 @@ case class ExternalSort(
* @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
* spill every `frequency` records.
*/
-case class UnsafeExternalSort(
+case class TungstenSort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan,
@@ -110,7 +110,6 @@ case class UnsafeExternalSort(
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
- assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled")
def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
val ordering = newOrdering(sortOrder, child.output)
val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
@@ -149,7 +148,7 @@ case class UnsafeExternalSort(
}
@DeveloperApi
-object UnsafeExternalSort {
+object TungstenSort {
/**
* Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise.
*/
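For orientation, the renamed operator binds the leading sort expression once and derives both prefix pieces from the bound reference, which is why getPrefixComputer can assume a BoundReference child. A sketch assembled from the hunks above (the full doExecute body is elided in this diff):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, BindReferences, SortOrder}
import org.apache.spark.sql.execution.SortPrefixUtils
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator

// Bind sortOrder.head against the child's output once (as doExecute
// does above), then derive the comparator and the per-row prefix
// computer from the bound expression.
def prefixPieces(
    sortOrder: Seq[SortOrder],
    childOutput: Seq[Attribute]): (PrefixComparator, InternalRow => Long) = {
  val bound = BindReferences.bindReference(sortOrder.head, childOutput)
  (SortPrefixUtils.getPrefixComparator(bound),
    SortPrefixUtils.getPrefixComputer(bound))
}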
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 7b75f75591..c458f95ca1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -31,7 +31,7 @@ class RowFormatConvertersSuite extends SparkPlanTest {
private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
assert(!outputsSafe.outputsUnsafeRows)
- private val outputsUnsafe = UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
+ private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null))
assert(outputsUnsafe.outputsUnsafeRows)
test("planner should insert unsafe->safe conversions when required") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
index 7a4baa9e4a..9cabc4b90b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
@@ -42,7 +42,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
+ (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
(child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
sortAnswers = false
)
@@ -53,7 +53,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
try {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
+ (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
(child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
sortAnswers = false
)
@@ -68,7 +68,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
val stringLength = 1024 * 1024 * 2
checkThatPlansAgree(
Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
- UnsafeExternalSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+ TungstenSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
Sort(sortOrder, global = true, _: SparkPlan),
sortAnswers = false
)
@@ -88,11 +88,11 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
TestSQLContext.sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
StructType(StructField("a", dataType, nullable = true) :: Nil)
)
- assert(UnsafeExternalSort.supportsSchema(inputDf.schema))
+ assert(TungstenSort.supportsSchema(inputDf.schema))
checkThatPlansAgree(
inputDf,
plan => ConvertToSafe(
- UnsafeExternalSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
+ TungstenSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
Sort(sortOrder, global = true, _: SparkPlan),
sortAnswers = false
)