diff options
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 210 |
1 files changed, 206 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index a354594a6d..7bd1b0bcdb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach import org.apache.spark.internal.config._ -import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat} @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { @@ -252,6 +252,208 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + private def checkSchemaInCreatedDataSourceTable( + path: File, + userSpecifiedSchema: Option[String], + userSpecifiedPartitionCols: Option[String], + expectedSchema: StructType, + expectedPartitionCols: Seq[String]): Unit = { + var tableSchema = StructType(Nil) + var partCols = Seq.empty[String] + + val tabName = "tab1" + withTable(tabName) { + val partitionClause = + userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("") + val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("") + sql( + s""" + |CREATE TABLE $tabName $schemaClause + |USING parquet + |OPTIONS ( + | path '$path' + |) + |$partitionClause + """.stripMargin) + val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) + + tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) + } + assert(tableSchema == expectedSchema) + assert(partCols == expectedPartitionCols) + } + + test("Create partitioned data source table without user specified schema") { + import testImplicits._ + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + + // Case 1: with partitioning columns but no schema: Option("inexistentColumns") + // Case 2: without schema and partitioning columns: None + Seq(Option("inexistentColumns"), None).foreach { partitionCols => + withTempPath { pathToPartitionedTable => + df.write.format("parquet").partitionBy("num") + .save(pathToPartitionedTable.getCanonicalPath) + checkSchemaInCreatedDataSourceTable( + pathToPartitionedTable, + userSpecifiedSchema = None, + userSpecifiedPartitionCols = partitionCols, + expectedSchema = new StructType().add("str", StringType).add("num", IntegerType), + expectedPartitionCols = Seq("num")) + } + } + } + + test("Create partitioned data source table with user specified schema") { + import testImplicits._ + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + + // Case 1: with partitioning columns but no schema: Option("num") + // Case 2: without schema and partitioning columns: None + Seq(Option("num"), None).foreach { partitionCols => + withTempPath { pathToPartitionedTable => + df.write.format("parquet").partitionBy("num") + .save(pathToPartitionedTable.getCanonicalPath) + checkSchemaInCreatedDataSourceTable( + pathToPartitionedTable, + userSpecifiedSchema = Option("num int, str string"), + userSpecifiedPartitionCols = partitionCols, + expectedSchema = new StructType().add("num", IntegerType).add("str", StringType), + expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String])) + } + } + } + + test("Create non-partitioned data source table without user specified schema") { + import testImplicits._ + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + + // Case 1: with partitioning columns but no schema: Option("inexistentColumns") + // Case 2: without schema and partitioning columns: None + Seq(Option("inexistentColumns"), None).foreach { partitionCols => + withTempPath { pathToNonPartitionedTable => + df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) + checkSchemaInCreatedDataSourceTable( + pathToNonPartitionedTable, + userSpecifiedSchema = None, + userSpecifiedPartitionCols = partitionCols, + expectedSchema = new StructType().add("num", IntegerType).add("str", StringType), + expectedPartitionCols = Seq.empty[String]) + } + } + } + + test("Create non-partitioned data source table with user specified schema") { + import testImplicits._ + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + + // Case 1: with partitioning columns but no schema: Option("inexistentColumns") + // Case 2: without schema and partitioning columns: None + Seq(Option("num"), None).foreach { partitionCols => + withTempPath { pathToNonPartitionedTable => + df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) + checkSchemaInCreatedDataSourceTable( + pathToNonPartitionedTable, + userSpecifiedSchema = Option("num int, str string"), + userSpecifiedPartitionCols = partitionCols, + expectedSchema = new StructType().add("num", IntegerType).add("str", StringType), + expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String])) + } + } + } + + test("Describe Table with Corrupted Schema") { + import testImplicits._ + + val tabName = "tab1" + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2") + df.write.format("json").save(path) + + withTable(tabName) { + sql( + s""" + |CREATE TABLE $tabName + |USING json + |OPTIONS ( + | path '$path' + |) + """.stripMargin) + + val catalog = spark.sessionState.catalog + val table = catalog.getTableMetadata(TableIdentifier(tabName)) + val newProperties = table.properties.filterKeys(key => + key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS) + val newTable = table.copy(properties = newProperties) + catalog.alterTable(newTable) + + val e = intercept[AnalysisException] { + sql(s"DESC $tabName") + }.getMessage + assert(e.contains(s"Could not read schema from the metastore because it is corrupted")) + } + } + } + + test("Refresh table after changing the data source table partitioning") { + import testImplicits._ + + val tabName = "tab1" + val catalog = spark.sessionState.catalog + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString, i, i)) + .toDF("col1", "col2", "col3", "col4") + df.write.format("json").partitionBy("col1", "col3").save(path) + val schema = new StructType() + .add("col2", StringType).add("col4", LongType) + .add("col1", IntegerType).add("col3", IntegerType) + val partitionCols = Seq("col1", "col3") + + withTable(tabName) { + spark.sql( + s""" + |CREATE TABLE $tabName + |USING json + |OPTIONS ( + | path '$path' + |) + """.stripMargin) + val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + assert(tableSchema == schema) + val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) + assert(partCols == partitionCols) + + // Change the schema + val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)) + .toDF("newCol1", "newCol2") + newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path) + + // No change on the schema + val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchemaBeforeRefresh = + DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh) + assert(tableSchemaBeforeRefresh == schema) + val partColsBeforeRefresh = + DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh) + assert(partColsBeforeRefresh == partitionCols) + + // Refresh does not affect the schema + spark.catalog.refreshTable(tabName) + + val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchemaAfterRefresh = + DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh) + assert(tableSchemaAfterRefresh == schema) + val partColsAfterRefresh = + DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh) + assert(partColsAfterRefresh == partitionCols) + } + } + } + test("desc table for parquet data source table using in-memory catalog") { assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") val tabName = "tab1" @@ -413,7 +615,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible assert(table.properties(DATASOURCE_PROVIDER) == "parquet") assert(DDLUtils.getSchemaFromTableProperties(table) == - Some(new StructType().add("a", IntegerType).add("b", IntegerType))) + new StructType().add("a", IntegerType).add("b", IntegerType)) assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == Seq("a")) } @@ -429,7 +631,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible assert(table.properties(DATASOURCE_PROVIDER) == "parquet") assert(DDLUtils.getSchemaFromTableProperties(table) == - Some(new StructType().add("a", IntegerType).add("b", IntegerType))) + new StructType().add("a", IntegerType).add("b", IntegerType)) assert(DDLUtils.getBucketSpecFromTableProperties(table) == Some(BucketSpec(5, Seq("a"), Seq("b")))) } |