diff options
4 files changed, 102 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 413976a7ef..32067011c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -333,8 +333,13 @@ case class DataSource( dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) case (_: SchemaRelationProvider, None) => throw new AnalysisException(s"A schema needs to be specified when using $className.") - case (_: RelationProvider, Some(_)) => - throw new AnalysisException(s"$className does not allow user-specified schemas.") + case (dataSource: RelationProvider, Some(schema)) => + val baseRelation = + dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) + if (baseRelation.schema != schema) { + throw new AnalysisException(s"$className does not allow user-specified schemas.") + } + baseRelation // We are reading from the results of a streaming query. Load files from the metadata log // instead of listing them using HDFS APIs. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 6454d716ec..5eb54643f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -65,6 +65,26 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { ) } + test("insert into a temp view that does not point to an insertable data source") { + import testImplicits._ + withTempView("t1", "t2") { + sql( + """ + |CREATE TEMPORARY VIEW t1 + |USING org.apache.spark.sql.sources.SimpleScanSource + |OPTIONS ( + | From '1', + | To '10') + """.stripMargin) + sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t2") + + val message = intercept[AnalysisException] { + sql("INSERT INTO TABLE t1 SELECT a FROM t2") + }.getMessage + assert(message.contains("does not allow insertion")) + } + } + test("PreInsert casting and renaming") { sql( s""" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index e8fed039fa..86bcb4d4b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -348,31 +348,51 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext { test("exceptions") { // Make sure we do throw correct exception when users use a relation provider that // only implements the RelationProvider or the SchemaRelationProvider. - val schemaNotAllowed = intercept[Exception] { - sql( - """ - |CREATE TEMPORARY VIEW relationProvierWithSchema (i int) - |USING org.apache.spark.sql.sources.SimpleScanSource - |OPTIONS ( - | From '1', - | To '10' - |) - """.stripMargin) + Seq("TEMPORARY VIEW", "TABLE").foreach { tableType => + val schemaNotAllowed = intercept[Exception] { + sql( + s""" + |CREATE $tableType relationProvierWithSchema (i int) + |USING org.apache.spark.sql.sources.SimpleScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) + } + assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas")) + + val schemaNeeded = intercept[Exception] { + sql( + s""" + |CREATE $tableType schemaRelationProvierWithoutSchema + |USING org.apache.spark.sql.sources.AllDataTypesScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) + } + assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using")) } - assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas")) + } - val schemaNeeded = intercept[Exception] { - sql( - """ - |CREATE TEMPORARY VIEW schemaRelationProvierWithoutSchema - |USING org.apache.spark.sql.sources.AllDataTypesScanSource - |OPTIONS ( - | From '1', - | To '10' - |) - """.stripMargin) + test("read the data source tables that do not extend SchemaRelationProvider") { + Seq("TEMPORARY VIEW", "TABLE").foreach { tableType => + val tableName = "relationProvierWithSchema" + withTable (tableName) { + sql( + s""" + |CREATE $tableType $tableName + |USING org.apache.spark.sql.sources.SimpleScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) + checkAnswer(spark.table(tableName), spark.range(1, 11).toDF()) + } } - assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using")) } test("SPARK-5196 schema field with comment") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 7368dad628..a7fda01098 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -293,6 +293,39 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be Option(dir).map(spark.read.format("org.apache.spark.sql.test").load) } + test("read a data source that does not extend SchemaRelationProvider") { + val dfReader = spark.read + .option("from", "1") + .option("TO", "10") + .format("org.apache.spark.sql.sources.SimpleScanSource") + + // when users do not specify the schema + checkAnswer(dfReader.load(), spark.range(1, 11).toDF()) + + // when users specify the schema + val inputSchema = new StructType().add("s", IntegerType, nullable = false) + val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() } + assert(e.getMessage.contains( + "org.apache.spark.sql.sources.SimpleScanSource does not allow user-specified schemas")) + } + + test("read a data source that does not extend RelationProvider") { + val dfReader = spark.read + .option("from", "1") + .option("TO", "10") + .option("option_with_underscores", "someval") + .option("option.with.dots", "someval") + .format("org.apache.spark.sql.sources.AllDataTypesScanSource") + + // when users do not specify the schema + val e = intercept[AnalysisException] { dfReader.load() } + assert(e.getMessage.contains("A schema needs to be specified when using")) + + // when users specify the schema + val inputSchema = new StructType().add("s", StringType, nullable = false) + assert(dfReader.schema(inputSchema).load().count() == 10) + } + test("text - API and behavior regarding schema") { // Writer spark.createDataset(data).write.mode(SaveMode.Overwrite).text(dir) |