aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorVenkata Ramana Gollamudi <ramana.gollamudi@huawei.com>2015-05-13 17:24:04 -0700
committerMichael Armbrust <michael@databricks.com>2015-05-13 17:24:04 -0700
commit59aaa1dad6bee06e38ee5c03bdf82354242286ee (patch)
treeb444bbfce211aaa0fb6b3bd99be41cd3dbdb3a3a /sql/core
parent73bed408fbb47dfc28063afa3898c27fbdec7735 (diff)
downloadspark-59aaa1dad6bee06e38ee5c03bdf82354242286ee.tar.gz
spark-59aaa1dad6bee06e38ee5c03bdf82354242286ee.tar.bz2
spark-59aaa1dad6bee06e38ee5c03bdf82354242286ee.zip
[SPARK-7601] [SQL] Support Insert into JDBC Datasource
Supported InsertableRelation for JDBC Datasource JDBCRelation. Example usage: sqlContext.sql( s""" |CREATE TEMPORARY TABLE testram1 |USING org.apache.spark.sql.jdbc |OPTIONS (url '$url', dbtable 'testram1', user 'xx', password 'xx', driver 'com.h2.Driver') """.stripMargin.replaceAll("\n", " ")) sqlContext.sql("insert into table testram1 select * from testsrc") sqlContext.sql("insert overwrite table testram1 select * from testsrc") Author: Venkata Ramana Gollamudi <ramana.gollamudi@huawei.com> Closes #6121 from gvramana/JDBCDatasource_insert and squashes the following commits: f3fb5f1 [Venkata Ramana Gollamudi] Support for JDBC Datasource InsertableRelation
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala37
2 files changed, 43 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index d6b3fb3291..93e82549f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.sources._
@@ -129,7 +130,8 @@ private[sql] case class JDBCRelation(
parts: Array[Partition],
properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
extends BaseRelation
- with PrunedFilteredScan {
+ with PrunedFilteredScan
+ with InsertableRelation {
override val needConversion: Boolean = false
@@ -148,4 +150,8 @@ private[sql] case class JDBCRelation(
filters,
parts)
}
+
+ override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ data.insertIntoJDBC(url, table, overwrite, properties)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index f3ce8e6646..0800eded44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -43,6 +43,29 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
conn1 = DriverManager.getConnection(url1, properties)
conn1.prepareStatement("create schema test").executeUpdate()
+ conn1.prepareStatement("drop table if exists test.people").executeUpdate()
+ conn1.prepareStatement(
+ "create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
+ conn1.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate()
+ conn1.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate()
+ conn1.prepareStatement("drop table if exists test.people1").executeUpdate()
+ conn1.prepareStatement(
+ "create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
+ conn1.commit()
+
+ TestSQLContext.sql(
+ s"""
+ |CREATE TEMPORARY TABLE PEOPLE
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
+ """.stripMargin.replaceAll("\n", " "))
+
+ TestSQLContext.sql(
+ s"""
+ |CREATE TEMPORARY TABLE PEOPLE1
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
+ """.stripMargin.replaceAll("\n", " "))
}
after {
@@ -114,5 +137,17 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
}
}
-
+
+ test("INSERT to JDBC Datasource") {
+ TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
+ assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
+ assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+ }
+
+ test("INSERT to JDBC Datasource with overwrite") {
+ TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
+ TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
+ assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
+ assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+ }
}