about summary refs log tree commit diff
path: root/sql/core/src/main/scala/org
diff options
context:
space:
mode:
author	hyukjinkwon <gurwls223@gmail.com>	2017-02-01 09:43:35 -0800
committer	gatorsmile <gatorsmile@gmail.com>	2017-02-01 09:43:35 -0800
commit	5ed397baa758c29c54a853d3f8fee0ad44e97c14 (patch)
tree	eb0fdc093d1d41cd33c74bb7cfe7e6516ff3d911 /sql/core/src/main/scala/org
parent	04ee8cf633e17b6bf95225a8dd77bf2e06980eb3 (diff)
download	spark-5ed397baa758c29c54a853d3f8fee0ad44e97c14.tar.gz
spark-5ed397baa758c29c54a853d3f8fee0ad44e97c14.tar.bz2
spark-5ed397baa758c29c54a853d3f8fee0ad44e97c14.zip
[SPARK-19296][SQL] Deduplicate url and table in JdbcUtils
## What changes were proposed in this pull request? This PR deduplicates arguments, `url` and `table` in `JdbcUtils` with `JDBCOptions`. It avoids using duplicated arguments, for example, as below: from ```scala val jdbcOptions = new JDBCOptions(url, table, map) JdbcUtils.saveTable(ds, url, table, jdbcOptions) ``` to ```scala val jdbcOptions = new JDBCOptions(url, table, map) JdbcUtils.saveTable(ds, jdbcOptions) ``` ## How was this patch tested? Running unit tests in `JdbcSuite`/`JDBCWriteSuite` Building with Scala 2.10 as below: ``` ./dev/change-scala-version.sh 2.10 ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #16753 from HyukjinKwon/SPARK-19296.
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala	34
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala	26
2 files changed, 28 insertions, 32 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index e39d936f39..88f6cb0021 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -52,38 +52,34 @@ class JdbcRelationProvider extends CreatableRelationProvider
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
- val jdbcOptions = new JDBCOptions(parameters)
- val url = jdbcOptions.url
- val table = jdbcOptions.table
- val createTableOptions = jdbcOptions.createTableOptions
- val isTruncate = jdbcOptions.isTruncate
+ val options = new JDBCOptions(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
- val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
+ val conn = JdbcUtils.createConnectionFactory(options)()
try {
- val tableExists = JdbcUtils.tableExists(conn, url, table)
+ val tableExists = JdbcUtils.tableExists(conn, options)
if (tableExists) {
mode match {
case SaveMode.Overwrite =>
- if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
+ if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
// In this case, we should truncate table and then load.
- truncateTable(conn, table)
- val tableSchema = JdbcUtils.getSchemaOption(conn, url, table)
- saveTable(df, url, table, tableSchema, isCaseSensitive, jdbcOptions)
+ truncateTable(conn, options.table)
+ val tableSchema = JdbcUtils.getSchemaOption(conn, options)
+ saveTable(df, tableSchema, isCaseSensitive, options)
} else {
// Otherwise, do not truncate the table, instead drop and recreate it
- dropTable(conn, table)
- createTable(df.schema, url, table, createTableOptions, conn)
- saveTable(df, url, table, Some(df.schema), isCaseSensitive, jdbcOptions)
+ dropTable(conn, options.table)
+ createTable(conn, df.schema, options)
+ saveTable(df, Some(df.schema), isCaseSensitive, options)
}
case SaveMode.Append =>
- val tableSchema = JdbcUtils.getSchemaOption(conn, url, table)
- saveTable(df, url, table, tableSchema, isCaseSensitive, jdbcOptions)
+ val tableSchema = JdbcUtils.getSchemaOption(conn, options)
+ saveTable(df, tableSchema, isCaseSensitive, options)
case SaveMode.ErrorIfExists =>
throw new AnalysisException(
- s"Table or view '$table' already exists. SaveMode: ErrorIfExists.")
+ s"Table or view '${options.table}' already exists. SaveMode: ErrorIfExists.")
case SaveMode.Ignore =>
// With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
@@ -91,8 +87,8 @@ class JdbcRelationProvider extends CreatableRelationProvider
// Therefore, it is okay to do nothing here and then just return the relation below.
}
} else {
- createTable(df.schema, url, table, createTableOptions, conn)
- saveTable(df, url, table, Some(df.schema), isCaseSensitive, jdbcOptions)
+ createTable(conn, df.schema, options)
+ saveTable(df, Some(df.schema), isCaseSensitive, options)
}
} finally {
conn.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 0590aec77c..d89f600874 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -63,14 +63,14 @@ object JdbcUtils extends Logging {
/**
* Returns true if the table already exists in the JDBC database.
*/
- def tableExists(conn: Connection, url: String, table: String): Boolean = {
- val dialect = JdbcDialects.get(url)
+ def tableExists(conn: Connection, options: JDBCOptions): Boolean = {
+ val dialect = JdbcDialects.get(options.url)
// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
// SQL database systems using JDBC meta data calls, considering "table" could also include
// the database name. Query used to find table exists can be overridden by the dialects.
Try {
- val statement = conn.prepareStatement(dialect.getTableExistsQuery(table))
+ val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table))
try {
statement.executeQuery()
} finally {
@@ -235,11 +235,11 @@ object JdbcUtils extends Logging {
/**
* Returns the schema if the table already exists in the JDBC database.
*/
- def getSchemaOption(conn: Connection, url: String, table: String): Option[StructType] = {
- val dialect = JdbcDialects.get(url)
+ def getSchemaOption(conn: Connection, options: JDBCOptions): Option[StructType] = {
+ val dialect = JdbcDialects.get(options.url)
try {
- val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+ val statement = conn.prepareStatement(dialect.getSchemaQuery(options.table))
try {
Some(getSchema(statement.executeQuery(), dialect))
} catch {
@@ -697,11 +697,11 @@ object JdbcUtils extends Logging {
*/
def saveTable(
df: DataFrame,
- url: String,
- table: String,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
options: JDBCOptions): Unit = {
+ val url = options.url
+ val table = options.table
val dialect = JdbcDialects.get(url)
val rddSchema = df.schema
val getConnection: () => Connection = createConnectionFactory(options)
@@ -725,12 +725,12 @@ object JdbcUtils extends Logging {
* Creates a table with a given schema.
*/
def createTable(
+ conn: Connection,
schema: StructType,
- url: String,
- table: String,
- createTableOptions: String,
- conn: Connection): Unit = {
- val strSchema = schemaString(schema, url)
+ options: JDBCOptions): Unit = {
+ val strSchema = schemaString(schema, options.url)
+ val table = options.table
+ val createTableOptions = options.createTableOptions
// Create the table if the table does not exist.
// To allow certain options to append when create a new table, which can be
// table_options or partition_options.