author     hyukjinkwon <gurwls223@gmail.com>    2016-10-07 10:52:32 -0700
committer  gatorsmile <gatorsmile@gmail.com>    2016-10-07 10:52:32 -0700
commit     aa3a6841ebaf45efb5d3930a93869948bdd0d2b6
tree       accbb233c9e1b621d9f2958cb51f6d7b1e3b52cc
parent     cff560755244dd4ccb998e0c56e81d2620cd4cff
[SPARK-14525][SQL][FOLLOWUP] Clean up JdbcRelationProvider
## What changes were proposed in this pull request?

This PR proposes cleaning up the confusing part in `createRelation`, as discussed in
https://github.com/apache/spark/pull/12601/files#r80627940

Also, this PR proposes the changes below:

- Add documentation for `batchsize` and `isolationLevel`.
- Move the property names `fetchsize`, `batchsize`, `isolationLevel` and `driver` into `JDBCOptions` so that they can be managed in a single place.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15263 from HyukjinKwon/SPARK-14525.
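For context, the options this follow-up touches are the ones a user passes through the DataFrame reader/writer. A minimal sketch of how `fetchsize`, `batchsize`, `isolationLevel` and `truncate` are supplied; `spark` is assumed to be an existing SparkSession, and the URL, credentials and table names are placeholders, not part of this patch:

```scala
import java.util.Properties
import org.apache.spark.sql.SaveMode

// Placeholder connection details; any JDBC-accessible database would do.
val url = "jdbc:postgresql://localhost:5432/testdb"
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")

// Read path: `fetchsize` hints how many rows the driver fetches per round trip.
val people = spark.read
  .option("fetchsize", "1000")
  .jdbc(url, "public.people", props)

// Write path: `batchsize` sets the JDBC batch insert size, `isolationLevel` the
// transaction isolation used while writing, and `truncate` asks Overwrite mode to
// TRUNCATE the table instead of dropping and recreating it when the dialect allows.
people.write
  .mode(SaveMode.Overwrite)
  .option("batchsize", "5000")
  .option("isolationLevel", "READ_COMMITTED")
  .option("truncate", "true")
  .jdbc(url, "public.people_copy", props)
```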
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala | 82
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala            | 29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala                                   |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala                              | 13
4 files changed, 74 insertions(+), 52 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index ae04af2479..3a8a197ef5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import scala.collection.JavaConverters.mapAsJavaMapConverter
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
class JdbcRelationProvider extends CreatableRelationProvider
@@ -50,67 +51,52 @@ class JdbcRelationProvider extends CreatableRelationProvider
JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
}
- /*
- * The following structure applies to this code:
- * | tableExists | !tableExists
- *------------------------------------------------------------------------------------
- * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation
- * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation
- * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation
- * | saveTable, BaseRelation |
- * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation
- *
- * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate
- */
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
- data: DataFrame): BaseRelation = {
- val jdbcOptions = new JDBCOptions(parameters)
- val url = jdbcOptions.url
- val table = jdbcOptions.table
-
+ df: DataFrame): BaseRelation = {
+ val options = new JDBCOptions(parameters)
+ val url = options.url
+ val table = options.table
+ val createTableOptions = options.createTableOptions
+ val isTruncate = options.isTruncate
val props = new Properties()
props.putAll(parameters.asJava)
- val conn = JdbcUtils.createConnectionFactory(url, props)()
+ val conn = JdbcUtils.createConnectionFactory(url, props)()
try {
val tableExists = JdbcUtils.tableExists(conn, url, table)
+ if (tableExists) {
+ mode match {
+ case SaveMode.Overwrite =>
+ if (isTruncate && isCascadingTruncateTable(url).contains(false)) {
+ // In this case, we should truncate table and then load.
+ truncateTable(conn, table)
+ saveTable(df, url, table, props)
+ } else {
+ // Otherwise, do not truncate the table, instead drop and recreate it
+ dropTable(conn, table)
+ createTable(df.schema, url, table, createTableOptions, conn)
+ saveTable(df, url, table, props)
+ }
- val (doCreate, doSave) = (mode, tableExists) match {
- case (SaveMode.Ignore, true) => (false, false)
- case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(
- s"Table or view '$table' already exists, and SaveMode is set to ErrorIfExists.")
- case (SaveMode.Overwrite, true) =>
- if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
- JdbcUtils.truncateTable(conn, table)
- (false, true)
- } else {
- JdbcUtils.dropTable(conn, table)
- (true, true)
- }
- case (SaveMode.Append, true) => (false, true)
- case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," +
- " for handling existing tables.")
- case (_, false) => (true, true)
- }
+ case SaveMode.Append =>
+ saveTable(df, url, table, props)
+
+ case SaveMode.ErrorIfExists =>
+ throw new AnalysisException(
+ s"Table or view '$table' already exists. SaveMode: ErrorIfExists.")
- if (doCreate) {
- val schema = JdbcUtils.schemaString(data, url)
- // To allow certain options to append when create a new table, which can be
- // table_options or partition_options.
- // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
- val createtblOptions = jdbcOptions.createTableOptions
- val sql = s"CREATE TABLE $table ($schema) $createtblOptions"
- val statement = conn.createStatement
- try {
- statement.executeUpdate(sql)
- } finally {
- statement.close()
+ case SaveMode.Ignore =>
+ // With `SaveMode.Ignore`, if the table already exists, the save operation is expected
+ // to not save the contents of the DataFrame and to not change the existing data.
+ // Therefore, it is okay to do nothing here and then just return the relation below.
}
+ } else {
+ createTable(df.schema, url, table, createTableOptions, conn)
+ saveTable(df, url, table, props)
}
- if (doSave) JdbcUtils.saveTable(data, url, table, props)
} finally {
conn.close()
}
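For reference, the rewritten dispatch above keeps the behaviour the removed comment table documented. A minimal usage sketch of how each SaveMode behaves against an already-existing JDBC table; `df` is assumed to be an existing DataFrame, and the connection details and table name are placeholders:

```scala
import java.util.Properties
import org.apache.spark.sql.SaveMode

val url = "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"  // placeholder in-memory database
val props = new Properties()

// Assume TEST.PEOPLE already exists and `df` has a compatible schema.
df.write.mode(SaveMode.Append).jdbc(url, "TEST.PEOPLE", props)     // insert rows, keep existing data
df.write.mode(SaveMode.Ignore).jdbc(url, "TEST.PEOPLE", props)     // no-op: table and data left untouched
df.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.PEOPLE", props)  // drop, recreate, then insert

df.write.mode(SaveMode.Overwrite)
  .option("truncate", "true")                                      // truncate and reload instead of
  .jdbc(url, "TEST.PEOPLE", props)                                 // drop/create, when the dialect allows

// ErrorIfExists (the default) throws an AnalysisException when the table is already there:
// df.write.mode(SaveMode.ErrorIfExists).jdbc(url, "TEST.PEOPLE", props)
```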
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 3db1d1f109..66f2bada2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -552,7 +552,7 @@ object JdbcUtils extends Logging {
isolationLevel: Int): Iterator[Byte] = {
require(batchSize >= 1,
s"Invalid value `${batchSize.toString}` for parameter " +
- s"`${JdbcUtils.JDBC_BATCH_INSERT_SIZE}`. The minimum value is 1.")
+ s"`$JDBC_BATCH_INSERT_SIZE`. The minimum value is 1.")
val conn = getConnection()
var committed = false
@@ -657,10 +657,10 @@ object JdbcUtils extends Logging {
/**
* Compute the schema string for this RDD.
*/
- def schemaString(df: DataFrame, url: String): String = {
+ def schemaString(schema: StructType, url: String): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
- df.schema.fields foreach { field =>
+ schema.fields foreach { field =>
val name = dialect.quoteIdentifier(field.name)
val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
val nullable = if (field.nullable) "" else "NOT NULL"
@@ -697,4 +697,27 @@ object JdbcUtils extends Logging {
getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
)
}
+
+ /**
+ * Creates a table with a given schema.
+ */
+ def createTable(
+ schema: StructType,
+ url: String,
+ table: String,
+ createTableOptions: String,
+ conn: Connection): Unit = {
+ val strSchema = schemaString(schema, url)
+ // Create the table if the table does not exist.
+ // To allow certain options to be appended when creating a new table, such as
+ // table_options or partition_options.
+ // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+ val sql = s"CREATE TABLE $table ($strSchema) $createTableOptions"
+ val statement = conn.createStatement
+ try {
+ statement.executeUpdate(sql)
+ } finally {
+ statement.close()
+ }
+ }
}
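The extracted `createTable` builds its DDL from `schemaString`, which now takes a `StructType` instead of a whole `DataFrame`. A rough sketch of the output shape for a MySQL URL (`JdbcUtils` is internal to Spark SQL, so this is illustrative only; the field names and types are made up):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = false)))

// The dialect (and hence the column type mapping) is chosen from the JDBC URL.
val ddl = JdbcUtils.schemaString(schema, "jdbc:mysql://localhost:3306/temp")
// For the MySQL dialect this yields something like:
//   `name` TEXT , `age` INTEGER NOT NULL
// which createTable then wraps as:
//   CREATE TABLE <table> (<ddl>) <createTableOptions>
```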
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 10f15ca280..7cc3989b79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -788,7 +788,7 @@ class JDBCSuite extends SparkFunSuite
test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") {
val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
- val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
+ val schema = JdbcUtils.schemaString(df.schema, "jdbc:mysql://localhost:3306/temp")
assert(schema.contains("`order` TEXT"))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 506971362f..62b29db4d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -132,6 +132,19 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
}
}
+ test("CREATE with ignore") {
+ val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
+ val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
+
+ df.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
+ assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
+ assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+
+ df2.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
+ assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
+ assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+ }
+
test("CREATE with overwrite") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)