diff options
author | Michael Armbrust <michael@databricks.com> | 2014-12-03 14:13:35 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-12-03 14:13:35 -0800 |
commit | 513ef82e85661552e596d0b483b645ac24e86d4d (patch) | |
tree | ee1a169705a96ee41d340ea2e4de2bb475bbb34a | |
parent | 90ec643e9af4c8bbb9000edca08c07afb17939c7 (diff) | |
download | spark-513ef82e85661552e596d0b483b645ac24e86d4d.tar.gz spark-513ef82e85661552e596d0b483b645ac24e86d4d.tar.bz2 spark-513ef82e85661552e596d0b483b645ac24e86d4d.zip |
[SPARK-4552][SQL] Avoid exception when reading empty parquet data through Hive
This is a very small fix that catches one specific exception and returns an empty table. #3441 will address this in a more principled way.
Author: Michael Armbrust <michael@databricks.com>
Closes #3586 from marmbrus/fixEmptyParquet and squashes the following commits:
2781d9f [Michael Armbrust] Handle empty lists for newParquet
04dd376 [Michael Armbrust] Avoid exception when reading empty parquet data through Hive
3 files changed, 62 insertions, 45 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 9b89c3bfb3..14f8659f15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -191,7 +191,10 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) val selectedPartitions = partitions.filter(p => partitionFilters.forall(_(p))) val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration) val selectedFiles = selectedPartitions.flatMap(_.files).map(f => fs.makeQualified(f.getPath)) - org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles:_*) + // FileInputFormat cannot handle empty lists. + if (selectedFiles.nonEmpty) { + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*) + } // Push down filters when possible predicates diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 56fc85239e..edf291f917 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.types.StringType -import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan} +import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan, PhysicalRDD} import org.apache.spark.sql.hive import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation @@ -104,53 +104,61 @@ private[hive] trait HiveStrategies { case a: AttributeReference => UnresolvedAttribute(a.name) }) - if (relation.hiveQlTable.isPartitioned) { - val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true)) - // Translate the predicate so that it automatically casts the input values to the correct - // data types during evaluation - val castedPredicate = rawPredicate transform { - case a: AttributeReference => - val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId) - val key = relation.partitionKeys(idx) - Cast(BoundReference(idx, StringType, nullable = true), key.dataType) - } - - val inputData = new GenericMutableRow(relation.partitionKeys.size) - val pruningCondition = - if(codegenEnabled) { - GeneratePredicate(castedPredicate) - } else { - InterpretedPredicate(castedPredicate) + try { + if (relation.hiveQlTable.isPartitioned) { + val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true)) + // Translate the predicate so that it automatically casts the input values to the + // correct data types during evaluation. + val castedPredicate = rawPredicate transform { + case a: AttributeReference => + val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId) + val key = relation.partitionKeys(idx) + Cast(BoundReference(idx, StringType, nullable = true), key.dataType) } - val partitions = relation.hiveQlPartitions.filter { part => - val partitionValues = part.getValues - var i = 0 - while (i < partitionValues.size()) { - inputData(i) = partitionValues(i) - i += 1 + val inputData = new GenericMutableRow(relation.partitionKeys.size) + val pruningCondition = + if (codegenEnabled) { + GeneratePredicate(castedPredicate) + } else { + InterpretedPredicate(castedPredicate) + } + + val partitions = relation.hiveQlPartitions.filter { part => + val partitionValues = part.getValues + var i = 0 + while (i < partitionValues.size()) { + inputData(i) = partitionValues(i) + i += 1 + } + pruningCondition(inputData) } - pruningCondition(inputData) - } - hiveContext - .parquetFile(partitions.map(_.getLocation).mkString(",")) - .addPartitioningAttributes(relation.partitionKeys) - .lowerCase - .where(unresolvedOtherPredicates) - .select(unresolvedProjection:_*) - .queryExecution - .executedPlan - .fakeOutput(projectList.map(_.toAttribute)):: Nil - } else { - hiveContext - .parquetFile(relation.hiveQlTable.getDataLocation.toString) - .lowerCase - .where(unresolvedOtherPredicates) - .select(unresolvedProjection:_*) - .queryExecution - .executedPlan - .fakeOutput(projectList.map(_.toAttribute)) :: Nil + hiveContext + .parquetFile(partitions.map(_.getLocation).mkString(",")) + .addPartitioningAttributes(relation.partitionKeys) + .lowerCase + .where(unresolvedOtherPredicates) + .select(unresolvedProjection: _*) + .queryExecution + .executedPlan + .fakeOutput(projectList.map(_.toAttribute)) :: Nil + } else { + hiveContext + .parquetFile(relation.hiveQlTable.getDataLocation.toString) + .lowerCase + .where(unresolvedOtherPredicates) + .select(unresolvedProjection: _*) + .queryExecution + .executedPlan + .fakeOutput(projectList.map(_.toAttribute)) :: Nil + } + } catch { + // parquetFile will throw an exception when there is no data. + // TODO: Remove this hack for Spark 1.3. + case iae: java.lang.IllegalArgumentException + if iae.getMessage.contains("Can not create a Path from an empty string") => + PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil } case _ => Nil } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index 7159ebd035..488ebba043 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -218,6 +218,12 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { 10) } + test(s"non-existant partition $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"), + 0) + } + test(s"multi-partition pruned count $table") { checkAnswer( sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"), |