 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala |  9 +++++++--
 sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala              | 20 ++++++++++++++++
 sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala           | 64 ++++++++++++++---------
 sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala  | 33 +++++++++++++++
 4 files changed, 102 insertions(+), 24 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 413976a7ef..32067011c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -333,8 +333,13 @@ case class DataSource(
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
case (_: SchemaRelationProvider, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $className.")
- case (_: RelationProvider, Some(_)) =>
- throw new AnalysisException(s"$className does not allow user-specified schemas.")
+ case (dataSource: RelationProvider, Some(schema)) =>
+ val baseRelation =
+ dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
+ if (baseRelation.schema != schema) {
+ throw new AnalysisException(s"$className does not allow user-specified schemas.")
+ }
+ baseRelation
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
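Note: this branch covers sources that implement RelationProvider but not SchemaRelationProvider. Before this change any user-specified schema was rejected outright; now the relation is created first and the supplied schema is accepted if and only if it equals the relation's own schema. A minimal sketch of such a fixed-schema provider, modeled loosely on the SimpleScanSource fixture exercised in the tests below (FixedSchemaSource is a hypothetical name, not part of this patch):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // Hypothetical fixed-schema source: it implements RelationProvider only,
    // so the schema is decided by the source, never by the caller.
    class FixedSchemaSource extends RelationProvider {
      override def createRelation(
          ctx: SQLContext,
          parameters: Map[String, String]): BaseRelation = {
        val from = parameters("from").toInt
        val to = parameters("to").toInt
        new BaseRelation with TableScan {
          override def sqlContext: SQLContext = ctx
          // The schema the patched code compares against; a user-specified
          // schema must match it exactly (names, types, and nullability).
          override def schema: StructType =
            StructType(StructField("i", IntegerType, nullable = false) :: Nil)
          override def buildScan(): RDD[Row] =
            ctx.sparkContext.parallelize(from to to).map(Row(_))
        }
      }
    }

With the patch applied, loading such a source with a .schema(...) equal to that exact StructType succeeds; any other schema still fails with the same AnalysisException as before.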
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 6454d716ec..5eb54643f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -65,6 +65,26 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
)
}
+ test("insert into a temp view that does not point to an insertable data source") {
+ import testImplicits._
+ withTempView("t1", "t2") {
+ sql(
+ """
+ |CREATE TEMPORARY VIEW t1
+ |USING org.apache.spark.sql.sources.SimpleScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10')
+ """.stripMargin)
+ sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t2")
+
+ val message = intercept[AnalysisException] {
+ sql("INSERT INTO TABLE t1 SELECT a FROM t2")
+ }.getMessage
+ assert(message.contains("does not allow insertion"))
+ }
+ }
+
test("PreInsert casting and renaming") {
sql(
s"""
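Note: the new test above relies on SimpleScanSource producing a scan-only relation, which is why the INSERT is rejected during analysis. A source only accepts INSERT INTO when its relation also mixes in InsertableRelation; a rough sketch under that assumption (WritableSource and its trivial sink are hypothetical, not part of this patch):

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, RelationProvider}
    import org.apache.spark.sql.types.{IntegerType, StructType}

    // Hypothetical writable source: mixing in InsertableRelation is what lets
    // "INSERT INTO TABLE ..." pass analysis instead of failing with
    // "does not allow insertion".
    class WritableSource extends RelationProvider {
      override def createRelation(
          ctx: SQLContext,
          parameters: Map[String, String]): BaseRelation = {
        new BaseRelation with InsertableRelation {
          override def sqlContext: SQLContext = ctx
          override def schema: StructType = new StructType().add("a", IntegerType)
          override def insert(data: DataFrame, overwrite: Boolean): Unit = {
            // A real source would persist `data`; this placeholder only forces
            // evaluation so the sketch stays self-contained.
            data.collect()
          }
        }
      }
    }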
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index e8fed039fa..86bcb4d4b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -348,31 +348,51 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
test("exceptions") {
// Make sure we do throw correct exception when users use a relation provider that
// only implements the RelationProvider or the SchemaRelationProvider.
- val schemaNotAllowed = intercept[Exception] {
- sql(
- """
- |CREATE TEMPORARY VIEW relationProvierWithSchema (i int)
- |USING org.apache.spark.sql.sources.SimpleScanSource
- |OPTIONS (
- | From '1',
- | To '10'
- |)
- """.stripMargin)
+ Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
+ val schemaNotAllowed = intercept[Exception] {
+ sql(
+ s"""
+ |CREATE $tableType relationProvierWithSchema (i int)
+ |USING org.apache.spark.sql.sources.SimpleScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+ assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+
+ val schemaNeeded = intercept[Exception] {
+ sql(
+ s"""
+ |CREATE $tableType schemaRelationProvierWithoutSchema
+ |USING org.apache.spark.sql.sources.AllDataTypesScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+ assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
}
- assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+ }
- val schemaNeeded = intercept[Exception] {
- sql(
- """
- |CREATE TEMPORARY VIEW schemaRelationProvierWithoutSchema
- |USING org.apache.spark.sql.sources.AllDataTypesScanSource
- |OPTIONS (
- | From '1',
- | To '10'
- |)
- """.stripMargin)
+ test("read the data source tables that do not extend SchemaRelationProvider") {
+ Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
+ val tableName = "relationProvierWithSchema"
+    withTable(tableName) {
+ sql(
+ s"""
+ |CREATE $tableType $tableName
+ |USING org.apache.spark.sql.sources.SimpleScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ checkAnswer(spark.table(tableName), spark.range(1, 11).toDF())
+ }
}
- assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
}
test("SPARK-5196 schema field with comment") {
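Note: one might expect the patched DataSource to now accept the (i int) column list in the first test, but the assertion still holds: a DDL column list parses with nullable columns, and these tests only pass if the fixture declares i as non-nullable, so the two schemas compare unequal. A small illustration of the equality check the new code performs (values inferred from the tests, not copied from the fixture):

    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // What "(i int)" in the DDL parses to: nullable by default.
    val userSpecified = StructType(StructField("i", IntegerType, nullable = true) :: Nil)
    // What the fixture evidently reports as its own schema.
    val relationSchema = StructType(StructField("i", IntegerType, nullable = false) :: Nil)

    // StructType equality covers names, types, and nullability, so the patched
    // DataSource still throws "does not allow user-specified schemas" here.
    assert(userSpecified != relationSchema)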
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 7368dad628..a7fda01098 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -293,6 +293,39 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
Option(dir).map(spark.read.format("org.apache.spark.sql.test").load)
}
+ test("read a data source that does not extend SchemaRelationProvider") {
+ val dfReader = spark.read
+ .option("from", "1")
+ .option("TO", "10")
+ .format("org.apache.spark.sql.sources.SimpleScanSource")
+
+ // when users do not specify the schema
+ checkAnswer(dfReader.load(), spark.range(1, 11).toDF())
+
+ // when users specify the schema
+ val inputSchema = new StructType().add("s", IntegerType, nullable = false)
+ val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() }
+ assert(e.getMessage.contains(
+ "org.apache.spark.sql.sources.SimpleScanSource does not allow user-specified schemas"))
+ }
+
+ test("read a data source that does not extend RelationProvider") {
+ val dfReader = spark.read
+ .option("from", "1")
+ .option("TO", "10")
+ .option("option_with_underscores", "someval")
+ .option("option.with.dots", "someval")
+ .format("org.apache.spark.sql.sources.AllDataTypesScanSource")
+
+ // when users do not specify the schema
+ val e = intercept[AnalysisException] { dfReader.load() }
+ assert(e.getMessage.contains("A schema needs to be specified when using"))
+
+ // when users specify the schema
+ val inputSchema = new StructType().add("s", StringType, nullable = false)
+ assert(dfReader.schema(inputSchema).load().count() == 10)
+ }
+
test("text - API and behavior regarding schema") {
// Writer
spark.createDataset(data).write.mode(SaveMode.Overwrite).text(dir)
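Note: the SimpleScanSource test above pins only the mismatch case (a field named s where the source exposes i). The complementary happy path that the patch enables would look roughly like the following sketch, assuming a SparkSession named spark as in the suite and the fixture's inferred schema of a single non-nullable int column i:

    import org.apache.spark.sql.types.{IntegerType, StructType}

    // Supplying the exact schema the RelationProvider reports is now accepted
    // rather than rejected unconditionally.
    val matching = new StructType().add("i", IntegerType, nullable = false)
    val df = spark.read
      .option("from", "1")
      .option("to", "10")
      .format("org.apache.spark.sql.sources.SimpleScanSource")
      .schema(matching)
      .load()
    assert(df.count() == 10)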