aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorMatthew Taylor <matthew.t@tbfe.net>2014-11-07 12:53:08 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-07 12:53:08 -0800
commitac70c972a51952f801fd02dd5962c0a0c1aba8f8 (patch)
tree52970f211187b459ac1dd67abc024467896ffced /sql/hive
parenta6405c5ddcda112f8efd7d50d8e5f44f78a0fa41 (diff)
downloadspark-ac70c972a51952f801fd02dd5962c0a0c1aba8f8.tar.gz
spark-ac70c972a51952f801fd02dd5962c0a0c1aba8f8.tar.bz2
spark-ac70c972a51952f801fd02dd5962c0a0c1aba8f8.zip
[SPARK-4203][SQL] Partition directories in random order when inserting into hive table
When doing an insert into hive table with partitions the folders written to the file system are in a random order instead of the order defined in table creation. Seems that the loadPartition method in Hive.java has a Map<String,String> parameter but expects to be called with a map that has a defined ordering such as LinkedHashMap. Working on a test but having IntelliJ problems Author: Matthew Taylor <matthew.t@tbfe.net> Closes #3076 from tbfenet/partition_dir_order_problem and squashes the following commits: f1b9a52 [Matthew Taylor] Comment format fix bca709f [Matthew Taylor] review changes 0e50f6b [Matthew Taylor] test fix 99f1a31 [Matthew Taylor] partition ordering fix 369e618 [Matthew Taylor] partition ordering fix
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala13
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala34
2 files changed, 43 insertions, 4 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 74b4e7aaa4..81390f6267 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive.execution
+import java.util
+
import scala.collection.JavaConversions._
import org.apache.hadoop.hive.common.`type`.HiveVarchar
@@ -203,6 +205,13 @@ case class InsertIntoHiveTable(
// holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
val holdDDLTime = false
if (partition.nonEmpty) {
+
+      // loadPartition call orders directories created on the iteration order of this map
+ val orderedPartitionSpec = new util.LinkedHashMap[String,String]()
+ table.hiveQlTable.getPartCols().foreach{
+ entry=>
+ orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
+ }
val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
db.validatePartitionNameCharacters(partVals)
// inheritTableSpecs is set to true. It should be set to false for a IMPORT query
@@ -214,7 +223,7 @@ case class InsertIntoHiveTable(
db.loadDynamicPartitions(
outputPath,
qualifiedTableName,
- partitionSpec,
+ orderedPartitionSpec,
overwrite,
numDynamicPartitions,
holdDDLTime,
@@ -224,7 +233,7 @@ case class InsertIntoHiveTable(
db.loadPartition(
outputPath,
qualifiedTableName,
- partitionSpec,
+ orderedPartitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 18dc937dd2..5dbfb92313 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -17,8 +17,10 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql._
+import java.io.File
+
+import com.google.common.io.Files
+import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.hive.test.TestHive
/* Implicits */
@@ -91,4 +93,32 @@ class InsertIntoHiveTableSuite extends QueryTest {
sql("DROP TABLE hiveTableWithMapValue")
}
+
+ test("SPARK-4203:random partition directory order") {
+ createTable[TestData]("tmp_table")
+ val tmpDir = Files.createTempDir()
+ sql(s"CREATE TABLE table_with_partition(c1 string) PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) location '${tmpDir.toURI.toString}' ")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='1') SELECT 'blarr' FROM tmp_table")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='2') SELECT 'blarr' FROM tmp_table")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='3') SELECT 'blarr' FROM tmp_table")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='4') SELECT 'blarr' FROM tmp_table")
+ def listFolders(path: File, acc: List[String]): List[List[String]] = {
+ val dir = path.listFiles()
+ val folders = dir.filter(_.isDirectory).toList
+ if (folders.isEmpty) {
+ List(acc.reverse)
+ } else {
+ folders.flatMap(x => listFolders(x, x.getName :: acc))
+ }
+ }
+ val expected = List(
+ "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=2"::Nil,
+ "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=3"::Nil ,
+ "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=1"::Nil ,
+ "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=4"::Nil
+ )
+ assert(listFolders(tmpDir,List()).sortBy(_.toString()) == expected.sortBy(_.toString))
+ sql("DROP TABLE table_with_partition")
+ sql("DROP TABLE tmp_table")
+ }
}