author    Davies Liu <davies@databricks.com>  2015-06-29 15:59:20 -0700
committer Davies Liu <davies@databricks.com>  2015-06-29 15:59:20 -0700
commit    ed359de595d5dd67b666660eddf092eaf89041c8 (patch)
tree      1cb3205b2155cfd828522f25d6ff74e468917010 /sql/core
parent    931da5c8ab271ff2ee04419c7e3c6b0012459694 (diff)
[SPARK-8579] [SQL] support arbitrary object in UnsafeRow
This PR adds support for arbitrary objects in UnsafeRow, both in the grouping key and in the aggregation buffer. Two object pools are created to hold the non-primitive objects, and the UnsafeRow stores only their indexes into the pools. So that grouping keys can still be compared as raw bytes, objects in the key are kept in a unique (interning) object pool, which guarantees that equal objects receive the same index (also used as the hashCode). For StringType and BinaryType, values are still written as variable-length data in the UnsafeRow at initialization for better performance; on update, however, they become objects in the pool (leaving some garbage behind in the buffer).

BTW: will create a JIRA once issues.apache.org is available.

cc JoshRosen rxin

Author: Davies Liu <davies@databricks.com>

Closes #6959 from davies/unsafe_obj and squashes the following commits:

5ce39da [Davies Liu] fix comment
5e797bf [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
5803d64 [Davies Liu] fix conflict
461d304 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
2f41c90 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
b04d69c [Davies Liu] address comments
4859b80 [Davies Liu] fix comments
f38011c [Davies Liu] add a test for grouping by decimal
d2cf7ab [Davies Liu] add more tests for null checking
71983c5 [Davies Liu] add test for timestamp
e8a1649 [Davies Liu] reuse buffer for string
39f09ca [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
035501e [Davies Liu] fix style
236d6de [Davies Liu] support arbitrary object in UnsafeRow
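To make the pooling idea concrete, here is a minimal Scala sketch of the mechanism described above. The class and method names are illustrative only, not Spark's actual internals: a plain pool appends objects and returns their index, while the interning pool guarantees that equal objects map to the same index, which is what allows grouping keys to be compared as raw bytes.

    import scala.collection.mutable

    // Append-only pool: stores non-primitive values outside the row and
    // hands back an index that fits in a fixed-width UnsafeRow slot.
    class ObjectPool {
      private val objects = mutable.ArrayBuffer.empty[AnyRef]

      def put(obj: AnyRef): Int = {
        objects += obj
        objects.size - 1
      }

      def get(idx: Int): AnyRef = objects(idx)
    }

    // Interning pool for grouping keys: equal objects always get the same
    // index, so rows with equal keys are byte-identical in that slot.
    class UniqueObjectPool extends ObjectPool {
      private val indexes = mutable.HashMap.empty[AnyRef, Int]

      override def put(obj: AnyRef): Int =
        indexes.getOrElseUpdate(obj, super.put(obj))
    }

    // Hypothetical usage: equal decimals land at the same index.
    object PoolDemo extends App {
      val pool = new UniqueObjectPool
      val a = pool.put(java.math.BigDecimal.valueOf(1.5))
      val b = pool.put(java.math.BigDecimal.valueOf(1.5))
      assert(a == b) // interned: equal values share one index
    }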
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala | 16
1 file changed, 4 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index ba2c8f53d7..44930f82b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -238,11 +238,6 @@ case class GeneratedAggregate(
       StructType(fields)
     }
 
-    val schemaSupportsUnsafe: Boolean = {
-      UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema) &&
-        UnsafeFixedWidthAggregationMap.supportsGroupKeySchema(groupKeySchema)
-    }
-
     child.execute().mapPartitions { iter =>
       // Builds a new custom class for holding the results of aggregation for a group.
       val initialValues = computeFunctions.flatMap(_.initialValues)
@@ -283,12 +278,12 @@ case class GeneratedAggregate(
         val resultProjection = resultProjectionBuilder()
         Iterator(resultProjection(buffer))
-      } else if (unsafeEnabled && schemaSupportsUnsafe) {
+      } else if (unsafeEnabled) {
         log.info("Using Unsafe-based aggregator")
         val aggregationMap = new UnsafeFixedWidthAggregationMap(
-          newAggregationBuffer(EmptyRow),
-          aggregationBufferSchema,
-          groupKeySchema,
+          newAggregationBuffer,
+          new UnsafeRowConverter(groupKeySchema),
+          new UnsafeRowConverter(aggregationBufferSchema),
           TaskContext.get.taskMemoryManager(),
           1024 * 16, // initial capacity
           false // disable tracking of performance metrics
         )
@@ -323,9 +318,6 @@ case class GeneratedAggregate(
           }
         }
       } else {
-        if (unsafeEnabled) {
-          log.info("Not using Unsafe-based aggregator because it is not supported for this schema")
-        }
         val buffers = new java.util.HashMap[InternalRow, MutableRow]()
         var currentRow: InternalRow = null
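Net effect of the change: the schema-support check and its fallback logging disappear because every schema is now acceptable, and instead of raw schemas the map receives one UnsafeRowConverter per schema, which (per the commit message) routes non-primitive types through the object pools. A sketch of the new construction, restating the '+' side of the second hunk with the surrounding definitions assumed:

    // newAggregationBuffer, groupKeySchema and aggregationBufferSchema are
    // defined earlier in GeneratedAggregate and are assumed here.
    val aggregationMap = new UnsafeFixedWidthAggregationMap(
      newAggregationBuffer,
      new UnsafeRowConverter(groupKeySchema),
      new UnsafeRowConverter(aggregationBufferSchema),
      TaskContext.get.taskMemoryManager(),
      1024 * 16, // initial capacity
      false      // disable tracking of performance metrics
    )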