author     jiangxingbo <jiangxb1987@gmail.com>    2016-11-08 09:41:01 -0800
committer  Reynold Xin <rxin@databricks.com>      2016-11-08 09:41:01 -0800
commit     9c419698fe110a805570031cac3387a51957d9d1 (patch)
tree       847284e6313c49aedd0a864d3931cacaf92ea425
parent     73feaa30ebfb62c81c7ce2c60ce2163611dd8852 (diff)
[SPARK-18191][CORE] Port RDD API to use commit protocol
## What changes were proposed in this pull request?

This PR ports the RDD API to use the commit protocol. The changes made here:

1. Add a new internal helper class named `SparkNewHadoopWriter` that saves an RDD using a Hadoop OutputFormat; it is similar to `SparkHadoopWriter` but uses the commit protocol. This class supports the newer `mapreduce` API instead of the old `mapred` API supported by `SparkHadoopWriter`.
2. Rewrite the `PairRDDFunctions.saveAsNewAPIHadoopDataset` function so that it now uses the commit protocol.

## How was this patch tested?

Existing test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15769 from jiangxb1987/rdd-commit.
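For context, a minimal usage sketch of the public API whose write path this patch reroutes through the commit protocol. Only `saveAsNewAPIHadoopFile`/`saveAsNewAPIHadoopDataset` and Hadoop's `TextOutputFormat` are real APIs; the application name, output path, and sample data are illustrative assumptions:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object SaveWithNewHadoopApi {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-example").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
      .map { case (k, v) => (new IntWritable(k), new Text(v)) }

    // saveAsNewAPIHadoopFile configures the output path/format and delegates to
    // saveAsNewAPIHadoopDataset, which after this patch routes the write through
    // SparkHadoopMapReduceWriter and a FileCommitProtocol-based committer.
    pairs.saveAsNewAPIHadoopFile[TextOutputFormat[IntWritable, Text]]("/tmp/save-example-output")

    sc.stop()
  }
}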
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala                             25
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala      6
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala       249
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                          139
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala                      20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala               3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala                  5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala           5
9 files changed, 280 insertions, 176 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 7f75a393bf..46e22b215b 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -23,11 +23,11 @@ import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.TaskType
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.util.SerializableJobConf
@@ -153,29 +153,8 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
splitID = splitid
attemptID = attemptid
- jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
+ jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
}
}
-
-private[spark]
-object SparkHadoopWriter {
- def createJobID(time: Date, id: Int): JobID = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
- val jobtrackerID = formatter.format(time)
- new JobID(jobtrackerID, id)
- }
-
- def createPathFromString(path: String, conf: JobConf): Path = {
- if (path == null) {
- throw new IllegalArgumentException("Output path is null")
- }
- val outputPath = new Path(path)
- val fs = outputPath.getFileSystem(conf)
- if (fs == null) {
- throw new IllegalArgumentException("Incorrectly formatted output path")
- }
- outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 66ccb6d437..d643a32af0 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.SparkHadoopWriter
import org.apache.spark.internal.Logging
import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -69,7 +68,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
override def setupJob(jobContext: JobContext): Unit = {
// Setup IDs
- val jobId = SparkHadoopWriter.createJobID(new Date, 0)
+ val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
val taskId = new TaskID(jobId, TaskType.MAP, 0)
val taskAttemptId = new TaskAttemptID(taskId, 0)
@@ -108,4 +107,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
override def abortTask(taskContext: TaskAttemptContext): Unit = {
committer.abortTask(taskContext)
}
+
+ /** Whether we are using a direct output committer */
+ def isDirectOutput(): Boolean = committer.getClass.getSimpleName.contains("Direct")
}
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
new file mode 100644
index 0000000000..a405c44e10
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.reflect.ClassTag
+import scala.util.DynamicVariable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{JobConf, JobID}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.{SparkConf, SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.OutputMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+/**
+ * A helper object that saves an RDD using a Hadoop OutputFormat
+ * (from the newer mapreduce API, not the old mapred API).
+ */
+private[spark]
+object SparkHadoopMapReduceWriter extends Logging {
+
+ /**
+ * Basic work flow of this command is:
+ * 1. Driver side setup, prepare the data source and hadoop configuration for the write job to
+ * be issued.
+ * 2. Issues a write job consists of one or more executor side tasks, each of which writes all
+ * rows within an RDD partition.
+ * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
+ * exception is thrown during task commitment, also aborts that task.
+ * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
+ * thrown during job commitment, also aborts the job.
+ */
+ def write[K, V: ClassTag](
+ rdd: RDD[(K, V)],
+ hadoopConf: Configuration): Unit = {
+ // Extract context and configuration from RDD.
+ val sparkContext = rdd.context
+ val stageId = rdd.id
+ val sparkConf = rdd.conf
+ val conf = new SerializableConfiguration(hadoopConf)
+
+ // Set up a job.
+ val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
+ val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
+ val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
+ val format = jobContext.getOutputFormatClass
+
+ if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
+ // FileOutputFormat ignores the filesystem parameter
+ val jobFormat = format.newInstance
+ jobFormat.checkOutputSpecs(jobContext)
+ }
+
+ val committer = FileCommitProtocol.instantiate(
+ className = classOf[HadoopMapReduceCommitProtocol].getName,
+ jobId = stageId.toString,
+ outputPath = conf.value.get("mapred.output.dir"),
+ isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
+ committer.setupJob(jobContext)
+
+ // When speculation is on and output committer class name contains "Direct", we should warn
+ // users that they may loss data if they are using a direct output committer.
+ if (SparkHadoopWriterUtils.isSpeculationEnabled(sparkConf) && committer.isDirectOutput) {
+ val warningMessage =
+ s"$committer may be an output committer that writes data directly to " +
+ "the final location. Because speculation is enabled, this output committer may " +
+ "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+ "committer that does not have this behavior (e.g. FileOutputCommitter)."
+ logWarning(warningMessage)
+ }
+
+ // Try to write all RDD partitions as a Hadoop OutputFormat.
+ try {
+ val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+ executeTask(
+ context = context,
+ jobTrackerId = jobTrackerId,
+ sparkStageId = context.stageId,
+ sparkPartitionId = context.partitionId,
+ sparkAttemptNumber = context.attemptNumber,
+ committer = committer,
+ hadoopConf = conf.value,
+ outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]],
+ iterator = iter)
+ })
+
+ committer.commitJob(jobContext, ret)
+ logInfo(s"Job ${jobContext.getJobID} committed.")
+ } catch {
+ case cause: Throwable =>
+ logError(s"Aborting job ${jobContext.getJobID}.", cause)
+ committer.abortJob(jobContext)
+ throw new SparkException("Job aborted.", cause)
+ }
+ }
+
+ /** Write a RDD partition out in a single Spark task. */
+ private def executeTask[K, V: ClassTag](
+ context: TaskContext,
+ jobTrackerId: String,
+ sparkStageId: Int,
+ sparkPartitionId: Int,
+ sparkAttemptNumber: Int,
+ committer: FileCommitProtocol,
+ hadoopConf: Configuration,
+ outputFormat: Class[_ <: OutputFormat[K, V]],
+ iterator: Iterator[(K, V)]): TaskCommitMessage = {
+ // Set up a task.
+ val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE,
+ sparkPartitionId, sparkAttemptNumber)
+ val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+ committer.setupTask(taskContext)
+
+ val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
+ SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+
+ // Initiate the writer.
+ val taskFormat = outputFormat.newInstance
+ val writer = taskFormat.getRecordWriter(taskContext)
+ .asInstanceOf[RecordWriter[K, V]]
+ require(writer != null, "Unable to obtain RecordWriter")
+ var recordsWritten = 0L
+
+ // Write all rows in RDD partition.
+ try {
+ val ret = Utils.tryWithSafeFinallyAndFailureCallbacks {
+ while (iterator.hasNext) {
+ val pair = iterator.next()
+ writer.write(pair._1, pair._2)
+
+ // Update bytes written metric every few records
+ SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
+ outputMetricsAndBytesWrittenCallback, recordsWritten)
+ recordsWritten += 1
+ }
+
+ committer.commitTask(taskContext)
+ }(catchBlock = {
+ committer.abortTask(taskContext)
+ logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
+ }, finallyBlock = writer.close(taskContext))
+
+ outputMetricsAndBytesWrittenCallback.foreach {
+ case (om, callback) =>
+ om.setBytesWritten(callback())
+ om.setRecordsWritten(recordsWritten)
+ }
+
+ ret
+ } catch {
+ case t: Throwable =>
+ throw new SparkException("Task failed while writing rows", t)
+ }
+ }
+}
+
+private[spark]
+object SparkHadoopWriterUtils {
+
+ private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+
+ def createJobID(time: Date, id: Int): JobID = {
+ val jobtrackerID = createJobTrackerID(time)
+ new JobID(jobtrackerID, id)
+ }
+
+ def createJobTrackerID(time: Date): String = {
+ new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+ }
+
+ def createPathFromString(path: String, conf: JobConf): Path = {
+ if (path == null) {
+ throw new IllegalArgumentException("Output path is null")
+ }
+ val outputPath = new Path(path)
+ val fs = outputPath.getFileSystem(conf)
+ if (fs == null) {
+ throw new IllegalArgumentException("Incorrectly formatted output path")
+ }
+ outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ }
+
+ // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
+ // setting can take effect:
+ def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = {
+ val validationDisabled = disableOutputSpecValidation.value
+ val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
+ enabledInConf && !validationDisabled
+ }
+
+ def isSpeculationEnabled(conf: SparkConf): Boolean = {
+ conf.getBoolean("spark.speculation", false)
+ }
+
+ // TODO: these don't seem like the right abstractions.
+ // We should abstract the duplicate code in a less awkward way.
+
+ // return type: (output metrics, bytes written callback), defined only if the latter is defined
+ def initHadoopOutputMetrics(
+ context: TaskContext): Option[(OutputMetrics, () => Long)] = {
+ val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
+ bytesWrittenCallback.map { b =>
+ (context.taskMetrics().outputMetrics, b)
+ }
+ }
+
+ def maybeUpdateOutputMetrics(
+ outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
+ recordsWritten: Long): Unit = {
+ if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
+ outputMetricsAndBytesWrittenCallback.foreach {
+ case (om, callback) =>
+ om.setBytesWritten(callback())
+ om.setRecordsWritten(recordsWritten)
+ }
+ }
+ }
+
+ /**
+ * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
+ * basis; see SPARK-4835 for more details.
+ */
+ val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
+}
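The four-step workflow documented in `SparkHadoopMapReduceWriter` above can be condensed into a small, self-contained sketch. The `SimpleCommitter` trait and `runWriteJob` helper below are hypothetical stand-ins, not Spark's `FileCommitProtocol` API (which additionally passes Hadoop `JobContext`/`TaskAttemptContext` objects), and the sequential loop stands in for the distributed write job:

object CommitFlowSketch {
  // Hypothetical committer interface mirroring the commit-protocol lifecycle.
  trait SimpleCommitter {
    def setupJob(): Unit
    def setupTask(partition: Int): Unit
    def commitTask(partition: Int): String          // returns a task commit message
    def abortTask(partition: Int): Unit
    def commitJob(taskCommits: Seq[String]): Unit
    def abortJob(): Unit
  }

  def runWriteJob(numPartitions: Int, committer: SimpleCommitter)
                 (writePartition: Int => Unit): Unit = {
    committer.setupJob()                             // 1. driver-side job setup
    try {
      val commits = (0 until numPartitions).map { p =>
        committer.setupTask(p)                       // 2. one task per RDD partition
        try {
          writePartition(p)                          //    write all rows of the partition
          committer.commitTask(p)                    // 3. commit the task on success...
        } catch {
          case t: Throwable =>
            committer.abortTask(p)                   //    ...otherwise abort the task
            throw t
        }
      }
      committer.commitJob(commits)                   // 4. commit the job only if every task committed
    } catch {
      case t: Throwable =>
        committer.abortJob()                         //    any failure aborts the whole job
        throw t
    }
  }
}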
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 67baad1c51..f9b9631d9e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -18,33 +18,31 @@
package org.apache.spark.rdd
import java.nio.ByteBuffer
-import java.text.SimpleDateFormat
-import java.util.{Date, HashMap => JHashMap, Locale}
+import java.util.{HashMap => JHashMap}
import scala.collection.{mutable, Map}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import scala.util.DynamicVariable
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
-import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat}
import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.OutputMetrics
+import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.Utils
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.random.StratifiedSamplingUtils
@@ -1060,7 +1058,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
FileOutputFormat.setOutputPath(hadoopConf,
- SparkHadoopWriter.createPathFromString(path, hadoopConf))
+ SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
}
@@ -1076,80 +1074,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* result of using direct output committer with speculation enabled.
*/
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
- // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
- val hadoopConf = conf
- val job = NewAPIHadoopJob.getInstance(hadoopConf)
- val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
- val jobtrackerID = formatter.format(new Date())
- val stageId = self.id
- val jobConfiguration = job.getConfiguration
- val wrappedConf = new SerializableConfiguration(jobConfiguration)
- val outfmt = job.getOutputFormatClass
- val jobFormat = outfmt.newInstance
-
- if (isOutputSpecValidationEnabled) {
- // FileOutputFormat ignores the filesystem parameter
- jobFormat.checkOutputSpecs(job)
- }
-
- val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
- val config = wrappedConf.value
- /* "reduce task" <split #> <attempt # = spark task #> */
- val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
- context.attemptNumber)
- val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
- val format = outfmt.newInstance
- format match {
- case c: Configurable => c.setConf(config)
- case _ => ()
- }
- val committer = format.getOutputCommitter(hadoopContext)
- committer.setupTask(hadoopContext)
-
- val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
- initHadoopOutputMetrics(context)
-
- val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
- require(writer != null, "Unable to obtain RecordWriter")
- var recordsWritten = 0L
- Utils.tryWithSafeFinallyAndFailureCallbacks {
- while (iter.hasNext) {
- val pair = iter.next()
- writer.write(pair._1, pair._2)
-
- // Update bytes written metric every few records
- maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
- recordsWritten += 1
- }
- }(finallyBlock = writer.close(hadoopContext))
- committer.commitTask(hadoopContext)
- outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
- om.setBytesWritten(callback())
- om.setRecordsWritten(recordsWritten)
- }
- 1
- } : Int
-
- val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
- val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
- val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-
- // When speculation is on and output committer class name contains "Direct", we should warn
- // users that they may loss data if they are using a direct output committer.
- val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
- val outputCommitterClass = jobCommitter.getClass.getSimpleName
- if (speculationEnabled && outputCommitterClass.contains("Direct")) {
- val warningMessage =
- s"$outputCommitterClass may be an output committer that writes data directly to " +
- "the final location. Because speculation is enabled, this output committer may " +
- "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
- "committer that does not have this behavior (e.g. FileOutputCommitter)."
- logWarning(warningMessage)
- }
-
- jobCommitter.setupJob(jobTaskContext)
- self.context.runJob(self, writeShard)
- jobCommitter.commitJob(jobTaskContext)
+ SparkHadoopMapReduceWriter.write(
+ rdd = self,
+ hadoopConf = conf)
}
/**
@@ -1178,7 +1105,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")
- if (isOutputSpecValidationEnabled) {
+ if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(self.conf)) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(hadoopConf)
hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1193,7 +1120,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
- initHadoopOutputMetrics(context)
+ SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
@@ -1205,7 +1132,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
// Update bytes written metric every few records
- maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
+ SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
+ outputMetricsAndBytesWrittenCallback, recordsWritten)
recordsWritten += 1
}
}(finallyBlock = writer.close())
@@ -1220,29 +1148,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.commitJob()
}
- // TODO: these don't seem like the right abstractions.
- // We should abstract the duplicate code in a less awkward way.
-
- // return type: (output metrics, bytes written callback), defined only if the latter is defined
- private def initHadoopOutputMetrics(
- context: TaskContext): Option[(OutputMetrics, () => Long)] = {
- val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
- bytesWrittenCallback.map { b =>
- (context.taskMetrics().outputMetrics, b)
- }
- }
-
- private def maybeUpdateOutputMetrics(
- outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
- recordsWritten: Long): Unit = {
- if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
- outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
- om.setBytesWritten(callback())
- om.setRecordsWritten(recordsWritten)
- }
- }
- }
-
/**
* Return an RDD with the keys of each tuple.
*/
@@ -1258,22 +1163,4 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
private[spark] def valueClass: Class[_] = vt.runtimeClass
private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
-
- // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
- // setting can take effect:
- private def isOutputSpecValidationEnabled: Boolean = {
- val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
- val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
- enabledInConf && !validationDisabled
- }
-}
-
-private[spark] object PairRDDFunctions {
- val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
-
- /**
- * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
- * basis; see SPARK-4835 for more details.
- */
- val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index b0d69de6e2..fe547d4d91 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -509,21 +509,6 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
(2, ArrayBuffer(1))))
}
- test("saveNewAPIHadoopFile should call setConf if format is configurable") {
- val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
-
- // No error, non-configurable formats still work
- pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
-
- /*
- Check that configurable formats get configured:
- ConfigTestFormat throws an exception if we try to write
- to it when setConf hasn't been called first.
- Assertion is in ConfigTestFormat.getRecordWriter.
- */
- pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
- }
-
test("saveAsHadoopFile should respect configured output committers") {
val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
val conf = new JobConf()
@@ -544,7 +529,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
val e = intercept[SparkException] {
pairs.saveAsNewAPIHadoopFile[NewFakeFormatWithCallback]("ignored")
}
- assert(e.getMessage contains "failed to write")
+ assert(e.getCause.getMessage contains "failed to write")
assert(FakeWriterWithCallback.calledBy === "write,callback,close")
assert(FakeWriterWithCallback.exception != null, "exception should be captured")
@@ -725,8 +710,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
}
/*
- These classes are fakes for testing
- "saveNewAPIHadoopFile should call setConf if format is configurable".
+ These classes are fakes for testing saveAsHadoopFile/saveNewAPIHadoopFile.
Unfortunately, they have to be top level classes, and not defined in
the test method, because otherwise Scala won't generate no-args constructors
and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index e404dcd545..fa7fe143da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -153,7 +153,7 @@ object FileFormatWriter extends Logging {
committer: FileCommitProtocol,
iterator: Iterator[InternalRow]): (TaskCommitMessage, Set[String]) = {
- val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId)
+ val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index e53c3e4d48..a34e2e76f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.TaskType
import org.apache.spark._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -142,7 +143,7 @@ private[hive] class SparkHiveWriterContainer(
splitID = splitId
attemptID = attemptId
- jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
+ jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobId))
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index fa15a0bf65..7e0a2ca609 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -27,7 +27,8 @@ import scala.util.matching.Regex
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
+import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
@@ -337,7 +338,7 @@ abstract class DStream[T: ClassTag] (
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
- PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 98e099354a..b7d114bc16 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -26,7 +26,8 @@ import org.apache.commons.lang3.SerializationUtils
import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{PairRDDFunctions, RDD}
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
+import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.api.python.PythonDStream
import org.apache.spark.streaming.ui.UIUtils
@@ -250,7 +251,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
- PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
job.run()
}
_eventLoop = eventLoop