aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCK50 <christian.kurz@oracle.com>2015-11-30 20:08:49 +0800
committerReynold Xin <rxin@databricks.com>2015-11-30 20:09:05 +0800
commit2db4662fe2d72749c06ad5f11f641a388343f77c (patch)
tree79fff354453226db6de26b2d294efe891f917dad
parentbf0e85a70a54a2d7fd6804b6bd00c63c20e2bb00 (diff)
downloadspark-2db4662fe2d72749c06ad5f11f641a388343f77c.tar.gz
spark-2db4662fe2d72749c06ad5f11f641a388343f77c.tar.bz2
spark-2db4662fe2d72749c06ad5f11f641a388343f77c.zip
[SPARK-11989][SQL] Only use commit in JDBC data source if the underlying database supports transactions
Fixes [SPARK-11989](https://issues.apache.org/jira/browse/SPARK-11989) Author: CK50 <christian.kurz@oracle.com> Author: Christian Kurz <christian.kurz@oracle.com> Closes #9973 from CK50/branch-1.6_non-transactional. (cherry picked from commit a589736a1b237ef2f3bd59fbaeefe143ddcc8f4e) Signed-off-by: Reynold Xin <rxin@databricks.com>
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala22
1 files changed, 19 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 7375a5c091..252f1cfd5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -21,6 +21,7 @@ import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import scala.util.Try
+import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType, JdbcDialects}
@@ -125,8 +126,19 @@ object JdbcUtils extends Logging {
dialect: JdbcDialect): Iterator[Byte] = {
val conn = getConnection()
var committed = false
+ val supportsTransactions = try {
+ conn.getMetaData().supportsDataManipulationTransactionsOnly() ||
+ conn.getMetaData().supportsDataDefinitionAndDataManipulationTransactions()
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Exception while detecting transaction support", e)
+ true
+ }
+
try {
- conn.setAutoCommit(false) // Everything in the same db transaction.
+ if (supportsTransactions) {
+ conn.setAutoCommit(false) // Everything in the same db transaction.
+ }
val stmt = insertStatement(conn, table, rddSchema)
try {
var rowCount = 0
@@ -175,14 +187,18 @@ object JdbcUtils extends Logging {
} finally {
stmt.close()
}
- conn.commit()
+ if (supportsTransactions) {
+ conn.commit()
+ }
committed = true
} finally {
if (!committed) {
// The stage must fail. We got here through an exception path, so
// let the exception through unless rollback() or close() want to
// tell the user about another problem.
- conn.rollback()
+ if (supportsTransactions) {
+ conn.rollback()
+ }
conn.close()
} else {
// The stage must succeed. We cannot propagate any exception close() might throw.