aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-09-22 13:19:06 +0800
committerWenchen Fan <wenchen@databricks.com>2016-09-22 13:19:06 +0800
commit3a80f92f8f4b91d0a85724bca7d81c6f5bbb78fd (patch)
treeb843f9034b2e984efd80997c5708e56761019063
parentcb324f61150c962aeabf0a779f6a09797b3d5072 (diff)
downloadspark-3a80f92f8f4b91d0a85724bca7d81c6f5bbb78fd.tar.gz
spark-3a80f92f8f4b91d0a85724bca7d81c6f5bbb78fd.tar.bz2
spark-3a80f92f8f4b91d0a85724bca7d81c6f5bbb78fd.zip
[SPARK-17492][SQL] Fix Reading Cataloged Data Sources without Extending SchemaRelationProvider
### What changes were proposed in this pull request? For data sources without extending `SchemaRelationProvider`, we expect users to not specify schemas when they creating tables. If the schema is input from users, an exception is issued. Since Spark 2.1, for any data source, to avoid infer the schema every time, we store the schema in the metastore catalog. Thus, when reading a cataloged data source table, the schema could be read from metastore catalog. In this case, we also got an exception. For example, ```Scala sql( s""" |CREATE TABLE relationProvierWithSchema |USING org.apache.spark.sql.sources.SimpleScanSource |OPTIONS ( | From '1', | To '10' |) """.stripMargin) spark.table(tableName).show() ``` ``` org.apache.spark.sql.sources.SimpleScanSource does not allow user-specified schemas.; ``` This PR is to fix the above issue. When building a data source, we introduce a flag `isSchemaFromUsers` to indicate whether the schema is really input from users. If true, we issue an exception. Otherwise, we will call the `createRelation` of `RelationProvider` to generate the `BaseRelation`, in which it contains the actual schema. ### How was this patch tested? Added a few cases. Author: gatorsmile <gatorsmile@gmail.com> Closes #15046 from gatorsmile/tempViewCases.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala9
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala20
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala64
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala33
4 files changed, 102 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 413976a7ef..32067011c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -333,8 +333,13 @@ case class DataSource(
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
case (_: SchemaRelationProvider, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $className.")
- case (_: RelationProvider, Some(_)) =>
- throw new AnalysisException(s"$className does not allow user-specified schemas.")
+ case (dataSource: RelationProvider, Some(schema)) =>
+ val baseRelation =
+ dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
+ if (baseRelation.schema != schema) {
+ throw new AnalysisException(s"$className does not allow user-specified schemas.")
+ }
+ baseRelation
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 6454d716ec..5eb54643f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -65,6 +65,26 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
)
}
+ test("insert into a temp view that does not point to an insertable data source") {
+ import testImplicits._
+ withTempView("t1", "t2") {
+ sql(
+ """
+ |CREATE TEMPORARY VIEW t1
+ |USING org.apache.spark.sql.sources.SimpleScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10')
+ """.stripMargin)
+ sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t2")
+
+ val message = intercept[AnalysisException] {
+ sql("INSERT INTO TABLE t1 SELECT a FROM t2")
+ }.getMessage
+ assert(message.contains("does not allow insertion"))
+ }
+ }
+
test("PreInsert casting and renaming") {
sql(
s"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index e8fed039fa..86bcb4d4b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -348,31 +348,51 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
test("exceptions") {
// Make sure we do throw correct exception when users use a relation provider that
// only implements the RelationProvider or the SchemaRelationProvider.
- val schemaNotAllowed = intercept[Exception] {
- sql(
- """
- |CREATE TEMPORARY VIEW relationProvierWithSchema (i int)
- |USING org.apache.spark.sql.sources.SimpleScanSource
- |OPTIONS (
- | From '1',
- | To '10'
- |)
- """.stripMargin)
+ Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
+ val schemaNotAllowed = intercept[Exception] {
+ sql(
+ s"""
+ |CREATE $tableType relationProvierWithSchema (i int)
+ |USING org.apache.spark.sql.sources.SimpleScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+ assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+
+ val schemaNeeded = intercept[Exception] {
+ sql(
+ s"""
+ |CREATE $tableType schemaRelationProvierWithoutSchema
+ |USING org.apache.spark.sql.sources.AllDataTypesScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+ assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
}
- assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+ }
- val schemaNeeded = intercept[Exception] {
- sql(
- """
- |CREATE TEMPORARY VIEW schemaRelationProvierWithoutSchema
- |USING org.apache.spark.sql.sources.AllDataTypesScanSource
- |OPTIONS (
- | From '1',
- | To '10'
- |)
- """.stripMargin)
+ test("read the data source tables that do not extend SchemaRelationProvider") {
+ Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
+ val tableName = "relationProvierWithSchema"
+ withTable (tableName) {
+ sql(
+ s"""
+ |CREATE $tableType $tableName
+ |USING org.apache.spark.sql.sources.SimpleScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ checkAnswer(spark.table(tableName), spark.range(1, 11).toDF())
+ }
}
- assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
}
test("SPARK-5196 schema field with comment") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 7368dad628..a7fda01098 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -293,6 +293,39 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
Option(dir).map(spark.read.format("org.apache.spark.sql.test").load)
}
+ test("read a data source that does not extend SchemaRelationProvider") {
+ val dfReader = spark.read
+ .option("from", "1")
+ .option("TO", "10")
+ .format("org.apache.spark.sql.sources.SimpleScanSource")
+
+ // when users do not specify the schema
+ checkAnswer(dfReader.load(), spark.range(1, 11).toDF())
+
+ // when users specify the schema
+ val inputSchema = new StructType().add("s", IntegerType, nullable = false)
+ val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() }
+ assert(e.getMessage.contains(
+ "org.apache.spark.sql.sources.SimpleScanSource does not allow user-specified schemas"))
+ }
+
+ test("read a data source that does not extend RelationProvider") {
+ val dfReader = spark.read
+ .option("from", "1")
+ .option("TO", "10")
+ .option("option_with_underscores", "someval")
+ .option("option.with.dots", "someval")
+ .format("org.apache.spark.sql.sources.AllDataTypesScanSource")
+
+ // when users do not specify the schema
+ val e = intercept[AnalysisException] { dfReader.load() }
+ assert(e.getMessage.contains("A schema needs to be specified when using"))
+
+ // when users specify the schema
+ val inputSchema = new StructType().add("s", StringType, nullable = false)
+ assert(dfReader.schema(inputSchema).load().count() == 10)
+ }
+
test("text - API and behavior regarding schema") {
// Writer
spark.createDataset(data).write.mode(SaveMode.Overwrite).text(dir)