diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-07-03 16:48:04 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-07-03 16:48:04 +0800 |
commit | ea990f96930066c36055734d4f17eaf8e496eb3f (patch) | |
tree | 0ba98d13847dc371527d97db56872ed826685dc2 /sql | |
parent | 3000b4b29f9165f436f186a8c1ba818e24f90615 (diff) | |
download | spark-ea990f96930066c36055734d4f17eaf8e496eb3f.tar.gz spark-ea990f96930066c36055734d4f17eaf8e496eb3f.tar.bz2 spark-ea990f96930066c36055734d4f17eaf8e496eb3f.zip |
[SPARK-16329][SQL] Star Expansion over Table Containing No Column
#### What changes were proposed in this pull request?
Star expansion over a table containing zero column does not work since 1.6. However, it works in Spark 1.5.1. This PR is to fix the issue in the master branch.
For example,
```scala
val rddNoCols = sqlContext.sparkContext.parallelize(1 to 10).map(_ => Row.empty)
val dfNoCols = sqlContext.createDataFrame(rddNoCols, StructType(Seq.empty))
dfNoCols.registerTempTable("temp_table_no_cols")
sqlContext.sql("select * from temp_table_no_cols").show
```
Without the fix, users will get the following the exception:
```
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:199)
```
#### How was this patch tested?
Tests are added
Author: gatorsmile <gatorsmile@gmail.com>
Closes #14007 from gatorsmile/starExpansionTableWithZeroColumn.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala | 15 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 31 |
2 files changed, 37 insertions, 9 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index b883546135..609089a302 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -215,23 +215,20 @@ abstract class Star extends LeafExpression with NamedExpression { case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable { override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { + // If there is no table specified, use all input attributes. + if (target.isEmpty) return input.output - // First try to expand assuming it is table.*. - val expandedAttributes: Seq[Attribute] = target match { - // If there is no table specified, use all input attributes. - case None => input.output - // If there is a table, pick out attributes that are part of this table. - case Some(t) => if (t.size == 1) { - input.output.filter(_.qualifier.exists(resolver(_, t.head))) + val expandedAttributes = + if (target.get.size == 1) { + // If there is a table, pick out attributes that are part of this table. + input.output.filter(_.qualifier.exists(resolver(_, target.get.head))) } else { List() } - } if (expandedAttributes.nonEmpty) return expandedAttributes // Try to resolve it as a struct expansion. If there is a conflict and both are possible, // (i.e. [name].* is both a table and a struct), the struct path can always be qualified. - require(target.isDefined) val attribute = input.resolve(target.get, resolver) if (attribute.isDefined) { // This target resolved to an attribute in child. It must be a struct. Expand it. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 084ba9b78e..dca9e5e503 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2115,6 +2115,37 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("Star Expansion - table with zero column") { + withTempTable("temp_table_no_cols") { + val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty) + val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty)) + dfNoCols.createTempView("temp_table_no_cols") + + // ResolvedStar + checkAnswer( + dfNoCols, + dfNoCols.select(dfNoCols.col("*"))) + + // UnresolvedStar + checkAnswer( + dfNoCols, + sql("SELECT * FROM temp_table_no_cols")) + checkAnswer( + dfNoCols, + dfNoCols.select($"*")) + + var e = intercept[AnalysisException] { + sql("SELECT a.* FROM temp_table_no_cols a") + }.getMessage + assert(e.contains("cannot resolve 'a.*' give input columns ''")) + + e = intercept[AnalysisException] { + dfNoCols.select($"b.*") + }.getMessage + assert(e.contains("cannot resolve 'b.*' give input columns ''")) + } + } + test("Common subexpression elimination") { // TODO: support subexpression elimination in whole stage codegen withSQLConf("spark.sql.codegen.wholeStage" -> "false") { |