From 4494cd9716d64a6c7cfa548abadb5dd0c4c143a6 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 17 Jan 2017 23:37:59 -0800
Subject: [SPARK-18243][SQL] Port Hive writing to use FileFormat interface

## What changes were proposed in this pull request?

Inserting data into Hive tables has its own implementation, distinct from data sources: `InsertIntoHiveTable`, `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.

One other major difference is that data source tables write directly to the final destination without using a staging directory, and Spark itself then adds the partitions/tables to the catalog. Hive tables instead write to a staging directory and then call the Hive metastore's loadPartition/loadTable functions to load the data in, so we still need to keep `InsertIntoHiveTable` to hold this special logic (a rough sketch of this two-phase flow is appended after the diff below). In the future, we should consider writing to the Hive table location directly, so that we no longer need to call `loadTable`/`loadPartition` at the end and can remove `InsertIntoHiveTable`.

This PR removes `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`, and creates a `HiveFileFormat` to implement the write logic. In the future, we should also implement the read logic in `HiveFileFormat`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan

Closes #16517 from cloud-fan/insert-hive.
---
 .../scala/org/apache/spark/sql/hive/client/VersionsSuite.scala  |  4 ++--
 .../apache/spark/sql/hive/execution/HiveComparisonTest.scala    | 10 ++++------
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala      |  5 +++--
 .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala    |  8 ++++----
 .../spark/sql/hive/execution/ScriptTransformationSuite.scala    | 10 +++++-----
 5 files changed, 18 insertions(+), 19 deletions(-)

(limited to 'sql/hive/src/test/scala/org')

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 5cb8519d2a..28b5bfd581 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -565,8 +565,8 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
         val filePaths = dir.map(_.getName).toList
         folders.flatMap(listFiles) ++: filePaths
       }
-      val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
-      assert(listFiles(tmpDir).sorted == expectedFiles)
+      // expect 2 files left: `.part-00000-random-uuid.crc` and `part-00000-random-uuid`
+      assert(listFiles(tmpDir).length == 2)
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 05a15166f8..4772a264d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.SQLBuilder
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -441,23 +442,20 @@ abstract class HiveComparisonTest
             val executions = queryList.map(new TestHiveQueryExecution(_))
             executions.foreach(_.toRdd)

             val tablesGenerated = queryList.zip(executions).flatMap {
-              // We should take executedPlan instead of sparkPlan, because in following codes we
-              // will run the collected plans. As we will do extra processing for sparkPlan such
-              // as adding exchange, collapsing codegen stages, etc., collecting sparkPlan here
-              // will cause some errors when running these plans later.
-              case (q, e) => e.executedPlan.collect {
+              case (q, e) => e.analyzed.collect {
                 case i: InsertIntoHiveTable if tablesRead contains i.table.tableName => (q, e, i)
               }
             }
 
             tablesGenerated.map { case (hiveql, execution, insert) =>
+              val rdd = Dataset.ofRows(TestHive.sparkSession, insert.query).queryExecution.toRdd
               s"""
                  |=== Generated Table ===
                  |$hiveql
                  |$execution
                  |== Results ==
-                 |${insert.child.execute().collect().mkString("\n")}
+                 |${rdd.collect().mkString("\n")}
               """.stripMargin
             }.mkString("\n")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ef62be39cd..882a184124 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
@@ -799,7 +800,7 @@ class HiveDDLSuite
 
   test("Create Cataloged Table As Select - Drop Table After Runtime Exception") {
     withTable("tab") {
-      intercept[RuntimeException] {
+      intercept[SparkException] {
        sql(
          """
            |CREATE TABLE tab
@@ -1273,7 +1274,7 @@ class HiveDDLSuite
        sql("INSERT INTO t SELECT 1")
        checkAnswer(spark.table("t"), Row(1))
        // Check if this is compressed as ZLIB.
-        val maybeOrcFile = path.listFiles().find(_.getName.endsWith("part-00000"))
+        val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc"))
        assert(maybeOrcFile.isDefined)
        val orcFilePath = maybeOrcFile.get.toPath.toString
        val expectedCompressionKind =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2ae66d1b2f..75ba92cada 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1043,8 +1043,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
 
     assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
       analyzedPlan.collect {
-        case _: Project => ()
-      }.size
+        case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size
+      }.sum
     }
   }
 
@@ -1062,8 +1062,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
 
     assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
       analyzedPlan.collect {
-        case _: Project => ()
-      }.size
+        case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size
+      }.sum
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index d3475a79a7..5318b4650b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -55,7 +55,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
     checkAnswer(
       rowsDf,
-      (child: SparkPlan) => new ScriptTransformation(
+      (child: SparkPlan) => new ScriptTransformationExec(
        input = Seq(rowsDf.col("a").expr),
        script = "cat",
        output = Seq(AttributeReference("a", StringType)()),
@@ -71,7 +71,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
     checkAnswer(
       rowsDf,
-      (child: SparkPlan) => new ScriptTransformation(
+      (child: SparkPlan) => new ScriptTransformationExec(
        input = Seq(rowsDf.col("a").expr),
        script = "cat",
        output = Seq(AttributeReference("a", StringType)()),
@@ -88,7 +88,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     val e = intercept[TestFailedException] {
       checkAnswer(
         rowsDf,
-        (child: SparkPlan) => new ScriptTransformation(
+        (child: SparkPlan) => new ScriptTransformationExec(
          input = Seq(rowsDf.col("a").expr),
          script = "cat",
          output = Seq(AttributeReference("a", StringType)()),
@@ -107,7 +107,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     val e = intercept[TestFailedException] {
       checkAnswer(
         rowsDf,
-        (child: SparkPlan) => new ScriptTransformation(
+        (child: SparkPlan) => new ScriptTransformationExec(
          input = Seq(rowsDf.col("a").expr),
          script = "cat",
          output = Seq(AttributeReference("a", StringType)()),
@@ -126,7 +126,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
 
     val e = intercept[SparkException] {
       val plan =
-        new ScriptTransformation(
+        new ScriptTransformationExec(
          input = Seq(rowsDf.col("a").expr),
          script = "some_non_existent_command",
          output = Seq(AttributeReference("a", StringType)()),
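For readers skimming this patch, below is a minimal, self-contained Scala sketch of the two-phase write flow the description refers to: a format object hands each task a writer that spills rows into a staging directory, and the driver then moves the staged files into the table location, standing in for the metastore's `loadTable`/`loadPartition` step. Everything here (`FileFormatLike`, `OutputWriterLike`, `TextHiveLikeFormat`, `InsertIntoHiveTableSketch`) is a hypothetical illustration, not Spark's real `FileFormat`/`OutputWriter` API.

```scala
import java.io.{File, PrintWriter}
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical stand-in for an output writer: one instance per task, bound to one staged file.
trait OutputWriterLike {
  def write(row: Seq[Any]): Unit
  def close(): Unit
}

// Hypothetical stand-in for the FileFormat write hook: the format only decides how a single
// file is physically written; staging and the final "load" happen around it.
trait FileFormatLike {
  def newWriter(path: File): OutputWriterLike
}

// A toy text format using Hive's default field delimiter (^A), purely for illustration.
object TextHiveLikeFormat extends FileFormatLike {
  def newWriter(path: File): OutputWriterLike = new OutputWriterLike {
    private val out = new PrintWriter(path)
    def write(row: Seq[Any]): Unit = out.println(row.mkString("\u0001"))
    def close(): Unit = out.close()
  }
}

object InsertIntoHiveTableSketch {
  // Phase 1: each task writes its rows into the staging directory through the format's writer.
  def writeTask(format: FileFormatLike, staging: File, taskId: Int, rows: Seq[Seq[Any]]): File = {
    val file = new File(staging, f"part-$taskId%05d")
    val writer = format.newWriter(file)
    try rows.foreach(writer.write) finally writer.close()
    file
  }

  // Phase 2: the driver moves staged files into the table location, standing in for the
  // metastore loadTable/loadPartition call that InsertIntoHiveTable still performs.
  def loadTable(staging: File, tableLocation: File): Unit = {
    tableLocation.mkdirs()
    staging.listFiles().foreach { f =>
      Files.move(f.toPath, new File(tableLocation, f.getName).toPath,
        StandardCopyOption.REPLACE_EXISTING)
    }
  }

  def main(args: Array[String]): Unit = {
    val staging = Files.createTempDirectory("hive-staging").toFile
    val table   = Files.createTempDirectory("hive-table").toFile
    writeTask(TextHiveLikeFormat, staging, taskId = 0, rows = Seq(Seq(1, "a"), Seq(2, "b")))
    loadTable(staging, table)
    table.listFiles().foreach(f => println(s"loaded: ${f.getName}"))
  }
}
```

In Spark itself the per-file writing sits behind the `FileFormat` interface while the surrounding machinery handles staging, task commit and dynamic partitions; this sketch mirrors only the overall shape of the flow, not the real signatures.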