path: root/sql/core/src/test/scala
author     sureshthalamati <suresh.thalamati@gmail.com>    2017-03-23 17:39:33 -0700
committer  Xiao Li <gatorsmile@gmail.com>                  2017-03-23 17:39:33 -0700
commit     c7911807050227fcd13161ce090330d9d8daa533 (patch)
tree       d22689bed1b891c4e988f5334a47b92c06e4fe15 /sql/core/src/test/scala
parent     b7be05a203b3e2a307147ea0c6cb0dec03da82a2 (diff)
[SPARK-10849][SQL] Adds option to the JDBC data source write for user to specify database column type for the create table
## What changes were proposed in this pull request?

Currently the JDBC data source creates tables in the target database using the default type mapping and the JDBC dialect mechanism. If a user wants a different database data type for only some of the columns, there is no option available. In scenarios where the default mapping does not work, users are forced to create the table in the target database before writing, which is not acceptable from a usability point of view. This PR provides a user-defined type mapping for specific columns.

The solution is to let users specify the database column data types for the created table as a JDBC data source option (`createTableColumnTypes`) on write. The type information is specified in the same DDL format as a table schema (e.g. `name CHAR(64), comments VARCHAR(1024)`). Not every target database type can be specified; the data types must also be valid Spark SQL data types. For example, a user cannot specify the target database `CLOB` type. That will be supported in a follow-up PR.

Example:
```Scala
df.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc(url, "TEST.DBCOLTYPETEST", properties)
```

## How was this patch tested?

Added new test cases to JDBCWriteSuite.

Author: sureshthalamati <suresh.thalamati@gmail.com>

Closes #16209 from sureshthalamati/jdbc_custom_dbtype_option_json-spark-10849.
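As a complementary sketch of the partial-schema behaviour exercised by the new tests (the `jdbcUrl` and `connectionProperties` names below are placeholders, not part of this patch): columns omitted from `createTableColumnTypes` keep the JDBC dialect's default type mapping.

```Scala
// Sketch only, assuming a reachable JDBC endpoint; jdbcUrl and connectionProperties are placeholders.
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "testUser")
connectionProperties.put("password", "testPass")

val df = spark.createDataFrame(Seq((1, "dave", "Boston"))).toDF("id", "name", "city")

df.write
  .mode("overwrite")
  // "name" and "city" get the user-specified types; "id" is not listed,
  // so it keeps the dialect's default mapping (INTEGER on H2 in the new test).
  .option("createTableColumnTypes", "name VARCHAR(123), city CHAR(20)")
  .jdbc(jdbcUrl, "TEST.DBCOLTYPETEST", connectionProperties)
```

This mirrors the "partial schema" case (`expected2`) added to JDBCWriteSuite below.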
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala       |   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala  | 150
2 files changed, 148 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5463728ca0..4a02277631 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -869,7 +869,7 @@ class JDBCSuite extends SparkFunSuite
test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") {
val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
- val schema = JdbcUtils.schemaString(df.schema, "jdbc:mysql://localhost:3306/temp")
+ val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
assert(schema.contains("`order` TEXT"))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index ec7b19e666..bf1fd16070 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -17,15 +17,16 @@
 package org.apache.spark.sql.jdbc
-import java.sql.DriverManager
+import java.sql.{Date, DriverManager, Timestamp}
 import java.util.Properties
 import scala.collection.JavaConverters.propertiesAsScalaMapConverter
 import org.scalatest.BeforeAndAfter
-import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
-import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -362,4 +363,147 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
assert(sql("select * from people_view").count() == 2)
}
}
+
+  test("SPARK-10849: test schemaString - from createTableColumnTypes option values") {
+    def testCreateTableColDataTypes(types: Seq[String]): Unit = {
+      val colTypes = types.zipWithIndex.map { case (t, i) => (s"col$i", t) }
+      val schema = colTypes
+        .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
+      val createTableColTypes =
+        colTypes.map { case (col, dataType) => s"$col $dataType" }.mkString(", ")
+      val df = spark.createDataFrame(sparkContext.parallelize(Seq(Row.empty)), schema)
+
+      val expectedSchemaStr =
+        colTypes.map { case (col, dataType) => s""""$col" $dataType """ }.mkString(", ")
+
+      assert(JdbcUtils.schemaString(df, url1, Option(createTableColTypes)) == expectedSchemaStr)
+    }
+
+    testCreateTableColDataTypes(Seq("boolean"))
+    testCreateTableColDataTypes(Seq("tinyint", "smallint", "int", "bigint"))
+    testCreateTableColDataTypes(Seq("float", "double"))
+    testCreateTableColDataTypes(Seq("string", "char(10)", "varchar(20)"))
+    testCreateTableColDataTypes(Seq("decimal(10,0)", "decimal(10,5)"))
+    testCreateTableColDataTypes(Seq("date", "timestamp"))
+    testCreateTableColDataTypes(Seq("binary"))
+  }
+
+  test("SPARK-10849: create table using user specified column type and verify on target table") {
+    def testUserSpecifiedColTypes(
+        df: DataFrame,
+        createTableColTypes: String,
+        expectedTypes: Map[String, String]): Unit = {
+      df.write
+        .mode(SaveMode.Overwrite)
+        .option("createTableColumnTypes", createTableColTypes)
+        .jdbc(url1, "TEST.DBCOLTYPETEST", properties)
+
+      // verify the data types of the created table by reading the database catalog of H2
+      val query =
+        """
+          |(SELECT column_name, type_name, character_maximum_length
+          | FROM information_schema.columns WHERE table_name = 'DBCOLTYPETEST')
+        """.stripMargin
+      val rows = spark.read.jdbc(url1, query, properties).collect()
+
+      rows.foreach { row =>
+        val typeName = row.getString(1)
+        // For CHAR and VARCHAR, we also compare the max length
+        if (typeName.contains("CHAR")) {
+          val charMaxLength = row.getInt(2)
+          assert(expectedTypes(row.getString(0)) == s"$typeName($charMaxLength)")
+        } else {
+          assert(expectedTypes(row.getString(0)) == typeName)
+        }
+      }
+    }
+
+    val data = Seq[Row](Row(1, "dave", "Boston"))
+    val schema = StructType(
+      StructField("id", IntegerType) ::
+        StructField("first#name", StringType) ::
+        StructField("city", StringType) :: Nil)
+    val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+
+    // out-of-order
+    val expected1 = Map("id" -> "BIGINT", "first#name" -> "VARCHAR(123)", "city" -> "CHAR(20)")
+    testUserSpecifiedColTypes(df, "`first#name` VARCHAR(123), id BIGINT, city CHAR(20)", expected1)
+    // partial schema
+    val expected2 = Map("id" -> "INTEGER", "first#name" -> "VARCHAR(123)", "city" -> "CHAR(20)")
+    testUserSpecifiedColTypes(df, "`first#name` VARCHAR(123), city CHAR(20)", expected2)
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      // should still respect the original column names
+      val expected = Map("id" -> "INTEGER", "first#name" -> "VARCHAR(123)", "city" -> "CLOB")
+      testUserSpecifiedColTypes(df, "`FiRsT#NaMe` VARCHAR(123)", expected)
+    }
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val schema = StructType(
+        StructField("id", IntegerType) ::
+          StructField("First#Name", StringType) ::
+          StructField("city", StringType) :: Nil)
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+      val expected = Map("id" -> "INTEGER", "First#Name" -> "VARCHAR(123)", "city" -> "CLOB")
+      testUserSpecifiedColTypes(df, "`First#Name` VARCHAR(123)", expected)
+    }
+  }
+
+  test("SPARK-10849: jdbc CreateTableColumnTypes option with invalid data type") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    val msg = intercept[ParseException] {
+      df.write.mode(SaveMode.Overwrite)
+        .option("createTableColumnTypes", "name CLOB(2000)")
+        .jdbc(url1, "TEST.USERDBTYPETEST", properties)
+    }.getMessage()
+    assert(msg.contains("DataType clob(2000) is not supported."))
+  }
+
+  test("SPARK-10849: jdbc CreateTableColumnTypes option with invalid syntax") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    val msg = intercept[ParseException] {
+      df.write.mode(SaveMode.Overwrite)
+        .option("createTableColumnTypes", "`name char(20)") // incorrectly quoted column
+        .jdbc(url1, "TEST.USERDBTYPETEST", properties)
+    }.getMessage()
+    assert(msg.contains("no viable alternative at input"))
+  }
+
+  test("SPARK-10849: jdbc CreateTableColumnTypes duplicate columns") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+      val msg = intercept[AnalysisException] {
+        df.write.mode(SaveMode.Overwrite)
+          .option("createTableColumnTypes", "name CHAR(20), id int, NaMe VARCHAR(100)")
+          .jdbc(url1, "TEST.USERDBTYPETEST", properties)
+      }.getMessage()
+      assert(msg.contains(
+        "Found duplicate column(s) in createTableColumnTypes option value: name, NaMe"))
+    }
+  }
+
+  test("SPARK-10849: jdbc CreateTableColumnTypes invalid columns") {
+    // schema2 has the column "id" and "name"
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      val msg = intercept[AnalysisException] {
+        df.write.mode(SaveMode.Overwrite)
+          .option("createTableColumnTypes", "firstName CHAR(20), id int")
+          .jdbc(url1, "TEST.USERDBTYPETEST", properties)
+      }.getMessage()
+      assert(msg.contains("createTableColumnTypes option column firstName not found in " +
+        "schema struct<name:string,id:int>"))
+    }
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val msg = intercept[AnalysisException] {
+        df.write.mode(SaveMode.Overwrite)
+          .option("createTableColumnTypes", "id int, Name VARCHAR(100)")
+          .jdbc(url1, "TEST.USERDBTYPETEST", properties)
+      }.getMessage()
+      assert(msg.contains("createTableColumnTypes option column Name not found in " +
+        "schema struct<name:string,id:int>"))
+    }
+  }
}