about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java30
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala2
2 files changed, 16 insertions, 16 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
index b23e0efc83..f7849ebebc 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
@@ -39,7 +39,7 @@ public final class UnsafeFixedWidthAggregationMap {
* An empty aggregation buffer, encoded in UnsafeRow format. When inserting a new key into the
* map, we copy this buffer and use it as the value.
*/
- private final long[] emptyAggregationBuffer;
+ private final byte[] emptyAggregationBuffer;
private final StructType aggregationBufferSchema;
@@ -63,10 +63,10 @@ public final class UnsafeFixedWidthAggregationMap {
/**
* Scratch space that is used when encoding grouping keys into UnsafeRow format.
*
- * By default, this is a 1MB array, but it will grow as necessary in case larger keys are
+ * By default, this is an 8 KB array, but it will grow as necessary in case larger keys are
* encountered.
*/
- private long[] groupingKeyConversionScratchSpace = new long[1024 / 8];
+ private byte[] groupingKeyConversionScratchSpace = new byte[1024 * 8];
private final boolean enablePerfMetrics;
@@ -123,13 +123,13 @@ public final class UnsafeFixedWidthAggregationMap {
}
/**
- * Convert a Java object row into an UnsafeRow, allocating it into a new long array.
+ * Convert a Java object row into an UnsafeRow, allocating it into a new byte array.
*/
- private static long[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
+ private static byte[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
- final long[] unsafeRow = new long[converter.getSizeRequirement(javaRow)];
- final long writtenLength =
- converter.writeRow(javaRow, unsafeRow, PlatformDependent.LONG_ARRAY_OFFSET);
+ final byte[] unsafeRow = new byte[converter.getSizeRequirement(javaRow)];
+ final int writtenLength =
+ converter.writeRow(javaRow, unsafeRow, PlatformDependent.BYTE_ARRAY_OFFSET);
assert (writtenLength == unsafeRow.length): "Size requirement calculation was wrong!";
return unsafeRow;
}
@@ -143,34 +143,34 @@ public final class UnsafeFixedWidthAggregationMap {
// Make sure that the buffer is large enough to hold the key. If it's not, grow it:
if (groupingKeySize > groupingKeyConversionScratchSpace.length) {
// This new array will be initially zero, so there's no need to zero it out here
- groupingKeyConversionScratchSpace = new long[groupingKeySize];
+ groupingKeyConversionScratchSpace = new byte[groupingKeySize];
} else {
// Zero out the buffer that's used to hold the current row. This is necessary in order
// to ensure that rows hash properly, since garbage data from the previous row could
// otherwise end up as padding in this row. As a performance optimization, we only zero out
// the portion of the buffer that we'll actually write to.
- Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, 0);
+ Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, (byte) 0);
}
- final long actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
+ final int actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
groupingKey,
groupingKeyConversionScratchSpace,
- PlatformDependent.LONG_ARRAY_OFFSET);
+ PlatformDependent.BYTE_ARRAY_OFFSET);
assert (groupingKeySize == actualGroupingKeySize) : "Size requirement calculation was wrong!";
// Probe our map using the serialized key
final BytesToBytesMap.Location loc = map.lookup(
groupingKeyConversionScratchSpace,
- PlatformDependent.LONG_ARRAY_OFFSET,
+ PlatformDependent.BYTE_ARRAY_OFFSET,
groupingKeySize);
if (!loc.isDefined()) {
// This is the first time that we've seen this grouping key, so we'll insert a copy of the
// empty aggregation buffer into the map:
loc.putNewKey(
groupingKeyConversionScratchSpace,
- PlatformDependent.LONG_ARRAY_OFFSET,
+ PlatformDependent.BYTE_ARRAY_OFFSET,
groupingKeySize,
emptyAggregationBuffer,
- PlatformDependent.LONG_ARRAY_OFFSET,
+ PlatformDependent.BYTE_ARRAY_OFFSET,
emptyAggregationBuffer.length
);
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
index d771e454b5..5c92f41c63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -68,7 +68,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
* @param baseOffset the base offset of the destination address
* @return the number of bytes written. This should be equal to `getSizeRequirement(row)`.
*/
- def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Long = {
+ def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Int = {
unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
var fieldNumber = 0
var appendCursor: Int = fixedLengthSize