-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                 | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala                  | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 10
3 files changed, 38 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index cd83836178..fe34d597db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -165,6 +165,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+ assertNoSpecifiedSchema("jdbc")
// properties should override settings in extraOptions.
this.extraOptions ++= properties.asScala
// explicit url and dbtable should override all
@@ -235,6 +236,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame = {
+ assertNoSpecifiedSchema("jdbc")
// connectionProperties should override settings in extraOptions.
val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
val options = new JDBCOptions(url, table, params)
@@ -475,6 +477,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
+ assertNoSpecifiedSchema("table")
sparkSession.table(tableName)
}
@@ -540,10 +543,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
@scala.annotation.varargs
def textFile(paths: String*): Dataset[String] = {
+ assertNoSpecifiedSchema("textFile")
+ text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
+ }
+
+ /**
+ * A convenient function for schema validation in APIs.
+ */
+ private def assertNoSpecifiedSchema(operation: String): Unit = {
if (userSpecifiedSchema.nonEmpty) {
- throw new AnalysisException("User specified schema not supported with `textFile`")
+ throw new AnalysisException(s"User specified schema not supported with `$operation`")
}
- text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
}
///////////////////////////////////////////////////////////////////////////////////////
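The change above folds the existing textFile schema check into a shared assertNoSpecifiedSchema helper and applies it to both jdbc overloads and to table. A minimal caller-side sketch of the new guard, assuming a local SparkSession and the H2 URL and table name that JDBCSuite uses (both illustrative, not part of this patch):

import java.util.Properties

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical demo object, not part of the patch.
object SchemaGuardDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("schema-guard-demo").getOrCreate()
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("theid", IntegerType)))
    try {
      // The guard runs before any JDBC connection is attempted, so this fails
      // immediately with: User specified schema not supported with `jdbc`
      spark.read
        .schema(schema)
        .jdbc("jdbc:h2:mem:testdb0;user=testUser;password=testPass", "TEST.PEOPLE", new Properties())
    } catch {
      case e: AnalysisException => println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}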
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 74ca66b103..039625421e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
@@ -900,4 +900,19 @@ class JDBCSuite extends SparkFunSuite
assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
}
+
+ test("SPARK-16848: jdbc API throws an exception for user specified schema") {
+ val schema = StructType(Seq(
+ StructField("name", StringType, false), StructField("theid", IntegerType, false)))
+ val parts = Array[String]("THEID < 2", "THEID >= 2")
+ val e1 = intercept[AnalysisException] {
+ spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
+ }.getMessage
+ assert(e1.contains("User specified schema not supported with `jdbc`"))
+
+ val e2 = intercept[AnalysisException] {
+ spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
+ }.getMessage
+ assert(e2.contains("User specified schema not supported with `jdbc`"))
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 4bec2e3fdb..8a8ba05534 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -635,4 +635,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
}
}
+
+ test("SPARK-16848: table API throws an exception for user specified schema") {
+ withTable("t") {
+ val schema = StructType(StructField("a", StringType) :: Nil)
+ val e = intercept[AnalysisException] {
+ spark.read.schema(schema).table("t")
+ }.getMessage
+ assert(e.contains("User specified schema not supported with `table`"))
+ }
+ }
}
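The shared helper also preserves the pre-existing textFile check, which the test changes above do not re-exercise. A rough sketch of that path, again with an illustrative local SparkSession and a made-up input path (the path is never opened, because the guard throws before text() is called):

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical demo object, not part of the patch.
object TextFileGuardDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("textfile-guard-demo").getOrCreate()
    val schema = StructType(StructField("value", StringType) :: Nil)
    try {
      // Fails fast with: User specified schema not supported with `textFile`
      spark.read.schema(schema).textFile("/tmp/never-read.txt")
    } catch {
      case e: AnalysisException => println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}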