path: root/sql/core/src/main/scala
author      hyukjinkwon <gurwls223@gmail.com>        2017-03-22 08:41:46 +0800
committer   Wenchen Fan <wenchen@databricks.com>     2017-03-22 08:41:46 +0800
commit      9281a3d504d526440c1d445075e38a6d9142ac93 (patch)
tree        b39fe650c06e306a52bc48be4e4098833c5c6463 /sql/core/src/main/scala
parent      a04dcde8cb191e591a5f5d7a67a5371e31e7343c (diff)
[SPARK-19919][SQL] Defer throwing the exception for empty paths in CSV datasource into `DataSource`
## What changes were proposed in this pull request?

This PR proposes to defer throwing the exception into `DataSource`. Currently, if the other datasources fail to infer the schema, they return `None` and this is validated in `DataSource` as below:

```
scala> spark.read.json("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for JSON. It must be specified manually.;
```

```
scala> spark.read.orc("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must be specified manually.;
```

```
scala> spark.read.parquet("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
```

However, the CSV datasource performs this check inside its own implementation and throws a different exception message, as below:

```
scala> spark.read.csv("emptydir")
java.lang.IllegalArgumentException: requirement failed: Cannot infer schema from an empty set of files
```

We could remove this duplicated check and validate it in one place, in the same way and with the same message.

## How was this patch tested?

Unit test in `CSVSuite` and manual test.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17256 from HyukjinKwon/SPARK-19919.
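The patch applies a small template-method refactoring: the abstract base class gains a final `inferSchema` that owns the empty-input check and returns `Option[StructType]`, while concrete sources implement a protected `infer` that may assume a non-empty input. A minimal, self-contained sketch of that pattern follows; the names (`SchemaSource`, `LineBasedSource`) and the simplified `String` schema are illustrative stand-ins, not Spark's actual classes:

```scala
// Simplified sketch of the pattern introduced by this patch (hypothetical types,
// not Spark's): the base class handles "no input files" in exactly one place.
abstract class SchemaSource extends Serializable {

  // Final template method: callers go through this, so the empty-input case
  // is decided here and never duplicated in concrete sources.
  final def inferSchema(inputPaths: Seq[String]): Option[String] = {
    if (inputPaths.nonEmpty) Some(infer(inputPaths)) else None
  }

  // Concrete sources only describe how to infer from a non-empty set of paths.
  protected def infer(inputPaths: Seq[String]): String
}

object LineBasedSource extends SchemaSource {
  override protected def infer(inputPaths: Seq[String]): String =
    s"schema inferred from ${inputPaths.size} file(s)"
}

object SchemaSourceDemo extends App {
  println(LineBasedSource.inferSchema(Nil))           // None: caller raises one shared error
  println(LineBasedSource.inferSchema(Seq("a.csv")))  // Some(schema inferred from 1 file(s))
}
```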
Diffstat (limited to 'sql/core/src/main/scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala  | 25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala  |  4
2 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 63af18ec5b..83bdf6fe22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -54,10 +54,21 @@ abstract class CSVDataSource extends Serializable {
/**
* Infers the schema from `inputPaths` files.
*/
- def infer(
+ final def inferSchema(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
- parsedOptions: CSVOptions): Option[StructType]
+ parsedOptions: CSVOptions): Option[StructType] = {
+ if (inputPaths.nonEmpty) {
+ Some(infer(sparkSession, inputPaths, parsedOptions))
+ } else {
+ None
+ }
+ }
+
+ protected def infer(
+ sparkSession: SparkSession,
+ inputPaths: Seq[FileStatus],
+ parsedOptions: CSVOptions): StructType
/**
* Generates a header from the given row which is null-safe and duplicate-safe.
@@ -131,10 +142,10 @@ object TextInputCSVDataSource extends CSVDataSource {
override def infer(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
- parsedOptions: CSVOptions): Option[StructType] = {
+ parsedOptions: CSVOptions): StructType = {
val csv = createBaseDataset(sparkSession, inputPaths, parsedOptions)
val maybeFirstLine = CSVUtils.filterCommentAndEmpty(csv, parsedOptions).take(1).headOption
- Some(inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions))
+ inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions)
}
/**
@@ -203,7 +214,7 @@ object WholeFileCSVDataSource extends CSVDataSource {
override def infer(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
- parsedOptions: CSVOptions): Option[StructType] = {
+ parsedOptions: CSVOptions): StructType = {
val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
csv.flatMap { lines =>
UnivocityParser.tokenizeStream(
@@ -222,10 +233,10 @@ object WholeFileCSVDataSource extends CSVDataSource {
parsedOptions.headerFlag,
new CsvParser(parsedOptions.asParserSettings))
}
- Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
+ CSVInferSchema.infer(tokenRDD, header, parsedOptions)
case None =>
// If the first row could not be read, just return the empty schema.
- Some(StructType(Nil))
+ StructType(Nil)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index eef43c7629..a99bdfee5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -51,12 +51,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
- require(files.nonEmpty, "Cannot infer schema from an empty set of files")
-
val parsedOptions =
new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
- CSVDataSource(parsedOptions).infer(sparkSession, files, parsedOptions)
+ CSVDataSource(parsedOptions).inferSchema(sparkSession, files, parsedOptions)
}
override def prepareWrite(
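With the duplicated `require` removed, `inferSchema` returns `None` for an empty set of input files and the validation happens once in `DataSource`. Reading CSV from an empty directory should therefore surface the same `AnalysisException` as the other formats; based on the JSON/ORC/Parquet examples above, the message would presumably read along these lines (the exact CSV wording is an assumption):

```
scala> spark.read.csv("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for CSV. It must be specified manually.;
```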