Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 45
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala) | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala) | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala) | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala) | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala) | 1
13 files changed, 72 insertions, 52 deletions
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index ef9255794b..9f8bb5d38f 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,6 @@
-org.apache.spark.sql.execution.datasources.csv.DefaultSource
-org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
-org.apache.spark.sql.execution.datasources.json.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.DefaultSource
-org.apache.spark.sql.execution.datasources.text.DefaultSource
+org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
+org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
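
For context, the service file above is what Spark's provider lookup consumes through Java's ServiceLoader. A minimal sketch (assuming Spark 2.0-era APIs; the object name is made up for illustration) that prints each registered short name and its providing class:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.sources.DataSourceRegister

    // Walks the providers declared in META-INF/services and prints the
    // short name each one claims, e.g. "parquet -> ...ParquetFileFormat".
    object ListRegisteredSources {
      def main(args: Array[String]): Unit = {
        val loader = Thread.currentThread().getContextClassLoader
        ServiceLoader.load(classOf[DataSourceRegister], loader).asScala.foreach { provider =>
          println(s"${provider.shortName()} -> ${provider.getClass.getCanonicalName}")
        }
      }
    }
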
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ec23a9c41a..412f5fa87e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d0853f67b9..dfe06478fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,6 +30,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -74,15 +78,34 @@ case class DataSource(
lazy val sourceInfo = sourceSchema()
/** A map to maintain backward compatibility in case we move data sources around. */
- private val backwardCompatibilityMap = Map(
- "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName,
- "com.databricks.spark.csv" -> classOf[csv.DefaultSource].getCanonicalName
- )
+ private val backwardCompatibilityMap: Map[String, String] = {
+ val jdbc = classOf[JdbcRelationProvider].getCanonicalName
+ val json = classOf[JsonFileFormat].getCanonicalName
+ val parquet = classOf[ParquetFileFormat].getCanonicalName
+ val csv = classOf[CSVFileFormat].getCanonicalName
+ val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
+ val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+ Map(
+ "org.apache.spark.sql.jdbc" -> jdbc,
+ "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
+ "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
+ "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
+ "org.apache.spark.sql.json" -> json,
+ "org.apache.spark.sql.json.DefaultSource" -> json,
+ "org.apache.spark.sql.execution.datasources.json" -> json,
+ "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
+ "org.apache.spark.sql.parquet" -> parquet,
+ "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
+ "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
+ "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
+ "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
+ "org.apache.spark.sql.hive.orc" -> orc,
+ "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
+ "org.apache.spark.ml.source.libsvm" -> libsvm,
+ "com.databricks.spark.csv" -> csv
+ )
+ }
/**
* Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
@@ -188,7 +211,7 @@ case class DataSource(
throw new IllegalArgumentException("'path' is not specified")
})
val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
- val isTextSource = providingClass == classOf[text.DefaultSource]
+ val isTextSource = providingClass == classOf[text.TextFileFormat]
// If the schema inference is disabled, only text sources require schema to be specified
if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
throw new IllegalArgumentException(
@@ -229,7 +252,7 @@ case class DataSource(
providingClass.newInstance() match {
case s: StreamSinkProvider => s.createSink(sparkSession.sqlContext, options, partitionColumns)
- case parquet: parquet.DefaultSource =>
+ case parquet: parquet.ParquetFileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
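
The expanded backwardCompatibilityMap means the old fully qualified provider names keep resolving to the renamed classes. A rough usage sketch (paths are hypothetical placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("compat-check").getOrCreate()

    // Resolves to ParquetFileFormat via the backward-compatibility map.
    val df1 = spark.read.format("org.apache.spark.sql.parquet").load("/tmp/parquet-data")

    // Resolves to JsonFileFormat the same way.
    val df2 = spark.read.format("org.apache.spark.sql.json").load("/tmp/json-data")
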
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 057bde1a75..4d36b76056 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -28,8 +28,6 @@ import org.apache.hadoop.mapreduce._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -38,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration
/**
* Provides access to CSV data from pure SQL statements.
*/
-class DefaultSource extends FileFormat with DataSourceRegister {
+class CSVFileFormat extends FileFormat with DataSourceRegister {
override def shortName(): String = "csv"
@@ -46,7 +44,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
override def hashCode(): Int = getClass.hashCode()
- override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+ override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
override def inferSchema(
sparkSession: SparkSession,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 6609e5dee3..106ed1d440 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
-class DefaultSource extends RelationProvider with DataSourceRegister {
+class JdbcRelationProvider extends RelationProvider with DataSourceRegister {
override def shortName(): String = "jdbc"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 4c97abed53..35f247692f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
-class DefaultSource extends FileFormat with DataSourceRegister {
+class JsonFileFormat extends FileFormat with DataSourceRegister {
override def shortName(): String = "json"
@@ -151,7 +151,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
override def hashCode(): Int = getClass.hashCode()
- override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+ override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
}
private[json] class JsonOutputWriter(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index cf5c8e94f4..b47d41e166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
-private[sql] class DefaultSource
+private[sql] class ParquetFileFormat
extends FileFormat
with DataSourceRegister
with Logging
@@ -62,7 +62,7 @@ private[sql] class DefaultSource
override def hashCode(): Int = getClass.hashCode()
- override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+ override def equals(other: Any): Boolean = other.isInstanceOf[ParquetFileFormat]
override def prepareWrite(
sparkSession: SparkSession,
@@ -141,7 +141,7 @@ private[sql] class DefaultSource
// Should we merge schemas from all Parquet part-files?
val shouldMergeSchemas =
parameters
- .get(ParquetRelation.MERGE_SCHEMA)
+ .get(ParquetFileFormat.MERGE_SCHEMA)
.map(_.toBoolean)
.getOrElse(sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
@@ -217,7 +217,7 @@ private[sql] class DefaultSource
.orElse(filesByType.data.headOption)
.toSeq
}
- ParquetRelation.mergeSchemasInParallel(filesToTouch, sparkSession)
+ ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
}
case class FileTypes(
@@ -543,7 +543,7 @@ private[sql] class ParquetOutputWriter(
override def close(): Unit = recordWriter.close(context)
}
-private[sql] object ParquetRelation extends Logging {
+private[sql] object ParquetFileFormat extends Logging {
// Whether we should merge schemas collected from all Parquet part-files.
private[sql] val MERGE_SCHEMA = "mergeSchema"
@@ -822,9 +822,9 @@ private[sql] object ParquetRelation extends Logging {
if (footers.isEmpty) {
Iterator.empty
} else {
- var mergedSchema = ParquetRelation.readSchemaFromFooter(footers.head, converter)
+ var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
footers.tail.foreach { footer =>
- val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
+ val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
try {
mergedSchema = mergedSchema.merge(schema)
} catch { case cause: SparkException =>
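
As a quick illustration of the mergeSchema key read by ParquetFileFormat above: it can be supplied per read, falling back to spark.sql.parquet.mergeSchema when absent. A minimal sketch (the path is a hypothetical table whose part files have evolved schemas):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("merge-schema").getOrCreate()

    val merged = spark.read
      .option("mergeSchema", "true")   // asks ParquetFileFormat to merge footers across part files
      .parquet("/tmp/parquet-table")
    merged.printSchema()
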
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index f091615a9a..d9525efe6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration
/**
* A data source for reading text files.
*/
-class DefaultSource extends FileFormat with DataSourceRegister {
+class TextFileFormat extends FileFormat with DataSourceRegister {
override def shortName(): String = "text"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 46213a22ed..500d8ff55a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1320,7 +1320,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
- className = classOf[DefaultSource].getCanonicalName,
+ className = classOf[JsonFileFormat].getCanonicalName,
options = Map("path" -> path)).resolveRelation()
val d2 = DataSource(
@@ -1328,7 +1328,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
- className = classOf[DefaultSource].getCanonicalName,
+ className = classOf[JsonFileFormat].getCanonicalName,
options = Map("path" -> path)).resolveRelation()
assert(d1 === d2)
})
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index c43b142de2..6db6492282 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -375,7 +375,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),
@@ -390,7 +390,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),
@@ -401,7 +401,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),
@@ -412,7 +412,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Conflicting non-nullable field names
intercept[Throwable] {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
@@ -426,7 +426,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
@@ -439,7 +439,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
- ParquetRelation.mergeMetastoreParquetSchema(
+ ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 4f6df54417..320aaea1e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -27,37 +27,37 @@ class ResolvedDataSourceSuite extends SparkFunSuite {
test("jdbc") {
assert(
getProvidingClass("jdbc") ===
- classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.jdbc") ===
- classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
assert(
getProvidingClass("org.apache.spark.sql.jdbc") ===
- classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
}
test("json") {
assert(
getProvidingClass("json") ===
- classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.json") ===
- classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.json") ===
- classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
}
test("parquet") {
assert(
getProvidingClass("parquet") ===
- classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.parquet") ===
- classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.parquet") ===
- classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+ classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
}
test("error message for unknown data sources") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 6238b74ffa..f3262f772b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -41,7 +41,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
path.delete()
val hadoopConf = spark.sparkContext.hadoopConfiguration
- val fileFormat = new parquet.DefaultSource()
+ val fileFormat = new parquet.ParquetFileFormat()
def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
val df = spark
@@ -73,7 +73,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
path.delete()
val hadoopConf = spark.sparkContext.hadoopConfiguration
- val fileFormat = new parquet.DefaultSource()
+ val fileFormat = new parquet.ParquetFileFormat()
def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
val df = spark
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index e6c0ce95e7..288f6dc597 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -101,7 +101,6 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
}
class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
- import testImplicits._
private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath