diff options
author | Cheng Lian <lian@databricks.com> | 2016-01-12 14:19:53 -0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-01-12 14:19:53 -0800 |
commit | 8ed5f12d2bb408bd37e4156b5f1bad9a6b8c3cb5 (patch) | |
tree | 1d9a40901178eb95eaadb44934fc000afdf0d9ee /sql/core | |
parent | 0d543b98f3e3da5053f0476f4647a765460861f3 (diff) | |
download | spark-8ed5f12d2bb408bd37e4156b5f1bad9a6b8c3cb5.tar.gz spark-8ed5f12d2bb408bd37e4156b5f1bad9a6b8c3cb5.tar.bz2 spark-8ed5f12d2bb408bd37e4156b5f1bad9a6b8c3cb5.zip |
[SPARK-12724] SQL generation support for persisted data source tables
This PR implements SQL generation support for persisted data source tables. A new field `metastoreTableIdentifier: Option[TableIdentifier]` is added to `LogicalRelation`. When a `LogicalRelation` representing a persisted data source relation is created, this field holds the database name and table name of the relation.
Author: Cheng Lian <lian@databricks.com>
Closes #10712 from liancheng/spark-12724-datasources-sql-gen.
Diffstat (limited to 'sql/core')
8 files changed, 27 insertions, 31 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 60d2f05b86..91bf2f8ce4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1728,7 +1728,7 @@ class DataFrame private[sql]( */ def inputFiles: Array[String] = { val files: Seq[String] = logicalPlan.collect { - case LogicalRelation(fsBasedRelation: FileRelation, _) => + case LogicalRelation(fsBasedRelation: FileRelation, _, _) => fsBasedRelation.inputFiles case fr: FileRelation => fr.inputFiles diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 1d6290e027..da9320ffb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} */ private[sql] object DataSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match { - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) => pruneFilterProjectRaw( l, projects, @@ -49,14 +49,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { (requestedColumns, allPredicates, _) => toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) => pruneFilterProject( l, projects, filters, (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _)) => pruneFilterProject( l, projects, @@ -64,7 +64,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil // Scanning partitioned HadoopFsRelation - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) if t.partitionSpec.partitionColumns.nonEmpty => // We divide the filter expressions into 3 parts val partitionColumns = AttributeSet( @@ -118,7 +118,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { ).getOrElse(scan) :: Nil // Scanning non-partitioned HadoopFsRelation - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) => // See buildPartitionedTableScan for the reason that we need to create a shard // broadcast HadoopConf. val sharedHadoopConf = SparkHadoopUtil.get.conf @@ -130,16 +130,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { filters, (a, f) => t.buildInternalScan(a.map(_.name).toArray, f, t.paths, confBroadcast)) :: Nil - case l @ LogicalRelation(baseRelation: TableScan, _) => + case l @ LogicalRelation(baseRelation: TableScan, _, _) => execution.PhysicalRDD.createFromDataSource( l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil - case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _), + case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _), part, query, overwrite, false) if part.isEmpty => execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: HadoopFsRelation, _), part, query, overwrite, false) => + l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) => val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 219dae88e5..fa97f3d719 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.datasources +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} @@ -30,7 +31,8 @@ import org.apache.spark.sql.sources.BaseRelation */ case class LogicalRelation( relation: BaseRelation, - expectedOutputAttributes: Option[Seq[Attribute]] = None) + expectedOutputAttributes: Option[Seq[Attribute]] = None, + metastoreTableIdentifier: Option[TableIdentifier] = None) extends LeafNode with MultiInstanceRelation { override val output: Seq[AttributeReference] = { @@ -49,7 +51,7 @@ case class LogicalRelation( // Logical Relations are distinct if they have different output for the sake of transformations. override def equals(other: Any): Boolean = other match { - case l @ LogicalRelation(otherRelation, _) => relation == otherRelation && output == l.output + case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output case _ => false } @@ -58,7 +60,7 @@ case class LogicalRelation( } override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match { - case LogicalRelation(otherRelation, _) => relation == otherRelation + case LogicalRelation(otherRelation, _, _) => relation == otherRelation case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 7754edc803..991a5d5aef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -44,9 +44,9 @@ import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD} import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.LegacyTypeStringParser -import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.{PartitionSpec, _} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -147,12 +147,6 @@ private[sql] class ParquetRelation( .get(ParquetRelation.METASTORE_SCHEMA) .map(DataType.fromJson(_).asInstanceOf[StructType]) - // If this relation is converted from a Hive metastore table, this method returns the name of the - // original Hive metastore table. - private[sql] def metastoreTableName: Option[TableIdentifier] = { - parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map(SqlParser.parseTableIdentifier) - } - private lazy val metadataCache: MetadataCache = { val meta = new MetadataCache meta.refresh() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 1c773e6927..dd3e66d8a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -61,7 +61,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { // We are inserting into an InsertableRelation or HadoopFsRelation. case i @ InsertIntoTable( - l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _), _, child, _, _) => { + l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _), _, child, _, _) => // First, make sure the data to be inserted have the same number of fields with the // schema of the relation. if (l.output.size != child.output.size) { @@ -70,7 +70,6 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { s"statement generates the same number of columns as its schema.") } castAndRenameChildOutput(i, l.output, child) - } } /** If necessary, cast data types and rename fields to the expected types and names. */ @@ -108,14 +107,15 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => def apply(plan: LogicalPlan): Unit = { plan.foreach { case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: InsertableRelation, _), partition, query, overwrite, ifNotExists) => + l @ LogicalRelation(t: InsertableRelation, _, _), + partition, query, overwrite, ifNotExists) => // Right now, we do not support insert into a data source table with partition specs. if (partition.nonEmpty) { failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") } else { // Get all input data source relations of the query. val srcRelations = query.collect { - case LogicalRelation(src: BaseRelation, _) => src + case LogicalRelation(src: BaseRelation, _, _) => src } if (srcRelations.contains(t)) { failAnalysis( @@ -126,7 +126,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } case logical.InsertIntoTable( - LogicalRelation(r: HadoopFsRelation, _), part, query, overwrite, _) => + LogicalRelation(r: HadoopFsRelation, _, _), part, query, overwrite, _) => // We need to make sure the partition columns specified by users do match partition // columns of the relation. val existingPartitionColumns = r.partitionColumns.fieldNames.toSet @@ -145,7 +145,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // Get all input data source relations of the query. val srcRelations = query.collect { - case LogicalRelation(src: BaseRelation, _) => src + case LogicalRelation(src: BaseRelation, _, _) => src } if (srcRelations.contains(r)) { failAnalysis( @@ -173,10 +173,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => EliminateSubQueries(catalog.lookupRelation(c.tableIdent)) match { // Only do the check if the table is a data source table // (the relation is a BaseRelation). - case l @ LogicalRelation(dest: BaseRelation, _) => + case l @ LogicalRelation(dest: BaseRelation, _, _) => // Get all input data source relations of the query. val srcRelations = c.child.collect { - case LogicalRelation(src: BaseRelation, _) => src + case LogicalRelation(src: BaseRelation, _, _) => src } if (srcRelations.contains(dest)) { failAnalysis( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 587aa5fd30..97c5313f0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -59,7 +59,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex var maybeRelation: Option[ParquetRelation] = None val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _)) => + case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _, _)) => maybeRelation = Some(relation) filters }.flatten.reduceLeftOption(_ && _) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 0feb945fbb..3d1677bed4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -563,7 +563,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: ParquetRelation, _) => + case LogicalRelation(relation: ParquetRelation, _, _) => assert(relation.partitionSpec === PartitionSpec.emptySpec) }.getOrElse { fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 398b8a1a66..7196b6dc13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -317,7 +317,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic val table = caseInsensitiveContext.table("oneToTenFiltered") val relation = table.queryExecution.logical.collectFirst { - case LogicalRelation(r, _) => r + case LogicalRelation(r, _, _) => r }.get assert( |