about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala26
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala4
3 files changed, 28 insertions, 10 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index c988f1d1b9..bfcf111385 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -145,7 +145,13 @@ final class Decimal extends Ordered[Decimal] with Serializable {
}
}
- def toJavaBigDecimal: java.math.BigDecimal = toBigDecimal.underlying()
+ def toJavaBigDecimal: java.math.BigDecimal = {
+ if (decimalVal.ne(null)) {
+ decimalVal.underlying()
+ } else {
+ java.math.BigDecimal.valueOf(longVal, _scale)
+ }
+ }
def toUnscaledLong: Long = {
if (decimalVal.ne(null)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 2ff2fda361..050d3610a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -302,7 +302,13 @@ private[parquet] class CatalystRowConverter(
}
override def addBinary(value: Binary): Unit = {
- updater.set(UTF8String.fromBytes(value.getBytes))
+ // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we
+ // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying
+ // it.
+ val buffer = value.toByteBuffer
+ val offset = buffer.position()
+ val numBytes = buffer.limit() - buffer.position()
+ updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))
}
}
@@ -332,24 +338,30 @@ private[parquet] class CatalystRowConverter(
private def toDecimal(value: Binary): Decimal = {
val precision = decimalType.precision
val scale = decimalType.scale
- val bytes = value.getBytes
if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
- // Constructs a `Decimal` with an unscaled `Long` value if possible.
+ // Constructs a `Decimal` with an unscaled `Long` value if possible. The underlying
+ // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
+ // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
+ val buffer = value.toByteBuffer
+ val bytes = buffer.array()
+ val start = buffer.position()
+ val end = buffer.limit()
+
var unscaled = 0L
- var i = 0
+ var i = start
- while (i < bytes.length) {
+ while (i < end) {
unscaled = (unscaled << 8) | (bytes(i) & 0xff)
i += 1
}
- val bits = 8 * bytes.length
+ val bits = 8 * (end - start)
unscaled = (unscaled << (64 - bits)) >> (64 - bits)
Decimal(unscaled, precision, scale)
} else {
// Otherwise, resorts to an unscaled `BigInteger` instead.
- Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale)
+ Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 2d237da81c..97ffeb08aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -567,9 +567,9 @@ private[parquet] object CatalystSchemaConverter {
}
}
- val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4)
+ val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */
- val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8)
+ val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8) /* 18 */
// Max precision of a decimal value stored in `numBytes` bytes
def maxPrecisionForBytes(numBytes: Int): Int = {