author     Dongjoon Hyun <dongjoon@apache.org>   2016-11-21 13:57:36 +0000
committer  Sean Owen <sowen@cloudera.com>        2016-11-21 13:57:36 +0000
commit     07beb5d21c6803e80733149f1560c71cd3cacc86 (patch)
tree       2ac23532eb040d56ac1ba86aaabea492aac42a53
parent     9f262ae163b6dca6526665b3ad12b3b2ea8fb873 (diff)
download   spark-07beb5d21c6803e80733149f1560c71cd3cacc86.tar.gz
           spark-07beb5d21c6803e80733149f1560c71cd3cacc86.tar.bz2
           spark-07beb5d21c6803e80733149f1560c71cd3cacc86.zip
[SPARK-18413][SQL] Add `maxConnections` JDBCOption
## What changes were proposed in this pull request?

This PR adds a new JDBC option, `maxConnections`, which sets the maximum number of simultaneous JDBC connections allowed. This option applies only to writing; the data is coalesced if needed to stay under the limit. It defaults to the number of partitions of the RDD. Previously, SQL users could not control this, while Scala/Java/Python users could use the `coalesce` (or `repartition`) API.

**Reported Scenario**

In the following case, the number of connections becomes 200 and the database cannot handle all of them.

```sql
CREATE OR REPLACE TEMPORARY VIEW resultview
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:oracle:thin:10.129.10.111:1521:BKDB",
  dbtable "result",
  user "HIVE",
  password "HIVE"
);
-- set spark.sql.shuffle.partitions=200
INSERT OVERWRITE TABLE resultview SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g
```

## How was this patch tested?

Manually. Do the following and observe the Spark UI.

**Step 1 (MySQL)**
```
CREATE TABLE t1 (a INT);
CREATE TABLE data (a INT);
INSERT INTO data VALUES (1);
INSERT INTO data VALUES (2);
INSERT INTO data VALUES (3);
```

**Step 2 (Spark)**
```scala
SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar
scala> sql("SET spark.sql.shuffle.partitions=3")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '1')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '3')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '4')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
```

![maxconnections](https://cloud.githubusercontent.com/assets/9700541/20287987/ed8409c2-aa84-11e6-8aab-ae28e63fe54d.png)

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15868 from dongjoon-hyun/SPARK-18413.
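For reference, the same cap expressed through the DataFrame writer API rather than SQL — a minimal sketch assuming the local MySQL setup from Steps 1 and 2 above (the URL, table name, and credentials are placeholders):

```scala
// Sketch: cap simultaneous JDBC connections on write via the new option.
// With maxConnections '2', a DataFrame with more than 2 partitions is
// coalesced down to 2 before the per-partition JDBC writes run.
val df = spark.range(0, 100).toDF("a").repartition(3)  // 3 partitions, as in Step 2

df.write
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/t")
  .option("dbtable", "t1")
  .option("user", "root")
  .option("password", "")
  .option("maxConnections", "2")  // at most 2 partitions write concurrently
  .mode("overwrite")
  .save()
```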
-rw-r--r--  docs/sql-programming-guide.md                                                             |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala   |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala                    | 12
4 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index ba3e55fc06..656e7ecdab 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1087,6 +1087,13 @@ the following case-sensitive options:
</tr>
<tr>
+ <td><code>maxConnections</code></td>
+ <td>
+ The maximum number of concurrent JDBC connections that can be used, if set. This option applies only to writing. It works by limiting the write operation's parallelism, which depends on the input's partition count: if the partition count exceeds this limit, the input is coalesced to that many partitions before writing.
+ </td>
+ </tr>
+
+ <tr>
<td><code>isolationLevel</code></td>
<td>
The transaction isolation level, which applies to the current connection. It can be one of <code>NONE</code>, <code>READ_COMMITTED</code>, <code>READ_UNCOMMITTED</code>, <code>REPEATABLE_READ</code>, or <code>SERIALIZABLE</code>, corresponding to the standard transaction isolation levels defined by JDBC's Connection object, with a default of <code>READ_UNCOMMITTED</code>. This option applies only to writing. Please refer to the documentation in <code>java.sql.Connection</code>.
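The `maxConnections` entry above describes declaratively what Scala/Java/Python users could already do by hand, as the commit message notes. Roughly equivalent pre-existing code, sketched here with placeholder connection details and an assumed existing DataFrame `df`:

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("user", "root")   // placeholder credentials
props.setProperty("password", "")

// API-side equivalent: coalesce before writing so that at most
// 2 partitions (and hence 2 connections) are active at once.
df.coalesce(2)
  .write
  .mode("overwrite")
  .jdbc("jdbc:mysql://localhost:3306/t", "t1", props)
```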
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 7f419b5788..d416eec6dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -122,6 +122,11 @@ class JDBCOptions(
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
}
+  // the maximum number of simultaneous JDBC connections allowed (write path only)
+ val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt)
+ require(maxConnections.isEmpty || maxConnections.get > 0,
+ s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " +
+ "The minimum value is 1.")
}
object JDBCOptions {
@@ -144,4 +149,5 @@ object JDBCOptions {
val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
+ val JDBC_MAX_CONNECTIONS = newOption("maxConnections")
}
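The parsing added above is an optional-positive-int pattern: absent means "no cap", and any value below 1 is rejected eagerly, before a connection is attempted. A standalone sketch of the same validation outside Spark's `JDBCOptions` class (the helper name is hypothetical):

```scala
// Hypothetical standalone version of the validation above: parse an
// optional positive Int from a string parameter map, rejecting 0 or less.
// The require message is by-name, so `.get` is only evaluated on failure.
def parseMaxConnections(parameters: Map[String, String]): Option[Int] = {
  val maxConnections = parameters.get("maxConnections").map(_.toInt)
  require(maxConnections.isEmpty || maxConnections.get > 0,
    s"Invalid value `${maxConnections.get}` for parameter `maxConnections`. " +
      "The minimum value is 1.")
  maxConnections
}

parseMaxConnections(Map("maxConnections" -> "3"))   // Some(3)
parseMaxConnections(Map.empty)                      // None (no cap)
// parseMaxConnections(Map("maxConnections" -> "0"))  // IllegalArgumentException
```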
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 41edb6511c..cdc3c99daa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
val getConnection: () => Connection = createConnectionFactory(options)
val batchSize = options.batchSize
val isolationLevel = options.isolationLevel
- df.foreachPartition(iterator => savePartition(
+ val maxConnections = options.maxConnections
+ val repartitionedDF =
+ if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
+ df.coalesce(maxConnections.get)
+ } else {
+ df
+ }
+ repartitionedDF.foreachPartition(iterator => savePartition(
getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
)
}
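The branch above only ever shrinks the partition count: `coalesce` cannot increase partitions, and when the cap already holds the DataFrame is passed through untouched, leaving the plan unchanged. The same rule in isolation, as a sketch with a hypothetical helper name:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper mirroring the logic above: shrink to at most
// `maxConnections` partitions, otherwise leave the DataFrame as-is.
def capWriteParallelism(df: DataFrame, maxConnections: Option[Int]): DataFrame =
  maxConnections match {
    case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
    case _ => df
  }
```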
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index e3d3c6c3a8..5795b4d860 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -312,4 +312,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
.options(properties.asScala)
.save()
}
+
+ test("SPARK-18413: Add `maxConnections` JDBCOption") {
+ val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+ val e = intercept[IllegalArgumentException] {
+ df.write.format("jdbc")
+ .option("dbtable", "TEST.SAVETEST")
+ .option("url", url1)
+ .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
+ .save()
+ }.getMessage
+ assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
+ }
}