-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala                         | 126
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala              | 111
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala       | 257
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala         |   3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala |   6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala  |  72
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala             |   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala |   6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                               |   4
9 files changed, 327 insertions, 267 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
new file mode 100644
index 0000000000..fb8020585c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.util.Utils
+
+
+/**
+ * An interface to define how a single Spark job commits its outputs. Three notes:
+ *
+ * 1. Implementations must be serializable, as the committer instance instantiated on the driver
+ * will be used for tasks on executors.
+ * 2. Implementations should have a constructor with either 2 or 3 arguments:
+ * (jobId: String, path: String) or (jobId: String, path: String, isAppend: Boolean).
+ * 3. A committer should not be reused across multiple Spark jobs.
+ *
+ * The proper call sequence is:
+ *
+ * 1. Driver calls setupJob.
+ * 2. As part of each task's execution, executor calls setupTask and then commitTask
+ * (or abortTask if task failed).
+ * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job
+ * failed to execute (e.g. too many failed tasks), the driver should call abortJob.
+ */
+abstract class FileCommitProtocol {
+ import FileCommitProtocol._
+
+ /**
+ * Sets up a job. Must be called on the driver before any other methods can be invoked.
+ */
+ def setupJob(jobContext: JobContext): Unit
+
+ /**
+ * Commits a job after the writes succeed. Must be called on the driver.
+ */
+ def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit
+
+ /**
+ * Aborts a job after the writes fail. Must be called on the driver.
+ *
+ * Calling this function is a best-effort attempt, because it is possible that the driver
+ * just crashes (or is killed) before it can call abort.
+ */
+ def abortJob(jobContext: JobContext): Unit
+
+ /**
+ * Sets up a task within a job.
+ * Must be called before any other task related methods can be invoked.
+ */
+ def setupTask(taskContext: TaskAttemptContext): Unit
+
+ /**
+ * Notifies the commit protocol to add a new file, and gets back the full path that should be
+ * used. Must be called on the executors when running tasks.
+ *
+ * Note that the returned temp file may have an arbitrary path. The commit protocol only
+ * promises that the file will be at the location specified by the arguments after job commit.
+ *
+ * A full file path consists of the following parts:
+ * 1. the base path
+ * 2. some sub-directory within the base path, used to specify partitioning
+ * 3. file prefix, usually some unique job id with the task id
+ * 4. bucket id
+ * 5. source specific file extension, e.g. ".snappy.parquet"
+ *
+ * The "dir" parameter specifies 2 and the "ext" parameter specifies both 4 and 5; the rest
+ * are left to the commit protocol implementation to decide.
+ */
+ def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+
+ /**
+ * Commits a task after the writes succeed. Must be called on the executors when running tasks.
+ */
+ def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
+
+ /**
+ * Aborts a task after the writes have failed. Must be called on the executors when running tasks.
+ *
+ * Calling this function is a best-effort attempt, because it is possible that the executor
+ * just crashes (or is killed) before it can call abort.
+ */
+ def abortTask(taskContext: TaskAttemptContext): Unit
+}
+
+
+object FileCommitProtocol {
+ class TaskCommitMessage(val obj: Any) extends Serializable
+
+ object EmptyTaskCommitMessage extends TaskCommitMessage(null)
+
+ /**
+ * Instantiates a FileCommitProtocol using the given className.
+ */
+ def instantiate(className: String, jobId: String, outputPath: String, isAppend: Boolean)
+ : FileCommitProtocol = {
+ val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
+
+ // First try the constructor with arguments (jobId: String, outputPath: String, isAppend: Boolean).
+ // If that doesn't exist, fall back to the one with (jobId: String, outputPath: String).
+ try {
+ val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
+ ctor.newInstance(jobId, outputPath, isAppend.asInstanceOf[java.lang.Boolean])
+ } catch {
+ case _: NoSuchMethodException =>
+ val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+ ctor.newInstance(jobId, outputPath)
+ }
+ }
+}
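
A minimal, illustrative sketch of the call sequence described in the FileCommitProtocol scaladoc above; it is not part of this patch. The driver/executor split is simulated inside a single method, and the class name, job id, output path, and writeTask callback are placeholder assumptions.

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

object CommitLifecycleSketch {
  def runJob(
      jobContext: JobContext,
      taskContexts: Seq[TaskAttemptContext],
      writeTask: (TaskAttemptContext, FileCommitProtocol) => Unit): Unit = {
    // instantiate() first tries the (jobId, path, isAppend) constructor and then
    // falls back to the (jobId, path) one.
    val committer = FileCommitProtocol.instantiate(
      className = "org.apache.spark.internal.io.HadoopMapReduceCommitProtocol",
      jobId = java.util.UUID.randomUUID().toString,
      outputPath = "/tmp/sketch-output",
      isAppend = false)

    committer.setupJob(jobContext)                      // 1. driver
    try {
      val commits: Seq[TaskCommitMessage] = taskContexts.map { tc =>
        committer.setupTask(tc)                         // 2. executor, per task
        try {
          writeTask(tc, committer)                      // writes go through newTaskTempFile(...)
          committer.commitTask(tc)
        } catch {
          case t: Throwable =>
            committer.abortTask(tc)                     // best-effort
            throw t
        }
      }
      committer.commitJob(jobContext, commits)          // 3. driver, after all tasks succeed
    } catch {
      case t: Throwable =>
        committer.abortJob(jobContext)                  // best-effort
        throw t
    }
  }
}
```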
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
new file mode 100644
index 0000000000..66ccb6d437
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.util.Date
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.SparkHadoopWriter
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+
+/**
+ * A [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter
+ * (from the newer mapreduce API, not the old mapred API).
+ *
+ * Unlike Hadoop's OutputCommitter, this implementation is serializable.
+ */
+class HadoopMapReduceCommitProtocol(jobId: String, path: String)
+ extends FileCommitProtocol with Serializable with Logging {
+
+ import FileCommitProtocol._
+
+ /** Hadoop's OutputCommitter is not serializable, so it is marked transient. */
+ @transient private var committer: OutputCommitter = _
+
+ protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+ context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+ }
+
+ override def newTaskTempFile(
+ taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+ // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet.
+ // Note that %05d pads but does not truncate the split number, so even with more than 100000
+ // tasks the file name stays correct; it simply grows beyond five digits.
+ val split = taskContext.getTaskAttemptID.getTaskID.getId
+ val filename = f"part-$split%05d-$jobId$ext"
+
+ val stagingDir: String = committer match {
+ // FileOutputCommitter has its own staging path, called its "work path".
+ case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path)
+ case _ => path
+ }
+
+ dir.map { d =>
+ new Path(new Path(stagingDir, d), filename).toString
+ }.getOrElse {
+ new Path(stagingDir, filename).toString
+ }
+ }
+
+ override def setupJob(jobContext: JobContext): Unit = {
+ // Setup IDs
+ val jobId = SparkHadoopWriter.createJobID(new Date, 0)
+ val taskId = new TaskID(jobId, TaskType.MAP, 0)
+ val taskAttemptId = new TaskAttemptID(taskId, 0)
+
+ // Set up the configuration object
+ jobContext.getConfiguration.set("mapred.job.id", jobId.toString)
+ jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+ jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
+ jobContext.getConfiguration.setBoolean("mapred.task.is.map", true)
+ jobContext.getConfiguration.setInt("mapred.task.partition", 0)
+
+ val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
+ committer = setupCommitter(taskAttemptContext)
+ committer.setupJob(jobContext)
+ }
+
+ override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+ committer.commitJob(jobContext)
+ }
+
+ override def abortJob(jobContext: JobContext): Unit = {
+ committer.abortJob(jobContext, JobStatus.State.FAILED)
+ }
+
+ override def setupTask(taskContext: TaskAttemptContext): Unit = {
+ committer = setupCommitter(taskContext)
+ committer.setupTask(taskContext)
+ }
+
+ override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+ val attemptId = taskContext.getTaskAttemptID
+ SparkHadoopMapRedUtil.commitTask(
+ committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+ EmptyTaskCommitMessage
+ }
+
+ override def abortTask(taskContext: TaskAttemptContext): Unit = {
+ committer.abortTask(taskContext)
+ }
+}
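
Because setupCommitter is a protected hook, the Hadoop-backed protocol can be specialized without reimplementing any of the job/task bookkeeping; SQLHadoopMapReduceCommitProtocol further down uses exactly this extension point. The subclass below is a hypothetical sketch (not in this patch) that simply forces a FileOutputCommitter rooted at the destination path.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

class AlwaysFileOutputCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
    // Ignore the output format's own committer and always stage work under a
    // FileOutputCommitter rooted at the final destination path.
    new FileOutputCommitter(new Path(path), context)
  }
}
```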
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
deleted file mode 100644
index f5dd5ce229..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.util.{Date, UUID}
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-
-import org.apache.spark.SparkHadoopWriter
-import org.apache.spark.internal.Logging
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
-
-
-object FileCommitProtocol {
- class TaskCommitMessage(val obj: Any) extends Serializable
-
- object EmptyTaskCommitMessage extends TaskCommitMessage(null)
-
- /**
- * Instantiates a FileCommitProtocol using the given className.
- */
- def instantiate(className: String, outputPath: String, isAppend: Boolean): FileCommitProtocol = {
- try {
- val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
-
- // First try the one with argument (outputPath: String, isAppend: Boolean).
- // If that doesn't exist, try the one with (outputPath: String).
- try {
- val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[Boolean])
- ctor.newInstance(outputPath, isAppend.asInstanceOf[java.lang.Boolean])
- } catch {
- case _: NoSuchMethodException =>
- val ctor = clazz.getDeclaredConstructor(classOf[String])
- ctor.newInstance(outputPath)
- }
- } catch {
- case e: ClassNotFoundException =>
- throw e
- }
- }
-}
-
-
-/**
- * An interface to define how a single Spark job commits its outputs. Two notes:
- *
- * 1. Implementations must be serializable, as the committer instance instantiated on the driver
- * will be used for tasks on executors.
- * 2. A committer should not be reused across multiple Spark jobs.
- *
- * The proper call sequence is:
- *
- * 1. Driver calls setupJob.
- * 2. As part of each task's execution, executor calls setupTask and then commitTask
- * (or abortTask if task failed).
- * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job
- * failed to execute (e.g. too many failed tasks), the job should call abortJob.
- */
-abstract class FileCommitProtocol {
- import FileCommitProtocol._
-
- /**
- * Setups up a job. Must be called on the driver before any other methods can be invoked.
- */
- def setupJob(jobContext: JobContext): Unit
-
- /**
- * Commits a job after the writes succeed. Must be called on the driver.
- */
- def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit
-
- /**
- * Aborts a job after the writes fail. Must be called on the driver.
- *
- * Calling this function is a best-effort attempt, because it is possible that the driver
- * just crashes (or killed) before it can call abort.
- */
- def abortJob(jobContext: JobContext): Unit
-
- /**
- * Sets up a task within a job.
- * Must be called before any other task related methods can be invoked.
- */
- def setupTask(taskContext: TaskAttemptContext): Unit
-
- /**
- * Notifies the commit protocol to add a new file, and gets back the full path that should be
- * used. Must be called on the executors when running tasks.
- *
- * Note that the returned temp file may have an arbitrary path. The commit protocol only
- * promises that the file will be at the location specified by the arguments after job commit.
- *
- * A full file path consists of the following parts:
- * 1. the base path
- * 2. some sub-directory within the base path, used to specify partitioning
- * 3. file prefix, usually some unique job id with the task id
- * 4. bucket id
- * 5. source specific file extension, e.g. ".snappy.parquet"
- *
- * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
- * are left to the commit protocol implementation to decide.
- */
- def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
-
- /**
- * Commits a task after the writes succeed. Must be called on the executors when running tasks.
- */
- def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
-
- /**
- * Aborts a task after the writes have failed. Must be called on the executors when running tasks.
- *
- * Calling this function is a best-effort attempt, because it is possible that the executor
- * just crashes (or killed) before it can call abort.
- */
- def abortTask(taskContext: TaskAttemptContext): Unit
-}
-
-
-/**
- * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter
- * (from the newer mapreduce API, not the old mapred API).
- *
- * Unlike Hadoop's OutputCommitter, this implementation is serializable.
- */
-class HadoopCommitProtocolWrapper(path: String, isAppend: Boolean)
- extends FileCommitProtocol with Serializable with Logging {
-
- import FileCommitProtocol._
-
- /** OutputCommitter from Hadoop is not serializable so marking it transient. */
- @transient private var committer: OutputCommitter = _
-
- /** UUID used to identify the job in file name. */
- private val uuid: String = UUID.randomUUID().toString
-
- private def setupCommitter(context: TaskAttemptContext): Unit = {
- committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
-
- if (!isAppend) {
- // If we are appending data to an existing dir, we will only use the output committer
- // associated with the file output format since it is not safe to use a custom
- // committer for appending. For example, in S3, direct parquet output committer may
- // leave partial data in the destination dir when the appending job fails.
- // See SPARK-8578 for more details.
- val configuration = context.getConfiguration
- val clazz =
- configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
- if (clazz != null) {
- logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
- // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
- // has an associated output committer. To override this output committer,
- // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
- // If a data source needs to override the output committer, it needs to set the
- // output committer in prepareForWrite method.
- if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
- // The specified output committer is a FileOutputCommitter.
- // So, we will use the FileOutputCommitter-specified constructor.
- val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- committer = ctor.newInstance(new Path(path), context)
- } else {
- // The specified output committer is just an OutputCommitter.
- // So, we will use the no-argument constructor.
- val ctor = clazz.getDeclaredConstructor()
- committer = ctor.newInstance()
- }
- }
- }
- logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
- }
-
- override def newTaskTempFile(
- taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
- // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
- // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
- // the file name is fine and won't overflow.
- val split = taskContext.getTaskAttemptID.getTaskID.getId
- val filename = f"part-$split%05d-$uuid$ext"
-
- val stagingDir: String = committer match {
- // For FileOutputCommitter it has its own staging path called "work path".
- case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path)
- case _ => path
- }
-
- dir.map { d =>
- new Path(new Path(stagingDir, d), filename).toString
- }.getOrElse {
- new Path(stagingDir, filename).toString
- }
- }
-
- override def setupJob(jobContext: JobContext): Unit = {
- // Setup IDs
- val jobId = SparkHadoopWriter.createJobID(new Date, 0)
- val taskId = new TaskID(jobId, TaskType.MAP, 0)
- val taskAttemptId = new TaskAttemptID(taskId, 0)
-
- // Set up the configuration object
- jobContext.getConfiguration.set("mapred.job.id", jobId.toString)
- jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
- jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
- jobContext.getConfiguration.setBoolean("mapred.task.is.map", true)
- jobContext.getConfiguration.setInt("mapred.task.partition", 0)
-
- val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
- setupCommitter(taskAttemptContext)
-
- committer.setupJob(jobContext)
- }
-
- override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
- committer.commitJob(jobContext)
- }
-
- override def abortJob(jobContext: JobContext): Unit = {
- committer.abortJob(jobContext, JobStatus.State.FAILED)
- }
-
- override def setupTask(taskContext: TaskAttemptContext): Unit = {
- setupCommitter(taskContext)
- committer.setupTask(taskContext)
- }
-
- override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
- val attemptId = taskContext.getTaskAttemptID
- SparkHadoopMapRedUtil.commitTask(
- committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
- EmptyTaskCommitMessage
- }
-
- override def abortTask(taskContext: TaskAttemptContext): Unit = {
- committer.abortTask(taskContext)
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index bc00a0a749..e404dcd545 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -29,6 +29,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -37,7 +39,6 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
-import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 230c74a47b..927c0c5b95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -21,6 +21,7 @@ import java.io.IOException
import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -86,8 +87,9 @@ case class InsertIntoHadoopFsRelationCommand(
if (doInsertion) {
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
- outputPath.toString,
- isAppend)
+ jobId = java.util.UUID.randomUUID().toString,
+ outputPath = outputPath.toString,
+ isAppend = isAppend)
FileFormatWriter.write(
sparkSession = sparkSession,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
new file mode 100644
index 0000000000..9b9ed28412
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
+ * Hadoop output committer using an option specified in SQLConf.
+ */
+class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+ extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
+
+ override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+ var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+
+ if (!isAppend) {
+ // If we are appending data to an existing dir, we will only use the output committer
+ // associated with the file output format since it is not safe to use a custom
+ // committer for appending. For example, in S3, direct parquet output committer may
+ // leave partial data in the destination dir when the appending job fails.
+ // See SPARK-8578 for more details.
+ val configuration = context.getConfiguration
+ val clazz =
+ configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+ if (clazz != null) {
+ logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+ // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+ // has an associated output committer. To override this output committer,
+ // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+ // If a data source needs to override the output committer, it needs to set the
+ // output committer in prepareForWrite method.
+ if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+ // The specified output committer is a FileOutputCommitter.
+ // So, we will use the FileOutputCommitter-specified constructor.
+ val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+ committer = ctor.newInstance(new Path(path), context)
+ } else {
+ // The specified output committer is just an OutputCommitter.
+ // So, we will use the no-argument constructor.
+ val ctor = clazz.getDeclaredConstructor()
+ committer = ctor.newInstance()
+ }
+ }
+ }
+ logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
+ committer
+ }
+}
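
In practice this hook is driven by configuration: for non-append writes, SQLHadoopMapReduceCommitProtocol swaps in whatever committer class SQLConf.OUTPUT_COMMITTER_CLASS names, while appends keep the output format's own committer (SPARK-8578). A hedged usage sketch follows; the committer class name is a placeholder, and "spark.sql.sources.outputCommitterClass" is assumed to be the key behind OUTPUT_COMMITTER_CLASS.

```scala
import org.apache.spark.sql.SparkSession

object CustomOutputCommitterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("custom-output-committer")
      .master("local[*]")
      .getOrCreate()

    // The named class must extend org.apache.hadoop.mapreduce.OutputCommitter.
    // If it is a FileOutputCommitter subclass, the (Path, TaskAttemptContext)
    // constructor is used; otherwise the no-arg constructor is used.
    spark.conf.set(
      "spark.sql.sources.outputCommitterClass",
      "com.example.MyOutputCommitter")   // placeholder class name

    spark.range(10).write.mode("overwrite").parquet("/tmp/custom-committer-demo")
    spark.stop()
  }
}
```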
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index daec2b5450..e849cafef4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.streaming
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.{FileCommitProtocol, FileFormat, FileFormatWriter}
+import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
object FileStreamSink {
// The name of the subdirectory that is used to store metadata about which files are valid.
@@ -54,7 +55,11 @@ class FileStreamSink(
logInfo(s"Skipping already committed batch $batchId")
} else {
val committer = FileCommitProtocol.instantiate(
- sparkSession.sessionState.conf.streamingFileCommitProtocolClass, path, isAppend = false)
+ className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
+ jobId = batchId.toString,
+ outputPath = path,
+ isAppend = false)
+
committer match {
case manifestCommitter: ManifestFileCommitProtocol =>
manifestCommitter.setupManifestOptions(fileLog, batchId)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 510312267a..1fe13fa162 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -25,8 +25,8 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.FileCommitProtocol
-import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
/**
* A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitM
*
* @param path path to write the final output to.
*/
-class ManifestFileCommitProtocol(path: String)
+class ManifestFileCommitProtocol(jobId: String, path: String)
extends FileCommitProtocol with Serializable with Logging {
// Track the list of files added by a task, only used on the executors.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7bb3ac02fa..7b8ed65054 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.execution.datasources.HadoopCommitProtocolWrapper
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
import org.apache.spark.util.Utils
@@ -385,7 +385,7 @@ object SQLConf {
SQLConfigBuilder("spark.sql.sources.commitProtocolClass")
.internal()
.stringConf
- .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName)
+ .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName)
val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
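
The net effect of the SQLConf change is that the batch write path now resolves its committer from spark.sql.sources.commitProtocolClass, defaulting to SQLHadoopMapReduceCommitProtocol. A sketch of overriding it is shown below (not part of the patch); the replacement class name is hypothetical and must expose the (jobId, path) or (jobId, path, isAppend) constructor that FileCommitProtocol.instantiate looks up reflectively.

```scala
import org.apache.spark.sql.SparkSession

object CommitProtocolConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("commit-protocol-config")
      .master("local[*]")
      .getOrCreate()

    // New default installed by this change:
    // org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
    println(spark.conf.get("spark.sql.sources.commitProtocolClass"))

    // Swapping in a custom FileCommitProtocol implementation (hypothetical class).
    spark.conf.set(
      "spark.sql.sources.commitProtocolClass",
      "com.example.AuditingCommitProtocol")

    spark.stop()
  }
}
```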