author     Takuya UESHIN <ueshin@happy-camper.st>     2014-07-10 19:23:44 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-07-10 19:23:44 -0700
commit     f5abd271292f5c98eb8b1974c1df31d08ed388dd (patch)
tree       51816d0e04830607f8e682dbacff7d8bf1afc85e /sql
parent     f62c42728990266d5d5099abe241f699189ba025 (diff)
[SPARK-2415] [SQL] RowWriteSupport should handle empty ArrayType correctly.
`RowWriteSupport` doesn't write empty `ArrayType` values, so the value read back becomes `null`. It should write empty `ArrayType` values as they are.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1339 from ueshin/issues/SPARK-2415 and squashes the following commits:

32afc87 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-2415
2f05196 [Takuya UESHIN] Fix RowWriteSupport to handle empty ArrayType correctly.
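To see the effect end to end, here is a minimal sketch of the round trip this patch addresses, modelled on the `saveAsParquetFile`/`parquetFile` usage in the test suite below; the `Record` case class and the output path are illustrative only.

// Sketch only: `Record` and the path are made up for illustration.
import org.apache.spark.sql.test.TestSQLContext
import TestSQLContext._   // implicit RDD[Product] -> SchemaRDD conversion

case class Record(id: Int, values: Seq[Int])

val rdd = TestSQLContext.sparkContext.parallelize(
  Record(1, Seq(1, 2, 3)) :: Record(2, Seq.empty[Int]) :: Nil)

rdd.saveAsParquetFile("/tmp/spark-2415-records.parquet")

// Before the fix, the empty Seq was skipped by RowWriteSupport (it compares
// equal to Nil), so it read back as null; after the fix it reads back empty.
parquetFile("/tmp/spark-2415-records.parquet").collect().foreach(println)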
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala10
3 files changed, 16 insertions, 16 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 889a408e3c..75748b2b54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -229,9 +229,9 @@ private[parquet] class CatalystGroupConverter(
this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)
protected [parquet] val converters: Array[Converter] =
- schema.map(field =>
- CatalystConverter.createConverter(field, schema.indexOf(field), this))
- .toArray
+ schema.zipWithIndex.map {
+ case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+ }.toArray
override val size = schema.size
@@ -288,9 +288,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
new ParquetRelation.RowType(attributes.length))
protected [parquet] val converters: Array[Converter] =
- schema.map(field =>
- CatalystConverter.createConverter(field, schema.indexOf(field), this))
- .toArray
+ schema.zipWithIndex.map {
+ case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+ }.toArray
override val size = schema.size
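The converter change above is a cleanup alongside the main fix: `schema.indexOf(field)` rescans the schema once per field and always resolves equal fields to the first occurrence, while `zipWithIndex` pairs each field with its own position. A minimal illustration, using a hypothetical simplified `FieldType`:

// Hypothetical, simplified FieldType just to show the indexing difference.
case class FieldType(name: String, dataType: String, nullable: Boolean)

val schema = Seq(
  FieldType("a", "int", true),
  FieldType("a", "int", true))  // compares equal to the first entry

// Old style: indexOf resolves both fields to position 0.
val byIndexOf = schema.map(field => schema.indexOf(field))    // Seq(0, 0)

// New style: each field keeps its real position.
val byZip = schema.zipWithIndex.map { case (_, idx) => idx }  // Seq(0, 1)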
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 9cd5dc5bbd..108f8b6815 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -156,7 +156,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
writer.startMessage()
while(index < attributes.size) {
// null values indicate optional fields but we do not check currently
- if (record(index) != null && record(index) != Nil) {
+ if (record(index) != null) {
writer.startField(attributes(index).name, index)
writeValue(attributes(index).dataType, record(index))
writer.endField(attributes(index).name, index)
@@ -167,7 +167,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
}
private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
- if (value != null && value != Nil) {
+ if (value != null) {
schema match {
case t @ ArrayType(_) => writeArray(
t,
@@ -184,7 +184,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
}
private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
- if (value != null && value != Nil) {
+ if (value != null) {
schema match {
case StringType => writer.addBinary(
Binary.fromByteArray(
@@ -206,12 +206,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
private[parquet] def writeStruct(
schema: StructType,
struct: CatalystConverter.StructScalaType[_]): Unit = {
- if (struct != null && struct != Nil) {
+ if (struct != null) {
val fields = schema.fields.toArray
writer.startGroup()
var i = 0
while(i < fields.size) {
- if (struct(i) != null && struct(i) != Nil) {
+ if (struct(i) != null) {
writer.startField(fields(i).name, i)
writeValue(fields(i).dataType, struct(i))
writer.endField(fields(i).name, i)
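The dropped `!= Nil` guards are the heart of the fix: in Scala an empty `Seq` compares equal to `Nil`, so the old checks silently skipped empty arrays (and struct fields holding one) instead of writing an empty group. A quick REPL-style check:

// Why the old guard lost empty arrays: an empty Seq is equal to Nil.
val empty: Seq[Int] = Seq.empty
empty == Nil          // true  -> old `value != Nil` check skipped the field
Seq(1, 2) == Nil      // false -> non-empty arrays were always written

// Genuinely missing values are still skipped; the null check remains.
val missing: Seq[Int] = null
missing != null       // false -> field is not written, as before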
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index dbf315947f..8fa143e2de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -78,7 +78,7 @@ case class AllDataTypesWithNonPrimitiveType(
booleanField: Boolean,
array: Seq[Int],
map: Map[Int, String],
- nested: Nested)
+ data: Data)
class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
TestData // Load test data tables.
@@ -138,7 +138,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
TestSQLContext.sparkContext.parallelize(range)
.map(x => AllDataTypesWithNonPrimitiveType(
s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
- Seq(x), Map(x -> s"$x"), Nested(x, s"$x")))
+ (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
.saveAsParquetFile(tempDir)
val result = parquetFile(tempDir).collect()
range.foreach {
@@ -151,9 +151,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
assert(result(i).getShort(5) === i.toShort)
assert(result(i).getByte(6) === i.toByte)
assert(result(i).getBoolean(7) === (i % 2 == 0))
- assert(result(i)(8) === Seq(i))
- assert(result(i)(9) === Map(i -> s"$i"))
- assert(result(i)(10) === new GenericRow(Array[Any](i, s"$i")))
+ assert(result(i)(8) === (0 until i))
+ assert(result(i)(9) === (0 until i).map(i => i -> s"$i").toMap)
+ assert(result(i)(10) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
}
}
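The updated test data now depends on the row index, so the first row (x = 0) carries an empty array, an empty map, and a nested struct wrapping an empty array, which is exactly the regression case. A quick look at what that row expects:

// Expected non-primitive values for the first test row (x = 0):
val x = 0
val arr = (0 until x)                              // empty Range -> empty array column
val map = (0 until x).map(i => i -> s"$i").toMap   // empty Map
// Data(arr, Nested(x, s"$x")) is a struct whose first field is the empty array;
// before the fix that array read back as null instead of an empty sequence.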