path: root/sql/hive
author    Cheng Lian <lian@databricks.com>  2015-05-21 10:56:17 -0700
committer Yin Huai <yhuai@databricks.com>   2015-05-21 10:56:17 -0700
commit    8730fbb47b09fcf955fe16dd03b75596db6d53b6 (patch)
tree      eeb6bd4c9d95011f52ce8a7f522dc8e1e32608ff /sql/hive
parent    13348e21b6b1c0df42c18b82b86c613291228863 (diff)
[SPARK-7749] [SQL] Fixes partition discovery for non-partitioned tables
When no partition columns can be found, we should have an empty `PartitionSpec`, rather than a `PartitionSpec` with empty partition columns. This PR together with #6285 should fix SPARK-7749.

Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #6287 from liancheng/spark-7749 and squashes the following commits:

a799ff3 [Cheng Lian] Adds test cases for SPARK-7749
c4949be [Cheng Lian] Minor refactoring, and tolerant _TEMPORARY directory name
5aa87ea [Yin Huai] Make parsePartitions more robust.
fc56656 [Cheng Lian] Returns empty PartitionSpec if no partition columns can be inferred
19ae41e [Cheng Lian] Don't list base directory as leaf directory
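For context, a minimal, self-contained sketch of the distinction the commit message draws. The `PartitionSpec`, `Partition`, and `PartitionColumn` case classes and the example path below are simplified stand-ins chosen for illustration, not the actual Spark 1.4 internals; only the shape of the fix is shown.

```scala
// Hypothetical stand-ins for Spark's partition-discovery types (illustration only).
case class PartitionColumn(name: String, dataType: String)
case class Partition(values: Seq[Any], path: String)
case class PartitionSpec(partitionColumns: Seq[PartitionColumn], partitions: Seq[Partition])

object Spark7749Sketch {
  // After the fix: when no partition columns can be inferred from the directory
  // layout, return a completely empty PartitionSpec.
  val emptySpec: PartitionSpec = PartitionSpec(Seq.empty, Seq.empty)

  // Before the fix: leaf directories were still listed as "partitions" even though
  // no partition columns existed, so downstream code could mistake a
  // non-partitioned table for a partitioned one.
  val misleadingSpec: PartitionSpec = PartitionSpec(
    partitionColumns = Seq.empty,
    partitions = Seq(Partition(Seq.empty, "hdfs://ns/warehouse/nonPartitioned")))

  def main(args: Array[String]): Unit = {
    def looksPartitioned(spec: PartitionSpec): Boolean = spec.partitions.nonEmpty
    println(looksPartitioned(emptySpec))      // false: correctly treated as non-partitioned
    println(looksPartitioned(misleadingSpec)) // true: the shape behind SPARK-7749
  }
}
```

The new tests in the diff below exercise this indirectly: both the non-partitioned and partitioned metastore table lookups should resolve to the same cached `ParquetRelation2` instance.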
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 51
1 file changed, 50 insertions(+), 1 deletion(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 05d99983b6..1da990bc95 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
@@ -29,7 +30,7 @@ import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
+import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.
@@ -385,6 +386,54 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
sql("DROP TABLE ms_convert")
}
+ def collectParquetRelation(df: DataFrame): ParquetRelation2 = {
+ val plan = df.queryExecution.analyzed
+ plan.collectFirst {
+ case LogicalRelation(r: ParquetRelation2) => r
+ }.getOrElse {
+ fail(s"Expecting a ParquetRelation2, but got:\n$plan")
+ }
+ }
+
+ test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
+ sql(
+ s"""CREATE TABLE nonPartitioned (
+ | key INT,
+ | value STRING
+ |)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ // First lookup fills the cache
+ val r1 = collectParquetRelation(table("nonPartitioned"))
+ // Second lookup should reuse the cache
+ val r2 = collectParquetRelation(table("nonPartitioned"))
+ // They should be the same instance
+ assert(r1 eq r2)
+
+ sql("DROP TABLE nonPartitioned")
+ }
+
+ test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
+ sql(
+ s"""CREATE TABLE partitioned (
+ | key INT,
+ | value STRING
+ |)
+ |PARTITIONED BY (part INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ // First lookup fills the cache
+ val r1 = collectParquetRelation(table("partitioned"))
+ // Second lookup should reuse the cache
+ val r2 = collectParquetRelation(table("partitioned"))
+ // They should be the same instance
+ assert(r1 eq r2)
+
+ sql("DROP TABLE partitioned")
+ }
+
test("Caching converted data source Parquet Relations") {
def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.