about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorRick Hillegas <rhilleg@us.ibm.com>2015-10-09 13:36:51 -0700
committerReynold Xin <rxin@databricks.com>2015-10-09 13:36:51 -0700
commit12b7191d2075ae870c73529de450cbb5725872ec (patch)
tree289022d110b9d0546794f6b97c173b9a93752845 /sql
parent015f7ef503d5544f79512b6333326749a1f0c48b (diff)
downloadspark-12b7191d2075ae870c73529de450cbb5725872ec.tar.gz
spark-12b7191d2075ae870c73529de450cbb5725872ec.tar.bz2
spark-12b7191d2075ae870c73529de450cbb5725872ec.zip
[SPARK-10855] [SQL] Add a JDBC dialect for Apache Derby
marmbrus rxin This patch adds a JdbcDialect class, which customizes the datatype mappings for Derby backends. The patch also adds unit tests for the new dialect, corresponding to the existing tests for other JDBC dialects. JDBCSuite runs cleanly for me with this patch. So does JDBCWriteSuite, although it produces noise as described here: https://issues.apache.org/jira/browse/SPARK-10890 This patch is my original work, which I license to the ASF. I am a Derby contributor, so my ICLA is on file under SVN id "rhillegas": http://people.apache.org/committer-index.html Touches the following files: --------------------------------- org.apache.spark.sql.jdbc.JdbcDialects Adds a DerbyDialect. --------------------------------- org.apache.spark.sql.jdbc.JDBCSuite Adds unit tests for the new DerbyDialect. Author: Rick Hillegas <rhilleg@us.ibm.com> Closes #8982 from rick-ibm/b_10855.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala28
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala14
2 files changed, 41 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 0cd356f222..a2ff4cc1c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -138,6 +138,7 @@ object JdbcDialects {
registerDialect(PostgresDialect)
registerDialect(DB2Dialect)
registerDialect(MsSqlServerDialect)
+ registerDialect(DerbyDialect)
/**
@@ -287,3 +288,30 @@ case object MsSqlServerDialect extends JdbcDialect {
case _ => None
}
}
+
+/**
+ * :: DeveloperApi ::
+ * Default Apache Derby dialect, mapping real on read
+ * and string/byte/short/boolean/decimal on write.
+ */
+@DeveloperApi
+case object DerbyDialect extends JdbcDialect {
+ override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby")
+ override def getCatalystType(
+ sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+ if (sqlType == Types.REAL) Option(FloatType) else None
+ }
+
+ override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+ case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
+ case ByteType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+ case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+ case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
+ // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL
+ case (t: DecimalType) if (t.precision > 31) =>
+ Some(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL))
+ case _ => None
+ }
+
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index bbf705ce95..d530b1a469 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -409,18 +409,22 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect)
assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect)
assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") == MsSqlServerDialect)
+ assert(JdbcDialects.get("jdbc:derby:db") == DerbyDialect)
assert(JdbcDialects.get("test.invalid") == NoopDialect)
}
test("quote column names by jdbc dialect") {
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
+ val Derby = JdbcDialects.get("jdbc:derby:db")
val columns = Seq("abc", "key")
val MySQLColumns = columns.map(MySQL.quoteIdentifier(_))
val PostgresColumns = columns.map(Postgres.quoteIdentifier(_))
+ val DerbyColumns = columns.map(Derby.quoteIdentifier(_))
assert(MySQLColumns === Seq("`abc`", "`key`"))
assert(PostgresColumns === Seq(""""abc"""", """"key""""))
+ assert(DerbyColumns === Seq(""""abc"""", """"key""""))
}
test("Dialect unregister") {
@@ -454,16 +458,23 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
test("PostgresDialect type mapping") {
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
- // SPARK-7869: Testing JSON types handling
assert(Postgres.getCatalystType(java.sql.Types.OTHER, "json", 1, null) === Some(StringType))
assert(Postgres.getCatalystType(java.sql.Types.OTHER, "jsonb", 1, null) === Some(StringType))
}
+ test("DerbyDialect jdbc type mapping") {
+ val derbyDialect = JdbcDialects.get("jdbc:derby:db")
+ assert(derbyDialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB")
+ assert(derbyDialect.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT")
+ assert(derbyDialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "BOOLEAN")
+ }
+
test("table exists query by jdbc dialect") {
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
val h2 = JdbcDialects.get(url)
+ val derby = JdbcDialects.get("jdbc:derby:db")
val table = "weblogs"
val defaultQuery = s"SELECT * FROM $table WHERE 1=0"
val limitQuery = s"SELECT 1 FROM $table LIMIT 1"
@@ -471,5 +482,6 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
assert(Postgres.getTableExistsQuery(table) == limitQuery)
assert(db2.getTableExistsQuery(table) == defaultQuery)
assert(h2.getTableExistsQuery(table) == defaultQuery)
+ assert(derby.getTableExistsQuery(table) == defaultQuery)
}
}