| author    | Reynold Xin <rxin@databricks.com>                | 2016-05-25 23:54:24 -0700 |
|-----------|--------------------------------------------------|---------------------------|
| committer | Reynold Xin <rxin@databricks.com>                | 2016-05-25 23:54:24 -0700 |
| commit    | 361ebc282b2d09dc6dcf21419a53c5c617b1b6bd (patch)  |                           |
| tree      | 0ed7e06fed5e03fec1516386bb16005b3bbc677e /sql     |                           |
| parent    | dfc9fc02ccbceb09213c394177d54b9ca56b6f24 (diff)   |                           |
[SPARK-15543][SQL] Rename DefaultSources to make them more self-describing
## What changes were proposed in this pull request?
This patch renames the various `DefaultSource` classes so that their names are self-describing. The name "DefaultSource" dates from the days before we had a good way to specify short names for data sources.
They are now named as follows (a short usage sketch follows the list):
- LibSVMFileFormat
- CSVFileFormat
- JdbcRelationProvider
- JsonFileFormat
- ParquetFileFormat
- TextFileFormat
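As a minimal, hedged sketch (not part of this patch; the input path and app name are made up): user code keeps addressing sources by their short names, because each of the classes above still implements `DataSourceRegister` and reports the same `shortName()` as before.

```scala
import org.apache.spark.sql.SparkSession

object ShortNameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("short-name-example")
      .getOrCreate()

    // "csv" is resolved through DataSourceRegister.shortName(), which CSVFileFormat
    // still reports as "csv", so user-facing code is unaffected by the rename.
    val df = spark.read
      .format("csv")
      .option("header", "true")
      .load("/tmp/people.csv") // hypothetical input path

    df.printSchema()
    spark.stop()
  }
}
```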
Backward compatibility is maintained through aliasing.
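The following standalone snippet only illustrates the aliasing idea; the real mapping is the `backwardCompatibilityMap` in `DataSource.scala` shown in the diff below, and the entries here are a subset copied from it.

```scala
// Sketch of the aliasing idea: legacy provider names are mapped to the renamed
// classes, and unknown names fall through unchanged.
object AliasSketch {
  private val backwardCompatibilityMap: Map[String, String] = Map(
    "org.apache.spark.sql.json" ->
      "org.apache.spark.sql.execution.datasources.json.JsonFileFormat",
    "org.apache.spark.sql.parquet" ->
      "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat",
    "com.databricks.spark.csv" ->
      "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")

  /** Old provider names resolve to the new class name, or to themselves if not aliased. */
  def resolveProvider(name: String): String =
    backwardCompatibilityMap.getOrElse(name, name)

  def main(args: Array[String]): Unit = {
    println(resolveProvider("org.apache.spark.sql.json"))   // new JsonFileFormat class name
    println(resolveProvider("my.custom.datasource.Format")) // unchanged
  }
}
```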
## How was this patch tested?
Updated the relevant test cases.
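For example, the resolution checks in `ResolvedDataSourceSuite` (excerpted from the diff below; `getProvidingClass` is that suite's helper) now expect the renamed classes for both the short name and the legacy package name:

```scala
assert(getProvidingClass("json") ===
  classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
assert(getProvidingClass("org.apache.spark.sql.json") ===
  classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
```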
Author: Reynold Xin <rxin@databricks.com>
Closes #13311 from rxin/SPARK-15543.
Diffstat (limited to 'sql')
17 files changed, 89 insertions, 65 deletions
```diff
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index ef9255794b..9f8bb5d38f 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,6 @@
-org.apache.spark.sql.execution.datasources.csv.DefaultSource
-org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
-org.apache.spark.sql.execution.datasources.json.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.DefaultSource
-org.apache.spark.sql.execution.datasources.text.DefaultSource
+org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
+org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ec23a9c41a..412f5fa87e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d0853f67b9..dfe06478fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,6 +30,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -74,15 +78,34 @@ case class DataSource(
   lazy val sourceInfo = sourceSchema()
 
   /** A map to maintain backward compatibility in case we move data sources around. */
-  private val backwardCompatibilityMap = Map(
-    "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName,
-    "com.databricks.spark.csv" -> classOf[csv.DefaultSource].getCanonicalName
-  )
+  private val backwardCompatibilityMap: Map[String, String] = {
+    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
+    val json = classOf[JsonFileFormat].getCanonicalName
+    val parquet = classOf[ParquetFileFormat].getCanonicalName
+    val csv = classOf[CSVFileFormat].getCanonicalName
+    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
+    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+    Map(
+      "org.apache.spark.sql.jdbc" -> jdbc,
+      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
+      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
+      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
+      "org.apache.spark.sql.json" -> json,
+      "org.apache.spark.sql.json.DefaultSource" -> json,
+      "org.apache.spark.sql.execution.datasources.json" -> json,
+      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
+      "org.apache.spark.sql.parquet" -> parquet,
+      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
+      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
+      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
+      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
+      "org.apache.spark.sql.hive.orc" -> orc,
+      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
+      "org.apache.spark.ml.source.libsvm" -> libsvm,
+      "com.databricks.spark.csv" -> csv
+    )
+  }
 
   /**
    * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
@@ -188,7 +211,7 @@ case class DataSource(
           throw new IllegalArgumentException("'path' is not specified")
         })
         val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
-        val isTextSource = providingClass == classOf[text.DefaultSource]
+        val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
         if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
           throw new IllegalArgumentException(
@@ -229,7 +252,7 @@ case class DataSource(
     providingClass.newInstance() match {
       case s: StreamSinkProvider =>
         s.createSink(sparkSession.sqlContext, options, partitionColumns)
-      case parquet: parquet.DefaultSource =>
+      case parquet: parquet.ParquetFileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 057bde1a75..4d36b76056 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -28,8 +28,6 @@ import org.apache.hadoop.mapreduce._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -38,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration
 /**
  * Provides access to CSV data from pure SQL statements.
  */
-class DefaultSource extends FileFormat with DataSourceRegister {
+class CSVFileFormat extends FileFormat with DataSourceRegister {
 
   override def shortName(): String = "csv"
 
@@ -46,7 +44,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
 
   override def hashCode(): Int = getClass.hashCode()
 
-  override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+  override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
 
   override def inferSchema(
       sparkSession: SparkSession,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 6609e5dee3..106ed1d440 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
 
-class DefaultSource extends RelationProvider with DataSourceRegister {
+class JdbcRelationProvider extends RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 4c97abed53..35f247692f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-class DefaultSource extends FileFormat with DataSourceRegister {
+class JsonFileFormat extends FileFormat with DataSourceRegister {
 
   override def shortName(): String = "json"
 
@@ -151,7 +151,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
 
   override def hashCode(): Int = getClass.hashCode()
 
-  override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+  override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
 }
 
 private[json] class JsonOutputWriter(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index cf5c8e94f4..b47d41e166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
-private[sql] class DefaultSource
+private[sql] class ParquetFileFormat
   extends FileFormat
   with DataSourceRegister
   with Logging
@@ -62,7 +62,7 @@ private[sql] class DefaultSource
 
   override def hashCode(): Int = getClass.hashCode()
 
-  override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+  override def equals(other: Any): Boolean = other.isInstanceOf[ParquetFileFormat]
 
   override def prepareWrite(
       sparkSession: SparkSession,
@@ -141,7 +141,7 @@ private[sql] class DefaultSource
     // Should we merge schemas from all Parquet part-files?
     val shouldMergeSchemas = parameters
-      .get(ParquetRelation.MERGE_SCHEMA)
+      .get(ParquetFileFormat.MERGE_SCHEMA)
       .map(_.toBoolean)
       .getOrElse(sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
 
@@ -217,7 +217,7 @@ private[sql] class DefaultSource
         .orElse(filesByType.data.headOption)
         .toSeq
     }
-    ParquetRelation.mergeSchemasInParallel(filesToTouch, sparkSession)
+    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
   }
 
   case class FileTypes(
@@ -543,7 +543,7 @@ private[sql] class ParquetOutputWriter(
   override def close(): Unit = recordWriter.close(context)
 }
 
-private[sql] object ParquetRelation extends Logging {
+private[sql] object ParquetFileFormat extends Logging {
   // Whether we should merge schemas collected from all Parquet part-files.
   private[sql] val MERGE_SCHEMA = "mergeSchema"
 
@@ -822,9 +822,9 @@ private[sql] object ParquetRelation extends Logging {
       if (footers.isEmpty) {
         Iterator.empty
       } else {
-        var mergedSchema = ParquetRelation.readSchemaFromFooter(footers.head, converter)
+        var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
         footers.tail.foreach { footer =>
-          val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
+          val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
           try {
             mergedSchema = mergedSchema.merge(schema)
           } catch { case cause: SparkException =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index f091615a9a..d9525efe6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration
 /**
  * A data source for reading text files.
  */
-class DefaultSource extends FileFormat with DataSourceRegister {
+class TextFileFormat extends FileFormat with DataSourceRegister {
 
   override def shortName(): String = "text"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 46213a22ed..500d8ff55a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1320,7 +1320,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           userSpecifiedSchema = None,
           partitionColumns = Array.empty[String],
           bucketSpec = None,
-          className = classOf[DefaultSource].getCanonicalName,
+          className = classOf[JsonFileFormat].getCanonicalName,
           options = Map("path" -> path)).resolveRelation()
 
         val d2 = DataSource(
@@ -1328,7 +1328,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           userSpecifiedSchema = None,
           partitionColumns = Array.empty[String],
           bucketSpec = None,
-          className = classOf[DefaultSource].getCanonicalName,
+          className = classOf[JsonFileFormat].getCanonicalName,
           options = Map("path" -> path)).resolveRelation()
         assert(d1 === d2)
       })
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index c43b142de2..6db6492282 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -375,7 +375,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         StructField("lowerCase", StringType),
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      ParquetRelation.mergeMetastoreParquetSchema(
+      ParquetFileFormat.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("lowercase", StringType),
           StructField("uppercase", DoubleType, nullable = false))),
@@ -390,7 +390,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       StructType(Seq(
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      ParquetRelation.mergeMetastoreParquetSchema(
+      ParquetFileFormat.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false))),
 
@@ -401,7 +401,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     // Metastore schema contains additional non-nullable fields.
     assert(intercept[Throwable] {
-      ParquetRelation.mergeMetastoreParquetSchema(
+      ParquetFileFormat.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false),
           StructField("lowerCase", BinaryType, nullable = false))),
@@ -412,7 +412,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     // Conflicting non-nullable field names
     intercept[Throwable] {
-      ParquetRelation.mergeMetastoreParquetSchema(
+      ParquetFileFormat.mergeMetastoreParquetSchema(
         StructType(Seq(StructField("lower", StringType, nullable = false))),
         StructType(Seq(StructField("lowerCase", BinaryType))))
     }
@@ -426,7 +426,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         StructField("firstField", StringType, nullable = true),
         StructField("secondField", StringType, nullable = true),
         StructField("thirdfield", StringType, nullable = true)))) {
-      ParquetRelation.mergeMetastoreParquetSchema(
+      ParquetFileFormat.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("firstfield", StringType, nullable = true),
           StructField("secondfield", StringType, nullable = true),
@@ -439,7 +439,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     // Merge should fail if the Metastore contains any additional fields that are not
     // nullable.
     assert(intercept[Throwable] {
-      ParquetRelation.mergeMetastoreParquetSchema(
+      ParquetFileFormat.mergeMetastoreParquetSchema(
        StructType(Seq(
          StructField("firstfield", StringType, nullable = true),
          StructField("secondfield", StringType, nullable = true),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 4f6df54417..320aaea1e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -27,37 +27,37 @@ class ResolvedDataSourceSuite extends SparkFunSuite {
   test("jdbc") {
     assert(
       getProvidingClass("jdbc") ===
-        classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+        classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
     assert(
       getProvidingClass("org.apache.spark.sql.execution.datasources.jdbc") ===
-        classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+        classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
     assert(
       getProvidingClass("org.apache.spark.sql.jdbc") ===
-        classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+        classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
   }
 
   test("json") {
     assert(
       getProvidingClass("json") ===
-        classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+        classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
     assert(
       getProvidingClass("org.apache.spark.sql.execution.datasources.json") ===
-        classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+        classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
     assert(
       getProvidingClass("org.apache.spark.sql.json") ===
-        classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+        classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
   }
 
   test("parquet") {
     assert(
       getProvidingClass("parquet") ===
-        classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+        classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
     assert(
       getProvidingClass("org.apache.spark.sql.execution.datasources.parquet") ===
-        classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+        classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
     assert(
       getProvidingClass("org.apache.spark.sql.parquet") ===
-        classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+        classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
   }
 
   test("error message for unknown data sources") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 6238b74ffa..f3262f772b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -41,7 +41,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
     path.delete()
 
     val hadoopConf = spark.sparkContext.hadoopConfiguration
-    val fileFormat = new parquet.DefaultSource()
+    val fileFormat = new parquet.ParquetFileFormat()
 
     def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
       val df = spark
@@ -73,7 +73,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
     path.delete()
 
     val hadoopConf = spark.sparkContext.hadoopConfiguration
-    val fileFormat = new parquet.DefaultSource()
+    val fileFormat = new parquet.ParquetFileFormat()
 
     def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
       val df = spark
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index e6c0ce95e7..288f6dc597 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -101,7 +101,6 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
 }
 
 class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
-  import testImplicits._
 
   private def newMetadataDir =
     Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
diff --git a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 4a774fbf1f..32aa13ff25 100644
--- a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1 @@
-org.apache.spark.sql.hive.orc.DefaultSource
+org.apache.spark.sql.hive.orc.OrcFileFormat
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 86ab152402..b377a20e39 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,8 +32,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.types._
 
@@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         val inferredSchema =
           defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
         inferredSchema.map { inferred =>
-          ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
+          ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
         }.getOrElse(metastoreSchema)
       } else {
         defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
@@ -348,13 +348,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
-    val defaultSource = new ParquetDefaultSource()
-    val fileFormatClass = classOf[ParquetDefaultSource]
+    val defaultSource = new ParquetFileFormat()
+    val fileFormatClass = classOf[ParquetFileFormat]
 
     val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
     val options = Map(
-      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
-      ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+      ParquetFileFormat.MERGE_SCHEMA -> mergeSchema.toString,
+      ParquetFileFormat.METASTORE_TABLE_NAME -> TableIdentifier(
         relation.tableName,
         Some(relation.databaseName)
       ).unquotedString
@@ -400,8 +400,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
-    val defaultSource = new OrcDefaultSource()
-    val fileFormatClass = classOf[OrcDefaultSource]
+    val defaultSource = new OrcFileFormat()
+    val fileFormatClass = classOf[OrcFileFormat]
     val options = Map[String, String]()
 
     convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 38f50c112a..f1198179a0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -42,7 +42,11 @@ import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-private[sql] class DefaultSource
+/**
+ * [[FileFormat]] for reading ORC files. If this is moved or renamed, please update
+ * [[DataSource]]'s backwardCompatibilityMap.
+ */
+private[sql] class OrcFileFormat
   extends FileFormat with DataSourceRegister with Serializable {
 
   override def shortName(): String = "orc"
@@ -262,7 +266,7 @@ private[orc] case class OrcTableScan(
     // Figure out the actual schema from the ORC source (without partition columns) so that we
     // can pick the correct ordinals.  Note that this assumes that all files have the same schema.
-    val orcFormat = new DefaultSource
+    val orcFormat = new OrcFileFormat
     val dataSchema =
       orcFormat
         .inferSchema(sparkSession, Map.empty, inputPaths)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 0207b4e8c9..5dfa58f673 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
 class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
   import testImplicits._
 
-  override val dataSourceName: String = classOf[DefaultSource].getCanonicalName
+  override val dataSourceName: String = classOf[OrcFileFormat].getCanonicalName
 
   // ORC does not play well with NullType and UDT.
   override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
```