author    Takuya UESHIN <ueshin@happy-camper.st>    2014-08-26 18:28:41 -0700
committer Michael Armbrust <michael@databricks.com> 2014-08-26 18:28:41 -0700
commit    727cb25bcc29481d6b744abef1ca091e64b5f91f (patch)
tree      4edc54a23a8f4581e931ced97ddda2a7a5da4085 /sql/core/src/main
parent    73b3089b8d2901dab11bb1ef6f46c29625b677fe (diff)
[SPARK-3036][SPARK-3037][SQL] Add MapType/ArrayType containing null value support to Parquet.
JIRA:
- https://issues.apache.org/jira/browse/SPARK-3036
- https://issues.apache.org/jira/browse/SPARK-3037

Currently this uses the following Parquet schema for `MapType` when `valueContainsNull` is `true`:

```
message root {
  optional group a (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required int32 key;
      optional int32 value;
    }
  }
}
```

and for `ArrayType` when `containsNull` is `true`:

```
message root {
  optional group a (LIST) {
    repeated group bag {
      optional int32 array;
    }
  }
}
```

We have to think about compatibility with older versions of Spark, Hive, and the others mentioned in the JIRA issues.

Notice: This PR is based on #1963 and #1889. Please check them first.

/cc marmbrus, yhuai

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #2032 from ueshin/issues/SPARK-3036_3037 and squashes the following commits:

4e8e9e7 [Takuya UESHIN] Add ArrayType containing null value support to Parquet.
013c2ca [Takuya UESHIN] Add MapType containing null value support to Parquet.
62989de [Takuya UESHIN] Merge branch 'issues/SPARK-2969' into issues/SPARK-3036_3037
8e38b53 [Takuya UESHIN] Merge branch 'issues/SPARK-3063' into issues/SPARK-3036_3037
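For orientation, here is a minimal sketch of the Catalyst types these schemas encode. The constructor signatures assume the Spark 1.1-era `org.apache.spark.sql.catalyst.types` package referenced in the diff below; the object and value names are invented for illustration:

```scala
// Hedged sketch: the Catalyst types this commit maps onto the Parquet
// schemas shown above (constructor signatures as of Spark 1.1).
import org.apache.spark.sql.catalyst.types._

object NullableTypesSketch {
  // An array whose elements may be null -> the repeated "bag" group above.
  val nullableArray = ArrayType(IntegerType, containsNull = true)

  // A map whose values may be null -> the optional `value` field above.
  val nullableMap = MapType(IntegerType, IntegerType, valueContainsNull = true)
}
```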
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala    | 83
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala | 54
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala        | 54
3 files changed, 155 insertions, 36 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index ef4526ec03..9fd6aed402 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -58,6 +58,7 @@ private[sql] object CatalystConverter {
// This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
// Note that "array" for the array elements is chosen by ParquetAvro.
// Using a different value will result in Parquet silently dropping columns.
+ val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
val MAP_KEY_SCHEMA_NAME = "key"
val MAP_VALUE_SCHEMA_NAME = "value"
@@ -82,6 +83,9 @@ private[sql] object CatalystConverter {
case ArrayType(elementType: DataType, false) => {
new CatalystArrayConverter(elementType, fieldIndex, parent)
}
+ case ArrayType(elementType: DataType, true) => {
+ new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent)
+ }
case StructType(fields: Seq[StructField]) => {
new CatalystStructConverter(fields.toArray, fieldIndex, parent)
}
@@ -568,6 +572,85 @@ private[parquet] class CatalystNativeArrayConverter(
}
/**
+ * A `parquet.io.api.GroupConverter` that converts single-element groups that
+ * match the characteristics of an array containing null values (see
+ * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
+ * [[org.apache.spark.sql.catalyst.types.ArrayType]].
+ *
+ * @param elementType The type of the array elements (complex or primitive)
+ * @param index The position of this (array) field inside its parent converter
+ * @param parent The parent converter
+ * @param buffer A data buffer
+ */
+private[parquet] class CatalystArrayContainsNullConverter(
+ val elementType: DataType,
+ val index: Int,
+ protected[parquet] val parent: CatalystConverter,
+ protected[parquet] var buffer: Buffer[Any])
+ extends CatalystConverter {
+
+ def this(elementType: DataType, index: Int, parent: CatalystConverter) =
+ this(
+ elementType,
+ index,
+ parent,
+ new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE))
+
+ protected[parquet] val converter: Converter = new CatalystConverter {
+
+ private var current: Any = null
+
+ val converter = CatalystConverter.createConverter(
+ new CatalystConverter.FieldType(
+ CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ elementType,
+ false),
+ fieldIndex = 0,
+ parent = this)
+
+ override def getConverter(fieldIndex: Int): Converter = converter
+
+ override def end(): Unit = parent.updateField(index, current)
+
+ override def start(): Unit = {
+ current = null
+ }
+
+ override protected[parquet] val size: Int = 1
+ override protected[parquet] val index: Int = 0
+ override protected[parquet] val parent = CatalystArrayContainsNullConverter.this
+
+ override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+ current = value
+ }
+
+ override protected[parquet] def clearBuffer(): Unit = {}
+ }
+
+ override def getConverter(fieldIndex: Int): Converter = converter
+
+ // arrays have only one (repeated) field, which holds its elements
+ override val size = 1
+
+ override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+ buffer += value
+ }
+
+ override protected[parquet] def clearBuffer(): Unit = {
+ buffer.clear()
+ }
+
+ override def start(): Unit = {}
+
+ override def end(): Unit = {
+ assert(parent != null)
+ // here we need to make sure to use ArrayScalaType
+ parent.updateField(index, buffer.toArray.toSeq)
+ clearBuffer()
+ }
+}
+
+/**
* This converter is for multi-element groups of primitive or complex types
* that have repetition level optional or required (so struct fields).
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 6a657c20fe..bdf02401b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -173,7 +173,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
if (value != null) {
schema match {
- case t @ ArrayType(_, false) => writeArray(
+ case t @ ArrayType(_, _) => writeArray(
t,
value.asInstanceOf[CatalystConverter.ArrayScalaType[_]])
case t @ MapType(_, _, _) => writeMap(
@@ -228,45 +228,57 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
}
}
- // TODO: support null values, see
- // https://issues.apache.org/jira/browse/SPARK-1649
private[parquet] def writeArray(
schema: ArrayType,
array: CatalystConverter.ArrayScalaType[_]): Unit = {
val elementType = schema.elementType
writer.startGroup()
if (array.size > 0) {
- writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
- var i = 0
- while(i < array.size) {
- writeValue(elementType, array(i))
- i = i + 1
+ if (schema.containsNull) {
+ writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
+ var i = 0
+ while (i < array.size) {
+ writer.startGroup()
+ if (array(i) != null) {
+ writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+ writeValue(elementType, array(i))
+ writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+ }
+ writer.endGroup()
+ i = i + 1
+ }
+ writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
+ } else {
+ writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+ var i = 0
+ while (i < array.size) {
+ writeValue(elementType, array(i))
+ i = i + 1
+ }
+ writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
}
- writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
}
writer.endGroup()
}
- // TODO: support null values, see
- // https://issues.apache.org/jira/browse/SPARK-1649
private[parquet] def writeMap(
schema: MapType,
map: CatalystConverter.MapScalaType[_, _]): Unit = {
writer.startGroup()
if (map.size > 0) {
writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0)
- writer.startGroup()
- writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
- for(key <- map.keys) {
+ for ((key, value) <- map) {
+ writer.startGroup()
+ writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
writeValue(schema.keyType, key)
+ writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
+ if (value != null) {
+ writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
+ writeValue(schema.valueType, value)
+ writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
+ }
+ writer.endGroup()
}
- writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
- writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
- for(value <- map.values) {
- writeValue(schema.valueType, value)
- }
- writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
- writer.endGroup()
writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0)
}
writer.endGroup()
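To see what the rewritten `writeMap` emits, here is a trace against an invented println stub (not parquet-mr's `RecordConsumer`). The point is that each entry now gets its own repeated group, and a null value is represented by omitting the `value` field from that group entirely:

```scala
// Invented println stub standing in for the real record-consumer calls.
object WriteMapTrace extends App {
  def startGroup(): Unit = println("startGroup")
  def endGroup(): Unit = println("endGroup")
  def startField(name: String, index: Int): Unit = println(s"startField($name, $index)")
  def endField(name: String, index: Int): Unit = println(s"endField($name, $index)")
  def write(v: Any): Unit = println(s"  write($v)")

  // One entry with a value, one with a null value.
  val map = Map(1 -> "a", 2 -> null)

  startGroup()
  startField("map", 0)
  for ((key, value) <- map) {
    startGroup()                             // one group per map entry
    startField("key", 0); write(key); endField("key", 0)
    if (value != null) {                     // null value: omit the field
      startField("value", 1); write(value); endField("value", 1)
    }
    endGroup()
  }
  endField("map", 0)
  endGroup()
}
```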
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index af8cd0a73b..1a52377651 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -119,7 +119,13 @@ private[parquet] object ParquetTypesConverter extends Logging {
case ParquetOriginalType.LIST => { // TODO: check enums!
assert(groupType.getFieldCount == 1)
val field = groupType.getFields.apply(0)
- ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+ if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
+ val bag = field.asGroupType()
+ assert(bag.getFieldCount == 1)
+ ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+ } else {
+ ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+ }
}
case ParquetOriginalType.MAP => {
assert(
@@ -129,28 +135,32 @@ private[parquet] object ParquetTypesConverter extends Logging {
assert(
keyValueGroup.getFieldCount == 2,
"Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
- val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
+
+ val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
- assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
- // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
- // at here.
- MapType(keyType, valueType)
+ MapType(keyType, valueType,
+ keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
}
case _ => {
// Note: the order of these checks is important!
if (correspondsToMap(groupType)) { // MapType
val keyValueGroup = groupType.getFields.apply(0).asGroupType()
- val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
+
+ val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
- assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
- // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
- // at here.
- MapType(keyType, valueType)
+ MapType(keyType, valueType,
+ keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
} else if (correspondsToArray(groupType)) { // ArrayType
- val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString)
- ArrayType(elementType, containsNull = false)
+ val field = groupType.getFields.apply(0)
+ if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
+ val bag = field.asGroupType()
+ assert(bag.getFieldCount == 1)
+ ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+ } else {
+ ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+ }
} else { // everything else: StructType
val fields = groupType
.getFields
@@ -249,13 +259,27 @@ private[parquet] object ParquetTypesConverter extends Logging {
inArray = true)
ConversionPatterns.listType(repetition, name, parquetElementType)
}
+ case ArrayType(elementType, true) => {
+ val parquetElementType = fromDataType(
+ elementType,
+ CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ nullable = true,
+ inArray = false)
+ ConversionPatterns.listType(
+ repetition,
+ name,
+ new ParquetGroupType(
+ Repetition.REPEATED,
+ CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME,
+ parquetElementType))
+ }
case StructType(structFields) => {
val fields = structFields.map {
field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
}
new ParquetGroupType(repetition, name, fields)
}
- case MapType(keyType, valueType, _) => {
+ case MapType(keyType, valueType, valueContainsNull) => {
val parquetKeyType =
fromDataType(
keyType,
@@ -266,7 +290,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
fromDataType(
valueType,
CatalystConverter.MAP_VALUE_SCHEMA_NAME,
- nullable = false,
+ nullable = valueContainsNull,
inArray = false)
ConversionPatterns.mapType(
repetition,
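Summing up the read side, the array-schema detection added to `toDataType` above can be sketched with invented stand-in classes (this is not the parquet-mr `GroupType` API): a LIST group whose single repeated field is the `bag` group is the three-level nullable form; otherwise it is the two-level form whose elements cannot be null.

```scala
// Invented stand-ins for parquet-mr's Type/GroupType; not the real API.
sealed trait PType { def name: String }
case class PPrimitive(name: String) extends PType
case class PGroup(name: String, fields: List[PType]) extends PType

// Mirrors ArrayType(elementType, containsNull) for the purpose of the sketch.
case class ArrayInfo(elementName: String, containsNull: Boolean)

object ArraySchemaSketch extends App {
  def arrayInfo(listGroup: PGroup): ArrayInfo = listGroup.fields match {
    // Three-level form: LIST group -> repeated "bag" group -> optional element.
    case List(PGroup("bag", List(element))) =>
      ArrayInfo(element.name, containsNull = true)
    // Two-level form: LIST group -> repeated element; elements cannot be null.
    case List(element) =>
      ArrayInfo(element.name, containsNull = false)
    case other =>
      sys.error(s"not a LIST group: $other")
  }

  // The two array schemas from the commit message above.
  println(arrayInfo(PGroup("a", List(PGroup("bag", List(PPrimitive("array"))))))) // containsNull = true
  println(arrayInfo(PGroup("a", List(PPrimitive("array")))))                      // containsNull = false
}
```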