diff options
author | windpiger <songjun@outlook.com> | 2017-02-15 13:21:48 -0800 |
---|---|---|
committer | Xiao Li <gatorsmile@gmail.com> | 2017-02-15 13:21:48 -0800 |
commit | 6a9a85b84decc2cbe1a0d8791118a0f91a62aa3f (patch) | |
tree | 55bf88c2c32bfebaf6fbe000c07f2d7d3d2a0f4d /sql | |
parent | 59dc26e378c5960a955ad238fdf1c9745c732c8a (diff) | |
download | spark-6a9a85b84decc2cbe1a0d8791118a0f91a62aa3f.tar.gz spark-6a9a85b84decc2cbe1a0d8791118a0f91a62aa3f.tar.bz2 spark-6a9a85b84decc2cbe1a0d8791118a0f91a62aa3f.zip |
[SPARK-19329][SQL] Reading from or writing to a datasource table with a non pre-existing location should succeed
## What changes were proposed in this pull request?
when we insert data into a datasource table use `sqlText`, and the table has an not exists location,
this will throw an Exception.
example:
```
spark.sql("create table t(a string, b int) using parquet")
spark.sql("alter table t set location '/xx'")
spark.sql("insert into table t select 'c', 1")
```
Exception:
```
com.google.common.util.concurrent.UncheckedExecutionException: org.apache.spark.sql.AnalysisException: Path does not exist: /xx;
at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4814)
at com.google.common.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4830)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:122)
at org.apache.spark.sql.hive.HiveSessionCatalog.lookupRelation(HiveSessionCatalog.scala:69)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:456)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:465)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:463)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:453)
```
As discussed following comments, we should unify the action when we reading from or writing to a datasource table with a non pre-existing locaiton:
1. reading from a datasource table: return 0 rows
2. writing to a datasource table: write data successfully
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes #16672 from windpiger/insertNotExistLocation.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 3 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 119 |
2 files changed, 121 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index d8a5158287..f4292320e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -233,7 +233,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] // TODO: improve `InMemoryCatalog` and remove this limitation. catalogTable = if (withHiveSupport) Some(table) else None) - LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table)) + LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), + catalogTable = Some(table)) } }) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 278d247250..e1a3b247fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1832,4 +1832,123 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } } + + test("insert data to a data source table which has a not existed location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a string, b int) + |USING parquet + |OPTIONS(path "$dir") + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = dir.getAbsolutePath.stripSuffix("/") + assert(table.location.stripSuffix("/") == expectedPath) + + dir.delete + val tableLocFile = new File(table.location.stripPrefix("file:")) + assert(!tableLocFile.exists) + spark.sql("INSERT INTO TABLE t SELECT 'c', 1") + assert(tableLocFile.exists) + checkAnswer(spark.table("t"), Row("c", 1) :: Nil) + + Utils.deleteRecursively(dir) + assert(!tableLocFile.exists) + spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1") + assert(tableLocFile.exists) + checkAnswer(spark.table("t"), Row("c", 1) :: Nil) + + val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + val newDirFile = new File(newDir) + spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") + spark.sessionState.catalog.refreshTable(TableIdentifier("t")) + + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table1.location == newDir) + assert(!newDirFile.exists) + + spark.sql("INSERT INTO TABLE t SELECT 'c', 1") + assert(newDirFile.exists) + checkAnswer(spark.table("t"), Row("c", 1) :: Nil) + } + } + } + + test("insert into a data source table with no existed partition location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a int, b int, c int, d int) + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION "$dir" + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = dir.getAbsolutePath.stripSuffix("/") + assert(table.location.stripSuffix("/") == expectedPath) + + spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") + checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) + + val partLoc = new File(s"${dir.getAbsolutePath}/a=1") + Utils.deleteRecursively(partLoc) + assert(!partLoc.exists()) + // insert overwrite into a partition which location has been deleted. + spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8") + assert(partLoc.exists()) + checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil) + } + } + } + + test("read data from a data source table which has a not existed location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a string, b int) + |USING parquet + |OPTIONS(path "$dir") + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = dir.getAbsolutePath.stripSuffix("/") + assert(table.location.stripSuffix("/") == expectedPath) + + dir.delete() + checkAnswer(spark.table("t"), Nil) + + val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") + + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table1.location == newDir) + assert(!new File(newDir).exists()) + checkAnswer(spark.table("t"), Nil) + } + } + } + + test("read data from a data source table with no existed partition location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a int, b int, c int, d int) + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION "$dir" + """.stripMargin) + spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") + checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) + + // select from a partition which location has been deleted. + Utils.deleteRecursively(dir) + assert(!dir.exists()) + spark.sql("REFRESH TABLE t") + checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil) + } + } + } } |