author    Cheng Lian <lian@databricks.com>    2015-03-31 07:48:37 +0800
committer Cheng Lian <lian@databricks.com>    2015-03-31 07:48:37 +0800
commit    fde6945417355ae57500b67d034c9cad4f20d240 (patch)
tree      94b8c986ac5174412e5055b60dd6f0dea760d871 /sql/hive
parent    f76d2e55b1a67bf5576e1aa001a0b872b9b3895a (diff)
[SPARK-6369] [SQL] Uses commit coordinator to help committing Hive and Parquet tables
This PR leverages the output commit coordinator introduced in #4066 to help committing Hive and Parquet tables. It extracts the output commit code in `SparkHadoopWriter.commit` into `SparkHadoopMapRedUtil.commitTask`, and reuses it for committing Parquet and Hive tables on the executor side.

TODO

- [ ] Add tests

Author: Cheng Lian <lian@databricks.com>

Closes #5139 from liancheng/spark-6369 and squashes the following commits:

72eb628 [Cheng Lian] Fixes typo in javadoc
9a4b82b [Cheng Lian] Adds javadoc and addresses @aarondav's comments
dfdf3ef [Cheng Lian] Uses commit coordinator to help committing Hive and Parquet tables
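For context, below is a minimal Scala sketch of the task-commit pattern that this change centralizes. The object and parameter names are illustrative (not from this commit); only the Hadoop `OutputCommitter` API is real. The actual helper, `SparkHadoopMapRedUtil.commitTask`, additionally consults the driver-side output commit coordinator before committing, so that only one attempt per partition is allowed to commit its output.

```scala
// Illustrative sketch only: mirrors the needsTaskCommit/commitTask/abortTask
// pattern that the deleted Hive writer code used, and that this PR moves into
// SparkHadoopMapRedUtil.commitTask (which also asks the driver-side
// OutputCommitCoordinator for permission before committing).
import java.io.IOException

import org.apache.hadoop.mapred.{OutputCommitter, TaskAttemptContext}

object CommitSketch {
  def commitTask(
      committer: OutputCommitter,
      taskContext: TaskAttemptContext,
      taskId: String): Unit = {
    if (committer.needsTaskCommit(taskContext)) {
      try {
        // The real helper would first check with the OutputCommitCoordinator
        // whether this task attempt may commit.
        committer.commitTask(taskContext)
        println(s"$taskId: Committed")
      } catch {
        case e: IOException =>
          // Abort so the attempt's temporary output is cleaned up, then rethrow.
          committer.abortTask(taskContext)
          throw e
      }
    } else {
      println(s"$taskId: No need to commit output")
    }
  }
}
```

Centralizing this logic lets the Hive and Parquet write paths share a single, coordinator-aware commit implementation instead of each duplicating it.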
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala          | 17
2 files changed, 2 insertions, 16 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index da53d30354..cdf012b511 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -72,7 +72,6 @@ case class InsertIntoHiveTable(
val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
assert(outputFileFormatClassName != null, "Output format class not set")
conf.value.set("mapred.output.format.class", outputFileFormatClassName)
- conf.value.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(
conf.value,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index ba2bf67aed..8398da2681 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.hive
-import java.io.IOException
import java.text.NumberFormat
import java.util.Date
@@ -118,19 +117,7 @@ private[hive] class SparkHiveWriterContainer(
}
protected def commit() {
- if (committer.needsTaskCommit(taskContext)) {
- try {
- committer.commitTask(taskContext)
- logInfo (taID + ": Committed")
- } catch {
- case e: IOException =>
- logError("Error committing the output of task: " + taID.value, e)
- committer.abortTask(taskContext)
- throw e
- }
- } else {
- logInfo("No need to commit output of task: " + taID.value)
- }
+ SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID)
}
private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
@@ -213,7 +200,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
.zip(row.toSeq.takeRight(dynamicPartColNames.length))
.map { case (col, rawVal) =>
val string = if (rawVal == null) null else String.valueOf(rawVal)
- val colString =
+ val colString =
if (string == null || string.isEmpty) {
defaultPartName
} else {