path: root/sql/hive/src/test/scala/org
author     Wenchen Fan <wenchen@databricks.com>  2017-01-17 23:37:59 -0800
committer  gatorsmile <gatorsmile@gmail.com>     2017-01-17 23:37:59 -0800
commit     4494cd9716d64a6c7cfa548abadb5dd0c4c143a6 (patch)
tree       6c06c19fb977106fc819a883889e9aa2ffefdcb9 /sql/hive/src/test/scala/org
parent     e7f982b20d8a1c0db711e0dcfe26b2f39f98dd64 (diff)
download   spark-4494cd9716d64a6c7cfa548abadb5dd0c4c143a6.tar.gz
           spark-4494cd9716d64a6c7cfa548abadb5dd0c4c143a6.tar.bz2
           spark-4494cd9716d64a6c7cfa548abadb5dd0c4c143a6.zip
[SPARK-18243][SQL] Port Hive writing to use FileFormat interface
## What changes were proposed in this pull request?

Inserting data into Hive tables has its own implementation, distinct from data sources: `InsertIntoHiveTable`, `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.

One other major difference is that data source tables write directly to the final destination without using a staging directory, and Spark itself then adds the partitions/tables to the catalog. Hive tables instead write to a staging directory and then call the Hive metastore's loadPartition/loadTable functions to load that data in. So we still need to keep `InsertIntoHiveTable` to hold this special logic. In the future, we should consider writing to the Hive table location directly, so that we no longer need to call `loadTable`/`loadPartition` at the end and can remove `InsertIntoHiveTable`.

This PR removes `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`, and creates a `HiveFileFormat` to implement the write logic. In the future, we should also implement the read logic in `HiveFileFormat`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16517 from cloud-fan/insert-hive.
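For context, the write side of the `FileFormat` contract boils down to returning an `OutputWriterFactory` from `prepareWrite`, which in turn creates one `OutputWriter` per write task. The sketch below is a hypothetical, simplified illustration of that shape, not the actual `HiveFileFormat` added by this patch; the class name is made up, and method signatures only approximate Spark's 2.x `org.apache.spark.sql.execution.datasources` API.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical write-only FileFormat: the read path is left unimplemented,
// mirroring the PR's note that reading via HiveFileFormat is future work.
class SketchHiveFileFormat extends FileFormat {

  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] =
    throw new UnsupportedOperationException("read path not implemented in this sketch")

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    new OutputWriterFactory {
      // Left empty here; a real Hive writer would derive the extension from
      // the table's output format and compression settings.
      override def getFileExtension(context: TaskAttemptContext): String = ""

      override def newInstance(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter = {
        new OutputWriter {
          // A real implementation would wrap a Hive SerDe and RecordWriter,
          // serializing each InternalRow into the table's storage format.
          override def write(row: InternalRow): Unit = ()
          override def close(): Unit = ()
        }
      }
    }
  }
}
```

With a writer expressed this way, the Hive insert path can reuse the common data source write machinery while, as the description above notes, still staging output and calling `loadTable`/`loadPartition` afterwards to move it into the metastore-managed location.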
Diffstat (limited to 'sql/hive/src/test/scala/org')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala                4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala       10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala              5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala            8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala 10
5 files changed, 18 insertions, 19 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 5cb8519d2a..28b5bfd581 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -565,8 +565,8 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
val filePaths = dir.map(_.getName).toList
folders.flatMap(listFiles) ++: filePaths
}
- val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
- assert(listFiles(tmpDir).sorted == expectedFiles)
+ // expect 2 files left: `.part-00000-random-uuid.crc` and `part-00000-random-uuid`
+ assert(listFiles(tmpDir).length == 2)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 05a15166f8..4772a264d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.SQLBuilder
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
@@ -441,23 +442,20 @@ abstract class HiveComparisonTest
val executions = queryList.map(new TestHiveQueryExecution(_))
executions.foreach(_.toRdd)
val tablesGenerated = queryList.zip(executions).flatMap {
- // We should take executedPlan instead of sparkPlan, because in following codes we
- // will run the collected plans. As we will do extra processing for sparkPlan such
- // as adding exchange, collapsing codegen stages, etc., collecting sparkPlan here
- // will cause some errors when running these plans later.
- case (q, e) => e.executedPlan.collect {
+ case (q, e) => e.analyzed.collect {
case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
(q, e, i)
}
}
tablesGenerated.map { case (hiveql, execution, insert) =>
+ val rdd = Dataset.ofRows(TestHive.sparkSession, insert.query).queryExecution.toRdd
s"""
|=== Generated Table ===
|$hiveql
|$execution
|== Results ==
- |${insert.child.execute().collect().mkString("\n")}
+ |${rdd.collect().mkString("\n")}
""".stripMargin
}.mkString("\n")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ef62be39cd..882a184124 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
+import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
@@ -799,7 +800,7 @@ class HiveDDLSuite
test("Create Cataloged Table As Select - Drop Table After Runtime Exception") {
withTable("tab") {
- intercept[RuntimeException] {
+ intercept[SparkException] {
sql(
"""
|CREATE TABLE tab
@@ -1273,7 +1274,7 @@ class HiveDDLSuite
sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
// Check if this is compressed as ZLIB.
- val maybeOrcFile = path.listFiles().find(_.getName.endsWith("part-00000"))
+ val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc"))
assert(maybeOrcFile.isDefined)
val orcFilePath = maybeOrcFile.get.toPath.toString
val expectedCompressionKind =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2ae66d1b2f..75ba92cada 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1043,8 +1043,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
analyzedPlan.collect {
- case _: Project => ()
- }.size
+ case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size
+ }.sum
}
}
@@ -1062,8 +1062,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
analyzedPlan.collect {
- case _: Project => ()
- }.size
+ case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size
+ }.sum
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index d3475a79a7..5318b4650b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -55,7 +55,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
checkAnswer(
rowsDf,
- (child: SparkPlan) => new ScriptTransformation(
+ (child: SparkPlan) => new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "cat",
output = Seq(AttributeReference("a", StringType)()),
@@ -71,7 +71,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
checkAnswer(
rowsDf,
- (child: SparkPlan) => new ScriptTransformation(
+ (child: SparkPlan) => new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "cat",
output = Seq(AttributeReference("a", StringType)()),
@@ -88,7 +88,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val e = intercept[TestFailedException] {
checkAnswer(
rowsDf,
- (child: SparkPlan) => new ScriptTransformation(
+ (child: SparkPlan) => new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "cat",
output = Seq(AttributeReference("a", StringType)()),
@@ -107,7 +107,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val e = intercept[TestFailedException] {
checkAnswer(
rowsDf,
- (child: SparkPlan) => new ScriptTransformation(
+ (child: SparkPlan) => new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "cat",
output = Seq(AttributeReference("a", StringType)()),
@@ -126,7 +126,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val e = intercept[SparkException] {
val plan =
- new ScriptTransformation(
+ new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "some_non_existent_command",
output = Seq(AttributeReference("a", StringType)()),