| field | value | date |
|---|---|---|
| author | Cheng Lian <lian@databricks.com> | 2015-05-21 10:56:17 -0700 |
| committer | Yin Huai <yhuai@databricks.com> | 2015-05-21 10:56:17 -0700 |
| commit | 8730fbb47b09fcf955fe16dd03b75596db6d53b6 (patch) | |
| tree | eeb6bd4c9d95011f52ce8a7f522dc8e1e32608ff /sql/hive/src | |
| parent | 13348e21b6b1c0df42c18b82b86c613291228863 (diff) | |
[SPARK-7749] [SQL] Fixes partition discovery for non-partitioned tables
When no partition columns can be found, we should have an empty `PartitionSpec`, rather than a `PartitionSpec` with empty partition columns.
This PR together with #6285 should fix SPARK-7749.
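To illustrate the distinction, here is a minimal sketch. The shape of `PartitionSpec` and all names below are assumptions for illustration, not Spark's exact internals:

```scala
import org.apache.spark.sql.types.StructType

// Sketch only: names and shapes are assumed, not Spark's actual implementation.
object PartitionSpecSketch {
  case class Partition(values: Seq[Any], path: String)
  case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])

  // Before the fix: inferring zero partition columns could still produce one
  // Partition per leaf directory, so a non-partitioned table looked partitioned.
  def specWithEmptyColumns(leafDirs: Seq[String]): PartitionSpec =
    PartitionSpec(StructType(Nil), leafDirs.map(Partition(Seq.empty, _)))

  // After the fix: no inferable partition columns yields a truly empty spec.
  val emptySpec: PartitionSpec = PartitionSpec(StructType(Nil), Seq.empty)
}
```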
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes #6287 from liancheng/spark-7749 and squashes the following commits:
a799ff3 [Cheng Lian] Adds test cases for SPARK-7749
c4949be [Cheng Lian] Minor refactoring, and tolerant _TEMPORARY directory name
5aa87ea [Yin Huai] Make parsePartitions more robust.
fc56656 [Cheng Lian] Returns empty PartitionSpec if no partition columns can be inferred
19ae41e [Cheng Lian] Don't list base directory as leaf directory
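For context on the last two commits above ("tolerant _TEMPORARY directory name" and "Don't list base directory as leaf directory"), here is a minimal sketch of the discovery behavior they describe. The helper name and exact rules are assumptions, not Spark's actual implementation:

```scala
import java.io.File

// Sketch of leaf-directory discovery: skip _temporary staging directories,
// and never report the base path itself as a leaf.
object LeafDirSketch {
  def leafDirs(base: File): Seq[File] = {
    val children = Option(base.listFiles()).toSeq.flatten
      .filter(f => f.isDirectory && f.getName != "_temporary")
    children.flatMap { d =>
      val sub = leafDirs(d)
      if (sub.isEmpty) Seq(d) else sub // d is a leaf only if it has no subdirectories
    }
    // The base directory is never returned, matching "don't list base directory as leaf".
  }
}
```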
Diffstat (limited to 'sql/hive/src')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 51 |

1 file changed, 50 insertions(+), 1 deletion(-)
```diff
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 05d99983b6..1da990bc95 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
@@ -29,7 +30,7 @@ import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
 import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
+import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
 import org.apache.spark.util.Utils
 
 // The data where the partitioning key exists only in the directory structure.
@@ -385,6 +386,54 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     sql("DROP TABLE ms_convert")
   }
 
+  def collectParquetRelation(df: DataFrame): ParquetRelation2 = {
+    val plan = df.queryExecution.analyzed
+    plan.collectFirst {
+      case LogicalRelation(r: ParquetRelation2) => r
+    }.getOrElse {
+      fail(s"Expecting a ParquetRelation2, but got:\n$plan")
+    }
+  }
+
+  test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
+    sql(
+      s"""CREATE TABLE nonPartitioned (
+         |  key INT,
+         |  value STRING
+         |)
+         |STORED AS PARQUET
+       """.stripMargin)
+
+    // First lookup fills the cache
+    val r1 = collectParquetRelation(table("nonPartitioned"))
+    // Second lookup should reuse the cache
+    val r2 = collectParquetRelation(table("nonPartitioned"))
+    // They should be the same instance
+    assert(r1 eq r2)
+
+    sql("DROP TABLE nonPartitioned")
+  }
+
+  test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
+    sql(
+      s"""CREATE TABLE partitioned (
+         |  key INT,
+         |  value STRING
+         |)
+         |PARTITIONED BY (part INT)
+         |STORED AS PARQUET
+       """.stripMargin)
+
+    // First lookup fills the cache
+    val r1 = collectParquetRelation(table("partitioned"))
+    // Second lookup should reuse the cache
+    val r2 = collectParquetRelation(table("partitioned"))
+    // They should be the same instance
+    assert(r1 eq r2)
+
+    sql("DROP TABLE partitioned")
+  }
+
   test("Caching converted data source Parquet Relations") {
     def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
       // Converted test_parquet should be cached.
```
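A note on the test idiom above: `assert(r1 eq r2)` checks Scala reference equality, so the test passes only if the second `table(...)` lookup returned the very same `ParquetRelation2` instance from the metastore cache, rather than rebuilding a structurally equal one. A minimal sketch of the distinction, using a hypothetical `Relation` class for illustration:

```scala
// Hypothetical class, for illustration only.
object EqDemo extends App {
  case class Relation(paths: Seq[String])

  val a = Relation(Seq("/data/t"))
  val b = Relation(Seq("/data/t"))
  val c = a

  assert(a == b)    // structural equality: same field values
  assert(!(a eq b)) // different instances: a freshly rebuilt relation would fail the cache test
  assert(a eq c)    // reference equality: the same cached instance, as SPARK-7749's tests require
}
```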