From e13c147e74a52d74e259f04e49e368fab64cdc1f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 22 Jan 2016 01:03:41 -0800 Subject: [SPARK-12959][SQL] Writing Bucketed Data with Disabled Bucketing in SQLConf When users turn off bucketing in SQLConf, we should issue some messages to tell users these operations will be converted to normal way. Also added a test case for this scenario and fixed the helper function. Do you think this PR is helpful when using bucket tables? cloud-fan Thank you! Author: gatorsmile Closes #10870 from gatorsmile/bucketTableWritingTestcases. --- .../datasources/InsertIntoHadoopFsRelation.scala | 2 +- .../execution/datasources/WriterContainer.scala | 2 +- .../spark/sql/sources/BucketedWriteSuite.scala | 28 ++++++++++++++++++---- 3 files changed, 26 insertions(+), 6 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index 314c957d57..2d3e1714d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -125,7 +125,7 @@ private[sql] case class InsertIntoHadoopFsRelation( |Actual: ${partitionColumns.mkString(", ")} """.stripMargin) - val writerContainer = if (partitionColumns.isEmpty && relation.getBucketSpec.isEmpty) { + val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) { new DefaultWriterContainer(relation, job, isAppend) } else { val output = df.queryExecution.executedPlan.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 563fd9eefc..6340229dbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -311,7 +311,7 @@ private[sql] class DynamicPartitionWriterContainer( isAppend: Boolean) extends BaseWriterContainer(relation, job, isAppend) { - private val bucketSpec = relation.getBucketSpec + private val bucketSpec = relation.maybeBucketSpec private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap { spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 8cac7fe48f..59b74d2b4c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.sources import java.io.File import java.net.URI -import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.{AnalysisException, QueryTest, SQLConf} import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.datasources.BucketingUtils @@ -88,10 +88,11 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle ) for (bucketFile <- allBucketFiles) { - val bucketId = BucketingUtils.getBucketId(bucketFile.getName).get - assert(bucketId >= 0 && bucketId < numBuckets) + val bucketId = BucketingUtils.getBucketId(bucketFile.getName).getOrElse { + fail(s"Unable to find the related bucket files.") + } - // We may loss the type information after write(e.g. json format doesn't keep schema + // We may lose the type information after write(e.g. json format doesn't keep schema // information), here we get the types from the original dataframe. val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType) val columns = (bucketCols ++ sortCols).zip(types).map { @@ -183,4 +184,23 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle } } } + + test("write bucketed data with bucketing disabled") { + // The configuration BUCKETING_ENABLED does not affect the writing path + withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { + for (source <- Seq("parquet", "json", "orc")) { + withTable("bucketed_table") { + df.write + .format(source) + .partitionBy("i") + .bucketBy(8, "j", "k") + .saveAsTable("bucketed_table") + + for (i <- 0 until 5) { + testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j", "k")) + } + } + } + } + } } -- cgit v1.2.3