From aa3a6841ebaf45efb5d3930a93869948bdd0d2b6 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 7 Oct 2016 10:52:32 -0700
Subject: [SPARK-14525][SQL][FOLLOWUP] Clean up JdbcRelationProvider

## What changes were proposed in this pull request?

This PR cleans up the confusing part in `createRelation`, as discussed in
https://github.com/apache/spark/pull/12601/files#r80627940

It also proposes the changes below:

- Add documentation for the `batchsize` and `isolationLevel` options.
- Move the property names `fetchsize`, `batchsize`, `isolationLevel` and `driver`
  into `JDBCOptions` so that they can be managed in a single place.

A usage sketch of the affected options is included at the end of this message.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon

Closes #15263 from HyukjinKwon/SPARK-14525.
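As referenced above, here is a sketch of how the affected options surface through the public `DataFrameWriter` API. It is illustrative only: the MySQL URL, table name, and empty `Properties` are placeholders, not code from this patch.

```scala
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("jdbc-write-sketch").getOrCreate()
val df = spark.range(10).toDF("id")

df.write
  .mode(SaveMode.Overwrite)                   // existing table: drop and recreate by default
  .option("truncate", "true")                 // prefer TRUNCATE when the dialect reports it as non-cascading
  .option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8") // appended to CREATE TABLE
  .option("batchsize", "1000")                // rows per JDBC batch insert
  .option("isolationLevel", "READ_COMMITTED") // transaction isolation level for the write
  .jdbc("jdbc:mysql://localhost:3306/temp", "t", new Properties())
```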
---
 .../datasources/jdbc/JdbcRelationProvider.scala    | 82 +++++++++-------------
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 29 +++++++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      |  2 +-
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 13 ++++
 4 files changed, 74 insertions(+), 52 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index ae04af2479..3a8a197ef5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
 
 class JdbcRelationProvider extends CreatableRelationProvider
@@ -50,67 +51,52 @@ class JdbcRelationProvider extends CreatableRelationProvider
     JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
   }
 
-  /*
-   * The following structure applies to this code:
-   *               | tableExists               | !tableExists
-   * ----------------------------------------------------------------------------------
-   * Ignore        | BaseRelation              | CreateTable, saveTable, BaseRelation
-   * ErrorIfExists | ERROR                     | CreateTable, saveTable, BaseRelation
-   * Overwrite*    | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation
-   *               | saveTable, BaseRelation   |
-   * Append        | saveTable, BaseRelation   | CreateTable, saveTable, BaseRelation
-   *
-   * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate
-   */
   override def createRelation(
       sqlContext: SQLContext,
       mode: SaveMode,
       parameters: Map[String, String],
-      data: DataFrame): BaseRelation = {
-    val jdbcOptions = new JDBCOptions(parameters)
-    val url = jdbcOptions.url
-    val table = jdbcOptions.table
-
+      df: DataFrame): BaseRelation = {
+    val options = new JDBCOptions(parameters)
+    val url = options.url
+    val table = options.table
+    val createTableOptions = options.createTableOptions
+    val isTruncate = options.isTruncate
     val props = new Properties()
     props.putAll(parameters.asJava)
-    val conn = JdbcUtils.createConnectionFactory(url, props)()
+    val conn = JdbcUtils.createConnectionFactory(url, props)()
     try {
       val tableExists = JdbcUtils.tableExists(conn, url, table)
+      if (tableExists) {
+        mode match {
+          case SaveMode.Overwrite =>
+            if (isTruncate && isCascadingTruncateTable(url).contains(false)) {
+              // In this case, we should truncate table and then load.
+              truncateTable(conn, table)
+              saveTable(df, url, table, props)
+            } else {
+              // Otherwise, do not truncate the table, instead drop and recreate it
+              dropTable(conn, table)
+              createTable(df.schema, url, table, createTableOptions, conn)
+              saveTable(df, url, table, props)
+            }
 
-      val (doCreate, doSave) = (mode, tableExists) match {
-        case (SaveMode.Ignore, true) => (false, false)
-        case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(
-          s"Table or view '$table' already exists, and SaveMode is set to ErrorIfExists.")
-        case (SaveMode.Overwrite, true) =>
-          if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
-            JdbcUtils.truncateTable(conn, table)
-            (false, true)
-          } else {
-            JdbcUtils.dropTable(conn, table)
-            (true, true)
-          }
-        case (SaveMode.Append, true) => (false, true)
-        case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," +
-          " for handling existing tables.")
-        case (_, false) => (true, true)
-      }
+          case SaveMode.Append =>
+            saveTable(df, url, table, props)
+
+          case SaveMode.ErrorIfExists =>
+            throw new AnalysisException(
+              s"Table or view '$table' already exists. SaveMode: ErrorIfExists.")
 
-      if (doCreate) {
-        val schema = JdbcUtils.schemaString(data, url)
-        // To allow certain options to append when create a new table, which can be
-        // table_options or partition_options.
-        // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
-        val createtblOptions = jdbcOptions.createTableOptions
-        val sql = s"CREATE TABLE $table ($schema) $createtblOptions"
-        val statement = conn.createStatement
-        try {
-          statement.executeUpdate(sql)
-        } finally {
-          statement.close()
+          case SaveMode.Ignore =>
+            // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
+            // to not save the contents of the DataFrame and to not change the existing data.
+            // Therefore, it is okay to do nothing here and then just return the relation below.
         }
+      } else {
+        createTable(df.schema, url, table, createTableOptions, conn)
+        saveTable(df, url, table, props)
       }
-      if (doSave) JdbcUtils.saveTable(data, url, table, props)
     } finally {
       conn.close()
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 3db1d1f109..66f2bada2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -552,7 +552,7 @@ object JdbcUtils extends Logging {
       isolationLevel: Int): Iterator[Byte] = {
     require(batchSize >= 1,
       s"Invalid value `${batchSize.toString}` for parameter " +
-      s"`${JdbcUtils.JDBC_BATCH_INSERT_SIZE}`. The minimum value is 1.")
+      s"`$JDBC_BATCH_INSERT_SIZE`. The minimum value is 1.")
 
     val conn = getConnection()
     var committed = false
@@ -657,10 +657,10 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(df: DataFrame, url: String): String = {
+  def schemaString(schema: StructType, url: String): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
-    df.schema.fields foreach { field =>
+    schema.fields foreach { field =>
       val name = dialect.quoteIdentifier(field.name)
       val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
       val nullable = if (field.nullable) "" else "NOT NULL"
@@ -697,4 +697,27 @@ object JdbcUtils extends Logging {
       getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
     )
   }
+
+  /**
+   * Creates a table with a given schema.
+   */
+  def createTable(
+      schema: StructType,
+      url: String,
+      table: String,
+      createTableOptions: String,
+      conn: Connection): Unit = {
+    val strSchema = schemaString(schema, url)
+    // Create the table if the table does not exist.
+    // To allow certain options to append when creating a new table, which can be
+    // table_options or partition_options.
+    // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+    val sql = s"CREATE TABLE $table ($strSchema) $createTableOptions"
+    val statement = conn.createStatement
+    try {
+      statement.executeUpdate(sql)
+    } finally {
+      statement.close()
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 10f15ca280..7cc3989b79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -788,7 +788,7 @@ class JDBCSuite extends SparkFunSuite
 
   test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") {
     val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
-    val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
+    val schema = JdbcUtils.schemaString(df.schema, "jdbc:mysql://localhost:3306/temp")
     assert(schema.contains("`order` TEXT"))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 506971362f..62b29db4d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -132,6 +132,19 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     }
   }
 
+  test("CREATE with ignore") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
+    val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
+
+    df.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
+    assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
+    assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+
+    df2.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
+    assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
+    assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+  }
+
   test("CREATE with overwrite") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
--
cgit v1.2.3
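The `JDBCOptions` file itself is not shown in the diff above, so for the second bullet in the description, here is a rough sketch of what consolidating the property names in one place can look like. The constant names and defaults below are illustrative assumptions, not the exact code added by this patch.

```scala
// A minimal sketch of centralizing JDBC option names, so that JdbcUtils and
// JdbcRelationProvider reference shared constants instead of scattering raw
// string literals. Constant names and defaults are illustrative assumptions.
object JDBCOptions {
  val JDBC_URL = "url"
  val JDBC_TABLE_NAME = "dbtable"
  val JDBC_DRIVER_CLASS = "driver"
  val JDBC_BATCH_FETCH_SIZE = "fetchsize"
  val JDBC_BATCH_INSERT_SIZE = "batchsize"
  val JDBC_TXN_ISOLATION_LEVEL = "isolationLevel"
}

class JDBCOptions(parameters: Map[String, String]) {
  import JDBCOptions._

  // Required options: fail fast with a clear message if they are missing.
  require(parameters.contains(JDBC_URL), s"Option '$JDBC_URL' is required.")
  require(parameters.contains(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.")

  val url: String = parameters(JDBC_URL)
  val table: String = parameters(JDBC_TABLE_NAME)

  // Optional options with defaults.
  val driverClass: Option[String] = parameters.get(JDBC_DRIVER_CLASS)
  val fetchSize: Int = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
  val batchSize: Int = parameters.getOrElse(JDBC_BATCH_INSERT_SIZE, "1000").toInt
  val isolationLevel: String = parameters.getOrElse(JDBC_TXN_ISOLATION_LEVEL, "READ_UNCOMMITTED")
}
```

Keeping the keys as constants in one object means call sites such as the `require` message in `JdbcUtils` can reference `$JDBC_BATCH_INSERT_SIZE`, as the diff above does, instead of repeating the raw string.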