author     Shixiong Zhu <shixiong@databricks.com>        2016-11-23 16:15:35 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2016-11-23 16:15:35 -0800
commit     223fa218e1f637f0d62332785a3bee225b65b990 (patch)
tree       b53734271a274c6deea5afc75c959a0a86ba2446
parent     0d1bf2b6c8ac4d4141d7cef0552c22e586843c57 (diff)
[SPARK-18510][SQL] Follow up to address comments in #15951
## What changes were proposed in this pull request?

This PR addresses the remaining review comments in #15951.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15997 from zsxwing/SPARK-18510-follow-up.
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala   35
1 file changed, 20 insertions(+), 15 deletions(-)
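The behavioral core of this follow-up, visible in the diff below, is that partition-column lookups against userSpecifiedSchema now go through the session's resolver instead of a plain `==` on names, so they honor spark.sql.caseSensitive. A minimal, self-contained sketch of the difference; the two resolver functions mirror catalyst's caseSensitiveResolution and caseInsensitiveResolution (Spark's Resolver type is just `(String, String) => Boolean`), and the column names are made up for illustration:

```scala
object ResolverSketch {
  // Stand-ins for org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution,
  // caseInsensitiveResolution}; the session resolver is one of these two.
  val caseSensitive: (String, String) => Boolean = _ == _
  val caseInsensitive: (String, String) => Boolean = _.equalsIgnoreCase(_)

  def main(args: Array[String]): Unit = {
    val userColumns = Seq("EventDate", "country") // hypothetical user-specified schema names
    val partitionColumn = "eventdate"             // hypothetical inferred partition column

    // Before this patch: a plain == comparison misses "EventDate".
    println(userColumns.find(_ == partitionColumn))                     // None

    // After this patch: the resolver decides, honoring spark.sql.caseSensitive.
    println(userColumns.find(c => caseInsensitive(c, partitionColumn))) // Some(EventDate)
  }
}
```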
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index dbc3e71233..ccfc759c8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -118,8 +118,10 @@ case class DataSource(
private def getOrInferFileFormatSchema(
format: FileFormat,
justPartitioning: Boolean = false): (StructType, StructType) = {
- // the operations below are expensive therefore try not to do them if we don't need to
- lazy val tempFileCatalog = {
+ // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+ // in streaming mode, we have already inferred and registered partition columns, we will
+ // never have to materialize the lazy val below
+ lazy val tempFileIndex = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -133,7 +135,7 @@ case class DataSource(
val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
- val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+ val resolved = tempFileIndex.partitionSchema.map { partitionField =>
val equality = sparkSession.sessionState.conf.resolver
// SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
@@ -141,17 +143,17 @@ case class DataSource(
}
StructType(resolved)
} else {
- // in streaming mode, we have already inferred and registered partition columns, we will
- // never have to materialize the lazy val below
- lazy val inferredPartitions = tempFileCatalog.partitionSchema
// maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
// partitioning
if (userSpecifiedSchema.isEmpty) {
+ val inferredPartitions = tempFileIndex.partitionSchema
inferredPartitions
} else {
val partitionFields = partitionColumns.map { partitionColumn =>
- userSpecifiedSchema.flatMap(_.find(_.name == partitionColumn)).orElse {
- val inferredOpt = inferredPartitions.find(_.name == partitionColumn)
+ val equality = sparkSession.sessionState.conf.resolver
+ userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
+ val inferredPartitions = tempFileIndex.partitionSchema
+ val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
if (inferredOpt.isDefined) {
logDebug(
s"""Type of partition column: $partitionColumn not found in specified schema
@@ -163,7 +165,7 @@ case class DataSource(
|Falling back to inferred dataType if it exists.
""".stripMargin)
}
- inferredPartitions.find(_.name == partitionColumn)
+ inferredOpt
}.getOrElse {
throw new AnalysisException(s"Failed to resolve the schema for $format for " +
s"the partition column: $partitionColumn. It must be specified manually.")
@@ -182,7 +184,7 @@ case class DataSource(
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
- tempFileCatalog.allFiles())
+ tempFileIndex.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format. It must be specified manually.")
@@ -224,8 +226,11 @@ case class DataSource(
"you may be able to create a static DataFrame on that directory with " +
"'spark.read.load(directory)' and infer schema from it.")
}
- val (schema, partCols) = getOrInferFileFormatSchema(format)
- SourceInfo(s"FileSource[$path]", StructType(schema ++ partCols), partCols.fieldNames)
+ val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+ SourceInfo(
+ s"FileSource[$path]",
+ StructType(dataSchema ++ partitionSchema),
+ partitionSchema.fieldNames)
case _ =>
throw new UnsupportedOperationException(
@@ -379,7 +384,7 @@ case class DataSource(
globPath
}.toArray
- val (dataSchema, inferredPartitionSchema) = getOrInferFileFormatSchema(format)
+ val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -388,12 +393,12 @@ case class DataSource(
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
} else {
- new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(inferredPartitionSchema))
+ new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
}
HadoopFsRelation(
fileCatalog,
- partitionSchema = inferredPartitionSchema,
+ partitionSchema = partitionSchema,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
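The other change this follow-up completes is keeping every reference to the expensive tempFileIndex inside the branches that actually need it, so the lazy val is never forced on the streaming path, where partition columns are already registered. A minimal sketch of that deferral semantics, with a hypothetical stand-in for the file listing:

```scala
object LazyValSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for tempFileIndex: a lazy val's body runs only if it is forced.
    lazy val expensiveFileListing: Seq[String] = {
      println("listing files...") // in Spark this would be a full filesystem scan
      Seq("part=1", "part=2")
    }

    val streamingModeWithKnownPartitions = true // hypothetical flag for illustration

    if (streamingModeWithKnownPartitions) {
      // Streaming path: nothing references the lazy val, so no scan ever happens.
      println("using already-registered partition columns")
    } else {
      // Batch path: first access triggers the scan, exactly once.
      println(expensiveFileListing.mkString(", "))
    }
  }
}
```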