Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java  7
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java  216
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/AbstractScalaRowIterator.scala  27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala  13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala  97
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala  73
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala  11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala  253
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala  104
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala  21
11 files changed, 721 insertions, 123 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index edb7202245..4b99030d10 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -61,9 +61,10 @@ public final class UnsafeRow extends MutableRow {
/** A pool to hold non-primitive objects */
private ObjectPool pool;
- Object getBaseObject() { return baseObject; }
- long getBaseOffset() { return baseOffset; }
- ObjectPool getPool() { return pool; }
+ public Object getBaseObject() { return baseObject; }
+ public long getBaseOffset() { return baseOffset; }
+ public int getSizeInBytes() { return sizeInBytes; }
+ public ObjectPool getPool() { return pool; }
/** The number of fields in this row, used for calculating the bitset width (and in assertions) */
private int numFields;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
new file mode 100644
index 0000000000..b94601cf6d
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution;
+
+import java.io.IOException;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.SparkEnv;
+import org.apache.spark.TaskContext;
+import org.apache.spark.sql.AbstractScalaRowIterator;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.ObjectUnsafeColumnWriter;
+import org.apache.spark.sql.catalyst.expressions.UnsafeColumnWriter;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRowConverter;
+import org.apache.spark.sql.catalyst.util.ObjectPool;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.PlatformDependent;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterIterator;
+
+final class UnsafeExternalRowSorter {
+
+ /**
+ * If positive, forces records to be spilled to disk at the given frequency (measured in numbers
+ * of records). This is only intended to be used in tests.
+ */
+ private int testSpillFrequency = 0;
+
+ private long numRowsInserted = 0;
+
+ private final StructType schema;
+ private final UnsafeRowConverter rowConverter;
+ private final PrefixComputer prefixComputer;
+ private final UnsafeExternalSorter sorter;
+ private byte[] rowConversionBuffer = new byte[1024 * 8];
+
+ public static abstract class PrefixComputer {
+ abstract long computePrefix(InternalRow row);
+ }
+
+ public UnsafeExternalRowSorter(
+ StructType schema,
+ Ordering<InternalRow> ordering,
+ PrefixComparator prefixComparator,
+ PrefixComputer prefixComputer) throws IOException {
+ this.schema = schema;
+ this.rowConverter = new UnsafeRowConverter(schema);
+ this.prefixComputer = prefixComputer;
+ final SparkEnv sparkEnv = SparkEnv.get();
+ final TaskContext taskContext = TaskContext.get();
+ sorter = new UnsafeExternalSorter(
+ taskContext.taskMemoryManager(),
+ sparkEnv.shuffleMemoryManager(),
+ sparkEnv.blockManager(),
+ taskContext,
+ new RowComparator(ordering, schema.length(), null),
+ prefixComparator,
+ 4096,
+ sparkEnv.conf()
+ );
+ }
+
+ /**
+ * Forces spills to occur every `frequency` records. Only for use in tests.
+ */
+ @VisibleForTesting
+ void setTestSpillFrequency(int frequency) {
+ assert frequency > 0 : "Frequency must be positive";
+ testSpillFrequency = frequency;
+ }
+
+ @VisibleForTesting
+ void insertRow(InternalRow row) throws IOException {
+ final int sizeRequirement = rowConverter.getSizeRequirement(row);
+ if (sizeRequirement > rowConversionBuffer.length) {
+ rowConversionBuffer = new byte[sizeRequirement];
+ }
+ final int bytesWritten = rowConverter.writeRow(
+ row, rowConversionBuffer, PlatformDependent.BYTE_ARRAY_OFFSET, sizeRequirement, null);
+ assert (bytesWritten == sizeRequirement);
+ final long prefix = prefixComputer.computePrefix(row);
+ sorter.insertRecord(
+ rowConversionBuffer,
+ PlatformDependent.BYTE_ARRAY_OFFSET,
+ sizeRequirement,
+ prefix
+ );
+ numRowsInserted++;
+ if (testSpillFrequency > 0 && (numRowsInserted % testSpillFrequency) == 0) {
+ spill();
+ }
+ }
+
+ @VisibleForTesting
+ void spill() throws IOException {
+ sorter.spill();
+ }
+
+ private void cleanupResources() {
+ sorter.freeMemory();
+ }
+
+ @VisibleForTesting
+ Iterator<InternalRow> sort() throws IOException {
+ try {
+ final UnsafeSorterIterator sortedIterator = sorter.getSortedIterator();
+ if (!sortedIterator.hasNext()) {
+ // Since we won't ever call next() on an empty iterator, we need to clean up resources
+ // here in order to prevent memory leaks.
+ cleanupResources();
+ }
+ return new AbstractScalaRowIterator() {
+
+ private final int numFields = schema.length();
+ private final UnsafeRow row = new UnsafeRow();
+
+ @Override
+ public boolean hasNext() {
+ return sortedIterator.hasNext();
+ }
+
+ @Override
+ public InternalRow next() {
+ try {
+ sortedIterator.loadNext();
+ row.pointTo(
+ sortedIterator.getBaseObject(),
+ sortedIterator.getBaseOffset(),
+ numFields,
+ sortedIterator.getRecordLength(),
+ null);
+            if (!hasNext()) {
+              // Copy the row so that we don't return dangling pointers into the freed page.
+              final InternalRow copy = row.copy();
+              cleanupResources();
+              return copy;
+            }
+            return row;
+ } catch (IOException e) {
+ cleanupResources();
+ // Scala iterators don't declare any checked exceptions, so we need to use this hack
+ // to re-throw the exception:
+ PlatformDependent.throwException(e);
+ }
+ throw new RuntimeException("Exception should have been re-thrown in next()");
+ };
+ };
+ } catch (IOException e) {
+ cleanupResources();
+ throw e;
+ }
+ }
+
+
+ public Iterator<InternalRow> sort(Iterator<InternalRow> inputIterator) throws IOException {
+ while (inputIterator.hasNext()) {
+ insertRow(inputIterator.next());
+ }
+ return sort();
+ }
+
+ /**
+ * Return true if UnsafeExternalRowSorter can sort rows with the given schema, false otherwise.
+ */
+ public static boolean supportsSchema(StructType schema) {
+    // TODO: add a note explaining why object-pooled (non-primitive) columns are not supported yet:
+ for (StructField field : schema.fields()) {
+ if (UnsafeColumnWriter.forType(field.dataType()) instanceof ObjectUnsafeColumnWriter) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static final class RowComparator extends RecordComparator {
+ private final Ordering<InternalRow> ordering;
+ private final int numFields;
+ private final ObjectPool objPool;
+ private final UnsafeRow row1 = new UnsafeRow();
+ private final UnsafeRow row2 = new UnsafeRow();
+
+ public RowComparator(Ordering<InternalRow> ordering, int numFields, ObjectPool objPool) {
+ this.numFields = numFields;
+ this.ordering = ordering;
+ this.objPool = objPool;
+ }
+
+ @Override
+ public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
+ row1.pointTo(baseObj1, baseOff1, numFields, -1, objPool);
+ row2.pointTo(baseObj2, baseOff2, numFields, -1, objPool);
+ return ordering.compare(row1, row2);
+ }
+ }
+}
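
The sorter above relies on an 8-byte sort prefix per record: the cheap prefix comparison runs first inside UnsafeExternalSorter, and the full RowComparator is only consulted when prefixes compare equal. A minimal sketch of that division of labour (illustrative only; the actual tie-breaking lives inside UnsafeExternalSorter, not in this class):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.util.collection.unsafe.sort.PrefixComparator

    // Decide on the 8-byte prefixes when possible, fall back to the row ordering on ties.
    def compareWithPrefix(
        prefix1: Long, row1: InternalRow,
        prefix2: Long, row2: InternalRow,
        prefixComparator: PrefixComparator,
        rowOrdering: Ordering[InternalRow]): Int = {
      val byPrefix = prefixComparator.compare(prefix1, prefix2)
      if (byPrefix != 0) byPrefix else rowOrdering.compare(row1, row2)
    }
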
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AbstractScalaRowIterator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AbstractScalaRowIterator.scala
new file mode 100644
index 0000000000..cfefb13e77
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AbstractScalaRowIterator.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+/**
+ * Shim to allow us to implement [[scala.Iterator]] in Java. Scala 2.11+ has an AbstractIterator
+ * class for this, but that class is `private[scala]` in 2.10. We need to explicitly fix the
+ * element type to `InternalRow` here in order to work around a spurious IntelliJ compiler error.
+ */
+private[spark] abstract class AbstractScalaRowIterator extends Iterator[InternalRow]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 74d9334045..4b783e30d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -289,11 +289,8 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
}
val withSort = if (needSort) {
- if (sqlContext.conf.externalSortEnabled) {
- ExternalSort(rowOrdering, global = false, withShuffle)
- } else {
- Sort(rowOrdering, global = false, withShuffle)
- }
+ sqlContext.planner.BasicOperators.getSortOperator(
+ rowOrdering, global = false, withShuffle)
} else {
withShuffle
}
@@ -321,11 +318,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
case (UnspecifiedDistribution, Seq(), child) =>
child
case (UnspecifiedDistribution, rowOrdering, child) =>
- if (sqlContext.conf.externalSortEnabled) {
- ExternalSort(rowOrdering, global = false, child)
- } else {
- Sort(rowOrdering, global = false, child)
- }
+ sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
case (dist, ordering, _) =>
sys.error(s"Don't know how to ensure $dist with ordering $ordering")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
new file mode 100644
index 0000000000..2dee3542d6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, PrefixComparator}
+
+
+object SortPrefixUtils {
+
+ /**
+ * A dummy prefix comparator which always claims that prefixes are equal. This is used in cases
+ * where we don't know how to generate or compare prefixes for a SortOrder.
+ */
+ private object NoOpPrefixComparator extends PrefixComparator {
+ override def compare(prefix1: Long, prefix2: Long): Int = 0
+ }
+
+ def getPrefixComparator(sortOrder: SortOrder): PrefixComparator = {
+ sortOrder.dataType match {
+ case StringType => PrefixComparators.STRING
+ case BooleanType | ByteType | ShortType | IntegerType | LongType => PrefixComparators.INTEGRAL
+ case FloatType => PrefixComparators.FLOAT
+ case DoubleType => PrefixComparators.DOUBLE
+ case _ => NoOpPrefixComparator
+ }
+ }
+
+ def getPrefixComputer(sortOrder: SortOrder): InternalRow => Long = {
+ sortOrder.dataType match {
+ case StringType => (row: InternalRow) => {
+ PrefixComparators.STRING.computePrefix(sortOrder.child.eval(row).asInstanceOf[UTF8String])
+ }
+ case BooleanType =>
+ (row: InternalRow) => {
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else if (exprVal.asInstanceOf[Boolean]) 1
+ else 0
+ }
+ case ByteType =>
+ (row: InternalRow) => {
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else exprVal.asInstanceOf[Byte]
+ }
+ case ShortType =>
+ (row: InternalRow) => {
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else exprVal.asInstanceOf[Short]
+ }
+ case IntegerType =>
+ (row: InternalRow) => {
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else exprVal.asInstanceOf[Int]
+ }
+ case LongType =>
+ (row: InternalRow) => {
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else exprVal.asInstanceOf[Long]
+ }
+ case FloatType => (row: InternalRow) => {
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.FLOAT.NULL_PREFIX
+        else PrefixComparators.FLOAT.computePrefix(exprVal.asInstanceOf[Float])
+ }
+ case DoubleType => (row: InternalRow) => {
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.DOUBLE.NULL_PREFIX
+        else PrefixComparators.DOUBLE.computePrefix(exprVal.asInstanceOf[Double])
+ }
+ case _ => (row: InternalRow) => 0L
+ }
+ }
+}
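
A rough usage sketch of how these two helpers pair up for a bound sort expression (the BoundReference over a nullable LongType column is illustrative, not taken from this patch): the comparator is handed to the sorter, while the computed prefix is attached to each inserted record, with nulls mapped to the comparator's NULL_PREFIX.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, SortOrder}
    import org.apache.spark.sql.execution.SortPrefixUtils
    import org.apache.spark.sql.types.LongType

    // Sort on column 0 of the row, ascending; the column is a nullable LongType.
    val order = SortOrder(BoundReference(0, LongType, nullable = true), Ascending)
    val prefixComparator = SortPrefixUtils.getPrefixComparator(order)  // PrefixComparators.INTEGRAL
    val computePrefix: InternalRow => Long = SortPrefixUtils.getPrefixComputer(order)
    // For a row whose column 0 is null, computePrefix returns
    // PrefixComparators.INTEGRAL.NULL_PREFIX instead of a data-dependent prefix.
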
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 59b9b553a7..ce25af58b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -302,6 +302,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object BasicOperators extends Strategy {
def numPartitions: Int = self.numPartitions
+ /**
+ * Picks an appropriate sort operator.
+ *
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ * if necessary.
+ */
+ def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
+ if (sqlContext.conf.unsafeEnabled && UnsafeExternalSort.supportsSchema(child.schema)) {
+ execution.UnsafeExternalSort(sortExprs, global, child)
+ } else if (sqlContext.conf.externalSortEnabled) {
+ execution.ExternalSort(sortExprs, global, child)
+ } else {
+ execution.Sort(sortExprs, global, child)
+ }
+ }
+
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommand(r) :: Nil
@@ -313,11 +329,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
- execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
- case logical.Sort(sortExprs, global, child) if sqlContext.conf.externalSortEnabled =>
- execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
+ getSortOperator(sortExprs, global = false, planLater(child)) :: Nil
case logical.Sort(sortExprs, global, child) =>
- execution.Sort(sortExprs, global, planLater(child)):: Nil
+ getSortOperator(sortExprs, global, planLater(child)):: Nil
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index de14e6ad79..4c063c299b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
+import org.apache.spark.sql.types.StructType
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
@@ -27,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
import org.apache.spark.util.{CompletionIterator, MutablePair}
import org.apache.spark.{HashPartitioner, SparkEnv}
@@ -248,6 +250,77 @@ case class ExternalSort(
/**
* :: DeveloperApi ::
+ * Optimized version of [[ExternalSort]] that operates on binary data (implemented as part of
+ * Project Tungsten).
+ *
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ * if necessary.
+ * @param testSpillFrequency if positive, forces the sorter to spill every `testSpillFrequency`
+ *                           records; intended only for use in unit tests.
+ */
+@DeveloperApi
+case class UnsafeExternalSort(
+ sortOrder: Seq[SortOrder],
+ global: Boolean,
+ child: SparkPlan,
+ testSpillFrequency: Int = 0)
+ extends UnaryNode {
+
+ private[this] val schema: StructType = child.schema
+
+ override def requiredChildDistribution: Seq[Distribution] =
+ if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
+
+ protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
+ assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled")
+ def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+ val ordering = newOrdering(sortOrder, child.output)
+ val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
+ // Hack until we generate separate comparator implementations for ascending vs. descending
+ // (or choose to codegen them):
+ val prefixComparator = {
+ val comp = SortPrefixUtils.getPrefixComparator(boundSortExpression)
+ if (sortOrder.head.direction == Descending) {
+ new PrefixComparator {
+ override def compare(p1: Long, p2: Long): Int = -1 * comp.compare(p1, p2)
+ }
+ } else {
+ comp
+ }
+ }
+ val prefixComputer = {
+ val prefixComputer = SortPrefixUtils.getPrefixComputer(boundSortExpression)
+ new UnsafeExternalRowSorter.PrefixComputer {
+ override def computePrefix(row: InternalRow): Long = prefixComputer(row)
+ }
+ }
+ val sorter = new UnsafeExternalRowSorter(schema, ordering, prefixComparator, prefixComputer)
+ if (testSpillFrequency > 0) {
+ sorter.setTestSpillFrequency(testSpillFrequency)
+ }
+ sorter.sort(iterator)
+ }
+ child.execute().mapPartitions(doSort, preservesPartitioning = true)
+ }
+
+ override def output: Seq[Attribute] = child.output
+
+ override def outputOrdering: Seq[SortOrder] = sortOrder
+}
+
+@DeveloperApi
+object UnsafeExternalSort {
+ /**
+ * Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise.
+ */
+ def supportsSchema(schema: StructType): Boolean = {
+ UnsafeExternalRowSorter.supportsSchema(schema)
+ }
+}
+
+
+/**
+ * :: DeveloperApi ::
* Return a new RDD that has exactly `numPartitions` partitions.
*/
@DeveloperApi
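
As a hedged illustration of supportsSchema: schemas made of primitive and string columns can be handled by the new operator, while a schema containing a decimal column is rejected, since decimals do not yet have an unsafe representation and fall back to the object pool (which is also why the test suite below excludes DecimalType). The schemas here are purely illustrative:

    import org.apache.spark.sql.execution.UnsafeExternalSort
    import org.apache.spark.sql.types._

    val flatSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))
    val withDecimal = StructType(flatSchema.fields :+ StructField("price", DecimalType(10, 2)))

    assert(UnsafeExternalSort.supportsSchema(flatSchema))     // expected: true
    assert(!UnsafeExternalSort.supportsSchema(withDecimal))   // expected: false (object-pooled)
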
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index a1e3ca11b1..a2c10fdaf6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
class SortSuite extends SparkPlanTest {
@@ -33,12 +34,14 @@ class SortSuite extends SparkPlanTest {
checkAnswer(
input.toDF("a", "b", "c"),
- ExternalSort('a.asc :: 'b.asc :: Nil, global = false, _: SparkPlan),
- input.sorted)
+ ExternalSort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
+ input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
+ sortAnswers = false)
checkAnswer(
input.toDF("a", "b", "c"),
- ExternalSort('b.asc :: 'a.asc :: Nil, global = false, _: SparkPlan),
- input.sortBy(t => (t._2, t._1)))
+ ExternalSort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
+ input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
+ sortAnswers = false)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index 108b1122f7..6a8f394545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -17,18 +17,15 @@
package org.apache.spark.sql.execution
-import scala.language.implicitConversions
-import scala.reflect.runtime.universe.TypeTag
-import scala.util.control.NonFatal
-
import org.apache.spark.SparkFunSuite
-
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.catalyst.util._
-
import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.{DataFrameHolder, Row, DataFrame}
+import org.apache.spark.sql.{DataFrame, DataFrameHolder, Row}
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+import scala.util.control.NonFatal
/**
* Base class for writing tests for individual physical operators. For an example of how this
@@ -49,12 +46,19 @@ class SparkPlanTest extends SparkFunSuite {
* @param planFunction a function which accepts the input SparkPlan and uses it to instantiate
* the physical operator that's being tested.
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+ * @param sortAnswers if true, the answers will be sorted by their toString representations prior
+ * to being compared.
*/
protected def checkAnswer(
input: DataFrame,
planFunction: SparkPlan => SparkPlan,
- expectedAnswer: Seq[Row]): Unit = {
- checkAnswer(input :: Nil, (plans: Seq[SparkPlan]) => planFunction(plans.head), expectedAnswer)
+ expectedAnswer: Seq[Row],
+ sortAnswers: Boolean = true): Unit = {
+ doCheckAnswer(
+ input :: Nil,
+ (plans: Seq[SparkPlan]) => planFunction(plans.head),
+ expectedAnswer,
+ sortAnswers)
}
/**
@@ -64,86 +68,131 @@ class SparkPlanTest extends SparkFunSuite {
* @param planFunction a function which accepts the input SparkPlan and uses it to instantiate
* the physical operator that's being tested.
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+ * @param sortAnswers if true, the answers will be sorted by their toString representations prior
+ * to being compared.
*/
- protected def checkAnswer(
+ protected def checkAnswer2(
left: DataFrame,
right: DataFrame,
planFunction: (SparkPlan, SparkPlan) => SparkPlan,
- expectedAnswer: Seq[Row]): Unit = {
- checkAnswer(left :: right :: Nil,
- (plans: Seq[SparkPlan]) => planFunction(plans(0), plans(1)), expectedAnswer)
+ expectedAnswer: Seq[Row],
+ sortAnswers: Boolean = true): Unit = {
+ doCheckAnswer(
+ left :: right :: Nil,
+ (plans: Seq[SparkPlan]) => planFunction(plans(0), plans(1)),
+ expectedAnswer,
+ sortAnswers)
}
/**
* Runs the plan and makes sure the answer matches the expected result.
* @param input the input data to be used.
- * @param planFunction a function which accepts the input SparkPlan and uses it to instantiate
- * the physical operator that's being tested.
+ * @param planFunction a function which accepts a sequence of input SparkPlans and uses them to
+ * instantiate the physical operator that's being tested.
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+ * @param sortAnswers if true, the answers will be sorted by their toString representations prior
+ * to being compared.
*/
- protected def checkAnswer(
+ protected def doCheckAnswer(
input: Seq[DataFrame],
planFunction: Seq[SparkPlan] => SparkPlan,
- expectedAnswer: Seq[Row]): Unit = {
- SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer) match {
+ expectedAnswer: Seq[Row],
+ sortAnswers: Boolean = true): Unit = {
+ SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer, sortAnswers) match {
case Some(errorMessage) => fail(errorMessage)
case None =>
}
}
/**
- * Runs the plan and makes sure the answer matches the expected result.
+ * Runs the plan and makes sure the answer matches the result produced by a reference plan.
* @param input the input data to be used.
* @param planFunction a function which accepts the input SparkPlan and uses it to instantiate
* the physical operator that's being tested.
- * @param expectedAnswer the expected result in a [[Seq]] of [[Product]]s.
+ * @param expectedPlanFunction a function which accepts the input SparkPlan and uses it to
+ * instantiate a reference implementation of the physical operator
+ * that's being tested. The result of executing this plan will be
+ * treated as the source-of-truth for the test.
+ * @param sortAnswers if true, the answers will be sorted by their toString representations prior
+ * to being compared.
*/
- protected def checkAnswer[A <: Product : TypeTag](
+ protected def checkThatPlansAgree(
input: DataFrame,
planFunction: SparkPlan => SparkPlan,
- expectedAnswer: Seq[A]): Unit = {
- val expectedRows = expectedAnswer.map(Row.fromTuple)
- checkAnswer(input, planFunction, expectedRows)
+ expectedPlanFunction: SparkPlan => SparkPlan,
+ sortAnswers: Boolean = true): Unit = {
+ SparkPlanTest.checkAnswer(input, planFunction, expectedPlanFunction, sortAnswers) match {
+ case Some(errorMessage) => fail(errorMessage)
+ case None =>
+ }
}
+}
- /**
- * Runs the plan and makes sure the answer matches the expected result.
- * @param left the left input data to be used.
- * @param right the right input data to be used.
- * @param planFunction a function which accepts the input SparkPlan and uses it to instantiate
- * the physical operator that's being tested.
- * @param expectedAnswer the expected result in a [[Seq]] of [[Product]]s.
- */
- protected def checkAnswer[A <: Product : TypeTag](
- left: DataFrame,
- right: DataFrame,
- planFunction: (SparkPlan, SparkPlan) => SparkPlan,
- expectedAnswer: Seq[A]): Unit = {
- val expectedRows = expectedAnswer.map(Row.fromTuple)
- checkAnswer(left, right, planFunction, expectedRows)
- }
+/**
+ * Helper methods for writing tests of individual physical operators.
+ */
+object SparkPlanTest {
/**
- * Runs the plan and makes sure the answer matches the expected result.
+ * Runs the plan and makes sure the answer matches the result produced by a reference plan.
* @param input the input data to be used.
* @param planFunction a function which accepts the input SparkPlan and uses it to instantiate
* the physical operator that's being tested.
- * @param expectedAnswer the expected result in a [[Seq]] of [[Product]]s.
+ * @param expectedPlanFunction a function which accepts the input SparkPlan and uses it to
+ * instantiate a reference implementation of the physical operator
+ * that's being tested. The result of executing this plan will be
+ * treated as the source-of-truth for the test.
*/
- protected def checkAnswer[A <: Product : TypeTag](
- input: Seq[DataFrame],
- planFunction: Seq[SparkPlan] => SparkPlan,
- expectedAnswer: Seq[A]): Unit = {
- val expectedRows = expectedAnswer.map(Row.fromTuple)
- checkAnswer(input, planFunction, expectedRows)
- }
+ def checkAnswer(
+ input: DataFrame,
+ planFunction: SparkPlan => SparkPlan,
+ expectedPlanFunction: SparkPlan => SparkPlan,
+ sortAnswers: Boolean): Option[String] = {
-}
+ val outputPlan = planFunction(input.queryExecution.sparkPlan)
+ val expectedOutputPlan = expectedPlanFunction(input.queryExecution.sparkPlan)
-/**
- * Helper methods for writing tests of individual physical operators.
- */
-object SparkPlanTest {
+ val expectedAnswer: Seq[Row] = try {
+ executePlan(expectedOutputPlan)
+ } catch {
+ case NonFatal(e) =>
+ val errorMessage =
+ s"""
+ | Exception thrown while executing Spark plan to calculate expected answer:
+ | $expectedOutputPlan
+ | == Exception ==
+ | $e
+ | ${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
+ """.stripMargin
+ return Some(errorMessage)
+ }
+
+ val actualAnswer: Seq[Row] = try {
+ executePlan(outputPlan)
+ } catch {
+ case NonFatal(e) =>
+ val errorMessage =
+ s"""
+ | Exception thrown while executing Spark plan:
+ | $outputPlan
+ | == Exception ==
+ | $e
+ | ${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
+ """.stripMargin
+ return Some(errorMessage)
+ }
+
+ compareAnswers(actualAnswer, expectedAnswer, sortAnswers).map { errorMessage =>
+ s"""
+ | Results do not match.
+ | Actual result Spark plan:
+ | $outputPlan
+ | Expected result Spark plan:
+ | $expectedOutputPlan
+ | $errorMessage
+ """.stripMargin
+ }
+ }
/**
* Runs the plan and makes sure the answer matches the expected result.
@@ -151,28 +200,45 @@ object SparkPlanTest {
* @param planFunction a function which accepts the input SparkPlan and uses it to instantiate
* the physical operator that's being tested.
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+ * @param sortAnswers if true, the answers will be sorted by their toString representations prior
+ * to being compared.
*/
def checkAnswer(
input: Seq[DataFrame],
planFunction: Seq[SparkPlan] => SparkPlan,
- expectedAnswer: Seq[Row]): Option[String] = {
+ expectedAnswer: Seq[Row],
+ sortAnswers: Boolean): Option[String] = {
val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
- // A very simple resolver to make writing tests easier. In contrast to the real resolver
- // this is always case sensitive and does not try to handle scoping or complex type resolution.
- val resolvedPlan = TestSQLContext.prepareForExecution.execute(
- outputPlan transform {
- case plan: SparkPlan =>
- val inputMap = plan.children.flatMap(_.output).map(a => (a.name, a)).toMap
- plan.transformExpressions {
- case UnresolvedAttribute(Seq(u)) =>
- inputMap.getOrElse(u,
- sys.error(s"Invalid Test: Cannot resolve $u given input $inputMap"))
- }
- }
- )
+ val sparkAnswer: Seq[Row] = try {
+ executePlan(outputPlan)
+ } catch {
+ case NonFatal(e) =>
+ val errorMessage =
+ s"""
+ | Exception thrown while executing Spark plan:
+ | $outputPlan
+ | == Exception ==
+ | $e
+ | ${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
+ """.stripMargin
+ return Some(errorMessage)
+ }
+ compareAnswers(sparkAnswer, expectedAnswer, sortAnswers).map { errorMessage =>
+ s"""
+ | Results do not match for Spark plan:
+ | $outputPlan
+ | $errorMessage
+ """.stripMargin
+ }
+ }
+
+ private def compareAnswers(
+ sparkAnswer: Seq[Row],
+ expectedAnswer: Seq[Row],
+ sort: Boolean): Option[String] = {
def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
// Converts data to types that we can do equality comparison using Scala collections.
// For BigDecimal type, the Scala type has a better definition of equality test (similar to
@@ -187,40 +253,43 @@ object SparkPlanTest {
case o => o
})
}
- converted.sortBy(_.toString())
- }
-
- val sparkAnswer: Seq[Row] = try {
- resolvedPlan.executeCollect().toSeq
- } catch {
- case NonFatal(e) =>
- val errorMessage =
- s"""
- | Exception thrown while executing Spark plan:
- | $outputPlan
- | == Exception ==
- | $e
- | ${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
- """.stripMargin
- return Some(errorMessage)
+ if (sort) {
+ converted.sortBy(_.toString())
+ } else {
+ converted
+ }
}
-
if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
val errorMessage =
s"""
- | Results do not match for Spark plan:
- | $outputPlan
| == Results ==
| ${sideBySide(
- s"== Correct Answer - ${expectedAnswer.size} ==" +:
+ s"== Expected Answer - ${expectedAnswer.size} ==" +:
prepareAnswer(expectedAnswer).map(_.toString()),
- s"== Spark Answer - ${sparkAnswer.size} ==" +:
+ s"== Actual Answer - ${sparkAnswer.size} ==" +:
prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")}
""".stripMargin
- return Some(errorMessage)
+ Some(errorMessage)
+ } else {
+ None
}
+ }
- None
+ private def executePlan(outputPlan: SparkPlan): Seq[Row] = {
+ // A very simple resolver to make writing tests easier. In contrast to the real resolver
+ // this is always case sensitive and does not try to handle scoping or complex type resolution.
+ val resolvedPlan = TestSQLContext.prepareForExecution.execute(
+ outputPlan transform {
+ case plan: SparkPlan =>
+ val inputMap = plan.children.flatMap(_.output).map(a => (a.name, a)).toMap
+ plan.transformExpressions {
+ case UnresolvedAttribute(Seq(u)) =>
+ inputMap.getOrElse(u,
+ sys.error(s"Invalid Test: Cannot resolve $u given input $inputMap"))
+ }
+ }
+ )
+ resolvedPlan.executeCollect().toSeq
}
}
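
The comparison rule that compareAnswers applies reduces to the following sketch (the BigDecimal and array normalization done by prepareAnswer is omitted): with sortAnswers = true both sides are ordered by their toString before comparison, so row order is ignored; with sortAnswers = false rows are compared positionally, which is what order-sensitive operators such as sorts need.

    import org.apache.spark.sql.Row

    // Illustrative distillation, not the production helper itself.
    def answersMatch(actual: Seq[Row], expected: Seq[Row], sortAnswers: Boolean): Boolean = {
      def normalize(rows: Seq[Row]): Seq[Row] =
        if (sortAnswers) rows.sortBy(_.toString()) else rows
      normalize(actual) == normalize(expected)
    }
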
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
new file mode 100644
index 0000000000..4f4c1f2856
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.{RandomDataGenerator, Row, SQLConf}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types._
+
+class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ TestSQLContext.conf.setConf(SQLConf.CODEGEN_ENABLED, true)
+ }
+
+ override def afterAll(): Unit = {
+ TestSQLContext.conf.setConf(SQLConf.CODEGEN_ENABLED, SQLConf.CODEGEN_ENABLED.defaultValue.get)
+ }
+
+ ignore("sort followed by limit should not leak memory") {
+ // TODO: this test is going to fail until we implement a proper iterator interface
+ // with a close() method.
+ TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
+ checkThatPlansAgree(
+ (1 to 100).map(v => Tuple1(v)).toDF("a"),
+ (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
+ (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
+ sortAnswers = false
+ )
+ }
+
+ test("sort followed by limit") {
+ TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
+ try {
+ checkThatPlansAgree(
+ (1 to 100).map(v => Tuple1(v)).toDF("a"),
+ (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
+ (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
+ sortAnswers = false
+ )
+ } finally {
+ TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
+
+ }
+ }
+
+ test("sorting does not crash for large inputs") {
+ val sortOrder = 'a.asc :: Nil
+ val stringLength = 1024 * 1024 * 2
+ checkThatPlansAgree(
+ Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
+ UnsafeExternalSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+ Sort(sortOrder, global = true, _: SparkPlan),
+ sortAnswers = false
+ )
+ }
+
+ // Test sorting on different data types
+ for (
+ dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType)
+ if !dataType.isInstanceOf[DecimalType]; // We don't have an unsafe representation for decimals
+ nullable <- Seq(true, false);
+ sortOrder <- Seq('a.asc :: Nil, 'a.desc :: Nil);
+ randomDataGenerator <- RandomDataGenerator.forType(dataType, nullable)
+ ) {
+ test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {
+ val inputData = Seq.fill(1000)(randomDataGenerator()).filter {
+ case d: Double => !d.isNaN
+ case f: Float => !java.lang.Float.isNaN(f)
+ case x => true
+ }
+ val inputDf = TestSQLContext.createDataFrame(
+ TestSQLContext.sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
+ StructType(StructField("a", dataType, nullable = true) :: Nil)
+ )
+ assert(UnsafeExternalSort.supportsSchema(inputDf.schema))
+ checkThatPlansAgree(
+ inputDf,
+ UnsafeExternalSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 23),
+ Sort(sortOrder, global = true, _: SparkPlan),
+ sortAnswers = false
+ )
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index 5707d2fb30..2c27da596b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.joins
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Expression, LessThan}
import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter}
@@ -41,23 +42,23 @@ class OuterJoinSuite extends SparkPlanTest {
val condition = Some(LessThan('b, 'd))
test("shuffled hash outer join") {
- checkAnswer(left, right, (left: SparkPlan, right: SparkPlan) =>
+ checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) =>
ShuffledHashOuterJoin(leftKeys, rightKeys, LeftOuter, condition, left, right),
Seq(
(1, 2.0, null, null),
(2, 1.0, 2, 3.0),
(3, 3.0, null, null)
- ))
+ ).map(Row.fromTuple))
- checkAnswer(left, right, (left: SparkPlan, right: SparkPlan) =>
+ checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) =>
ShuffledHashOuterJoin(leftKeys, rightKeys, RightOuter, condition, left, right),
Seq(
(2, 1.0, 2, 3.0),
(null, null, 3, 2.0),
(null, null, 4, 1.0)
- ))
+ ).map(Row.fromTuple))
- checkAnswer(left, right, (left: SparkPlan, right: SparkPlan) =>
+ checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) =>
ShuffledHashOuterJoin(leftKeys, rightKeys, FullOuter, condition, left, right),
Seq(
(1, 2.0, null, null),
@@ -65,24 +66,24 @@ class OuterJoinSuite extends SparkPlanTest {
(3, 3.0, null, null),
(null, null, 3, 2.0),
(null, null, 4, 1.0)
- ))
+ ).map(Row.fromTuple))
}
test("broadcast hash outer join") {
- checkAnswer(left, right, (left: SparkPlan, right: SparkPlan) =>
+ checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) =>
BroadcastHashOuterJoin(leftKeys, rightKeys, LeftOuter, condition, left, right),
Seq(
(1, 2.0, null, null),
(2, 1.0, 2, 3.0),
(3, 3.0, null, null)
- ))
+ ).map(Row.fromTuple))
- checkAnswer(left, right, (left: SparkPlan, right: SparkPlan) =>
+ checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) =>
BroadcastHashOuterJoin(leftKeys, rightKeys, RightOuter, condition, left, right),
Seq(
(2, 1.0, 2, 3.0),
(null, null, 3, 2.0),
(null, null, 4, 1.0)
- ))
+ ).map(Row.fromTuple))
}
}