diff options
author | Cheng Lian <lian@databricks.com> | 2015-03-31 07:48:37 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-03-31 07:48:37 +0800 |
commit | fde6945417355ae57500b67d034c9cad4f20d240 (patch) | |
tree | 94b8c986ac5174412e5055b60dd6f0dea760d871 /sql/hive/src/main | |
parent | f76d2e55b1a67bf5576e1aa001a0b872b9b3895a (diff) | |
download | spark-fde6945417355ae57500b67d034c9cad4f20d240.tar.gz spark-fde6945417355ae57500b67d034c9cad4f20d240.tar.bz2 spark-fde6945417355ae57500b67d034c9cad4f20d240.zip |
[SPARK-6369] [SQL] Uses commit coordinator to help committing Hive and Parquet tables
This PR leverages the output commit coordinator introduced in #4066 to help committing Hive and Parquet tables.
This PR extracts output commit code in `SparkHadoopWriter.commit` to `SparkHadoopMapRedUtil.commitTask`, and reuses it for committing Parquet and Hive tables on executor side.
TODO
- [ ] Add tests
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5139)
<!-- Reviewable:end -->
Author: Cheng Lian <lian@databricks.com>
Closes #5139 from liancheng/spark-6369 and squashes the following commits:
72eb628 [Cheng Lian] Fixes typo in javadoc
9a4b82b [Cheng Lian] Adds javadoc and addresses @aarondav's comments
dfdf3ef [Cheng Lian] Uses commit coordinator to help committing Hive and Parquet tables
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 1 | ||||
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 17 |
2 files changed, 2 insertions, 16 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index da53d30354..cdf012b511 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -72,7 +72,6 @@ case class InsertIntoHiveTable( val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName assert(outputFileFormatClassName != null, "Output format class not set") conf.value.set("mapred.output.format.class", outputFileFormatClassName) - conf.value.setOutputCommitter(classOf[FileOutputCommitter]) FileOutputFormat.setOutputPath( conf.value, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index ba2bf67aed..8398da2681 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.hive -import java.io.IOException import java.text.NumberFormat import java.util.Date @@ -118,19 +117,7 @@ private[hive] class SparkHiveWriterContainer( } protected def commit() { - if (committer.needsTaskCommit(taskContext)) { - try { - committer.commitTask(taskContext) - logInfo (taID + ": Committed") - } catch { - case e: IOException => - logError("Error committing the output of task: " + taID.value, e) - committer.abortTask(taskContext) - throw e - } - } else { - logInfo("No need to commit output of task: " + taID.value) - } + SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID) } private def setIDs(jobId: Int, splitId: Int, attemptId: Int) { @@ -213,7 +200,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( .zip(row.toSeq.takeRight(dynamicPartColNames.length)) .map { case (col, rawVal) => val string = if (rawVal == null) null else String.valueOf(rawVal) - val colString = + val colString = if (string == null || string.isEmpty) { defaultPartName } else { |