author     Reynold Xin <rxin@databricks.com>   2016-11-03 11:48:05 -0700
committer  Reynold Xin <rxin@databricks.com>   2016-11-03 11:48:05 -0700
commit     b17057c0a69b9c56e503483d97f5dc209eef0884 (patch)
tree       dc36d1f309c2deca27d585c6e4b57e5f2989515a /sql
parent     27daf6bcde782ed3e0f0d951c90c8040fd47e985 (diff)
[SPARK-18244][SQL] Rename partitionProviderIsHive -> tracksPartitionsInCatalog
## What changes were proposed in this pull request?

This patch renames `partitionProviderIsHive` to `tracksPartitionsInCatalog`, as the old name was too Hive-specific.

## How was this patch tested?

Should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15750 from rxin/SPARK-18244.
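For illustration, here is a minimal sketch (not part of the patch) of what the rename means for code that reads `CatalogTable`: the boolean field is consumed the same way, only under its new, catalog-neutral name. The helper name is hypothetical; it mirrors the `toString` change in `interface.scala` below.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical helper. Before this patch the field was called
// `partitionProviderIsHive`; after it, the same flag is read as
// `tracksPartitionsInCatalog`.
def partitionProviderLabel(table: CatalogTable): String = {
  if (table.tracksPartitionsInCatalog) "Partition Provider: Catalog" else ""
}
```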
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala                           |  9
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala                         |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala                 |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                                    |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                                 |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala                         |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala                 |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala  |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                               |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala                                 | 21
10 files changed, 30 insertions, 27 deletions
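The `HiveExternalCatalog` hunk at the end of this diff persists the choice as a table property ("catalog" vs. "filesystem") and reads it back when restoring the table. A minimal standalone sketch of that mapping (not the actual catalog code; the full property key string is abbreviated here as a parameter):

```scala
// Property values introduced by this patch in HiveExternalCatalog.
val TABLE_PARTITION_PROVIDER_CATALOG = "catalog"
val TABLE_PARTITION_PROVIDER_FILESYSTEM = "filesystem"

// Hypothetical helper: a table tracks its partitions in the catalog only if
// the stored property is exactly "catalog"; "filesystem" or a missing property
// means partitions are inferred from the file layout instead.
def tracksPartitionsInCatalog(
    tableProperties: Map[String, String],
    partitionProviderKey: String): Boolean = {
  tableProperties.get(partitionProviderKey).contains(TABLE_PARTITION_PROVIDER_CATALOG)
}
```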
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 7c3bec8979..34748a0485 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -138,8 +138,9 @@ case class BucketSpec(
* Can be None if this table is a View, should be "hive" for hive serde tables.
* @param unsupportedFeatures is a list of string descriptions of features that are used by the
* underlying table but not supported by Spark SQL yet.
- * @param partitionProviderIsHive whether this table's partition metadata is stored in the Hive
- * metastore.
+ * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
+ * catalog. If false, it is inferred automatically based on file
+ * structure.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -158,7 +159,7 @@ case class CatalogTable(
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
- partitionProviderIsHive: Boolean = false) {
+ tracksPartitionsInCatalog: Boolean = false) {
/** schema of this table's partition columns */
def partitionSchema: StructType = StructType(schema.filter {
@@ -217,7 +218,7 @@ case class CatalogTable(
if (properties.nonEmpty) s"Properties: $tableProperties" else "",
if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
s"$storage",
- if (partitionProviderIsHive) "Partition Provider: Hive" else "")
+ if (tracksPartitionsInCatalog) "Partition Provider: Catalog" else "")
output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 3eff12f9ee..af1eaa1f23 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -489,7 +489,7 @@ class TreeNodeSuite extends SparkFunSuite {
"owner" -> "",
"createTime" -> 0,
"lastAccessTime" -> -1,
- "partitionProviderIsHive" -> false,
+ "tracksPartitionsInCatalog" -> false,
"properties" -> JNull,
"unsupportedFeatures" -> List.empty[String]))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d4b28274cc..7e16e43f2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -92,7 +92,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
// If metastore partition management for file source tables is enabled, we start off with
// partition provider hive, but no partitions in the metastore. The user has to call
// `msck repair table` to populate the table partitions.
- partitionProviderIsHive = partitionColumnNames.nonEmpty &&
+ tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
sparkSession.sessionState.conf.manageFilesourcePartitions)
// We will return Nil or throw exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 52af915b0b..b4d3ca1f37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -523,7 +523,7 @@ case class AlterTableRecoverPartitionsCommand(
// Updates the table to indicate that its partition metadata is stored in the Hive metastore.
// This is always the case for Hive format tables, but is not true for Datasource tables created
// before Spark 2.1 unless they are converted via `msck repair table`.
- spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
+ spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
catalog.refreshTable(tableName)
logInfo(s"Recovered all partitions ($total).")
Seq.empty[Row]
@@ -702,7 +702,7 @@ object DDLUtils {
s"$action is not allowed on $tableName since filesource partition management is " +
"disabled (spark.sql.hive.manageFilesourcePartitions = false).")
}
- if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
+ if (!table.tracksPartitionsInCatalog && isDatasourceTable(table)) {
throw new AnalysisException(
s"$action is not allowed on $tableName since its partition metadata is not stored in " +
"the Hive metastore. To import this information into the metastore, run " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f32c956f59..00c646b918 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -467,7 +467,7 @@ case class DescribeTableCommand(
if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer)
- if (DDLUtils.isDatasourceTable(table) && table.partitionProviderIsHive) {
+ if (DDLUtils.isDatasourceTable(table) && table.tracksPartitionsInCatalog) {
append(buffer, "Partition Provider:", "Hive", "")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0b50448a7a..5266611935 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -308,7 +308,7 @@ case class DataSource(
}
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
- catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
+ catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
new CatalogFileIndex(
sparkSession,
catalogTable.get,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e87998fe4a..a548e88cb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -182,9 +182,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
"Cannot overwrite a path that is also being read from.")
}
- val overwritingSinglePartition = (overwrite.specificPartition.isDefined &&
+ val overwritingSinglePartition =
+ overwrite.specificPartition.isDefined &&
t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
- l.catalogTable.get.partitionProviderIsHive)
+ l.catalogTable.get.tracksPartitionsInCatalog
val effectiveOutputPath = if (overwritingSinglePartition) {
val partition = t.sparkSession.sessionState.catalog.getPartition(
@@ -203,7 +204,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
l.catalogTable.get.partitionColumnNames.nonEmpty &&
- l.catalogTable.get.partitionProviderIsHive) {
+ l.catalogTable.get.tracksPartitionsInCatalog) {
val metastoreUpdater = AlterTableAddPartitionCommand(
l.catalogTable.get.identifier,
updatedPartitions.map(p => (p, None)),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 927c0c5b95..9c75e2ae74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -31,11 +31,7 @@ import org.apache.spark.sql.execution.command.RunnableCommand
/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelationCommand]]
- * issues a single write job, and owns a UUID that identifies this job. Each concrete
- * implementation of [[HadoopFsRelation]] should use this UUID together with task id to generate
- * unique file path for each task output file. This UUID is passed to executor side via a
- * property named `spark.sql.sources.writeJobUUID`.
+ * Writing to dynamic partitions is also supported.
*/
case class InsertIntoHadoopFsRelationCommand(
outputPath: Path,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d4d001497d..52b09c5446 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -96,7 +96,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
provider = Some("hive"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L,
- partitionProviderIsHive = true)
+ tracksPartitionsInCatalog = true)
}
private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ebba203ac5..64ba52672b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -323,8 +323,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val properties = new scala.collection.mutable.HashMap[String, String]
properties.put(DATASOURCE_PROVIDER, provider)
- if (table.partitionProviderIsHive) {
- properties.put(TABLE_PARTITION_PROVIDER, "hive")
+ if (table.tracksPartitionsInCatalog) {
+ properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
}
// Serialized JSON schema string may be too long to be stored into a single metastore table
@@ -489,10 +489,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
updateLocationInStorageProps(oldTableDef, newLocation).copy(locationUri = newLocation)
}
- val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
- TABLE_PARTITION_PROVIDER -> "hive"
+ val partitionProviderProp = if (tableDefinition.tracksPartitionsInCatalog) {
+ TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_CATALOG
} else {
- TABLE_PARTITION_PROVIDER -> "builtin"
+ TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM
}
// Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
@@ -537,7 +537,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table
} else {
getProviderFromTableProperties(table).map { provider =>
- assert(provider != "hive", "Hive serde table should not save provider in table properties.")
+ assert(provider != TABLE_PARTITION_PROVIDER_CATALOG,
+ "Hive serde table should not save provider in table properties.")
// Internally we store the table location in storage properties with key "path" for data
// source tables. Here we set the table location to `locationUri` field and filter out the
// path option in storage properties, to avoid exposing this concept externally.
@@ -545,6 +546,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val tableLocation = getLocationFromStorageProps(table)
updateLocationInStorageProps(table, None).copy(locationUri = tableLocation)
}
+ val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
table.copy(
storage = storageWithLocation,
@@ -552,9 +554,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
provider = Some(provider),
partitionColumnNames = getPartitionColumnsFromTableProperties(table),
bucketSpec = getBucketSpecFromTableProperties(table),
- partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive"))
+ tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG)
+ )
} getOrElse {
- table.copy(provider = Some("hive"), partitionProviderIsHive = true)
+ table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true)
}
}
@@ -851,6 +854,8 @@ object HiveExternalCatalog {
val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
+ val TABLE_PARTITION_PROVIDER_CATALOG = "catalog"
+ val TABLE_PARTITION_PROVIDER_FILESYSTEM = "filesystem"
def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {