aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-07-25 22:05:48 +0800
committerCheng Lian <lian@databricks.com>2016-07-25 22:05:48 +0800
commit64529b186a1c33740067cc7639d630bc5b9ae6e8 (patch)
treeab923216bc0a6480b21df76b024e7d232033eb1e /sql/core/src
parentd27d362ebae0c4a5cc6c99f13ef20049214dd4f9 (diff)
downloadspark-64529b186a1c33740067cc7639d630bc5b9ae6e8.tar.gz
spark-64529b186a1c33740067cc7639d630bc5b9ae6e8.tar.bz2
spark-64529b186a1c33740067cc7639d630bc5b9ae6e8.zip
[SPARK-16691][SQL] move BucketSpec to catalyst module and use it in CatalogTable
## What changes were proposed in this pull request? It's weird that we have `BucketSpec` to abstract bucket info, but don't use it in `CatalogTable`. This PR moves `BucketSpec` into catalyst module. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #14331 from cloud-fan/check.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala30
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala)20
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala1
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala5
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala2
16 files changed, 33 insertions, 52 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 753b64b983..44189881dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,8 +23,9 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 2a62b864a1..03f81c46a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -21,12 +21,11 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index a62853b05f..8f3adadbf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -498,23 +498,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
- def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], sortColumns: Seq[String]) = {
- append(buffer, "Num Buckets:", numBuckets.toString, "")
- append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", "]"), "")
- append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "")
+ def appendBucketInfo(bucketSpec: Option[BucketSpec]) = bucketSpec match {
+ case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+ append(buffer, "Num Buckets:", numBuckets.toString, "")
+ append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "")
+ append(buffer, "Sort Columns:", sortColumnNames.mkString("[", ", ", "]"), "")
+
+ case _ =>
}
- DDLUtils.getBucketSpecFromTableProperties(metadata) match {
- case Some(bucketSpec) =>
- appendBucketInfo(
- bucketSpec.numBuckets,
- bucketSpec.bucketColumnNames,
- bucketSpec.sortColumnNames)
- case None =>
- appendBucketInfo(
- metadata.numBuckets,
- metadata.bucketColumnNames,
- metadata.sortColumnNames)
+ if (DDLUtils.isDatasourceTable(metadata)) {
+ appendBucketInfo(DDLUtils.getBucketSpecFromTableProperties(metadata))
+ } else {
+ appendBucketInfo(metadata.bucketSpec)
}
}
@@ -808,7 +804,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
}
- if (metadata.bucketColumnNames.nonEmpty) {
+ if (metadata.bucketSpec.isDefined) {
throw new UnsupportedOperationException(
"Creating Hive table with bucket spec is not supported yet.")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
index 961d035b76..377b818096 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
@@ -17,26 +17,6 @@
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.AnalysisException
-
-/**
- * A container for bucketing information.
- * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
- * of buckets is fixed so it does not fluctuate with data.
- *
- * @param numBuckets number of buckets.
- * @param bucketColumnNames the names of the columns that used to generate the bucket id.
- * @param sortColumnNames the names of the columns that used to sort data in each bucket.
- */
-private[sql] case class BucketSpec(
- numBuckets: Int,
- bucketColumnNames: Seq[String],
- sortColumnNames: Seq[String]) {
- if (numBuckets <= 0) {
- throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
- }
-}
-
private[sql] object BucketingUtils {
// The file name of bucketed data should have 3 parts:
// 1. some other information in the head of file name
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f572b93991..79024fda2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 04f166f8ff..32aa4713eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 1426dcf469..b49525c8ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.datasources
import java.io.IOException
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark._
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 9a0b46c1a4..c801436b0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -28,6 +28,7 @@ import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31a2075d2f..18369b51b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index d238da242f..5ce8350de2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources
import scala.collection.mutable
-import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -30,6 +29,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 1ae9b5524c..05dfb8cb22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -153,7 +153,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
- val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
+ val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
val columns = tableMetadata.schema.map { c =>
new Column(
name = c.name,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index b170a3a77e..999afc9751 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -20,15 +20,15 @@ package org.apache.spark.sql.execution.command
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
+import org.apache.spark.sql.execution.datasources.CreateTableUsing
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
// TODO: merge this with DDLSuite (SPARK-14441)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 34c980e321..a354594a6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -26,13 +26,12 @@ import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 8d8a18fa93..ddcc24a7f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.execution.DataSourceScanExec
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 07aeaeb695..8aa81854b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -90,11 +90,12 @@ class CatalogSuite
.getOrElse { spark.catalog.listColumns(tableName) }
assume(tableMetadata.schema.nonEmpty, "bad test")
assume(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
- assume(tableMetadata.bucketColumnNames.nonEmpty, "bad test")
+ assume(tableMetadata.bucketSpec.isDefined, "bad test")
assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
+ val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
columns.collect().foreach { col =>
assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
- assert(col.isBucket == tableMetadata.bucketColumnNames.contains(col.name))
+ assert(col.isBucket == bucketColumnNames.contains(col.name))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 5ab585faa4..49153f7736 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -24,9 +24,9 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils