author     Cheng Lian <lian@databricks.com>  2015-09-29 23:30:27 -0700
committer  Cheng Lian <lian@databricks.com>  2015-09-29 23:30:27 -0700
commit     4d5a005b0d2591d4d57a19be48c4954b9f1434a9 (patch)
tree       2a2cb24819477e69c2d8f5c8a2a79a87b558859c /sql
parent     c1ad373f26053e1906fce7681c03d130a642bf33 (diff)
[SPARK-10811] [SQL] Eliminates unnecessary byte array copying
When reading Parquet strings and binary-backed decimal values, Parquet's `Binary.getBytes` always returns a copied byte array, which is unnecessary. Since the underlying `Binary` implementation there is guaranteed to be `ByteArraySliceBackedBinary`, and Parquet itself never reuses the underlying byte arrays, we can use `Binary.toByteBuffer.array()` to steal the underlying byte arrays without copying them.

This brings performance benefits when scanning Parquet string and binary-backed decimal columns. Note that this trick doesn't cover binary-backed decimals with precision greater than 18.

In my micro-benchmark, this brings a ~15% performance boost when scanning the TPC-DS `store_sales` table (scale factor 15).

Another minor optimization in this PR is that `Decimal.toJavaBigDecimal` now constructs a Java `BigDecimal` directly instead of building a Scala `BigDecimal` first. This brings another ~5% performance gain.

Author: Cheng Lian <lian@databricks.com>

Closes #8907 from liancheng/spark-10811/eliminate-array-copying.
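For illustration only, here is a minimal sketch (not part of the patch) of the zero-copy read the change relies on, assuming the `Binary` value is backed by a heap `ByteBuffer` as described above; `stealBytes` is a hypothetical helper name:

    import java.nio.ByteBuffer
    import org.apache.parquet.io.api.Binary

    // Read the bytes of a Parquet `Binary` without the defensive copy made by
    // `Binary.getBytes`: expose the backing array plus the slice's offset and length.
    // The caller must treat the returned array as read-only.
    def stealBytes(value: Binary): (Array[Byte], Int, Int) = {
      val buffer: ByteBuffer = value.toByteBuffer
      val offset = buffer.position()                  // slice may start mid-array
      val length = buffer.limit() - buffer.position()
      (buffer.array(), offset, length)                // no copy, unlike value.getBytes
    }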
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala                                      |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala     | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala  |  4
3 files changed, 28 insertions(+), 10 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index c988f1d1b9..bfcf111385 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -145,7 +145,13 @@ final class Decimal extends Ordered[Decimal] with Serializable {
     }
   }
 
-  def toJavaBigDecimal: java.math.BigDecimal = toBigDecimal.underlying()
+  def toJavaBigDecimal: java.math.BigDecimal = {
+    if (decimalVal.ne(null)) {
+      decimalVal.underlying()
+    } else {
+      java.math.BigDecimal.valueOf(longVal, _scale)
+    }
+  }
 
   def toUnscaledLong: Long = {
     if (decimalVal.ne(null)) {
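As a side note on the `toJavaBigDecimal` change above, a rough standalone sketch (not Spark code) of the two construction paths for a compact, unscaled-`Long` decimal; the ~5% figure in the description comes from skipping the Scala wrapper:

    val unscaled = 123456L
    val scale = 2

    val direct   = java.math.BigDecimal.valueOf(unscaled, scale)        // new fast path
    val viaScala = scala.math.BigDecimal(unscaled, scale).underlying()  // old path via toBigDecimal

    assert(direct == viaScala)  // both represent 1234.56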
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 2ff2fda361..050d3610a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -302,7 +302,13 @@ private[parquet] class CatalystRowConverter(
     }
 
     override def addBinary(value: Binary): Unit = {
-      updater.set(UTF8String.fromBytes(value.getBytes))
+      // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we
+      // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying
+      // it.
+      val buffer = value.toByteBuffer
+      val offset = buffer.position()
+      val numBytes = buffer.limit() - buffer.position()
+      updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))
     }
   }
 
@@ -332,24 +338,30 @@ private[parquet] class CatalystRowConverter(
     private def toDecimal(value: Binary): Decimal = {
       val precision = decimalType.precision
       val scale = decimalType.scale
-      val bytes = value.getBytes
 
       if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.
+        // Constructs a `Decimal` with an unscaled `Long` value if possible. The underlying
+        // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
+        // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
+        val buffer = value.toByteBuffer
+        val bytes = buffer.array()
+        val start = buffer.position()
+        val end = buffer.limit()
+
         var unscaled = 0L
-        var i = 0
+        var i = start
 
-        while (i < bytes.length) {
+        while (i < end) {
           unscaled = (unscaled << 8) | (bytes(i) & 0xff)
           i += 1
         }
 
-        val bits = 8 * bytes.length
+        val bits = 8 * (end - start)
         unscaled = (unscaled << (64 - bits)) >> (64 - bits)
         Decimal(unscaled, precision, scale)
       } else {
         // Otherwise, resorts to an unscaled `BigInteger` instead.
-        Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale)
+        Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
       }
     }
   }
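To make the sign handling in the hunk above concrete, here is a small self-contained sketch (a hypothetical helper with the same logic) of decoding a big-endian two's-complement unscaled value into a `Long`:

    // Fold big-endian bytes into a Long, then sign-extend from the value's actual bit width.
    def decodeUnscaledLong(bytes: Array[Byte], start: Int, end: Int): Long = {
      var unscaled = 0L
      var i = start
      while (i < end) {
        unscaled = (unscaled << 8) | (bytes(i) & 0xff)
        i += 1
      }
      val bits = 8 * (end - start)
      (unscaled << (64 - bits)) >> (64 - bits)  // arithmetic shift restores the sign bit
    }

    // Example: the 2-byte value 0xFF38 is -200 in two's complement.
    assert(decodeUnscaledLong(Array(0xff.toByte, 0x38.toByte), 0, 2) == -200L)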
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 2d237da81c..97ffeb08aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -567,9 +567,9 @@ private[parquet] object CatalystSchemaConverter {
     }
   }
 
-  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4)
+  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */
 
-  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8)
+  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8) /* 18 */
 
   // Max precision of a decimal value stored in `numBytes` bytes
   def maxPrecisionForBytes(numBytes: Int): Int = {
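The `/* 9 */` and `/* 18 */` annotations can be checked against the usual formula for the largest base-10 precision that fits in a signed `numBytes`-byte integer; the method body is truncated in the hunk above, so the following is only a sketch under that assumption:

    // floor(log10(2^(8 * numBytes - 1) - 1)) decimal digits fit in a signed numBytes-byte integer.
    def maxPrecisionForBytes(numBytes: Int): Int =
      math.floor(math.log10(math.pow(2, 8 * numBytes - 1) - 1)).toInt

    assert(maxPrecisionForBytes(4) == 9)   // INT32-backed decimals
    assert(maxPrecisionForBytes(8) == 18)  // INT64-backed decimals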