-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala              2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala               20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala                    16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala              2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala  59
5 files changed, 67 insertions(+), 32 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e439a18ac4..824ae36968 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -190,7 +190,7 @@ private[sql] class ParquetRelation2(
}
}
- override def dataSchema: StructType = metadataCache.dataSchema
+ override def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)
override private[sql] def refresh(): Unit = {
super.refresh()
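
With this change, ParquetRelation2 prefers a caller-supplied schema (maybeDataSchema) over the one discovered by scanning files already in the destination. A minimal sketch of the same fallback pattern; the class names and hard-coded schemas below are stand-ins for illustration, not code from the patch:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Stand-in for the lazily populated cache; in ParquetRelation2 this schema
// comes from reading Parquet footers in the output directory.
class MetadataCacheSketch {
  lazy val dataSchema: StructType = StructType(StructField("a", IntegerType) :: Nil)
}

class RelationSketch(maybeDataSchema: Option[StructType]) {
  private val metadataCache = new MetadataCacheSketch

  // Prefer the schema the caller supplied; fall back to schema discovery
  // only when none was given, so a plain write never has to inspect
  // whatever already sits in the destination directory.
  def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)
}

object RelationSketchDemo extends App {
  val supplied = StructType(StructField("b", StringType) :: Nil)
  println(new RelationSketch(Some(supplied)).dataSchema) // uses the supplied schema
  println(new RelationSketch(None).dataSchema)           // falls back to discovery
}
```
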
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 3132067d56..71f016b1f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -30,9 +30,10 @@ import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
@@ -94,10 +95,19 @@ private[sql] case class InsertIntoHadoopFsRelation(
// We create a DataFrame by applying the schema of the relation to the data to make sure
// that we are writing data based on the expected schema.
- val df = sqlContext.createDataFrame(
- DataFrame(sqlContext, query).queryExecution.toRdd,
- relation.schema,
- needsConversion = false)
+ val df = {
+ // For a partitioned relation r, the column ordering of r.schema can differ from the column
+ // ordering of data.logicalPlan (partition columns are all moved after the data columns). We
+ // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation we can
+ // safely apply r.schema to the data.
+ val project = Project(
+ relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
+
+ sqlContext.createDataFrame(
+ DataFrame(sqlContext, project).queryExecution.toRdd,
+ relation.schema,
+ needsConversion = false)
+ }
val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
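
The Project built above does nothing more than reorder the query's output columns to match relation.schema, resolving each column by name. The effect can be sketched at the public DataFrame level; the column names and data here are made up, and this is not code from the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ReorderSketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("reorder"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  // The user writes columns in the order a, b, c.
  val df = sc.parallelize(Seq((1, "val_1", 2))).toDF("a", "b", "c")

  // A partitioned relation moves partition columns to the end, so its schema
  // may read b, c, a. Selecting by the target schema's field names performs
  // the same reordering as the Project(UnresolvedAttribute(...), query) above.
  val targetFieldOrder = Seq("b", "c", "a")
  val reordered = df.select(targetFieldOrder.map(df(_)): _*)

  reordered.printSchema() // root: b, c, a
  sc.stop()
}
```
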
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 22587f5a1c..20afd60cb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.RunnableCommand
@@ -322,19 +322,13 @@ private[sql] object ResolvedDataSource {
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)
- // For partitioned relation r, r.schema's column ordering is different with the column
- // ordering of data.logicalPlan. We need a Project to adjust the ordering.
- // So, inside InsertIntoHadoopFsRelation, we can safely apply the schema of r.schema to
- // the data.
- val project =
- Project(
- r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
- data.logicalPlan)
-
+ // For a partitioned relation r, the column ordering of r.schema can differ from the column
+ // ordering of data.logicalPlan (partition columns are all moved after the data columns).
+ // The ordering is adjusted within InsertIntoHadoopFsRelation.
sqlContext.executePlan(
InsertIntoHadoopFsRelation(
r,
- project,
+ data.logicalPlan,
mode)).toRdd
r
case _ =>
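
After this change the reordering Project is constructed in only one place, InsertIntoHadoopFsRelation, so every write path shares the logic instead of each caller pre-projecting. A hedged sketch of that construction as a standalone helper; the object and method names are invented, while the body mirrors the code added to commands.scala above:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.StructType

object ProjectionSketch {
  // Emit the query's columns in the order dictated by the relation's schema.
  // Using UnresolvedAttribute leaves the actual name resolution to the
  // analyzer when the projected plan is executed.
  def reorderToSchema(schema: StructType, query: LogicalPlan): Project =
    Project(schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
}
```
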
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index c4ffa8de52..f5bd2d2941 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -503,7 +503,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
*/
override lazy val schema: StructType = {
val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
- StructType(dataSchema ++ partitionSpec.partitionColumns.filterNot { column =>
+ StructType(dataSchema ++ partitionColumns.filterNot { column =>
dataSchemaColumnNames.contains(column.name.toLowerCase)
})
}
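
For reference, schema is the data schema followed by any partition columns it does not already contain, with names compared case-insensitively. A standalone sketch of that merge, with made-up fields (note the C/c pair differing only in case):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaMergeSketch extends App {
  val dataSchema = StructType(Seq(
    StructField("b", StringType),
    StructField("C", IntegerType))) // upper-case C

  val partitionColumns = StructType(Seq(
    StructField("c", IntegerType),  // same column as C up to case
    StructField("a", IntegerType)))

  val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet

  // Partition columns already covered by the data schema are dropped, so no
  // column appears twice merely because its case differs.
  val schema = StructType(dataSchema ++ partitionColumns.filterNot { column =>
    dataSchemaColumnNames.contains(column.name.toLowerCase)
  })

  println(schema.fieldNames.mkString(", ")) // b, C, a
}
```
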
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index af36fa6f1f..7409542674 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.sources
+import java.io.File
+
+import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, SparkFunSuite}
@@ -453,6 +456,20 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
}
}
}
+
+ test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
+ val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
+
+ df.write
+ .format(dataSourceName)
+ .mode(SaveMode.Overwrite)
+ .partitionBy("c", "a")
+ .saveAsTable("t")
+
+ withTable("t") {
+ checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
+ }
+ }
}
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -534,20 +551,6 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
}
}
- test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
- val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
-
- df.write
- .format("parquet")
- .mode(SaveMode.Overwrite)
- .partitionBy("c", "a")
- .saveAsTable("t")
-
- withTable("t") {
- checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
- }
- }
-
test("SPARK-7868: _temporary directories should be ignored") {
withTempPath { dir =>
val df = Seq("a", "b", "c").zipWithIndex.toDF()
@@ -563,4 +566,32 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
}
}
+
+ test("SPARK-8014: Avoid scanning output directory when SaveMode isn't SaveMode.Append") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ val df = Seq(1 -> "a").toDF()
+
+ // Creates an arbitrary file. If this directory gets scanned, ParquetRelation2 will throw,
+ // since that file is not a valid Parquet file.
+ val emptyFile = new File(path, "empty")
+ Files.createParentDirs(emptyFile)
+ Files.touch(emptyFile)
+
+ // This shouldn't throw anything.
+ df.write.format("parquet").mode(SaveMode.Ignore).save(path)
+
+ // This should only complain that the destination directory already exists, rather than
+ // that the file "empty" is not a Parquet file.
+ assert {
+ intercept[RuntimeException] {
+ df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
+ }.getMessage.contains("already exists")
+ }
+
+ // This shouldn't throw anything.
+ df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
+ checkAnswer(read.format("parquet").load(path), df)
+ }
+ }
}
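
The three writes in the SPARK-8014 test pin down SaveMode semantics when the destination exists: Ignore is a silent no-op, ErrorIfExists fails fast on the directory itself without peeking at its contents, and Overwrite replaces it. A simplified, standalone sketch of that decision, not the actual write-path code (the helper name is invented, and Append, which genuinely must inspect existing data, is folded into the default case):

```scala
import java.io.File

import org.apache.spark.sql.SaveMode

object SaveModeGuardSketch extends App {
  // Decide whether to write based only on whether the destination exists,
  // before ever looking inside it -- the point of SPARK-8014.
  def shouldWrite(mode: SaveMode, dest: File): Boolean = mode match {
    case SaveMode.ErrorIfExists if dest.exists() =>
      throw new RuntimeException(s"path ${dest.getPath} already exists.")
    case SaveMode.Ignore if dest.exists() => false
    case _ => true
  }

  val dir = new File(System.getProperty("java.io.tmpdir"), "spark-8014-demo")
  dir.mkdirs()
  assert(!shouldWrite(SaveMode.Ignore, dir))   // silently skipped
  assert(shouldWrite(SaveMode.Overwrite, dir)) // proceeds and replaces
}
```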