path: root/core
author     Cheng Lian <lian@databricks.com>    2015-03-31 07:48:37 +0800
committer  Cheng Lian <lian@databricks.com>    2015-03-31 07:48:37 +0800
commit     fde6945417355ae57500b67d034c9cad4f20d240 (patch)
tree       94b8c986ac5174412e5055b60dd6f0dea760d871 /core
parent     f76d2e55b1a67bf5576e1aa001a0b872b9b3895a (diff)
[SPARK-6369] [SQL] Uses commit coordinator to help committing Hive and Parquet tables
This PR leverages the output commit coordinator introduced in #4066 to help committing Hive and Parquet tables. It extracts the output commit code in `SparkHadoopWriter.commit` into `SparkHadoopMapRedUtil.commitTask`, and reuses it for committing Parquet and Hive tables on the executor side.

TODO

- [ ] Add tests

Author: Cheng Lian <lian@databricks.com>

Closes #5139 from liancheng/spark-6369 and squashes the following commits:

72eb628 [Cheng Lian] Fixes typo in javadoc
9a4b82b [Cheng Lian] Adds javadoc and addresses @aarondav's comments
dfdf3ef [Cheng Lian] Uses commit coordinator to help committing Hive and Parquet tables
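As a minimal sketch of the pattern this commit enables, an executor-side writer could delegate its task commit to the extracted `SparkHadoopMapRedUtil.commitTask` helper via the new `TaskContext`-based overload. The `ExampleOutputWriter` class below and the way it obtains the Hadoop `TaskAttemptContext` are hypothetical; only `SparkHadoopMapRedUtil.commitTask` and its parameters come from the patch itself.

```scala
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}

import org.apache.spark.TaskContext
import org.apache.spark.mapred.SparkHadoopMapRedUtil

// Hypothetical executor-side writer, e.g. something a Parquet or Hive output
// path could use; the class itself is illustrative, not part of this patch.
class ExampleOutputWriter(
    committer: OutputCommitter,
    hadoopContext: TaskAttemptContext) {

  def commit(): Unit = {
    // Let the shared helper decide whether this attempt may commit. It
    // consults the driver-side OutputCommitCoordinator when coordination is
    // enabled, and throws CommitDeniedException if the commit is refused.
    SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, TaskContext.get())
  }
}
```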
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala            52
-rw-r--r--  core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala  91
2 files changed, 92 insertions, 51 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 6eb4537d10..2ec42d3aea 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
-import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
@@ -104,55 +103,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
def commit() {
- val taCtxt = getTaskContext()
- val cmtr = getOutputCommitter()
-
- // Called after we have decided to commit
- def performCommit(): Unit = {
- try {
- cmtr.commitTask(taCtxt)
- logInfo (s"$taID: Committed")
- } catch {
- case e: IOException =>
- logError("Error committing the output of task: " + taID.value, e)
- cmtr.abortTask(taCtxt)
- throw e
- }
- }
-
- // First, check whether the task's output has already been committed by some other attempt
- if (cmtr.needsTaskCommit(taCtxt)) {
- // The task output needs to be committed, but we don't know whether some other task attempt
- // might be racing to commit the same output partition. Therefore, coordinate with the driver
- // in order to determine whether this attempt can commit (see SPARK-4879).
- val shouldCoordinateWithDriver: Boolean = {
- val sparkConf = SparkEnv.get.conf
- // We only need to coordinate with the driver if there are multiple concurrent task
- // attempts, which should only occur if speculation is enabled
- val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
- // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
- sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
- }
- if (shouldCoordinateWithDriver) {
- val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
- val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
- if (canCommit) {
- performCommit()
- } else {
- val msg = s"$taID: Not committed because the driver did not authorize commit"
- logInfo(msg)
- // We need to abort the task so that the driver can reschedule new attempts, if necessary
- cmtr.abortTask(taCtxt)
- throw new CommitDeniedException(msg, jobID, splitID, attemptID)
- }
- } else {
- // Speculation is disabled or a user has chosen to manually bypass the commit coordination
- performCommit()
- }
- } else {
- // Some other attempt committed the output, so we do nothing and signal success
- logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
- }
+ SparkHadoopMapRedUtil.commitTask(
+ getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
}
def commitJob() {
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 87c2aa4810..818f7a4c8d 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -17,9 +17,15 @@
package org.apache.spark.mapred
+import java.io.IOException
import java.lang.reflect.Modifier
-import org.apache.hadoop.mapred.{TaskAttemptID, JobID, JobConf, JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
+import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
+
+import org.apache.spark.executor.CommitDeniedException
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
private[spark]
trait SparkHadoopMapRedUtil {
@@ -65,3 +71,86 @@ trait SparkHadoopMapRedUtil {
}
}
}
+
+object SparkHadoopMapRedUtil extends Logging {
+ /**
+ * Commits a task output. Before committing the task output, we need to know whether some other
+ * task attempt might be racing to commit the same output partition. Therefore, coordinate with
+ * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
+ * details).
+ *
+ * Output commit coordinator is only contacted when the following two configurations are both set
+ * to `true`:
+ *
+ * - `spark.speculation`
+ * - `spark.hadoop.outputCommitCoordination.enabled`
+ */
+ def commitTask(
+ committer: MapReduceOutputCommitter,
+ mrTaskContext: MapReduceTaskAttemptContext,
+ jobId: Int,
+ splitId: Int,
+ attemptId: Int): Unit = {
+
+ val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
+
+ // Called after we have decided to commit
+ def performCommit(): Unit = {
+ try {
+ committer.commitTask(mrTaskContext)
+ logInfo(s"$mrTaskAttemptID: Committed")
+ } catch {
+ case cause: IOException =>
+ logError(s"Error committing the output of task: $mrTaskAttemptID", cause)
+ committer.abortTask(mrTaskContext)
+ throw cause
+ }
+ }
+
+ // First, check whether the task's output has already been committed by some other attempt
+ if (committer.needsTaskCommit(mrTaskContext)) {
+ val shouldCoordinateWithDriver: Boolean = {
+ val sparkConf = SparkEnv.get.conf
+ // We only need to coordinate with the driver if there are multiple concurrent task
+ // attempts, which should only occur if speculation is enabled
+ val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
+ // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
+ sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+ }
+
+ if (shouldCoordinateWithDriver) {
+ val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+ val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+
+ if (canCommit) {
+ performCommit()
+ } else {
+ val message =
+ s"$mrTaskAttemptID: Not committed because the driver did not authorize commit"
+ logInfo(message)
+ // We need to abort the task so that the driver can reschedule new attempts, if necessary
+ committer.abortTask(mrTaskContext)
+ throw new CommitDeniedException(message, jobId, splitId, attemptId)
+ }
+ } else {
+ // Speculation is disabled or a user has chosen to manually bypass the commit coordination
+ performCommit()
+ }
+ } else {
+ // Some other attempt committed the output, so we do nothing and signal success
+ logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
+ }
+ }
+
+ def commitTask(
+ committer: MapReduceOutputCommitter,
+ mrTaskContext: MapReduceTaskAttemptContext,
+ sparkTaskContext: TaskContext): Unit = {
+ commitTask(
+ committer,
+ mrTaskContext,
+ sparkTaskContext.stageId(),
+ sparkTaskContext.partitionId(),
+ sparkTaskContext.attemptNumber())
+ }
+}
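As a usage note on the configuration handling in the new `SparkHadoopMapRedUtil.commitTask`: the driver-side coordinator is consulted only when `spark.hadoop.outputCommitCoordination.enabled` resolves to `true`, and that setting defaults to the value of `spark.speculation`. Below is a minimal sketch, under assumed application settings, of how a job could keep speculation on while opting out of commit coordination via the undocumented escape-hatch; the app name is a placeholder.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical job setup: speculation is enabled, so duplicate task attempts
// are possible, but the escape-hatch setting disables driver-side commit
// coordination for Hadoop output commits.
val conf = new SparkConf()
  .setAppName("commit-coordination-example") // placeholder app name
  .set("spark.speculation", "true")
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")

val sc = new SparkContext(conf)
```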