aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorbomeng <bmeng@us.ibm.com>2016-06-23 11:06:19 +0800
committerWenchen Fan <wenchen@databricks.com>2016-06-23 11:06:19 +0800
commit925884a612dd88beaddf555c74d90856ab040ec7 (patch)
tree105810e15188347f5a8a93aeee1f8225ff6d7ca0 /sql
parent37f3be5d29192db0a54f6c4699237b149bd0ecae (diff)
downloadspark-925884a612dd88beaddf555c74d90856ab040ec7.tar.gz
spark-925884a612dd88beaddf555c74d90856ab040ec7.tar.bz2
spark-925884a612dd88beaddf555c74d90856ab040ec7.zip
[SPARK-15230][SQL] distinct() does not handle column name with dot properly
## What changes were proposed in this pull request? When a table is created with a column name containing a dot, distinct() will fail to run. For example, ```scala val rowRDD = sparkContext.parallelize(Seq(Row(1), Row(1), Row(2))) val schema = StructType(Array(StructField("column.with.dot", IntegerType, nullable = false))) val df = spark.createDataFrame(rowRDD, schema) ``` running the following will have no problem: ```scala df.select(new Column("`column.with.dot`")) ``` but running the query with an additional distinct() will cause an exception: ```scala df.select(new Column("`column.with.dot`")).distinct() ``` The issue is that distinct() will try to resolve the column name, but the column name in the schema does not have backticks around it. So the solution is to add the backticks before passing the column name to resolve(). ## How was this patch tested? Added a new test case. Author: bomeng <bmeng@us.ibm.com> Closes #13140 from bomeng/SPARK-15230.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala5
2 files changed, 12 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 02cc3985b0..f1d33c3e5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1812,7 +1812,13 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
- val groupCols = colNames.map(resolve)
+ val resolver = sparkSession.sessionState.analyzer.resolver
+ val allColumns = queryExecution.analyzed.output
+ val groupCols = colNames.map { colName =>
+ allColumns.find(col => resolver(col.name, colName)).getOrElse(
+ throw new AnalysisException(
+ s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})"""))
+ }
val groupColExprIds = groupCols.map(_.exprId)
val aggCols = logicalPlan.output.map { attr =>
if (groupColExprIds.contains(attr.exprId)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c8a0f7134d..1afee9f021 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1536,4 +1536,9 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
Utils.deleteRecursively(baseDir)
}
}
+
+ test("SPARK-15230: distinct() does not handle column name with dot properly") {
+ val df = Seq(1, 1, 2).toDF("column.with.dot")
+ checkAnswer(df.distinct(), Row(1) :: Row(2) :: Nil)
+ }
}