 sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala                              |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala                |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala                        | 13 ++++++++++++-
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala                |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala |  2 +-
 sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala                         |  8 ++++++++
 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala           |  2 +-
 7 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index cce1489abd..424a962b5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types._
  */
 trait RunnableCommand extends LogicalPlan with logical.Command {
   override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
+  final override def children: Seq[LogicalPlan] = Seq.empty
   def run(sparkSession: SparkSession): Seq[Row]
 }
 
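For orientation before the remaining hunks: in Catalyst, children are what the analyzer and optimizer traverse, while innerChildren are only carried along for presentation such as EXPLAIN output. Sealing children as final and empty forces commands to hold their query as an inner child instead, which is the pattern the hunks below apply. A minimal sketch of that contract with a hypothetical command, assuming Spark 2.x internals:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand

// Hypothetical command: the wrapped query shows up in EXPLAIN via innerChildren,
// but is not revisited by analyzer/optimizer rules, since children stays empty.
case class NoOpCommand(query: LogicalPlan) extends RunnableCommand {
  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
  override def run(sparkSession: SparkSession): Seq[Row] = Seq.empty
}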
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index da3f6c600a..c7e3279061 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -113,7 +113,7 @@ case class CreateDataSourceTableAsSelectCommand(
     query: LogicalPlan)
   extends RunnableCommand {
 
-  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 348530888d..5968db84cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -479,13 +480,23 @@
           }
         }
 
+        // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand
+        // does not need to keep the query as a child, which would mean re-analyzing an
+        // already optimized query (InsertIntoHadoopFsRelationCommand is itself optimized first).
+        val columns = partitionColumns.map { name =>
+          val plan = data.logicalPlan
+          plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
+          }.asInstanceOf[Attribute]
+        }
         // For partitioned relation r, r.schema's column ordering can be different from the column
         // ordering of data.logicalPlan (partition columns are all moved after data column). This
         // will be adjusted within InsertIntoHadoopFsRelation.
         val plan =
           InsertIntoHadoopFsRelationCommand(
             outputPath,
-            partitionColumns.map(UnresolvedAttribute.quoted),
+            columns,
             bucketSpec,
             format,
             () => Unit, // No existing table needs to be refreshed.
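The resolve call added above binds each partition column name to the query's output up front, instead of leaving an UnresolvedAttribute for a later analysis pass. A standalone sketch of the same lookup, with hypothetical names; caseInsensitiveResolution stands in for the session's configured resolver, and IllegalArgumentException replaces AnalysisException, whose constructor is not public outside the sql package:

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Resolve one partition column against a plan's output, failing with the same
// kind of message as the change above (which throws AnalysisException instead).
def resolvePartitionColumn(plan: LogicalPlan, name: String): Attribute = {
  plan.resolve(name :: Nil, caseInsensitiveResolution).getOrElse {
    throw new IllegalArgumentException(
      s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
  }.asInstanceOf[Attribute]
}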
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a6621054fc..8286467e96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -181,7 +181,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
       InsertIntoHadoopFsRelationCommand(
         outputPath,
-        t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)),
+        query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
         t.bucketSpec,
         t.fileFormat,
         () => t.refresh(),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index de822180ab..02ce7fab64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -66,7 +66,7 @@ case class InsertIntoHadoopFsRelationCommand(
     mode: SaveMode)
   extends RunnableCommand {
 
-  override def children: Seq[LogicalPlan] = query :: Nil
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 05935cec4b..63b0e4588e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -449,6 +449,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
+  test("SPARK-17230: write out results of decimal calculation") {
+    val df = spark.range(99, 101)
+      .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
+    df.write.mode(SaveMode.Overwrite).parquet(dir)
+    val df2 = spark.read.parquet(dir)
+    checkAnswer(df2, df)
+  }
+
   private def testRead(
       df: => DataFrame,
       expectedResult: Seq[String],
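Outside the suite, the regression check above amounts to round-tripping a decimal computation through parquet. A standalone sketch, assuming an active SparkSession named spark and using a hypothetical output path:

val df = spark.range(99, 101)
  .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
df.write.mode("overwrite").parquet("/tmp/spark-17230-check")  // hypothetical path
spark.read.parquet("/tmp/spark-17230-check").show()  // should round-trip df exactly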
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 6e6b1c2a2b..ef5a5a001f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -42,7 +42,7 @@ case class CreateHiveTableAsSelectCommand(
 
   private val tableIdentifier = tableDesc.identifier
 
-  override def children: Seq[LogicalPlan] = Seq(query)
+  override def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     lazy val metastoreRelation: MetastoreRelation = {