aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-04-22 17:17:37 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-04-22 17:17:37 -0700
commitc431a76d0628985bb445189b9a2913dd41b86f7b (patch)
tree723bae4814ffc9e935ee761d7b186aa2b37bee9d /sql/core/src/main/scala/org/apache
parentc25b97fccee557c9247ad5bf006a83a55c5e0e32 (diff)
downloadspark-c431a76d0628985bb445189b9a2913dd41b86f7b.tar.gz
spark-c431a76d0628985bb445189b9a2913dd41b86f7b.tar.bz2
spark-c431a76d0628985bb445189b9a2913dd41b86f7b.zip
[SPARK-14832][SQL][STREAMING] Refactor DataSource to ensure schema is inferred only once when creating a file stream
## What changes were proposed in this pull request? When creating a file stream using sqlContext.write.stream(), existing files are scanned twice for finding the schema - Once, when creating a DataSource + StreamingRelation in the DataFrameReader.stream() - Again, when creating streaming Source from the DataSource, in DataSource.createSource() Instead, the schema should be generated only once, at the time of creating the dataframe, and when the streaming source is created, it should just reuse that schema The solution proposed in this PR is to add a lazy field in DataSource that caches the schema. Then streaming Source created by the DataSource can just reuse the schema. ## How was this patch tested? Refactored unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #12591 from tdas/SPARK-14832.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala35
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala22
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala4
3 files changed, 21 insertions, 40 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0dfe7dba1e..07bc8ae148 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -67,7 +67,10 @@ case class DataSource(
bucketSpec: Option[BucketSpec] = None,
options: Map[String, String] = Map.empty) extends Logging {
+ case class SourceInfo(name: String, schema: StructType)
+
lazy val providingClass: Class[_] = lookupDataSource(className)
+ lazy val sourceInfo = sourceSchema()
/** A map to maintain backward compatibility in case we move data sources around. */
private val backwardCompatibilityMap = Map(
@@ -145,17 +148,19 @@ case class DataSource(
}
/** Returns the name and schema of the source that can be used to continually read data. */
- def sourceSchema(): (String, StructType) = {
+ private def sourceSchema(): SourceInfo = {
providingClass.newInstance() match {
case s: StreamSourceProvider =>
- s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
+ val (name, schema) = s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
+ SourceInfo(name, schema)
case format: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
- (s"FileSource[$path]", inferFileFormatSchema(format))
+ SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))
+
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed reading")
@@ -174,24 +179,20 @@ case class DataSource(
throw new IllegalArgumentException("'path' is not specified")
})
- val dataSchema = inferFileFormatSchema(format)
-
def dataFrameBuilder(files: Array[String]): DataFrame = {
- Dataset.ofRows(
- sqlContext,
- LogicalRelation(
- DataSource(
- sqlContext,
- paths = files,
- userSpecifiedSchema = Some(dataSchema),
- className = className,
- options =
- new CaseInsensitiveMap(
- options.filterKeys(_ != "path") + ("basePath" -> path))).resolveRelation()))
+ val newOptions = options.filterKeys(_ != "path") + ("basePath" -> path)
+ val newDataSource =
+ DataSource(
+ sqlContext,
+ paths = files,
+ userSpecifiedSchema = Some(sourceInfo.schema),
+ className = className,
+ options = new CaseInsensitiveMap(newOptions))
+ Dataset.ofRows(sqlContext, LogicalRelation(newDataSource.resolveRelation()))
}
new FileStreamSource(
- sqlContext, metadataPath, path, Some(dataSchema), className, dataFrameBuilder)
+ sqlContext, metadataPath, path, sourceInfo.schema, dataFrameBuilder)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed reading")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 6448cb6e90..51c3aee835 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -35,8 +35,7 @@ class FileStreamSource(
sqlContext: SQLContext,
metadataPath: String,
path: String,
- dataSchema: Option[StructType],
- providerName: String,
+ override val schema: StructType,
dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
@@ -48,24 +47,6 @@ class FileStreamSource(
files.foreach(seenFiles.add)
}
- /** Returns the schema of the data from this source */
- override lazy val schema: StructType = {
- dataSchema.getOrElse {
- val filesPresent = fetchAllFiles()
- if (filesPresent.isEmpty) {
- if (providerName == "text") {
- // Add a default schema for "text"
- new StructType().add("value", StringType)
- } else {
- throw new IllegalArgumentException("No schema specified")
- }
- } else {
- // There are some existing files. Use them to infer the schema.
- dataFrameBuilder(filesPresent.toArray).schema
- }
- }
- }
-
/**
* Returns the maximum offset that can be retrieved from the source.
*
@@ -118,7 +99,6 @@ class FileStreamSource(
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logDebug(s"Streaming ${files.mkString(", ")}")
dataFrameBuilder(files)
-
}
private def fetchAllFiles(): Seq[String] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index c29291eb58..3341580fc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.execution.datasources.DataSource
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
- val (name, schema) = dataSource.sourceSchema()
- StreamingRelation(dataSource, name, schema.toAttributes)
+ StreamingRelation(
+ dataSource, dataSource.sourceInfo.name, dataSource.sourceInfo.schema.toAttributes)
}
}