author     Michael Armbrust <michael@databricks.com>   2014-12-03 14:13:35 -0800
committer  Michael Armbrust <michael@databricks.com>   2014-12-03 14:13:35 -0800
commit   513ef82e85661552e596d0b483b645ac24e86d4d (patch)
tree     ee1a169705a96ee41d340ea2e4de2bb475bbb34a /sql/hive
parent   90ec643e9af4c8bbb9000edca08c07afb17939c7 (diff)
[SPARK-4552][SQL] Avoid exception when reading empty parquet data through Hive
This is a very small fix that catches one specific exception and returns an empty table. #3441 will address this in a more principled way.

Author: Michael Armbrust <michael@databricks.com>

Closes #3586 from marmbrus/fixEmptyParquet and squashes the following commits:

2781d9f [Michael Armbrust] Handle empty lists for newParquet
04dd376 [Michael Armbrust] Avoid exception when reading empty parquet data through Hive
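For readers skimming the change, the shape of the fix is simple: when every partition is pruned away, partitions.map(_.getLocation).mkString(",") yields an empty string, and Hadoop's Path constructor rejects it with IllegalArgumentException("Can not create a Path from an empty string"); the patch catches exactly that and plans an empty scan instead. A minimal Scala sketch of the pattern, where the helper name and type parameter are illustrative assumptions rather than the patched code:

    // Hedged sketch: buildPlan stands in for the parquetFile(...) planning chain,
    // emptyPlan for PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]).
    def planWithEmptyFallback[T](buildPlan: () => T, emptyPlan: => T): T =
      try {
        buildPlan()
      } catch {
        // Thrown by new org.apache.hadoop.fs.Path("") when no partition paths remain.
        case iae: IllegalArgumentException
            if iae.getMessage.contains("Can not create a Path from an empty string") =>
          emptyPlan
      }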
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala   96
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala   6
2 files changed, 58 insertions, 44 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 56fc85239e..edf291f917 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.types.StringType
-import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
+import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan, PhysicalRDD}
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
@@ -104,53 +104,61 @@ private[hive] trait HiveStrategies {
           case a: AttributeReference => UnresolvedAttribute(a.name)
         })
 
-        if (relation.hiveQlTable.isPartitioned) {
-          val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
-          // Translate the predicate so that it automatically casts the input values to the correct
-          // data types during evaluation
-          val castedPredicate = rawPredicate transform {
-            case a: AttributeReference =>
-              val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
-              val key = relation.partitionKeys(idx)
-              Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
-          }
-
-          val inputData = new GenericMutableRow(relation.partitionKeys.size)
-          val pruningCondition =
-            if(codegenEnabled) {
-              GeneratePredicate(castedPredicate)
-            } else {
-              InterpretedPredicate(castedPredicate)
+        try {
+          if (relation.hiveQlTable.isPartitioned) {
+            val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+            // Translate the predicate so that it automatically casts the input values to the
+            // correct data types during evaluation.
+            val castedPredicate = rawPredicate transform {
+              case a: AttributeReference =>
+                val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
+                val key = relation.partitionKeys(idx)
+                Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
             }
 
-          val partitions = relation.hiveQlPartitions.filter { part =>
-            val partitionValues = part.getValues
-            var i = 0
-            while (i < partitionValues.size()) {
-              inputData(i) = partitionValues(i)
-              i += 1
+            val inputData = new GenericMutableRow(relation.partitionKeys.size)
+            val pruningCondition =
+              if (codegenEnabled) {
+                GeneratePredicate(castedPredicate)
+              } else {
+                InterpretedPredicate(castedPredicate)
+              }
+
+            val partitions = relation.hiveQlPartitions.filter { part =>
+              val partitionValues = part.getValues
+              var i = 0
+              while (i < partitionValues.size()) {
+                inputData(i) = partitionValues(i)
+                i += 1
+              }
+              pruningCondition(inputData)
             }
-            pruningCondition(inputData)
-          }
 
-          hiveContext
-            .parquetFile(partitions.map(_.getLocation).mkString(","))
-            .addPartitioningAttributes(relation.partitionKeys)
-            .lowerCase
-            .where(unresolvedOtherPredicates)
-            .select(unresolvedProjection:_*)
-            .queryExecution
-            .executedPlan
-            .fakeOutput(projectList.map(_.toAttribute)):: Nil
-        } else {
-          hiveContext
-            .parquetFile(relation.hiveQlTable.getDataLocation.toString)
-            .lowerCase
-            .where(unresolvedOtherPredicates)
-            .select(unresolvedProjection:_*)
-            .queryExecution
-            .executedPlan
-            .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+            hiveContext
+              .parquetFile(partitions.map(_.getLocation).mkString(","))
+              .addPartitioningAttributes(relation.partitionKeys)
+              .lowerCase
+              .where(unresolvedOtherPredicates)
+              .select(unresolvedProjection: _*)
+              .queryExecution
+              .executedPlan
+              .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+          } else {
+            hiveContext
+              .parquetFile(relation.hiveQlTable.getDataLocation.toString)
+              .lowerCase
+              .where(unresolvedOtherPredicates)
+              .select(unresolvedProjection: _*)
+              .queryExecution
+              .executedPlan
+              .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+          }
+        } catch {
+          // parquetFile will throw an exception when there is no data.
+          // TODO: Remove this hack for Spark 1.3.
+          case iae: java.lang.IllegalArgumentException
+              if iae.getMessage.contains("Can not create a Path from an empty string") =>
+            PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
         }
       case _ => Nil
     }
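The pruning loop in the hunk above is worth calling out: rather than allocating a row per partition, it fills one reusable GenericMutableRow with each partition's key values and evaluates the compiled predicate against it. Below is a standalone Scala sketch of the same idea, where the simplified Partition type and the plain closure standing in for GeneratePredicate/InterpretedPredicate are both assumptions, not Spark's types:

    // Simplified stand-ins; Spark's real inputs are Hive partitions and catalyst rows.
    case class Partition(values: Seq[String], location: String)

    def prunedLocations(
        partitions: Seq[Partition],
        numPartitionKeys: Int,
        pruningCondition: Array[String] => Boolean): Seq[String] = {
      // One reusable buffer, mirroring the GenericMutableRow in the patch.
      val inputData = new Array[String](numPartitionKeys)
      partitions.filter { part =>
        var i = 0
        while (i < part.values.size) {
          inputData(i) = part.values(i)
          i += 1
        }
        pruningCondition(inputData)
      }.map(_.location)
    }

When this returns no locations, mkString(",") produces exactly the empty string that the new catch block guards against.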
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 7159ebd035..488ebba043 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -218,6 +218,12 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
         10)
     }
 
+    test(s"non-existant partition $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
+        0)
+    }
+
     test(s"multi-partition pruned count $table") {
       checkAnswer(
         sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),