path: root/sql/core/src
author     GraceH <93113783@qq.com>  2016-08-13 11:39:58 +0100
committer  Sean Owen <sowen@cloudera.com>  2016-08-13 11:39:58 +0100
commit     8c8acdec9365136cba13060ce36c22b28e29b59b (patch)
tree       77d36550321fb0872d1c9a5ad4e4f8328a56cea4 /sql/core/src
parent     7f7133bdccecaccd6dfb52f13c18c1e320d65f86 (diff)
download   spark-8c8acdec9365136cba13060ce36c22b28e29b59b.tar.gz
           spark-8c8acdec9365136cba13060ce36c22b28e29b59b.tar.bz2
           spark-8c8acdec9365136cba13060ce36c22b28e29b59b.zip
[SPARK-16968] Add additional options in jdbc when creating a new table
## What changes were proposed in this pull request?

This PR lets the user pass additional options that are appended to the DDL when the JDBC writer creates a new table. The options can be table_options or partition_options, e.g. "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8". Usage:

```
df.write.option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8").jdbc(...)
```

## How was this patch tested?

A new test case, `createTableOptions`, is added to JDBCWriteSuite.

Author: GraceH <93113783@qq.com>

Closes #14559 from GraceH/jdbc_options.
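A slightly fuller, self-contained version of the usage above. This is only a sketch: it assumes a SparkSession named `spark`, a reachable MySQL database at the placeholder URL, the MySQL JDBC driver on the classpath, and placeholder credentials; the column type in the generated DDL depends on the dialect.

```
import java.util.Properties

val props = new Properties()
props.put("user", "root")        // placeholder credentials
props.put("password", "secret")

val df = spark.range(0, 10).selectExpr("CAST(id AS STRING) AS name")

// The option value is appended verbatim to the generated CREATE TABLE statement.
df.write
  .option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8")
  .jdbc("jdbc:mysql://localhost:3306/test", "people", props)
```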
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 12
3 files changed, 51 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6dbed26b0d..44a9f312bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.StructType
/**
@@ -415,39 +415,49 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotPartitioned("jdbc")
assertNotBucketed("jdbc")
+ // Add the required options (URL and dbtable) to the user-specified options
+ val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table)
+ val jdbcOptions = new JDBCOptions(params)
+ val jdbcUrl = jdbcOptions.url
+ val jdbcTable = jdbcOptions.table
+
val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
}
// connectionProperties should override settings in extraOptions
props.putAll(connectionProperties)
- val conn = JdbcUtils.createConnectionFactory(url, props)()
+ val conn = JdbcUtils.createConnectionFactory(jdbcUrl, props)()
try {
- var tableExists = JdbcUtils.tableExists(conn, url, table)
+ var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable)
if (mode == SaveMode.Ignore && tableExists) {
return
}
if (mode == SaveMode.ErrorIfExists && tableExists) {
- sys.error(s"Table $table already exists.")
+ sys.error(s"Table $jdbcTable already exists.")
}
if (mode == SaveMode.Overwrite && tableExists) {
- if (extraOptions.getOrElse("truncate", "false").toBoolean &&
- JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
- JdbcUtils.truncateTable(conn, table)
+ if (jdbcOptions.isTruncate &&
+ JdbcUtils.isCascadingTruncateTable(jdbcUrl) == Some(false)) {
+ JdbcUtils.truncateTable(conn, jdbcTable)
} else {
- JdbcUtils.dropTable(conn, table)
+ JdbcUtils.dropTable(conn, jdbcTable)
tableExists = false
}
}
// Create the table if the table didn't exist.
if (!tableExists) {
- val schema = JdbcUtils.schemaString(df, url)
- val sql = s"CREATE TABLE $table ($schema)"
+ val schema = JdbcUtils.schemaString(df, jdbcUrl)
+ // Allow certain options to be appended when creating a new table; these can be
+ // table_options or partition_options.
+ // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+ val createtblOptions = jdbcOptions.createTableOptions
+ val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions"
val statement = conn.createStatement
try {
statement.executeUpdate(sql)
@@ -459,7 +469,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
conn.close()
}
- JdbcUtils.saveTable(df, url, table, props)
+ JdbcUtils.saveTable(df, jdbcUrl, jdbcTable, props)
}
/**
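The new code path simply concatenates the user-supplied option string onto the generated DDL. A minimal sketch of the resulting statement, with placeholder values standing in for `JdbcUtils.schemaString`, the "dbtable" option, and the user-supplied "createTableOptions" value:

```
val jdbcTable = "t"
val schemaString = "name TEXT"   // what JdbcUtils.schemaString might return for a string column
val createtblOptions = "ENGINE=InnoDB DEFAULT CHARSET=utf8"  // "" when the option is not set

val sql = s"CREATE TABLE $jdbcTable ($schemaString) $createtblOptions"
println(sql)  // CREATE TABLE t (name TEXT) ENGINE=InnoDB DEFAULT CHARSET=utf8
```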
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 6c6ec89746..1db090eaf9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -20,14 +20,21 @@ package org.apache.spark.sql.execution.datasources.jdbc
/**
* Options for the JDBC data source.
*/
-private[jdbc] class JDBCOptions(
+class JDBCOptions(
@transient private val parameters: Map[String, String])
extends Serializable {
+ // ------------------------------------------------------------
+ // Required parameters
+ // ------------------------------------------------------------
// a JDBC URL
val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
// name of table
val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+
+ // ------------------------------------------------------------
+ // Optional parameter list
+ // ------------------------------------------------------------
// the column used to partition
val partitionColumn = parameters.getOrElse("partitionColumn", null)
// the lower bound of partition column
@@ -36,4 +43,14 @@ private[jdbc] class JDBCOptions(
val upperBound = parameters.getOrElse("upperBound", null)
// the number of partitions
val numPartitions = parameters.getOrElse("numPartitions", null)
+
+ // ------------------------------------------------------------
+ // The options for DataFrameWriter
+ // ------------------------------------------------------------
+ // whether to truncate the table in the JDBC database
+ val isTruncate = parameters.getOrElse("truncate", "false").toBoolean
+ // the create table options, which can be table_options or partition_options.
+ // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+ // TODO: reuse the existing partition parameters for these partition-specific options
+ val createTableOptions = parameters.getOrElse("createTableOptions", "")
}
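With JDBCOptions now public, the writer resolves the new options through this class. A minimal sketch of how it could be constructed; the parameter values (H2 URL, table name) are illustrative only.

```
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// Only "url" and "dbtable" are required; the rest fall back to defaults.
val opts = new JDBCOptions(Map(
  "url" -> "jdbc:h2:mem:testdb0",
  "dbtable" -> "TEST.PEOPLE",
  "truncate" -> "true",
  "createTableOptions" -> "ENGINE=InnoDB"))

assert(opts.isTruncate)
assert(opts.createTableOptions == "ENGINE=InnoDB")
// Omitting "url" or "dbtable" triggers sys.error during construction.
```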
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index d99b3cf975..ff3309874f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -174,6 +174,18 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
JdbcDialects.unregisterDialect(testH2Dialect)
}
+ test("createTableOptions") {
+ JdbcDialects.registerDialect(testH2Dialect)
+ val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+ val m = intercept[org.h2.jdbc.JdbcSQLException] {
+ df.write.option("createTableOptions", "ENGINE tableEngineName")
+ .jdbc(url1, "TEST.CREATETBLOPTS", properties)
+ }.getMessage
+ assert(m.contains("Class \"TABLEENGINENAME\" not found"))
+ JdbcDialects.unregisterDialect(testH2Dialect)
+ }
+
test("Incompatible INSERT to append") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)