author    Wenchen Fan <wenchen@databricks.com>    2017-03-10 16:14:22 -0800
committer Wenchen Fan <wenchen@databricks.com>    2017-03-10 16:14:22 -0800
commit    fb9beda54622e0c3190c6504fc468fa4e50eeb45 (patch)
tree      90a8303e9ac42b1cfe6a7046b27d4ef6c6e07895 /sql
parent    ffee4f1cefb0dfd8d9145ee3be82c6f7b799870b (diff)
[SPARK-19893][SQL] should not run DataFrame set operations with map type
## What changes were proposed in this pull request?

In Spark SQL, map type cannot be used in equality tests or comparisons, and `Intersect`/`Except`/`Distinct` all require an equality test on every column, so we should not allow map type columns in `Intersect`/`Except`/`Distinct`.

## How was this patch tested?

New regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17236 from cloud-fan/map.
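For reference, a minimal reproduction of the rejected pattern (mirroring the new regression test below; it assumes a live `SparkSession` named `spark`, e.g. in `spark-shell`):

```scala
import org.apache.spark.sql.functions.{lit, map}
import spark.implicits._ // for the $"id" column syntax

// A single-column DataFrame whose column "m" is MapType(StringType, LongType).
val df = spark.range(1).select(map(lit("key"), $"id").as("m"))

// After this patch, each of these fails analysis with
// "Cannot have map type columns in DataFrame which calls set operations ..."
// instead of producing incorrect results or failing later at runtime.
df.intersect(df)
df.except(df)
df.distinct()
```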
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala  | 25
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala  | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala  | 14
3 files changed, 47 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7529f90284..d32fbeb4e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -44,6 +44,18 @@ trait CheckAnalysis extends PredicateHelper {
}).length > 1
}
+ protected def hasMapType(dt: DataType): Boolean = {
+ dt.existsRecursively(_.isInstanceOf[MapType])
+ }
+
+ protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match {
+ case _: Intersect | _: Except | _: Distinct =>
+ plan.output.find(a => hasMapType(a.dataType))
+ case d: Deduplicate =>
+ d.keys.find(a => hasMapType(a.dataType))
+ case _ => None
+ }
+
private def checkLimitClause(limitExpr: Expression): Unit = {
limitExpr match {
case e if !e.foldable => failAnalysis(
@@ -121,8 +133,7 @@ trait CheckAnalysis extends PredicateHelper {
if (conditions.isEmpty && query.output.size != 1) {
failAnalysis(
s"Scalar subquery must return only one column, but got ${query.output.size}")
- }
- else if (conditions.nonEmpty) {
+ } else if (conditions.nonEmpty) {
// Collect the columns from the subquery for further checking.
var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains)
@@ -200,7 +211,7 @@ trait CheckAnalysis extends PredicateHelper {
s"filter expression '${f.condition.sql}' " +
s"of type ${f.condition.dataType.simpleString} is not a boolean.")
- case f @ Filter(condition, child) =>
+ case Filter(condition, _) =>
splitConjunctivePredicates(condition).foreach {
case _: PredicateSubquery | Not(_: PredicateSubquery) =>
case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) =>
@@ -374,6 +385,14 @@ trait CheckAnalysis extends PredicateHelper {
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)
+ // TODO: although map type is not orderable, technically map type should be able to be
+ // used in equality comparison, remove this type check once we support it.
+ case o if mapColumnInSetOperation(o).isDefined =>
+ val mapCol = mapColumnInSetOperation(o).get
+ failAnalysis("Cannot have map type columns in DataFrame which calls " +
+ s"set operations(intersect, except, etc.), but the type of column ${mapCol.name} " +
+ "is " + mapCol.dataType.simpleString)
+
case o if o.expressions.exists(!_.deterministic) &&
!o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
!o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 19c2d5532d..52bd4e19f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1703,4 +1703,23 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
val df = spark.range(1).selectExpr("CAST(id as DECIMAL) as x").selectExpr("percentile(x, 0.5)")
checkAnswer(df, Row(BigDecimal(0.0)) :: Nil)
}
+
+ test("SPARK-19893: cannot run set operations with map type") {
+ val df = spark.range(1).select(map(lit("key"), $"id").as("m"))
+ val e = intercept[AnalysisException](df.intersect(df))
+ assert(e.message.contains(
+ "Cannot have map type columns in DataFrame which calls set operations"))
+ val e2 = intercept[AnalysisException](df.except(df))
+ assert(e2.message.contains(
+ "Cannot have map type columns in DataFrame which calls set operations"))
+ val e3 = intercept[AnalysisException](df.distinct())
+ assert(e3.message.contains(
+ "Cannot have map type columns in DataFrame which calls set operations"))
+ withTempView("v") {
+ df.createOrReplaceTempView("v")
+ val e4 = intercept[AnalysisException](sql("SELECT DISTINCT m FROM v"))
+ assert(e4.message.contains(
+ "Cannot have map type columns in DataFrame which calls set operations"))
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index f355a5200c..0250a53fe2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -234,8 +234,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
Seq(StringType, BinaryType, NullType, BooleanType,
ByteType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
- DateType, TimestampType,
- ArrayType(IntegerType), MapType(StringType, LongType), struct)
+ DateType, TimestampType, ArrayType(IntegerType), struct)
val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
StructField(s"col$index", dataType, true)
}
@@ -244,10 +243,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
// Create an RDD for the schema
val rdd =
- sparkContext.parallelize((1 to 10000), 10).map { i =>
+ sparkContext.parallelize(1 to 10000, 10).map { i =>
Row(
- s"str${i}: test cache.",
- s"binary${i}: test cache.".getBytes(StandardCharsets.UTF_8),
+ s"str$i: test cache.",
+ s"binary$i: test cache.".getBytes(StandardCharsets.UTF_8),
null,
i % 2 == 0,
i.toByte,
@@ -255,13 +254,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
i,
Long.MaxValue - i.toLong,
(i + 0.25).toFloat,
- (i + 0.75),
+ i + 0.75,
BigDecimal(Long.MaxValue.toString + ".12345"),
new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"),
new Date(i),
new Timestamp(i * 1000000L),
- (i to i + 10).toSeq,
- (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap,
+ i to i + 10,
Row((i - 0.25).toFloat, Seq(true, false, null)))
}
spark.createDataFrame(rdd, schema).createOrReplaceTempView("InMemoryCache_different_data_types")