commit    8ed5f12d2bb408bd37e4156b5f1bad9a6b8c3cb5 (patch)
tree      1d9a40901178eb95eaadb44934fc000afdf0d9ee
parent    0d543b98f3e3da5053f0476f4647a765460861f3 (diff)
author    Cheng Lian <lian@databricks.com>  2016-01-12 14:19:53 -0800
committer Cheng Lian <lian@databricks.com>  2016-01-12 14:19:53 -0800
[SPARK-12724] SQL generation support for persisted data source tables

This PR implements SQL generation support for persisted data source tables. A new field `metastoreTableIdentifier: Option[TableIdentifier]` is added to `LogicalRelation`. When a `LogicalRelation` representing a persisted data source relation is created, this field holds the database name and table name of the relation.

Author: Cheng Lian <lian@databricks.com>

Closes #10712 from liancheng/spark-12724-datasources-sql-gen.
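For orientation before reading the diff: the shape of the change is that `LogicalRelation` gains a third constructor parameter, so every `LogicalRelation(relation, _)` extractor below picks up one more wildcard. The following is a minimal, simplified sketch of that idea with placeholder stand-ins for the Spark types; it is not the actual Spark source.

    // Simplified stand-ins for the Spark classes touched in this diff (not the real definitions).
    object LogicalRelationSketch {
      case class TableIdentifier(table: String, database: Option[String] = None)
      trait BaseRelation
      trait Attribute

      // Before this patch: LogicalRelation(relation, expectedOutputAttributes).
      // After: an optional metastore table identifier is carried alongside them.
      case class LogicalRelation(
          relation: BaseRelation,
          expectedOutputAttributes: Option[Seq[Attribute]] = None,
          metastoreTableIdentifier: Option[TableIdentifier] = None)

      // Existing pattern matches only need one extra wildcard:
      def relationOf(plan: Any): Option[BaseRelation] = plan match {
        case LogicalRelation(r, _, _) => Some(r)
        case _                        => None
      }
    }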
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala | 14
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 2
17 files changed, 55 insertions(+), 51 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 60d2f05b86..91bf2f8ce4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1728,7 +1728,7 @@ class DataFrame private[sql](
*/
def inputFiles: Array[String] = {
val files: Seq[String] = logicalPlan.collect {
- case LogicalRelation(fsBasedRelation: FileRelation, _) =>
+ case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
fsBasedRelation.inputFiles
case fr: FileRelation =>
fr.inputFiles
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 1d6290e027..da9320ffb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
*/
private[sql] object DataSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _)) =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) =>
pruneFilterProjectRaw(
l,
projects,
@@ -49,14 +49,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(requestedColumns, allPredicates, _) =>
toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) =>
pruneFilterProject(
l,
projects,
filters,
(a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _)) =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _)) =>
pruneFilterProject(
l,
projects,
@@ -64,7 +64,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
// Scanning partitioned HadoopFsRelation
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _))
if t.partitionSpec.partitionColumns.nonEmpty =>
// We divide the filter expressions into 3 parts
val partitionColumns = AttributeSet(
@@ -118,7 +118,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
).getOrElse(scan) :: Nil
// Scanning non-partitioned HadoopFsRelation
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) =>
// See buildPartitionedTableScan for the reason that we need to create a shard
// broadcast HadoopConf.
val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -130,16 +130,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
filters,
(a, f) => t.buildInternalScan(a.map(_.name).toArray, f, t.paths, confBroadcast)) :: Nil
- case l @ LogicalRelation(baseRelation: TableScan, _) =>
+ case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
execution.PhysicalRDD.createFromDataSource(
l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
- case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _),
+ case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
part, query, overwrite, false) if part.isEmpty =>
execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: HadoopFsRelation, _), part, query, overwrite, false) =>
+ l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) =>
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 219dae88e5..fa97f3d719 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
@@ -30,7 +31,8 @@ import org.apache.spark.sql.sources.BaseRelation
*/
case class LogicalRelation(
relation: BaseRelation,
- expectedOutputAttributes: Option[Seq[Attribute]] = None)
+ expectedOutputAttributes: Option[Seq[Attribute]] = None,
+ metastoreTableIdentifier: Option[TableIdentifier] = None)
extends LeafNode with MultiInstanceRelation {
override val output: Seq[AttributeReference] = {
@@ -49,7 +51,7 @@ case class LogicalRelation(
// Logical Relations are distinct if they have different output for the sake of transformations.
override def equals(other: Any): Boolean = other match {
- case l @ LogicalRelation(otherRelation, _) => relation == otherRelation && output == l.output
+ case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
case _ => false
}
@@ -58,7 +60,7 @@ case class LogicalRelation(
}
override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
- case LogicalRelation(otherRelation, _) => relation == otherRelation
+ case LogicalRelation(otherRelation, _, _) => relation == otherRelation
case _ => false
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 7754edc803..991a5d5aef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -44,9 +44,9 @@ import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.LegacyTypeStringParser
-import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -147,12 +147,6 @@ private[sql] class ParquetRelation(
.get(ParquetRelation.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])
- // If this relation is converted from a Hive metastore table, this method returns the name of the
- // original Hive metastore table.
- private[sql] def metastoreTableName: Option[TableIdentifier] = {
- parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map(SqlParser.parseTableIdentifier)
- }
-
private lazy val metadataCache: MetadataCache = {
val meta = new MetadataCache
meta.refresh()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 1c773e6927..dd3e66d8a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -61,7 +61,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
// We are inserting into an InsertableRelation or HadoopFsRelation.
case i @ InsertIntoTable(
- l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _), _, child, _, _) => {
+ l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _), _, child, _, _) =>
// First, make sure the data to be inserted have the same number of fields with the
// schema of the relation.
if (l.output.size != child.output.size) {
@@ -70,7 +70,6 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
s"statement generates the same number of columns as its schema.")
}
castAndRenameChildOutput(i, l.output, child)
- }
}
/** If necessary, cast data types and rename fields to the expected types and names. */
@@ -108,14 +107,15 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: InsertableRelation, _), partition, query, overwrite, ifNotExists) =>
+ l @ LogicalRelation(t: InsertableRelation, _, _),
+ partition, query, overwrite, ifNotExists) =>
// Right now, we do not support insert into a data source table with partition specs.
if (partition.nonEmpty) {
failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
} else {
// Get all input data source relations of the query.
val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation, _) => src
+ case LogicalRelation(src: BaseRelation, _, _) => src
}
if (srcRelations.contains(t)) {
failAnalysis(
@@ -126,7 +126,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
case logical.InsertIntoTable(
- LogicalRelation(r: HadoopFsRelation, _), part, query, overwrite, _) =>
+ LogicalRelation(r: HadoopFsRelation, _, _), part, query, overwrite, _) =>
// We need to make sure the partition columns specified by users do match partition
// columns of the relation.
val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
@@ -145,7 +145,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// Get all input data source relations of the query.
val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation, _) => src
+ case LogicalRelation(src: BaseRelation, _, _) => src
}
if (srcRelations.contains(r)) {
failAnalysis(
@@ -173,10 +173,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
EliminateSubQueries(catalog.lookupRelation(c.tableIdent)) match {
// Only do the check if the table is a data source table
// (the relation is a BaseRelation).
- case l @ LogicalRelation(dest: BaseRelation, _) =>
+ case l @ LogicalRelation(dest: BaseRelation, _, _) =>
// Get all input data source relations of the query.
val srcRelations = c.child.collect {
- case LogicalRelation(src: BaseRelation, _) => src
+ case LogicalRelation(src: BaseRelation, _, _) => src
}
if (srcRelations.contains(dest)) {
failAnalysis(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 587aa5fd30..97c5313f0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -59,7 +59,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
var maybeRelation: Option[ParquetRelation] = None
val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
- case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _)) =>
+ case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _, _)) =>
maybeRelation = Some(relation)
filters
}.flatten.reduceLeftOption(_ && _)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 0feb945fbb..3d1677bed4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -563,7 +563,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
(1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
- case LogicalRelation(relation: ParquetRelation, _) =>
+ case LogicalRelation(relation: ParquetRelation, _, _) =>
assert(relation.partitionSpec === PartitionSpec.emptySpec)
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 398b8a1a66..7196b6dc13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -317,7 +317,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
val table = caseInsensitiveContext.table("oneToTenFiltered")
val relation = table.queryExecution.logical.collectFirst {
- case LogicalRelation(r, _) => r
+ case LogicalRelation(r, _, _) => r
}.get
assert(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 67228f3f3c..daaa5a5709 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -184,7 +184,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
table.properties("spark.sql.sources.provider"),
options)
- LogicalRelation(resolvedRelation.relation)
+ LogicalRelation(
+ resolvedRelation.relation,
+ metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
}
}
@@ -447,7 +449,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) =>
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation, _, _) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 61e3f183bb..e83b4bffff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.hive
import java.util.concurrent.atomic.AtomicLong
-import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
/**
* A builder class used to convert a resolved logical plan into a SQL query string. Note that this
@@ -135,13 +135,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
rightSQL <- toSQL(right)
} yield s"$leftSQL UNION ALL $rightSQL"
- // ParquetRelation converted from Hive metastore table
- case Subquery(alias, LogicalRelation(r: ParquetRelation, _)) =>
- // There seems to be a bug related to `ParquetConversions` analysis rule. The problem is
- // that, the metastore database name and table name are not always propagated to converted
- // `ParquetRelation` instances via data source options. Here we use subquery alias as a
- // workaround.
- Some(s"`$alias`")
+ // Persisted data source relation
+ case Subquery(alias, LogicalRelation(_, _, Some(TableIdentifier(table, Some(database))))) =>
+ Some(s"`$database`.`$table`")
case Subquery(alias, child) =>
toSQL(child).map(childSQL => s"($childSQL) AS $alias")
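To make the new SQLBuilder case above concrete, here is a hedged sketch of the idea (simplified signatures, not the real SQLBuilder code): the qualified name is derived from the metastore identifier rather than from the subquery alias.

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Sketch only: with the metastore identifier attached to LogicalRelation, SQL generation
    // can emit the fully qualified table name directly, instead of relying on the subquery
    // alias workaround removed in the hunk above.
    object PersistedRelationSQLSketch {
      def toSQL(id: Option[TableIdentifier]): Option[String] =
        id.collect { case TableIdentifier(table, Some(database)) => s"`$database`.`$table`" }
    }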
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 612f01cda8..07a352873d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -216,7 +216,7 @@ case class CreateMetastoreDataSourceAsSelect(
sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match {
- case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
+ case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
if (l.relation != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 2ee8150fb8..0604d9f47c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -146,4 +146,14 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
ignore("distinct and non-distinct aggregation") {
checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM t2 GROUP BY a")
}
+
+ test("persisted data source relations") {
+ Seq("orc", "json", "parquet").foreach { format =>
+ val tableName = s"${format}_t0"
+ withTable(tableName) {
+ sqlContext.range(10).write.format(format).saveAsTable(tableName)
+ checkHiveQl(s"SELECT id FROM $tableName")
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 202851ae13..253f13c598 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -571,7 +571,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
Row(3) :: Row(4) :: Nil)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: ParquetRelation, _) => // OK
+ case LogicalRelation(p: ParquetRelation, _, _) => // OK
case _ =>
fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation]}")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 593fac2c32..f6c687aab7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -268,7 +268,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName)))
relation match {
- case LogicalRelation(r: ParquetRelation, _) =>
+ case LogicalRelation(r: ParquetRelation, _, _) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 5afc7e77ab..c94e73c4aa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -42,7 +42,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
var maybeRelation: Option[OrcRelation] = None
val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
- case PhysicalOperation(_, filters, LogicalRelation(orcRelation: OrcRelation, _)) =>
+ case PhysicalOperation(_, filters, LogicalRelation(orcRelation: OrcRelation, _, _)) =>
maybeRelation = Some(orcRelation)
filters
}.flatten.reduceLeftOption(_ && _)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2ceb836681..ed544c6380 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -282,7 +282,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(_: ParquetRelation, _) => // OK
+ case LogicalRelation(_: ParquetRelation, _, _) => // OK
case _ => fail(
"test_parquet_ctas should be converted to " +
s"${classOf[ParquetRelation].getCanonicalName }")
@@ -369,7 +369,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
assertResult(2) {
analyzed.collect {
- case r @ LogicalRelation(_: ParquetRelation, _) => r
+ case r @ LogicalRelation(_: ParquetRelation, _, _) => r
}.size
}
}
@@ -378,7 +378,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
def collectParquetRelation(df: DataFrame): ParquetRelation = {
val plan = df.queryExecution.analyzed
plan.collectFirst {
- case LogicalRelation(r: ParquetRelation, _) => r
+ case LogicalRelation(r: ParquetRelation, _, _) => r
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$plan")
}
@@ -428,7 +428,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
- case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) => // OK
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation, _, _) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index efbf9988dd..3f9ecf6965 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -500,7 +500,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
val actualPaths = df.queryExecution.analyzed.collectFirst {
- case LogicalRelation(relation: HadoopFsRelation, _) =>
+ case LogicalRelation(relation: HadoopFsRelation, _, _) =>
relation.paths.toSet
}.getOrElse {
fail("Expect an FSBasedRelation, but none could be found")