diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-07-09 20:35:45 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-07-09 20:35:45 +0800 |
commit | 7374e518e2641fddfe57003340db410224b37581 (patch) | |
tree | 35d93fedbd29fb517bc402fd8de7c62026d622f3 /sql | |
parent | b1db26acc51003e68e4e8d7d324cf74e3aa03cfd (diff) | |
download | spark-7374e518e2641fddfe57003340db410224b37581.tar.gz spark-7374e518e2641fddfe57003340db410224b37581.tar.bz2 spark-7374e518e2641fddfe57003340db410224b37581.zip |
[SPARK-16401][SQL] Data Source API: Enable Extending RelationProvider and CreatableRelationProvider without Extending SchemaRelationProvider
#### What changes were proposed in this pull request?
When users try to implement a data source API with extending only `RelationProvider` and `CreatableRelationProvider`, they will hit an error when resolving the relation.
```Scala
spark.read
.format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
.load()
.write.
format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
.save()
```
The error they hit is like
```
org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema does not allow user-specified schemas.;
org.apache.spark.sql.AnalysisException: org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema does not allow user-specified schemas.;
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:319)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:494)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
```
Actually, the bug fix is simple. [`DataSource.createRelation(sparkSession.sqlContext, mode, options, data)`](https://github.com/gatorsmile/spark/blob/dd644f8117e889cebd6caca58702a7c7e3d88bef/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L429) already returns a BaseRelation. We should not assign schema to `userSpecifiedSchema`. That schema assignment only makes sense for the data sources that extend `FileFormat`.
#### How was this patch tested?
Added a test case.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #14075 from gatorsmile/dataSource.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 5 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 32 |
2 files changed, 34 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 6dc27c1952..f572b93991 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -485,12 +485,11 @@ case class DataSource( data.logicalPlan, mode) sparkSession.sessionState.executePlan(plan).toRdd + // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it. + copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation() case _ => sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.") } - - // We replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it. - copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index d454100ccb..05935cec4b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -82,6 +82,29 @@ class DefaultSource } } +/** Dummy provider with only RelationProvider and CreatableRelationProvider. */ +class DefaultSourceWithoutUserSpecifiedSchema + extends RelationProvider + with CreatableRelationProvider { + + case class FakeRelation(sqlContext: SQLContext) extends BaseRelation { + override def schema: StructType = StructType(Seq(StructField("a", StringType))) + } + + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + FakeRelation(sqlContext) + } + + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { + FakeRelation(sqlContext) + } +} class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { @@ -120,6 +143,15 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be .save() } + test("resolve default source without extending SchemaRelationProvider") { + spark.read + .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema") + .load() + .write + .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema") + .save() + } + test("resolve full class") { spark.read .format("org.apache.spark.sql.test.DefaultSource") |