path: root/sql/core/src/test/scala
diff options
Diffstat (limited to 'sql/core/src/test/scala')
1 files changed, 206 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a354594a6d..7bd1b0bcdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.config._
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
@@ -252,6 +252,208 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
+ /**
+  * Creates the Parquet data source table `tab1` over `path` and checks the schema and
+  * partition columns that are read back from its table properties.
+  *
+  * @param path location passed to the table's `path` option
+  * @param userSpecifiedSchema column definitions placed after the table name; omitted if None
+  * @param userSpecifiedPartitionCols column list for PARTITIONED BY; clause omitted if None
+  * @param expectedSchema schema expected from `DDLUtils.getSchemaFromTableProperties`
+  * @param expectedPartitionCols columns expected from
+  *                              `DDLUtils.getPartitionColumnsFromTableProperties`
+  */
+ private def checkSchemaInCreatedDataSourceTable(
+     path: File,
+     userSpecifiedSchema: Option[String],
+     userSpecifiedPartitionCols: Option[String],
+     expectedSchema: StructType,
+     expectedPartitionCols: Seq[String]): Unit = {
+   // Assigned inside withTable; the assertions run after that block completes.
+   var tableSchema = StructType(Nil)
+   var partCols = Seq.empty[String]
+   val tabName = "tab1"
+   withTable(tabName) {
+     val partitionClause =
+       userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
+     val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
+     // The path must be wrapped in an OPTIONS (...) clause so the parentheses balance.
+     sql(
+       s"""
+          |CREATE TABLE $tabName $schemaClause
+          |USING parquet
+          |OPTIONS (
+          | path '$path'
+          |)
+          |$partitionClause
+        """.stripMargin)
+     val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
+     tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
+     partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+   }
+   assert(tableSchema == expectedSchema)
+   assert(partCols == expectedPartitionCols)
+ }
+ test("Create partitioned data source table without user specified schema") {
+   import testImplicits._
+   val rows = sparkContext.parallelize(1 to 10).map(n => (n, n.toString))
+   val df = rows.toDF("num", "str")
+   // The data is written partitioned by `num`; the expected schema lists `num` last.
+   val expectedSchema = new StructType().add("str", StringType).add("num", IntegerType)
+   // First entry: a PARTITIONED BY clause is given (naming "inexistentColumns") but no schema.
+   // Second entry: neither a schema nor a PARTITIONED BY clause is given.
+   val partitionClauses: Seq[Option[String]] = Seq(Some("inexistentColumns"), None)
+   for (partitionCols <- partitionClauses) {
+     withTempPath { dir =>
+       df.write.format("parquet").partitionBy("num").save(dir.getCanonicalPath)
+       // Both entries expect the same result: partition columns Seq("num").
+       checkSchemaInCreatedDataSourceTable(
+         dir,
+         userSpecifiedSchema = None,
+         userSpecifiedPartitionCols = partitionCols,
+         expectedSchema = expectedSchema,
+         expectedPartitionCols = Seq("num"))
+     }
+   }
+ }
+ test("Create partitioned data source table with user specified schema") {
+   import testImplicits._
+   val rows = sparkContext.parallelize(1 to 10).map(n => (n, n.toString))
+   val df = rows.toDF("num", "str")
+   val declaredSchema = "num int, str string"
+   val expectedSchema = new StructType().add("num", IntegerType).add("str", StringType)
+   // First entry: schema plus a PARTITIONED BY (num) clause.
+   // Second entry: schema only, no PARTITIONED BY clause.
+   val partitionClauses: Seq[Option[String]] = Seq(Some("num"), None)
+   for (partitionCols <- partitionClauses) {
+     withTempPath { dir =>
+       df.write.format("parquet").partitionBy("num").save(dir.getCanonicalPath)
+       // Expected partition columns are exactly those named in the clause, if any.
+       val expectedPartCols: Seq[String] = partitionCols.toList
+       checkSchemaInCreatedDataSourceTable(
+         dir,
+         userSpecifiedSchema = Some(declaredSchema),
+         userSpecifiedPartitionCols = partitionCols,
+         expectedSchema = expectedSchema,
+         expectedPartitionCols = expectedPartCols)
+     }
+   }
+ }
+ test("Create non-partitioned data source table without user specified schema") {
+   import testImplicits._
+   val rows = sparkContext.parallelize(1 to 10).map(n => (n, n.toString))
+   val df = rows.toDF("num", "str")
+   val expectedSchema = new StructType().add("num", IntegerType).add("str", StringType)
+   // First entry: a PARTITIONED BY clause is given (naming "inexistentColumns") but no schema.
+   // Second entry: neither a schema nor a PARTITIONED BY clause is given.
+   val partitionClauses: Seq[Option[String]] = Seq(Some("inexistentColumns"), None)
+   for (partitionCols <- partitionClauses) {
+     withTempPath { dir =>
+       // The data is written without partitioning, so no partition columns are expected.
+       df.write.format("parquet").save(dir.getCanonicalPath)
+       checkSchemaInCreatedDataSourceTable(
+         dir,
+         userSpecifiedSchema = None,
+         userSpecifiedPartitionCols = partitionCols,
+         expectedSchema = expectedSchema,
+         expectedPartitionCols = Seq.empty[String])
+     }
+   }
+ }
+ test("Create non-partitioned data source table with user specified schema") {
+   import testImplicits._
+   val rows = sparkContext.parallelize(1 to 10).map(n => (n, n.toString))
+   val df = rows.toDF("num", "str")
+   val declaredSchema = "num int, str string"
+   val expectedSchema = new StructType().add("num", IntegerType).add("str", StringType)
+   // First entry: schema plus a PARTITIONED BY (num) clause.
+   // Second entry: schema only, no PARTITIONED BY clause.
+   val partitionClauses: Seq[Option[String]] = Seq(Some("num"), None)
+   for (partitionCols <- partitionClauses) {
+     withTempPath { dir =>
+       df.write.format("parquet").save(dir.getCanonicalPath)
+       // Expected partition columns are exactly those named in the clause, if any.
+       val expectedPartCols: Seq[String] = partitionCols.toList
+       checkSchemaInCreatedDataSourceTable(
+         dir,
+         userSpecifiedSchema = Some(declaredSchema),
+         userSpecifiedPartitionCols = partitionCols,
+         expectedSchema = expectedSchema,
+         expectedPartitionCols = expectedPartCols)
+     }
+   }
+ }
+ // Removes the DATASOURCE_SCHEMA_NUMPARTS property from a JSON data source table and checks
+ // that DESC then fails with an AnalysisException reporting a corrupted schema.
+ test("Describe Table with Corrupted Schema") {
+   import testImplicits._
+   val tabName = "tab1"
+   withTempPath { dir =>
+     val path = dir.getCanonicalPath
+     val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2")
+     df.write.format("json").save(path)
+     withTable(tabName) {
+       // The path must be wrapped in an OPTIONS (...) clause so the parentheses balance.
+       sql(
+         s"""
+            |CREATE TABLE $tabName
+            |USING json
+            |OPTIONS (
+            | path '$path'
+            |)
+          """.stripMargin)
+       val catalog = spark.sessionState.catalog
+       val table = catalog.getTableMetadata(TableIdentifier(tabName))
+       // Drop only the numParts property; every other table property is kept.
+       val newProperties = table.properties.filterKeys(key =>
+         key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS)
+       val newTable = table.copy(properties = newProperties)
+       catalog.alterTable(newTable)
+       val message = intercept[AnalysisException] {
+         sql(s"DESC $tabName")
+       }.getMessage
+       assert(message.contains("Could not read schema from the metastore because it is corrupted"))
+     }
+   }
+ }
+ // Creates a JSON data source table over partitioned data, overwrites the files with a
+ // different layout, and checks that the schema and partition columns stored in the table
+ // properties stay the same both before and after refreshTable.
+ test("Refresh table after changing the data source table partitioning") {
+   import testImplicits._
+   val tabName = "tab1"
+   val catalog = spark.sessionState.catalog
+   withTempPath { dir =>
+     val path = dir.getCanonicalPath
+     val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString, i, i))
+       .toDF("col1", "col2", "col3", "col4")
+     df.write.format("json").partitionBy("col1", "col3").save(path)
+     val schema = new StructType()
+       .add("col2", StringType).add("col4", LongType)
+       .add("col1", IntegerType).add("col3", IntegerType)
+     val partitionCols = Seq("col1", "col3")
+     // Re-reads the table metadata from the catalog and asserts that the stored schema and
+     // partition columns still match the values captured at creation time.
+     def assertStoredMetadataMatchesOriginal(): Unit = {
+       val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
+       assert(DDLUtils.getSchemaFromTableProperties(tableMetadata) == schema)
+       assert(DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) == partitionCols)
+     }
+     withTable(tabName) {
+       // The path must be wrapped in an OPTIONS (...) clause so the parentheses balance.
+       spark.sql(
+         s"""
+            |CREATE TABLE $tabName
+            |USING json
+            |OPTIONS (
+            | path '$path'
+            |)
+          """.stripMargin)
+       assertStoredMetadataMatchesOriginal()
+       // Overwrite the files with different columns and a different partition column.
+       val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
+         .toDF("newCol1", "newCol2")
+       newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path)
+       // No change on the stored schema before refreshing.
+       assertStoredMetadataMatchesOriginal()
+       // Refresh does not affect the stored schema either.
+       spark.catalog.refreshTable(tabName)
+       assertStoredMetadataMatchesOriginal()
+     }
+   }
+ }
test("desc table for parquet data source table using in-memory catalog") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
val tabName = "tab1"
@@ -413,7 +615,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
assert(DDLUtils.getSchemaFromTableProperties(table) ==
- Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+ new StructType().add("a", IntegerType).add("b", IntegerType))
assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
@@ -429,7 +631,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
assert(DDLUtils.getSchemaFromTableProperties(table) ==
- Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+ new StructType().add("a", IntegerType).add("b", IntegerType))
assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
Some(BucketSpec(5, Seq("a"), Seq("b"))))