about | summary | refs | log | tree | commit | diff
path: root/sql
diff options
context:
space:
mode:
author: Reynold Xin <rxin@databricks.com> 2015-08-03 18:47:02 -0700
committer: Reynold Xin <rxin@databricks.com> 2015-08-03 18:47:02 -0700
commit: 5eb89f67e323dcf9fa3d5b30f9b5cb8f10ca1e8c (patch)
tree: b6dcd534c519f6f046f1b0283b84b2cb75bff3c3 /sql
parent: 3b0e44490aebfba30afc147e4a34a63439d985c6 (diff)
download: spark-5eb89f67e323dcf9fa3d5b30f9b5cb8f10ca1e8c.tar.gz
spark-5eb89f67e323dcf9fa3d5b30f9b5cb8f10ca1e8c.tar.bz2
spark-5eb89f67e323dcf9fa3d5b30f9b5cb8f10ca1e8c.zip
[SPARK-9577][SQL] Surface concrete iterator types in various sort classes.
We often return abstract iterator types in various sort-related classes (e.g. UnsafeKVExternalSorter). It is actually better to return a more concrete type, so the callsite uses that type and JIT can inline the iterator calls.

Author: Reynold Xin <rxin@databricks.com>

Closes #7911 from rxin/surface-concrete-type and squashes the following commits:

0422add [Reynold Xin] [SPARK-9577][SQL] Surface concrete iterator types in various sort classes.
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java 112
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala 30
2 files changed, 61 insertions, 81 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index f6b0176863..312ec8ea0d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -134,7 +134,7 @@ public final class UnsafeKVExternalSorter {
value.getBaseObject(), value.getBaseOffset(), value.getSizeInBytes(), prefix);
}
- public KVIterator<UnsafeRow, UnsafeRow> sortedIterator() throws IOException {
+ public KVSorterIterator sortedIterator() throws IOException {
try {
final UnsafeSorterIterator underlying = sorter.getSortedIterator();
if (!underlying.hasNext()) {
@@ -142,58 +142,7 @@ public final class UnsafeKVExternalSorter {
// here in order to prevent memory leaks.
cleanupResources();
}
-
- return new KVIterator<UnsafeRow, UnsafeRow>() {
- private UnsafeRow key = new UnsafeRow();
- private UnsafeRow value = new UnsafeRow();
- private int numKeyFields = keySchema.size();
- private int numValueFields = valueSchema.size();
-
- @Override
- public boolean next() throws IOException {
- try {
- if (underlying.hasNext()) {
- underlying.loadNext();
-
- Object baseObj = underlying.getBaseObject();
- long recordOffset = underlying.getBaseOffset();
- int recordLen = underlying.getRecordLength();
-
- // Note that recordLen = keyLen + valueLen + 4 bytes (for the keyLen itself)
- int keyLen = PlatformDependent.UNSAFE.getInt(baseObj, recordOffset);
- int valueLen = recordLen - keyLen - 4;
-
- key.pointTo(baseObj, recordOffset + 4, numKeyFields, keyLen);
- value.pointTo(baseObj, recordOffset + 4 + keyLen, numValueFields, valueLen);
-
- return true;
- } else {
- key = null;
- value = null;
- cleanupResources();
- return false;
- }
- } catch (IOException e) {
- cleanupResources();
- throw e;
- }
- }
-
- @Override
- public UnsafeRow getKey() {
- return key;
- }
-
- @Override
- public UnsafeRow getValue() {
- return value;
- }
-
- @Override
- public void close() {
- cleanupResources();
- }
- };
+ return new KVSorterIterator(underlying);
} catch (IOException e) {
cleanupResources();
throw e;
@@ -233,4 +182,61 @@ public final class UnsafeKVExternalSorter {
return ordering.compare(row1, row2);
}
}
+
+ public class KVSorterIterator extends KVIterator<UnsafeRow, UnsafeRow> {
+ private UnsafeRow key = new UnsafeRow();
+ private UnsafeRow value = new UnsafeRow();
+ private final int numKeyFields = keySchema.size();
+ private final int numValueFields = valueSchema.size();
+ private final UnsafeSorterIterator underlying;
+
+ private KVSorterIterator(UnsafeSorterIterator underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ try {
+ if (underlying.hasNext()) {
+ underlying.loadNext();
+
+ Object baseObj = underlying.getBaseObject();
+ long recordOffset = underlying.getBaseOffset();
+ int recordLen = underlying.getRecordLength();
+
+ // Note that recordLen = keyLen + valueLen + 4 bytes (for the keyLen itself)
+ int keyLen = PlatformDependent.UNSAFE.getInt(baseObj, recordOffset);
+ int valueLen = recordLen - keyLen - 4;
+
+ key.pointTo(baseObj, recordOffset + 4, numKeyFields, keyLen);
+ value.pointTo(baseObj, recordOffset + 4 + keyLen, numValueFields, valueLen);
+
+ return true;
+ } else {
+ key = null;
+ value = null;
+ cleanupResources();
+ return false;
+ }
+ } catch (IOException e) {
+ cleanupResources();
+ throw e;
+ }
+ }
+
+ @Override
+ public UnsafeRow getKey() {
+ return key;
+ }
+
+ @Override
+ public UnsafeRow getValue() {
+ return value;
+ }
+
+ @Override
+ public void close() {
+ cleanupResources();
+ }
+ };
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala
index 37d34eb7cc..b465787fe8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala
@@ -17,12 +17,12 @@
package org.apache.spark.sql.execution.aggregate
-import org.apache.spark.sql.execution.{UnsafeKeyValueSorter, UnsafeFixedWidthAggregationMap}
import org.apache.spark.unsafe.KVIterator
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.execution.{UnsafeKVExternalSorter, UnsafeFixedWidthAggregationMap}
import org.apache.spark.sql.types.StructType
/**
@@ -230,7 +230,7 @@ class UnsafeHybridAggregationIterator(
}
// Step 5: Get the sorted iterator from the externalSorter.
- val sortedKVIterator: KVIterator[UnsafeRow, UnsafeRow] = externalSorter.sortedIterator()
+ val sortedKVIterator: UnsafeKVExternalSorter#KVSorterIterator = externalSorter.sortedIterator()
// Step 6: We now create a SortBasedAggregationIterator based on sortedKVIterator.
// For a aggregate function with mode Partial, its mode in the SortBasedAggregationIterator
@@ -368,31 +368,5 @@ object UnsafeHybridAggregationIterator {
newMutableProjection,
outputsUnsafeRows)
}
-
- def createFromKVIterator(
- groupingKeyAttributes: Seq[Attribute],
- valueAttributes: Seq[Attribute],
- inputKVIterator: KVIterator[UnsafeRow, InternalRow],
- nonCompleteAggregateExpressions: Seq[AggregateExpression2],
- nonCompleteAggregateAttributes: Seq[Attribute],
- completeAggregateExpressions: Seq[AggregateExpression2],
- completeAggregateAttributes: Seq[Attribute],
- initialInputBufferOffset: Int,
- resultExpressions: Seq[NamedExpression],
- newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
- outputsUnsafeRows: Boolean): UnsafeHybridAggregationIterator = {
- new UnsafeHybridAggregationIterator(
- groupingKeyAttributes,
- valueAttributes,
- inputKVIterator,
- nonCompleteAggregateExpressions,
- nonCompleteAggregateAttributes,
- completeAggregateExpressions,
- completeAggregateAttributes,
- initialInputBufferOffset,
- resultExpressions,
- newMutableProjection,
- outputsUnsafeRows)
- }
// scalastyle:on
}