From 2db4662fe2d72749c06ad5f11f641a388343f77c Mon Sep 17 00:00:00 2001 From: CK50 Date: Mon, 30 Nov 2015 20:08:49 +0800 Subject: [SPARK-11989][SQL] Only use commit in JDBC data source if the underlying database supports transactions Fixes [SPARK-11989](https://issues.apache.org/jira/browse/SPARK-11989) Author: CK50 Author: Christian Kurz Closes #9973 from CK50/branch-1.6_non-transactional. (cherry picked from commit a589736a1b237ef2f3bd59fbaeefe143ddcc8f4e) Signed-off-by: Reynold Xin --- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 7375a5c091..252f1cfd5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -21,6 +21,7 @@ import java.sql.{Connection, PreparedStatement} import java.util.Properties import scala.util.Try +import scala.util.control.NonFatal import org.apache.spark.Logging import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType, JdbcDialects} @@ -125,8 +126,19 @@ object JdbcUtils extends Logging { dialect: JdbcDialect): Iterator[Byte] = { val conn = getConnection() var committed = false + val supportsTransactions = try { + conn.getMetaData().supportsDataManipulationTransactionsOnly() || + conn.getMetaData().supportsDataDefinitionAndDataManipulationTransactions() + } catch { + case NonFatal(e) => + logWarning("Exception while detecting transaction support", e) + true + } + try { - conn.setAutoCommit(false) // Everything in the same db transaction. + if (supportsTransactions) { + conn.setAutoCommit(false) // Everything in the same db transaction. + } val stmt = insertStatement(conn, table, rddSchema) try { var rowCount = 0 @@ -175,14 +187,18 @@ object JdbcUtils extends Logging { } finally { stmt.close() } - conn.commit() + if (supportsTransactions) { + conn.commit() + } committed = true } finally { if (!committed) { // The stage must fail. We got here through an exception path, so // let the exception through unless rollback() or close() want to // tell the user about another problem. - conn.rollback() + if (supportsTransactions) { + conn.rollback() + } conn.close() } else { // The stage must succeed. We cannot propagate any exception close() might throw. -- cgit v1.2.3