author     Michael Armbrust <michael@databricks.com>    2016-03-08 15:19:26 -0800
committer  Reynold Xin <rxin@databricks.com>            2016-03-08 15:19:26 -0800
commit     1e28840594b9d972c96d3922ca0bf0f76e313e82 (patch)
tree       5669df597c7c4deea0586830905ddd037eb819a4 /sql
parent     076009b94985b586ffa78078cc03abb85e308f06 (diff)
[SPARK-13738][SQL] Cleanup Data Source resolution
Follow-up to #11509 that simply refactors the interface we use when resolving a pluggable `DataSource`.

- Multiple functions share the same set of arguments, so we make this a case class, called `DataSource`. Actual resolution is now done by calling a function on this class.
- Instead of having multiple methods named `apply` (some of which do writing, some of which do reading), we now explicitly have `resolveRelation()` and `write(mode, df)`.
- Get rid of `Array[String]`, since this is an internal API and it was forcing us to awkwardly call `toArray` in a bunch of places.

Author: Michael Armbrust <michael@databricks.com>

Closes #11572 from marmbrus/dataSourceResolution.
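For orientation, here is a minimal sketch (not part of the patch) of how an internal caller moves from the old `ResolvedDataSource` overloads to the new `DataSource` case class; the provider name, path, and the surrounding object are placeholders.

package org.apache.spark.sql

import org.apache.spark.sql.execution.datasources.DataSource

// Sketch only: illustrates the refactored interface; all values are placeholders.
object DataSourceResolutionSketch {
  def readThenAppend(sqlContext: SQLContext, df: DataFrame): Unit = {
    // One case class now carries the description that the old `apply` overloads took as arguments.
    val dataSource = DataSource(
      sqlContext,
      className = "parquet",                     // previously `provider = ...`
      partitionColumns = Nil,                    // Seq[String] instead of Array[String]
      options = Map("path" -> "/tmp/example"))

    // The intent is explicit in the method name instead of being implied by an overload.
    val relation = dataSource.resolveRelation()  // read path (was `ResolvedDataSource(...).relation`)
    println(relation.schema)                     // resolution produces a BaseRelation
    dataSource.write(SaveMode.Append, df)        // write path (mode is no longer a constructor argument)
  }
}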
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                                                                                      |  34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                                                                                      |  29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala) | 168
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala                                                              |  24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala                                                                            |  21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala                                                                          |  14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                                                                                   |   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala                                                                 |  14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala                                                                      |  28
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                                                                            |  24
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala                                                                              |  31
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                                                                                 |   5
12 files changed, 197 insertions, 197 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index fd92e526e1..509b29956f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.streaming.StreamingRelation
@@ -122,12 +122,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def load(): DataFrame = {
- val resolved = ResolvedDataSource(
- sqlContext,
- userSpecifiedSchema = userSpecifiedSchema,
- provider = source,
- options = extraOptions.toMap)
- DataFrame(sqlContext, LogicalRelation(resolved.relation))
+ val dataSource =
+ DataSource(
+ sqlContext,
+ userSpecifiedSchema = userSpecifiedSchema,
+ className = source,
+ options = extraOptions.toMap)
+ DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
}
/**
@@ -152,12 +153,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
sqlContext.emptyDataFrame
} else {
sqlContext.baseRelationToDataFrame(
- ResolvedDataSource.apply(
+ DataSource.apply(
sqlContext,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
- provider = source,
- options = extraOptions.toMap).relation)
+ className = source,
+ options = extraOptions.toMap).resolveRelation())
}
}
@@ -168,12 +169,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 2.0.0
*/
def stream(): DataFrame = {
- val resolved = ResolvedDataSource.createSource(
- sqlContext,
- userSpecifiedSchema = userSpecifiedSchema,
- providerName = source,
- options = extraOptions.toMap)
- DataFrame(sqlContext, StreamingRelation(resolved))
+ val dataSource =
+ DataSource(
+ sqlContext,
+ userSpecifiedSchema = userSpecifiedSchema,
+ className = source,
+ options = extraOptions.toMap)
+ DataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
}
/**
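From the user-facing side nothing changes; the sketch below (paths and options are placeholders) shows the reads that these code paths now back: `load()` goes through `DataSource.resolveRelation()` and `stream()` through `DataSource.createSource()`.

import org.apache.spark.sql.{DataFrame, SQLContext}

object ReadSketch {
  // Sketch only: user-facing reads are unchanged by this patch.
  def loadExamples(sqlContext: SQLContext): (DataFrame, DataFrame) = {
    // Batch read: resolved internally via DataSource.resolveRelation().
    val batchDf = sqlContext.read
      .format("json")
      .option("samplingRatio", "0.5")   // ordinary data source option, passed through `options`
      .load("/tmp/events")

    // Streaming read: resolved internally via DataSource.createSource();
    // the path is handed over as an option.
    val streamDf = sqlContext.read
      .format("json")
      .schema(batchDf.schema)
      .option("path", "/tmp/events")
      .stream()

    (batchDf, streamDf)
  }
}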
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6d8c8f6b4f..78f30f4139 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.sources.HadoopFsRelation
@@ -195,14 +195,14 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*/
def save(): Unit = {
assertNotBucketed()
- ResolvedDataSource(
+ val dataSource = DataSource(
df.sqlContext,
- source,
- partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
- getBucketSpec,
- mode,
- extraOptions.toMap,
- df)
+ className = source,
+ partitionColumns = partitioningColumns.getOrElse(Nil),
+ bucketSpec = getBucketSpec,
+ options = extraOptions.toMap)
+
+ dataSource.write(mode, df)
}
/**
@@ -235,14 +235,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @since 2.0.0
*/
def stream(): ContinuousQuery = {
- val sink = ResolvedDataSource.createSink(
- df.sqlContext,
- source,
- extraOptions.toMap,
- normalizedParCols.getOrElse(Nil))
+ val dataSource =
+ DataSource(
+ df.sqlContext,
+ className = source,
+ options = extraOptions.toMap,
+ partitionColumns = normalizedParCols.getOrElse(Nil))
df.sqlContext.continuousQueryManager.startQuery(
- extraOptions.getOrElse("queryName", StreamExecution.nextName), df, sink)
+ extraOptions.getOrElse("queryName", StreamExecution.nextName), df, dataSource.createSink())
}
/**
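The write side is symmetric: `save()` now delegates to `DataSource.write(mode, df)` and `stream()` builds the sink via `DataSource.createSink()`. A short sketch follows; the paths, the partition column, and the streaming provider name are placeholders (a streaming provider must implement `StreamSinkProvider`), while the "queryName" option is the one read in the diff above.

import org.apache.spark.sql.{ContinuousQuery, DataFrame, SaveMode}

object WriteSketch {
  // Sketch only: user-facing writes are unchanged by this patch.
  def writeExamples(df: DataFrame): ContinuousQuery = {
    // Batch write: ends up in DataSource.write(SaveMode.Overwrite, df).
    df.write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .partitionBy("date")
      .save("/tmp/output")

    // Continuous write: ends up in DataSource.createSink(); the query is started
    // by the ContinuousQueryManager under the given name.
    df.write
      .format("com.example.streamsink")   // hypothetical StreamSinkProvider implementation
      .option("queryName", "exampleQuery")
      .stream()
  }
}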
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 8dd975ed41..e90e72dc8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -34,14 +34,39 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils
-case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
-
/**
- * Responsible for taking a description of a datasource (either from
- * [[org.apache.spark.sql.DataFrameReader]], or a metastore) and converting it into a logical
- * relation that can be used in a query plan.
+ * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
+ * acting as the canonical set of parameters that can describe a Data Source, this class is used to
+ * resolve a description to a concrete implementation that can be used in a query plan
+ * (either batch or streaming) or to write out data using an external library.
+ *
+ * From an end user's perspective a DataSource description can be created explicitly using
+ * [[org.apache.spark.sql.DataFrameReader]] or CREATE TABLE USING DDL. Additionally, this class is
+ * used when resolving a description from a metastore to a concrete implementation.
+ *
+ * Many of the arguments to this class are optional, though depending on the specific API being used
+ * these optional arguments might be filled in during resolution using either inference or external
+ * metadata. For example, when reading a partitioned table from a file system, partition columns
+ * will be inferred from the directory layout even if they are not specified.
+ *
+ * @param paths A list of file system paths that hold data. These will be globbed before and
+ * qualified. This option only works when reading from a [[FileFormat]].
+ * @param userSpecifiedSchema An optional specification of the schema of the data. When present
+ * we skip attempting to infer the schema.
+ * @param partitionColumns A list of column names that the relation is partitioned by. When this
+ * list is empty, the relation is unpartitioned.
+ * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
*/
-object ResolvedDataSource extends Logging {
+case class DataSource(
+ sqlContext: SQLContext,
+ className: String,
+ paths: Seq[String] = Nil,
+ userSpecifiedSchema: Option[StructType] = None,
+ partitionColumns: Seq[String] = Seq.empty,
+ bucketSpec: Option[BucketSpec] = None,
+ options: Map[String, String] = Map.empty) extends Logging {
+
+ lazy val providingClass: Class[_] = lookupDataSource(className)
/** A map to maintain backward compatibility in case we move data sources around. */
private val backwardCompatibilityMap = Map(
@@ -54,7 +79,7 @@ object ResolvedDataSource extends Logging {
)
/** Given a provider name, look up the data source class definition. */
- def lookupDataSource(provider0: String): Class[_] = {
+ private def lookupDataSource(provider0: String): Class[_] = {
val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
val provider2 = s"$provider.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
@@ -96,15 +121,11 @@ object ResolvedDataSource extends Logging {
}
}
- // TODO: Combine with apply?
- def createSource(
- sqlContext: SQLContext,
- userSpecifiedSchema: Option[StructType],
- providerName: String,
- options: Map[String, String]): Source = {
- val provider = lookupDataSource(providerName).newInstance() match {
+ /** Returns a source that can be used to continually read data. */
+ def createSource(): Source = {
+ providingClass.newInstance() match {
case s: StreamSourceProvider =>
- s.createSource(sqlContext, userSpecifiedSchema, providerName, options)
+ s.createSource(sqlContext, userSpecifiedSchema, className, options)
case format: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
@@ -135,53 +156,38 @@ object ResolvedDataSource extends Logging {
new DataFrame(
sqlContext,
LogicalRelation(
- apply(
+ DataSource(
sqlContext,
paths = files,
userSpecifiedSchema = Some(dataSchema),
- provider = providerName,
- options = options.filterKeys(_ != "path")).relation))
+ className = className,
+ options = options.filterKeys(_ != "path")).resolveRelation()))
}
new FileStreamSource(
- sqlContext, metadataPath, path, Some(dataSchema), providerName, dataFrameBuilder)
+ sqlContext, metadataPath, path, Some(dataSchema), className, dataFrameBuilder)
case _ =>
throw new UnsupportedOperationException(
- s"Data source $providerName does not support streamed reading")
+ s"Data source $className does not support streamed reading")
}
-
- provider
}
- def createSink(
- sqlContext: SQLContext,
- providerName: String,
- options: Map[String, String],
- partitionColumns: Seq[String]): Sink = {
- val provider = lookupDataSource(providerName).newInstance() match {
+ /** Returns a sink that can be used to continually write data. */
+ def createSink(): Sink = {
+ val datasourceClass = providingClass.newInstance() match {
case s: StreamSinkProvider => s
case _ =>
throw new UnsupportedOperationException(
- s"Data source $providerName does not support streamed writing")
+ s"Data source $className does not support streamed writing")
}
- provider.createSink(sqlContext, options, partitionColumns)
+ datasourceClass.createSink(sqlContext, options, partitionColumns)
}
- /** Create a [[ResolvedDataSource]] for reading data in. */
- def apply(
- sqlContext: SQLContext,
- paths: Seq[String] = Nil,
- userSpecifiedSchema: Option[StructType] = None,
- partitionColumns: Array[String] = Array.empty,
- bucketSpec: Option[BucketSpec] = None,
- provider: String,
- options: Map[String, String]): ResolvedDataSource = {
- val clazz: Class[_] = lookupDataSource(provider)
- def className: String = clazz.getCanonicalName
-
+ /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
+ def resolveRelation(): BaseRelation = {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val relation = (clazz.newInstance(), userSpecifiedSchema) match {
+ val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
case (dataSource: SchemaRelationProvider, Some(schema)) =>
dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
@@ -238,43 +244,19 @@ object ResolvedDataSource extends Logging {
throw new AnalysisException(
s"$className is not a valid Spark SQL Data Source.")
}
- new ResolvedDataSource(clazz, relation)
- }
- def partitionColumnsSchema(
- schema: StructType,
- partitionColumns: Array[String],
- caseSensitive: Boolean): StructType = {
- val equality = columnNameEquality(caseSensitive)
- StructType(partitionColumns.map { col =>
- schema.find(f => equality(f.name, col)).getOrElse {
- throw new RuntimeException(s"Partition column $col not found in schema $schema")
- }
- }).asNullable
+ relation
}
- private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
- if (caseSensitive) {
- org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
- } else {
- org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
- }
- }
-
- /** Create a [[ResolvedDataSource]] for saving the content of the given DataFrame. */
- def apply(
- sqlContext: SQLContext,
- provider: String,
- partitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
+ /** Writes the give [[DataFrame]] out to this [[DataSource]]. */
+ def write(
mode: SaveMode,
- options: Map[String, String],
- data: DataFrame): ResolvedDataSource = {
+ data: DataFrame): BaseRelation = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
- val clazz: Class[_] = lookupDataSource(provider)
- clazz.newInstance() match {
+
+ providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sqlContext, mode, options, data)
case format: FileFormat =>
@@ -295,7 +277,13 @@ object ResolvedDataSource extends Logging {
PartitioningUtils.validatePartitionColumnDataTypes(
data.schema, partitionColumns, caseSensitive)
- val equality = columnNameEquality(caseSensitive)
+ val equality =
+ if (sqlContext.conf.caseSensitiveAnalysis) {
+ org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+ } else {
+ org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+ }
+
val dataSchema = StructType(
data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
@@ -303,19 +291,14 @@ object ResolvedDataSource extends Logging {
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
val existingPartitionColumnSet = try {
- val resolved = apply(
- sqlContext,
- userSpecifiedSchema = Some(data.schema.asNullable),
- provider = provider,
- options = options)
-
- Some(resolved.relation
- .asInstanceOf[HadoopFsRelation]
- .location
- .partitionSpec(None)
- .partitionColumns
- .fieldNames
- .toSet)
+ Some(
+ resolveRelation()
+ .asInstanceOf[HadoopFsRelation]
+ .location
+ .partitionSpec(None)
+ .partitionColumns
+ .fieldNames
+ .toSet)
} catch {
case e: Exception =>
None
@@ -346,15 +329,10 @@ object ResolvedDataSource extends Logging {
sqlContext.executePlan(plan).toRdd
case _ =>
- sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
+ sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
- apply(
- sqlContext,
- userSpecifiedSchema = Some(data.schema.asNullable),
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- provider = provider,
- options = options)
+ // We replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
+ copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
}
}
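As context for what `lookupDataSource`/`providingClass` resolve, here is a hypothetical minimal external provider (not part of Spark or of this patch) that the short name "com.example.constant" would load, since resolution appends ".DefaultSource" to the provider string before looking up the class.

package com.example.constant

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical example: a relation that returns the integers 0 until `rows`.
class ConstantRelation(rows: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  override def schema: StructType = StructType(StructField("value", IntegerType) :: Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0 until rows).map(Row(_))
}

// DataSource.lookupDataSource("com.example.constant") finds this class by appending
// ".DefaultSource"; providingClass.newInstance() then hits the RelationProvider branch
// of resolveRelation().
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new ConstantRelation(parameters.getOrElse("rows", "10").toInt)(sqlContext)
}

With this on the classpath, `sqlContext.read.format("com.example.constant").option("rows", "5").load()` resolves through the same `DataSource` machinery shown above.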
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index eda3c36674..c3f8d7f75a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -335,10 +335,10 @@ private[sql] object PartitioningUtils {
def validatePartitionColumnDataTypes(
schema: StructType,
- partitionColumns: Array[String],
+ partitionColumns: Seq[String],
caseSensitive: Boolean): Unit = {
- ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
+ partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
field => field.dataType match {
case _: AtomicType => // OK
case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
@@ -346,6 +346,26 @@ private[sql] object PartitioningUtils {
}
}
+ def partitionColumnsSchema(
+ schema: StructType,
+ partitionColumns: Seq[String],
+ caseSensitive: Boolean): StructType = {
+ val equality = columnNameEquality(caseSensitive)
+ StructType(partitionColumns.map { col =>
+ schema.find(f => equality(f.name, col)).getOrElse {
+ throw new RuntimeException(s"Partition column $col not found in schema $schema")
+ }
+ }).asNullable
+ }
+
+ private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
+ if (caseSensitive) {
+ org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+ } else {
+ org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+ }
+ }
+
/**
* Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
* types.
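To illustrate the helper that just moved here, below is a self-contained sketch of the matching that `partitionColumnsSchema` performs; plain string comparisons stand in for Catalyst's `caseSensitiveResolution`/`caseInsensitiveResolution`, so treat it as an approximation of the real behavior.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object PartitionSchemaSketch {
  // Mirrors the idea of PartitioningUtils.partitionColumnsSchema: pick the named columns
  // out of the full schema, honoring case sensitivity, and mark them nullable.
  def partitionColumnsSchema(
      schema: StructType,
      partitionColumns: Seq[String],
      caseSensitive: Boolean): StructType = {
    val equality: (String, String) => Boolean =
      if (caseSensitive) (a: String, b: String) => a == b
      else (a: String, b: String) => a.equalsIgnoreCase(b)
    StructType(partitionColumns.map { col =>
      schema.find(f => equality(f.name, col)).getOrElse(
        throw new RuntimeException(s"Partition column $col not found in schema $schema"))
    }).asNullable
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("Country", StringType)))
    // Case-insensitive resolution matches "Country" even though the caller wrote "country".
    println(partitionColumnsSchema(schema, Seq("country"), caseSensitive = false))
  }
}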
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 3d7c6a6a5e..895794c4c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types._
/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ *
* @param table The table to be described.
* @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
* It is effective only when the table is a Hive table.
@@ -50,6 +51,7 @@ case class DescribeCommand(
/**
* Used to represent the operation of create table using a data source.
+ *
* @param allowExisting If it is true, we will do nothing when the table already exists.
* If it is false, an exception will be thrown
*/
@@ -91,14 +93,14 @@ case class CreateTempTableUsing(
options: Map[String, String]) extends RunnableCommand {
def run(sqlContext: SQLContext): Seq[Row] = {
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
- provider = provider,
+ className = provider,
options = options)
sqlContext.catalog.registerTable(
tableIdent,
- DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
+ DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
Seq.empty[Row]
}
@@ -114,17 +116,16 @@ case class CreateTempTableUsingAsSelect(
override def run(sqlContext: SQLContext): Seq[Row] = {
val df = DataFrame(sqlContext, query)
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext,
- provider,
- partitionColumns,
+ className = provider,
+ partitionColumns = partitionColumns,
bucketSpec = None,
- mode,
- options,
- df)
+ options = options)
+ val result = dataSource.write(mode, df)
sqlContext.catalog.registerTable(
tableIdent,
- DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
+ DataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 0eae34614c..63f0e4f8c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -32,15 +32,11 @@ private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[Logica
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
try {
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext,
- paths = Seq.empty,
- userSpecifiedSchema = None,
- partitionColumns = Array(),
- bucketSpec = None,
- provider = u.tableIdentifier.database.get,
- options = Map("path" -> u.tableIdentifier.table))
- val plan = LogicalRelation(resolved.relation)
+ paths = u.tableIdentifier.table :: Nil,
+ className = u.tableIdentifier.database.get)
+ val plan = LogicalRelation(dataSource.resolveRelation())
u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan)
} catch {
case e: ClassNotFoundException => u
@@ -143,7 +139,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
PartitioningUtils.validatePartitionColumnDataTypes(
- r.schema, part.keySet.toArray, catalog.conf.caseSensitiveAnalysis)
+ r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis)
// Get all input data source relations of the query.
val srcRelations = query.collect {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 12512a8312..60b0c64c7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -422,7 +422,7 @@ case class HadoopFsRelation(
}
/**
- * Used to read a write data in files to [[InternalRow]] format.
+ * Used to read and write data stored in files to/from the [[InternalRow]] format.
*/
trait FileFormat {
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 2f17037a58..02b173d30a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -32,7 +32,7 @@ import org.scalactic.Tolerance._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -1178,21 +1178,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
sparkContext.parallelize(1 to 100)
.map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
- val d1 = ResolvedDataSource(
+ val d1 = DataSource(
sqlContext,
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
- provider = classOf[DefaultSource].getCanonicalName,
- options = Map("path" -> path))
+ className = classOf[DefaultSource].getCanonicalName,
+ options = Map("path" -> path)).resolveRelation()
- val d2 = ResolvedDataSource(
+ val d2 = DataSource(
sqlContext,
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
- provider = classOf[DefaultSource].getCanonicalName,
- options = Map("path" -> path))
+ className = classOf[DefaultSource].getCanonicalName,
+ options = Map("path" -> path)).resolveRelation()
assert(d1 === d2)
})
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index cb6e5179b3..94d032f4ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -18,59 +18,61 @@
package org.apache.spark.sql.sources
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.datasources.ResolvedDataSource
+import org.apache.spark.sql.execution.datasources.DataSource
class ResolvedDataSourceSuite extends SparkFunSuite {
+ private def getProvidingClass(name: String): Class[_] =
+ DataSource(sqlContext = null, className = name).providingClass
test("jdbc") {
assert(
- ResolvedDataSource.lookupDataSource("jdbc") ===
+ getProvidingClass("jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.jdbc") ===
+ getProvidingClass("org.apache.spark.sql.execution.datasources.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.jdbc") ===
+ getProvidingClass("org.apache.spark.sql.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
}
test("json") {
assert(
- ResolvedDataSource.lookupDataSource("json") ===
+ getProvidingClass("json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.json") ===
+ getProvidingClass("org.apache.spark.sql.execution.datasources.json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.json") ===
+ getProvidingClass("org.apache.spark.sql.json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
}
test("parquet") {
assert(
- ResolvedDataSource.lookupDataSource("parquet") ===
+ getProvidingClass("parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.parquet") ===
+ getProvidingClass("org.apache.spark.sql.execution.datasources.parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.parquet") ===
+ getProvidingClass("org.apache.spark.sql.parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
}
test("error message for unknown data sources") {
val error1 = intercept[ClassNotFoundException] {
- ResolvedDataSource.lookupDataSource("avro")
+ getProvidingClass("avro")
}
assert(error1.getMessage.contains("spark-packages"))
val error2 = intercept[ClassNotFoundException] {
- ResolvedDataSource.lookupDataSource("com.databricks.spark.avro")
+ getProvidingClass("com.databricks.spark.avro")
}
assert(error2.getMessage.contains("spark-packages"))
val error3 = intercept[ClassNotFoundException] {
- ResolvedDataSource.lookupDataSource("asfdwefasdfasdf")
+ getProvidingClass("asfdwefasdfasdf")
}
assert(error3.getMessage.contains("spark-packages"))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 28874189de..8f6cd66f1f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -176,17 +176,17 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
val options = table.storage.serdeProperties
- val resolvedRelation =
- ResolvedDataSource(
+ val dataSource =
+ DataSource(
hive,
userSpecifiedSchema = userSpecifiedSchema,
- partitionColumns = partitionColumns.toArray,
+ partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- provider = table.properties("spark.sql.sources.provider"),
+ className = table.properties("spark.sql.sources.provider"),
options = options)
LogicalRelation(
- resolvedRelation.relation,
+ dataSource.resolveRelation(),
metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
}
}
@@ -283,12 +283,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
val dataSource =
- ResolvedDataSource(
+ DataSource(
hive,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- provider = provider,
+ className = provider,
options = options)
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
@@ -334,7 +334,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// TODO: Support persisting partitioned data source relations in Hive compatible format
val qualifiedTableName = tableIdent.quotedString
val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
- val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+ val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
case _ if skipHiveMetadata =>
val message =
s"Persisting partitioned data source relation $qualifiedTableName into " +
@@ -511,7 +511,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val parquetRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
- val fileCatalog = new HiveFileCatalog(hive, paths, partitionSpec)
+ val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
val format = new DefaultSource()
val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles())
@@ -541,12 +541,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(
- ResolvedDataSource(
+ DataSource(
sqlContext = hive,
paths = paths,
userSpecifiedSchema = Some(metastoreRelation.schema),
options = parquetOptions,
- provider = "parquet").relation)
+ className = "parquet").resolveRelation())
cachedDataSourceTables.put(tableIdentifier, created)
created
@@ -749,7 +749,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
* An override of the standard HDFS listing based catalog, that overrides the partition spec with
* the information from the metastore.
*/
-class HiveFileCatalog(
+class MetaStoreFileCatalog(
hive: HiveContext,
paths: Seq[Path],
partitionSpecFromHive: PartitionSpec)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 37cec6d2ab..7e4fb8b0ac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.datasources.{BucketSpec, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, LogicalRelation}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -148,12 +148,12 @@ case class CreateMetastoreDataSource(
}
// Create the relation to validate the arguments before writing the metadata to the metastore.
- ResolvedDataSource(
+ DataSource(
sqlContext = sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
- provider = provider,
+ className = provider,
bucketSpec = None,
- options = optionsWithPath)
+ options = optionsWithPath).resolveRelation()
hiveContext.catalog.createDataSourceTable(
tableIdent,
@@ -220,15 +220,16 @@ case class CreateMetastoreDataSourceAsSelect(
return Seq.empty[Row]
case SaveMode.Append =>
// Check if the specified data source match the data source of the existing table.
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext = sqlContext,
userSpecifiedSchema = Some(query.schema.asNullable),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- provider = provider,
+ className = provider,
options = optionsWithPath)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
+
EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
existingSchema = Some(l.schema)
@@ -248,19 +249,19 @@ case class CreateMetastoreDataSourceAsSelect(
val data = DataFrame(hiveContext, query)
val df = existingSchema match {
// If we are inserting into an existing table, just use the existing schema.
- case Some(schema) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, schema)
+ case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s)
case None => data
}
// Create the relation based on the data of df.
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext,
- provider,
- partitionColumns,
- bucketSpec,
- mode,
- optionsWithPath,
- df)
+ className = provider,
+ partitionColumns = partitionColumns,
+ bucketSpec = bucketSpec,
+ options = optionsWithPath)
+
+ val result = dataSource.write(mode, df)
if (createMetastoreTable) {
// We will use the schema of resolved.relation as the schema of the table (instead of
@@ -268,7 +269,7 @@ case class CreateMetastoreDataSourceAsSelect(
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
hiveContext.catalog.createDataSourceTable(
tableIdent,
- Some(resolved.relation.schema),
+ Some(result.schema),
partitionColumns,
bucketSpec,
provider,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index ad832b5197..041e0fb477 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.orc
import java.util.Properties
-import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -39,7 +38,7 @@ import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -173,7 +172,7 @@ private[orc] class OrcOutputWriter(
}
override def write(row: Row): Unit =
- throw new UnsupportedOperationException("call writeInternal")
+ throw new UnsupportedOperationException("call writeInternal")
private def wrapOrcStruct(
struct: OrcStruct,