diff options
2 files changed, 28 insertions, 32 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index e39d936f39..88f6cb0021 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -52,38 +52,34 @@ class JdbcRelationProvider extends CreatableRelationProvider mode: SaveMode, parameters: Map[String, String], df: DataFrame): BaseRelation = { - val jdbcOptions = new JDBCOptions(parameters) - val url = jdbcOptions.url - val table = jdbcOptions.table - val createTableOptions = jdbcOptions.createTableOptions - val isTruncate = jdbcOptions.isTruncate + val options = new JDBCOptions(parameters) val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis - val conn = JdbcUtils.createConnectionFactory(jdbcOptions)() + val conn = JdbcUtils.createConnectionFactory(options)() try { - val tableExists = JdbcUtils.tableExists(conn, url, table) + val tableExists = JdbcUtils.tableExists(conn, options) if (tableExists) { mode match { case SaveMode.Overwrite => - if (isTruncate && isCascadingTruncateTable(url) == Some(false)) { + if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) { // In this case, we should truncate table and then load. - truncateTable(conn, table) - val tableSchema = JdbcUtils.getSchemaOption(conn, url, table) - saveTable(df, url, table, tableSchema, isCaseSensitive, jdbcOptions) + truncateTable(conn, options.table) + val tableSchema = JdbcUtils.getSchemaOption(conn, options) + saveTable(df, tableSchema, isCaseSensitive, options) } else { // Otherwise, do not truncate the table, instead drop and recreate it - dropTable(conn, table) - createTable(df.schema, url, table, createTableOptions, conn) - saveTable(df, url, table, Some(df.schema), isCaseSensitive, jdbcOptions) + dropTable(conn, options.table) + createTable(conn, df.schema, options) + saveTable(df, Some(df.schema), isCaseSensitive, options) } case SaveMode.Append => - val tableSchema = JdbcUtils.getSchemaOption(conn, url, table) - saveTable(df, url, table, tableSchema, isCaseSensitive, jdbcOptions) + val tableSchema = JdbcUtils.getSchemaOption(conn, options) + saveTable(df, tableSchema, isCaseSensitive, options) case SaveMode.ErrorIfExists => throw new AnalysisException( - s"Table or view '$table' already exists. SaveMode: ErrorIfExists.") + s"Table or view '${options.table}' already exists. SaveMode: ErrorIfExists.") case SaveMode.Ignore => // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected @@ -91,8 +87,8 @@ class JdbcRelationProvider extends CreatableRelationProvider // Therefore, it is okay to do nothing here and then just return the relation below. } } else { - createTable(df.schema, url, table, createTableOptions, conn) - saveTable(df, url, table, Some(df.schema), isCaseSensitive, jdbcOptions) + createTable(conn, df.schema, options) + saveTable(df, Some(df.schema), isCaseSensitive, options) } } finally { conn.close() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 0590aec77c..d89f600874 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -63,14 +63,14 @@ object JdbcUtils extends Logging { /** * Returns true if the table already exists in the JDBC database. */ - def tableExists(conn: Connection, url: String, table: String): Boolean = { - val dialect = JdbcDialects.get(url) + def tableExists(conn: Connection, options: JDBCOptions): Boolean = { + val dialect = JdbcDialects.get(options.url) // Somewhat hacky, but there isn't a good way to identify whether a table exists for all // SQL database systems using JDBC meta data calls, considering "table" could also include // the database name. Query used to find table exists can be overridden by the dialects. Try { - val statement = conn.prepareStatement(dialect.getTableExistsQuery(table)) + val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table)) try { statement.executeQuery() } finally { @@ -235,11 +235,11 @@ object JdbcUtils extends Logging { /** * Returns the schema if the table already exists in the JDBC database. */ - def getSchemaOption(conn: Connection, url: String, table: String): Option[StructType] = { - val dialect = JdbcDialects.get(url) + def getSchemaOption(conn: Connection, options: JDBCOptions): Option[StructType] = { + val dialect = JdbcDialects.get(options.url) try { - val statement = conn.prepareStatement(dialect.getSchemaQuery(table)) + val statement = conn.prepareStatement(dialect.getSchemaQuery(options.table)) try { Some(getSchema(statement.executeQuery(), dialect)) } catch { @@ -697,11 +697,11 @@ object JdbcUtils extends Logging { */ def saveTable( df: DataFrame, - url: String, - table: String, tableSchema: Option[StructType], isCaseSensitive: Boolean, options: JDBCOptions): Unit = { + val url = options.url + val table = options.table val dialect = JdbcDialects.get(url) val rddSchema = df.schema val getConnection: () => Connection = createConnectionFactory(options) @@ -725,12 +725,12 @@ object JdbcUtils extends Logging { * Creates a table with a given schema. */ def createTable( + conn: Connection, schema: StructType, - url: String, - table: String, - createTableOptions: String, - conn: Connection): Unit = { - val strSchema = schemaString(schema, url) + options: JDBCOptions): Unit = { + val strSchema = schemaString(schema, options.url) + val table = options.table + val createTableOptions = options.createTableOptions // Create the table if the table does not exist. // To allow certain options to append when create a new table, which can be // table_options or partition_options. |