-rw-r--r--  mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister |  2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala |  8
-rw-r--r--  project/MimaExcludes.scala |  4
-rw-r--r--  sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 45
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala) |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala) |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala) |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala) | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala) |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala) |  1
-rw-r--r--  sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala (renamed from sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala) |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala |  2
20 files changed, 99 insertions, 69 deletions
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index f632dd603c..a865cbe19b 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1 @@
-org.apache.spark.ml.source.libsvm.DefaultSource
+org.apache.spark.ml.source.libsvm.LibSVMFileFormat
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 5ba768d551..64ebf0c982 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -90,7 +90,7 @@ private[libsvm] class LibSVMOutputWriter(
* .load("data/mllib/sample_libsvm_data.txt")
*
* // Java
- * DataFrame df = spark.read().format("libsvm")
+ * Dataset<Row> df = spark.read().format("libsvm")
* .option("numFeatures, "780")
* .load("data/mllib/sample_libsvm_data.txt");
* }}}
@@ -105,9 +105,13 @@ private[libsvm] class LibSVMOutputWriter(
* - "vectorType": feature vector type, "sparse" (default) or "dense".
*
* @see [[https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/ LIBSVM datasets]]
+ *
+ * Note that this class is public for documentation purposes. Please don't use this class directly.
+ * Rather, use the data source API as illustrated above.
*/
+// If this is moved or renamed, please update DataSource's backwardCompatibilityMap.
@Since("1.6.0")
-class DefaultSource extends FileFormat with DataSourceRegister {
+class LibSVMFileFormat extends FileFormat with DataSourceRegister {
@Since("1.6.0")
override def shortName(): String = "libsvm"
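The scaladoc above remains the intended entry point: users go through the data source API with the short name "libsvm" rather than instantiating LibSVMFileFormat directly. A minimal Scala sketch of that usage, assuming an existing SparkSession named spark and the sample dataset path from the scaladoc (both assumptions, not part of this patch):

    // Read LIBSVM data through the data source API; "libsvm" now resolves to LibSVMFileFormat.
    val df = spark.read.format("libsvm")
      .option("numFeatures", "780")
      .load("data/mllib/sample_libsvm_data.txt")

    // The same short name can be used for writing, since the format also ships an output writer
    // (LibSVMOutputWriter above); the output path here is hypothetical.
    df.write.format("libsvm").save("/tmp/libsvm-out")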
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4e99a09657..08c575aaee 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -68,7 +68,9 @@ object MimaExcludes {
// SPARK-13664 Replace HadoopFsRelation with FileFormat
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.source.libsvm.LibSVMRelation"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelationProvider"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache")
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache"),
+ // SPARK-15543 Rename DefaultSources to make them more self-describing
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.source.libsvm.DefaultSource")
) ++ Seq(
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"),
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index ef9255794b..9f8bb5d38f 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,6 @@
-org.apache.spark.sql.execution.datasources.csv.DefaultSource
-org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
-org.apache.spark.sql.execution.datasources.json.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.DefaultSource
-org.apache.spark.sql.execution.datasources.text.DefaultSource
+org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
+org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
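These META-INF/services entries are what keep short names such as "csv" or "parquet" working after the renames: the file lists DataSourceRegister implementations for java.util.ServiceLoader, and each implementation advertises its short name. A rough Scala sketch of that lookup, where lookupByShortName is a hypothetical helper rather than Spark's actual resolution method:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.sources.DataSourceRegister

    // Scan all registered data sources and match on the advertised short name.
    def lookupByShortName(name: String): Option[Class[_]] = {
      val loader = Thread.currentThread().getContextClassLoader
      ServiceLoader.load(classOf[DataSourceRegister], loader).asScala
        .find(_.shortName().equalsIgnoreCase(name))
        .map(_.getClass)
    }

    // lookupByShortName("parquet") would now yield classOf[ParquetFileFormat].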
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ec23a9c41a..412f5fa87e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d0853f67b9..dfe06478fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,6 +30,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -74,15 +78,34 @@ case class DataSource(
lazy val sourceInfo = sourceSchema()
/** A map to maintain backward compatibility in case we move data sources around. */
- private val backwardCompatibilityMap = Map(
- "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName,
- "com.databricks.spark.csv" -> classOf[csv.DefaultSource].getCanonicalName
- )
+ private val backwardCompatibilityMap: Map[String, String] = {
+ val jdbc = classOf[JdbcRelationProvider].getCanonicalName
+ val json = classOf[JsonFileFormat].getCanonicalName
+ val parquet = classOf[ParquetFileFormat].getCanonicalName
+ val csv = classOf[CSVFileFormat].getCanonicalName
+ val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
+ val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+ Map(
+ "org.apache.spark.sql.jdbc" -> jdbc,
+ "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
+ "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
+ "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
+ "org.apache.spark.sql.json" -> json,
+ "org.apache.spark.sql.json.DefaultSource" -> json,
+ "org.apache.spark.sql.execution.datasources.json" -> json,
+ "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
+ "org.apache.spark.sql.parquet" -> parquet,
+ "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
+ "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
+ "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
+ "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
+ "org.apache.spark.sql.hive.orc" -> orc,
+ "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
+ "org.apache.spark.ml.source.libsvm" -> libsvm,
+ "com.databricks.spark.csv" -> csv
+ )
+ }
/**
* Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
@@ -188,7 +211,7 @@ case class DataSource(
throw new IllegalArgumentException("'path' is not specified")
})
val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
- val isTextSource = providingClass == classOf[text.DefaultSource]
+ val isTextSource = providingClass == classOf[text.TextFileFormat]
// If the schema inference is disabled, only text sources require schema to be specified
if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
throw new IllegalArgumentException(
@@ -229,7 +252,7 @@ case class DataSource(
providingClass.newInstance() match {
case s: StreamSinkProvider => s.createSink(sparkSession.sqlContext, options, partitionColumns)
- case parquet: parquet.DefaultSource =>
+ case parquet: parquet.ParquetFileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
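The expanded backwardCompatibilityMap is what keeps old provider strings resolving after the renames: a fully qualified name passed by the user can be rewritten to the new class name before the provider class is loaded. A minimal sketch of that rewrite step, assuming a map shaped like the one added above (resolveProvider is a hypothetical helper, not Spark's actual code path):

    // Rewrite a legacy provider name to its post-rename equivalent; unknown names pass through.
    def resolveProvider(name: String, compatMap: Map[String, String]): String =
      compatMap.getOrElse(name, name)

    // resolveProvider("org.apache.spark.sql.parquet.DefaultSource", backwardCompatibilityMap)
    // returns "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat"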
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 057bde1a75..4d36b76056 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -28,8 +28,6 @@ import org.apache.hadoop.mapreduce._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -38,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration
/**
* Provides access to CSV data from pure SQL statements.
*/
-class DefaultSource extends FileFormat with DataSourceRegister {
+class CSVFileFormat extends FileFormat with DataSourceRegister {
override def shortName(): String = "csv"
@@ -46,7 +44,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
override def hashCode(): Int = getClass.hashCode()
- override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+ override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
override def inferSchema(
sparkSession: SparkSession,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 6609e5dee3..106ed1d440 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
-class DefaultSource extends RelationProvider with DataSourceRegister {
+class JdbcRelationProvider extends RelationProvider with DataSourceRegister {
override def shortName(): String = "jdbc"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 4c97abed53..35f247692f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
-class DefaultSource extends FileFormat with DataSourceRegister {
+class JsonFileFormat extends FileFormat with DataSourceRegister {
override def shortName(): String = "json"
@@ -151,7 +151,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
override def hashCode(): Int = getClass.hashCode()
- override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+ override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
}
private[json] class JsonOutputWriter(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index cf5c8e94f4..b47d41e166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
-private[sql] class DefaultSource
+private[sql] class ParquetFileFormat
extends FileFormat
with DataSourceRegister
with Logging
@@ -62,7 +62,7 @@ private[sql] class DefaultSource
override def hashCode(): Int = getClass.hashCode()
- override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+ override def equals(other: Any): Boolean = other.isInstanceOf[ParquetFileFormat]
override def prepareWrite(
sparkSession: SparkSession,
@@ -141,7 +141,7 @@ private[sql] class DefaultSource
// Should we merge schemas from all Parquet part-files?
val shouldMergeSchemas =
parameters
- .get(ParquetRelation.MERGE_SCHEMA)
+ .get(ParquetFileFormat.MERGE_SCHEMA)
.map(_.toBoolean)
.getOrElse(sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
@@ -217,7 +217,7 @@ private[sql] class DefaultSource
.orElse(filesByType.data.headOption)
.toSeq
}
- ParquetRelation.mergeSchemasInParallel(filesToTouch, sparkSession)
+ ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
}
case class FileTypes(
@@ -543,7 +543,7 @@ private[sql] class ParquetOutputWriter(
override def close(): Unit = recordWriter.close(context)
}
-private[sql] object ParquetRelation extends Logging {
+private[sql] object ParquetFileFormat extends Logging {
// Whether we should merge schemas collected from all Parquet part-files.
private[sql] val MERGE_SCHEMA = "mergeSchema"
@@ -822,9 +822,9 @@ private[sql] object ParquetRelation extends Logging {
if (footers.isEmpty) {
Iterator.empty
} else {
- var mergedSchema = ParquetRelation.readSchemaFromFooter(footers.head, converter)
+ var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
footers.tail.foreach { footer =>
- val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
+ val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
try {
mergedSchema = mergedSchema.merge(schema)
} catch { case cause: SparkException =>
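As the hunk above shows, Parquet schema merging is driven by the "mergeSchema" data source option (MERGE_SCHEMA), falling back to the SQL conf when the option is absent. A small usage sketch, assuming an existing SparkSession named spark and hypothetical input paths:

    // Per-read: request Parquet schema merging explicitly via the data source option.
    val merged = spark.read
      .option("mergeSchema", "true")
      .parquet("/tmp/parquet/part1", "/tmp/parquet/part2")

    // Session-wide fallback used when the option is not set.
    spark.conf.set("spark.sql.parquet.mergeSchema", "true")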
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index f091615a9a..d9525efe6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration
/**
* A data source for reading text files.
*/
-class DefaultSource extends FileFormat with DataSourceRegister {
+class TextFileFormat extends FileFormat with DataSourceRegister {
override def shortName(): String = "text"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 46213a22ed..500d8ff55a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1320,7 +1320,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
- className = classOf[DefaultSource].getCanonicalName,
+ className = classOf[JsonFileFormat].getCanonicalName,
options = Map("path" -> path)).resolveRelation()
val d2 = DataSource(
@@ -1328,7 +1328,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
- className = classOf[DefaultSource].getCanonicalName,
+ className = classOf[JsonFileFormat].getCanonicalName,
options = Map("path" -> path)).resolveRelation()
assert(d1 === d2)
})
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index c43b142de2..6db6492282 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -375,7 +375,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),
@@ -390,7 +390,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),
@@ -401,7 +401,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),
@@ -412,7 +412,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Conflicting non-nullable field names
intercept[Throwable] {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
@@ -426,7 +426,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
@@ -439,7 +439,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 4f6df54417..320aaea1e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -27,37 +27,37 @@ class ResolvedDataSourceSuite extends SparkFunSuite {
test("jdbc") {
assert(
getProvidingClass("jdbc") ===
- classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.jdbc") ===
- classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
assert(
getProvidingClass("org.apache.spark.sql.jdbc") ===
- classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
}
test("json") {
assert(
getProvidingClass("json") ===
- classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.json") ===
- classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.json") ===
- classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
}
test("parquet") {
assert(
getProvidingClass("parquet") ===
- classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.parquet") ===
- classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.parquet") ===
- classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
}
test("error message for unknown data sources") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 6238b74ffa..f3262f772b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -41,7 +41,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
path.delete()
val hadoopConf = spark.sparkContext.hadoopConfiguration
- val fileFormat = new parquet.DefaultSource()
+ val fileFormat = new parquet.ParquetFileFormat()
def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
val df = spark
@@ -73,7 +73,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
path.delete()
val hadoopConf = spark.sparkContext.hadoopConfiguration
- val fileFormat = new parquet.DefaultSource()
+ val fileFormat = new parquet.ParquetFileFormat()
def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
val df = spark
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index e6c0ce95e7..288f6dc597 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -101,7 +101,6 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
}
class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
- import testImplicits._
private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
diff --git a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 4a774fbf1f..32aa13ff25 100644
--- a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1 @@
-org.apache.spark.sql.hive.orc.DefaultSource
+org.apache.spark.sql.hive.orc.OrcFileFormat
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 86ab152402..b377a20e39 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,8 +32,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.types._
@@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val inferredSchema =
defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
inferredSchema.map { inferred =>
- ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
+ ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
}.getOrElse(metastoreSchema)
} else {
defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
@@ -348,13 +348,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
- val defaultSource = new ParquetDefaultSource()
- val fileFormatClass = classOf[ParquetDefaultSource]
+ val defaultSource = new ParquetFileFormat()
+ val fileFormatClass = classOf[ParquetFileFormat]
val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
val options = Map(
- ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
- ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+ ParquetFileFormat.MERGE_SCHEMA -> mergeSchema.toString,
+ ParquetFileFormat.METASTORE_TABLE_NAME -> TableIdentifier(
relation.tableName,
Some(relation.databaseName)
).unquotedString
@@ -400,8 +400,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
- val defaultSource = new OrcDefaultSource()
- val fileFormatClass = classOf[OrcDefaultSource]
+ val defaultSource = new OrcFileFormat()
+ val fileFormatClass = classOf[OrcFileFormat]
val options = Map[String, String]()
convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 38f50c112a..f1198179a0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -42,7 +42,11 @@ import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
-private[sql] class DefaultSource
+/**
+ * [[FileFormat]] for reading ORC files. If this is moved or renamed, please update
+ * [[DataSource]]'s backwardCompatibilityMap.
+ */
+private[sql] class OrcFileFormat
extends FileFormat with DataSourceRegister with Serializable {
override def shortName(): String = "orc"
@@ -262,7 +266,7 @@ private[orc] case class OrcTableScan(
// Figure out the actual schema from the ORC source (without partition columns) so that we
// can pick the correct ordinals. Note that this assumes that all files have the same schema.
- val orcFormat = new DefaultSource
+ val orcFormat = new OrcFileFormat
val dataSchema =
orcFormat
.inferSchema(sparkSession, Map.empty, inputPaths)
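As with the other renames, code that referenced the old ORC class name keeps working through the backwardCompatibilityMap entries added in DataSource.scala. A short usage sketch, assuming an existing SparkSession named spark and a hypothetical path /tmp/data.orc:

    // Preferred: the registered short name, which now maps to OrcFileFormat.
    val byShortName = spark.read.format("orc").load("/tmp/data.orc")

    // Legacy: the pre-rename class name is rewritten to OrcFileFormat before loading.
    val byLegacyName = spark.read
      .format("org.apache.spark.sql.hive.orc.DefaultSource")
      .load("/tmp/data.orc")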
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 0207b4e8c9..5dfa58f673 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
import testImplicits._
- override val dataSourceName: String = classOf[DefaultSource].getCanonicalName
+ override val dataSourceName: String = classOf[OrcFileFormat].getCanonicalName
// ORC does not play well with NullType and UDT.
override protected def supportsDataType(dataType: DataType): Boolean = dataType match {