author     Michael Armbrust <michael@databricks.com>    2014-12-03 14:13:35 -0800
committer  Michael Armbrust <michael@databricks.com>    2014-12-03 14:13:46 -0800
commit     47931975eaffaf6f4c2a9b65d56a2f25806a2e12 (patch)
tree       77a991296b0897b8468a88ca30f8795ae6d85c46
parent     38cb2c3a36a5c9ead4494cbc3dde008c2f0698ce (diff)
[SPARK-4552][SQL] Avoid exception when reading empty parquet data through Hive
This is a very small fix that catches one specific exception and returns an empty table. #3441 will address this in a more principled way.

Author: Michael Armbrust <michael@databricks.com>

Closes #3586 from marmbrus/fixEmptyParquet and squashes the following commits:

2781d9f [Michael Armbrust] Handle empty lists for newParquet
04dd376 [Michael Armbrust] Avoid exception when reading empty parquet data through Hive

(cherry picked from commit 513ef82e85661552e596d0b483b645ac24e86d4d)
Signed-off-by: Michael Armbrust <michael@databricks.com>
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala    |  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala   | 96
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala |  6
3 files changed, 62 insertions(+), 45 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 9b89c3bfb3..14f8659f15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -191,7 +191,10 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
val selectedPartitions = partitions.filter(p => partitionFilters.forall(_(p)))
val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
val selectedFiles = selectedPartitions.flatMap(_.files).map(f => fs.makeQualified(f.getPath))
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles:_*)
+ // FileInputFormat cannot handle empty lists.
+ if (selectedFiles.nonEmpty) {
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
+ }
// Push down filters when possible
predicates
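
For context on the guard above: per the in-diff comment, FileInputFormat cannot handle an empty path list, so the call is now skipped entirely when partition pruning selects no files. A minimal, self-contained sketch of that pattern follows; setInputPathsIfAny is a hypothetical helper name, not part of the patch:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

    // Hypothetical helper mirroring the guarded call in the hunk above:
    // when no files survive pruning, leave the job's input paths unset
    // instead of passing an empty list that Hadoop cannot handle.
    def setInputPathsIfAny(job: Job, paths: Seq[Path]): Unit = {
      if (paths.nonEmpty) {
        FileInputFormat.setInputPaths(job, paths: _*)
      }
    }
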
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 56fc85239e..edf291f917 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.types.StringType
-import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
+import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan, PhysicalRDD}
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
@@ -104,53 +104,61 @@ private[hive] trait HiveStrategies {
case a: AttributeReference => UnresolvedAttribute(a.name)
})
- if (relation.hiveQlTable.isPartitioned) {
- val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
- // Translate the predicate so that it automatically casts the input values to the correct
- // data types during evaluation
- val castedPredicate = rawPredicate transform {
- case a: AttributeReference =>
- val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
- val key = relation.partitionKeys(idx)
- Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
- }
-
- val inputData = new GenericMutableRow(relation.partitionKeys.size)
- val pruningCondition =
- if(codegenEnabled) {
- GeneratePredicate(castedPredicate)
- } else {
- InterpretedPredicate(castedPredicate)
+ try {
+ if (relation.hiveQlTable.isPartitioned) {
+ val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+ // Translate the predicate so that it automatically casts the input values to the
+ // correct data types during evaluation.
+ val castedPredicate = rawPredicate transform {
+ case a: AttributeReference =>
+ val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
+ val key = relation.partitionKeys(idx)
+ Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
+ }
- val partitions = relation.hiveQlPartitions.filter { part =>
- val partitionValues = part.getValues
- var i = 0
- while (i < partitionValues.size()) {
- inputData(i) = partitionValues(i)
- i += 1
+ val inputData = new GenericMutableRow(relation.partitionKeys.size)
+ val pruningCondition =
+ if (codegenEnabled) {
+ GeneratePredicate(castedPredicate)
+ } else {
+ InterpretedPredicate(castedPredicate)
+ }
+
+ val partitions = relation.hiveQlPartitions.filter { part =>
+ val partitionValues = part.getValues
+ var i = 0
+ while (i < partitionValues.size()) {
+ inputData(i) = partitionValues(i)
+ i += 1
+ }
+ pruningCondition(inputData)
+ }
- pruningCondition(inputData)
- }
- hiveContext
- .parquetFile(partitions.map(_.getLocation).mkString(","))
- .addPartitioningAttributes(relation.partitionKeys)
- .lowerCase
- .where(unresolvedOtherPredicates)
- .select(unresolvedProjection:_*)
- .queryExecution
- .executedPlan
- .fakeOutput(projectList.map(_.toAttribute)):: Nil
- } else {
- hiveContext
- .parquetFile(relation.hiveQlTable.getDataLocation.toString)
- .lowerCase
- .where(unresolvedOtherPredicates)
- .select(unresolvedProjection:_*)
- .queryExecution
- .executedPlan
- .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+ hiveContext
+ .parquetFile(partitions.map(_.getLocation).mkString(","))
+ .addPartitioningAttributes(relation.partitionKeys)
+ .lowerCase
+ .where(unresolvedOtherPredicates)
+ .select(unresolvedProjection: _*)
+ .queryExecution
+ .executedPlan
+ .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+ } else {
+ hiveContext
+ .parquetFile(relation.hiveQlTable.getDataLocation.toString)
+ .lowerCase
+ .where(unresolvedOtherPredicates)
+ .select(unresolvedProjection: _*)
+ .queryExecution
+ .executedPlan
+ .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+ }
+ } catch {
+ // parquetFile will throw an exception when there is no data.
+ // TODO: Remove this hack for Spark 1.3.
+ case iae: java.lang.IllegalArgumentException
+ if iae.getMessage.contains("Can not create a Path from an empty string") =>
+ PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
}
case _ => Nil
}
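
The hunk above wraps the whole Parquet planning path in a try and narrows the catch with a pattern guard on the exception message, so only the known empty-path failure falls back to an empty PhysicalRDD while any other IllegalArgumentException still propagates. A hedged, self-contained sketch of that pattern; planOrEmpty is a hypothetical name used only for illustration:

    // Sketch of the catch-and-fall-back pattern from the hunk above: the
    // message guard keeps the catch narrow, so the hack (per the in-diff
    // TODO, to be removed in Spark 1.3) cannot mask unrelated failures.
    def planOrEmpty[A](plan: => Seq[A])(empty: => Seq[A]): Seq[A] =
      try {
        plan
      } catch {
        case iae: IllegalArgumentException
            if iae.getMessage.contains("Can not create a Path from an empty string") =>
          empty
      }
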
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 7159ebd035..488ebba043 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -218,6 +218,12 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
10)
}
+ test(s"non-existant partition $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
+ 0)
+ }
+
test(s"multi-partition pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),