diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-07-25 22:05:48 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-07-25 22:05:48 +0800 |
commit | 64529b186a1c33740067cc7639d630bc5b9ae6e8 (patch) | |
tree | ab923216bc0a6480b21df76b024e7d232033eb1e /sql/catalyst/src/main/scala | |
parent | d27d362ebae0c4a5cc6c99f13ef20049214dd4f9 (diff) | |
download | spark-64529b186a1c33740067cc7639d630bc5b9ae6e8.tar.gz spark-64529b186a1c33740067cc7639d630bc5b9ae6e8.tar.bz2 spark-64529b186a1c33740067cc7639d630bc5b9ae6e8.zip |
[SPARK-16691][SQL] move BucketSpec to catalyst module and use it in CatalogTable
## What changes were proposed in this pull request?
It's weird that we have `BucketSpec` to abstract bucket info, but don't use it in `CatalogTable`. This PR moves `BucketSpec` into catalyst module.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #14331 from cloud-fan/check.
Diffstat (limited to 'sql/catalyst/src/main/scala')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 49 |
1 files changed, 37 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 2a20651459..710bce5da9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.util.quoteIdentifier /** @@ -110,6 +111,24 @@ case class CatalogTablePartition( /** + * A container for bucketing information. + * Bucketing is a technology for decomposing data sets into more manageable parts, and the number + * of buckets is fixed so it does not fluctuate with data. + * + * @param numBuckets number of buckets. + * @param bucketColumnNames the names of the columns that used to generate the bucket id. + * @param sortColumnNames the names of the columns that used to sort data in each bucket. + */ +case class BucketSpec( + numBuckets: Int, + bucketColumnNames: Seq[String], + sortColumnNames: Seq[String]) { + if (numBuckets <= 0) { + throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.") + } +} + +/** * A table defined in the catalog. * * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the @@ -124,9 +143,7 @@ case class CatalogTable( storage: CatalogStorageFormat, schema: Seq[CatalogColumn], partitionColumnNames: Seq[String] = Seq.empty, - sortColumnNames: Seq[String] = Seq.empty, - bucketColumnNames: Seq[String] = Seq.empty, - numBuckets: Int = -1, + bucketSpec: Option[BucketSpec] = None, owner: String = "", createTime: Long = System.currentTimeMillis, lastAccessTime: Long = -1, @@ -143,8 +160,8 @@ case class CatalogTable( s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'") } requireSubsetOfSchema(partitionColumnNames, "partition") - requireSubsetOfSchema(sortColumnNames, "sort") - requireSubsetOfSchema(bucketColumnNames, "bucket") + requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort") + requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket") /** Columns this table is partitioned by. */ def partitionColumns: Seq[CatalogColumn] = @@ -172,9 +189,19 @@ case class CatalogTable( override def toString: String = { val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") - val partitionColumns = partitionColumnNames.map("`" + _ + "`").mkString("[", ", ", "]") - val sortColumns = sortColumnNames.map("`" + _ + "`").mkString("[", ", ", "]") - val bucketColumns = bucketColumnNames.map("`" + _ + "`").mkString("[", ", ", "]") + val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") + val bucketStrings = bucketSpec match { + case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) => + val bucketColumnsString = bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") + val sortColumnsString = sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") + Seq( + s"Num Buckets: $numBuckets", + if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumnsString" else "", + if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumnsString" else "" + ) + + case _ => Nil + } val output = Seq(s"Table: ${identifier.quotedString}", @@ -183,10 +210,8 @@ case class CatalogTable( s"Last Access: ${new Date(lastAccessTime).toString}", s"Type: ${tableType.name}", if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "", - if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "", - if (numBuckets != -1) s"Num Buckets: $numBuckets" else "", - if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumns" else "", - if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumns" else "", + if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "" + ) ++ bucketStrings ++ Seq( viewOriginalText.map("Original View: " + _).getOrElse(""), viewText.map("View: " + _).getOrElse(""), comment.map("Comment: " + _).getOrElse(""), |