 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala |   3 +-
 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala               | 119 ++++++++++++++++
 2 files changed, 121 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index d8a5158287..f4292320e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -233,7 +233,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
           // TODO: improve `InMemoryCatalog` and remove this limitation.
           catalogTable = if (withHiveSupport) Some(table) else None)
-        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
+        LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
+          catalogTable = Some(table))
       }
     })
   }
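Not part of the patch: a minimal standalone sketch of the user-visible behavior this one-line change enables, written in the spirit of the new tests below. The object name, session setup, and temp-path handling are assumptions for illustration; the SQL statements mirror the tests.

import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object ResolveMissingLocationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("resolve-missing-location")
      .getOrCreate()

    // Create a data source table over a real directory, then delete the
    // directory out-of-band, exactly as the new tests do.
    val dir = Files.createTempDirectory("spark_t").toFile
    spark.sql(
      s"""
         |CREATE TABLE t(a string, b int)
         |USING parquet
         |OPTIONS(path "$dir")
       """.stripMargin)
    dir.delete()

    // With resolveRelation(checkFilesExist = false), the table still
    // resolves: the scan finds no files and yields zero rows, and an
    // INSERT simply recreates the directory.
    spark.table("t").show()                        // empty result, no exception
    spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
    spark.table("t").show()                        // one row: c, 1

    spark.stop()
  }
}

Before the change, resolving the relation eagerly verified that the table's location existed, so both the read and the INSERT above would fail once the directory was gone; deferring the check lets the scan treat a missing location as an empty table.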
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 278d247250..e1a3b247fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1832,4 +1832,123 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  test("insert data into a data source table whose location does not exist should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b int)
+             |USING parquet
+             |OPTIONS(path "$dir")
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        dir.delete()
+        val tableLocFile = new File(table.location.stripPrefix("file:"))
+        assert(!tableLocFile.exists)
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        assert(tableLocFile.exists)
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        Utils.deleteRecursively(dir)
+        assert(!tableLocFile.exists)
+        spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
+        assert(tableLocFile.exists)
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        val newDirFile = new File(newDir)
+        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
+        spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
+
+        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table1.location == newDir)
+        assert(!newDirFile.exists)
+
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        assert(newDirFile.exists)
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+      }
+    }
+  }
+
+  test("insert into a data source table with a non-existent partition location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a int, b int, c int, d int)
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |LOCATION "$dir"
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
+        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
+
+        val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
+        Utils.deleteRecursively(partLoc)
+        assert(!partLoc.exists())
+        // Insert overwrite into a partition whose location has been deleted.
+        spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
+        assert(partLoc.exists())
+        checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)
+      }
+    }
+  }
+
+  test("read data from a data source table whose location does not exist should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b int)
+             |USING parquet
+             |OPTIONS(path "$dir")
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        dir.delete()
+        checkAnswer(spark.table("t"), Nil)
+
+        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
+
+        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table1.location == newDir)
+        assert(!new File(newDir).exists())
+        checkAnswer(spark.table("t"), Nil)
+      }
+    }
+  }
+
+  test("read data from a data source table with a non-existent partition location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a int, b int, c int, d int)
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |LOCATION "$dir"
+           """.stripMargin)
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
+        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
+
+        // Select from a partition whose location has been deleted.
+        Utils.deleteRecursively(dir)
+        assert(!dir.exists())
+        spark.sql("REFRESH TABLE t")
+        checkAnswer(spark.sql("SELECT * FROM t WHERE a=1 AND b=2"), Nil)
+      }
+    }
+  }
 }
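Likewise not part of the patch: a companion sketch of the partition-level behavior the two partitioned tests pin down. The object name, session setup, and the recursive-delete helper (a stand-in for Spark's Utils.deleteRecursively used in the tests) are hypothetical.

import java.io.File
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object MissingPartitionLocationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("missing-partition-location")
      .getOrCreate()

    val dir = Files.createTempDirectory("spark_t_part").toFile
    spark.sql(
      s"""
         |CREATE TABLE t(a int, b int, c int, d int)
         |USING parquet
         |PARTITIONED BY(a, b)
         |LOCATION "$dir"
       """.stripMargin)
    spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")

    // Remove the partition directory behind Spark's back.
    def deleteRecursively(f: File): Unit = {
      Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
      f.delete()
    }
    deleteRecursively(new File(dir, "a=1"))

    spark.sql("REFRESH TABLE t")
    // Reading the missing partition returns no rows rather than failing...
    spark.sql("SELECT * FROM t WHERE a=1 AND b=2").show()  // empty result
    // ...and INSERT OVERWRITE simply recreates the directory.
    spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
    spark.table("t").show()                                // one row: 7, 8, 1, 2

    spark.stop()
  }
}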