author    Wenchen Fan <wenchen@databricks.com>    2016-05-11 23:55:42 -0700
committer Yin Huai <yhuai@databricks.com>         2016-05-11 23:55:42 -0700
commit    46991448aa6f78f413a761059d7d7bb586f9d63e (patch)
tree      1fceadbdb3eb6d38103e1d6dd029a21d12c8cb41 /sql
parent    9e266d07a444fd465fe178cdd5c4894cd09cbda3 (diff)
[SPARK-15160][SQL] support data source table in InMemoryCatalog
## What changes were proposed in this pull request?

This PR adds a new analyzer rule that converts a `SimpleCatalogRelation` to a data source table relation if its table properties contain data source information.

## How was this patch tested?

New test in SQLQuerySuite.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12935 from cloud-fan/ds-table.
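Concretely, the new behavior can be exercised as in the sketch below, adapted from the new SQLQuerySuite test. It assumes a `SparkSession` named `spark` created without Hive support, so the session catalog is backed by `InMemoryCatalog`.

```scala
// Sketch of the usage this PR enables, adapted from the new SQLQuerySuite test.
// Assumes a SparkSession `spark` built without enableHiveSupport().
import spark.implicits._

spark.sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")

// Writing into the data source table resolves it through the new FindDataSourceTable rule.
Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
Seq(3 -> "c", 4 -> "d").toDF("i", "j").write.mode("append").saveAsTable("tbl")

// Reading it back goes through the same rule.
spark.sql("SELECT i, j FROM tbl").show()
```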
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala           |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                    | 76
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                 | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 47
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala                    |  7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                            | 16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                |  2
8 files changed, 114 insertions, 67 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index fc2068cac5..d21565526e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -185,6 +185,8 @@ case class SimpleCatalogRelation(
override def catalogTable: CatalogTable = metadata
+ override lazy val resolved: Boolean = false
+
override val output: Seq[Attribute] = {
val cols = catalogTable.schema
.filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index de3c868176..7d3c52570f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.HiveSerDe
@@ -200,6 +200,8 @@ case class CreateDataSourceTableAsSelectCommand(
s"doesn't match the data schema[${query.schema}]'s")
}
existingSchema = Some(l.schema)
+ case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+ existingSchema = DDLUtils.getSchemaFromTableProperties(s.metadata)
case o =>
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0b0b6185c7..1c1716f050 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
@@ -492,33 +493,28 @@ private[sql] object DDLUtils {
table.properties.contains("spark.sql.sources.schema.numPartCols")
}
- def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
- getSchemaFromTableProperties(metadata.properties)
- }
-
// A persisted data source table may not store its schema in the catalog. In this case, its schema
// will be inferred at runtime when the table is referenced.
- def getSchemaFromTableProperties(props: Map[String, String]): Option[StructType] = {
- require(isDatasourceTable(props))
+ def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
+ require(isDatasourceTable(metadata))
- val schemaParts = for {
- numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
- index <- 0 until numParts.toInt
- } yield props.getOrElse(
- s"spark.sql.sources.schema.part.$index",
- throw new AnalysisException(
- s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
- )
- )
+ metadata.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
+ val parts = (0 until numParts.toInt).map { index =>
+ val part = metadata.properties.get(s"spark.sql.sources.schema.part.$index").orNull
+ if (part == null) {
+ throw new AnalysisException(
+ "Could not read schema from the metastore because it is corrupted " +
+ s"(missing part $index of the schema, $numParts parts are expected).")
+ }
- if (schemaParts.isEmpty) {
- None
- } else {
- Some(DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType])
+ part
+ }
+ // Stick all parts back to a single schema string.
+ DataType.fromJson(parts.mkString).asInstanceOf[StructType]
}
}
- private def getColumnNamesByTypeFromTableProperties(
+ private def getColumnNamesByType(
props: Map[String, String], colType: String, typeName: String): Seq[String] = {
require(isDatasourceTable(props))
@@ -534,35 +530,19 @@ private[sql] object DDLUtils {
}
def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
- getPartitionColumnsFromTableProperties(metadata.properties)
- }
-
- def getPartitionColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
- getColumnNamesByTypeFromTableProperties(props, "part", "partitioning columns")
+ getColumnNamesByType(metadata.properties, "part", "partitioning columns")
}
- def getNumBucketFromTableProperties(metadata: CatalogTable): Option[Int] = {
- getNumBucketFromTableProperties(metadata.properties)
- }
-
- def getNumBucketFromTableProperties(props: Map[String, String]): Option[Int] = {
- require(isDatasourceTable(props))
- props.get("spark.sql.sources.schema.numBuckets").map(_.toInt)
- }
-
- def getBucketingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
- getBucketingColumnsFromTableProperties(metadata.properties)
- }
-
- def getBucketingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
- getColumnNamesByTypeFromTableProperties(props, "bucket", "bucketing columns")
- }
-
- def getSortingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
- getSortingColumnsFromTableProperties(metadata.properties)
- }
-
- def getSortingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
- getColumnNamesByTypeFromTableProperties(props, "sort", "sorting columns")
+ def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = {
+ if (isDatasourceTable(metadata)) {
+ metadata.properties.get("spark.sql.sources.schema.numBuckets").map { numBuckets =>
+ BucketSpec(
+ numBuckets.toInt,
+ getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"),
+ getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
+ }
+ } else {
+ None
+ }
}
}
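For reference, a minimal sketch of the logic that `getSchemaFromTableProperties` implements above: the schema JSON is stored split across `spark.sql.sources.schema.part.N` properties, with the part count in `spark.sql.sources.schema.numParts`. The helper name and the plain `Map[String, String]` parameter are illustrative, not part of the patch.

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helper mirroring the hunk above: reassemble the schema JSON
// that was split across several table properties.
def schemaFromProps(props: Map[String, String]): Option[StructType] = {
  props.get("spark.sql.sources.schema.numParts").map { numParts =>
    val parts = (0 until numParts.toInt).map { index =>
      props.getOrElse(
        s"spark.sql.sources.schema.part.$index",
        sys.error(s"missing schema part $index of $numParts"))
    }
    // Concatenating the parts yields the original JSON string.
    DataType.fromJson(parts.mkString).asInstanceOf[StructType]
  }
}
```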
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e6dcd1ee95..bb4f1ff4f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -385,17 +385,22 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
- if (DDLUtils.isDatasourceTable(metadata)) {
- val numBuckets = DDLUtils.getNumBucketFromTableProperties(metadata)
- val bucketCols = DDLUtils.getBucketingColumnsFromTableProperties(metadata)
- val sortCols = DDLUtils.getSortingColumnsFromTableProperties(metadata)
- append(buffer, "Num Buckets:", numBuckets.map(_.toString).getOrElse(""), "")
- append(buffer, "Bucket Columns:", bucketCols.mkString("[", ", ", "]"), "")
- append(buffer, "Sort Columns:", sortCols.mkString("[", ", ", "]"), "")
- } else {
- append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
- append(buffer, "Bucket Columns:", metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
- append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", ", ", "]"), "")
+ def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], sortColumns: Seq[String]) = {
+ append(buffer, "Num Buckets:", numBuckets.toString, "")
+ append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", "]"), "")
+ append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "")
+ }
+
+ DDLUtils.getBucketSpecFromTableProperties(metadata).map { bucketSpec =>
+ appendBucketInfo(
+ bucketSpec.numBuckets,
+ bucketSpec.bucketColumnNames,
+ bucketSpec.sortColumnNames)
+ }.getOrElse {
+ appendBucketInfo(
+ metadata.numBuckets,
+ metadata.bucketColumnNames,
+ metadata.sortColumnNames)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index bc249f4ed5..0494fafb0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -32,8 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.DataSourceScanExec.PUSHED_FILTERS
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -77,6 +78,48 @@ private[sql] object DataSourceAnalysis extends Rule[LogicalPlan] {
}
}
+
+/**
+ * Replaces [[SimpleCatalogRelation]] with a data source table relation if its table properties
+ * contain data source information.
+ */
+private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+ private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
+ val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
+
+ // We only need the column names here, since the userSpecifiedSchema loaded from the metastore
+ // already contains the partition columns. We can always get the data types of the partitioning
+ // columns from userSpecifiedSchema.
+ val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(table)
+
+ val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table)
+
+ val options = table.storage.serdeProperties
+ val dataSource =
+ DataSource(
+ sparkSession,
+ userSpecifiedSchema = userSpecifiedSchema,
+ partitionColumns = partitionColumns,
+ bucketSpec = bucketSpec,
+ className = table.properties("spark.sql.sources.provider"),
+ options = options)
+
+ LogicalRelation(
+ dataSource.resolveRelation(),
+ metastoreTableIdentifier = Some(table.identifier))
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+ if DDLUtils.isDatasourceTable(s.metadata) =>
+ i.copy(table = readDataSourceTable(sparkSession, s.metadata))
+
+ case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+ readDataSourceTable(sparkSession, s.metadata)
+ }
+}
+
+
/**
* A Strategy for planning scans over data sources defined using the sources API.
*/
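For readers unfamiliar with Catalyst rules, the new `FindDataSourceTable` follows the standard `Rule[LogicalPlan]` shape sketched below. The class and parameter names here are illustrative stand-ins, not part of the patch.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative sketch: `transform` walks the plan tree and replaces every node
// the partial function matches, leaving all other nodes untouched. The two
// function parameters stand in for the checks FindDataSourceTable performs on
// SimpleCatalogRelation.
class ReplaceMatchingRelations(
    shouldReplace: LogicalPlan => Boolean,
    replacement: LogicalPlan => LogicalPlan) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case node if shouldReplace(node) => replacement(node)
  }
}
```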
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ebff756979..f0b8a83dee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -18,14 +18,10 @@
package org.apache.spark.sql.internal
import java.io.File
-import java.util.Properties
-
-import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, _}
@@ -34,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.AnalyzeTable
-import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -114,6 +110,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
PreInsertCastAndRename ::
+ new FindDataSourceTable(sparkSession) ::
DataSourceAnalysis ::
(if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4ef4b4865f..3bbe87adc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2480,4 +2480,20 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Row(null) :: Nil
)
}
+
+ test("data source table created in InMemoryCatalog should be able to read/write") {
+ withTable("tbl") {
+ sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
+ checkAnswer(sql("SELECT i, j FROM tbl"), Nil)
+
+ Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+ checkAnswer(sql("SELECT i, j FROM tbl"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+ Seq(3 -> "c", 4 -> "d").toDF("i", "j").write.mode("append").saveAsTable("tbl")
+ checkAnswer(
+ sql("SELECT i, j FROM tbl"),
+ Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil)
+ }
+ }
+
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8cfcec79cd..2f20cde4b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -71,6 +71,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
logDebug(s"Creating new cached data source for $in")
val table = client.getTable(in.database, in.name)
+ // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable
+
def schemaStringFromParts: Option[String] = {
table.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
val parts = (0 until numParts.toInt).map { index =>