From 12b7191d2075ae870c73529de450cbb5725872ec Mon Sep 17 00:00:00 2001 From: Rick Hillegas Date: Fri, 9 Oct 2015 13:36:51 -0700 Subject: [SPARK-10855] [SQL] Add a JDBC dialect for Apache Derby marmbrus rxin This patch adds a JdbcDialect class, which customizes the datatype mappings for Derby backends. The patch also adds unit tests for the new dialect, corresponding to the existing tests for other JDBC dialects. JDBCSuite runs cleanly for me with this patch. So does JDBCWriteSuite, although it produces noise as described here: https://issues.apache.org/jira/browse/SPARK-10890 This patch is my original work, which I license to the ASF. I am a Derby contributor, so my ICLA is on file under SVN id "rhillegas": http://people.apache.org/committer-index.html Touches the following files: --------------------------------- org.apache.spark.sql.jdbc.JdbcDialects Adds a DerbyDialect. --------------------------------- org.apache.spark.sql.jdbc.JDBCSuite Adds unit tests for the new DerbyDialect. Author: Rick Hillegas Closes #8982 from rick-ibm/b_10855. --- .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 28 ++++++++++++++++++++++ .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 ++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 0cd356f222..a2ff4cc1c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -138,6 +138,7 @@ object JdbcDialects { registerDialect(PostgresDialect) registerDialect(DB2Dialect) registerDialect(MsSqlServerDialect) + registerDialect(DerbyDialect) /** @@ -287,3 +288,30 @@ case object MsSqlServerDialect extends JdbcDialect { case _ => None } } + +/** + * :: DeveloperApi :: + * Default Apache Derby dialect, mapping real on read + * and string/byte/short/boolean/decimal on write. + */ +@DeveloperApi +case object DerbyDialect extends JdbcDialect { + override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby") + override def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { + if (sqlType == Types.REAL) Option(FloatType) else None + } + + override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { + case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB)) + case ByteType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT)) + case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT)) + case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) + // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL + case (t: DecimalType) if (t.precision > 31) => + Some(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL)) + case _ => None + } + +} + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index bbf705ce95..d530b1a469 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -409,18 +409,22 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect) assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect) assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") == MsSqlServerDialect) + assert(JdbcDialects.get("jdbc:derby:db") == DerbyDialect) assert(JdbcDialects.get("test.invalid") == NoopDialect) } test("quote column names by jdbc dialect") { val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") + val Derby = JdbcDialects.get("jdbc:derby:db") val columns = Seq("abc", "key") val MySQLColumns = columns.map(MySQL.quoteIdentifier(_)) val PostgresColumns = columns.map(Postgres.quoteIdentifier(_)) + val DerbyColumns = columns.map(Derby.quoteIdentifier(_)) assert(MySQLColumns === Seq("`abc`", "`key`")) assert(PostgresColumns === Seq(""""abc"""", """"key"""")) + assert(DerbyColumns === Seq(""""abc"""", """"key"""")) } test("Dialect unregister") { @@ -454,16 +458,23 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext test("PostgresDialect type mapping") { val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") - // SPARK-7869: Testing JSON types handling assert(Postgres.getCatalystType(java.sql.Types.OTHER, "json", 1, null) === Some(StringType)) assert(Postgres.getCatalystType(java.sql.Types.OTHER, "jsonb", 1, null) === Some(StringType)) } + test("DerbyDialect jdbc type mapping") { + val derbyDialect = JdbcDialects.get("jdbc:derby:db") + assert(derbyDialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB") + assert(derbyDialect.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT") + assert(derbyDialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "BOOLEAN") + } + test("table exists query by jdbc dialect") { val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db") val h2 = JdbcDialects.get(url) + val derby = JdbcDialects.get("jdbc:derby:db") val table = "weblogs" val defaultQuery = s"SELECT * FROM $table WHERE 1=0" val limitQuery = s"SELECT 1 FROM $table LIMIT 1" @@ -471,5 +482,6 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(Postgres.getTableExistsQuery(table) == limitQuery) assert(db2.getTableExistsQuery(table) == defaultQuery) assert(h2.getTableExistsQuery(table) == defaultQuery) + assert(derby.getTableExistsQuery(table) == defaultQuery) } } -- cgit v1.2.3