aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-09-29 08:24:34 -0400
committerSean Owen <sowen@cloudera.com>2016-09-29 08:24:34 -0400
commitb35b0dbbfa3dc1bdf5e2fa1e9677d06635142b22 (patch)
treebbf407d931595d182c70f6a9888991d21a4d1c40
parentf7082ac12518ae84d6d1d4b7330a9f12cf95e7c1 (diff)
downloadspark-b35b0dbbfa3dc1bdf5e2fa1e9677d06635142b22.tar.gz
spark-b35b0dbbfa3dc1bdf5e2fa1e9677d06635142b22.tar.bz2
spark-b35b0dbbfa3dc1bdf5e2fa1e9677d06635142b22.zip
[SPARK-17614][SQL] sparkSession.read().jdbc(***) uses the SQL syntax "WHERE 1=0" that Cassandra does not support
## What changes were proposed in this pull request? Use the dialect's table-exists query rather than the hard-coded "WHERE 1=0" query. ## How was this patch tested? Existing tests. Author: Sean Owen <sowen@cloudera.com> Closes #15196 from srowen/SPARK-17614.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala15
2 files changed, 16 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index a7da29f925..f10615ebe4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -58,11 +58,11 @@ object JDBCRDD extends Logging {
val dialect = JdbcDialects.get(url)
val conn: Connection = JdbcUtils.createConnectionFactory(url, properties)()
try {
- val statement = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0")
+ val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
try {
val rs = statement.executeQuery()
try {
- return JdbcUtils.getSchema(rs, dialect)
+ JdbcUtils.getSchema(rs, dialect)
} finally {
rs.close()
}
@@ -72,8 +72,6 @@ object JDBCRDD extends Logging {
} finally {
conn.close()
}
-
- throw new RuntimeException("This line is unreachable.")
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 3a6d5b7f1c..8dd4b8f662 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc
import java.sql.Connection
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.sql.types._
/**
@@ -100,6 +100,19 @@ abstract class JdbcDialect extends Serializable {
}
/**
+ * The SQL query that should be used to discover the schema of a table. It only needs to
+ * ensure that the result set has the same schema as the table, such as by calling
+ * "SELECT * ...". Dialects can override this method to return a query that works best in a
+ * particular database.
+ * @param table The name of the table.
+ * @return The SQL query to use for discovering the schema.
+ */
+ @Since("2.1.0")
+ def getSchemaQuery(table: String): String = {
+ s"SELECT * FROM $table WHERE 1=0"
+ }
+
+ /**
* Override connection specific properties to run before a select is made. This is in place to
* allow dialects that need special treatment to optimize behavior.
* @param connection The connection object