about | summary | refs | log | tree | commit | diff
path: root/sql
diff options
context:
space:
mode:
author	Reynold Xin <rxin@databricks.com>	2015-06-07 10:52:02 -0700
committer	Reynold Xin <rxin@databricks.com>	2015-06-07 10:52:02 -0700
commit	d6d601a07b17069d41eb4114bd5f7ab2c106720d (patch)
tree	2e6d963709121ae86a69a2387afbfdd8d6602b68 /sql
parent	835f1380d95a345208b682492f0735155e61a824 (diff)
download	spark-d6d601a07b17069d41eb4114bd5f7ab2c106720d.tar.gz
spark-d6d601a07b17069d41eb4114bd5f7ab2c106720d.tar.bz2
spark-d6d601a07b17069d41eb4114bd5f7ab2c106720d.zip
[SPARK-8004][SQL] Quote identifier in JDBC data source.
This is a follow-up patch to #6577 to replace columnEnclosing to quoteIdentifier. I also did some minor cleanup to the JdbcDialect file. Author: Reynold Xin <rxin@databricks.com> Closes #6689 from rxin/jdbc-quote and squashes the following commits: bad365f [Reynold Xin] Fixed test compilation... e39e14e [Reynold Xin] Fixed compilation. db9a8e0 [Reynold Xin] [SPARK-8004][SQL] Quote identifier in JDBC data source.
Diffstat (limited to 'sql')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala	| 4
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala	| 34
-rw-r--r--	sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala	| 6
3 files changed, 22 insertions, 22 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 2930f7bb4c..db68b9c86d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -212,13 +212,13 @@ private[sql] object JDBCRDD extends Logging {
filters: Array[Filter],
parts: Array[Partition]): RDD[Row] = {
val dialect = JdbcDialects.get(url)
- val enclosedColumns = requiredColumns.map(dialect.columnEnclosing(_))
+ val quotedColumns = requiredColumns.map(colName => dialect.quoteIdentifier(colName))
new JDBCRDD(
sc,
getConnector(driver, url, properties),
pruneSchema(schema, requiredColumns),
fqTable,
- enclosedColumns,
+ quotedColumns,
filters,
parts,
properties)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 04052f80f5..8849fc2f1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql.jdbc
+import java.sql.Types
+
import org.apache.spark.sql.types._
import org.apache.spark.annotation.DeveloperApi
-import java.sql.Types
-
/**
* :: DeveloperApi ::
* A database type definition coupled with the jdbc type needed to send null
@@ -82,11 +82,10 @@ abstract class JdbcDialect {
def getJDBCType(dt: DataType): Option[JdbcType] = None
/**
- * Enclose column name
- * @param colName The coulmn name
- * @return Enclosed column name
+ * Quotes the identifier. This is used to put quotes around the identifier in case the column
+ * name is a reserved keyword, or in case it contains characters that require quotes (e.g. space).
*/
- def columnEnclosing(colName: String): String = {
+ def quoteIdentifier(colName: String): String = {
s""""$colName""""
}
}
@@ -150,18 +149,19 @@ object JdbcDialects {
@DeveloperApi
class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
- require(!dialects.isEmpty)
+ require(dialects.nonEmpty)
- def canHandle(url : String): Boolean =
+ override def canHandle(url : String): Boolean =
dialects.map(_.canHandle(url)).reduce(_ && _)
override def getCatalystType(
- sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
- dialects.map(_.getCatalystType(sqlType, typeName, size, md)).flatten.headOption
-
- override def getJDBCType(dt: DataType): Option[JdbcType] =
- dialects.map(_.getJDBCType(dt)).flatten.headOption
+ sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+ dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
+ }
+ override def getJDBCType(dt: DataType): Option[JdbcType] = {
+ dialects.flatMap(_.getJDBCType(dt)).headOption
+ }
}
/**
@@ -170,7 +170,7 @@ class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
*/
@DeveloperApi
case object NoopDialect extends JdbcDialect {
- def canHandle(url : String): Boolean = true
+ override def canHandle(url : String): Boolean = true
}
/**
@@ -179,7 +179,7 @@ case object NoopDialect extends JdbcDialect {
*/
@DeveloperApi
case object PostgresDialect extends JdbcDialect {
- def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
+ override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
@@ -205,7 +205,7 @@ case object PostgresDialect extends JdbcDialect {
*/
@DeveloperApi
case object MySQLDialect extends JdbcDialect {
- def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
+ override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
@@ -218,7 +218,7 @@ case object MySQLDialect extends JdbcDialect {
} else None
}
- override def columnEnclosing(colName: String): String = {
+ override def quoteIdentifier(colName: String): String = {
s"`$colName`"
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a228543953..49d348c3ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -410,13 +410,13 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
assert(JdbcDialects.get("test.invalid") == NoopDialect)
}
- test("Enclosing column names by jdbc dialect") {
+ test("quote column names by jdbc dialect") {
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
val columns = Seq("abc", "key")
- val MySQLColumns = columns.map(MySQL.columnEnclosing(_))
- val PostgresColumns = columns.map(Postgres.columnEnclosing(_))
+ val MySQLColumns = columns.map(MySQL.quoteIdentifier(_))
+ val PostgresColumns = columns.map(Postgres.quoteIdentifier(_))
assert(MySQLColumns === Seq("`abc`", "`key`"))
assert(PostgresColumns === Seq(""""abc"""", """"key""""))
}