From 64529b186a1c33740067cc7639d630bc5b9ae6e8 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 25 Jul 2016 22:05:48 +0800 Subject: [SPARK-16691][SQL] move BucketSpec to catalyst module and use it in CatalogTable ## What changes were proposed in this pull request? It's weird that we have `BucketSpec` to abstract bucket info, but don't use it in `CatalogTable`. This PR moves `BucketSpec` into catalyst module. ## How was this patch tested? existing tests. Author: Wenchen Fan Closes #14331 from cloud-fan/check. --- .../spark/sql/catalyst/catalog/interface.scala | 49 +++++++++++++----- .../catalyst/catalog/ExternalCatalogSuite.scala | 2 +- .../org/apache/spark/sql/DataFrameWriter.scala | 5 +- .../apache/spark/sql/execution/command/ddl.scala | 3 +- .../spark/sql/execution/command/tables.scala | 30 +++++------ .../sql/execution/datasources/BucketingUtils.scala | 39 ++++++++++++++ .../sql/execution/datasources/DataSource.scala | 1 + .../execution/datasources/FileSourceStrategy.scala | 1 + .../InsertIntoHadoopFsRelationCommand.scala | 2 +- .../execution/datasources/WriterContainer.scala | 1 + .../spark/sql/execution/datasources/bucket.scala | 59 ---------------------- .../spark/sql/execution/datasources/ddl.scala | 1 + .../datasources/fileSourceInterfaces.scala | 2 +- .../apache/spark/sql/internal/CatalogImpl.scala | 2 +- .../sql/execution/command/DDLCommandSuite.scala | 6 +-- .../spark/sql/execution/command/DDLSuite.scala | 3 +- .../datasources/FileSourceStrategySuite.scala | 1 + .../apache/spark/sql/internal/CatalogSuite.scala | 5 +- .../sql/sources/CreateTableAsSelectSuite.scala | 2 +- .../spark/sql/hive/client/HiveClientImpl.scala | 9 ++-- .../spark/sql/hive/HiveDDLCommandSuite.scala | 8 +-- .../spark/sql/sources/BucketedReadSuite.scala | 3 +- 22 files changed, 117 insertions(+), 117 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 2a20651459..710bce5da9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.util.quoteIdentifier /** @@ -109,6 +110,24 @@ case class CatalogTablePartition( storage: CatalogStorageFormat) +/** + * A container for bucketing information. + * Bucketing is a technology for decomposing data sets into more manageable parts, and the number + * of buckets is fixed so it does not fluctuate with data. + * + * @param numBuckets number of buckets. + * @param bucketColumnNames the names of the columns that used to generate the bucket id. + * @param sortColumnNames the names of the columns that used to sort data in each bucket. + */ +case class BucketSpec( + numBuckets: Int, + bucketColumnNames: Seq[String], + sortColumnNames: Seq[String]) { + if (numBuckets <= 0) { + throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.") + } +} + /** * A table defined in the catalog. * @@ -124,9 +143,7 @@ case class CatalogTable( storage: CatalogStorageFormat, schema: Seq[CatalogColumn], partitionColumnNames: Seq[String] = Seq.empty, - sortColumnNames: Seq[String] = Seq.empty, - bucketColumnNames: Seq[String] = Seq.empty, - numBuckets: Int = -1, + bucketSpec: Option[BucketSpec] = None, owner: String = "", createTime: Long = System.currentTimeMillis, lastAccessTime: Long = -1, @@ -143,8 +160,8 @@ case class CatalogTable( s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'") } requireSubsetOfSchema(partitionColumnNames, "partition") - requireSubsetOfSchema(sortColumnNames, "sort") - requireSubsetOfSchema(bucketColumnNames, "bucket") + requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort") + requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket") /** Columns this table is partitioned by. */ def partitionColumns: Seq[CatalogColumn] = @@ -172,9 +189,19 @@ case class CatalogTable( override def toString: String = { val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") - val partitionColumns = partitionColumnNames.map("`" + _ + "`").mkString("[", ", ", "]") - val sortColumns = sortColumnNames.map("`" + _ + "`").mkString("[", ", ", "]") - val bucketColumns = bucketColumnNames.map("`" + _ + "`").mkString("[", ", ", "]") + val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") + val bucketStrings = bucketSpec match { + case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) => + val bucketColumnsString = bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") + val sortColumnsString = sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") + Seq( + s"Num Buckets: $numBuckets", + if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumnsString" else "", + if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumnsString" else "" + ) + + case _ => Nil + } val output = Seq(s"Table: ${identifier.quotedString}", @@ -183,10 +210,8 @@ case class CatalogTable( s"Last Access: ${new Date(lastAccessTime).toString}", s"Type: ${tableType.name}", if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "", - if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "", - if (numBuckets != -1) s"Num Buckets: $numBuckets" else "", - if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumns" else "", - if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumns" else "", + if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "" + ) ++ bucketStrings ++ Seq( viewOriginalText.map("Original View: " + _).getOrElse(""), viewText.map("View: " + _).getOrElse(""), comment.map("Comment: " + _).getOrElse(""), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 5bb50cba53..3a0dcea903 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -692,7 +692,7 @@ abstract class CatalogTestUtils { CatalogColumn("a", "int"), CatalogColumn("b", "string")), partitionColumnNames = Seq("a", "b"), - bucketColumnNames = Seq("col1")) + bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil))) } def newFunc(name: String, database: Option[String] = None): CatalogFunction = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 753b64b983..44189881dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -23,8 +23,9 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} -import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation} +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable +import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 2a62b864a1..03f81c46a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -21,12 +21,11 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ -import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index a62853b05f..8f3adadbf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} @@ -498,23 +498,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF } private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { - def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], sortColumns: Seq[String]) = { - append(buffer, "Num Buckets:", numBuckets.toString, "") - append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", "]"), "") - append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "") + def appendBucketInfo(bucketSpec: Option[BucketSpec]) = bucketSpec match { + case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) => + append(buffer, "Num Buckets:", numBuckets.toString, "") + append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "") + append(buffer, "Sort Columns:", sortColumnNames.mkString("[", ", ", "]"), "") + + case _ => } - DDLUtils.getBucketSpecFromTableProperties(metadata) match { - case Some(bucketSpec) => - appendBucketInfo( - bucketSpec.numBuckets, - bucketSpec.bucketColumnNames, - bucketSpec.sortColumnNames) - case None => - appendBucketInfo( - metadata.numBuckets, - metadata.bucketColumnNames, - metadata.sortColumnNames) + if (DDLUtils.isDatasourceTable(metadata)) { + appendBucketInfo(DDLUtils.getBucketSpecFromTableProperties(metadata)) + } else { + appendBucketInfo(metadata.bucketSpec) } } @@ -808,7 +804,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n") } - if (metadata.bucketColumnNames.nonEmpty) { + if (metadata.bucketSpec.isDefined) { throw new UnsupportedOperationException( "Creating Hive table with bucket spec is not supported yet.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala new file mode 100644 index 0000000000..377b818096 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +private[sql] object BucketingUtils { + // The file name of bucketed data should have 3 parts: + // 1. some other information in the head of file name + // 2. bucket id part, some numbers, starts with "_" + // * The other-information part may use `-` as separator and may have numbers at the end, + // e.g. a normal parquet file without bucketing may have name: + // part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet, and we will mistakenly + // treat `431234567891` as bucket id. So here we pick `_` as separator. + // 3. optional file extension part, in the tail of file name, starts with `.` + // An example of bucketed parquet file name with bucket id 3: + // part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet + private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r + + def getBucketId(fileName: String): Option[Int] = fileName match { + case bucketedFileName(bucketId) => Some(bucketId.toInt) + case other => None + } + + def bucketIdToString(id: Int): String = f"_$id%05d" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index f572b93991..79024fda2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider import org.apache.spark.sql.execution.datasources.json.JsonFileFormat diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 04f166f8ff..32aa4713eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 1426dcf469..b49525c8ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.datasources import java.io.IOException -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.spark._ import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.InternalRow diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 9a0b46c1a4..c801436b0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -28,6 +28,7 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.InternalRow diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala deleted file mode 100644 index 961d035b76..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import org.apache.spark.sql.AnalysisException - -/** - * A container for bucketing information. - * Bucketing is a technology for decomposing data sets into more manageable parts, and the number - * of buckets is fixed so it does not fluctuate with data. - * - * @param numBuckets number of buckets. - * @param bucketColumnNames the names of the columns that used to generate the bucket id. - * @param sortColumnNames the names of the columns that used to sort data in each bucket. - */ -private[sql] case class BucketSpec( - numBuckets: Int, - bucketColumnNames: Seq[String], - sortColumnNames: Seq[String]) { - if (numBuckets <= 0) { - throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.") - } -} - -private[sql] object BucketingUtils { - // The file name of bucketed data should have 3 parts: - // 1. some other information in the head of file name - // 2. bucket id part, some numbers, starts with "_" - // * The other-information part may use `-` as separator and may have numbers at the end, - // e.g. a normal parquet file without bucketing may have name: - // part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet, and we will mistakenly - // treat `431234567891` as bucket id. So here we pick `_` as separator. - // 3. optional file extension part, in the tail of file name, starts with `.` - // An example of bucketed parquet file name with bucket id 3: - // part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet - private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r - - def getBucketId(fileName: String): Option[Int] = fileName match { - case bucketedFileName(bucketId) => Some(bucketId.toInt) - case other => None - } - - def bucketIdToString(id: Int): String = f"_$id%05d" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 31a2075d2f..18369b51b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index d238da242f..5ce8350de2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources import scala.collection.mutable -import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ @@ -30,6 +29,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.FileRelation diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 1ae9b5524c..05dfb8cb22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -153,7 +153,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = { val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier) val partitionColumnNames = tableMetadata.partitionColumnNames.toSet - val bucketColumnNames = tableMetadata.bucketColumnNames.toSet + val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet val columns = tableMetadata.schema.map { c => new Column( name = c.name, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index b170a3a77e..999afc9751 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -20,15 +20,15 @@ package org.apache.spark.sql.execution.command import scala.reflect.{classTag, ClassTag} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, FunctionResource} import org.apache.spark.sql.catalyst.catalog.FunctionResourceType import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.execution.SparkSqlParser -import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing} +import org.apache.spark.sql.execution.datasources.CreateTableUsing import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} -import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructType} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} // TODO: merge this with DDLSuite (SPARK-14441) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 34c980e321..a354594a6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -26,13 +26,12 @@ import org.apache.spark.internal.config._ import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat} import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ -import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructType} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 8d8a18fa93..ddcc24a7f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper} import org.apache.spark.sql.catalyst.util import org.apache.spark.sql.execution.DataSourceScanExec diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 07aeaeb695..8aa81854b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -90,11 +90,12 @@ class CatalogSuite .getOrElse { spark.catalog.listColumns(tableName) } assume(tableMetadata.schema.nonEmpty, "bad test") assume(tableMetadata.partitionColumnNames.nonEmpty, "bad test") - assume(tableMetadata.bucketColumnNames.nonEmpty, "bad test") + assume(tableMetadata.bucketSpec.isDefined, "bad test") assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet) + val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet columns.collect().foreach { col => assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name)) - assert(col.isBucket == tableMetadata.bucketColumnNames.contains(col.name)) + assert(col.isBucket == bucketColumnNames.contains(col.name)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 5ab585faa4..49153f7736 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -24,9 +24,9 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 9f5782f045..2392cc0bdd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -365,9 +365,9 @@ private[hive] class HiveClientImpl( }, schema = schema, partitionColumnNames = partCols.map(_.name), - sortColumnNames = Seq(), // TODO: populate this - bucketColumnNames = h.getBucketCols.asScala, - numBuckets = h.getNumBuckets, + // We can not populate bucketing information for Hive tables as Spark SQL has a different + // implementation of hash function from Hive. + bucketSpec = None, owner = h.getOwner, createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, @@ -764,10 +764,7 @@ private[hive] class HiveClientImpl( hiveTable.setFields(schema.asJava) } hiveTable.setPartCols(partCols.asJava) - // TODO: set sort columns here too - hiveTable.setBucketCols(table.bucketColumnNames.asJava) hiveTable.setOwner(conf.getUser) - hiveTable.setNumBuckets(table.numBuckets) hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index a708434f5e..5450fba753 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -293,9 +293,7 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.tableType == CatalogTableType.MANAGED) assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string"))) assert(desc.partitionColumnNames.isEmpty) - assert(desc.sortColumnNames.isEmpty) - assert(desc.bucketColumnNames.isEmpty) - assert(desc.numBuckets == -1) + assert(desc.bucketSpec.isEmpty) assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.storage.locationUri.isEmpty) @@ -453,9 +451,7 @@ class HiveDDLCommandSuite extends PlanTest { CatalogColumn("name", "string"), CatalogColumn("month", "int"))) assert(desc.partitionColumnNames == Seq("month")) - assert(desc.sortColumnNames.isEmpty) - assert(desc.bucketColumnNames.isEmpty) - assert(desc.numBuckets == -1) + assert(desc.bucketSpec.isEmpty) assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.storage.locationUri == Some("/path/to/mercury")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index fc01ff3f5a..e461490310 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.sources import java.io.File import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.DataSourceScanExec -import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.functions._ -- cgit v1.2.3