about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorYadong Qi <qiyadong2010@gmail.com>2016-05-28 10:19:29 -0700
committerWenchen Fan <wenchen@databricks.com>2016-05-28 10:19:29 -0700
commitb4c32c4952f7af2733258aa4e27f21e8832c8a3a (patch)
treef99524ff00497fa744b8adfba7bbb79c74e04e30 /sql
parentf1b220eeeed1d4d12121fe0b3b175da44488da68 (diff)
downloadspark-b4c32c4952f7af2733258aa4e27f21e8832c8a3a.tar.gz
spark-b4c32c4952f7af2733258aa4e27f21e8832c8a3a.tar.bz2
spark-b4c32c4952f7af2733258aa4e27f21e8832c8a3a.zip
[SPARK-15549][SQL] Disable bucketing when the output doesn't contain all bucketing columns
## What changes were proposed in this pull request? I create a bucketed table bucketed_table with bucket column i, ```scala case class Data(i: Int, j: Int, k: Int) sc.makeRDD(Array((1, 2, 3))).map(x => Data(x._1, x._2, x._3)).toDF.write.bucketBy(2, "i").saveAsTable("bucketed_table") ``` and I run the following SQLs: ```sql SELECT j FROM bucketed_table; Error in query: bucket column i not found in existing columns (j); SELECT j, MAX(k) FROM bucketed_table GROUP BY j; Error in query: bucket column i not found in existing columns (j, k); ``` I think we should add a check so that we only enable bucketing when it satisfies all conditions below: 1. the conf is enabled 2. the relation is bucketed 3. the output contains all bucketing columns ## How was this patch tested? Updated test cases to reflect the changes. Author: Yadong Qi <qiyadong2010@gmail.com> Closes #13321 from watermen/SPARK-15549.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala13
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala11
2 files changed, 17 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 412f5fa87e..fef3255c73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -347,15 +347,14 @@ private[sql] object DataSourceScanExec {
case _ => None
}
- def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
- throw new AnalysisException(s"bucket column $colName not found in existing columns " +
- s"(${output.map(_.name).mkString(", ")})")
- }
-
bucketSpec.map { spec =>
val numBuckets = spec.numBuckets
- val bucketColumns = spec.bucketColumnNames.map(toAttribute)
- HashPartitioning(bucketColumns, numBuckets)
+ val bucketColumns = spec.bucketColumnNames.flatMap { n => output.find(_.name == n) }
+ if (bucketColumns.size == spec.bucketColumnNames.size) {
+ HashPartitioning(bucketColumns, numBuckets)
+ } else {
+ UnknownPartitioning(0)
+ }
}.getOrElse {
UnknownPartitioning(0)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index f9891ac571..bab0092c37 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -362,4 +362,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
assert(error.toString contains "Invalid bucket file")
}
}
+
+ test("disable bucketing when the output doesn't contain all bucketing columns") {
+ withTable("bucketed_table") {
+ df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
+
+ checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j"))
+
+ checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+ df1.groupBy("j").agg(max("k")))
+ }
+ }
}