aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala11
1 files changed, 6 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
index dd8aaf6474..f7ea852fe7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
@@ -58,13 +58,12 @@ package object jdbc {
* are used.
*/
def savePartition(
- url: String,
+ getConnection: () => Connection,
table: String,
iterator: Iterator[Row],
rddSchema: StructType,
- nullTypes: Array[Int],
- properties: Properties): Iterator[Byte] = {
- val conn = DriverManager.getConnection(url, properties)
+ nullTypes: Array[Int]): Iterator[Byte] = {
+ val conn = getConnection()
var committed = false
try {
conn.setAutoCommit(false) // Everything in the same db transaction.
@@ -185,8 +184,10 @@ package object jdbc {
}
val rddSchema = df.schema
+ val driver: String = DriverRegistry.getDriverClassName(url)
+ val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
df.foreachPartition { iterator =>
- JDBCWriteDetails.savePartition(url, table, iterator, rddSchema, nullTypes, properties)
+ JDBCWriteDetails.savePartition(getConnection, table, iterator, rddSchema, nullTypes)
}
}