aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/test
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-12-12 14:19:42 -0800
committergatorsmile <gatorsmile@gmail.com>2016-12-12 14:19:42 -0800
commit476b34c23a1ece1d52654482a393003756957ad2 (patch)
treefe86b2301f21d92ccb96c08c3182749d2d0ef3cb /sql/hive/src/test
parentbf42c2db57b9a2ca642ad3d499c30be8d9ff221a (diff)
downloadspark-476b34c23a1ece1d52654482a393003756957ad2.tar.gz
spark-476b34c23a1ece1d52654482a393003756957ad2.tar.bz2
spark-476b34c23a1ece1d52654482a393003756957ad2.zip
[SPARK-18752][HIVE] "isSrcLocal" value should be set from user query.
The value of the "isSrcLocal" parameter passed to Hive's loadTable and loadPartition methods needs to be set according to the user query (e.g. "LOAD DATA LOCAL"), and not the current code that tries to guess what it should be. For existing versions of Hive the current behavior is probably ok, but some recent changes in the Hive code changed the semantics slightly, making code that incorrectly sets "isSrcLocal" to "true" do the wrong thing. It would end up moving the parent directory of the files into the final location, instead of the files themselves, resulting in a table that cannot be read. I modified HiveCommandSuite so that existing "LOAD DATA" tests are run both in local and non-local mode, since the semantics are slightly different. The tests include a few new checks to make sure the semantics follow what Hive describes in its documentation. Tested with existing unit tests and also ran some Hive integration tests with a version of Hive containing the changes that surfaced the problem. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #16179 from vanzin/SPARK-18752.
Diffstat (limited to 'sql/hive/src/test')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala172
2 files changed, 100 insertions, 78 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 79e76b3134..a001048a9e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -172,7 +172,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
emptyDir,
tableName = "src",
replace = false,
- holdDDLTime = false)
+ holdDDLTime = false,
+ isSrcLocal = false)
}
test(s"$version: tableExists") {
@@ -310,7 +311,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
partSpec,
replace = false,
holdDDLTime = false,
- inheritTableSpecs = false)
+ inheritTableSpecs = false,
+ isSrcLocal = false)
}
test(s"$version: loadDynamicPartitions") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 46ed18c70f..1680f6c40a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql.hive.execution
+import java.io.File
+
+import com.google.common.io.Files
+
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -154,7 +158,39 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
}
- test("LOAD DATA") {
+ Seq(true, false).foreach { local =>
+ val loadQuery = if (local) "LOAD DATA LOCAL" else "LOAD DATA"
+ test(loadQuery) {
+ testLoadData(loadQuery, local)
+ }
+ }
+
+ private def testLoadData(loadQuery: String, local: Boolean): Unit = {
+ // employee.dat has two columns separated by '|', the first is an int, the second is a string.
+ // Its content looks like:
+ // 16|john
+ // 17|robert
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalFile()
+
+ /**
+ * Run a function with a copy of the input data file when running with non-local input. The
+ * semantics in this mode are that the input file is moved to the destination, so we have
+ * to make a copy so that subsequent tests have access to the original file.
+ */
+ def withInputFile(fn: File => Unit): Unit = {
+ if (local) {
+ fn(testData)
+ } else {
+ val tmp = File.createTempFile(testData.getName(), ".tmp")
+ Files.copy(testData, tmp)
+ try {
+ fn(tmp)
+ } finally {
+ tmp.delete()
+ }
+ }
+ }
+
withTable("non_part_table", "part_table") {
sql(
"""
@@ -164,18 +200,49 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
|LINES TERMINATED BY '\n'
""".stripMargin)
- // employee.dat has two columns separated by '|', the first is an int, the second is a string.
- // Its content looks like:
- // 16|john
- // 17|robert
- val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
-
// LOAD DATA INTO non-partitioned table can't specify partition
intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+ sql(s"""$loadQuery INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+ }
+
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE non_part_table""")
+
+ // Non-local mode is expected to move the file, while local mode is expected to copy it.
+ // Check once here that the behavior is the expected.
+ assert(local === path.exists())
+ }
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Nil)
+
+ // Incorrect URI.
+ // file://path/to/data/files/employee.dat
+ //
+ // TODO: need a similar test for non-local mode.
+ if (local) {
+ val incorrectUri = "file:/" + testData.getAbsolutePath()
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
+ }
+ }
+
+ // Use URI as inpath:
+ // file:/path/to/data/files/employee.dat
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "${path.toURI()}" INTO TABLE non_part_table""")
+ }
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Row(16, "john") :: Nil)
+
+ // Overwrite existing data.
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "${path.toURI()}" OVERWRITE INTO TABLE non_part_table""")
}
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""")
checkAnswer(
sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
Row(16, "john") :: Nil)
@@ -190,87 +257,39 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
""".stripMargin)
// LOAD DATA INTO partitioned table must specify partition
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table""")
+ withInputFile { path =>
+ intercept[AnalysisException] {
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table""")
+ }
+
+ intercept[AnalysisException] {
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1")""")
+ }
+ intercept[AnalysisException] {
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(d="1")""")
+ }
+ intercept[AnalysisException] {
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1", k="2")""")
+ }
}
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""")
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1", d="2")""")
}
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""")
- }
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""")
- }
-
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""")
checkAnswer(
sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"),
sql("SELECT * FROM non_part_table").collect())
// Different order of partition columns.
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""")
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(d="1", c="2")""")
+ }
checkAnswer(
sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"),
sql("SELECT * FROM non_part_table").collect())
}
}
- test("LOAD DATA: input path") {
- withTable("non_part_table") {
- sql(
- """
- |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
- |ROW FORMAT DELIMITED
- |FIELDS TERMINATED BY '|'
- |LINES TERMINATED BY '\n'
- """.stripMargin)
-
- // Non-existing inpath
- intercept[AnalysisException] {
- sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""")
- }
-
- val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
-
- // Non-local inpath: without URI Scheme and Authority
- sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
- checkAnswer(
- sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
- Row(16, "john") :: Nil)
-
- // Use URI as LOCAL inpath:
- // file:/path/to/data/files/employee.dat
- val uri = "file:" + testData
- sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""")
-
- checkAnswer(
- sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
- Row(16, "john") :: Row(16, "john") :: Nil)
-
- // Use URI as non-LOCAL inpath
- sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""")
-
- checkAnswer(
- sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
- Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil)
-
- sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""")
-
- checkAnswer(
- sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
- Row(16, "john") :: Nil)
-
- // Incorrect URI:
- // file://path/to/data/files/employee.dat
- val incorrectUri = "file:/" + testData
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
- }
- }
- }
-
test("Truncate Table") {
withTable("non_part_table", "part_table") {
sql(
@@ -418,4 +437,5 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
assert(sql("SHOW PARTITIONS part_datasrc").count() == 3)
}
}
+
}