diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-05-11 23:55:42 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-05-11 23:55:42 -0700 |
commit | 46991448aa6f78f413a761059d7d7bb586f9d63e (patch) | |
tree | 1fceadbdb3eb6d38103e1d6dd029a21d12c8cb41 /sql/core | |
parent | 9e266d07a444fd465fe178cdd5c4894cd09cbda3 (diff) | |
download | spark-46991448aa6f78f413a761059d7d7bb586f9d63e.tar.gz spark-46991448aa6f78f413a761059d7d7bb586f9d63e.tar.bz2 spark-46991448aa6f78f413a761059d7d7bb586f9d63e.zip |
[SPARK-15160][SQL] support data source table in InMemoryCatalog
## What changes were proposed in this pull request?
This PR adds a new analyzer rule, `FindDataSourceTable`, which converts a `SimpleCatalogRelation` into a data source table when its table properties contain data source information.
## How was this patch tested?
A new test in SQLQuerySuite ("data source table created in InMemoryCatalog should be able to read/write").
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12935 from cloud-fan/ds-table.
Diffstat (limited to 'sql/core')
6 files changed, 110 insertions, 67 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index de3c868176..7d3c52570f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases -import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.HiveSerDe @@ -200,6 +200,8 @@ case class CreateDataSourceTableAsSelectCommand( s"doesn't match the data schema[${query.schema}]'s") } existingSchema = Some(l.schema) + case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => + existingSchema = DDLUtils.getSchemaFromTableProperties(s.metadata) case o => throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 0b0b6185c7..1c1716f050 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import 
org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.types._ @@ -492,33 +493,28 @@ private[sql] object DDLUtils { table.properties.contains("spark.sql.sources.schema.numPartCols") } - def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = { - getSchemaFromTableProperties(metadata.properties) - } - // A persisted data source table may not store its schema in the catalog. In this case, its schema // will be inferred at runtime when the table is referenced. - def getSchemaFromTableProperties(props: Map[String, String]): Option[StructType] = { - require(isDatasourceTable(props)) + def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = { + require(isDatasourceTable(metadata)) - val schemaParts = for { - numParts <- props.get("spark.sql.sources.schema.numParts").toSeq - index <- 0 until numParts.toInt - } yield props.getOrElse( - s"spark.sql.sources.schema.part.$index", - throw new AnalysisException( - s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing." - ) - ) + metadata.properties.get("spark.sql.sources.schema.numParts").map { numParts => + val parts = (0 until numParts.toInt).map { index => + val part = metadata.properties.get(s"spark.sql.sources.schema.part.$index").orNull + if (part == null) { + throw new AnalysisException( + "Could not read schema from the metastore because it is corrupted " + + s"(missing part $index of the schema, $numParts parts are expected).") + } - if (schemaParts.isEmpty) { - None - } else { - Some(DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType]) + part + } + // Stick all parts back to a single schema string. 
+ DataType.fromJson(parts.mkString).asInstanceOf[StructType] } } - private def getColumnNamesByTypeFromTableProperties( + private def getColumnNamesByType( props: Map[String, String], colType: String, typeName: String): Seq[String] = { require(isDatasourceTable(props)) @@ -534,35 +530,19 @@ private[sql] object DDLUtils { } def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = { - getPartitionColumnsFromTableProperties(metadata.properties) - } - - def getPartitionColumnsFromTableProperties(props: Map[String, String]): Seq[String] = { - getColumnNamesByTypeFromTableProperties(props, "part", "partitioning columns") + getColumnNamesByType(metadata.properties, "part", "partitioning columns") } - def getNumBucketFromTableProperties(metadata: CatalogTable): Option[Int] = { - getNumBucketFromTableProperties(metadata.properties) - } - - def getNumBucketFromTableProperties(props: Map[String, String]): Option[Int] = { - require(isDatasourceTable(props)) - props.get("spark.sql.sources.schema.numBuckets").map(_.toInt) - } - - def getBucketingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = { - getBucketingColumnsFromTableProperties(metadata.properties) - } - - def getBucketingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = { - getColumnNamesByTypeFromTableProperties(props, "bucket", "bucketing columns") - } - - def getSortingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = { - getSortingColumnsFromTableProperties(metadata.properties) - } - - def getSortingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = { - getColumnNamesByTypeFromTableProperties(props, "sort", "sorting columns") + def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = { + if (isDatasourceTable(metadata)) { + metadata.properties.get("spark.sql.sources.schema.numBuckets").map { numBuckets => + BucketSpec( + numBuckets.toInt, + getColumnNamesByType(metadata.properties, "bucket", 
"bucketing columns"), + getColumnNamesByType(metadata.properties, "sort", "sorting columns")) + } + } else { + None + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index e6dcd1ee95..bb4f1ff4f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -385,17 +385,22 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF } private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { - if (DDLUtils.isDatasourceTable(metadata)) { - val numBuckets = DDLUtils.getNumBucketFromTableProperties(metadata) - val bucketCols = DDLUtils.getBucketingColumnsFromTableProperties(metadata) - val sortCols = DDLUtils.getSortingColumnsFromTableProperties(metadata) - append(buffer, "Num Buckets:", numBuckets.map(_.toString).getOrElse(""), "") - append(buffer, "Bucket Columns:", bucketCols.mkString("[", ", ", "]"), "") - append(buffer, "Sort Columns:", sortCols.mkString("[", ", ", "]"), "") - } else { - append(buffer, "Num Buckets:", metadata.numBuckets.toString, "") - append(buffer, "Bucket Columns:", metadata.bucketColumnNames.mkString("[", ", ", "]"), "") - append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", ", ", "]"), "") + def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], sortColumns: Seq[String]) = { + append(buffer, "Num Buckets:", numBuckets.toString, "") + append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", "]"), "") + append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "") + } + + DDLUtils.getBucketSpecFromTableProperties(metadata).map { bucketSpec => + appendBucketInfo( + bucketSpec.numBuckets, + bucketSpec.bucketColumnNames, + bucketSpec.sortColumnNames) + }.getOrElse { + appendBucketInfo( + 
metadata.numBuckets, + metadata.bucketColumnNames, + metadata.sortColumnNames) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index bc249f4ed5..0494fafb0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -32,8 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS} -import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.DataSourceScanExec.PUSHED_FILTERS +import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -77,6 +78,48 @@ private[sql] object DataSourceAnalysis extends Rule[LogicalPlan] { } } + +/** + * Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data + * source information. 
+ */ +private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = { + val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table) + + // We only need names at here since userSpecifiedSchema we loaded from the metastore + // contains partition columns. We can always get datatypes of partitioning columns + // from userSpecifiedSchema. + val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(table) + + val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table) + + val options = table.storage.serdeProperties + val dataSource = + DataSource( + sparkSession, + userSpecifiedSchema = userSpecifiedSchema, + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + className = table.properties("spark.sql.sources.provider"), + options = options) + + LogicalRelation( + dataSource.resolveRelation(), + metastoreTableIdentifier = Some(table.identifier)) + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) + if DDLUtils.isDatasourceTable(s.metadata) => + i.copy(table = readDataSourceTable(sparkSession, s.metadata)) + + case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => + readDataSourceTable(sparkSession, s.metadata) + } +} + + /** * A Strategy for planning scans over data sources defined using the sources API. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index ebff756979..f0b8a83dee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -18,14 +18,10 @@ package org.apache.spark.sql.internal import java.io.File -import java.util.Properties - -import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, _} @@ -34,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.AnalyzeTable -import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource} +import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource} import org.apache.spark.sql.util.ExecutionListenerManager @@ -114,6 +110,7 @@ private[sql] class SessionState(sparkSession: SparkSession) { new Analyzer(catalog, conf) { override val extendedResolutionRules = PreInsertCastAndRename :: + new FindDataSourceTable(sparkSession) :: DataSourceAnalysis :: (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4ef4b4865f..3bbe87adc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2480,4 +2480,20 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { Row(null) :: Nil ) } + + test("data source table created in InMemoryCatalog should be able to read/write") { + withTable("tbl") { + sql("CREATE TABLE tbl(i INT, j STRING) USING parquet") + checkAnswer(sql("SELECT i, j FROM tbl"), Nil) + + Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl") + checkAnswer(sql("SELECT i, j FROM tbl"), Row(1, "a") :: Row(2, "b") :: Nil) + + Seq(3 -> "c", 4 -> "d").toDF("i", "j").write.mode("append").saveAsTable("tbl") + checkAnswer( + sql("SELECT i, j FROM tbl"), + Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil) + } + } + } |