diff options
author | Takuya UESHIN <ueshin@happy-camper.st> | 2014-08-26 18:28:41 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-08-26 18:28:41 -0700 |
commit | 727cb25bcc29481d6b744abef1ca091e64b5f91f (patch) | |
tree | 4edc54a23a8f4581e931ced97ddda2a7a5da4085 /sql | |
parent | 73b3089b8d2901dab11bb1ef6f46c29625b677fe (diff) | |
download | spark-727cb25bcc29481d6b744abef1ca091e64b5f91f.tar.gz spark-727cb25bcc29481d6b744abef1ca091e64b5f91f.tar.bz2 spark-727cb25bcc29481d6b744abef1ca091e64b5f91f.zip |
[SPARK-3036][SPARK-3037][SQL] Add MapType/ArrayType containing null value support to Parquet.
JIRA:
- https://issues.apache.org/jira/browse/SPARK-3036
- https://issues.apache.org/jira/browse/SPARK-3037
Currently this uses the following Parquet schema for `MapType` when `valueContainsNull` is `true`:
```
message root {
optional group a (MAP) {
repeated group map (MAP_KEY_VALUE) {
required int32 key;
optional int32 value;
}
}
}
```
for `ArrayType` when `containsNull` is `true`:
```
message root {
optional group a (LIST) {
repeated group bag {
optional int32 array;
}
}
}
```
We have to think about compatibilities with older version of Spark or Hive or others I mentioned in the JIRA issues.
Notice:
This PR is based on #1963 and #1889.
Please check them first.
/cc marmbrus, yhuai
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #2032 from ueshin/issues/SPARK-3036_3037 and squashes the following commits:
4e8e9e7 [Takuya UESHIN] Add ArrayType containing null value support to Parquet.
013c2ca [Takuya UESHIN] Add MapType containing null value support to Parquet.
62989de [Takuya UESHIN] Merge branch 'issues/SPARK-2969' into issues/SPARK-3036_3037
8e38b53 [Takuya UESHIN] Merge branch 'issues/SPARK-3063' into issues/SPARK-3036_3037
Diffstat (limited to 'sql')
4 files changed, 167 insertions, 40 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index ef4526ec03..9fd6aed402 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -58,6 +58,7 @@ private[sql] object CatalystConverter { // This is mostly Parquet convention (see, e.g., `ConversionPatterns`). // Note that "array" for the array elements is chosen by ParquetAvro. // Using a different value will result in Parquet silently dropping columns. + val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag" val ARRAY_ELEMENTS_SCHEMA_NAME = "array" val MAP_KEY_SCHEMA_NAME = "key" val MAP_VALUE_SCHEMA_NAME = "value" @@ -82,6 +83,9 @@ private[sql] object CatalystConverter { case ArrayType(elementType: DataType, false) => { new CatalystArrayConverter(elementType, fieldIndex, parent) } + case ArrayType(elementType: DataType, true) => { + new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent) + } case StructType(fields: Seq[StructField]) => { new CatalystStructConverter(fields.toArray, fieldIndex, parent) } @@ -568,6 +572,85 @@ private[parquet] class CatalystNativeArrayConverter( } /** + * A `parquet.io.api.GroupConverter` that converts a single-element groups that + * match the characteristics of an array contains null (see + * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an + * [[org.apache.spark.sql.catalyst.types.ArrayType]]. + * + * @param elementType The type of the array elements (complex or primitive) + * @param index The position of this (array) field inside its parent converter + * @param parent The parent converter + * @param buffer A data buffer + */ +private[parquet] class CatalystArrayContainsNullConverter( + val elementType: DataType, + val index: Int, + protected[parquet] val parent: CatalystConverter, + protected[parquet] var buffer: Buffer[Any]) + extends CatalystConverter { + + def this(elementType: DataType, index: Int, parent: CatalystConverter) = + this( + elementType, + index, + parent, + new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE)) + + protected[parquet] val converter: Converter = new CatalystConverter { + + private var current: Any = null + + val converter = CatalystConverter.createConverter( + new CatalystConverter.FieldType( + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + elementType, + false), + fieldIndex = 0, + parent = this) + + override def getConverter(fieldIndex: Int): Converter = converter + + override def end(): Unit = parent.updateField(index, current) + + override def start(): Unit = { + current = null + } + + override protected[parquet] val size: Int = 1 + override protected[parquet] val index: Int = 0 + override protected[parquet] val parent = CatalystArrayContainsNullConverter.this + + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { + current = value + } + + override protected[parquet] def clearBuffer(): Unit = {} + } + + override def getConverter(fieldIndex: Int): Converter = converter + + // arrays have only one (repeated) field, which is its elements + override val size = 1 + + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { + buffer += value + } + + override protected[parquet] def clearBuffer(): Unit = { + buffer.clear() + } + + override def start(): Unit = {} + + override def end(): Unit = { + assert(parent != null) + // here we need to make sure to use ArrayScalaType + parent.updateField(index, buffer.toArray.toSeq) + clearBuffer() + } +} + +/** * This converter is for multi-element groups of primitive or complex types * that have repetition level optional or required (so struct fields). * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 6a657c20fe..bdf02401b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -173,7 +173,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { private[parquet] def writeValue(schema: DataType, value: Any): Unit = { if (value != null) { schema match { - case t @ ArrayType(_, false) => writeArray( + case t @ ArrayType(_, _) => writeArray( t, value.asInstanceOf[CatalystConverter.ArrayScalaType[_]]) case t @ MapType(_, _, _) => writeMap( @@ -228,45 +228,57 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { } } - // TODO: support null values, see - // https://issues.apache.org/jira/browse/SPARK-1649 private[parquet] def writeArray( schema: ArrayType, array: CatalystConverter.ArrayScalaType[_]): Unit = { val elementType = schema.elementType writer.startGroup() if (array.size > 0) { - writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - var i = 0 - while(i < array.size) { - writeValue(elementType, array(i)) - i = i + 1 + if (schema.containsNull) { + writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) + var i = 0 + while (i < array.size) { + writer.startGroup() + if (array(i) != null) { + writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) + writeValue(elementType, array(i)) + writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) + } + writer.endGroup() + i = i + 1 + } + writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) + } else { + writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) + var i = 0 + while (i < array.size) { + writeValue(elementType, array(i)) + i = i + 1 + } + writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) } - writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) } writer.endGroup() } - // TODO: support null values, see - // https://issues.apache.org/jira/browse/SPARK-1649 private[parquet] def writeMap( schema: MapType, map: CatalystConverter.MapScalaType[_, _]): Unit = { writer.startGroup() if (map.size > 0) { writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0) - writer.startGroup() - writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - for(key <- map.keys) { + for ((key, value) <- map) { + writer.startGroup() + writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) writeValue(schema.keyType, key) + writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) + if (value != null) { + writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) + writeValue(schema.valueType, value) + writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) + } + writer.endGroup() } - writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - for(value <- map.values) { - writeValue(schema.valueType, value) - } - writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - writer.endGroup() writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0) } writer.endGroup() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index af8cd0a73b..1a52377651 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -119,7 +119,13 @@ private[parquet] object ParquetTypesConverter extends Logging { case ParquetOriginalType.LIST => { // TODO: check enums! assert(groupType.getFieldCount == 1) val field = groupType.getFields.apply(0) - ArrayType(toDataType(field, isBinaryAsString), containsNull = false) + if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) { + val bag = field.asGroupType() + assert(bag.getFieldCount == 1) + ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true) + } else { + ArrayType(toDataType(field, isBinaryAsString), containsNull = false) + } } case ParquetOriginalType.MAP => { assert( @@ -129,28 +135,32 @@ private[parquet] object ParquetTypesConverter extends Logging { assert( keyValueGroup.getFieldCount == 2, "Parquet Map type malformatted: nested group should have 2 (key, value) fields!") - val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) + + val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString) - assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED) - // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true - // at here. - MapType(keyType, valueType) + MapType(keyType, valueType, + keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED) } case _ => { // Note: the order of these checks is important! if (correspondsToMap(groupType)) { // MapType val keyValueGroup = groupType.getFields.apply(0).asGroupType() - val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) + + val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString) - assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED) - // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true - // at here. - MapType(keyType, valueType) + MapType(keyType, valueType, + keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED) } else if (correspondsToArray(groupType)) { // ArrayType - val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString) - ArrayType(elementType, containsNull = false) + val field = groupType.getFields.apply(0) + if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) { + val bag = field.asGroupType() + assert(bag.getFieldCount == 1) + ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true) + } else { + ArrayType(toDataType(field, isBinaryAsString), containsNull = false) + } } else { // everything else: StructType val fields = groupType .getFields @@ -249,13 +259,27 @@ private[parquet] object ParquetTypesConverter extends Logging { inArray = true) ConversionPatterns.listType(repetition, name, parquetElementType) } + case ArrayType(elementType, true) => { + val parquetElementType = fromDataType( + elementType, + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + nullable = true, + inArray = false) + ConversionPatterns.listType( + repetition, + name, + new ParquetGroupType( + Repetition.REPEATED, + CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, + parquetElementType)) + } case StructType(structFields) => { val fields = structFields.map { field => fromDataType(field.dataType, field.name, field.nullable, inArray = false) } new ParquetGroupType(repetition, name, fields) } - case MapType(keyType, valueType, _) => { + case MapType(keyType, valueType, valueContainsNull) => { val parquetKeyType = fromDataType( keyType, @@ -266,7 +290,7 @@ private[parquet] object ParquetTypesConverter extends Logging { fromDataType( valueType, CatalystConverter.MAP_VALUE_SCHEMA_NAME, - nullable = false, + nullable = valueContainsNull, inArray = false) ConversionPatterns.mapType( repetition, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 28f43b3683..4219cc0800 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -78,7 +78,9 @@ case class AllDataTypesWithNonPrimitiveType( booleanField: Boolean, binaryField: Array[Byte], array: Seq[Int], - map: Map[Int, String], + arrayContainsNull: Seq[Option[Int]], + map: Map[Int, Long], + mapValueContainsNull: Map[Int, Option[Long]], data: Data) class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { @@ -287,7 +289,11 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA .map(x => AllDataTypesWithNonPrimitiveType( s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0, (0 to x).map(_.toByte).toArray, - (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x")))) + (0 until x), + (0 until x).map(Option(_).filter(_ % 3 == 0)), + (0 until x).map(i => i -> i.toLong).toMap, + (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None), + Data((0 until x), Nested(x, s"$x")))) .saveAsParquetFile(tempDir) val result = parquetFile(tempDir).collect() range.foreach { @@ -302,8 +308,10 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA assert(result(i).getBoolean(7) === (i % 2 == 0)) assert(result(i)(8) === (0 to i).map(_.toByte).toArray) assert(result(i)(9) === (0 until i)) - assert(result(i)(10) === (0 until i).map(i => i -> s"$i").toMap) - assert(result(i)(11) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i"))))) + assert(result(i)(10) === (0 until i).map(i => if (i % 3 == 0) i else null)) + assert(result(i)(11) === (0 until i).map(i => i -> i.toLong).toMap) + assert(result(i)(12) === (0 until i).map(i => i -> i.toLong).toMap + (i -> null)) + assert(result(i)(13) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i"))))) } } |