author: Wenchen Fan <wenchen@databricks.com> 2016-07-25 22:05:48 +0800
committer: Cheng Lian <lian@databricks.com> 2016-07-25 22:05:48 +0800
commit: 64529b186a1c33740067cc7639d630bc5b9ae6e8 (patch)
tree: ab923216bc0a6480b21df76b024e7d232033eb1e /sql/catalyst
parent: d27d362ebae0c4a5cc6c99f13ef20049214dd4f9 (diff)
[SPARK-16691][SQL] move BucketSpec to catalyst module and use it in CatalogTable
## What changes were proposed in this pull request?

It's odd that we have `BucketSpec` to abstract bucketing info but don't use it in `CatalogTable`. This PR moves `BucketSpec` into the catalyst module and uses it in `CatalogTable`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14331 from cloud-fan/check.
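The shape of the API change, as a minimal standalone sketch (the types here are heavily simplified stand-ins invented for illustration; the real `CatalogTable` carries many more fields, such as identifier, storage, owner, and timestamps):

```scala
// Simplified stand-in for the catalyst BucketSpec introduced by this PR.
case class BucketSpec(
    numBuckets: Int,
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String])

// After this PR: one optional, self-describing value replaces the three loose
// fields (sortColumnNames / bucketColumnNames / numBuckets with a -1 sentinel).
case class CatalogTable(
    table: String,
    partitionColumnNames: Seq[String] = Seq.empty,
    bucketSpec: Option[BucketSpec] = None)

object BeforeAfterDemo extends App {
  // Unbucketed table: no numBuckets = -1 sentinel is needed anymore.
  val plain = CatalogTable("t1")
  // Bucketed table: all bucketing info lives in one place.
  val bucketed = CatalogTable("t2", Seq("a"), Some(BucketSpec(4, Seq("col1"), Nil)))
  println(plain)
  println(bucketed)
}
```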
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 49
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala | 2
2 files changed, 38 insertions(+), 13 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2a20651459..710bce5da9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
/**
@@ -110,6 +111,24 @@ case class CatalogTablePartition(
/**
+ * A container for bucketing information.
+ * Bucketing is a technique for decomposing data sets into more manageable parts, and the number
+ * of buckets is fixed so it does not fluctuate with the size of the data.
+ *
+ * @param numBuckets the number of buckets.
+ * @param bucketColumnNames the names of the columns used to generate the bucket id.
+ * @param sortColumnNames the names of the columns used to sort data in each bucket.
+ */
+case class BucketSpec(
+ numBuckets: Int,
+ bucketColumnNames: Seq[String],
+ sortColumnNames: Seq[String]) {
+ if (numBuckets <= 0) {
+ throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
+ }
+}
+
+/**
* A table defined in the catalog.
*
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
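A compilable sketch of the contract the new `BucketSpec` enforces; `AnalysisException` is stubbed with a plain exception class here so the snippet runs outside Spark (the real class lives elsewhere in the `org.apache.spark.sql` package):

```scala
// Stub so this snippet compiles standalone; not the real Spark class.
class AnalysisException(message: String) extends Exception(message)

case class BucketSpec(
    numBuckets: Int,
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String]) {
  // The constructor rejects non-positive bucket counts up front, replacing
  // the old "numBuckets == -1 means unbucketed" convention in CatalogTable.
  if (numBuckets <= 0) {
    throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
  }
}

object BucketSpecDemo extends App {
  // A table bucketed into 4 buckets by `col1`, with no sort columns.
  println(BucketSpec(4, Seq("col1"), Nil))

  // Non-positive bucket counts now fail fast at construction time.
  try BucketSpec(0, Seq("col1"), Nil)
  catch { case e: AnalysisException => println(s"rejected: ${e.getMessage}") }
}
```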
@@ -124,9 +143,7 @@ case class CatalogTable(
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
partitionColumnNames: Seq[String] = Seq.empty,
- sortColumnNames: Seq[String] = Seq.empty,
- bucketColumnNames: Seq[String] = Seq.empty,
- numBuckets: Int = -1,
+ bucketSpec: Option[BucketSpec] = None,
owner: String = "",
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
@@ -143,8 +160,8 @@ case class CatalogTable(
s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
}
requireSubsetOfSchema(partitionColumnNames, "partition")
- requireSubsetOfSchema(sortColumnNames, "sort")
- requireSubsetOfSchema(bucketColumnNames, "bucket")
+ requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
+ requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
/** Columns this table is partitioned by. */
def partitionColumns: Seq[CatalogColumn] =
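The schema check above unwraps the optional spec with `map(...).getOrElse(Nil)`, so unbucketed tables pass trivially. A small illustrative sketch of that pattern, with a made-up standalone `requireSubsetOfSchema` standing in for `CatalogTable`'s private helper:

```scala
object SubsetCheckDemo extends App {
  case class BucketSpec(
      numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String])

  val schemaColumns = Seq("a", "b", "col1")

  // Hypothetical stand-in for the private helper shown in the hunk above.
  def requireSubsetOfSchema(colNames: Seq[String], colType: String): Unit =
    require(colNames.toSet.subsetOf(schemaColumns.toSet),
      s"$colType columns (${colNames.mkString(", ")}) must be a subset of the schema")

  val bucketSpec: Option[BucketSpec] = Some(BucketSpec(4, Seq("col1"), Nil))

  // None => Nil, so the subset check is vacuously true for unbucketed tables.
  requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
  requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
  println("checks passed")
}
```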
@@ -172,9 +189,19 @@ case class CatalogTable(
override def toString: String = {
val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
- val partitionColumns = partitionColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
- val sortColumns = sortColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
- val bucketColumns = bucketColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
+ val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+ val bucketStrings = bucketSpec match {
+ case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+ val bucketColumnsString = bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+ val sortColumnsString = sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+ Seq(
+ s"Num Buckets: $numBuckets",
+ if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumnsString" else "",
+ if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumnsString" else ""
+ )
+
+ case _ => Nil
+ }
val output =
Seq(s"Table: ${identifier.quotedString}",
@@ -183,10 +210,8 @@ case class CatalogTable(
s"Last Access: ${new Date(lastAccessTime).toString}",
s"Type: ${tableType.name}",
if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
- if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "",
- if (numBuckets != -1) s"Num Buckets: $numBuckets" else "",
- if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumns" else "",
- if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumns" else "",
+ if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
+ ) ++ bucketStrings ++ Seq(
viewOriginalText.map("Original View: " + _).getOrElse(""),
viewText.map("View: " + _).getOrElse(""),
comment.map("Comment: " + _).getOrElse(""),
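The `toString` rework follows the same pattern: match on the optional spec to build the bucket-related display lines, then splice them into the rest of the output. A runnable sketch of just this piece (`quoteIdentifier` is simplified here; the real catalyst utility also handles escaping):

```scala
object ToStringDemo extends App {
  case class BucketSpec(
      numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String])

  // Simplified for this sketch: wraps a column name in backticks.
  def quoteIdentifier(name: String): String = s"`$name`"

  def bucketStrings(bucketSpec: Option[BucketSpec]): Seq[String] = bucketSpec match {
    case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
      val bucketColumnsString = bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
      val sortColumnsString = sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
      Seq(
        s"Num Buckets: $numBuckets",
        if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumnsString" else "",
        if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumnsString" else "")
    case _ => Nil
  }

  // Empty entries for absent pieces are kept; the caller can filter them out.
  println(bucketStrings(Some(BucketSpec(4, Seq("col1"), Nil))).filter(_.nonEmpty).mkString("\n"))
  println(bucketStrings(None)) // Nil: unbucketed tables contribute no lines
}
```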
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 5bb50cba53..3a0dcea903 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -692,7 +692,7 @@ abstract class CatalogTestUtils {
CatalogColumn("a", "int"),
CatalogColumn("b", "string")),
partitionColumnNames = Seq("a", "b"),
- bucketColumnNames = Seq("col1"))
+ bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
}
def newFunc(name: String, database: Option[String] = None): CatalogFunction = {