aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2015-06-06 22:59:31 -0700
committerReynold Xin <rxin@databricks.com>2015-06-06 22:59:31 -0700
commit901a552c5e973262fddbf70ee2d4078c948bc668 (patch)
tree95b5ef167d6df669e57eaee7209d093274990b59
parent3285a51121397bfd2e62dbee8e1f0fa7c72512a7 (diff)
downloadspark-901a552c5e973262fddbf70ee2d4078c948bc668.tar.gz
spark-901a552c5e973262fddbf70ee2d4078c948bc668.tar.bz2
spark-901a552c5e973262fddbf70ee2d4078c948bc668.zip
[SPARK-8004][SQL] Enclose column names by JDBC Dialect
JIRA: https://issues.apache.org/jira/browse/SPARK-8004 Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #6577 from viirya/enclose_jdbc_columns and squashes the following commits: 614606a [Liang-Chi Hsieh] For comment. bc50182 [Liang-Chi Hsieh] Enclose column names by JDBC Dialect.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala13
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala11
3 files changed, 27 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 40b604d710..2930f7bb4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -211,12 +211,14 @@ private[sql] object JDBCRDD extends Logging {
requiredColumns: Array[String],
filters: Array[Filter],
parts: Array[Partition]): RDD[Row] = {
+ val dialect = JdbcDialects.get(url)
+ val enclosedColumns = requiredColumns.map(dialect.columnEnclosing(_))
new JDBCRDD(
sc,
getConnector(driver, url, properties),
pruneSchema(schema, requiredColumns),
fqTable,
- requiredColumns,
+ enclosedColumns,
filters,
parts,
properties)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 6a169e106b..04052f80f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -80,6 +80,15 @@ abstract class JdbcDialect {
* @return The new JdbcType if there is an override for this DataType
*/
def getJDBCType(dt: DataType): Option[JdbcType] = None
+
+ /**
+ * Enclose column name
+ * @param colName The coulmn name
+ * @return Enclosed column name
+ */
+ def columnEnclosing(colName: String): String = {
+ s""""$colName""""
+ }
}
/**
@@ -208,4 +217,8 @@ case object MySQLDialect extends JdbcDialect {
Some(BooleanType)
} else None
}
+
+ override def columnEnclosing(colName: String): String = {
+ s"`$colName`"
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 7931854db2..a228543953 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -410,6 +410,17 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
assert(JdbcDialects.get("test.invalid") == NoopDialect)
}
+ test("Enclosing column names by jdbc dialect") {
+ val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
+ val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
+
+ val columns = Seq("abc", "key")
+ val MySQLColumns = columns.map(MySQL.columnEnclosing(_))
+ val PostgresColumns = columns.map(Postgres.columnEnclosing(_))
+ assert(MySQLColumns === Seq("`abc`", "`key`"))
+ assert(PostgresColumns === Seq(""""abc"""", """"key""""))
+ }
+
test("Dialect unregister") {
JdbcDialects.registerDialect(testH2Dialect)
JdbcDialects.unregisterDialect(testH2Dialect)