-rw-r--r--  docs/sql-programming-guide.md                                                              7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala    9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala                     12
4 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index ba3e55fc06..656e7ecdab 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1087,6 +1087,13 @@ the following case-sensitive options:
   </tr>
   <tr>
+    <td><code>maxConnections</code></td>
+    <td>
+      The maximum number of concurrent JDBC connections to use, if set. This option applies only to writing. It works by limiting the write operation's parallelism, which depends on the input's partition count: if the input has more partitions than this limit, the operation coalesces the input to fewer partitions before writing.
+    </td>
+  </tr>
+
+  <tr>
     <td><code>isolationLevel</code></td>
     <td>
       The transaction isolation level, which applies to the current connection. It can be one of <code>NONE</code>, <code>READ_COMMITTED</code>, <code>READ_UNCOMMITTED</code>, <code>REPEATABLE_READ</code>, or <code>SERIALIZABLE</code>, corresponding to the standard transaction isolation levels defined by JDBC's Connection object, with a default of <code>READ_UNCOMMITTED</code>. This option applies only to writing. Please refer to the documentation in <code>java.sql.Connection</code>.
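
The new option is set like any other JDBC writer option. A minimal usage sketch follows; the DataFrame `df`, the JDBC URL, and the table name are placeholders for illustration, not part of this patch:

```scala
// Hypothetical writer configuration; `df`, the URL, and the table
// name are placeholders.
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/testdb")
  .option("dbtable", "public.mytable")
  .option("maxConnections", "8") // at most 8 concurrent JDBC connections
  .save()
```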
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 7f419b5788..d416eec6dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -122,6 +122,11 @@ class JDBCOptions(
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
}
+ // the maximum number of connections
+ val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt)
+ require(maxConnections.isEmpty || maxConnections.get > 0,
+ s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " +
+ "The minimum value is 1.")
}
object JDBCOptions {
@@ -144,4 +149,5 @@ object JDBCOptions {
   val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
+  val JDBC_MAX_CONNECTIONS = newOption("maxConnections")
}
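
The validation above follows a common pattern for optional positive-integer parameters. A standalone sketch of the same pattern, with an illustrative function name outside of Spark:

```scala
// Minimal sketch of the option-parsing pattern used above: an absent
// key yields None, and a present key must parse to a positive Int.
def parseMaxConnections(parameters: Map[String, String]): Option[Int] = {
  val maxConnections = parameters.get("maxConnections").map(_.toInt)
  // require's message argument is by-name, so `.get` is only evaluated
  // when the Option is non-empty and the check has actually failed.
  require(maxConnections.isEmpty || maxConnections.get > 0,
    s"Invalid value `${maxConnections.get}` for parameter `maxConnections`. " +
    "The minimum value is 1.")
  maxConnections
}
```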
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 41edb6511c..cdc3c99daa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
     val getConnection: () => Connection = createConnectionFactory(options)
     val batchSize = options.batchSize
     val isolationLevel = options.isolationLevel
-    df.foreachPartition(iterator => savePartition(
+    val maxConnections = options.maxConnections
+    val repartitionedDF =
+      if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
+        df.coalesce(maxConnections.get)
+      } else {
+        df
+      }
+    repartitionedDF.foreachPartition(iterator => savePartition(
       getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
     )
}
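
The core idea is that each write partition opens its own JDBC connection, so capping the partition count caps the connection count. A self-contained sketch of the same coalesce logic, with an illustrative helper name:

```scala
import org.apache.spark.sql.DataFrame

// Sketch: cap a write's parallelism by coalescing the input.
// coalesce() only ever reduces the partition count and avoids a full
// shuffle, which is why it is used here instead of repartition().
def capPartitions(df: DataFrame, maxConnections: Option[Int]): DataFrame =
  if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
    df.coalesce(maxConnections.get)
  } else {
    df
  }
```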
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index e3d3c6c3a8..5795b4d860 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -312,4 +312,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         .options(properties.asScala)
         .save()
     }
+
+  test("SPARK-18413: Add `maxConnections` JDBCOption") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    val e = intercept[IllegalArgumentException] {
+      df.write.format("jdbc")
+        .option("dbtable", "TEST.SAVETEST")
+        .option("url", url1)
+        .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
+        .save()
+    }.getMessage
+    assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
+  }
}
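
The added test only covers the invalid value. A hypothetical success-path companion could look like the sketch below, reusing `url1`, `arr2x2`, and `schema2` from the suite and assuming `java.util.Properties` is in scope, as elsewhere in this file; it is not part of this patch:

```scala
// Hypothetical companion test: a valid `maxConnections` write succeeds
// and all input rows arrive, however the input was coalesced.
test("maxConnections: valid value coalesces and writes all rows") {
  val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
  df.write.format("jdbc")
    .option("dbtable", "TEST.SAVETEST")
    .option("url", url1)
    .option(JDBCOptions.JDBC_MAX_CONNECTIONS, "1")
    .save()
  // arr2x2 is assumed to hold two rows, as its name suggests.
  assert(2 === spark.read.jdbc(url1, "TEST.SAVETEST", new Properties()).count())
}
```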