From a2d5c72091b1c602694dbca823a7b26f86b02864 Mon Sep 17 00:00:00 2001 From: sureshthalamati Date: Mon, 31 Aug 2015 12:39:58 -0700 Subject: [SPARK-10170] [SQL] Add DB2 JDBC dialect support. Data frame write to DB2 database is failing because by default JDBC data source implementation is generating a table schema with DB2 unsupported data types TEXT for String, and BIT1(1) for Boolean. This patch registers DB2 JDBC Dialect that maps String, Boolean to valid DB2 data types. Author: sureshthalamati Closes #8393 from sureshthalamati/db2_dialect_spark-10170. --- .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 18 ++++++++++++++++++ .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 +++++++ 2 files changed, 25 insertions(+) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 8849fc2f1f..c6d05c9b83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -125,6 +125,7 @@ object JdbcDialects { registerDialect(MySQLDialect) registerDialect(PostgresDialect) + registerDialect(DB2Dialect) /** * Fetch the JdbcDialect class corresponding to a given database url. @@ -222,3 +223,20 @@ case object MySQLDialect extends JdbcDialect { s"`$colName`" } } + +/** + * :: DeveloperApi :: + * Default DB2 dialect, mapping string/boolean on write to valid DB2 types. + * By default string, and boolean gets mapped to db2 invalid types TEXT, and BIT(1). + */ +@DeveloperApi +case object DB2Dialect extends JdbcDialect { + + override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2") + + override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { + case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB)) + case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR)) + case _ => None + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 0edac0848c..d8c9a08d84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -407,6 +407,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext test("Default jdbc dialect registration") { assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect) assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect) + assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect) assert(JdbcDialects.get("test.invalid") == NoopDialect) } @@ -443,4 +444,10 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(agg.getCatalystType(0, "", 1, null) === Some(LongType)) assert(agg.getCatalystType(1, "", 1, null) === Some(StringType)) } + + test("DB2Dialect type mapping") { + val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db") + assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB") + assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)") + } } -- cgit v1.2.3