author     Cheng Lian <lian@databricks.com>    2015-02-28 21:15:43 +0800
committer  Cheng Lian <lian@databricks.com>    2015-02-28 21:15:43 +0800
commit     e6003f0a571ba44fcd011e695c8622e11cfee7dd (patch)
tree       df9e1852405e08f2c58f03b215b2a59a919ff5b5 /sql/core
parent     9168259813713a12251fb0d457ffbbed8ba857f8 (diff)
[SPARK-5775] [SQL] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
This PR adapts anselmevignon's #4697 to master and branch-1.3. Please refer to the PR description of #4697 for details.

Author: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <liancheng@users.noreply.github.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #4792 from liancheng/spark-5775 and squashes the following commits:

538f506 [Cheng Lian] Addresses comments
cee55cf [Cheng Lian] Merge pull request #4 from yhuai/spark-5775-yin
b0b74fb [Yin Huai] Remove runtime pattern matching.
ca6e038 [Cheng Lian] Fixes SPARK-5775
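Both hunks below follow the same pattern: when every requested column has a primitive type, the Parquet converter yields a SpecificMutableRow that can be updated in place; when the schema contains nested types, it yields an immutable GenericRow, so the values are first copied into a pre-allocated GenericMutableRow before the partition columns are filled in. The snippet below is a minimal, self-contained Scala sketch of that fallback copy step, using toy stand-ins (ToyMutableRow, fillPartitionColumns are illustrative names, not Spark classes):

    object PartitionColumnFillSketch {

      // Toy stand-in for Spark's mutable row types; not an actual Catalyst class.
      final class ToyMutableRow(size: Int) {
        private val values = new Array[Any](size)
        def update(ordinal: Int, value: Any): Unit = values(ordinal) = value
        def apply(ordinal: Int): Any = values(ordinal)
        override def toString: String = values.mkString("[", ", ", "]")
      }

      // Copies the columns of an immutable, GenericRow-like row into a reusable
      // mutable row, then overwrites the partition ordinals with the partition
      // values parsed from the input split's directory path.
      def fillPartitionColumns(
          dataRow: IndexedSeq[Any],
          mutableRow: ToyMutableRow,
          partitionOrdinals: IndexedSeq[Int],
          partitionValues: IndexedSeq[Any]): ToyMutableRow = {
        var i = 0
        while (i < dataRow.size) {            // copy data columns (partition slots arrive empty)
          mutableRow(i) = dataRow(i)
          i += 1
        }
        i = 0
        while (i < partitionOrdinals.size) {  // fill in the partition columns
          mutableRow(partitionOrdinals(i)) = partitionValues(i)
          i += 1
        }
        mutableRow
      }

      def main(args: Array[String]): Unit = {
        // One nested data column plus one (initially empty) partition column.
        val dataRow = IndexedSeq[Any](Map("userId" -> 42), null)
        val filled = fillPartitionColumns(
          dataRow, new ToyMutableRow(2), IndexedSeq(1), IndexedSeq[Any]("2015-02-28"))
        println(filled) // [Map(userId -> 42), 2015-02-28]
      }
    }

As in the patch, a single mutable row is reused for all records of a partition, which avoids allocating a fresh row object per record.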
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 59
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala             | 48
2 files changed, 86 insertions(+), 21 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 4e4f647767..225ec6db7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -126,6 +126,13 @@ private[sql] case class ParquetTableScan(
conf)
if (requestedPartitionOrdinals.nonEmpty) {
+ // This check is based on CatalystConverter.createRootConverter.
+ val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+ // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into
+ // the `mapPartitionsWithInputSplit` closure below.
+ val outputSize = output.size
+
baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
val partValue = "([^=]+)=([^=]+)".r
val partValues =
@@ -143,19 +150,47 @@ private[sql] case class ParquetTableScan(
relation.partitioningAttributes
.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
- new Iterator[Row] {
- def hasNext = iter.hasNext
- def next() = {
- val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
- // Parquet will leave partitioning columns empty, so we fill them in here.
- var i = 0
- while (i < requestedPartitionOrdinals.size) {
- row(requestedPartitionOrdinals(i)._2) =
- partitionRowValues(requestedPartitionOrdinals(i)._1)
- i += 1
+ if (primitiveRow) {
+ new Iterator[Row] {
+ def hasNext = iter.hasNext
+ def next() = {
+ // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+ val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+ // Parquet will leave partitioning columns empty, so we fill them in here.
+ var i = 0
+ while (i < requestedPartitionOrdinals.size) {
+ row(requestedPartitionOrdinals(i)._2) =
+ partitionRowValues(requestedPartitionOrdinals(i)._1)
+ i += 1
+ }
+ row
+ }
+ }
+ } else {
+ // Create a mutable row since we need to fill in values from partition columns.
+ val mutableRow = new GenericMutableRow(outputSize)
+ new Iterator[Row] {
+ def hasNext = iter.hasNext
+ def next() = {
+ // We are using CatalystGroupConverter and it returns a GenericRow.
+ // Since GenericRow is not mutable, we just cast it to a Row.
+ val row = iter.next()._2.asInstanceOf[Row]
+
+ var i = 0
+ while (i < row.size) {
+ mutableRow(i) = row(i)
+ i += 1
+ }
+ // Parquet will leave partitioning columns empty, so we fill them in here.
+ i = 0
+ while (i < requestedPartitionOrdinals.size) {
+ mutableRow(requestedPartitionOrdinals(i)._2) =
+ partitionRowValues(requestedPartitionOrdinals(i)._1)
+ i += 1
+ }
+ mutableRow
}
- row
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e648618468..6d56be3ab8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -482,6 +482,10 @@ private[sql] case class ParquetRelation2(
// When the data does not include the key and the key is requested then we must fill it in
// based on information from the input split.
if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
+ // This check is based on CatalystConverter.createRootConverter.
+ val primitiveRow =
+ requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
val partValues = selectedPartitions.collectFirst {
case p if split.getPath.getParent.toString == p.path => p.values
@@ -489,16 +493,42 @@ private[sql] case class ParquetRelation2(
val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
- iterator.map { pair =>
- val row = pair._2.asInstanceOf[SpecificMutableRow]
- var i = 0
- while (i < requiredPartOrdinal.size) {
- // TODO Avoids boxing cost here!
- val partOrdinal = requiredPartOrdinal(i)
- row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
- i += 1
+ if (primitiveRow) {
+ iterator.map { pair =>
+ // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+ val row = pair._2.asInstanceOf[SpecificMutableRow]
+ var i = 0
+ while (i < requiredPartOrdinal.size) {
+ // TODO Avoids boxing cost here!
+ val partOrdinal = requiredPartOrdinal(i)
+ row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+ i += 1
+ }
+ row
+ }
+ } else {
+ // Create a mutable row since we need to fill in values from partition columns.
+ val mutableRow = new GenericMutableRow(requestedSchema.size)
+ iterator.map { pair =>
+ // We are using CatalystGroupConverter and it returns a GenericRow.
+ // Since GenericRow is not mutable, we just cast it to a Row.
+ val row = pair._2.asInstanceOf[Row]
+ var i = 0
+ while (i < row.size) {
+ // TODO Avoids boxing cost here!
+ mutableRow(i) = row(i)
+ i += 1
+ }
+
+ i = 0
+ while (i < requiredPartOrdinal.size) {
+ // TODO Avoids boxing cost here!
+ val partOrdinal = requiredPartOrdinal(i)
+ mutableRow.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+ i += 1
+ }
+ mutableRow
}
- row
}
}
} else {
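For context, a hypothetical reproduction of the failure this patch addresses; the table and column names are made up, and it assumes a Spark 1.3-era SQLContext over a partitioned Parquet table whose data schema contains a nested struct column:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)  // assumes an existing SparkContext `sc`
    // `events` is a hypothetical partitioned Parquet table with a nested `payload`
    // struct column and an `eventDate` partition column.
    val rows = sqlContext.sql("SELECT payload.userId, eventDate FROM events").collect()
    // Before this patch, the nested column forced the group-converter path, which
    // produces GenericRow, and filling the partition column then failed with the
    // ClassCastException named in the SPARK-5775 title; with this patch the values
    // are copied into a GenericMutableRow instead.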