diff options
author | Rick Hillegas <rhilleg@us.ibm.com> | 2015-10-09 13:36:51 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-10-09 13:36:51 -0700 |
commit | 12b7191d2075ae870c73529de450cbb5725872ec (patch) | |
tree | 289022d110b9d0546794f6b97c173b9a93752845 /sql | |
parent | 015f7ef503d5544f79512b6333326749a1f0c48b (diff) | |
download | spark-12b7191d2075ae870c73529de450cbb5725872ec.tar.gz spark-12b7191d2075ae870c73529de450cbb5725872ec.tar.bz2 spark-12b7191d2075ae870c73529de450cbb5725872ec.zip |
[SPARK-10855] [SQL] Add a JDBC dialect for Apache Derby
marmbrus
rxin
This patch adds a JdbcDialect class, which customizes the datatype mappings for Derby backends. The patch also adds unit tests for the new dialect, corresponding to the existing tests for other JDBC dialects.
JDBCSuite runs cleanly for me with this patch. So does JDBCWriteSuite, although it produces noise as described here: https://issues.apache.org/jira/browse/SPARK-10890
This patch is my original work, which I license to the ASF. I am a Derby contributor, so my ICLA is on file under SVN id "rhillegas": http://people.apache.org/committer-index.html
Touches the following files:
---------------------------------
org.apache.spark.sql.jdbc.JdbcDialects
Adds a DerbyDialect.
---------------------------------
org.apache.spark.sql.jdbc.JDBCSuite
Adds unit tests for the new DerbyDialect.
Author: Rick Hillegas <rhilleg@us.ibm.com>
Closes #8982 from rick-ibm/b_10855.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 28 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 |
2 files changed, 41 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 0cd356f222..a2ff4cc1c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -138,6 +138,7 @@ object JdbcDialects { registerDialect(PostgresDialect) registerDialect(DB2Dialect) registerDialect(MsSqlServerDialect) + registerDialect(DerbyDialect) /** @@ -287,3 +288,30 @@ case object MsSqlServerDialect extends JdbcDialect { case _ => None } } + +/** + * :: DeveloperApi :: + * Default Apache Derby dialect, mapping real on read + * and string/byte/short/boolean/decimal on write. + */ +@DeveloperApi +case object DerbyDialect extends JdbcDialect { + override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby") + override def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { + if (sqlType == Types.REAL) Option(FloatType) else None + } + + override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { + case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB)) + case ByteType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT)) + case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT)) + case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) + // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL + case (t: DecimalType) if (t.precision > 31) => + Some(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL)) + case _ => None + } + +} + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index bbf705ce95..d530b1a469 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -409,18 +409,22 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect) assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect) assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") == MsSqlServerDialect) + assert(JdbcDialects.get("jdbc:derby:db") == DerbyDialect) assert(JdbcDialects.get("test.invalid") == NoopDialect) } test("quote column names by jdbc dialect") { val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") + val Derby = JdbcDialects.get("jdbc:derby:db") val columns = Seq("abc", "key") val MySQLColumns = columns.map(MySQL.quoteIdentifier(_)) val PostgresColumns = columns.map(Postgres.quoteIdentifier(_)) + val DerbyColumns = columns.map(Derby.quoteIdentifier(_)) assert(MySQLColumns === Seq("`abc`", "`key`")) assert(PostgresColumns === Seq(""""abc"""", """"key"""")) + assert(DerbyColumns === Seq(""""abc"""", """"key"""")) } test("Dialect unregister") { @@ -454,16 +458,23 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext test("PostgresDialect type mapping") { val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") - // SPARK-7869: Testing JSON types handling assert(Postgres.getCatalystType(java.sql.Types.OTHER, "json", 1, null) === Some(StringType)) assert(Postgres.getCatalystType(java.sql.Types.OTHER, "jsonb", 1, null) === Some(StringType)) } + test("DerbyDialect jdbc type mapping") { + val derbyDialect = JdbcDialects.get("jdbc:derby:db") + assert(derbyDialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB") + assert(derbyDialect.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT") + assert(derbyDialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "BOOLEAN") + } + test("table exists query by jdbc dialect") { val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db") val h2 = JdbcDialects.get(url) + val derby = JdbcDialects.get("jdbc:derby:db") val table = "weblogs" val defaultQuery = s"SELECT * FROM $table WHERE 1=0" val limitQuery = s"SELECT 1 FROM $table LIMIT 1" @@ -471,5 +482,6 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(Postgres.getTableExistsQuery(table) == limitQuery) assert(db2.getTableExistsQuery(table) == defaultQuery) assert(h2.getTableExistsQuery(table) == defaultQuery) + assert(derby.getTableExistsQuery(table) == defaultQuery) } } |