aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYijie Shen <henry.yijieshen@gmail.com>2015-08-09 11:44:51 -0700
committerYin Huai <yhuai@databricks.com>2015-08-09 11:44:51 -0700
commit68ccc6e184598822b19a880fdd4597b66a1c2d92 (patch)
tree61ee617039d97e8db1926eed322a1b89dbefb744 /sql
parente9c36938ba972b6fe3c9f6228508e3c9f1c876b2 (diff)
downloadspark-68ccc6e184598822b19a880fdd4597b66a1c2d92.tar.gz
spark-68ccc6e184598822b19a880fdd4597b66a1c2d92.tar.bz2
spark-68ccc6e184598822b19a880fdd4597b66a1c2d92.zip
[SPARK-8930] [SQL] Throw a AnalysisException with meaningful messages if DataFrame#explode takes a star in expressions
Author: Yijie Shen <henry.yijieshen@gmail.com> Closes #8057 from yjshen/explode_star and squashes the following commits: eae181d [Yijie Shen] change explaination message 54c9d11 [Yijie Shen] meaning message for * in explode
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala4
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala15
3 files changed, 21 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 82158e61e3..a684dbc3af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -408,7 +408,7 @@ class Analyzer(
/**
* Returns true if `exprs` contains a [[Star]].
*/
- protected def containsStar(exprs: Seq[Expression]): Boolean =
+ def containsStar(exprs: Seq[Expression]): Boolean =
exprs.exists(_.collect { case _: Star => true }.nonEmpty)
}
@@ -602,6 +602,8 @@ class Analyzer(
*/
object ResolveGenerate extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case g: Generate if ResolveReferences.containsStar(g.generator.children) =>
+ failAnalysis("Cannot explode *, explode can only be applied on a specific column.")
case p: Generate if !p.child.resolved || !p.generator.resolved => p
case g: Generate if !g.resolved =>
g.copy(generatorOutput = makeGeneratorOutput(g.generator, g.generatorOutput.map(_.name)))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index ee1f8f5425..53b3695a86 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -71,6 +71,8 @@ trait AnalysisTest extends PlanTest {
val e = intercept[Exception] {
analyzer.checkAnalysis(analyzer.execute(inputPlan))
}
- expectedErrors.forall(e.getMessage.contains)
+ assert(expectedErrors.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains),
+ s"Expected to throw Exception contains: ${expectedErrors.mkString(", ")}, " +
+ s"actually we get ${e.getMessage}")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 0212637a82..c49f256be5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -134,6 +134,21 @@ class DataFrameSuite extends QueryTest with SQLTestUtils {
)
}
+ test("SPARK-8930: explode should fail with a meaningful message if it takes a star") {
+ val df = Seq(("1", "1,2"), ("2", "4"), ("3", "7,8,9")).toDF("prefix", "csv")
+ val e = intercept[AnalysisException] {
+ df.explode($"*") { case Row(prefix: String, csv: String) =>
+ csv.split(",").map(v => Tuple1(prefix + ":" + v)).toSeq
+ }.queryExecution.assertAnalyzed()
+ }
+ assert(e.getMessage.contains(
+ "Cannot explode *, explode can only be applied on a specific column."))
+
+ df.explode('prefix, 'csv) { case Row(prefix: String, csv: String) =>
+ csv.split(",").map(v => Tuple1(prefix + ":" + v)).toSeq
+ }.queryExecution.assertAnalyzed()
+ }
+
test("explode alias and star") {
val df = Seq((Array("a"), 1)).toDF("a", "b")