From 07beb5d21c6803e80733149f1560c71cd3cacc86 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 21 Nov 2016 13:57:36 +0000
Subject: [SPARK-18413][SQL] Add `maxConnections` JDBCOption

## What changes were proposed in this pull request?

This PR adds a new JDBCOption, `maxConnections`, the maximum number of simultaneous JDBC connections allowed. This option applies only to writing, and is enforced by coalescing the data when needed; it defaults to the number of partitions of the RDD. Previously, SQL users could not control this, while Scala/Java/Python users can use the `coalesce` (or `repartition`) API.
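For illustration, the same option can be passed through the `DataFrameWriter` API. Below is a minimal sketch, not part of this patch; the URL, table name, and credentials are placeholders mirroring the manual test further down:

```scala
// Sketch: write a 200-partition DataFrame over JDBC while capping
// the number of simultaneous connections via `maxConnections`.
import org.apache.spark.sql.SaveMode

// 200 partitions => up to 200 JDBC connections without the option
val df = spark.range(0, 1000).toDF("a").repartition(200)

df.write
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/t")  // placeholder URL
  .option("dbtable", "t1")                         // placeholder table name
  .option("user", "root")
  .option("password", "")
  .option("maxConnections", "8")  // writer coalesces the input to at most 8 partitions
  .mode(SaveMode.Overwrite)
  .save()
```

Without `maxConnections`, the write above would open one connection per partition, i.e. up to 200.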
**Reported Scenario**

In the following case, the number of connections becomes 200 and the database cannot handle all of them.

```sql
CREATE OR REPLACE TEMPORARY VIEW resultview
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:oracle:thin:10.129.10.111:1521:BKDB",
  dbtable "result",
  user "HIVE",
  password "HIVE"
);

-- set spark.sql.shuffle.partitions=200
INSERT OVERWRITE TABLE resultview
SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g
```

## How was this patch tested?

Manual. Do the following and check the Spark UI.

**Step 1 (MySQL)**
```
CREATE TABLE t1 (a INT);
CREATE TABLE data (a INT);
INSERT INTO data VALUES (1);
INSERT INTO data VALUES (2);
INSERT INTO data VALUES (3);
```

**Step 2 (Spark)**
```scala
SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar

scala> sql("SET spark.sql.shuffle.partitions=3")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '1')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '3')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '4')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
```

![maxconnections](https://cloud.githubusercontent.com/assets/9700541/20287987/ed8409c2-aa84-11e6-8aab-ae28e63fe54d.png)

Author: Dongjoon Hyun

Closes #15868 from dongjoon-hyun/SPARK-18413.
---
 docs/sql-programming-guide.md                              |  7 +++++++
 .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala |  6 ++++++
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala   |  9 ++++++++-
 .../scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala   | 12 ++++++++++++
 4 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index ba3e55fc06..656e7ecdab 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1086,6 +1086,13 @@ the following case-sensitive options:
     </td>
   </tr>
 
+  <tr>
+    <td><code>maxConnections</code></td>
+    <td>
+      The maximum number of concurrent JDBC connections that can be used, if set. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing.
+    </td>
+  </tr>
+
   <tr>
     <td><code>isolationLevel</code></td>
     <td>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 7f419b5788..d416eec6dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -122,6 +122,11 @@ class JDBCOptions(
       case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
       case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
     }
+  // the maximum number of connections
+  val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt)
+  require(maxConnections.isEmpty || maxConnections.get > 0,
+    s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " +
+      "The minimum value is 1.")
 }
 
 object JDBCOptions {
@@ -144,4 +149,5 @@ object JDBCOptions {
   val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
+  val JDBC_MAX_CONNECTIONS = newOption("maxConnections")
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 41edb6511c..cdc3c99daa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
     val getConnection: () => Connection = createConnectionFactory(options)
     val batchSize = options.batchSize
     val isolationLevel = options.isolationLevel
-    df.foreachPartition(iterator => savePartition(
+    val maxConnections = options.maxConnections
+    val repartitionedDF =
+      if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
+        df.coalesce(maxConnections.get)
+      } else {
+        df
+      }
+    repartitionedDF.foreachPartition(iterator => savePartition(
       getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
     )
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index e3d3c6c3a8..5795b4d860 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -312,4 +312,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       .options(properties.asScala)
       .save()
   }
+
+  test("SPARK-18413: Add `maxConnections` JDBCOption") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    val e = intercept[IllegalArgumentException] {
+      df.write.format("jdbc")
+        .option("dbtable", "TEST.SAVETEST")
+        .option("url", url1)
+        .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
+        .save()
+    }.getMessage
+    assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
+  }
 }
-- 
cgit v1.2.3