 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala |   6
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala |   6
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala  |  10
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala             |   8
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala             |  12
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala               |   6
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala           |  12
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala                 |  28
 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala   |   6
 sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala            |   6
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala      | 172
 11 files changed, 157 insertions(+), 115 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 78897daec8..0c729648ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -119,7 +119,8 @@ abstract class ExternalCatalog {
table: String,
loadPath: String,
isOverwrite: Boolean,
- holdDDLTime: Boolean): Unit
+ holdDDLTime: Boolean,
+ isSrcLocal: Boolean): Unit
def loadPartition(
db: String,
@@ -128,7 +129,8 @@ abstract class ExternalCatalog {
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
- inheritTableSpecs: Boolean): Unit
+ inheritTableSpecs: Boolean,
+ isSrcLocal: Boolean): Unit
def loadDynamicPartitions(
db: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a6bebe1a39..816e4af2df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -312,7 +312,8 @@ class InMemoryCatalog(
table: String,
loadPath: String,
isOverwrite: Boolean,
- holdDDLTime: Boolean): Unit = {
+ holdDDLTime: Boolean,
+ isSrcLocal: Boolean): Unit = {
throw new UnsupportedOperationException("loadTable is not implemented")
}
@@ -323,7 +324,8 @@ class InMemoryCatalog(
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
- inheritTableSpecs: Boolean): Unit = {
+ inheritTableSpecs: Boolean,
+ isSrcLocal: Boolean): Unit = {
throw new UnsupportedOperationException("loadPartition is not implemented.")
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 7a3d2097a8..e996a836fe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -311,12 +311,13 @@ class SessionCatalog(
name: TableIdentifier,
loadPath: String,
isOverwrite: Boolean,
- holdDDLTime: Boolean): Unit = {
+ holdDDLTime: Boolean,
+ isSrcLocal: Boolean): Unit = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
- externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
+ externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime, isSrcLocal)
}
/**
@@ -330,13 +331,14 @@ class SessionCatalog(
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
- inheritTableSpecs: Boolean): Unit = {
+ inheritTableSpecs: Boolean,
+ isSrcLocal: Boolean): Unit = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.loadPartition(
- db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs)
+ db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
}
def defaultTablePath(tableIdent: TableIdentifier): String = {
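
For illustration, a minimal sketch of a call site after this change; the table identifier and load path below are hypothetical, and only the new isSrcLocal argument is taken from the signatures above (assumes a SessionCatalog instance named sessionCatalog is in scope).

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Hypothetical call into the updated SessionCatalog API; the table name and
    // path are made up for illustration.
    sessionCatalog.loadTable(
      TableIdentifier("employees", Some("default")),
      loadPath = "/tmp/staging/employee.dat",
      isOverwrite = false,
      holdDDLTime = false,
      isSrcLocal = true)  // true only when the data comes from a LOAD DATA LOCAL statement
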
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 32e2f75737..d2a7556476 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -203,7 +203,7 @@ case class LoadDataCommand(
throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but number of columns in provided partition spec (${partition.get.size}) " +
s"do not match number of partitioned columns in table " +
- s"(s${targetTable.partitionColumnNames.size})")
+ s"(${targetTable.partitionColumnNames.size})")
}
partition.get.keys.foreach { colName =>
if (!targetTable.partitionColumnNames.contains(colName)) {
@@ -297,13 +297,15 @@ case class LoadDataCommand(
partition.get,
isOverwrite,
holdDDLTime = false,
- inheritTableSpecs = true)
+ inheritTableSpecs = true,
+ isSrcLocal = isLocal)
} else {
catalog.loadTable(
targetTable.identifier,
loadPath.toString,
isOverwrite,
- holdDDLTime = false)
+ holdDDLTime = false,
+ isSrcLocal = isLocal)
}
Seq.empty[Row]
}
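
In LoadDataCommand the new flag is simply whether the statement used the LOCAL keyword. A hedged illustration of the two forms and the behavior the updated tests assert (table and paths are hypothetical; the non-LOCAL case assumes the given path is reachable from the warehouse filesystem):

    // Copies the file: a LOCAL source stays where the user put it (isSrcLocal = true).
    spark.sql("""LOAD DATA LOCAL INPATH "/tmp/employee.dat" INTO TABLE employees""")
    // Moves the file into the table location (isSrcLocal = false).
    spark.sql("""LOAD DATA INPATH "hdfs:///staging/employee.dat" INTO TABLE employees""")
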
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f67ddc9be1..544f277cdf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -736,13 +736,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
loadPath: String,
isOverwrite: Boolean,
- holdDDLTime: Boolean): Unit = withClient {
+ holdDDLTime: Boolean,
+ isSrcLocal: Boolean): Unit = withClient {
requireTableExists(db, table)
client.loadTable(
loadPath,
s"$db.$table",
isOverwrite,
- holdDDLTime)
+ holdDDLTime,
+ isSrcLocal)
}
override def loadPartition(
@@ -752,7 +754,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
- inheritTableSpecs: Boolean): Unit = withClient {
+ inheritTableSpecs: Boolean,
+ isSrcLocal: Boolean): Unit = withClient {
requireTableExists(db, table)
val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
@@ -771,7 +774,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
orderedPartitionSpec,
isOverwrite,
holdDDLTime,
- inheritTableSpecs)
+ inheritTableSpecs,
+ isSrcLocal)
}
override def loadDynamicPartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 8e7c871183..837b6c57fc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -211,14 +211,16 @@ private[hive] trait HiveClient {
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
replace: Boolean,
holdDDLTime: Boolean,
- inheritTableSpecs: Boolean): Unit
+ inheritTableSpecs: Boolean,
+ isSrcLocal: Boolean): Unit
/** Loads data into an existing table. */
def loadTable(
loadPath: String, // TODO URI
tableName: String,
replace: Boolean,
- holdDDLTime: Boolean): Unit
+ holdDDLTime: Boolean,
+ isSrcLocal: Boolean): Unit
/** Loads new dynamic partitions into an existing table. */
def loadDynamicPartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index db73596e5f..b75f6e98d5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -651,7 +651,8 @@ private[hive] class HiveClientImpl(
partSpec: java.util.LinkedHashMap[String, String],
replace: Boolean,
holdDDLTime: Boolean,
- inheritTableSpecs: Boolean): Unit = withHiveState {
+ inheritTableSpecs: Boolean,
+ isSrcLocal: Boolean): Unit = withHiveState {
val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
shim.loadPartition(
client,
@@ -661,20 +662,23 @@ private[hive] class HiveClientImpl(
replace,
holdDDLTime,
inheritTableSpecs,
- isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
+ isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
+ isSrcLocal = isSrcLocal)
}
def loadTable(
loadPath: String, // TODO URI
tableName: String,
replace: Boolean,
- holdDDLTime: Boolean): Unit = withHiveState {
+ holdDDLTime: Boolean,
+ isSrcLocal: Boolean): Unit = withHiveState {
shim.loadTable(
client,
new Path(loadPath),
tableName,
replace,
- holdDDLTime)
+ holdDDLTime,
+ isSrcLocal)
}
def loadDynamicPartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index e561706fac..137ec26760 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -98,14 +98,16 @@ private[client] sealed abstract class Shim {
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit
def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
- holdDDLTime: Boolean): Unit
+ holdDDLTime: Boolean,
+ isSrcLocal: Boolean): Unit
def loadDynamicPartitions(
hive: Hive,
@@ -332,7 +334,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit = {
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
}
@@ -342,7 +345,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
loadPath: Path,
tableName: String,
replace: Boolean,
- holdDDLTime: Boolean): Unit = {
+ holdDDLTime: Boolean,
+ isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
}
@@ -698,10 +702,11 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit = {
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
- isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE)
+ isSrcLocal: JBoolean, JBoolean.FALSE)
}
override def loadTable(
@@ -709,9 +714,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
loadPath: Path,
tableName: String,
replace: Boolean,
- holdDDLTime: Boolean): Unit = {
+ holdDDLTime: Boolean,
+ isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean,
- isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE)
+ isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE)
}
override def loadDynamicPartitions(
@@ -749,12 +755,6 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
TimeUnit.MILLISECONDS).asInstanceOf[Long]
}
- protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = {
- val localFs = FileSystem.getLocal(conf)
- val pathFs = FileSystem.get(path.toUri(), conf)
- localFs.getUri() == pathFs.getUri()
- }
-
}
private[client] class Shim_v1_0 extends Shim_v0_14 {
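
The deleted helper guessed the flag by comparing the path's filesystem URI with the local filesystem's; after this change the caller states it explicitly from the statement. A minimal sketch of what the flag means for file placement, assuming the copy-versus-move semantics asserted in the test changes below; the helper name and the use of Hadoop's FileUtil are illustrative, not Hive's actual implementation.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileUtil, Path}

    // Illustrative only: a local source (LOAD DATA LOCAL) must be preserved, so it is
    // copied; a non-local source already lives on the warehouse filesystem and is moved.
    def placeFile(src: Path, dest: Path, isSrcLocal: Boolean, conf: Configuration): Unit = {
      val srcFs = src.getFileSystem(conf)
      val destFs = dest.getFileSystem(conf)
      // deleteSource = !isSrcLocal turns the copy into a move for non-local sources.
      FileUtil.copy(srcFs, src, destFs, dest, /* deleteSource = */ !isSrcLocal, conf)
    }
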
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 5f5c8e2432..db2239d26a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -316,7 +316,8 @@ case class InsertIntoHiveTable(
partitionSpec,
isOverwrite = doHiveOverwrite,
holdDDLTime = holdDDLTime,
- inheritTableSpecs = inheritTableSpecs)
+ inheritTableSpecs = inheritTableSpecs,
+ isSrcLocal = false)
}
}
} else {
@@ -325,7 +326,8 @@ case class InsertIntoHiveTable(
table.catalogTable.identifier.table,
outputPath.toString, // TODO: URI
overwrite,
- holdDDLTime)
+ holdDDLTime,
+ isSrcLocal = false)
}
// Invalidate the cache.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 79e76b3134..a001048a9e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -172,7 +172,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
emptyDir,
tableName = "src",
replace = false,
- holdDDLTime = false)
+ holdDDLTime = false,
+ isSrcLocal = false)
}
test(s"$version: tableExists") {
@@ -310,7 +311,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
partSpec,
replace = false,
holdDDLTime = false,
- inheritTableSpecs = false)
+ inheritTableSpecs = false,
+ isSrcLocal = false)
}
test(s"$version: loadDynamicPartitions") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 46ed18c70f..1680f6c40a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql.hive.execution
+import java.io.File
+
+import com.google.common.io.Files
+
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -154,7 +158,39 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
}
- test("LOAD DATA") {
+ Seq(true, false).foreach { local =>
+ val loadQuery = if (local) "LOAD DATA LOCAL" else "LOAD DATA"
+ test(loadQuery) {
+ testLoadData(loadQuery, local)
+ }
+ }
+
+ private def testLoadData(loadQuery: String, local: Boolean): Unit = {
+ // employee.dat has two columns separated by '|', the first is an int, the second is a string.
+ // Its content looks like:
+ // 16|john
+ // 17|robert
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalFile()
+
+ /**
+ * Run a function with a copy of the input data file when running with non-local input. The
+ * semantics in this mode are that the input file is moved to the destination, so we have
+ * to make a copy so that subsequent tests have access to the original file.
+ */
+ def withInputFile(fn: File => Unit): Unit = {
+ if (local) {
+ fn(testData)
+ } else {
+ val tmp = File.createTempFile(testData.getName(), ".tmp")
+ Files.copy(testData, tmp)
+ try {
+ fn(tmp)
+ } finally {
+ tmp.delete()
+ }
+ }
+ }
+
withTable("non_part_table", "part_table") {
sql(
"""
@@ -164,18 +200,49 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
|LINES TERMINATED BY '\n'
""".stripMargin)
- // employee.dat has two columns separated by '|', the first is an int, the second is a string.
- // Its content looks like:
- // 16|john
- // 17|robert
- val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
-
// LOAD DATA INTO non-partitioned table can't specify partition
intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+ sql(s"""$loadQuery INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+ }
+
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE non_part_table""")
+
+ // Non-local mode is expected to move the file, while local mode is expected to copy it.
+      // Check once here that the behavior is as expected.
+ assert(local === path.exists())
+ }
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Nil)
+
+ // Incorrect URI.
+ // file://path/to/data/files/employee.dat
+ //
+ // TODO: need a similar test for non-local mode.
+ if (local) {
+ val incorrectUri = "file:/" + testData.getAbsolutePath()
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
+ }
+ }
+
+ // Use URI as inpath:
+ // file:/path/to/data/files/employee.dat
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "${path.toURI()}" INTO TABLE non_part_table""")
+ }
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Row(16, "john") :: Nil)
+
+ // Overwrite existing data.
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "${path.toURI()}" OVERWRITE INTO TABLE non_part_table""")
}
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""")
checkAnswer(
sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
Row(16, "john") :: Nil)
@@ -190,87 +257,39 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
""".stripMargin)
// LOAD DATA INTO partitioned table must specify partition
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table""")
+ withInputFile { path =>
+ intercept[AnalysisException] {
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table""")
+ }
+
+ intercept[AnalysisException] {
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1")""")
+ }
+ intercept[AnalysisException] {
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(d="1")""")
+ }
+ intercept[AnalysisException] {
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1", k="2")""")
+ }
}
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""")
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1", d="2")""")
}
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""")
- }
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""")
- }
-
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""")
checkAnswer(
sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"),
sql("SELECT * FROM non_part_table").collect())
// Different order of partition columns.
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""")
+ withInputFile { path =>
+ sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(d="1", c="2")""")
+ }
checkAnswer(
sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"),
sql("SELECT * FROM non_part_table").collect())
}
}
- test("LOAD DATA: input path") {
- withTable("non_part_table") {
- sql(
- """
- |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
- |ROW FORMAT DELIMITED
- |FIELDS TERMINATED BY '|'
- |LINES TERMINATED BY '\n'
- """.stripMargin)
-
- // Non-existing inpath
- intercept[AnalysisException] {
- sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""")
- }
-
- val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
-
- // Non-local inpath: without URI Scheme and Authority
- sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
- checkAnswer(
- sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
- Row(16, "john") :: Nil)
-
- // Use URI as LOCAL inpath:
- // file:/path/to/data/files/employee.dat
- val uri = "file:" + testData
- sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""")
-
- checkAnswer(
- sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
- Row(16, "john") :: Row(16, "john") :: Nil)
-
- // Use URI as non-LOCAL inpath
- sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""")
-
- checkAnswer(
- sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
- Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil)
-
- sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""")
-
- checkAnswer(
- sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
- Row(16, "john") :: Nil)
-
- // Incorrect URI:
- // file://path/to/data/files/employee.dat
- val incorrectUri = "file:/" + testData
- intercept[AnalysisException] {
- sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
- }
- }
- }
-
test("Truncate Table") {
withTable("non_part_table", "part_table") {
sql(
@@ -418,4 +437,5 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
assert(sql("SHOW PARTITIONS part_datasrc").count() == 3)
}
}
+
}