author     Michael Armbrust <michael@databricks.com>   2015-01-12 15:19:09 -0800
committer  Michael Armbrust <michael@databricks.com>   2015-01-12 15:19:09 -0800
commit     5d9fa550820543ee1b0ce82997917745973a5d65 (patch)
tree       ce55479e489574922b378c4b05b97cae14f76504 /sql/core
parent     3aed3051c0b6cd5f38d7db7d20fa7a1680bfde6f (diff)
download   spark-5d9fa550820543ee1b0ce82997917745973a5d65.tar.gz
           spark-5d9fa550820543ee1b0ce82997917745973a5d65.tar.bz2
           spark-5d9fa550820543ee1b0ce82997917745973a5d65.zip
[SPARK-5049][SQL] Fix ordering of partition columns in ParquetTableScan
Followup to #3870. Props to rahulaggarwalguavus for identifying the issue.

Author: Michael Armbrust <michael@databricks.com>

Closes #3990 from marmbrus/SPARK-5049 and squashes the following commits:

dd03e4e [Michael Armbrust] Fill in the partition values of parquet scans instead of using JoinedRow
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala         |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala  | 43
2 files changed, 29 insertions, 18 deletions
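The fix described above replaces the old JoinedRow approach, which glued a row of partition values onto every Parquet row at a fixed position, with a per-ordinal fill: for each requested column that is a partition column, the scan remembers where that column sits in the final output and writes the partition value into that slot. Below is a minimal, self-contained Scala sketch of that idea; the simplified Attribute case class and the plain arrays standing in for Catalyst rows are illustrative assumptions, not Spark's actual classes.

// Illustrative sketch only: simplified stand-ins for Catalyst attributes and rows.
object PartitionFillSketch {
  // Hypothetical simplified attribute, identified by exprId as in Catalyst.
  case class Attribute(name: String, exprId: Long)

  def main(args: Array[String]): Unit = {
    val key   = Attribute("key", 1L)
    val value = Attribute("value", 2L)
    val date  = Attribute("date", 3L)        // the partition column

    val partitioningAttributes = Seq(date)
    // The query requests the partition column between the two data columns.
    val requestedAttributes = Seq(key, date, value)

    // A mapping of ordinals partitionRow -> finalOutput, as in requestedPartitionOrdinals.
    val partitionOrdinal = partitioningAttributes.map(_.exprId).zipWithIndex.toMap
    val requestedPartitionOrdinals = requestedAttributes.zipWithIndex.flatMap {
      case (a, finalOrdinal) => partitionOrdinal.get(a.exprId).map(_ -> finalOrdinal)
    }.toArray

    // Parquet leaves partition columns unset, so fill them in by ordinal.
    val partitionRowValues = Array[Any]("2015-01-12")
    val row = Array[Any]("k1", null, 42)     // as read from Parquet: date is still empty
    var i = 0
    while (i < requestedPartitionOrdinals.length) {
      row(requestedPartitionOrdinals(i)._2) = partitionRowValues(requestedPartitionOrdinals(i)._1)
      i += 1
    }
    println(row.mkString(", "))              // k1, 2015-01-12, 42
  }
}

The partition value lands at index 1, the position the query asked for, which the old fixed partOutput ++ normalOutput ordering could not guarantee.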
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index b237a07c72..2835dc3408 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -28,7 +28,7 @@ import parquet.schema.MessageType
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 
 /**
@@ -67,6 +67,8 @@ private[sql] case class ParquetRelation(
       conf,
       sqlContext.isParquetBinaryAsString)
 
+  lazy val attributeMap = AttributeMap(output.map(o => o -> o))
+
   override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
 
   // Equals must also take into account the output attributes so that we can distinguish between
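The hunk above gives ParquetRelation an attributeMap that the scan later applies as attributes.map(relation.attributeMap). The sketch below shows, under simplified assumptions, why a lookup keyed by exprId resolves the requested attributes case-insensitively while preserving the requested ordering; AttributeLookup and the toy Attribute class are hypothetical stand-ins for Catalyst's AttributeMap and attribute references, not the real API.

// Sketch only: AttributeLookup is a simplified stand-in for Catalyst's AttributeMap.
object AttributeMapSketch {
  case class Attribute(name: String, exprId: Long)

  // Lookup by exprId, so name casing is irrelevant and requested.map(lookup)
  // keeps the query's requested column order.
  class AttributeLookup(entries: Seq[(Attribute, Attribute)]) extends (Attribute => Attribute) {
    private val byExprId = entries.map { case (k, v) => k.exprId -> v }.toMap
    def apply(a: Attribute): Attribute = byExprId(a.exprId)
  }

  def main(args: Array[String]): Unit = {
    val relationOutput = Seq(Attribute("key", 1L), Attribute("value", 2L), Attribute("date", 3L))
    val attributeMap   = new AttributeLookup(relationOutput.map(o => o -> o))

    // Requested attributes may differ in casing but share exprIds with the relation.
    val requested = Seq(Attribute("DATE", 3L), Attribute("KEY", 1L))
    val output    = requested.map(attributeMap)
    println(output.map(_.name))              // List(date, key) -- requested order preserved
  }
}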
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 96bace1769..f5487740d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -64,18 +64,17 @@ case class ParquetTableScan(
   // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
   // by exprId. note: output cannot be transient, see
   // https://issues.apache.org/jira/browse/SPARK-1367
-  val normalOutput =
-    attributes
-      .filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
-      .flatMap(a => relation.output.find(o => o.exprId == a.exprId))
+  val output = attributes.map(relation.attributeMap)
 
-  val partOutput =
-    attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))
+  // A mapping of ordinals partitionRow -> finalOutput.
+  val requestedPartitionOrdinals = {
+    val partitionAttributeOrdinals = AttributeMap(relation.partitioningAttributes.zipWithIndex)
 
-  def output = partOutput ++ normalOutput
-
-  assert(normalOutput.size + partOutput.size == attributes.size,
-    s"$normalOutput + $partOutput != $attributes, ${relation.output}")
+    attributes.zipWithIndex.flatMap {
+      case (attribute, finalOrdinal) =>
+        partitionAttributeOrdinals.get(attribute).map(_ -> finalOrdinal)
+    }
+  }.toArray
 
   override def execute(): RDD[Row] = {
     import parquet.filter2.compat.FilterCompat.FilterPredicateCompat
@@ -97,7 +96,7 @@ case class ParquetTableScan(
     // Store both requested and original schema in `Configuration`
     conf.set(
       RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertToString(normalOutput))
+      ParquetTypesConverter.convertToString(output))
     conf.set(
       RowWriteSupport.SPARK_ROW_SCHEMA,
       ParquetTypesConverter.convertToString(relation.output))
@@ -125,7 +124,7 @@ case class ParquetTableScan(
         classOf[Row],
         conf)
 
-    if (partOutput.nonEmpty) {
+    if (requestedPartitionOrdinals.nonEmpty) {
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -138,15 +137,25 @@ case class ParquetTableScan(
               case _ => None
             }.toMap
 
+        // Convert the partitioning attributes into the correct types
         val partitionRowValues =
-          partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
+          relation.partitioningAttributes
+            .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
         new Iterator[Row] {
-          private[this] val joinedRow = new JoinedRow5(Row(partitionRowValues:_*), null)
-
           def hasNext = iter.hasNext
-
-          def next() = joinedRow.withRight(iter.next()._2)
+          def next() = {
+            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+            // Parquet will leave partitioning columns empty, so we fill them in here.
+            var i = 0
+            while (i < requestedPartitionOrdinals.size) {
+              row(requestedPartitionOrdinals(i)._2) =
+                partitionRowValues(requestedPartitionOrdinals(i)._1)
+              i += 1
+            }
+            row
+          }
         }
       }
     } else {
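For context on the hunk above: the scan recovers partition values by matching key=value segments of each split's path against the "([^=]+)=([^=]+)".r pattern and then casts every string to its partitioning attribute's dataType via Cast(Literal(...), a.dataType).eval(EmptyRow). A self-contained sketch of the path parsing follows; the example path and the plain toInt conversion are illustrative assumptions, not the actual Cast machinery.

// Sketch only: Hive-style partition directories parsed with the same regex as the patch.
object PartitionPathSketch {
  def main(args: Array[String]): Unit = {
    val partValue = "([^=]+)=([^=]+)".r

    // Hypothetical partitioned Parquet file path.
    val path = "/data/table/date=2015-01-12/part=3/part-r-00001.parquet"
    val partValues = path.split("/").collect {
      case partValue(key, value) => key -> value
    }.toMap

    println(partValues)                      // Map(date -> 2015-01-12, part -> 3)
    println(partValues("part").toInt)        // 3 (Int), standing in for Cast to the column's dataType
  }
}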