From 94f14b52a6a99047c0e30015d435bddb7f2b95fe Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 22 Jul 2016 13:27:17 +0800
Subject: [SPARK-16556][SPARK-16559][SQL] Fix Two Bugs in Bucket Specification

### What changes were proposed in this pull request?

**Issue 1: The bucket specification is silently ignored when creating a table using schema inference**

When creating a data source table without an explicit schema or a SELECT clause, we silently ignore the bucket specification (CLUSTERED BY ... SORTED BY ...) in [the code](https://github.com/apache/spark/blob/ce3b98bae28af72299722f56e4e4ef831f471ec0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L339-L354). For example,

```SQL
CREATE TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path '${tempDir.getCanonicalPath}'
)
CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
```

This PR catches this case and issues an error message instead.

**Issue 2: A run-time `java.lang.ArithmeticException` is thrown when the number of buckets is set to zero**

For example,

```SQL
CREATE TABLE t USING PARQUET
OPTIONS (PATH '${path.toString}')
CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS
AS SELECT 1 AS a, 2 AS b
```

The exception is:

```
ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 1.0 (TID 2)
java.lang.ArithmeticException: / by zero
```

This PR catches the misuse and issues an appropriate error message.

### How was this patch tested?

Added test cases in DDLSuite and CreateTableAsSelectSuite.

Author: gatorsmile

Closes #14210 from gatorsmile/createTableWithoutSchema.
---
 .../spark/sql/execution/SparkSqlParser.scala       |  5 +++++
 .../spark/sql/execution/datasources/bucket.scala   |  8 +++++++-
 .../spark/sql/execution/command/DDLSuite.scala     | 24 ++++++++++++++++++++++
 .../sql/sources/CreateTableAsSelectSuite.scala     | 21 +++++++++++++++++--
 4 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fa4ccf42b5..1316d90fa4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -344,6 +344,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         table, provider, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
+      if (struct.isEmpty && bucketSpec.nonEmpty) {
+        throw new ParseException(
+          "Expected explicit specification of table schema when using CLUSTERED BY clause.", ctx)
+      }
+
       CreateTableUsing(
         table,
         struct,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
index 6008d73717..961d035b76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.spark.sql.AnalysisException
+
 /**
  * A container for bucketing information.
  * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
@@ -29,7 +31,11 @@ package org.apache.spark.sql.execution.datasources
 private[sql] case class BucketSpec(
     numBuckets: Int,
     bucketColumnNames: Seq[String],
-    sortColumnNames: Seq[String])
+    sortColumnNames: Seq[String]) {
+  if (numBuckets <= 0) {
+    throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
+  }
+}
 
 private[sql] object BucketingUtils {
   // The file name of bucketed data should have 3 parts:
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 169250d9bb..28f625b1cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
@@ -1264,6 +1265,29 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("create table using CLUSTERED BY without schema specification") {
+    import testImplicits._
+    withTempPath { tempDir =>
+      withTable("jsonTable") {
+        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
+
+        val e = intercept[ParseException] {
+          sql(
+            s"""
+               |CREATE TABLE jsonTable
+               |USING org.apache.spark.sql.json
+               |OPTIONS (
+               |  path '${tempDir.getCanonicalPath}'
+               |)
+               |CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
+             """.stripMargin)
+        }.getMessage
+        assert(e.contains(
+          "Expected explicit specification of table schema when using CLUSTERED BY clause"))
+      }
+    }
+  }
+
   test("create table with datasource properties (not allowed)") {
     assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
     assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 251a25665a..5ab585faa4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -204,7 +205,7 @@ class CreateTableAsSelectSuite
     }
   }
 
-  test("create table using as select - with bucket") {
+  test("create table using as select - with non-zero buckets") {
     val catalog = spark.sessionState.catalog
     withTable("t") {
       sql(
@@ -217,7 +218,23 @@ class CreateTableAsSelectSuite
       )
       val table = catalog.getTableMetadata(TableIdentifier("t"))
       assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
-        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+        Option(BucketSpec(5, Seq("a"), Seq("b"))))
+    }
+  }
+
+  test("create table using as select - with zero buckets") {
+    withTable("t") {
+      val e = intercept[AnalysisException] {
+        sql(
+          s"""
+             |CREATE TABLE t USING PARQUET
+             |OPTIONS (PATH '${path.toString}')
+             |CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+      }.getMessage
+      assert(e.contains("Expected positive number of buckets, but got `0`"))
     }
   }
 }
--
cgit v1.2.3
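
For reference, below is a minimal sketch (not part of the patch) of how the new checks surface in a spark-shell session after this change is applied. The table paths and column names are hypothetical placeholders; only the exception types and messages are taken from the diff above.

```scala
// Minimal sketch, assuming a Spark 2.0-era `spark: SparkSession` in spark-shell
// and a hypothetical JSON file at /tmp/json_data.

// Issue 1: bucketing without an explicit schema is now rejected when the
// statement is parsed, instead of being silently ignored.
spark.sql(
  """
    |CREATE TABLE jsonTable
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/tmp/json_data')
    |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
  """.stripMargin)
// => org.apache.spark.sql.catalyst.parser.ParseException:
//    Expected explicit specification of table schema when using CLUSTERED BY clause.

// Issue 2: zero buckets now fails immediately with an AnalysisException from
// BucketSpec instead of a run-time ArithmeticException (/ by zero) in the executor.
spark.sql(
  """
    |CREATE TABLE t USING PARQUET
    |OPTIONS (PATH '/tmp/parquet_out')
    |CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS
    |AS SELECT 1 AS a, 2 AS b
  """.stripMargin)
// => org.apache.spark.sql.AnalysisException:
//    Expected positive number of buckets, but got `0`.
```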