Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala	77
1 file changed, 46 insertions(+), 31 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c66921f485..10fde152ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -123,36 +123,58 @@ case class DataSource(
     }
   }
 
-  /** Returns a source that can be used to continually read data. */
-  def createSource(): Source = {
+  private def inferFileFormatSchema(format: FileFormat): StructType = {
+    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+    val allPaths = caseInsensitiveOptions.get("path")
+    val globbedPaths = allPaths.toSeq.flatMap { path =>
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      SparkHadoopUtil.get.globPathIfNecessary(qualified)
+    }.toArray
+
+    val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
+    userSpecifiedSchema.orElse {
+      format.inferSchema(
+        sqlContext,
+        caseInsensitiveOptions,
+        fileCatalog.allFiles())
+    }.getOrElse {
+      throw new AnalysisException("Unable to infer schema. It must be specified manually.")
+    }
+  }
+
+  /** Returns the name and schema of the source that can be used to continually read data. */
+  def sourceSchema(): (String, StructType) = {
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
-        s.createSource(sqlContext, userSpecifiedSchema, className, options)
+        s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
 
       case format: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
-        val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
+        (s"FileSource[$path]", inferFileFormatSchema(format))
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Data source $className does not support streamed reading")
+    }
+  }
 
-        val allPaths = caseInsensitiveOptions.get("path")
-        val globbedPaths = allPaths.toSeq.flatMap { path =>
-          val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          SparkHadoopUtil.get.globPathIfNecessary(qualified)
-        }.toArray
+  /** Returns a source that can be used to continually read data. */
+  def createSource(metadataPath: String): Source = {
+    providingClass.newInstance() match {
+      case s: StreamSourceProvider =>
+        s.createSource(sqlContext, metadataPath, userSpecifiedSchema, className, options)
 
-        val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
-        val dataSchema = userSpecifiedSchema.orElse {
-          format.inferSchema(
-            sqlContext,
-            caseInsensitiveOptions,
-            fileCatalog.allFiles())
-        }.getOrElse {
-          throw new AnalysisException("Unable to infer schema. It must be specified manually.")
-        }
+      case format: FileFormat =>
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val path = caseInsensitiveOptions.getOrElse("path", {
+          throw new IllegalArgumentException("'path' is not specified")
+        })
+
+        val dataSchema = inferFileFormatSchema(format)
 
         def dataFrameBuilder(files: Array[String]): DataFrame = {
           Dataset.ofRows(
@@ -299,6 +321,9 @@ case class DataSource(
             "It must be specified manually")
         }
 
+        val enrichedOptions =
+          format.prepareRead(sqlContext, caseInsensitiveOptions, fileCatalog.allFiles())
+
         HadoopFsRelation(
           sqlContext,
           fileCatalog,
@@ -306,7 +331,7 @@ case class DataSource(
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
-          options)
+          enrichedOptions)
 
       case _ =>
         throw new AnalysisException(
@@ -345,16 +370,6 @@ case class DataSource(
     PartitioningUtils.validatePartitionColumnDataTypes(
       data.schema, partitionColumns, caseSensitive)
 
-    val equality =
-      if (sqlContext.conf.caseSensitiveAnalysis) {
-        org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-      } else {
-        org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-      }
-
-    val dataSchema = StructType(
-      data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
-
     // If we are appending to a table that already exists, make sure the partitioning matches
     // up. If we fail to load the table for whatever reason, ignore the check.
     if (mode == SaveMode.Append) {
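
For context, the diff splits the old single-step createSource() into a two-phase contract: sourceSchema() resolves a (name, schema) pair up front, and createSource(metadataPath) instantiates the Source with a metadata path supplied by the engine instead of a "metadataPath" option. Below is a minimal sketch of a custom provider against that refactored interface. It is an illustration only: the trait location (org.apache.spark.sql.sources.StreamSourceProvider) and the method signatures are inferred from the call sites in the diff, and CountingSourceProvider with its "name" option is hypothetical.

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.Source
    import org.apache.spark.sql.sources.StreamSourceProvider
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Hypothetical provider illustrating the two-phase contract above:
    // sourceSchema() resolves a display name and schema without instantiating
    // the source; createSource() now receives metadataPath from the engine.
    class CountingSourceProvider extends StreamSourceProvider {

      override def sourceSchema(
          sqlContext: SQLContext,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): (String, StructType) = {
        // Mirrors DataSource.sourceSchema(): return (name, schema) cheaply,
        // falling back to a fixed schema when none is user-specified.
        val fallback = StructType(StructField("value", LongType) :: Nil)
        (s"CountingSource[${parameters.getOrElse("name", "default")}]",
          schema.getOrElse(fallback))
      }

      override def createSource(
          sqlContext: SQLContext,
          metadataPath: String,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): Source = {
        // metadataPath is now passed in by the caller rather than read from a
        // "metadataPath" option, per the removed getOrElse("metadataPath", ...).
        ??? // construction of the concrete Source is elided in this sketch
      }
    }

The split lets the engine resolve a stream's schema before deciding where the source's metadata will live, which is why the removed option no longer defaults to s"$path/_metadata" inside createSource.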