author    Cheng Lian <lian@databricks.com>    2015-06-02 13:32:13 -0700
committer Yin Huai <yhuai@databricks.com>     2015-06-02 13:32:13 -0700
commit    686a45f0b9c50ede2a80854ed6a155ee8a9a4f5c (patch)
tree      5bf5776140d9906a2cc4c677df3de9313b6effe2 /sql/core
parent    ad06727fe985ca243ebdaaba55cd7d35a4749d0a (diff)
[SPARK-8014] [SQL] Avoid premature metadata discovery when writing a HadoopFsRelation with a save mode other than Append
The current code references the schema of the DataFrame to be written before checking the save mode. This triggers expensive metadata discovery prematurely. For save modes other than `Append`, this metadata discovery is useless, since we either ignore the result (for `Ignore` and `ErrorIfExists`) or delete existing files (for `Overwrite`) later. This PR fixes the issue by deferring metadata discovery until after the save mode check.

Author: Cheng Lian <lian@databricks.com>

Closes #6583 from liancheng/spark-8014 and squashes the following commits:

1aafabd [Cheng Lian] Updates comments
088abaa [Cheng Lian] Avoids schema merging and partition discovery when data schema and partition schema are defined
8fbd93f [Cheng Lian] Fixes SPARK-8014
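For context, a minimal sketch of the intended control flow, not the code from this commit (`pathExists` and `doInsert` are hypothetical placeholders for the real existence check and the real insert plan): the save mode is inspected first, and the relation's schema, and with it metadata discovery, is only touched when an insert actually has to happen.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: decide what to do from the save mode before looking at any schema.
def saveToHadoopFsRelation(
    df: DataFrame,
    mode: SaveMode,
    pathExists: Boolean)(doInsert: DataFrame => Unit): Unit = {
  (mode, pathExists) match {
    case (SaveMode.ErrorIfExists, true) =>
      sys.error("path already exists")  // fail fast, no metadata discovery needed
    case (SaveMode.Ignore, true) =>
      ()                                // silently skip, no metadata discovery needed
    case _ =>
      doInsert(df)                      // Overwrite/Append: schema is consulted here, lazily
  }
}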
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala      2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala       20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala            16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala      2
4 files changed, 22 insertions(+), 18 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e439a18ac4..824ae36968 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -190,7 +190,7 @@ private[sql] class ParquetRelation2(
}
}
- override def dataSchema: StructType = metadataCache.dataSchema
+ override def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)
override private[sql] def refresh(): Unit = {
super.refresh()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 3132067d56..71f016b1f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -30,9 +30,10 @@ import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
@@ -94,10 +95,19 @@ private[sql] case class InsertIntoHadoopFsRelation(
// We create a DataFrame by applying the schema of relation to the data to make sure
// we are writing data based on the expected schema.
- val df = sqlContext.createDataFrame(
- DataFrame(sqlContext, query).queryExecution.toRdd,
- relation.schema,
- needsConversion = false)
+ val df = {
+ // For partitioned relation r, r.schema's column ordering can be different from the column
+ // ordering of data.logicalPlan (partition columns are all moved after the data columns). We
+ // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
+ // safely apply the schema of r.schema to the data.
+ val project = Project(
+ relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
+
+ sqlContext.createDataFrame(
+ DataFrame(sqlContext, project).queryExecution.toRdd,
+ relation.schema,
+ needsConversion = false)
+ }
val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 22587f5a1c..20afd60cb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.RunnableCommand
@@ -322,19 +322,13 @@ private[sql] object ResolvedDataSource {
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)
- // For partitioned relation r, r.schema's column ordering is different with the column
- // ordering of data.logicalPlan. We need a Project to adjust the ordering.
- // So, inside InsertIntoHadoopFsRelation, we can safely apply the schema of r.schema to
- // the data.
- val project =
- Project(
- r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
- data.logicalPlan)
-
+ // For partitioned relation r, r.schema's column ordering can be different from the column
+ // ordering of data.logicalPlan (partition columns are all moved after the data columns). This
+ // will be adjusted within InsertIntoHadoopFsRelation.
sqlContext.executePlan(
InsertIntoHadoopFsRelation(
r,
- project,
+ data.logicalPlan,
mode)).toRdd
r
case _ =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index c4ffa8de52..f5bd2d2941 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -503,7 +503,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
*/
override lazy val schema: StructType = {
val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
- StructType(dataSchema ++ partitionSpec.partitionColumns.filterNot { column =>
+ StructType(dataSchema ++ partitionColumns.filterNot { column =>
dataSchemaColumnNames.contains(column.name.toLowerCase)
})
}
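
As a closing note, a minimal sketch (a standalone helper written for illustration, not part of this commit) of the reordering Project used in commands.scala above: for a partitioned HadoopFsRelation, `schema` lists the data columns first and the partition columns last, so a query whose output uses a different column order is realigned to `relation.schema` before its rows are written.

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.StructType

// One unresolved attribute per schema field, in schema order; the analyzer
// later resolves each name against the query's own output attributes.
def reorderToRelationSchema(relationSchema: StructType, query: LogicalPlan): Project =
  Project(relationSchema.map(field => new UnresolvedAttribute(Seq(field.name))), query)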