author     zsxwing <zsxwing@gmail.com>            2015-08-17 11:53:33 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-08-17 11:53:33 -0700
commit     f10660fe7b809be2059da4f9781a5743f117a35a (patch)
tree       41808b7b86436ccc65d41df2425d758bf57ab807
parent     a4acdabb103f6d04603163c9555c1ddc413c3b80 (diff)
[SPARK-10036] [SQL] Load JDBC driver in DataFrameReader.jdbc and DataFrameWriter.jdbc
This PR uses `JDBCRDD.getConnector` to load the JDBC driver before creating a connection in `DataFrameReader.jdbc` and `DataFrameWriter.jdbc`.

Author: zsxwing <zsxwing@gmail.com>

Closes #8232 from zsxwing/SPARK-10036 and squashes the following commits:

adf75de [zsxwing] Add extraOptions to the connection properties
57f59d4 [zsxwing] Load JDBC driver in DataFrameReader.jdbc and DataFrameWriter.jdbc
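For context, a hedged sketch of the user-facing call paths this patch affects; the URL, table names, credentials, and driver class below are illustrative, not taken from the patch, and an ambient sqlContext is assumed. With this change, a "driver" key in the connection properties is honored when the connection is established, instead of relying on the driver class already being registered with DriverManager:

import java.util.Properties

val props = new Properties()
props.put("user", "spark")                    // illustrative credentials
props.put("password", "secret")
props.put("driver", "org.postgresql.Driver")  // loaded before connecting, after this patch

val df = sqlContext.read.jdbc("jdbc:postgresql://dbhost:5432/test", "events", props)
df.write.jdbc("jdbc:postgresql://dbhost:5432/test", "events_copy", props)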
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                      |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                      | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala   |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala |  4
4 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9ea955b010..6dc7bfe333 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -197,7 +197,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
table: String,
parts: Array[Partition],
connectionProperties: Properties): DataFrame = {
- val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext)
+ val props = new Properties()
+ extraOptions.foreach { case (key, value) =>
+ props.put(key, value)
+ }
+ // connectionProperties should override settings in extraOptions
+ props.putAll(connectionProperties)
+ val relation = JDBCRelation(url, table, parts, props)(sqlContext)
sqlContext.baseRelationToDataFrame(relation)
}
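The ordering in this hunk is what makes the override comment hold: java.util.Properties is a Hashtable, so inserting extraOptions first and then calling putAll(connectionProperties) lets the explicitly passed properties win on key collisions. A minimal standalone sketch of that behavior (keys and values are illustrative):

import java.util.Properties

val merged = new Properties()
merged.put("fetchsize", "100")              // stands in for an extraOptions entry
merged.put("driver", "com.example.Driver")  // hypothetical class, also from extraOptions

val explicit = new Properties()
explicit.put("driver", "org.postgresql.Driver")

merged.putAll(explicit)                     // connectionProperties override on collision
assert(merged.getProperty("driver") == "org.postgresql.Driver")
assert(merged.getProperty("fetchsize") == "100")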
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5fa11da4c3..f0bf1be506 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -244,7 +244,13 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* should be included.
*/
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
- val conn = JdbcUtils.createConnection(url, connectionProperties)
+ val props = new Properties()
+ extraOptions.foreach { case (key, value) =>
+ props.put(key, value)
+ }
+ // connectionProperties should override settings in extraOptions
+ props.putAll(connectionProperties)
+ val conn = JdbcUtils.createConnection(url, props)
try {
var tableExists = JdbcUtils.tableExists(conn, table)
@@ -272,7 +278,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
conn.close()
}
- JdbcUtils.saveTable(df, url, table, connectionProperties)
+ JdbcUtils.saveTable(df, url, table, props)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 8eab6a0adc..e537d631f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -118,7 +118,7 @@ private[sql] object JDBCRDD extends Logging {
*/
def resolveTable(url: String, table: String, properties: Properties): StructType = {
val dialect = JdbcDialects.get(url)
- val conn: Connection = DriverManager.getConnection(url, properties)
+ val conn: Connection = getConnector(properties.getProperty("driver"), url, properties)()
try {
val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
try {
@@ -171,7 +171,8 @@ private[sql] object JDBCRDD extends Logging {
* getConnector is run on the driver code, while the function it returns
* is run on the executor.
*
- * @param driver - The class name of the JDBC driver for the given url.
+ * @param driver - The class name of the JDBC driver for the given url, or null if the class name
+ * is not necessary.
* @param url - The JDBC url to connect to.
*
* @return A function that loads the driver and connects to the url.
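The amended doc describes a two-phase pattern: getConnector runs in driver-side code and returns a () => Connection closure that executors invoke, loading the named driver class (when one is given) before asking DriverManager for a connection. A simplified sketch of that shape, assuming plain Class.forName registration; the actual Spark code routes through its driver registry and adds error handling:

import java.sql.{Connection, DriverManager}
import java.util.Properties

def getConnector(driver: String, url: String, properties: Properties): () => Connection = {
  () => {
    // A null driver name means the class is assumed to be registered already.
    if (driver != null) Class.forName(driver)
    DriverManager.getConnection(url, properties)
  }
}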
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 8ee3b8bda8..2d0e736ee4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.jdbc
-import java.sql.{Connection, DriverManager, PreparedStatement}
+import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import scala.util.Try
@@ -36,7 +36,7 @@ object JdbcUtils extends Logging {
* Establishes a JDBC connection.
*/
def createConnection(url: String, connectionProperties: Properties): Connection = {
- DriverManager.getConnection(url, connectionProperties)
+ JDBCRDD.getConnector(connectionProperties.getProperty("driver"), url, connectionProperties)()
}
/**
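Note the trailing () on the replacement line: getConnector returns a connection factory, and createConnection invokes it immediately, so the class named by the "driver" property is loaded before the connection attempt. A hedged usage sketch (the driver class and URL are illustrative):

import java.util.Properties

val props = new Properties()
props.put("driver", "org.h2.Driver")  // hypothetical driver class
val conn = JdbcUtils.createConnection("jdbc:h2:mem:testdb", props)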