Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala  83
1 file changed, 71 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 247d35363b..49007e45ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{DOUBLE, INT32, INT64, BINARY, FIXED_LEN_BYTE_ARRAY}
import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
import org.apache.spark.Logging
@@ -222,8 +222,25 @@ private[parquet] class CatalystRowConverter(
updater.setShort(value.asInstanceOf[ShortType#InternalType])
}
+ // For INT32 backed decimals
+ case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
+ new CatalystIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+ // For INT64 backed decimals
+ case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
+ new CatalystLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+ // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
+ case t: DecimalType
+ if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
+ parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
+ new CatalystBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
case t: DecimalType =>
- new CatalystDecimalConverter(t, updater)
+ throw new RuntimeException(
+ s"Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is " +
+ s"$parquetType. Parquet DECIMAL type can only be backed by INT32, INT64, " +
+ "FIXED_LEN_BYTE_ARRAY, or BINARY.")
case StringType =>
new CatalystStringConverter(updater)
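
The dispatch added above follows the Parquet format spec: DECIMAL may annotate INT32 only when precision <= 9, INT64 only when precision <= 18, and BINARY or FIXED_LEN_BYTE_ARRAY at any precision. A minimal sketch of that rule in Scala (the helper name is illustrative and not part of this patch):

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, FIXED_LEN_BYTE_ARRAY, INT32, INT64}

// Hypothetical helper: physical types that may legally back DECIMAL(precision, _).
// Per the spec, the unscaled value fits INT32 up to 9 digits and INT64 up to 18.
def legalPhysicalTypes(precision: Int): Seq[PrimitiveTypeName] =
  if (precision <= 9) Seq(INT32, INT64, BINARY, FIXED_LEN_BYTE_ARRAY)
  else if (precision <= 18) Seq(INT64, BINARY, FIXED_LEN_BYTE_ARRAY)
  else Seq(BINARY, FIXED_LEN_BYTE_ARRAY)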
@@ -274,9 +291,10 @@ private[parquet] class CatalystRowConverter(
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
})
- case _ =>
+ case t =>
throw new RuntimeException(
- s"Unable to create Parquet converter for data type ${catalystType.json}")
+ s"Unable to create Parquet converter for data type ${t.json} " +
+ s"whose Parquet type is $parquetType")
}
}
@@ -314,11 +332,18 @@ private[parquet] class CatalystRowConverter(
/**
* Parquet converter for fixed-precision decimals.
*/
- private final class CatalystDecimalConverter(
- decimalType: DecimalType,
- updater: ParentContainerUpdater)
+ private abstract class CatalystDecimalConverter(
+ precision: Int, scale: Int, updater: ParentContainerUpdater)
extends CatalystPrimitiveConverter(updater) {
+ protected var expandedDictionary: Array[Decimal] = _
+
+ override def hasDictionarySupport: Boolean = true
+
+ override def addValueFromDictionary(dictionaryId: Int): Unit = {
+ updater.set(expandedDictionary(dictionaryId))
+ }
+
// Converts decimals stored as INT32
override def addInt(value: Int): Unit = {
addLong(value: Long)
@@ -326,18 +351,19 @@ private[parquet] class CatalystRowConverter(
// Converts decimals stored as INT64
override def addLong(value: Long): Unit = {
- updater.set(Decimal(value, decimalType.precision, decimalType.scale))
+ updater.set(decimalFromLong(value))
}
// Converts decimals stored as either FIXED_LEN_BYTE_ARRAY or BINARY
override def addBinary(value: Binary): Unit = {
- updater.set(toDecimal(value))
+ updater.set(decimalFromBinary(value))
}
- private def toDecimal(value: Binary): Decimal = {
- val precision = decimalType.precision
- val scale = decimalType.scale
+ protected def decimalFromLong(value: Long): Decimal = {
+ Decimal(value, precision, scale)
+ }
+ protected def decimalFromBinary(value: Binary): Decimal = {
if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
// Constructs a `Decimal` with an unscaled `Long` value if possible.
val unscaled = binaryToUnscaledLong(value)
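
`binaryToUnscaledLong` is defined elsewhere in this file and is not shown in this hunk; it reads the big-endian two's-complement bytes Parquet uses for an unscaled decimal value. A self-contained sketch of that decode, assuming the standard encoding (an illustration of the idea, not Spark's exact implementation):

def binaryToUnscaledLongSketch(bytes: Array[Byte]): Long = {
  var unscaled = 0L
  var i = 0
  while (i < bytes.length) {
    // Accumulate bytes most-significant first.
    unscaled = (unscaled << 8) | (bytes(i) & 0xffL)
    i += 1
  }
  // Sign-extend from the original width back to 64 bits.
  val bits = 8 * bytes.length
  (unscaled << (64 - bits)) >> (64 - bits)
}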
@@ -371,6 +397,39 @@ private[parquet] class CatalystRowConverter(
}
}
+ private class CatalystIntDictionaryAwareDecimalConverter(
+ precision: Int, scale: Int, updater: ParentContainerUpdater)
+ extends CatalystDecimalConverter(precision, scale, updater) {
+
+ override def setDictionary(dictionary: Dictionary): Unit = {
+ this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+ decimalFromLong(dictionary.decodeToInt(id).toLong)
+ }
+ }
+ }
+
+ private class CatalystLongDictionaryAwareDecimalConverter(
+ precision: Int, scale: Int, updater: ParentContainerUpdater)
+ extends CatalystDecimalConverter(precision, scale, updater) {
+
+ override def setDictionary(dictionary: Dictionary): Unit = {
+ this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+ decimalFromLong(dictionary.decodeToLong(id))
+ }
+ }
+ }
+
+ private class CatalystBinaryDictionaryAwareDecimalConverter(
+ precision: Int, scale: Int, updater: ParentContainerUpdater)
+ extends CatalystDecimalConverter(precision, scale, updater) {
+
+ override def setDictionary(dictionary: Dictionary): Unit = {
+ this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+ decimalFromBinary(dictionary.decodeToBinary(id))
+ }
+ }
+ }
+
/**
* Parquet converter for arrays. Spark SQL arrays are represented as Parquet lists. Standard
* Parquet lists are represented as a 3-level group annotated by `LIST`:
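
The three dictionary-aware subclasses added above exist because Parquet's `Dictionary` exposes type-specific decode methods (`decodeToInt`, `decodeToLong`, `decodeToBinary`), so each physical type needs its own eager expansion; per-value work then becomes a single array lookup. A self-contained sketch of that pattern with a simulated dictionary (parquet-mr supplies the real one via `setDictionary`):

import org.apache.spark.sql.types.Decimal

object DictionaryExpansionSketch extends App {
  // Simulated dictionary slots holding unscaled values for a DECIMAL(10, 2) column.
  val slots: Array[Long] = Array(100L, 2500L, -42L)

  // Decode every slot once, mirroring the Array.tabulate calls in the patch.
  val expanded: Array[Decimal] =
    Array.tabulate(slots.length)(id => Decimal(slots(id), precision = 10, scale = 2))

  // Values arrive as dictionary ids; each becomes an O(1) array lookup.
  Seq(0, 2, 2, 1).foreach(id => println(expanded(id))) // 1.00, -0.42, -0.42, 25.00
}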