author     Michael Armbrust <michael@databricks.com>   2016-03-08 15:19:26 -0800
committer  Reynold Xin <rxin@databricks.com>           2016-03-08 15:19:26 -0800
commit     1e28840594b9d972c96d3922ca0bf0f76e313e82 (patch)
tree       5669df597c7c4deea0586830905ddd037eb819a4 /sql/hive
parent     076009b94985b586ffa78078cc03abb85e308f06 (diff)
download   spark-1e28840594b9d972c96d3922ca0bf0f76e313e82.tar.gz
           spark-1e28840594b9d972c96d3922ca0bf0f76e313e82.tar.bz2
           spark-1e28840594b9d972c96d3922ca0bf0f76e313e82.zip
[SPARK-13738][SQL] Cleanup Data Source resolution
Follow-up to #11509 that simply refactors the interface we use when resolving a pluggable `DataSource`.

- Multiple functions share the same set of arguments, so we make this a case class called `DataSource`. Actual resolution is now done by calling a function on this class.
- Instead of having multiple methods named `apply` (some of which do writing, some of which do reading), we now explicitly have `resolveRelation()` and `write(mode, df)`.
- Get rid of `Array[String]`, since this is an internal API and it was forcing us to awkwardly call `toArray` in a bunch of places.

Author: Michael Armbrust <michael@databricks.com>

Closes #11572 from marmbrus/dataSourceResolution.
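To make the new interface concrete, below is a minimal, self-contained Scala sketch of the pattern the message describes: the shared arguments live in one case class, and reading and writing are explicit `resolveRelation()` / `write(mode, df)` calls. The names and types here (`DataSourceSketch`, `Relation`, `Frame`, the stubbed method bodies) are simplified placeholders for illustration, not Spark's actual internals.

```scala
// Sketch of the refactoring pattern described above: shared arguments are
// bundled into one case class, and the read/write entry points are explicit
// methods instead of overloaded apply(). Relation, Frame and SaveMode are
// simplified stand-ins, not Spark's real classes.
object DataSourceSketch {

  sealed trait SaveMode
  case object Append    extends SaveMode
  case object Overwrite extends SaveMode

  // Placeholder result/input types for the sketch.
  final case class Relation(schema: Seq[String])
  final case class Frame(schema: Seq[String], rows: Seq[Seq[Any]])

  // The shared argument bundle that used to be repeated across apply() overloads.
  final case class DataSource(
      className: String,                            // provider name, e.g. "parquet"
      paths: Seq[String] = Nil,                     // Seq rather than Array[String]
      userSpecifiedSchema: Option[Seq[String]] = None,
      partitionColumns: Seq[String] = Nil,
      bucketSpec: Option[String] = None,
      options: Map[String, String] = Map.empty) {

    // Reading: resolve the provider into a concrete relation.
    def resolveRelation(): Relation =
      Relation(userSpecifiedSchema.getOrElse(Seq("inferred_col")))

    // Writing: persist a frame and return the relation that was written.
    // A real implementation would dispatch on `className` and honour `mode`.
    def write(mode: SaveMode, data: Frame): Relation =
      Relation(data.schema)
  }

  def main(args: Array[String]): Unit = {
    val ds = DataSource(className = "parquet", paths = Seq("/tmp/example"))
    println(ds.resolveRelation())                         // read path
    println(ds.write(Append, Frame(Seq("a", "b"), Nil)))  // write path
  }
}
```

Call sites construct the bundle once and then pick the read or write entry point explicitly, which is the shape the diff below migrates HiveMetastoreCatalog.scala and commands.scala toward.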
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala   24
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala     31
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala         5
3 files changed, 30 insertions, 30 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 28874189de..8f6cd66f1f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -176,17 +176,17 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
val options = table.storage.serdeProperties
- val resolvedRelation =
- ResolvedDataSource(
+ val dataSource =
+ DataSource(
hive,
userSpecifiedSchema = userSpecifiedSchema,
- partitionColumns = partitionColumns.toArray,
+ partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- provider = table.properties("spark.sql.sources.provider"),
+ className = table.properties("spark.sql.sources.provider"),
options = options)
LogicalRelation(
- resolvedRelation.relation,
+ dataSource.resolveRelation(),
metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
}
}
@@ -283,12 +283,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
val dataSource =
- ResolvedDataSource(
+ DataSource(
hive,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- provider = provider,
+ className = provider,
options = options)
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
@@ -334,7 +334,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// TODO: Support persisting partitioned data source relations in Hive compatible format
val qualifiedTableName = tableIdent.quotedString
val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
- val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+ val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
case _ if skipHiveMetadata =>
val message =
s"Persisting partitioned data source relation $qualifiedTableName into " +
@@ -511,7 +511,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val parquetRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
- val fileCatalog = new HiveFileCatalog(hive, paths, partitionSpec)
+ val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
val format = new DefaultSource()
val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles())
@@ -541,12 +541,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(
- ResolvedDataSource(
+ DataSource(
sqlContext = hive,
paths = paths,
userSpecifiedSchema = Some(metastoreRelation.schema),
options = parquetOptions,
- provider = "parquet").relation)
+ className = "parquet").resolveRelation())
cachedDataSourceTables.put(tableIdentifier, created)
created
@@ -749,7 +749,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
* An override of the standard HDFS listing based catalog, that overrides the partition spec with
* the information from the metastore.
*/
-class HiveFileCatalog(
+class MetaStoreFileCatalog(
hive: HiveContext,
paths: Seq[Path],
partitionSpecFromHive: PartitionSpec)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 37cec6d2ab..7e4fb8b0ac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.datasources.{BucketSpec, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, LogicalRelation}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -148,12 +148,12 @@ case class CreateMetastoreDataSource(
}
// Create the relation to validate the arguments before writing the metadata to the metastore.
- ResolvedDataSource(
+ DataSource(
sqlContext = sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
- provider = provider,
+ className = provider,
bucketSpec = None,
- options = optionsWithPath)
+ options = optionsWithPath).resolveRelation()
hiveContext.catalog.createDataSourceTable(
tableIdent,
@@ -220,15 +220,16 @@ case class CreateMetastoreDataSourceAsSelect(
return Seq.empty[Row]
case SaveMode.Append =>
// Check if the specified data source match the data source of the existing table.
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext = sqlContext,
userSpecifiedSchema = Some(query.schema.asNullable),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- provider = provider,
+ className = provider,
options = optionsWithPath)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
+
EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
existingSchema = Some(l.schema)
@@ -248,19 +249,19 @@ case class CreateMetastoreDataSourceAsSelect(
val data = DataFrame(hiveContext, query)
val df = existingSchema match {
// If we are inserting into an existing table, just use the existing schema.
- case Some(schema) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, schema)
+ case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s)
case None => data
}
// Create the relation based on the data of df.
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext,
- provider,
- partitionColumns,
- bucketSpec,
- mode,
- optionsWithPath,
- df)
+ className = provider,
+ partitionColumns = partitionColumns,
+ bucketSpec = bucketSpec,
+ options = optionsWithPath)
+
+ val result = dataSource.write(mode, df)
if (createMetastoreTable) {
// We will use the schema of resolved.relation as the schema of the table (instead of
@@ -268,7 +269,7 @@ case class CreateMetastoreDataSourceAsSelect(
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
hiveContext.catalog.createDataSourceTable(
tableIdent,
- Some(resolved.relation.schema),
+ Some(result.schema),
partitionColumns,
bucketSpec,
provider,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index ad832b5197..041e0fb477 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.orc
import java.util.Properties
-import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -39,7 +38,7 @@ import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -173,7 +172,7 @@ private[orc] class OrcOutputWriter(
}
override def write(row: Row): Unit =
- throw new UnsupportedOperationException("call writeInternal")
+ throw new UnsupportedOperationException("call writeInternal")
private def wrapOrcStruct(
struct: OrcStruct,