aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWenchen Fan <cloud0fan@outlook.com>2015-09-14 11:51:39 -0700
committerYin Huai <yhuai@databricks.com>2015-09-14 11:51:39 -0700
commit32407bfd2bdbf84d65cacfa7554dae6a2332bc37 (patch)
tree2b8ecb4d07e4f5290b59039002f58e4be58f58d6
parentd81565465cc6d4f38b4ed78036cded630c700388 (diff)
downloadspark-32407bfd2bdbf84d65cacfa7554dae6a2332bc37.tar.gz
spark-32407bfd2bdbf84d65cacfa7554dae6a2332bc37.tar.bz2
spark-32407bfd2bdbf84d65cacfa7554dae6a2332bc37.zip
[SPARK-9899] [SQL] log warning for direct output committer with speculation enabled
This is a follow-up of https://github.com/apache/spark/pull/8317. When speculation is enabled, there may be multiply tasks writing to the same path. Generally it's OK as we will write to a temporary directory first and only one task can commit the temporary directory to target path. However, when we use direct output committer, tasks will write data to target path directly without temporary directory. This causes problems like corrupted data. Please see [PR comment](https://github.com/apache/spark/pull/8191#issuecomment-131598385) for more details. Unfortunately, we don't have a simple flag to tell if a output committer will write to temporary directory or not, so for safety, we have to disable any customized output committer when `speculation` is true. Author: Wenchen Fan <cloud0fan@outlook.com> Closes #8687 from cloud-fan/direct-committer.
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala44
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala17
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala1
3 files changed, 53 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 199d79b811..a981b63942 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1018,6 +1018,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
+ *
+ * Note that, we should make sure our tasks are idempotent when speculation is enabled, i.e. do
+ * not use output committer that writes data directly.
+ * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
+ * result of using direct output committer with speculation enabled.
*/
def saveAsHadoopFile(
path: String,
@@ -1030,10 +1035,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val hadoopConf = conf
hadoopConf.setOutputKeyClass(keyClass)
hadoopConf.setOutputValueClass(valueClass)
- // Doesn't work in Scala 2.9 due to what may be a generics bug
- // TODO: Should we uncomment this for Scala 2.10?
- // conf.setOutputFormat(outputFormatClass)
- hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
+ conf.setOutputFormat(outputFormatClass)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapred.output.compress", "true")
@@ -1047,6 +1049,19 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}
+ // When speculation is on and output committer class name contains "Direct", we should warn
+ // users that they may loss data if they are using a direct output committer.
+ val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
+ val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
+ if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+ val warningMessage =
+ s"$outputCommitterClass may be an output committer that writes data directly to " +
+ "the final location. Because speculation is enabled, this output committer may " +
+ "cause data loss (see the case in SPARK-10063). If possible, please use a output " +
+ "committer that does not have this behavior (e.g. FileOutputCommitter)."
+ logWarning(warningMessage)
+ }
+
FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
@@ -1057,6 +1072,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Configuration object for that storage system. The Conf should set an OutputFormat and any
* output paths required (e.g. a table name to write to) in the same way as it would be
* configured for a Hadoop MapReduce job.
+ *
+ * Note that, we should make sure our tasks are idempotent when speculation is enabled, i.e. do
+ * not use output committer that writes data directly.
+ * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
+ * result of using direct output committer with speculation enabled.
*/
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
@@ -1115,6 +1135,20 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+
+ // When speculation is on and output committer class name contains "Direct", we should warn
+ // users that they may loss data if they are using a direct output committer.
+ val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
+ val outputCommitterClass = jobCommitter.getClass.getSimpleName
+ if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+ val warningMessage =
+ s"$outputCommitterClass may be an output committer that writes data directly to " +
+ "the final location. Because speculation is enabled, this output committer may " +
+ "cause data loss (see the case in SPARK-10063). If possible, please use a output " +
+ "committer that does not have this behavior (e.g. FileOutputCommitter)."
+ logWarning(warningMessage)
+ }
+
jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard)
jobCommitter.commitJob(jobTaskContext)
@@ -1129,7 +1163,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
- val wrappedConf = new SerializableConfiguration(hadoopConf)
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
@@ -1157,7 +1190,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.preSetup()
val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
- val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 58f7fa640e..0c700bdb37 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -62,7 +62,7 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = Seq.empty
- def saveAsHiveFile(
+ private def saveAsHiveFile(
rdd: RDD[InternalRow],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
@@ -178,6 +178,19 @@ case class InsertIntoHiveTable(
val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableJobConf(jobConf)
+ // When speculation is on and output committer class name contains "Direct", we should warn
+ // users that they may loss data if they are using a direct output committer.
+ val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
+ val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
+ if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+ val warningMessage =
+ s"$outputCommitterClass may be an output committer that writes data directly to " +
+ "the final location. Because speculation is enabled, this output committer may " +
+ "cause data loss (see the case in SPARK-10063). If possible, please use a output " +
+ "committer that does not have this behavior (e.g. FileOutputCommitter)."
+ logWarning(warningMessage)
+ }
+
val writerContainer = if (numDynamicPartitions > 0) {
val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 29a6f08f40..4ca8042d22 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.hive.common.FileUtils
import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils