-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala              2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala                        28
3 files changed, 26 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 314c957d57..2d3e1714d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -125,7 +125,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
|Actual: ${partitionColumns.mkString(", ")}
""".stripMargin)
- val writerContainer = if (partitionColumns.isEmpty && relation.getBucketSpec.isEmpty) {
+ val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) {
new DefaultWriterContainer(relation, job, isAppend)
} else {
val output = df.queryExecution.executedPlan.output
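
The hunk above only renames the accessor, but it sits inside the dispatch that picks a writer container: the flat DefaultWriterContainer is used only when the write is neither partitioned nor bucketed. A minimal standalone sketch of that Option-based decision, using hypothetical simplified stand-ins for the Spark-internal types rather than the real classes:

    // Hypothetical, simplified stand-ins for Spark-internal types; only the
    // Option-based dispatch mirrors the hunk above.
    case class BucketSpec(numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String])

    sealed trait WriterContainer
    case object DefaultWriterContainer extends WriterContainer
    case object DynamicPartitionWriterContainer extends WriterContainer

    def chooseContainer(
        partitionColumns: Seq[String],
        maybeBucketSpec: Option[BucketSpec]): WriterContainer =
      // A single output directory is only enough when rows need no routing;
      // partitioning or bucketing both require the dynamic container.
      if (partitionColumns.isEmpty && maybeBucketSpec.isEmpty) DefaultWriterContainer
      else DynamicPartitionWriterContainer

    // A bucketed-but-unpartitioned write still needs the dynamic container:
    assert(chooseContainer(Nil, Some(BucketSpec(8, Seq("j", "k"), Nil))) == DynamicPartitionWriterContainer)
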
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 563fd9eefc..6340229dbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -311,7 +311,7 @@ private[sql] class DynamicPartitionWriterContainer(
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
- private val bucketSpec = relation.getBucketSpec
+ private val bucketSpec = relation.maybeBucketSpec
private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap {
spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
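
For context on the renamed field: maybeBucketSpec is an Option[BucketSpec], and the line right after it resolves each bucket column name to a column of the input schema. A standalone sketch of that Option.toSeq.flatMap resolution pattern, with hypothetical simplified types in place of Spark's Attribute/BucketSpec:

    // Hypothetical simplified types; only the resolution pattern mirrors the code above.
    case class Field(name: String, dataType: String)
    case class BucketSpec(numBuckets: Int, bucketColumnNames: Seq[String])

    val inputSchema = Seq(Field("i", "int"), Field("j", "string"), Field("k", "string"))
    val bucketSpec: Option[BucketSpec] = Some(BucketSpec(8, Seq("j", "k")))

    // Option.toSeq turns None into an empty Seq, so an unbucketed relation simply
    // produces no bucket columns instead of needing a separate code path.
    val bucketColumns: Seq[Field] = bucketSpec.toSeq.flatMap { spec =>
      spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
    }
    // bucketColumns == Seq(Field("j", "string"), Field("k", "string"))
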
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 8cac7fe48f..59b74d2b4c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources
import java.io.File
import java.net.URI
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, SQLConf}
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.BucketingUtils
@@ -88,10 +88,11 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
)
for (bucketFile <- allBucketFiles) {
- val bucketId = BucketingUtils.getBucketId(bucketFile.getName).get
- assert(bucketId >= 0 && bucketId < numBuckets)
+ val bucketId = BucketingUtils.getBucketId(bucketFile.getName).getOrElse {
+ fail(s"Unable to find the related bucket files.")
+ }
- // We may loss the type information after write(e.g. json format doesn't keep schema
+ // We may lose the type information after write(e.g. json format doesn't keep schema
// information), here we get the types from the original dataframe.
val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType)
val columns = (bucketCols ++ sortCols).zip(types).map {
@@ -183,4 +184,23 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
}
}
}
+
+ test("write bucketed data with bucketing disabled") {
+ // The configuration BUCKETING_ENABLED does not affect the writing path
+ withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
+ for (source <- Seq("parquet", "json", "orc")) {
+ withTable("bucketed_table") {
+ df.write
+ .format(source)
+ .partitionBy("i")
+ .bucketBy(8, "j", "k")
+ .saveAsTable("bucketed_table")
+
+ for (i <- 0 until 5) {
+ testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j", "k"))
+ }
+ }
+ }
+ }
+ }
}
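
The new test drives the write path end to end through the public DataFrameWriter API. A hedged usage sketch of the same calls outside the suite, assuming a hypothetical local SparkSession and an input DataFrame shaped like the suite's df (the suite itself gets its context and table directory from SQLTestUtils/TestHiveSingleton):

    import org.apache.spark.sql.SparkSession

    object BucketedWriteExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("bucketed-write-sketch")
          .master("local[2]")
          .getOrCreate()
        import spark.implicits._

        // Roughly the shape of the suite's test DataFrame: columns i, j, k.
        val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")

        // Bucketed output goes through saveAsTable; the bucketing metadata
        // (8 buckets on j, k) is recorded with the table, and each partition
        // directory i=<n> ends up with files carrying a bucket id in their names.
        df.write
          .format("parquet")
          .partitionBy("i")
          .bucketBy(8, "j", "k")
          .saveAsTable("bucketed_table")

        spark.stop()
      }
    }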