diff options
author | Davies Liu <davies@databricks.com> | 2015-06-29 15:59:20 -0700 |
---|---|---|
committer | Davies Liu <davies@databricks.com> | 2015-06-29 15:59:20 -0700 |
commit | ed359de595d5dd67b666660eddf092eaf89041c8 (patch) | |
tree | 1cb3205b2155cfd828522f25d6ff74e468917010 /sql/core | |
parent | 931da5c8ab271ff2ee04419c7e3c6b0012459694 (diff) | |
download | spark-ed359de595d5dd67b666660eddf092eaf89041c8.tar.gz spark-ed359de595d5dd67b666660eddf092eaf89041c8.tar.bz2 spark-ed359de595d5dd67b666660eddf092eaf89041c8.zip |
[SPARK-8579] [SQL] support arbitrary object in UnsafeRow
This PR brings arbitrary object support in UnsafeRow (both in grouping key and aggregation buffer).
Two object pools will be created to hold those non-primitive objects, and put the index of them into UnsafeRow. In order to compare the grouping key as bytes, the objects in key will be stored in a unique object pool, to make sure same objects will have same index (used as hashCode).
For StringType and BinaryType, we still put them as var-length in UnsafeRow when initializing for better performance. But for update, they will be an object inside object pools (there will be some garbages left in the buffer).
BTW: Will create a JIRA once issue.apache.org is available.
cc JoshRosen rxin
Author: Davies Liu <davies@databricks.com>
Closes #6959 from davies/unsafe_obj and squashes the following commits:
5ce39da [Davies Liu] fix comment
5e797bf [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
5803d64 [Davies Liu] fix conflict
461d304 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
2f41c90 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
b04d69c [Davies Liu] address comments
4859b80 [Davies Liu] fix comments
f38011c [Davies Liu] add a test for grouping by decimal
d2cf7ab [Davies Liu] add more tests for null checking
71983c5 [Davies Liu] add test for timestamp
e8a1649 [Davies Liu] reuse buffer for string
39f09ca [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
035501e [Davies Liu] fix style
236d6de [Davies Liu] support arbitrary object in UnsafeRow
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala | 16 |
1 files changed, 4 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala index ba2c8f53d7..44930f82b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala @@ -238,11 +238,6 @@ case class GeneratedAggregate( StructType(fields) } - val schemaSupportsUnsafe: Boolean = { - UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema) && - UnsafeFixedWidthAggregationMap.supportsGroupKeySchema(groupKeySchema) - } - child.execute().mapPartitions { iter => // Builds a new custom class for holding the results of aggregation for a group. val initialValues = computeFunctions.flatMap(_.initialValues) @@ -283,12 +278,12 @@ case class GeneratedAggregate( val resultProjection = resultProjectionBuilder() Iterator(resultProjection(buffer)) - } else if (unsafeEnabled && schemaSupportsUnsafe) { + } else if (unsafeEnabled) { log.info("Using Unsafe-based aggregator") val aggregationMap = new UnsafeFixedWidthAggregationMap( - newAggregationBuffer(EmptyRow), - aggregationBufferSchema, - groupKeySchema, + newAggregationBuffer, + new UnsafeRowConverter(groupKeySchema), + new UnsafeRowConverter(aggregationBufferSchema), TaskContext.get.taskMemoryManager(), 1024 * 16, // initial capacity false // disable tracking of performance metrics @@ -323,9 +318,6 @@ case class GeneratedAggregate( } } } else { - if (unsafeEnabled) { - log.info("Not using Unsafe-based aggregator because it is not supported for this schema") - } val buffers = new java.util.HashMap[InternalRow, MutableRow]() var currentRow: InternalRow = null |