Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  77
1 file changed, 46 insertions(+), 31 deletions(-)
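
In short, this patch splits the old createSource() into two steps: sourceSchema(), which returns only a name and a schema, and createSource(metadataPath), which takes the metadata path from the caller instead of reading a "metadataPath" option; the path-globbing and schema-inference logic moves into a private inferFileFormatSchema helper. A rough sketch of the reshaped surface, with signatures taken from the diff below and bodies elided:

    // Reshaped streaming entry points on DataSource (signatures from the diff; bodies elided).
    // Globs the "path" option and infers the schema, unless userSpecifiedSchema is set.
    private def inferFileFormatSchema(format: FileFormat): StructType = ???

    // Returns the source's display name and schema without instantiating the source.
    def sourceSchema(): (String, StructType) = ???

    // Creates the streaming Source; the metadata path is now supplied by the caller.
    def createSource(metadataPath: String): Source = ???
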
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c66921f485..10fde152ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -123,36 +123,58 @@ case class DataSource(
}
}
- /** Returns a source that can be used to continually read data. */
- def createSource(): Source = {
+ private def inferFileFormatSchema(format: FileFormat): StructType = {
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val allPaths = caseInsensitiveOptions.get("path")
+ val globbedPaths = allPaths.toSeq.flatMap { path =>
+ val hdfsPath = new Path(path)
+ val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ SparkHadoopUtil.get.globPathIfNecessary(qualified)
+ }.toArray
+
+ val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
+ userSpecifiedSchema.orElse {
+ format.inferSchema(
+ sqlContext,
+ caseInsensitiveOptions,
+ fileCatalog.allFiles())
+ }.getOrElse {
+ throw new AnalysisException("Unable to infer schema. It must be specified manually.")
+ }
+ }
+
+ /** Returns the name and schema of the source that can be used to continually read data. */
+ def sourceSchema(): (String, StructType) = {
providingClass.newInstance() match {
case s: StreamSourceProvider =>
- s.createSource(sqlContext, userSpecifiedSchema, className, options)
+ s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
case format: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
- val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
+ (s"FileSource[$path]", inferFileFormatSchema(format))
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Data source $className does not support streamed reading")
+ }
+ }
- val allPaths = caseInsensitiveOptions.get("path")
- val globbedPaths = allPaths.toSeq.flatMap { path =>
- val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualified)
- }.toArray
+ /** Returns a source that can be used to continually read data. */
+ def createSource(metadataPath: String): Source = {
+ providingClass.newInstance() match {
+ case s: StreamSourceProvider =>
+ s.createSource(sqlContext, metadataPath, userSpecifiedSchema, className, options)
- val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
- val dataSchema = userSpecifiedSchema.orElse {
- format.inferSchema(
- sqlContext,
- caseInsensitiveOptions,
- fileCatalog.allFiles())
- }.getOrElse {
- throw new AnalysisException("Unable to infer schema. It must be specified manually.")
- }
+ case format: FileFormat =>
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val path = caseInsensitiveOptions.getOrElse("path", {
+ throw new IllegalArgumentException("'path' is not specified")
+ })
+
+ val dataSchema = inferFileFormatSchema(format)
def dataFrameBuilder(files: Array[String]): DataFrame = {
Dataset.ofRows(
@@ -299,6 +321,9 @@ case class DataSource(
"It must be specified manually")
}
+ val enrichedOptions =
+ format.prepareRead(sqlContext, caseInsensitiveOptions, fileCatalog.allFiles())
+
HadoopFsRelation(
sqlContext,
fileCatalog,
@@ -306,7 +331,7 @@ case class DataSource(
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
- options)
+ enrichedOptions)
case _ =>
throw new AnalysisException(
@@ -345,16 +370,6 @@ case class DataSource(
PartitioningUtils.validatePartitionColumnDataTypes(
data.schema, partitionColumns, caseSensitive)
- val equality =
- if (sqlContext.conf.caseSensitiveAnalysis) {
- org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
- } else {
- org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
- }
-
- val dataSchema = StructType(
- data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
-
// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
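
For orientation, a minimal sketch of how a streaming caller might drive the split API after this change; how the DataSource instance is obtained and the checkpoint path layout are illustrative assumptions, not part of this diff:

    // Illustrative caller (dataSource construction and metadata path layout are assumed).
    val dataSource: DataSource = ...                        // built by the streaming reader path; args elided
    val (sourceName, schema) = dataSource.sourceSchema()    // step 1: name + schema only, no source created yet
    val source: Source =                                    // step 2: create the source once the query's
      dataSource.createSource("/checkpoints/q1/sources/0")  //   metadata location is known (path layout assumed)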