author    gatorsmile <gatorsmile@gmail.com>    2016-01-25 13:38:09 -0800
committer Michael Armbrust <michael@databricks.com>    2016-01-25 13:38:09 -0800
commit    9348431da212ec3ab7be2b8e89a952a48b4e2a31 (patch)
tree      73081cab9e62beaf17ca98783556e7b20eae6470
parent    00026fa9912ecee5637f1e7dd222f977f31f6766 (diff)
[SPARK-12975][SQL] Throwing Exception when Bucketing Columns are part of Partitioning Columns
When users call `partitionBy` and `bucketBy` at the same time, some bucketing columns might also be partitioning columns. For example:

```
df.write
  .format(source)
  .partitionBy("i")
  .bucketBy(8, "i", "k")
  .saveAsTable("bucketed_table")
```

In the case above, adding column `i` to `bucketBy` is useless; it only wastes extra CPU when reading or writing bucketed tables. Thus, like Hive, we can throw an exception and let users correct the call.

Also added a test case that checks whether the `sortBy` and `bucketBy` column information is correctly saved in the metastore table.

Could you check if my understanding is correct? cloud-fan rxin marmbrus Thanks!

Author: gatorsmile <gatorsmile@gmail.com>

Closes #10891 from gatorsmile/commonKeysInPartitionByBucketBy.
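To make the intended behavior concrete, here is a minimal sketch of the rejected and corrected calls; `df`, the format, and the column names are illustrative, following the example above:

```scala
// After this change, this write is rejected with an AnalysisException,
// because column "i" appears in both partitionBy and bucketBy.
df.write
  .format("parquet")
  .partitionBy("i")
  .bucketBy(8, "i", "k")
  .saveAsTable("bucketed_table")

// Corrected call: bucket only by columns that are not partition columns.
df.write
  .format("parquet")
  .partitionBy("i")
  .bucketBy(8, "k")
  .saveAsTable("bucketed_table")
```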
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala               |  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 55
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala     | 22
3 files changed, 83 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ab63fe4aa8..12eb239363 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -240,6 +240,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
n <- numBuckets
} yield {
require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")
+
+ // partitionBy columns cannot be used in bucketBy
+ if (normalizedParCols.nonEmpty &&
+ normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) {
+ throw new AnalysisException(
+ s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be part of " +
+ s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
+ }
+
BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
}
}
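Taken out of context, the guard added above is an intersection test over normalized column names. Below is a minimal standalone sketch, assuming lower-casing as the normalization and using `IllegalArgumentException` in place of Spark's `AnalysisException` so the snippet stays self-contained; the helper name is hypothetical, not the actual private API:

```scala
// Rough standalone version of the overlap check added to DataFrameWriter above.
def assertNoBucketPartitionOverlap(
    bucketCols: Seq[String],
    partitionCols: Seq[String]): Unit = {
  // Lower-casing stands in for the column-name normalization the real code performs.
  val overlap =
    bucketCols.map(_.toLowerCase).toSet.intersect(partitionCols.map(_.toLowerCase).toSet)
  if (overlap.nonEmpty) {
    throw new IllegalArgumentException(
      s"bucketBy columns '${bucketCols.mkString(", ")}' should not be part of " +
        s"partitionBy columns '${partitionCols.mkString(", ")}'")
  }
}

// Example: throws because "i" is both a bucketing and a partitioning column.
assertNoBucketPartitionOverlap(Seq("i", "k"), Seq("i"))
```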
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 253f13c598..211932fea0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -745,7 +745,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
- test("Saving partition columns information") {
+ test("Saving partitionBy columns information") {
val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
val tableName = s"partitionInfo_${System.currentTimeMillis()}"
@@ -776,6 +776,59 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
+ test("Saving information for sortBy and bucketBy columns") {
+ val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
+ val tableName = s"bucketingInfo_${System.currentTimeMillis()}"
+
+ withTable(tableName) {
+ df.write
+ .format("parquet")
+ .bucketBy(8, "d", "b")
+ .sortBy("c")
+ .saveAsTable(tableName)
+ invalidateTable(tableName)
+ val metastoreTable = catalog.client.getTable("default", tableName)
+ val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
+ val expectedSortByColumns = StructType(df.schema("c") :: Nil)
+
+ val numBuckets = metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt
+ assert(numBuckets == 8)
+
+ val numBucketCols = metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
+ assert(numBucketCols == 2)
+
+ val numSortCols = metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt
+ assert(numSortCols == 1)
+
+ val actualBucketByColumns =
+ StructType(
+ (0 until numBucketCols).map { index =>
+ df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index"))
+ })
+ // Make sure bucketBy columns are correctly stored in metastore.
+ assert(
+ expectedBucketByColumns.sameType(actualBucketByColumns),
+ s"Partitions columns stored in metastore $actualBucketByColumns is not the " +
+ s"partition columns defined by the saveAsTable operation $expectedBucketByColumns.")
+
+ val actualSortByColumns =
+ StructType(
+ (0 until numSortCols).map { index =>
+ df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index"))
+ })
+ // Make sure sortBy columns are correctly stored in metastore.
+ assert(
+ expectedSortByColumns.sameType(actualSortByColumns),
+ s"Partitions columns stored in metastore $actualSortByColumns is not the " +
+ s"partition columns defined by the saveAsTable operation $expectedSortByColumns.")
+
+ // Check the content of the saved table.
+ checkAnswer(
+ table(tableName).select("c", "b", "d", "a"),
+ df.select("c", "b", "d", "a"))
+ }
+ }
+
test("insert into a table") {
def createDF(from: Int, to: Int): DataFrame = {
(from to to).map(i => i -> s"str$i").toDF("c1", "c2")
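For reference, the bucketing-related table properties the new test expects to find in the metastore can be summarized as follows; the key names and values are derived from the test's assertions for `bucketBy(8, "d", "b").sortBy("c")`, and this is not an exhaustive list of the properties Spark writes:

```scala
// Properties checked by the test above; derived from its assertions,
// not an exhaustive list of what is stored for the table.
val expectedBucketingProps = Map(
  "spark.sql.sources.schema.numBuckets"    -> "8",
  "spark.sql.sources.schema.numBucketCols" -> "2",
  "spark.sql.sources.schema.bucketCol.0"   -> "d",
  "spark.sql.sources.schema.bucketCol.1"   -> "b",
  "spark.sql.sources.schema.numSortCols"   -> "1",
  "spark.sql.sources.schema.sortCol.0"     -> "c"
)
```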
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 59b74d2b4c..a32f8fb4c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -92,10 +92,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
fail(s"Unable to find the related bucket files.")
}
+ // Remove the duplicate columns in bucketCols and sortCols;
+ // Otherwise, we would get analysis errors due to duplicate names.
+ val selectedColumns = (bucketCols ++ sortCols).distinct
// We may lose the type information after write(e.g. json format doesn't keep schema
// information), here we get the types from the original dataframe.
- val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType)
- val columns = (bucketCols ++ sortCols).zip(types).map {
+ val types = df.select(selectedColumns.map(col): _*).schema.map(_.dataType)
+ val columns = selectedColumns.zip(types).map {
case (colName, dt) => col(colName).cast(dt)
}
@@ -158,6 +161,21 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
}
}
+ test("write bucketed data with the overlapping bucketBy and partitionBy columns") {
+ intercept[AnalysisException](df.write
+ .partitionBy("i", "j")
+ .bucketBy(8, "j", "k")
+ .sortBy("k")
+ .saveAsTable("bucketed_table"))
+ }
+
+ test("write bucketed data with the identical bucketBy and partitionBy columns") {
+ intercept[AnalysisException](df.write
+ .partitionBy("i")
+ .bucketBy(8, "i")
+ .saveAsTable("bucketed_table"))
+ }
+
test("write bucketed data without partitionBy") {
for (source <- Seq("parquet", "json", "orc")) {
withTable("bucketed_table") {