3 files changed, 26 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 314c957d57..2d3e1714d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -125,7 +125,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
            |Actual: ${partitionColumns.mkString(", ")}
          """.stripMargin)
 
-      val writerContainer = if (partitionColumns.isEmpty && relation.getBucketSpec.isEmpty) {
+      val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) {
         new DefaultWriterContainer(relation, job, isAppend)
       } else {
         val output = df.queryExecution.executedPlan.output
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 563fd9eefc..6340229dbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -311,7 +311,7 @@ private[sql] class DynamicPartitionWriterContainer(
     isAppend: Boolean)
   extends BaseWriterContainer(relation, job, isAppend) {
 
-  private val bucketSpec = relation.getBucketSpec
+  private val bucketSpec = relation.maybeBucketSpec
 
   private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap { spec =>
     spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 8cac7fe48f..59b74d2b4c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, SQLConf}
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.BucketingUtils
@@ -88,10 +88,11 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     )
 
     for (bucketFile <- allBucketFiles) {
-      val bucketId = BucketingUtils.getBucketId(bucketFile.getName).get
-      assert(bucketId >= 0 && bucketId < numBuckets)
+      val bucketId = BucketingUtils.getBucketId(bucketFile.getName).getOrElse {
+        fail(s"Unable to find the related bucket files.")
+      }
 
-      // We may loss the type information after write(e.g. json format doesn't keep schema
+      // We may lose the type information after write(e.g. json format doesn't keep schema
       // information), here we get the types from the original dataframe.
       val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType)
       val columns = (bucketCols ++ sortCols).zip(types).map {
@@ -183,4 +184,23 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       }
     }
   }
+
+  test("write bucketed data with bucketing disabled") {
+    // The configuration BUCKETING_ENABLED does not affect the writing path
+    withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
+      for (source <- Seq("parquet", "json", "orc")) {
+        withTable("bucketed_table") {
+          df.write
+            .format(source)
+            .partitionBy("i")
+            .bucketBy(8, "j", "k")
+            .saveAsTable("bucketed_table")
+
+          for (i <- 0 until 5) {
+            testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j", "k"))
+          }
+        }
+      }
+    }
+  }
 }