author    jeanlyn <jeanlyn92@gmail.com>       2015-06-21 00:13:40 -0700
committer Cheng Lian <lian@databricks.com>    2015-06-21 00:13:40 -0700
commit    a1e3649c8775d71ca78796b6544284e942ac1331 (patch)
tree      041bb25b0bb2aecee5447ad62bd7b98defdb1c5f
parent    41ab2853f41de2abc415358b69671f37a0653533 (diff)
[SPARK-8379] [SQL] avoid speculative tasks write to the same file
Issue link: [SPARK-8379](https://issues.apache.org/jira/browse/SPARK-8379)

Currently, when we insert data into a dynamic partition while speculative tasks are running, we get the following exception:

```
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): Lease mismatch on /tmp/hive-jeanlyn/hive_2015-06-15_15-20-44_734_8801220787219172413-1/-ext-10000/ds=2015-06-15/type=2/part-00301.lzo owned by DFSClient_attempt_201506031520_0011_m_000189_0_-1513487243_53 but is accessed by DFSClient_attempt_201506031520_0011_m_000042_0_-1275047721_57
```

This PR writes the data to a temporary directory when using dynamic partitions, so that speculative task attempts no longer write to the same file.

Author: jeanlyn <jeanlyn92@gmail.com>

Closes #6833 from jeanlyn/speculation and squashes the following commits:

64bbfab [jeanlyn] use FileOutputFormat.getTaskOutputPath to get the path
8860af0 [jeanlyn] remove never-used code
e19a3bd [jeanlyn] avoid speculative tasks writing the same file
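For context (not part of the patch itself), here is a minimal sketch contrasting the two ways of building the writer path. The names `conf`, `dynamicPartPath`, and `outputName` are stand-ins for the corresponding values in `SparkHiveDynamicPartitionWriterContainer`, and the helper object is hypothetical; it only illustrates why `FileOutputFormat.getTaskOutputPath` avoids the race: the path resolves under the task attempt's `_temporary` work directory, so concurrent speculative attempts write distinct files and the output committer promotes only the winning attempt.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

// Hypothetical illustration only; not the code added by this commit.
object PathSketch {
  // Old behaviour: every attempt of the same task computes the identical
  // final path under the job output directory, so a speculative duplicate
  // races the original writer on HDFS (the LeaseExpiredException above).
  def oldStylePath(conf: JobConf, dynamicPartPath: String, outputName: String): Path = {
    val outputPath = FileOutputFormat.getOutputPath(conf)
    assert(outputPath != null, "Undefined job output-path")
    new Path(new Path(outputPath, dynamicPartPath.stripPrefix("/")), outputName)
  }

  // New behaviour: getTaskOutputPath resolves the name relative to the task
  // attempt's work directory (typically ${output}/_temporary/${attemptId}/...),
  // so each attempt writes its own file and the committer moves only the
  // successful attempt's output into place.
  def newStylePath(conf: JobConf, dynamicPartPath: String, outputName: String): Path = {
    FileOutputFormat.getTaskOutputPath(
      conf, dynamicPartPath.stripPrefix("/") + "/" + outputName)
  }
}
```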
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala |  1 -
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala          | 11 +++++------
2 files changed, 5 insertions(+), 7 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 404bb937aa..05f425f2b6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -198,7 +198,6 @@ case class InsertIntoHiveTable(
       table.hiveQlTable.getPartCols().foreach { entry =>
         orderedPartitionSpec.put(entry.getName, partitionSpec.get(entry.getName).getOrElse(""))
       }
-      val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
 
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
       // which is currently considered as a Hive native command.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 0bc69c00c2..8b928861fc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -228,12 +228,11 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
       newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
 
-      val path = {
-        val outputPath = FileOutputFormat.getOutputPath(conf.value)
-        assert(outputPath != null, "Undefined job output-path")
-        val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
-        new Path(workPath, getOutputName)
-      }
+      // use the path like ${hive_tmp}/_temporary/${attemptId}/
+      // to avoid write to the same file when `spark.speculation=true`
+      val path = FileOutputFormat.getTaskOutputPath(
+        conf.value,
+        dynamicPartPath.stripPrefix("/") + "/" + getOutputName)
 
       HiveFileFormatUtils.getHiveRecordWriter(
         conf.value,