diff options
3 files changed, 38 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index cd83836178..fe34d597db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -165,6 +165,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def jdbc(url: String, table: String, properties: Properties): DataFrame = { + assertNoSpecifiedSchema("jdbc") // properties should override settings in extraOptions. this.extraOptions ++= properties.asScala // explicit url and dbtable should override all @@ -235,6 +236,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { table: String, predicates: Array[String], connectionProperties: Properties): DataFrame = { + assertNoSpecifiedSchema("jdbc") // connectionProperties should override settings in extraOptions. val params = extraOptions.toMap ++ connectionProperties.asScala.toMap val options = new JDBCOptions(url, table, params) @@ -475,6 +477,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def table(tableName: String): DataFrame = { + assertNoSpecifiedSchema("table") sparkSession.table(tableName) } @@ -540,10 +543,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def textFile(paths: String*): Dataset[String] = { + assertNoSpecifiedSchema("textFile") + text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder) + } + + /** + * A convenient function for schema validation in APIs. 
+ */ + private def assertNoSpecifiedSchema(operation: String): Unit = { if (userSpecifiedSchema.nonEmpty) { - throw new AnalysisException("User specified schema not supported with `textFile`") + throw new AnalysisException(s"User specified schema not supported with `$operation`") } - text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder) } /////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 74ca66b103..039625421e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.command.ExplainCommand @@ -900,4 +900,19 @@ class JDBCSuite extends SparkFunSuite assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty) assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty) } + + test("SPARK-16848: jdbc API throws an exception for user specified schema") { + val schema = StructType(Seq( + StructField("name", StringType, false), StructField("theid", IntegerType, false))) + val parts = Array[String]("THEID < 2", "THEID >= 2") + val e1 = intercept[AnalysisException] { + spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties()) + }.getMessage + assert(e1.contains("User specified schema not supported with `jdbc`")) + + val e2 = intercept[AnalysisException] { + 
spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties()) + }.getMessage + assert(e2.contains("User specified schema not supported with `jdbc`")) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 4bec2e3fdb..8a8ba05534 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -635,4 +635,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil) } } + + test("SPARK-16848: table API throws an exception for user specified schema") { + withTable("t") { + val schema = StructType(StructField("a", StringType) :: Nil) + val e = intercept[AnalysisException] { + spark.read.schema(schema).table("t") + }.getMessage + assert(e.contains("User specified schema not supported with `table`")) + } + } }