aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2017-03-10 16:14:22 -0800
committerWenchen Fan <wenchen@databricks.com>2017-03-10 16:14:22 -0800
commitfb9beda54622e0c3190c6504fc468fa4e50eeb45 (patch)
tree90a8303e9ac42b1cfe6a7046b27d4ef6c6e07895 /sql/catalyst
parentffee4f1cefb0dfd8d9145ee3be82c6f7b799870b (diff)
downloadspark-fb9beda54622e0c3190c6504fc468fa4e50eeb45.tar.gz
spark-fb9beda54622e0c3190c6504fc468fa4e50eeb45.tar.bz2
spark-fb9beda54622e0c3190c6504fc468fa4e50eeb45.zip
[SPARK-19893][SQL] should not run DataFrame set oprations with map type
## What changes were proposed in this pull request? In spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, we should not allow map type in `Intersect`/`Except`/`Distinct`. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17236 from cloud-fan/map.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala25
1 files changed, 22 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7529f90284..d32fbeb4e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -44,6 +44,18 @@ trait CheckAnalysis extends PredicateHelper {
}).length > 1
}
+ protected def hasMapType(dt: DataType): Boolean = {
+ dt.existsRecursively(_.isInstanceOf[MapType])
+ }
+
+ protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match {
+ case _: Intersect | _: Except | _: Distinct =>
+ plan.output.find(a => hasMapType(a.dataType))
+ case d: Deduplicate =>
+ d.keys.find(a => hasMapType(a.dataType))
+ case _ => None
+ }
+
private def checkLimitClause(limitExpr: Expression): Unit = {
limitExpr match {
case e if !e.foldable => failAnalysis(
@@ -121,8 +133,7 @@ trait CheckAnalysis extends PredicateHelper {
if (conditions.isEmpty && query.output.size != 1) {
failAnalysis(
s"Scalar subquery must return only one column, but got ${query.output.size}")
- }
- else if (conditions.nonEmpty) {
+ } else if (conditions.nonEmpty) {
// Collect the columns from the subquery for further checking.
var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains)
@@ -200,7 +211,7 @@ trait CheckAnalysis extends PredicateHelper {
s"filter expression '${f.condition.sql}' " +
s"of type ${f.condition.dataType.simpleString} is not a boolean.")
- case f @ Filter(condition, child) =>
+ case Filter(condition, _) =>
splitConjunctivePredicates(condition).foreach {
case _: PredicateSubquery | Not(_: PredicateSubquery) =>
case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) =>
@@ -374,6 +385,14 @@ trait CheckAnalysis extends PredicateHelper {
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)
+ // TODO: although map type is not orderable, technically map type should be able to be
+ // used in equality comparison, remove this type check once we support it.
+ case o if mapColumnInSetOperation(o).isDefined =>
+ val mapCol = mapColumnInSetOperation(o).get
+ failAnalysis("Cannot have map type columns in DataFrame which calls " +
+ s"set operations(intersect, except, etc.), but the type of column ${mapCol.name} " +
+ "is " + mapCol.dataType.simpleString)
+
case o if o.expressions.exists(!_.deterministic) &&
!o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
!o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] =>