author     Cheng Lian <lian@databricks.com>    2015-09-29 23:30:27 -0700
committer  Cheng Lian <lian@databricks.com>    2015-09-29 23:30:27 -0700
commit     4d5a005b0d2591d4d57a19be48c4954b9f1434a9
tree       2a2cb24819477e69c2d8f5c8a2a79a87b558859c
parent     c1ad373f26053e1906fce7681c03d130a642bf33
[SPARK-10811] [SQL] Eliminates unnecessary byte array copying
When reading Parquet string and binary-backed decimal values, `Binary.getBytes` always returns a copy of the underlying byte array, which is unnecessary. Since the `Binary` values in these code paths are guaranteed to be backed by `ByteArraySliceBackedBinary`, and Parquet itself never reuses the underlying byte arrays, we can use `Binary.toByteBuffer.array()` to steal the underlying byte arrays without copying them.
This improves performance when scanning Parquet string and binary-backed decimal columns. Note that this trick doesn't cover binary-backed decimals with precision greater than 18, which still go through `Binary.getBytes`.
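
For reference, the zero-copy read boils down to the following pattern (a minimal sketch against the Parquet `Binary` API; the helper name `readBytesZeroCopy` is illustrative and not part of this patch):

  import org.apache.parquet.io.api.Binary

  // A minimal sketch of the zero-copy pattern: `Binary.getBytes` always
  // allocates and copies, while `Binary.toByteBuffer` exposes the backing
  // heap array, which we can read in place. Assumes a heap-backed buffer,
  // as is the case for `ByteArraySliceBackedBinary`.
  def readBytesZeroCopy(value: Binary): (Array[Byte], Int, Int) = {
    val buffer = value.toByteBuffer
    val offset = buffer.position()    // the slice may not start at index 0
    val numBytes = buffer.remaining() // limit - position
    (buffer.array(), offset, numBytes)
  }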
My micro-benchmark shows that this brings a ~15% performance boost when scanning the TPC-DS `store_sales` table (scale factor 15).
Another minor optimization in this PR: `Decimal.toJavaBigDecimal` now constructs a Java `BigDecimal` directly instead of building a Scala `BigDecimal` first. This brings another ~5% performance gain.
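
Concretely, the long-backed fast path can skip the Scala wrapper entirely (a simplified standalone sketch of the change; the real code lives in `Decimal.toJavaBigDecimal`, shown in the diff below):

  // Simplified sketch: when the value fits in a Long (decimalVal is null),
  // build the Java BigDecimal directly from the unscaled long and scale,
  // instead of wrapping it in a Scala BigDecimal and unwrapping it again.
  def toJavaBigDecimal(
      decimalVal: scala.math.BigDecimal, // null when the value is long-backed
      longVal: Long,
      scale: Int): java.math.BigDecimal = {
    if (decimalVal ne null) {
      decimalVal.underlying()
    } else {
      java.math.BigDecimal.valueOf(longVal, scale)
    }
  }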
Author: Cheng Lian <lian@databricks.com>
Closes #8907 from liancheng/spark-10811/eliminate-array-copying.
3 files changed, 28 insertions, 10 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index c988f1d1b9..bfcf111385 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -145,7 +145,13 @@ final class Decimal extends Ordered[Decimal] with Serializable {
     }
   }
 
-  def toJavaBigDecimal: java.math.BigDecimal = toBigDecimal.underlying()
+  def toJavaBigDecimal: java.math.BigDecimal = {
+    if (decimalVal.ne(null)) {
+      decimalVal.underlying()
+    } else {
+      java.math.BigDecimal.valueOf(longVal, _scale)
+    }
+  }
 
   def toUnscaledLong: Long = {
     if (decimalVal.ne(null)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 2ff2fda361..050d3610a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -302,7 +302,13 @@ private[parquet] class CatalystRowConverter(
     }
 
     override def addBinary(value: Binary): Unit = {
-      updater.set(UTF8String.fromBytes(value.getBytes))
+      // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we
+      // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying
+      // it.
+      val buffer = value.toByteBuffer
+      val offset = buffer.position()
+      val numBytes = buffer.limit() - buffer.position()
+      updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))
     }
   }
 
@@ -332,24 +338,30 @@ private[parquet] class CatalystRowConverter(
     private def toDecimal(value: Binary): Decimal = {
       val precision = decimalType.precision
       val scale = decimalType.scale
-      val bytes = value.getBytes
 
       if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.
+        // Constructs a `Decimal` with an unscaled `Long` value if possible. The underlying
+        // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
+        // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
+        val buffer = value.toByteBuffer
+        val bytes = buffer.array()
+        val start = buffer.position()
+        val end = buffer.limit()
+
         var unscaled = 0L
-        var i = 0
+        var i = start
 
-        while (i < bytes.length) {
+        while (i < end) {
           unscaled = (unscaled << 8) | (bytes(i) & 0xff)
           i += 1
         }
 
-        val bits = 8 * bytes.length
+        val bits = 8 * (end - start)
         unscaled = (unscaled << (64 - bits)) >> (64 - bits)
         Decimal(unscaled, precision, scale)
       } else {
         // Otherwise, resorts to an unscaled `BigInteger` instead.
-        Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale)
+        Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 2d237da81c..97ffeb08aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -567,9 +567,9 @@ private[parquet] object CatalystSchemaConverter {
     }
   }
 
-  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4)
+  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */
 
-  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8)
+  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8) /* 18 */
 
   // Max precision of a decimal value stored in `numBytes` bytes
   def maxPrecisionForBytes(numBytes: Int): Int = {
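
As an aside, the shift pair at the end of the `Long` fast path above performs sign extension for decimals encoded in fewer than 8 bytes. A self-contained sketch of that decoding step (the helper name and example value are illustrative, not from the patch):

  // Decodes a big-endian two's-complement byte slice into a signed Long,
  // mirroring the loop in `toDecimal` above.
  def decodeUnscaledLong(bytes: Array[Byte], start: Int, end: Int): Long = {
    var unscaled = 0L
    var i = start
    while (i < end) {
      unscaled = (unscaled << 8) | (bytes(i) & 0xff) // accumulate big-endian bytes
      i += 1
    }
    // Shift the highest encoded bit up to bit 63, then arithmetically back
    // down, so shorter encodings of negative values keep their sign.
    val bits = 8 * (end - start)
    (unscaled << (64 - bits)) >> (64 - bits)
  }

  // For example, the single byte 0x85 encodes -123:
  // decodeUnscaledLong(Array(0x85.toByte), 0, 1) == -123L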