diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-07-24 09:25:02 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-07-24 09:25:02 +0100 |
commit | cc1d2dcb612fb5df39c9a9e57a3484ecad90c745 (patch) | |
tree | 86a6f7deb1e717a48f838145f302caac4550ad94 /sql/core/src/main | |
parent | d6795c7a254b83d4ae4785f3add74981e5273c91 (diff) | |
download | spark-cc1d2dcb612fb5df39c9a9e57a3484ecad90c745.tar.gz spark-cc1d2dcb612fb5df39c9a9e57a3484ecad90c745.tar.bz2 spark-cc1d2dcb612fb5df39c9a9e57a3484ecad90c745.zip |
[SPARK-16463][SQL] Support `truncate` option in Overwrite mode for JDBC DataFrameWriter
## What changes were proposed in this pull request?
This PR adds a boolean option, `truncate`, for `SaveMode.Overwrite` of JDBC DataFrameWriter. If this option is `true`, it try to take advantage of `TRUNCATE TABLE` instead of `DROP TABLE`. This is a trivial option, but will provide great **convenience** for BI tool users based on RDBMS tables generated by Spark.
**Goal**
- Without `CREATE/DROP` privilege, we can save dataframe to database. Sometime these are not allowed for security.
- It will preserve the existing table information, so users can add and keep some additional `INDEX` and `CONSTRAINT`s for the table.
- Sometime, `TRUNCATE` is faster than the combination of `DROP/CREATE`.
**Supported DBMS**
The following is `truncate`-option support table. Due to the different behavior of `TRUNCATE TABLE` among DBMSs, it's not always safe to use `TRUNCATE TABLE`. Spark will ignore the `truncate` option for **unknown** and **some** DBMS with **default CASCADING** behavior. Newly added JDBCDialect should implement corresponding function to support `truncate` option additionally.
Spark Dialects | `truncate` OPTION SUPPORT
---------------|-------------------------------
MySQLDialect | O
PostgresDialect | X
DB2Dialect | O
MsSqlServerDialect | O
DerbyDialect | O
OracleDialect | O
**Before (TABLE with INDEX case)**: SparkShell & MySQL CLI are interleaved intentionally.
```scala
scala> val (url, prop)=("jdbc:mysql://localhost:3306/temp?useSSL=false", new java.util.Properties)
scala> prop.setProperty("user","root")
scala> df.write.mode("overwrite").jdbc(url, "table_with_index", prop)
scala> spark.range(10).write.mode("overwrite").jdbc(url, "table_with_index", prop)
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id | bigint(20) | NO | | NULL | |
+-------+------------+------+-----+---------+-------+
mysql> CREATE UNIQUE INDEX idx_id ON table_with_index(id);
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id | bigint(20) | NO | PRI | NULL | |
+-------+------------+------+-----+---------+-------+
scala> spark.range(10).write.mode("overwrite").jdbc(url, "table_with_index", prop)
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id | bigint(20) | NO | | NULL | |
+-------+------------+------+-----+---------+-------+
```
**After (TABLE with INDEX case)**
```scala
scala> spark.range(10).write.mode("overwrite").option("truncate", true).jdbc(url, "table_with_index", prop)
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id | bigint(20) | NO | PRI | NULL | |
+-------+------------+------+-----+---------+-------+
```
**Error Handling**
- In case of exceptions, Spark will not retry. Users should turn off the `truncate` option.
- In case of schema change:
- If one of the column names changes, this will raise exceptions intuitively.
- If there exists only type difference, this will work like Append mode.
## How was this patch tested?
Pass the Jenkins tests with a updated testcase.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #14086 from dongjoon-hyun/SPARK-16410.
Diffstat (limited to 'sql/core/src/main')
8 files changed, 49 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e6a8dfac0a..753b64b983 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -387,6 +387,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash * your external database systems. * + * You can set the following JDBC-specific option(s) for storing JDBC: + * <li>`truncate` (default `false`): use `TRUNCATE TABLE` instead of `DROP TABLE`.</li> + * + * In case of failures, users should turn off `truncate` option to use `DROP TABLE` again. Also, + * due to the different behavior of `TRUNCATE TABLE` among DBMS, it's not always safe to use this. + * MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect supports this + * while PostgresDialect and default JDBCDirect doesn't. For unknown and unsupported JDBCDirect, + * the user option `truncate` is ignored. + * * @param url JDBC database url of the form `jdbc:subprotocol:subname` * @param table Name of the table in the external database. * @param connectionProperties JDBC database connection arguments, a list of arbitrary string @@ -423,8 +432,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } if (mode == SaveMode.Overwrite && tableExists) { - JdbcUtils.dropTable(conn, table) - tableExists = false + if (extraOptions.getOrElse("truncate", "false").toBoolean && + JdbcUtils.isCascadingTruncateTable(url) == Some(false)) { + JdbcUtils.truncateTable(conn, table) + } else { + JdbcUtils.dropTable(conn, table) + tableExists = false + } } // Create the table if the table didn't exist. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index ce71a7d1e6..cb474cbd0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -99,6 +99,22 @@ object JdbcUtils extends Logging { } /** + * Truncates a table from the JDBC database. + */ + def truncateTable(conn: Connection, table: String): Unit = { + val statement = conn.createStatement + try { + statement.executeUpdate(s"TRUNCATE TABLE $table") + } finally { + statement.close() + } + } + + def isCascadingTruncateTable(url: String): Option[Boolean] = { + JdbcDialects.get(url).isCascadingTruncateTable() + } + + /** * Returns a PreparedStatement that inserts a row into table via conn. */ def insertStatement(conn: Connection, table: String, rddSchema: StructType, dialect: JdbcDialect) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index f12b6ca9d6..190463df0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -28,4 +28,6 @@ private object DB2Dialect extends JdbcDialect { case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR)) case _ => None } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 948106fd06..78107809a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -108,6 +108,13 @@ abstract class JdbcDialect extends Serializable { def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = { } + /** + * Return Some[true] iff `TRUNCATE TABLE` causes cascading default. + * Some[true] : TRUNCATE TABLE causes cascading. + * Some[false] : TRUNCATE TABLE does not cause cascading. + * None: The behavior of TRUNCATE TABLE is unknown (default). + */ + def isCascadingTruncateTable(): Option[Boolean] = None } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 3eb722b070..70122f2599 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -38,4 +38,6 @@ private object MsSqlServerDialect extends JdbcDialect { case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP)) case _ => None } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index e1717049f3..b2cff7877d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -44,4 +44,6 @@ private case object MySQLDialect extends JdbcDialect { override def getTableExistsQuery(table: String): String = { s"SELECT 1 FROM $table LIMIT 1" } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index b795e8b42d..ce8731efd1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -53,4 +53,6 @@ private case object OracleDialect extends JdbcDialect { case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR)) case _ => None } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 6baf1b6f16..fb959d881e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -94,4 +94,6 @@ private object PostgresDialect extends JdbcDialect { } } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(true) } |