author     Wenchen Fan <wenchen@databricks.com>  2016-07-25 22:05:48 +0800
committer  Cheng Lian <lian@databricks.com>      2016-07-25 22:05:48 +0800
commit     64529b186a1c33740067cc7639d630bc5b9ae6e8 (patch)
tree       ab923216bc0a6480b21df76b024e7d232033eb1e
parent     d27d362ebae0c4a5cc6c99f13ef20049214dd4f9 (diff)
[SPARK-16691][SQL] move BucketSpec to catalyst module and use it in CatalogTable
## What changes were proposed in this pull request?

It's weird that we have `BucketSpec` to abstract bucketing info but don't use it in `CatalogTable`. This PR moves `BucketSpec` into the catalyst module and makes `CatalogTable` carry it as an `Option[BucketSpec]`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14331 from cloud-fan/check.
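For a feel of the new API surface, here is a minimal sketch of constructing the consolidated spec and unwrapping it the way the updated call sites below do. The column names are illustrative placeholders, assuming spark-catalyst from this patch on the classpath:

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec

// Hypothetical spec: 4 buckets keyed on `col1`, rows sorted by `col2` within each bucket.
val spec = BucketSpec(
  numBuckets = 4,
  bucketColumnNames = Seq("col1"),
  sortColumnNames = Seq("col2"))

// CatalogTable now carries a single Option[BucketSpec] field instead of three
// loose fields, so call sites unwrap it like any other Option:
val maybeSpec: Option[BucketSpec] = Some(spec)
val bucketCols: Seq[String] = maybeSpec.map(_.bucketColumnNames).getOrElse(Nil)
```

Collapsing the three loose fields into one `Option` also turns "unbucketed" into a plain `None`, replacing the old `numBuckets = -1` / empty-`Seq` convention visible in the hunks below.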
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 49
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala) | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 3
21 files changed, 78 insertions(+), 78 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2a20651459..710bce5da9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
/**
@@ -110,6 +111,24 @@ case class CatalogTablePartition(
/**
+ * A container for bucketing information.
+ * Bucketing is a technique for decomposing data sets into more manageable parts, and the number
+ * of buckets is fixed, so it does not fluctuate with the data.
+ *
+ * @param numBuckets number of buckets.
+ * @param bucketColumnNames the names of the columns used to generate the bucket id.
+ * @param sortColumnNames the names of the columns used to sort data in each bucket.
+ */
+case class BucketSpec(
+ numBuckets: Int,
+ bucketColumnNames: Seq[String],
+ sortColumnNames: Seq[String]) {
+ if (numBuckets <= 0) {
+ throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
+ }
+}
+
+/**
* A table defined in the catalog.
*
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
@@ -124,9 +143,7 @@ case class CatalogTable(
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
partitionColumnNames: Seq[String] = Seq.empty,
- sortColumnNames: Seq[String] = Seq.empty,
- bucketColumnNames: Seq[String] = Seq.empty,
- numBuckets: Int = -1,
+ bucketSpec: Option[BucketSpec] = None,
owner: String = "",
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
@@ -143,8 +160,8 @@ case class CatalogTable(
s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
}
requireSubsetOfSchema(partitionColumnNames, "partition")
- requireSubsetOfSchema(sortColumnNames, "sort")
- requireSubsetOfSchema(bucketColumnNames, "bucket")
+ requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
+ requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
/** Columns this table is partitioned by. */
def partitionColumns: Seq[CatalogColumn] =
@@ -172,9 +189,19 @@ case class CatalogTable(
override def toString: String = {
val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
- val partitionColumns = partitionColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
- val sortColumns = sortColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
- val bucketColumns = bucketColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
+ val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+ val bucketStrings = bucketSpec match {
+ case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+ val bucketColumnsString = bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+ val sortColumnsString = sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+ Seq(
+ s"Num Buckets: $numBuckets",
+ if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumnsString" else "",
+ if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumnsString" else ""
+ )
+
+ case _ => Nil
+ }
val output =
Seq(s"Table: ${identifier.quotedString}",
@@ -183,10 +210,8 @@ case class CatalogTable(
s"Last Access: ${new Date(lastAccessTime).toString}",
s"Type: ${tableType.name}",
if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
- if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "",
- if (numBuckets != -1) s"Num Buckets: $numBuckets" else "",
- if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumns" else "",
- if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumns" else "",
+ if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
+ ) ++ bucketStrings ++ Seq(
viewOriginalText.map("Original View: " + _).getOrElse(""),
viewText.map("View: " + _).getOrElse(""),
comment.map("Comment: " + _).getOrElse(""),
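Note that the moved class keeps its eager validation (the constructor body in the hunk above): a non-positive bucket count fails at construction time rather than surfacing later at write time. A small sketch of that behavior, again assuming this patch's spark-catalyst:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.BucketSpec

val ok = BucketSpec(8, Seq("id"), Nil) // fine: positive bucket count

try {
  BucketSpec(0, Seq("id"), Nil)        // rejected in the constructor body
} catch {
  case e: AnalysisException =>
    // Message reads: Expected positive number of buckets, but got `0`.
    println(e.getMessage)
}
```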
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 5bb50cba53..3a0dcea903 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -692,7 +692,7 @@ abstract class CatalogTestUtils {
CatalogColumn("a", "int"),
CatalogColumn("b", "string")),
partitionColumnNames = Seq("a", "b"),
- bucketColumnNames = Seq("col1"))
+ bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
}
def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 753b64b983..44189881dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,8 +23,9 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 2a62b864a1..03f81c46a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -21,12 +21,11 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index a62853b05f..8f3adadbf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -498,23 +498,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
- def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], sortColumns: Seq[String]) = {
- append(buffer, "Num Buckets:", numBuckets.toString, "")
- append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", "]"), "")
- append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "")
+ def appendBucketInfo(bucketSpec: Option[BucketSpec]) = bucketSpec match {
+ case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+ append(buffer, "Num Buckets:", numBuckets.toString, "")
+ append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "")
+ append(buffer, "Sort Columns:", sortColumnNames.mkString("[", ", ", "]"), "")
+
+ case _ =>
}
- DDLUtils.getBucketSpecFromTableProperties(metadata) match {
- case Some(bucketSpec) =>
- appendBucketInfo(
- bucketSpec.numBuckets,
- bucketSpec.bucketColumnNames,
- bucketSpec.sortColumnNames)
- case None =>
- appendBucketInfo(
- metadata.numBuckets,
- metadata.bucketColumnNames,
- metadata.sortColumnNames)
+ if (DDLUtils.isDatasourceTable(metadata)) {
+ appendBucketInfo(DDLUtils.getBucketSpecFromTableProperties(metadata))
+ } else {
+ appendBucketInfo(metadata.bucketSpec)
}
}
@@ -808,7 +804,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
}
- if (metadata.bucketColumnNames.nonEmpty) {
+ if (metadata.bucketSpec.isDefined) {
throw new UnsupportedOperationException(
"Creating Hive table with bucket spec is not supported yet.")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
index 961d035b76..377b818096 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
@@ -17,26 +17,6 @@
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.AnalysisException
-
-/**
- * A container for bucketing information.
- * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
- * of buckets is fixed so it does not fluctuate with data.
- *
- * @param numBuckets number of buckets.
- * @param bucketColumnNames the names of the columns that used to generate the bucket id.
- * @param sortColumnNames the names of the columns that used to sort data in each bucket.
- */
-private[sql] case class BucketSpec(
- numBuckets: Int,
- bucketColumnNames: Seq[String],
- sortColumnNames: Seq[String]) {
- if (numBuckets <= 0) {
- throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
- }
-}
-
private[sql] object BucketingUtils {
// The file name of bucketed data should have 3 parts:
// 1. some other information in the head of file name
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f572b93991..79024fda2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 04f166f8ff..32aa4713eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 1426dcf469..b49525c8ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.datasources
import java.io.IOException
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark._
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 9a0b46c1a4..c801436b0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -28,6 +28,7 @@ import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31a2075d2f..18369b51b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index d238da242f..5ce8350de2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources
import scala.collection.mutable
-import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -30,6 +29,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 1ae9b5524c..05dfb8cb22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -153,7 +153,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
- val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
+ val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
val columns = tableMetadata.schema.map { c =>
new Column(
name = c.name,
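The `listColumns` change above uses the access idiom that recurs throughout this patch: map over the optional spec and fall back to an empty list for unbucketed tables. A self-contained sketch of the pattern, with a hypothetical helper name:

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec

// Hypothetical helper: derive the (possibly empty) set of bucket columns.
def bucketColumnSet(spec: Option[BucketSpec]): Set[String] =
  spec.map(_.bucketColumnNames).getOrElse(Nil).toSet

assert(bucketColumnSet(None).isEmpty)
assert(bucketColumnSet(Some(BucketSpec(4, Seq("a", "b"), Nil))) == Set("a", "b"))
```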
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index b170a3a77e..999afc9751 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -20,15 +20,15 @@ package org.apache.spark.sql.execution.command
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
+import org.apache.spark.sql.execution.datasources.CreateTableUsing
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
// TODO: merge this with DDLSuite (SPARK-14441)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 34c980e321..a354594a6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -26,13 +26,12 @@ import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 8d8a18fa93..ddcc24a7f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.execution.DataSourceScanExec
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 07aeaeb695..8aa81854b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -90,11 +90,12 @@ class CatalogSuite
.getOrElse { spark.catalog.listColumns(tableName) }
assume(tableMetadata.schema.nonEmpty, "bad test")
assume(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
- assume(tableMetadata.bucketColumnNames.nonEmpty, "bad test")
+ assume(tableMetadata.bucketSpec.isDefined, "bad test")
assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
+ val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
columns.collect().foreach { col =>
assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
- assert(col.isBucket == tableMetadata.bucketColumnNames.contains(col.name))
+ assert(col.isBucket == bucketColumnNames.contains(col.name))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 5ab585faa4..49153f7736 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -24,9 +24,9 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 9f5782f045..2392cc0bdd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -365,9 +365,9 @@ private[hive] class HiveClientImpl(
},
schema = schema,
partitionColumnNames = partCols.map(_.name),
- sortColumnNames = Seq(), // TODO: populate this
- bucketColumnNames = h.getBucketCols.asScala,
- numBuckets = h.getNumBuckets,
+ // We cannot populate bucketing information for Hive tables, as Spark SQL's hash
+ // function implementation differs from Hive's.
+ bucketSpec = None,
owner = h.getOwner,
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -764,10 +764,7 @@ private[hive] class HiveClientImpl(
hiveTable.setFields(schema.asJava)
}
hiveTable.setPartCols(partCols.asJava)
- // TODO: set sort columns here too
- hiveTable.setBucketCols(table.bucketColumnNames.asJava)
hiveTable.setOwner(conf.getUser)
- hiveTable.setNumBuckets(table.numBuckets)
hiveTable.setCreateTime((table.createTime / 1000).toInt)
hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index a708434f5e..5450fba753 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -293,9 +293,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
assert(desc.partitionColumnNames.isEmpty)
- assert(desc.sortColumnNames.isEmpty)
- assert(desc.bucketColumnNames.isEmpty)
- assert(desc.numBuckets == -1)
+ assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
assert(desc.storage.locationUri.isEmpty)
@@ -453,9 +451,7 @@ class HiveDDLCommandSuite extends PlanTest {
CatalogColumn("name", "string"),
CatalogColumn("month", "int")))
assert(desc.partitionColumnNames == Seq("month"))
- assert(desc.sortColumnNames.isEmpty)
- assert(desc.bucketColumnNames.isEmpty)
- assert(desc.numBuckets == -1)
+ assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
assert(desc.storage.locationUri == Some("/path/to/mercury"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index fc01ff3f5a..e461490310 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.sources
import java.io.File
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.DataSourceScanExec
-import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions._