From 46991448aa6f78f413a761059d7d7bb586f9d63e Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 11 May 2016 23:55:42 -0700
Subject: [SPARK-15160][SQL] support data source table in InMemoryCatalog

## What changes were proposed in this pull request?

This PR adds a new rule to convert `SimpleCatalogRelation` to a data source table if its table properties contain data source information.

## How was this patch tested?

New test in SQLQuerySuite.

Author: Wenchen Fan

Closes #12935 from cloud-fan/ds-table.
---
 .../spark/sql/catalyst/catalog/interface.scala     |  2 +
 .../execution/command/createDataSourceTables.scala |  4 +-
 .../apache/spark/sql/execution/command/ddl.scala   | 76 ++++++++--------------
 .../spark/sql/execution/command/tables.scala       | 27 ++++----
 .../execution/datasources/DataSourceStrategy.scala | 47 ++++++++++++-
 .../apache/spark/sql/internal/SessionState.scala   |  7 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 16 +++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  2 +
 8 files changed, 114 insertions(+), 67 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index fc2068cac5..d21565526e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -185,6 +185,8 @@ case class SimpleCatalogRelation(
 
   override def catalogTable: CatalogTable = metadata
 
+  override lazy val resolved: Boolean = false
+
   override val output: Seq[Attribute] = {
     val cols = catalogTable.schema
       .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index de3c868176..7d3c52570f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.HiveSerDe
@@ -200,6 +200,8 @@ case class CreateDataSourceTableAsSelectCommand(
               s"doesn't match the data schema[${query.schema}]'s")
           }
           existingSchema = Some(l.schema)
+        case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+          existingSchema = DDLUtils.getSchemaFromTableProperties(s.metadata)
         case o =>
           throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0b0b6185c7..1c1716f050 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
@@ -492,33 +493,28 @@ private[sql] object DDLUtils {
     table.properties.contains("spark.sql.sources.schema.numPartCols")
   }
 
-  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
-    getSchemaFromTableProperties(metadata.properties)
-  }
-
   // A persisted data source table may not store its schema in the catalog. In this case, its schema
   // will be inferred at runtime when the table is referenced.
-  def getSchemaFromTableProperties(props: Map[String, String]): Option[StructType] = {
-    require(isDatasourceTable(props))
+  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
+    require(isDatasourceTable(metadata))
 
-    val schemaParts = for {
-      numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
-      index <- 0 until numParts.toInt
-    } yield props.getOrElse(
-      s"spark.sql.sources.schema.part.$index",
-      throw new AnalysisException(
-        s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
-      )
-    )
+    metadata.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
+      val parts = (0 until numParts.toInt).map { index =>
+        val part = metadata.properties.get(s"spark.sql.sources.schema.part.$index").orNull
+        if (part == null) {
+          throw new AnalysisException(
+            "Could not read schema from the metastore because it is corrupted " +
+              s"(missing part $index of the schema, $numParts parts are expected).")
+        }
 
-    if (schemaParts.isEmpty) {
-      None
-    } else {
-      Some(DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType])
+        part
+      }
+      // Stick all parts back to a single schema string.
+      DataType.fromJson(parts.mkString).asInstanceOf[StructType]
     }
   }
 
-  private def getColumnNamesByTypeFromTableProperties(
+  private def getColumnNamesByType(
       props: Map[String, String], colType: String, typeName: String): Seq[String] = {
     require(isDatasourceTable(props))
@@ -534,35 +530,19 @@ private[sql] object DDLUtils {
   }
 
   def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
-    getPartitionColumnsFromTableProperties(metadata.properties)
-  }
-
-  def getPartitionColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
-    getColumnNamesByTypeFromTableProperties(props, "part", "partitioning columns")
+    getColumnNamesByType(metadata.properties, "part", "partitioning columns")
   }
 
-  def getNumBucketFromTableProperties(metadata: CatalogTable): Option[Int] = {
-    getNumBucketFromTableProperties(metadata.properties)
-  }
-
-  def getNumBucketFromTableProperties(props: Map[String, String]): Option[Int] = {
-    require(isDatasourceTable(props))
-    props.get("spark.sql.sources.schema.numBuckets").map(_.toInt)
-  }
-
-  def getBucketingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
-    getBucketingColumnsFromTableProperties(metadata.properties)
-  }
-
-  def getBucketingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
-    getColumnNamesByTypeFromTableProperties(props, "bucket", "bucketing columns")
-  }
-
-  def getSortingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
-    getSortingColumnsFromTableProperties(metadata.properties)
-  }
-
-  def getSortingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
-    getColumnNamesByTypeFromTableProperties(props, "sort", "sorting columns")
+  def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = {
+    if (isDatasourceTable(metadata)) {
+      metadata.properties.get("spark.sql.sources.schema.numBuckets").map { numBuckets =>
+        BucketSpec(
+          numBuckets.toInt,
+          getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"),
+          getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
+      }
+    } else {
+      None
+    }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e6dcd1ee95..bb4f1ff4f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -385,17 +385,22 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
   }
 
   private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
-    if (DDLUtils.isDatasourceTable(metadata)) {
-      val numBuckets = DDLUtils.getNumBucketFromTableProperties(metadata)
-      val bucketCols = DDLUtils.getBucketingColumnsFromTableProperties(metadata)
-      val sortCols = DDLUtils.getSortingColumnsFromTableProperties(metadata)
-      append(buffer, "Num Buckets:", numBuckets.map(_.toString).getOrElse(""), "")
-      append(buffer, "Bucket Columns:", bucketCols.mkString("[", ", ", "]"), "")
-      append(buffer, "Sort Columns:", sortCols.mkString("[", ", ", "]"), "")
-    } else {
-      append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
-      append(buffer, "Bucket Columns:", metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
-      append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", ", ", "]"), "")
+    def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], sortColumns: Seq[String]) = {
append(buffer, "Num Buckets:", numBuckets.toString, "") + append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", "]"), "") + append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "") + } + + DDLUtils.getBucketSpecFromTableProperties(metadata).map { bucketSpec => + appendBucketInfo( + bucketSpec.numBuckets, + bucketSpec.bucketColumnNames, + bucketSpec.sortColumnNames) + }.getOrElse { + appendBucketInfo( + metadata.numBuckets, + metadata.bucketColumnNames, + metadata.sortColumnNames) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index bc249f4ed5..0494fafb0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -32,8 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS} -import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.DataSourceScanExec.PUSHED_FILTERS +import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -77,6 +78,48 @@ private[sql] object DataSourceAnalysis extends Rule[LogicalPlan] { } } + +/** + * Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data + * source information. + */ +private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = { + val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table) + + // We only need names at here since userSpecifiedSchema we loaded from the metastore + // contains partition columns. We can always get datatypes of partitioning columns + // from userSpecifiedSchema. 
+    val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(table)
+
+    val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table)
+
+    val options = table.storage.serdeProperties
+    val dataSource =
+      DataSource(
+        sparkSession,
+        userSpecifiedSchema = userSpecifiedSchema,
+        partitionColumns = partitionColumns,
+        bucketSpec = bucketSpec,
+        className = table.properties("spark.sql.sources.provider"),
+        options = options)
+
+    LogicalRelation(
+      dataSource.resolveRelation(),
+      metastoreTableIdentifier = Some(table.identifier))
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+        if DDLUtils.isDatasourceTable(s.metadata) =>
+      i.copy(table = readDataSourceTable(sparkSession, s.metadata))
+
+    case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+      readDataSourceTable(sparkSession, s.metadata)
+  }
+}
+
+
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ebff756979..f0b8a83dee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -18,14 +18,10 @@
 package org.apache.spark.sql.internal
 
 import java.io.File
-import java.util.Properties
-
-import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, _}
@@ -34,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.AnalyzeTable
-import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -114,6 +110,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
         PreInsertCastAndRename ::
+        new FindDataSourceTable(sparkSession) ::
         DataSourceAnalysis ::
         (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4ef4b4865f..3bbe87adc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2480,4 +2480,20 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Row(null) :: Nil
     )
   }
+
+  test("data source table created in InMemoryCatalog should be able to read/write") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
+      checkAnswer(sql("SELECT i, j FROM tbl"), Nil)
+
+      Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+      checkAnswer(sql("SELECT i, j FROM tbl"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      Seq(3 -> "c", 4 -> "d").toDF("i", "j").write.mode("append").saveAsTable("tbl")
"j").write.mode("append").saveAsTable("tbl") + checkAnswer( + sql("SELECT i, j FROM tbl"), + Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil) + } + } + } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8cfcec79cd..2f20cde4b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -71,6 +71,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logDebug(s"Creating new cached data source for $in") val table = client.getTable(in.database, in.name) + // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable + def schemaStringFromParts: Option[String] = { table.properties.get("spark.sql.sources.schema.numParts").map { numParts => val parts = (0 until numParts.toInt).map { index => -- cgit v1.2.3