author     gatorsmile <gatorsmile@gmail.com>    2016-01-22 01:03:41 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-01-22 01:03:41 -0800
commit     e13c147e74a52d74e259f04e49e368fab64cdc1f (patch)
tree       4ac4de338e38afabf1a9623cd1d2a889d04bafee
parent     006906db591666a7111066afd226325452be2e3e (diff)
[SPARK-12959][SQL] Writing Bucketed Data with Disabled Bucketing in SQLConf
When users turn off bucketing in SQLConf, we should issue messages telling them that these operations will be converted to normal (non-bucketed) writes. This patch also adds a test case for this scenario and fixes the helper function.

Do you think this PR is helpful when using bucket tables? cloud-fan Thank you!

Author: gatorsmile <gatorsmile@gmail.com>

Closes #10870 from gatorsmile/bucketTableWritingTestcases.
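For illustration only (not part of this commit): a minimal sketch of the scenario the new test exercises, assuming a 1.6-era SQLContext (`sqlContext`), a DataFrame `df` with columns i, j and k, and that BUCKETING_ENABLED corresponds to the key "spark.sql.sources.bucketing.enabled".

// Sketch of the user-facing scenario, not code from this patch. The test added below
// asserts that BUCKETING_ENABLED does not affect the writing path, so the declared
// buckets are still written out even when the flag is off.
sqlContext.setConf("spark.sql.sources.bucketing.enabled", "false")

df.write
  .format("parquet")
  .partitionBy("i")
  .bucketBy(8, "j", "k")          // bucket spec is still honored when writing
  .saveAsTable("bucketed_table")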
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala | 28
3 files changed, 26 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 314c957d57..2d3e1714d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -125,7 +125,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
|Actual: ${partitionColumns.mkString(", ")}
""".stripMargin)
- val writerContainer = if (partitionColumns.isEmpty && relation.getBucketSpec.isEmpty) {
+ val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) {
new DefaultWriterContainer(relation, job, isAppend)
} else {
val output = df.queryExecution.executedPlan.output
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 563fd9eefc..6340229dbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -311,7 +311,7 @@ private[sql] class DynamicPartitionWriterContainer(
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
- private val bucketSpec = relation.getBucketSpec
+ private val bucketSpec = relation.maybeBucketSpec
private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap {
spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
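For context, a minimal sketch of the distinction the getBucketSpec -> maybeBucketSpec rename relies on; the trait and method shapes below are assumptions for illustration, not the actual HadoopFsRelation code.

// Illustrative only: a raw accessor for the user-declared bucket spec versus a
// conf-gated view. The write path reads the raw Option directly, which is why
// turning BUCKETING_ENABLED off does not change what gets written.
case class BucketSpec(
    numBuckets: Int,
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String])

trait BucketedRelation {
  // Raw, user-declared bucketing (if any).
  def maybeBucketSpec: Option[BucketSpec]

  // Conf-gated view for callers that should respect the switch.
  def bucketSpec(bucketingEnabled: Boolean): Option[BucketSpec] =
    if (bucketingEnabled) maybeBucketSpec else None
}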
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 8cac7fe48f..59b74d2b4c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources
import java.io.File
import java.net.URI
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, SQLConf}
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.BucketingUtils
@@ -88,10 +88,11 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
)
for (bucketFile <- allBucketFiles) {
- val bucketId = BucketingUtils.getBucketId(bucketFile.getName).get
- assert(bucketId >= 0 && bucketId < numBuckets)
+ val bucketId = BucketingUtils.getBucketId(bucketFile.getName).getOrElse {
+ fail(s"Unable to find the related bucket files.")
+ }
- // We may loss the type information after write(e.g. json format doesn't keep schema
+ // We may lose the type information after write(e.g. json format doesn't keep schema
// information), here we get the types from the original dataframe.
val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType)
val columns = (bucketCols ++ sortCols).zip(types).map {
@@ -183,4 +184,23 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
}
}
}
+
+ test("write bucketed data with bucketing disabled") {
+ // The configuration BUCKETING_ENABLED does not affect the writing path
+ withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
+ for (source <- Seq("parquet", "json", "orc")) {
+ withTable("bucketed_table") {
+ df.write
+ .format(source)
+ .partitionBy("i")
+ .bucketBy(8, "j", "k")
+ .saveAsTable("bucketed_table")
+
+ for (i <- 0 until 5) {
+ testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j", "k"))
+ }
+ }
+ }
+ }
+ }
}