From 1e28840594b9d972c96d3922ca0bf0f76e313e82 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Tue, 8 Mar 2016 15:19:26 -0800
Subject: [SPARK-13738][SQL] Cleanup Data Source resolution

Follow-up to #11509 that simply refactors the interface we use when resolving a pluggable `DataSource`:

- Multiple functions share the same set of arguments, so we make this a case class called `DataSource`. Actual resolution is now done by calling a function on this class.
- Instead of having multiple methods named `apply` (some of which do writing, some of which do reading), we now explicitly have `resolveRelation()` and `write(mode, df)`.
- Get rid of `Array[String]`, since this is an internal API and it was forcing us to awkwardly call `toArray` in a bunch of places.

(A short usage sketch of the new `DataSource` API follows the diff below.)

Author: Michael Armbrust

Closes #11572 from marmbrus/dataSourceResolution.
---
 .../org/apache/spark/sql/DataFrameReader.scala      |  34 +-
 .../org/apache/spark/sql/DataFrameWriter.scala      |  29 +-
 .../sql/execution/datasources/DataSource.scala      | 338 +++++++++++++++++++
 .../execution/datasources/PartitioningUtils.scala   |  24 +-
 .../execution/datasources/ResolvedDataSource.scala  | 360 ---------------------
 .../spark/sql/execution/datasources/ddl.scala       |  21 +-
 .../spark/sql/execution/datasources/rules.scala     |  14 +-
 .../org/apache/spark/sql/sources/interfaces.scala   |   2 +-
 .../sql/execution/datasources/json/JsonSuite.scala  |  14 +-
 .../sql/sources/ResolvedDataSourceSuite.scala       |  28 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala       |  24 +-
 .../apache/spark/sql/hive/execution/commands.scala  |  31 +-
 .../apache/spark/sql/hive/orc/OrcRelation.scala     |   5 +-
 13 files changed, 462 insertions(+), 462 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index fd92e526e1..509b29956f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -26,7 +26,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.LogicalRDD -import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource} +import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions} import org.apache.spark.sql.execution.streaming.StreamingRelation @@ -122,12 +122,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * @since 1.4.0 */ def load(): DataFrame = { - val resolved = ResolvedDataSource( - sqlContext, - userSpecifiedSchema = userSpecifiedSchema, - provider = source, - options = extraOptions.toMap) - DataFrame(sqlContext, LogicalRelation(resolved.relation)) + val dataSource = + DataSource( + sqlContext, + userSpecifiedSchema = userSpecifiedSchema, + className = source, + options = extraOptions.toMap) + DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())) } /** @@ -152,12 +153,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { 
sqlContext.emptyDataFrame } else { sqlContext.baseRelationToDataFrame( - ResolvedDataSource.apply( + DataSource.apply( sqlContext, paths = paths, userSpecifiedSchema = userSpecifiedSchema, - provider = source, - options = extraOptions.toMap).relation) + className = source, + options = extraOptions.toMap).resolveRelation()) } } @@ -168,12 +169,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * @since 2.0.0 */ def stream(): DataFrame = { - val resolved = ResolvedDataSource.createSource( - sqlContext, - userSpecifiedSchema = userSpecifiedSchema, - providerName = source, - options = extraOptions.toMap) - DataFrame(sqlContext, StreamingRelation(resolved)) + val dataSource = + DataSource( + sqlContext, + userSpecifiedSchema = userSpecifiedSchema, + className = source, + options = extraOptions.toMap) + DataFrame(sqlContext, StreamingRelation(dataSource.createSource())) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 6d8c8f6b4f..78f30f4139 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -25,7 +25,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} -import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, ResolvedDataSource} +import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.sources.HadoopFsRelation @@ -195,14 +195,14 @@ final class DataFrameWriter private[sql](df: DataFrame) { */ def save(): Unit = { assertNotBucketed() - ResolvedDataSource( + val dataSource = DataSource( df.sqlContext, - source, - partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), - getBucketSpec, - mode, - extraOptions.toMap, - df) + className = source, + partitionColumns = partitioningColumns.getOrElse(Nil), + bucketSpec = getBucketSpec, + options = extraOptions.toMap) + + dataSource.write(mode, df) } /** @@ -235,14 +235,15 @@ final class DataFrameWriter private[sql](df: DataFrame) { * @since 2.0.0 */ def stream(): ContinuousQuery = { - val sink = ResolvedDataSource.createSink( - df.sqlContext, - source, - extraOptions.toMap, - normalizedParCols.getOrElse(Nil)) + val dataSource = + DataSource( + df.sqlContext, + className = source, + options = extraOptions.toMap, + partitionColumns = normalizedParCols.getOrElse(Nil)) df.sqlContext.continuousQueryManager.startQuery( - extraOptions.getOrElse("queryName", StreamExecution.nextName), df, sink) + extraOptions.getOrElse("queryName", StreamExecution.nextName), df, dataSource.createSink()) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala new file mode 100644 index 0000000000..e90e72dc8c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -0,0 +1,338 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. 
See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.datasources + +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ +import scala.language.{existentials, implicitConversions} +import scala.util.{Failure, Success, Try} + +import org.apache.hadoop.fs.Path + +import org.apache.spark.Logging +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.{CalendarIntervalType, StructType} +import org.apache.spark.util.Utils + +/** + * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to + * acting as the canonical set of parameters that can describe a Data Source, this class is used to + * resolve a description to a concrete implementation that can be used in a query plan + * (either batch or streaming) or to write out data using an external library. + * + * From an end user's perspective a DataSource description can be created explicitly using + * [[org.apache.spark.sql.DataFrameReader]] or CREATE TABLE USING DDL. Additionally, this class is + * used when resolving a description from a metastore to a concrete implementation. + * + * Many of the arguments to this class are optional, though depending on the specific API being used + * these optional arguments might be filled in during resolution using either inference or external + * metadata. For example, when reading a partitioned table from a file system, partition columns + * will be inferred from the directory layout even if they are not specified. + * + * @param paths A list of file system paths that hold data. These will be globbed before and + * qualified. This option only works when reading from a [[FileFormat]]. + * @param userSpecifiedSchema An optional specification of the schema of the data. When present + * we skip attempting to infer the schema. + * @param partitionColumns A list of column names that the relation is partitioned by. When this + * list is empty, the relation is unpartitioned. + * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data. + */ +case class DataSource( + sqlContext: SQLContext, + className: String, + paths: Seq[String] = Nil, + userSpecifiedSchema: Option[StructType] = None, + partitionColumns: Seq[String] = Seq.empty, + bucketSpec: Option[BucketSpec] = None, + options: Map[String, String] = Map.empty) extends Logging { + + lazy val providingClass: Class[_] = lookupDataSource(className) + + /** A map to maintain backward compatibility in case we move data sources around. 
*/ + private val backwardCompatibilityMap = Map( + "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName, + "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName, + "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName, + "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName, + "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName, + "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName + ) + + /** Given a provider name, look up the data source class definition. */ + private def lookupDataSource(provider0: String): Class[_] = { + val provider = backwardCompatibilityMap.getOrElse(provider0, provider0) + val provider2 = s"$provider.DefaultSource" + val loader = Utils.getContextOrSparkClassLoader + val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader) + + serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match { + // the provider format did not match any given registered aliases + case Nil => + Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match { + case Success(dataSource) => + // Found the data source using fully qualified path + dataSource + case Failure(error) => + if (provider.startsWith("org.apache.spark.sql.hive.orc")) { + throw new ClassNotFoundException( + "The ORC data source must be used with Hive support enabled.", error) + } else { + if (provider == "avro" || provider == "com.databricks.spark.avro") { + throw new ClassNotFoundException( + s"Failed to find data source: $provider. Please use Spark package " + + "http://spark-packages.org/package/databricks/spark-avro", + error) + } else { + throw new ClassNotFoundException( + s"Failed to find data source: $provider. Please find packages at " + + "http://spark-packages.org", + error) + } + } + } + case head :: Nil => + // there is exactly one registered alias + head.getClass + case sources => + // There are multiple registered aliases for the input + sys.error(s"Multiple sources found for $provider " + + s"(${sources.map(_.getClass.getName).mkString(", ")}), " + + "please specify the fully qualified class name.") + } + } + + /** Returns a source that can be used to continually read data. */ + def createSource(): Source = { + providingClass.newInstance() match { + case s: StreamSourceProvider => + s.createSource(sqlContext, userSpecifiedSchema, className, options) + + case format: FileFormat => + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val path = caseInsensitiveOptions.getOrElse("path", { + throw new IllegalArgumentException("'path' is not specified") + }) + val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata") + + val allPaths = caseInsensitiveOptions.get("path") + val globbedPaths = allPaths.toSeq.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + SparkHadoopUtil.get.globPathIfNecessary(qualified) + }.toArray + + val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths) + val dataSchema = userSpecifiedSchema.orElse { + format.inferSchema( + sqlContext, + caseInsensitiveOptions, + fileCatalog.allFiles()) + }.getOrElse { + throw new AnalysisException("Unable to infer schema. 
It must be specified manually.") + } + + def dataFrameBuilder(files: Array[String]): DataFrame = { + new DataFrame( + sqlContext, + LogicalRelation( + DataSource( + sqlContext, + paths = files, + userSpecifiedSchema = Some(dataSchema), + className = className, + options = options.filterKeys(_ != "path")).resolveRelation())) + } + + new FileStreamSource( + sqlContext, metadataPath, path, Some(dataSchema), className, dataFrameBuilder) + case _ => + throw new UnsupportedOperationException( + s"Data source $className does not support streamed reading") + } + } + + /** Returns a sink that can be used to continually write data. */ + def createSink(): Sink = { + val datasourceClass = providingClass.newInstance() match { + case s: StreamSinkProvider => s + case _ => + throw new UnsupportedOperationException( + s"Data source $className does not support streamed writing") + } + + datasourceClass.createSink(sqlContext, options, partitionColumns) + } + + /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */ + def resolveRelation(): BaseRelation = { + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val relation = (providingClass.newInstance(), userSpecifiedSchema) match { + // TODO: Throw when too much is given. + case (dataSource: SchemaRelationProvider, Some(schema)) => + dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema) + case (dataSource: RelationProvider, None) => + dataSource.createRelation(sqlContext, caseInsensitiveOptions) + case (_: SchemaRelationProvider, None) => + throw new AnalysisException(s"A schema needs to be specified when using $className.") + case (_: RelationProvider, Some(_)) => + throw new AnalysisException(s"$className does not allow user-specified schemas.") + + case (format: FileFormat, _) => + val allPaths = caseInsensitiveOptions.get("path") ++ paths + val globbedPaths = allPaths.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + SparkHadoopUtil.get.globPathIfNecessary(qualified) + }.toArray + + val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths) + val dataSchema = userSpecifiedSchema.orElse { + format.inferSchema( + sqlContext, + caseInsensitiveOptions, + fileCatalog.allFiles()) + }.getOrElse { + throw new AnalysisException( + s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " + + "It must be specified manually") + } + + // If they gave a schema, then we try and figure out the types of the partition columns + // from that schema. + val partitionSchema = userSpecifiedSchema.map { schema => + StructType( + partitionColumns.map { c => + // TODO: Case sensitivity. + schema + .find(_.name.toLowerCase() == c.toLowerCase()) + .getOrElse(throw new AnalysisException(s"Invalid partition column '$c'")) + }) + }.getOrElse(fileCatalog.partitionSpec(None).partitionColumns) + + HadoopFsRelation( + sqlContext, + fileCatalog, + partitionSchema = partitionSchema, + dataSchema = dataSchema.asNullable, + bucketSpec = bucketSpec, + format, + options) + + case _ => + throw new AnalysisException( + s"$className is not a valid Spark SQL Data Source.") + } + + relation + } + + /** Writes the give [[DataFrame]] out to this [[DataSource]]. 
*/ + def write( + mode: SaveMode, + data: DataFrame): BaseRelation = { + if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { + throw new AnalysisException("Cannot save interval data type into external storage.") + } + + providingClass.newInstance() match { + case dataSource: CreatableRelationProvider => + dataSource.createRelation(sqlContext, mode, options, data) + case format: FileFormat => + // Don't glob path for the write path. The contracts here are: + // 1. Only one output path can be specified on the write path; + // 2. Output path must be a legal HDFS style file system path; + // 3. It's OK that the output path doesn't exist yet; + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val outputPath = { + val path = new Path(caseInsensitiveOptions.getOrElse("path", { + throw new IllegalArgumentException("'path' is not specified") + })) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + path.makeQualified(fs.getUri, fs.getWorkingDirectory) + } + + val caseSensitive = sqlContext.conf.caseSensitiveAnalysis + PartitioningUtils.validatePartitionColumnDataTypes( + data.schema, partitionColumns, caseSensitive) + + val equality = + if (sqlContext.conf.caseSensitiveAnalysis) { + org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution + } else { + org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution + } + + val dataSchema = StructType( + data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name)))) + + // If we are appending to a table that already exists, make sure the partitioning matches + // up. If we fail to load the table for whatever reason, ignore the check. + if (mode == SaveMode.Append) { + val existingPartitionColumnSet = try { + Some( + resolveRelation() + .asInstanceOf[HadoopFsRelation] + .location + .partitionSpec(None) + .partitionColumns + .fieldNames + .toSet) + } catch { + case e: Exception => + None + } + + existingPartitionColumnSet.foreach { ex => + if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) { + throw new AnalysisException( + s"Requested partitioning does not equal existing partitioning: " + + s"$ex != ${partitionColumns.toSet}.") + } + } + } + + // For partitioned relation r, r.schema's column ordering can be different from the column + // ordering of data.logicalPlan (partition columns are all moved after data column). This + // will be adjusted within InsertIntoHadoopFsRelation. + val plan = + InsertIntoHadoopFsRelation( + outputPath, + partitionColumns.map(UnresolvedAttribute.quoted), + bucketSpec, + format, + () => Unit, // No existing table needs to be refreshed. + options, + data.logicalPlan, + mode) + sqlContext.executePlan(plan).toRdd + + case _ => + sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.") + } + + // We replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it. 
+ copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index eda3c36674..c3f8d7f75a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -335,10 +335,10 @@ private[sql] object PartitioningUtils { def validatePartitionColumnDataTypes( schema: StructType, - partitionColumns: Array[String], + partitionColumns: Seq[String], caseSensitive: Boolean): Unit = { - ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { + partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { field => field.dataType match { case _: AtomicType => // OK case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") @@ -346,6 +346,26 @@ private[sql] object PartitioningUtils { } } + def partitionColumnsSchema( + schema: StructType, + partitionColumns: Seq[String], + caseSensitive: Boolean): StructType = { + val equality = columnNameEquality(caseSensitive) + StructType(partitionColumns.map { col => + schema.find(f => equality(f.name, col)).getOrElse { + throw new RuntimeException(s"Partition column $col not found in schema $schema") + } + }).asNullable + } + + private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = { + if (caseSensitive) { + org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution + } else { + org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution + } + } + /** * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower" * types. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala deleted file mode 100644 index 8dd975ed41..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ /dev/null @@ -1,360 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.spark.sql.execution.datasources - -import java.util.ServiceLoader - -import scala.collection.JavaConverters._ -import scala.language.{existentials, implicitConversions} -import scala.util.{Failure, Success, Try} - -import org.apache.hadoop.fs.Path - -import org.apache.spark.Logging -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source} -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{CalendarIntervalType, StructType} -import org.apache.spark.util.Utils - -case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) - -/** - * Responsible for taking a description of a datasource (either from - * [[org.apache.spark.sql.DataFrameReader]], or a metastore) and converting it into a logical - * relation that can be used in a query plan. - */ -object ResolvedDataSource extends Logging { - - /** A map to maintain backward compatibility in case we move data sources around. */ - private val backwardCompatibilityMap = Map( - "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName, - "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName, - "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName, - "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName, - "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName, - "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName - ) - - /** Given a provider name, look up the data source class definition. */ - def lookupDataSource(provider0: String): Class[_] = { - val provider = backwardCompatibilityMap.getOrElse(provider0, provider0) - val provider2 = s"$provider.DefaultSource" - val loader = Utils.getContextOrSparkClassLoader - val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader) - - serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match { - // the provider format did not match any given registered aliases - case Nil => - Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match { - case Success(dataSource) => - // Found the data source using fully qualified path - dataSource - case Failure(error) => - if (provider.startsWith("org.apache.spark.sql.hive.orc")) { - throw new ClassNotFoundException( - "The ORC data source must be used with Hive support enabled.", error) - } else { - if (provider == "avro" || provider == "com.databricks.spark.avro") { - throw new ClassNotFoundException( - s"Failed to find data source: $provider. Please use Spark package " + - "http://spark-packages.org/package/databricks/spark-avro", - error) - } else { - throw new ClassNotFoundException( - s"Failed to find data source: $provider. Please find packages at " + - "http://spark-packages.org", - error) - } - } - } - case head :: Nil => - // there is exactly one registered alias - head.getClass - case sources => - // There are multiple registered aliases for the input - sys.error(s"Multiple sources found for $provider " + - s"(${sources.map(_.getClass.getName).mkString(", ")}), " + - "please specify the fully qualified class name.") - } - } - - // TODO: Combine with apply? 
- def createSource( - sqlContext: SQLContext, - userSpecifiedSchema: Option[StructType], - providerName: String, - options: Map[String, String]): Source = { - val provider = lookupDataSource(providerName).newInstance() match { - case s: StreamSourceProvider => - s.createSource(sqlContext, userSpecifiedSchema, providerName, options) - - case format: FileFormat => - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - val path = caseInsensitiveOptions.getOrElse("path", { - throw new IllegalArgumentException("'path' is not specified") - }) - val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata") - - val allPaths = caseInsensitiveOptions.get("path") - val globbedPaths = allPaths.toSeq.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualified) - }.toArray - - val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths) - val dataSchema = userSpecifiedSchema.orElse { - format.inferSchema( - sqlContext, - caseInsensitiveOptions, - fileCatalog.allFiles()) - }.getOrElse { - throw new AnalysisException("Unable to infer schema. It must be specified manually.") - } - - def dataFrameBuilder(files: Array[String]): DataFrame = { - new DataFrame( - sqlContext, - LogicalRelation( - apply( - sqlContext, - paths = files, - userSpecifiedSchema = Some(dataSchema), - provider = providerName, - options = options.filterKeys(_ != "path")).relation)) - } - - new FileStreamSource( - sqlContext, metadataPath, path, Some(dataSchema), providerName, dataFrameBuilder) - case _ => - throw new UnsupportedOperationException( - s"Data source $providerName does not support streamed reading") - } - - provider - } - - def createSink( - sqlContext: SQLContext, - providerName: String, - options: Map[String, String], - partitionColumns: Seq[String]): Sink = { - val provider = lookupDataSource(providerName).newInstance() match { - case s: StreamSinkProvider => s - case _ => - throw new UnsupportedOperationException( - s"Data source $providerName does not support streamed writing") - } - - provider.createSink(sqlContext, options, partitionColumns) - } - - /** Create a [[ResolvedDataSource]] for reading data in. */ - def apply( - sqlContext: SQLContext, - paths: Seq[String] = Nil, - userSpecifiedSchema: Option[StructType] = None, - partitionColumns: Array[String] = Array.empty, - bucketSpec: Option[BucketSpec] = None, - provider: String, - options: Map[String, String]): ResolvedDataSource = { - val clazz: Class[_] = lookupDataSource(provider) - def className: String = clazz.getCanonicalName - - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - val relation = (clazz.newInstance(), userSpecifiedSchema) match { - // TODO: Throw when too much is given. 
- case (dataSource: SchemaRelationProvider, Some(schema)) => - dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema) - case (dataSource: RelationProvider, None) => - dataSource.createRelation(sqlContext, caseInsensitiveOptions) - case (_: SchemaRelationProvider, None) => - throw new AnalysisException(s"A schema needs to be specified when using $className.") - case (_: RelationProvider, Some(_)) => - throw new AnalysisException(s"$className does not allow user-specified schemas.") - - case (format: FileFormat, _) => - val allPaths = caseInsensitiveOptions.get("path") ++ paths - val globbedPaths = allPaths.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualified) - }.toArray - - val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths) - val dataSchema = userSpecifiedSchema.orElse { - format.inferSchema( - sqlContext, - caseInsensitiveOptions, - fileCatalog.allFiles()) - }.getOrElse { - throw new AnalysisException( - s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " + - "It must be specified manually") - } - - // If they gave a schema, then we try and figure out the types of the partition columns - // from that schema. - val partitionSchema = userSpecifiedSchema.map { schema => - StructType( - partitionColumns.map { c => - // TODO: Case sensitivity. - schema - .find(_.name.toLowerCase() == c.toLowerCase()) - .getOrElse(throw new AnalysisException(s"Invalid partition column '$c'")) - }) - }.getOrElse(fileCatalog.partitionSpec(None).partitionColumns) - - HadoopFsRelation( - sqlContext, - fileCatalog, - partitionSchema = partitionSchema, - dataSchema = dataSchema.asNullable, - bucketSpec = bucketSpec, - format, - options) - - case _ => - throw new AnalysisException( - s"$className is not a valid Spark SQL Data Source.") - } - new ResolvedDataSource(clazz, relation) - } - - def partitionColumnsSchema( - schema: StructType, - partitionColumns: Array[String], - caseSensitive: Boolean): StructType = { - val equality = columnNameEquality(caseSensitive) - StructType(partitionColumns.map { col => - schema.find(f => equality(f.name, col)).getOrElse { - throw new RuntimeException(s"Partition column $col not found in schema $schema") - } - }).asNullable - } - - private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = { - if (caseSensitive) { - org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution - } else { - org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution - } - } - - /** Create a [[ResolvedDataSource]] for saving the content of the given DataFrame. */ - def apply( - sqlContext: SQLContext, - provider: String, - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - mode: SaveMode, - options: Map[String, String], - data: DataFrame): ResolvedDataSource = { - if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { - throw new AnalysisException("Cannot save interval data type into external storage.") - } - val clazz: Class[_] = lookupDataSource(provider) - clazz.newInstance() match { - case dataSource: CreatableRelationProvider => - dataSource.createRelation(sqlContext, mode, options, data) - case format: FileFormat => - // Don't glob path for the write path. The contracts here are: - // 1. 
Only one output path can be specified on the write path; - // 2. Output path must be a legal HDFS style file system path; - // 3. It's OK that the output path doesn't exist yet; - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - val outputPath = { - val path = new Path(caseInsensitiveOptions.getOrElse("path", { - throw new IllegalArgumentException("'path' is not specified") - })) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - path.makeQualified(fs.getUri, fs.getWorkingDirectory) - } - - val caseSensitive = sqlContext.conf.caseSensitiveAnalysis - PartitioningUtils.validatePartitionColumnDataTypes( - data.schema, partitionColumns, caseSensitive) - - val equality = columnNameEquality(caseSensitive) - val dataSchema = StructType( - data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name)))) - - // If we are appending to a table that already exists, make sure the partitioning matches - // up. If we fail to load the table for whatever reason, ignore the check. - if (mode == SaveMode.Append) { - val existingPartitionColumnSet = try { - val resolved = apply( - sqlContext, - userSpecifiedSchema = Some(data.schema.asNullable), - provider = provider, - options = options) - - Some(resolved.relation - .asInstanceOf[HadoopFsRelation] - .location - .partitionSpec(None) - .partitionColumns - .fieldNames - .toSet) - } catch { - case e: Exception => - None - } - - existingPartitionColumnSet.foreach { ex => - if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) { - throw new AnalysisException( - s"Requested partitioning does not equal existing partitioning: " + - s"$ex != ${partitionColumns.toSet}.") - } - } - } - - // For partitioned relation r, r.schema's column ordering can be different from the column - // ordering of data.logicalPlan (partition columns are all moved after data column). This - // will be adjusted within InsertIntoHadoopFsRelation. - val plan = - InsertIntoHadoopFsRelation( - outputPath, - partitionColumns.map(UnresolvedAttribute.quoted), - bucketSpec, - format, - () => Unit, // No existing table needs to be refreshed. - options, - data.logicalPlan, - mode) - sqlContext.executePlan(plan).toRdd - - case _ => - sys.error(s"${clazz.getCanonicalName} does not allow create table as select.") - } - - apply( - sqlContext, - userSpecifiedSchema = Some(data.schema.asNullable), - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = options) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 3d7c6a6a5e..895794c4c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.types._ /** * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command. + * * @param table The table to be described. * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false. * It is effective only when the table is a Hive table. @@ -50,6 +51,7 @@ case class DescribeCommand( /** * Used to represent the operation of create table using a data source. + * * @param allowExisting If it is true, we will do nothing when the table already exists. 
* If it is false, an exception will be thrown */ @@ -91,14 +93,14 @@ case class CreateTempTableUsing( options: Map[String, String]) extends RunnableCommand { def run(sqlContext: SQLContext): Seq[Row] = { - val resolved = ResolvedDataSource( + val dataSource = DataSource( sqlContext, userSpecifiedSchema = userSpecifiedSchema, - provider = provider, + className = provider, options = options) sqlContext.catalog.registerTable( tableIdent, - DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan) + DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan) Seq.empty[Row] } @@ -114,17 +116,16 @@ case class CreateTempTableUsingAsSelect( override def run(sqlContext: SQLContext): Seq[Row] = { val df = DataFrame(sqlContext, query) - val resolved = ResolvedDataSource( + val dataSource = DataSource( sqlContext, - provider, - partitionColumns, + className = provider, + partitionColumns = partitionColumns, bucketSpec = None, - mode, - options, - df) + options = options) + val result = dataSource.write(mode, df) sqlContext.catalog.registerTable( tableIdent, - DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan) + DataFrame(sqlContext, LogicalRelation(result)).logicalPlan) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 0eae34614c..63f0e4f8c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -32,15 +32,11 @@ private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[Logica def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case u: UnresolvedRelation if u.tableIdentifier.database.isDefined => try { - val resolved = ResolvedDataSource( + val dataSource = DataSource( sqlContext, - paths = Seq.empty, - userSpecifiedSchema = None, - partitionColumns = Array(), - bucketSpec = None, - provider = u.tableIdentifier.database.get, - options = Map("path" -> u.tableIdentifier.table)) - val plan = LogicalRelation(resolved.relation) + paths = u.tableIdentifier.table :: Nil, + className = u.tableIdentifier.database.get) + val plan = LogicalRelation(dataSource.resolveRelation()) u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan) } catch { case e: ClassNotFoundException => u @@ -143,7 +139,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } PartitioningUtils.validatePartitionColumnDataTypes( - r.schema, part.keySet.toArray, catalog.conf.caseSensitiveAnalysis) + r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis) // Get all input data source relations of the query. val srcRelations = query.collect { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 12512a8312..60b0c64c7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -422,7 +422,7 @@ case class HadoopFsRelation( } /** - * Used to read a write data in files to [[InternalRow]] format. + * Used to read and write data stored in files to/from the [[InternalRow]] format. 
*/ trait FileFormat { /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 2f17037a58..02b173d30a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -32,7 +32,7 @@ import org.scalactic.Tolerance._ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource} +import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -1178,21 +1178,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { sparkContext.parallelize(1 to 100) .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path) - val d1 = ResolvedDataSource( + val d1 = DataSource( sqlContext, userSpecifiedSchema = None, partitionColumns = Array.empty[String], bucketSpec = None, - provider = classOf[DefaultSource].getCanonicalName, - options = Map("path" -> path)) + className = classOf[DefaultSource].getCanonicalName, + options = Map("path" -> path)).resolveRelation() - val d2 = ResolvedDataSource( + val d2 = DataSource( sqlContext, userSpecifiedSchema = None, partitionColumns = Array.empty[String], bucketSpec = None, - provider = classOf[DefaultSource].getCanonicalName, - options = Map("path" -> path)) + className = classOf[DefaultSource].getCanonicalName, + options = Map("path" -> path)).resolveRelation() assert(d1 === d2) }) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index cb6e5179b3..94d032f4ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -18,59 +18,61 @@ package org.apache.spark.sql.sources import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.datasources.ResolvedDataSource +import org.apache.spark.sql.execution.datasources.DataSource class ResolvedDataSourceSuite extends SparkFunSuite { + private def getProvidingClass(name: String): Class[_] = + DataSource(sqlContext = null, className = name).providingClass test("jdbc") { assert( - ResolvedDataSource.lookupDataSource("jdbc") === + getProvidingClass("jdbc") === classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource]) assert( - ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.jdbc") === + getProvidingClass("org.apache.spark.sql.execution.datasources.jdbc") === classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource]) assert( - ResolvedDataSource.lookupDataSource("org.apache.spark.sql.jdbc") === + getProvidingClass("org.apache.spark.sql.jdbc") === classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource]) } test("json") { assert( - ResolvedDataSource.lookupDataSource("json") === + getProvidingClass("json") === classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource]) assert( - ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.json") 
=== + getProvidingClass("org.apache.spark.sql.execution.datasources.json") === classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource]) assert( - ResolvedDataSource.lookupDataSource("org.apache.spark.sql.json") === + getProvidingClass("org.apache.spark.sql.json") === classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource]) } test("parquet") { assert( - ResolvedDataSource.lookupDataSource("parquet") === + getProvidingClass("parquet") === classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource]) assert( - ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.parquet") === + getProvidingClass("org.apache.spark.sql.execution.datasources.parquet") === classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource]) assert( - ResolvedDataSource.lookupDataSource("org.apache.spark.sql.parquet") === + getProvidingClass("org.apache.spark.sql.parquet") === classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource]) } test("error message for unknown data sources") { val error1 = intercept[ClassNotFoundException] { - ResolvedDataSource.lookupDataSource("avro") + getProvidingClass("avro") } assert(error1.getMessage.contains("spark-packages")) val error2 = intercept[ClassNotFoundException] { - ResolvedDataSource.lookupDataSource("com.databricks.spark.avro") + getProvidingClass("com.databricks.spark.avro") } assert(error2.getMessage.contains("spark-packages")) val error3 = intercept[ClassNotFoundException] { - ResolvedDataSource.lookupDataSource("asfdwefasdfasdf") + getProvidingClass("asfdwefasdfasdf") } assert(error3.getMessage.contains("spark-packages")) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 28874189de..8f6cd66f1f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -176,17 +176,17 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte } val options = table.storage.serdeProperties - val resolvedRelation = - ResolvedDataSource( + val dataSource = + DataSource( hive, userSpecifiedSchema = userSpecifiedSchema, - partitionColumns = partitionColumns.toArray, + partitionColumns = partitionColumns, bucketSpec = bucketSpec, - provider = table.properties("spark.sql.sources.provider"), + className = table.properties("spark.sql.sources.provider"), options = options) LogicalRelation( - resolvedRelation.relation, + dataSource.resolveRelation(), metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database)))) } } @@ -283,12 +283,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf) val dataSource = - ResolvedDataSource( + DataSource( hive, userSpecifiedSchema = userSpecifiedSchema, partitionColumns = partitionColumns, bucketSpec = bucketSpec, - provider = provider, + className = provider, options = options) def newSparkSQLSpecificMetastoreTable(): CatalogTable = { @@ -334,7 +334,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte // TODO: Support persisting partitioned data source relations in Hive compatible format val qualifiedTableName = tableIdent.quotedString val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean - val (hiveCompatibleTable, logMessage) = 
(maybeSerDe, dataSource.relation) match { + val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match { case _ if skipHiveMetadata => val message = s"Persisting partitioned data source relation $qualifiedTableName into " + @@ -511,7 +511,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val parquetRelation = cached.getOrElse { val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil - val fileCatalog = new HiveFileCatalog(hive, paths, partitionSpec) + val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec) val format = new DefaultSource() val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles()) @@ -541,12 +541,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val parquetRelation = cached.getOrElse { val created = LogicalRelation( - ResolvedDataSource( + DataSource( sqlContext = hive, paths = paths, userSpecifiedSchema = Some(metastoreRelation.schema), options = parquetOptions, - provider = "parquet").relation) + className = "parquet").resolveRelation()) cachedDataSourceTables.put(tableIdentifier, created) created @@ -749,7 +749,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte * An override of the standard HDFS listing based catalog, that overrides the partition spec with * the information from the metastore. */ -class HiveFileCatalog( +class MetaStoreFileCatalog( hive: HiveContext, paths: Seq[Path], partitionSpecFromHive: PartitionSpec) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 37cec6d2ab..7e4fb8b0ac 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.execution.datasources.{BucketSpec, LogicalRelation, ResolvedDataSource} +import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, LogicalRelation} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -148,12 +148,12 @@ case class CreateMetastoreDataSource( } // Create the relation to validate the arguments before writing the metadata to the metastore. - ResolvedDataSource( + DataSource( sqlContext = sqlContext, userSpecifiedSchema = userSpecifiedSchema, - provider = provider, + className = provider, bucketSpec = None, - options = optionsWithPath) + options = optionsWithPath).resolveRelation() hiveContext.catalog.createDataSourceTable( tableIdent, @@ -220,15 +220,16 @@ case class CreateMetastoreDataSourceAsSelect( return Seq.empty[Row] case SaveMode.Append => // Check if the specified data source match the data source of the existing table. - val resolved = ResolvedDataSource( + val dataSource = DataSource( sqlContext = sqlContext, userSpecifiedSchema = Some(query.schema.asNullable), partitionColumns = partitionColumns, bucketSpec = bucketSpec, - provider = provider, + className = provider, options = optionsWithPath) // TODO: Check that options from the resolved relation match the relation that we are // inserting into (i.e. 
using the same compression). + EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match { case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => existingSchema = Some(l.schema) @@ -248,19 +249,19 @@ case class CreateMetastoreDataSourceAsSelect( val data = DataFrame(hiveContext, query) val df = existingSchema match { // If we are inserting into an existing table, just use the existing schema. - case Some(schema) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, schema) + case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s) case None => data } // Create the relation based on the data of df. - val resolved = ResolvedDataSource( + val dataSource = DataSource( sqlContext, - provider, - partitionColumns, - bucketSpec, - mode, - optionsWithPath, - df) + className = provider, + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + options = optionsWithPath) + + val result = dataSource.write(mode, df) if (createMetastoreTable) { // We will use the schema of resolved.relation as the schema of the table (instead of @@ -268,7 +269,7 @@ case class CreateMetastoreDataSourceAsSelect( // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). hiveContext.catalog.createDataSourceTable( tableIdent, - Some(resolved.relation.schema), + Some(result.schema), partitionColumns, bucketSpec, provider, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index ad832b5197..041e0fb477 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.orc import java.util.Properties -import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -39,7 +38,7 @@ import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim} +import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim} import org.apache.spark.sql.sources.{Filter, _} import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -173,7 +172,7 @@ private[orc] class OrcOutputWriter( } override def write(row: Row): Unit = - throw new UnsupportedOperationException("call writeInternal") + throw new UnsupportedOperationException("call writeInternal") private def wrapOrcStruct( struct: OrcStruct,
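
Usage sketch of the new API (illustrative only): the snippet below mirrors the call sites in DataFrameReader.load() and DataFrameWriter.save() from the diff above. `DataSource` lives in the internal package org.apache.spark.sql.execution.datasources, so this only compiles inside the Spark SQL code base; the provider name "parquet", the path "/tmp/events", and the partition column "date" are made up for the example and are not part of this patch.

import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.BaseRelation

object DataSourceUsageSketch {

  // Read path: describe the source once, then resolve it to a BaseRelation,
  // as DataFrameReader.load() now does.
  def readRelation(sqlContext: SQLContext): BaseRelation =
    DataSource(
      sqlContext,
      className = "parquet",                 // was `provider` in ResolvedDataSource
      options = Map("path" -> "/tmp/events")
    ).resolveRelation()

  // Write path: the same description, but with write(mode, df) instead of a
  // second overloaded apply(), as DataFrameWriter.save() now does.
  def writeRelation(sqlContext: SQLContext, df: DataFrame): BaseRelation =
    DataSource(
      sqlContext,
      className = "parquet",
      partitionColumns = Seq("date"),        // Seq[String] now, not Array[String]
      options = Map("path" -> "/tmp/events")
    ).write(SaveMode.Append, df)
}

Compared to the old ResolvedDataSource object, the description of a source (className, paths, userSpecifiedSchema, partitionColumns, bucketSpec, options) is captured once in the case class, and whether a call reads, writes, or creates a streaming Source/Sink is selected by the method invoked (resolveRelation, write, createSource, createSink) rather than by which apply overload happens to match.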