diff options
author | CK50 <christian.kurz@oracle.com> | 2015-11-30 20:08:49 +0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-11-30 20:09:05 +0800 |
commit | 2db4662fe2d72749c06ad5f11f641a388343f77c (patch) | |
tree | 79fff354453226db6de26b2d294efe891f917dad | |
parent | bf0e85a70a54a2d7fd6804b6bd00c63c20e2bb00 (diff) | |
download | spark-2db4662fe2d72749c06ad5f11f641a388343f77c.tar.gz spark-2db4662fe2d72749c06ad5f11f641a388343f77c.tar.bz2 spark-2db4662fe2d72749c06ad5f11f641a388343f77c.zip |
[SPARK-11989][SQL] Only use commit in JDBC data source if the underlying database supports transactions
Fixes [SPARK-11989](https://issues.apache.org/jira/browse/SPARK-11989)
Author: CK50 <christian.kurz@oracle.com>
Author: Christian Kurz <christian.kurz@oracle.com>
Closes #9973 from CK50/branch-1.6_non-transactional.
(cherry picked from commit a589736a1b237ef2f3bd59fbaeefe143ddcc8f4e)
Signed-off-by: Reynold Xin <rxin@databricks.com>
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 7375a5c091..252f1cfd5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -21,6 +21,7 @@ import java.sql.{Connection, PreparedStatement} import java.util.Properties import scala.util.Try +import scala.util.control.NonFatal import org.apache.spark.Logging import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType, JdbcDialects} @@ -125,8 +126,19 @@ object JdbcUtils extends Logging { dialect: JdbcDialect): Iterator[Byte] = { val conn = getConnection() var committed = false + val supportsTransactions = try { + conn.getMetaData().supportsDataManipulationTransactionsOnly() || + conn.getMetaData().supportsDataDefinitionAndDataManipulationTransactions() + } catch { + case NonFatal(e) => + logWarning("Exception while detecting transaction support", e) + true + } + try { - conn.setAutoCommit(false) // Everything in the same db transaction. + if (supportsTransactions) { + conn.setAutoCommit(false) // Everything in the same db transaction. + } val stmt = insertStatement(conn, table, rddSchema) try { var rowCount = 0 @@ -175,14 +187,18 @@ object JdbcUtils extends Logging { } finally { stmt.close() } - conn.commit() + if (supportsTransactions) { + conn.commit() + } committed = true } finally { if (!committed) { // The stage must fail. We got here through an exception path, so // let the exception through unless rollback() or close() want to // tell the user about another problem. - conn.rollback() + if (supportsTransactions) { + conn.rollback() + } conn.close() } else { // The stage must succeed. We cannot propagate any exception close() might throw. |