From f5abd271292f5c98eb8b1974c1df31d08ed388dd Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Thu, 10 Jul 2014 19:23:44 -0700
Subject: [SPARK-2415] [SQL] RowWriteSupport should handle empty ArrayType correctly.

`RowWriteSupport` doesn't write empty `ArrayType` values, so they are read back as `null`.
It should write empty `ArrayType` values as they are.

Author: Takuya UESHIN

Closes #1339 from ueshin/issues/SPARK-2415 and squashes the following commits:

32afc87 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-2415
2f05196 [Takuya UESHIN] Fix RowWriteSupport to handle empty ArrayType correctly.
---
 .../org/apache/spark/sql/parquet/ParquetConverter.scala    | 12 ++++++------
 .../org/apache/spark/sql/parquet/ParquetTableSupport.scala | 10 +++++-----
 .../org/apache/spark/sql/parquet/ParquetQuerySuite.scala   | 10 +++++-----
 3 files changed, 16 insertions(+), 16 deletions(-)
(limited to 'sql')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 889a408e3c..75748b2b54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -229,9 +229,9 @@ private[parquet] class CatalystGroupConverter(
     this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)
 
   protected [parquet] val converters: Array[Converter] =
-    schema.map(field =>
-      CatalystConverter.createConverter(field, schema.indexOf(field), this))
-    .toArray
+    schema.zipWithIndex.map {
+      case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+    }.toArray
 
   override val size = schema.size
 
@@ -288,9 +288,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
       new ParquetRelation.RowType(attributes.length))
 
   protected [parquet] val converters: Array[Converter] =
-    schema.map(field =>
-      CatalystConverter.createConverter(field, schema.indexOf(field), this))
-    .toArray
+    schema.zipWithIndex.map {
+      case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+    }.toArray
 
   override val size = schema.size
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 9cd5dc5bbd..108f8b6815 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -156,7 +156,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     writer.startMessage()
     while(index < attributes.size) {
       // null values indicate optional fields but we do not check currently
-      if (record(index) != null && record(index) != Nil) {
+      if (record(index) != null) {
         writer.startField(attributes(index).name, index)
         writeValue(attributes(index).dataType, record(index))
         writer.endField(attributes(index).name, index)
@@ -167,7 +167,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   }
 
   private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
-    if (value != null && value != Nil) {
+    if (value != null) {
       schema match {
         case t @ ArrayType(_) => writeArray(
           t,
@@ -184,7 +184,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   }
 
   private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
-    if (value != null && value != Nil) {
+    if (value != null) {
      schema match {
         case StringType => writer.addBinary(
           Binary.fromByteArray(
@@ -206,12 +206,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   private[parquet] def writeStruct(
       schema: StructType,
       struct: CatalystConverter.StructScalaType[_]): Unit = {
-    if (struct != null && struct != Nil) {
+    if (struct != null) {
       val fields = schema.fields.toArray
       writer.startGroup()
       var i = 0
       while(i < fields.size) {
-        if (struct(i) != null && struct(i) != Nil) {
+        if (struct(i) != null) {
           writer.startField(fields(i).name, i)
           writeValue(fields(i).dataType, struct(i))
           writer.endField(fields(i).name, i)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index dbf315947f..8fa143e2de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -78,7 +78,7 @@ case class AllDataTypesWithNonPrimitiveType(
     booleanField: Boolean,
     array: Seq[Int],
     map: Map[Int, String],
-    nested: Nested)
+    data: Data)
 
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   TestData // Load test data tables.
@@ -138,7 +138,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.sparkContext.parallelize(range)
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
-        Seq(x), Map(x -> s"$x"), Nested(x, s"$x")))
+        (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
     range.foreach {
@@ -151,9 +151,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
       assert(result(i).getShort(5) === i.toShort)
       assert(result(i).getByte(6) === i.toByte)
       assert(result(i).getBoolean(7) === (i % 2 == 0))
-      assert(result(i)(8) === Seq(i))
-      assert(result(i)(9) === Map(i -> s"$i"))
-      assert(result(i)(10) === new GenericRow(Array[Any](i, s"$i")))
+      assert(result(i)(8) === (0 until i))
+      assert(result(i)(9) === (0 until i).map(i => i -> s"$i").toMap)
+      assert(result(i)(10) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
     }
   }
 
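
For context, not part of the patch itself: for an `ArrayType` column, `record(index)` holds a Scala `Seq`, and an empty `Seq` compares equal to `Nil`. The old guard `record(index) != null && record(index) != Nil` therefore evaluated to false for empty arrays, the field was never written, and it read back as `null`. A minimal standalone sketch of that equality quirk (the object name is illustrative, not from the Spark sources):

    // Illustration only: why a guard of the form `value != Nil` drops empty arrays.
    object EmptySeqVsNil {
      def main(args: Array[String]): Unit = {
        val empty: Seq[Int] = Vector.empty[Int]

        // Scala sequence equality is element-wise, so any empty Seq == Nil.
        println(empty == Nil)                   // true

        // Old guard: false for an empty array, so the field is skipped
        // and the value reads back from Parquet as null.
        println(empty != null && empty != Nil)  // false

        // Patched guard: only null is skipped; an empty array is written as is.
        println(empty != null)                  // true
      }
    }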